# -*- coding: utf-8 -*-
"""
The database classes provide the primary API for loading and saving KeePass 1.x databases,
in addition to creating new groups and entries.
"""
import binascii
import logging
import os
import os.path
import hashlib
from Crypto.Random import get_random_bytes
from keepassdb import exc, util, const
from keepassdb.model import Group, Entry, RootGroup
from keepassdb.structs import HeaderStruct, GroupStruct, EntryStruct
__authors__ = ["Karsten-Kai König <kkoenig@posteo.de>", "Hans Lellelid <hans@xmpl.org>", "Brett Viren <brett.viren@gmail.com>"]
__license__ = """
keepassdb is free software: you can redistribute it and/or modify it under the terms
of the GNU General Public License as published by the Free Software Foundation,
either version 3 of the License, or at your option) any later version.
keepassdb is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along with
keepassdb. If not, see <http://www.gnu.org/licenses/>.
"""
[docs]class Database(object):
"""
This class represents the KeePass 1.x database.
:ivar root: The group-like virtual root object (not actually part of database).
:ivar groups: The flat list of groups (:class:`keepassdb.model.Group`) in this database.
:ivar entries: The flat list of entries (:class:`keepassdb.model.Entry`) in this database.
:ivar readonly: Whether database was opened read-only.
:ivar filepath: The path to the database that is opened or will be written (if specified).
:ivar password: The passphrase to use to encrypt the database.
:ivar keyfile: A path to a keyfile that can be used instead or in combination with passphrase.
:ivar header: The database header struct (:class:`keepassdb.structs.HeaderStruct`).
"""
root = None
groups = None # The flat list of :class:`keepassdb.model.Group` groups in this database.
entries = None
readonly = False
header = None
password = None
keyfile = None
_filepath = None
def __init__(self, dbfile=None, password=None, keyfile=None, readonly=False, new=False):
"""
Initialize a new or an existing database.
:param dbfile: The path to the database file or a file-like object from which to read the database.
:type dbfile: str or file
:param password: Passphrase from which to derive key for file.
:param password: str
:param keyfile: The keyfile to use instead of or in conjunction with passphrase.
:param keyfile: str or file
:param readonly: Whether to open the database read-only (e.g. if already open in another process)
:type readonly: bool
:param new: Whether this is a new database (only necessary when specifying filepath so that file
will not attempt to be loaded).
:type new: bool
"""
self.log = logging.getLogger('{0.__module__}.{0.__name__}'.format(self.__class__))
self.password = password
self.keyfile = keyfile
self.root = RootGroup()
self.groups = []
self.entries = []
if new:
if hasattr(dbfile, 'read'):
raise TypeError("Cannot specify file object for new database")
self.filepath = dbfile # XXX: Consider if we should be invoking the setter here or not.
elif dbfile:
self.load(dbfile, password=password, keyfile=keyfile, readonly=readonly)
def _clear(self):
"""
Resets/clears out internal object state.
"""
self.root = RootGroup()
self.groups = []
self.entries = []
self.readonly = False
self.header = None
self.password = None
self.keyfile = None
self.filepath = None
@property
def filepath(self):
""" Property for retrieving current filepath (or None if db not loaded from file) """
return self._filepath
@filepath.setter
[docs] def filepath(self, value):
""" Proerty for setting current filepath. """
self._filepath = value
[docs] def create_default_group(self):
"""
Create a default 'Internet' group on an empty database.
:returns: The new 'Internet' group.
:rtype: :class:`keepassdb.model.Group`
"""
assert len(self.groups) == 0, "initialize_empty() should only be used with a new database."
return self.create_group(u'Internet', icon=1)
[docs] def load(self, dbfile, password=None, keyfile=None, readonly=False):
"""
Load the database from file/stream.
:param dbfile: The database file path/stream.
:type dbfile: str or file-like object
:param password: The password for the database.
:type password: str
:param keyfile: Path to a keyfile (or a stream) that can be used instead of or in conjunction with password for database.
:type keyfile: str or file-like object
:param readonly: Whether to open the database read-only.
:type readonly: bool
"""
self._clear()
buf = None
is_stream = hasattr(dbfile, 'read')
if is_stream:
buf = dbfile.read()
else:
if not os.path.exists(dbfile):
raise IOError("File does not exist: {0}".format(dbfile))
with open(dbfile, 'rb') as fp:
buf = fp.read()
self.load_from_buffer(buf, password=password, keyfile=keyfile, readonly=readonly)
# One we have successfully loaded the file, go ahead and set the internal attribute
# (in the LockingDatabase subclass, this will effectivley take out the lock on the file)
if not is_stream:
self.filepath = dbfile
[docs] def load_from_buffer(self, buf, password=None, keyfile=None, readonly=False):
"""
Load a database from passed-in buffer (bytes).
:param buf: A string (bytes) of the database contents.
:type buf: str
:param password: The password for the database.
:type password: str
:param keyfile: Path to a keyfile (or a stream) that can be used instead of or in conjunction with password for database.
:type keyfile: str or file-like object
:param readonly: Whether to open the database read-only.
:type readonly: bool
"""
if password is None and keyfile is None:
raise ValueError("Password and/or keyfile is required.")
# Save these to use as defaults when saving the database
self.password = password
self.keyfile = keyfile
# The header is 124 bytes long, the rest is content
hdr_len = HeaderStruct.length
header_bytes = buf[:hdr_len]
crypted_content = buf[hdr_len:]
self.header = HeaderStruct(header_bytes)
self.log.debug("Extracted header: {0}".format(self.header))
# Check if the database is supported
if self.header.version & const.DB_SUPPORTED_VERSION_MASK != const.DB_SUPPORTED_VERSION & const.DB_SUPPORTED_VERSION_MASK:
raise exc.UnsupportedDatabaseVersion('Unsupported file version: {0}'.format(hex(self.header.version)))
#Actually, only AES is supported.
if not self.header.flags & HeaderStruct.AES:
raise exc.UnsupportedDatabaseEncryption('Only AES encryption is supported.')
final_key = util.derive_key(seed_key=self.header.seed_key,
seed_rand=self.header.seed_rand,
rounds=self.header.key_enc_rounds,
password=password, keyfile=keyfile)
# FIXME: Remove this once we've tracked down issues.
self.log.debug("(load) Final key: {0!r}, pass={1}".format(final_key, password))
decrypted_content = util.decrypt_aes_cbc(crypted_content, key=final_key, iv=self.header.encryption_iv)
# Check if decryption failed
if ((len(decrypted_content) > const.DB_MAX_CONTENT_LEN) or
(len(decrypted_content) == 0 and self.header.ngroups > 0)):
raise exc.IncorrectKey("Decryption failed! The key is wrong or the file is damaged.")
if not self.header.contents_hash == hashlib.sha256(decrypted_content).digest():
self.log.debug("Decrypted content: {0!r}".format(decrypted_content))
self.log.error("Hash mismatch. Header hash = {0!r}, hash of contents = {1!r}".format(self.header.contents_hash, hashlib.sha256(decrypted_content).digest()))
raise exc.AuthenticationError("Hash test failed. The key is wrong or the file is damaged.")
# First thing (after header) are the group definitions.
for _i in range(self.header.ngroups):
gstruct = GroupStruct(decrypted_content)
self.groups.append(Group.from_struct(gstruct))
length = len(gstruct)
decrypted_content = decrypted_content[length:]
# Next come the entry definitions.
for _i in range(self.header.nentries):
estruct = EntryStruct(decrypted_content)
self.entries.append(Entry.from_struct(estruct))
length = len(estruct)
decrypted_content = decrypted_content[length:]
# Sets up the hierarchy, relates the group/entry model objects.
self._bind_model()
[docs] def save(self, dbfile=None, password=None, keyfile=None):
"""
Save the database to specified file/stream with password and/or keyfile.
:param dbfile: The path to the file we wish to save.
:type dbfile: The path to the database file or a file-like object.
:param password: The password to use for the database encryption key.
:type password: str
:param keyfile: The path to keyfile (or a stream) to use instead of or in conjunction with password for encryption key.
:type keyfile: str or file-like object
:raise keepassdb.exc.ReadOnlyDatabase: If database was opened with readonly flag.
"""
if self.readonly:
# We might wish to make this more sophisticated. E.g. if a new path is specified
# as a parameter, then it's probably ok to ignore a readonly flag? In general
# this flag doens't make a ton of sense for a library ...
raise exc.ReadOnlyDatabase()
if dbfile is not None and not hasattr(dbfile, 'write'):
self.filepath = dbfile
if password is not None or self.keyfile is not None:
# Do these together so we don't end up with some hybrid of old & new key material
self.password = password
self.keyfile = keyfile
else:
raise ValueError("Password and/or keyfile is required.")
if self.filepath is None and dbfile is None:
raise ValueError("Unable to save without target file.")
buf = bytearray()
# First, serialize the groups
for group in self.groups:
# Get the packed bytes
group_struct = group.to_struct()
self.log.debug("Group struct: {0!r}".format(group_struct))
buf += group_struct.encode()
# Then the entries.
for entry in self.entries:
entry_struct = entry.to_struct()
buf += entry_struct.encode()
# Hmmmm ... these defaults should probably be set elsewhere....?
header = HeaderStruct()
header.signature1 = const.DB_SIGNATURE1
header.signature2 = const.DB_SIGNATURE2
header.flags = header.AES
header.version = 0x00030002
header.key_enc_rounds = 50000
header.seed_key = get_random_bytes(32)
# Convert buffer to bytes for API simplicity
buf = bytes(buf)
# Generate new seed & vector; update content hash
header.encryption_iv = get_random_bytes(16)
header.seed_rand = get_random_bytes(16)
header.contents_hash = hashlib.sha256(buf).digest()
self.log.debug("(Unencrypted) content: {0!r}".format(buf))
self.log.debug("Generating hash for {0}-byte content: {1}".format(len(buf), hashlib.sha256(buf).digest()))
# Update num groups/entries to match curr state
header.nentries = len(self.entries)
header.ngroups = len(self.groups)
final_key = util.derive_key(seed_key=header.seed_key,
seed_rand=header.seed_rand,
rounds=header.key_enc_rounds,
password=password, keyfile=keyfile)
# FIXME: Remove this once we've tracked down issues.
self.log.debug("(save) Final key: {0!r}, pass={1}".format(final_key, password))
encrypted_content = util.encrypt_aes_cbc(buf, key=final_key, iv=header.encryption_iv)
if hasattr(dbfile, 'write'):
dbfile.write(header.encode() + encrypted_content)
else:
with open(self.filepath, "wb") as fp:
fp.write(header.encode() + encrypted_content)
[docs] def create_group(self, title, parent=None, icon=1, expires=None):
"""
This method creates a new group.
A group title is needed or no group will be created.
If a parent is given, the group will be created as a sub-group.
title must be a string, image an unsigned int >0 and parent a Group.
:return: The newly created group.
:rtype: :class:`keepassdb.model.Group`
"""
if parent and not isinstance(parent, Group):
raise TypeError("Parent must be of type Group")
if expires is None:
expires = const.NEVER
if self.groups:
group_id = max([g.id for g in self.groups]) + 1
else:
group_id = 1
group = Group(id=group_id, title=title, icon=icon, db=self,
created=util.now(), modified=util.now(), accessed=util.now(),
expires=expires)
# If no parent is given, just append the new group at the end
if parent is None:
group.parent = self.root
self.root.children.append(group)
group.level = 0
self.groups.append(group)
# Else insert the group behind the parent
else:
if parent not in self.groups:
raise ValueError("Group doesn't exist / is not bound to this database.")
parent.children.append(group)
group.parent = parent
group.level = parent.level + 1
self.groups.insert(self.groups.index(parent) + 1, group)
return group
[docs] def remove_group(self, group):
"""
Remove the specified group.
"""
if not isinstance(group, Group):
raise TypeError("group must be Group")
if group not in self.groups:
raise ValueError("Group doesn't exist / is not bound to this database.")
# Recurse down to remove sub-groups
for child in group.children: # We may need to copy this to avoid CME (see below)
self.remove_group(child)
for entry in group.entries:
self.remove_entry(entry)
# Finally remove group from the parent's list.
group.parent.children.remove(group) # Concurrent modification exception? Parent in recursive stack is iterating ...
self.groups.remove(group)
[docs] def move_group(self, group, parent, index=None):
"""
Move group to be a child of new parent.
:param group: The group to move.
:type group: :class:`keepassdb.model.Group`
:param parent: The new parent for the group.
:type parent: :class:`keepassdb.model.Group`
:param index: The 0-based index within the parent (defaults to appending
group to end of parent's children).
:type index: int
"""
if not isinstance(group, Group):
raise TypeError("group param must be of type Group")
if parent is not None and not isinstance(parent, Group):
raise TypeError("parent param must be of type Group")
if group is parent:
raise ValueError("group and parent are the same")
if parent is None:
parent = self.root
if group not in self.groups:
raise exc.UnboundModelError("Group doesn't exist / is not bound to this database.")
if parent not in self.groups:
raise exc.UnboundModelError("Parent group doesn't exist / is not bound to this database.")
curr_parent = group.parent
curr_parent.children.remove(group)
if index is None:
parent.children.append(group)
self.log.debug("Moving {0!r} to child of {1!r}, (appending)".format(group, parent))
else:
parent.children.insert(index, group)
self.log.debug("Moving {0!r} to child of {1!r}, (at position {2!r})".format(group, parent, index))
group.parent = parent
group.modified = util.now()
self._rebuild_groups()
def _rebuild_groups(self):
"""
Recreates the groups master list based on the groups hierarchy (order matters here,
since the parser uses order to determine lineage).
"""
self.groups = []
def collapse_group(group):
for subgroup in group.children:
self.groups.append(subgroup)
collapse_group(subgroup)
collapse_group(self.root)
[docs] def create_entry(self, group, **kwargs):
"""
Create a new Entry object.
The group which should hold the entry is needed.
image must be an unsigned int >0, group a Group.
:param group: The associated group.
:keyword title:
:keyword icon:
:keyword url:
:keyword username:
:keyword password:
:keyword notes:
:keyword expires: Expiration date (if None, entry will never expire).
:type expires: datetime
:return: The new entry.
:rtype: :class:`keepassdb.model.Entry`
"""
if group not in self.groups:
raise ValueError("Group doesn't exist / is not bound to this database.")
uuid = binascii.hexlify(get_random_bytes(16))
entry = Entry(uuid=uuid,
group_id=group.id,
created=util.now(),
modified=util.now(),
accessed=util.now(),
**kwargs)
self.entries.append(entry)
group.entries.append(entry)
return entry
[docs] def remove_entry(self, entry):
"""
Remove specified entry.
:param entry: The Entry object to remove.
:type entry: :class:`keepassdb.model.Entry`
"""
if not isinstance(entry, Entry):
raise TypeError("entry param must be of type Entry.")
if not entry in self.entries:
raise ValueError("Entry doesn't exist / not bound to this datbase.")
entry.group.entries.remove(entry)
self.entries.remove(entry)
[docs] def move_entry(self, entry, group, index=None):
"""
Move an entry to another group.
:param entry: The Entry object to move.
:type entry: :class:`keepassdb.model.Entry`
:param group: The new parent Group object for the entry.
:type group: :class:`keepassdb.model.Group`
:param index: The 0-based index within the parent (defaults to appending
group to end of parent's children).
:type index: int
"""
if not isinstance(entry, Entry):
raise TypeError("entry param must be of type Entry")
if not isinstance(group, Group):
raise TypeError("group param must be of type Group")
if entry not in self.entries:
raise exc.UnboundModelError("Invalid entry (or not bound to this database): {0!r}".format(entry))
if group not in self.groups:
raise exc.UnboundModelError("Invalid group (or not bound to this database): {0!r}".format(group))
curr_group = entry.group
curr_group.entries.remove(entry)
if index is None:
group.entries.append(entry)
self.log.debug("Moving {0!r} to child of {1!r}, (appending)".format(entry, group))
else:
group.entries.insert(index, entry)
self.log.debug("Moving {0!r} to child of {1!r}, (at position {2})".format(entry, group, index))
entry.group = group
entry.modified = util.now()
self._rebuild_entries()
def _rebuild_entries(self):
"""
Recreates the entries master list based on the groups hierarchy (order matters here,
since the parser uses order to determine lineage).
"""
self.entries = []
def collapse_entries(group):
for entry in group.entries:
self.entries.append(entry)
for subgroup in group.children:
collapse_entries(subgroup)
collapse_entries(self.root)
def _bind_model(self):
"""
This method binds the various model objects together in the correct hierarchy
and adds referneces to this database object in the groups.
"""
if self.groups[0].level != 0:
self.log.info("Got invalid first group: {0}".format(self.groups[0]))
raise ValueError("Invalid group tree: first group must have level of 0 (got {0})".format(self.groups[0].level))
# The KeePassX source code maintains that first group to have incremented
# level is a child of the previous group with a lower level.
#
# [R]
# | A (1)
# +-| B (2)
# | | C (2)
# | D (1)
# +-| E (2)
# | F (2)
# +-| G (3)
# | H (3)
# | I (3)
#
class Stack(list):
""" A class to make parsing code slightly more semantic. """
def push(self, el):
self.append(el)
# This is a different parsing approach than taken by KeePassX (or other python
# libs), but seems a little more intuitive. It could be further simplified
# by noting that current_parent is always parent_stack[-1], but this is a bit
# more readable.
parent_stack = Stack([self.root])
current_parent = self.root
prev_group = None
for g in self.groups:
g.db = self # Bind database to group objects
if prev_group is not None: # first iteration is exceptional
if g.level > prev_group.level: # Always true for iteration 1 since root has level of -1
# Dropping down a level; the previous group is the parent
current_parent = prev_group
parent_stack.push(current_parent)
elif g.level < prev_group.level:
# Pop off parents until we have a parent with a level that is less than current
while g.level <= current_parent.level:
current_parent = parent_stack.pop()
parent_stack.push(current_parent) # We want to make sure that the top of the stack always matches current parent
# bi-directional child-parent binding
g.parent = current_parent
current_parent.children.append(g)
prev_group = g
# Bind group objects to entries
for entry in self.entries:
for group in self.groups:
if entry.group_id == group.id:
group.entries.append(entry)
entry.group = group
break
else:
# KeePassX adds these to the first group (i.e. root.children[0])
raise NotImplementedError("Orphaned entries not (yet) supported.")
[docs] def close(self):
"""
Closes the database, performs any necessary cleanup functions.
"""
def to_dict(self, hierarchy=True, hide_passwords=False):
if hierarchy:
d = dict(groups=[g.to_dict(hierarchy=hierarchy, hide_passwords=hide_passwords) for g in self.root.children])
else:
d = dict(groups=[g.to_dict(hide_passwords=hide_passwords) for g in self.groups])
return d
[docs]class LockingDatabase(Database):
"""
A convenience subclass that adds automatic file locking (if db not opened read-only).
The lock is only acquired when the filepath is specified to a load() or save() operation.
The close() method will also release the lock.
"""
_locked = False
@property
def lockfile(self):
return self.filepath + '.lock'
@property
def filepath(self):
""" Property for retrieving current filepath (or None if db not loaded from file) """
return self._filepath
@filepath.setter
[docs] def filepath(self, value):
""" Property for setting current filepath, automatically takes out lock on new file if not readonly db. """
if not self.readonly and self._filepath != value:
if self._locked:
self.log.debug("Releasing previously-held lock file: {0}".format(self.lockfile))
# Release the lock on previous filepath.
self.release_lock()
self._filepath = value
if self._filepath is not None:
self.acquire_lock()
else:
self._filepath = value
def __enter__(self):
"""
Take out a lock on the database file, supporting using as context manager.
Note that the lock has probably already been taken out, but this won't hurt.
"""
self.acquire_lock()
return self
def __exit__(self, exc_type, exc_value, exc_tb):
"""
Release the lock on the database file, supporting using as context manager.
"""
self.release_lock()
return False # Do not suppress.
[docs] def acquire_lock(self, force=False):
"""
Takes out a lock (creates a <dbname>.lock file) for the database.
:param force: Whether to force taking "ownership" of the lock file.
:type force: bool
:raises: :class:`keepassdb.exc.DatabaseAlreadyLocked` - If the database is already locked (and force not set to True).
"""
if self.readonly:
raise exc.ReadOnlyDatabase()
if not self._locked:
self.log.debug("Acquiring lock file: {0}".format(self.lockfile))
if os.path.exists(self.lockfile) and not force:
raise exc.DatabaseAlreadyLocked('Lock file already exists: {0}'.format(self.lockfile))
open(self.lockfile, 'w').close()
self._locked = True
[docs] def release_lock(self, force=False):
"""
Releases the lock (deletes the <dbname>.lock file) if it was acquired by this class or force is set to True.
:param force: Whether to force releasing the lock (e.g. if it was not acquired during this session).
:type force: bool
"""
if self.readonly:
raise exc.ReadOnlyDatabase()
if self._locked or force:
self.log.debug("Removing lock file: {0}".format(self.lockfile))
if os.path.exists(self.lockfile):
os.remove(self.lockfile)
self._locked = False
else:
self.log.debug("Database not locked (not removing)")
[docs] def close(self):
"""
Closes the database, releasing lock.
"""
super(LockingDatabase, self).close()
if not self.readonly:
self.release_lock()