comparison src/memory/sqlite.py @ 2500:898b6e1fdc7a

memory (sqlite): files handling: /!\ new database schema version - added a table to handle files metadata and hierarchy. - new methods getFiles and setFile allow, respectively, retrieving the files matching given filters and setting the metadata of one file. - fixed stmnt_regex to correctly detect the new schema (during schema validation/update on init)
author Goffi <goffi@goffi.org>
date Wed, 28 Feb 2018 18:28:39 +0100
parents 0046283a285d
children 4c45df43ea44
comparison
equal deleted inserted replaced
2499:af4a38ebf52a 2500:898b6e1fdc7a
24 log = getLogger(__name__) 24 log = getLogger(__name__)
25 from sat.memory.crypto import BlockCipher, PasswordHasher 25 from sat.memory.crypto import BlockCipher, PasswordHasher
26 from sat.tools.config import fixConfigOption 26 from sat.tools.config import fixConfigOption
27 from twisted.enterprise import adbapi 27 from twisted.enterprise import adbapi
28 from twisted.internet import defer 28 from twisted.internet import defer
29 from twisted.words.protocols.jabber import jid
29 from twisted.python import failure 30 from twisted.python import failure
30 from collections import OrderedDict 31 from collections import OrderedDict
31 import re 32 import re
32 import os.path 33 import os.path
33 import cPickle as pickle 34 import cPickle as pickle
34 import hashlib 35 import hashlib
35 import sqlite3 36 import sqlite3
36 37 import json
37 CURRENT_DB_VERSION = 4 38
39 CURRENT_DB_VERSION = 5
38 40
39 # XXX: DATABASE schemas are used in the following way: 41 # XXX: DATABASE schemas are used in the following way:
40 # - 'current' key is for the actual database schema, for a new base 42 # - 'current' key is for the actual database schema, for a new base
41 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update 43 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update
42 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed 44 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed
43 # a 'current' data dict can contains the keys: 45 # a 'current' data dict can contains the keys:
44 # - 'CREATE': it contains an Ordered dict with table to create as keys, and a len 2 tuple as value, where value[0] are the columns definitions and value[1] are the table constraints 46 # - 'CREATE': it contains an Ordered dict with table to create as keys, and a len 2 tuple as value, where value[0] are the columns definitions and value[1] are the table constraints
45 # - 'INSERT': it contains an Ordered dict with table where values have to be inserted, and many tuples containing values to insert in the order of the rows (#TODO: manage named columns) 47 # - 'INSERT': it contains an Ordered dict with table where values have to be inserted, and many tuples containing values to insert in the order of the rows (#TODO: manage named columns)
46 # an update data dict (the ones with a number) can contains the keys 'create', 'delete', 'cols create', 'cols delete', 'cols modify', 'insert' or 'specific'. See Updater.generateUpdateData for more infos. This method can be used to autogenerate update_data, to ease the work of the developers. 48 # an update data dict (the ones with a number) can contains the keys 'create', 'delete', 'cols create', 'cols delete', 'cols modify', 'insert' or 'specific'. See Updater.generateUpdateData for more infos. This method can be used to autogenerate update_data, to ease the work of the developers.
47 # TODO: this database currently doesn't use indexes, it should 49 # TODO: indexes need to be improved
48 50
49 DATABASE_SCHEMAS = { 51 DATABASE_SCHEMAS = {
50 "current": {'CREATE': OrderedDict(( 52 "current": {'CREATE': OrderedDict((
51 ('profiles', (("id INTEGER PRIMARY KEY ASC", "name TEXT"), 53 ('profiles', (("id INTEGER PRIMARY KEY ASC", "name TEXT"),
52 ("UNIQUE (name)",))), 54 ("UNIQUE (name)",))),
53 ('components', (("profile_id INTEGER PRIMARY KEY", "entry_point TEXT NOT NULL"), 55 ('components', (("profile_id INTEGER PRIMARY KEY", "entry_point TEXT NOT NULL"),
54 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE",))), 56 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE",))),
55 ('message_types', (("type TEXT PRIMARY KEY",), 57 ('message_types', (("type TEXT PRIMARY KEY",),
56 tuple())), 58 ())),
57 ('history', (("uid TEXT PRIMARY KEY", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", 59 ('history', (("uid TEXT PRIMARY KEY", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT",
58 "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception) 60 "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception)
59 "type TEXT", "extra BLOB"), 61 "type TEXT", "extra BLOB"),
60 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)", 62 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)",
61 "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)" # avoid storing 2 time the same message (specially for delayed cones) 63 "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)" # avoid storing 2 time the same message (specially for delayed ones)
62 ))), 64 ))),
63 ('message', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "message TEXT", "language TEXT"), 65 ('message', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "message TEXT", "language TEXT"),
64 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), 66 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
65 ('subject', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "subject TEXT", "language TEXT"), 67 ('subject', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "subject TEXT", "language TEXT"),
66 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), 68 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
74 ('private_ind', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value TEXT"), 76 ('private_ind', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value TEXT"),
75 ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))), 77 ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
76 ('private_gen_bin', (("namespace TEXT", "key TEXT", "value BLOB"), 78 ('private_gen_bin', (("namespace TEXT", "key TEXT", "value BLOB"),
77 ("PRIMARY KEY (namespace, key)",))), 79 ("PRIMARY KEY (namespace, key)",))),
78 ('private_ind_bin', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value BLOB"), 80 ('private_ind_bin', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value BLOB"),
79 ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))) 81 ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
82 ('files', (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL",
83 "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format(
84 file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY),
85 "hash TEXT", "hash_algo TEXT", "name TEXT NOT NULL", "size INTEGER",
86 "namespace TEXT", "mime_type TEXT",
87 "created DATETIME NOT NULL", "modified DATETIME",
88 "owner TEXT", "access TEXT", "extra TEXT", "profile_id INTEGER"),
89 ("PRIMARY KEY (id, version)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
80 )), 90 )),
81 'INSERT': OrderedDict(( 91 'INSERT': OrderedDict((
82 ('message_types', (("'chat'",), 92 ('message_types', (("'chat'",),
83 ("'error'",), 93 ("'error'",),
84 ("'groupchat'",), 94 ("'groupchat'",),
86 ("'normal'",), 96 ("'normal'",),
87 ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC 97 ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC
88 )), 98 )),
89 )), 99 )),
90 }, 100 },
101 5: {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL",
102 "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format(
103 file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY),
104 "hash TEXT", "hash_algo TEXT", "name TEXT NOT NULL", "size INTEGER",
105 "namespace TEXT", "mime_type TEXT",
106 "created DATETIME NOT NULL", "modified DATETIME",
107 "owner TEXT", "access TEXT", "extra TEXT", "profile_id INTEGER"),
108 ("PRIMARY KEY (id, version)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))},
109 },
91 4: {'create': {'components': (('profile_id INTEGER PRIMARY KEY', 'entry_point TEXT NOT NULL'), ('FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE',))} 110 4: {'create': {'components': (('profile_id INTEGER PRIMARY KEY', 'entry_point TEXT NOT NULL'), ('FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE',))}
92 }, 111 },
93 3: {'specific': 'update_v3' 112 3: {'specific': 'update_v3'
94 }, 113 },
95 2: {'specific': 'update2raw_v2' 114 2: {'specific': 'update2raw_v2'
185 204
def _updateDb(self, interaction, statements):
    """Run the SQL statements of a database update, one after the other.

    @param interaction: object exposing an ``execute`` method
        (presumably the cursor-like object handed over by adbapi's
        runInteraction — TODO confirm against the caller)
    @param statements(iterable[unicode]): SQL statements, executed in iteration order
    """
    for sql_statement in statements:
        # each statement is sent on its own, in the order given by the updater
        interaction.execute(sql_statement)
189 208
190 #Profiles 209 ## Profiles
191 210
192 def _cacheComponentsAndProfiles(self, components_result): 211 def _cacheComponentsAndProfiles(self, components_result):
193 """Get components results and send requests profiles 212 """Get components results and send requests profiles
194 213
195 they will be both put in cache in _profilesCache 214 they will be both put in cache in _profilesCache
284 d = self.dbpool.runInteraction(delete) 303 d = self.dbpool.runInteraction(delete)
285 d.addCallback(lambda ignore: log.info(_("Profile [%s] deleted") % name)) 304 d.addCallback(lambda ignore: log.info(_("Profile [%s] deleted") % name))
286 d.addErrback(deletionError) 305 d.addErrback(deletionError)
287 return d 306 return d
288 307
289 #Params 308 ## Params
290 def loadGenParams(self, params_gen): 309 def loadGenParams(self, params_gen):
291 """Load general parameters 310 """Load general parameters
292 311
293 @param params_gen: dictionary to fill 312 @param params_gen: dictionary to fill
294 @return: deferred 313 @return: deferred
330 d.addCallback(self.__getFirstResult) 349 d.addCallback(self.__getFirstResult)
331 return d 350 return d
332 351
333 def setGenParam(self, category, name, value): 352 def setGenParam(self, category, name, value):
334 """Save the general parameters in database 353 """Save the general parameters in database
354
335 @param category: category of the parameter 355 @param category: category of the parameter
336 @param name: name of the parameter 356 @param name: name of the parameter
337 @param value: value to set 357 @param value: value to set
338 @return: deferred""" 358 @return: deferred"""
339 d = self.dbpool.runQuery("REPLACE INTO param_gen(category,name,value) VALUES (?,?,?)", (category, name, value)) 359 d = self.dbpool.runQuery("REPLACE INTO param_gen(category,name,value) VALUES (?,?,?)", (category, name, value))
351 """ 371 """
352 d = self.dbpool.runQuery("REPLACE INTO param_ind(category,name,profile_id,value) VALUES (?,?,?,?)", (category, name, self.profiles[profile], value)) 372 d = self.dbpool.runQuery("REPLACE INTO param_ind(category,name,profile_id,value) VALUES (?,?,?,?)", (category, name, self.profiles[profile], value))
353 d.addErrback(lambda ignore: log.error(_(u"Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category": category, "name": name, "profile": profile}))) 373 d.addErrback(lambda ignore: log.error(_(u"Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category": category, "name": name, "profile": profile})))
354 return d 374 return d
355 375
356 #History 376 ## History
357 377
358 def _addToHistoryCb(self, dummy, data): 378 def _addToHistoryCb(self, dummy, data):
359 # Message metadata were successfuly added to history 379 # Message metadata were successfuly added to history
360 # now we can add message and subject 380 # now we can add message and subject
361 uid = data['uid'] 381 uid = data['uid']
533 d = self.dbpool.runQuery(u" ".join(query_parts), values) 553 d = self.dbpool.runQuery(u" ".join(query_parts), values)
534 d.addCallback(self.sqliteHistoryToList) 554 d.addCallback(self.sqliteHistoryToList)
535 d.addCallback(self.listDict2listTuple) 555 d.addCallback(self.listDict2listTuple)
536 return d 556 return d
537 557
538 #Private values 558 ## Private values
539 559
540 def _privateDataEb(self, failure_, operation, namespace, key=None, profile=None): 560 def _privateDataEb(self, failure_, operation, namespace, key=None, profile=None):
541 """generic errback for data queries""" 561 """generic errback for data queries"""
542 log.error(_(u"Can't {operation} data in database for namespace {namespace}{and_key}{for_profile}: {msg}").format( 562 log.error(_(u"Can't {operation} data in database for namespace {namespace}{and_key}{for_profile}: {msg}").format(
543 operation = operation, 563 operation = operation,
648 args.append(self.profiles[profile]) 668 args.append(self.profiles[profile])
649 d = self.dbpool.runQuery(u" ".join(query_parts), args) 669 d = self.dbpool.runQuery(u" ".join(query_parts), args)
650 d.addErrback(self._privateDataEb, u"delete", namespace, key, profile=profile) 670 d.addErrback(self._privateDataEb, u"delete", namespace, key, profile=profile)
651 return d 671 return d
652 672
673 ## Files
674
@defer.inlineCallbacks
def getFiles(self, client, file_id=None, version=u'', parent=None, type_=None,
             file_hash=None, hash_algo=None, name=None, namespace=None, mime_type=None,
             owner=None, access=None, projection=None, unique=False):
    """retrieve files with given filters

    Only files belonging to client's profile are returned.

    @param client(SatXMPPClient): client owning the files
    @param file_id(unicode, None): id of the file
        None to ignore
    @param version(unicode, None): version of the file
        None to ignore
        empty string to look for current version
        surrounding whitespace is stripped, as [setFile] does before storing it
    @param parent(unicode, None): id of the directory containing the files
        None to ignore
        empty string to look for root files/directories
    @param owner(jid.JID, None): owner of the files
        None to ignore
    @param access(None): filtering on access is not implemented yet, must be None
    @param projection(list[unicode], None): name of columns to retrieve
        None to retrieve all
        /!\ names are inserted verbatim in the SQL query: only pass trusted column names
    @param unique(bool): if True will remove duplicates
    other params are the same as for [setFile]
    @return (list[dict]): files corresponding to filters
        "access" and "extra" (when retrieved) are decoded from JSON ({} if NULL),
        "owner" (when retrieved and not NULL) is converted to jid.JID
    @raise NotImplementedError: access is not None
    """
    if projection is None:
        projection = ['id', 'version', 'parent', 'type', 'hash', 'hash_algo', 'name',
                      'size', 'namespace', 'mime_type', 'created', 'modified', 'owner',
                      'access', 'extra']
    if version is not None:
        # setFile stores version.strip(), so compare against the same normalised value
        version = version.strip()

    filters = [u'profile_id=?']
    args = [self.profiles[client.profile]]
    # (column, value) pairs: a None value means "don't filter on this column"
    column_filters = (
        (u'id', file_id),
        (u'version', version),
        (u'parent', parent),
        (u'type', type_),
        (u'hash', file_hash),
        (u'hash_algo', hash_algo),
        (u'name', name),
        (u'namespace', namespace),
        (u'mime_type', mime_type),
        # owner is stored as the full JID string
        (u'owner', owner.full() if owner is not None else None),
    )
    for column, value in column_filters:
        if value is not None:
            filters.append(column + u'=?')
            args.append(value)
    if access is not None:
        # a JSON comparison is needed here
        raise NotImplementedError('Access check is not implemented yet')

    query_parts = [u"SELECT"]
    if unique:
        query_parts.append(u'DISTINCT')
    query_parts.append(u','.join(projection))
    query_parts.append(u"FROM files WHERE")
    query_parts.append(u' AND '.join(filters))
    query = u' '.join(query_parts)

    rows = yield self.dbpool.runQuery(query, args)
    files_data = [dict(zip(projection, row)) for row in rows]
    json_columns = {'access', 'extra'}.intersection(projection)
    convert_owner = 'owner' in projection
    if json_columns or convert_owner:
        for file_data in files_data:
            for column in json_columns:
                value = file_data[column]
                # NULL is stored when the dict was empty or None (see setFile)
                file_data[column] = {} if value is None else json.loads(value)
            if convert_owner:
                owner_str = file_data['owner']
                if owner_str is not None:
                    file_data['owner'] = jid.JID(owner_str)
    defer.returnValue(files_data)
758
def setFile(self, client, name, file_id, version=u'', parent=None, type_=C.FILE_TYPE_FILE,
            file_hash=None, hash_algo=None, size=None, namespace=None, mime_type=None,
            created=None, modified=None, owner=None, access=None, extra=None):
    """set a file metadata

    The metadata are stored with an INSERT, so storing an already existing
    (file_id, version) pair fails (primary key of the "files" table).

    @param client(SatXMPPClient): client owning the file
    @param name(unicode): name of the file (must not contain "/")
    @param file_id(unicode): unique id of the file
    @param version(unicode): version of this file
        surrounding whitespace is stripped before storing
    @param parent(unicode, None): id of the directory containing this file
        None or empty string if it is a root file/directory
        (stored as an empty string: the "parent" column is NOT NULL, and
        [getFiles] looks for root files with an empty string)
    @param type_(unicode): one of:
        - file
        - directory
    @param file_hash(unicode): unique hash of the payload
    @param hash_algo(unicode): algorithm used for hashing the file (usually sha-256)
    @param size(int): size in bytes
    @param namespace(unicode, None): identifier (human readable is better) to group files
        for instance, namespace could be used to group files in a specific photo album
    @param mime_type(unicode): MIME type of the file, or None if not known/guessed
    @param created(int, None): UNIX time of creation
        None to use current time (the "created" column is NOT NULL)
    @param modified(int,None): UNIX time of last modification, or None to use created date
        (None is stored as NULL)
    @param owner(jid.JID, None): jid of the owner of the file (mainly useful for component)
    @param access(dict, None): serialisable dictionary with access rules. See [memory.memory] for details
        will be encoded to json in database (an empty dict is stored as NULL)
    @param extra(dict, None): serialisable dictionary of any extra data
        will be encoded to json in database (an empty dict is stored as NULL)
    @return (D): fired when the metadata are stored; on failure the error is
        logged and not propagated to the caller
    """
    import time
    if extra is not None:
        assert isinstance(extra, dict)
    if parent is None:
        # "parent" is declared NOT NULL: root files/directories use an empty string
        parent = u''
    if created is None:
        # "created" is declared NOT NULL: default to the time of storage
        created = time.time()
    query = ('INSERT INTO files(id, version, parent, type, hash, hash_algo, name, size, namespace, '
             'mime_type, created, modified, owner, access, extra, profile_id) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)')
    d = self.dbpool.runQuery(query, (file_id, version.strip(), parent, type_,
                                     file_hash, hash_algo,
                                     name, size, namespace,
                                     mime_type, created, modified,
                                     owner.full() if owner is not None else None,
                                     json.dumps(access) if access else None,
                                     json.dumps(extra) if extra else None,
                                     self.profiles[client.profile]))
    # the template is translated first, then formatted, so the translation lookup can match
    d.addErrback(lambda failure_: log.error(
        _(u"Can't save file metadata for [{profile}]: {reason}").format(
            profile=client.profile, reason=failure_)))
    return d
800
653 ##Helper methods## 801 ##Helper methods##
654 802
def __getFirstResult(self, result):
    """Extract the first column of the first row of a query result.

    Handy when a query is expected to yield a single value.

    @param result: rows returned by a database query (may be empty or None)
    @return: result[0][0], or None when there is no row
    """
    if result:
        first_row = result[0]
        return first_row[0]
    return None
659 807
660 808
661 class Updater(object): 809 class Updater(object):
662 stmnt_regex = re.compile(r"(?:[\w ]+(?:\([\w, ]+\))?)+") 810 stmnt_regex = re.compile(r"[\w/' ]+(?:\(.*?\))?[^,]*")
663 clean_regex = re.compile(r"^ +|(?<= ) +|(?<=,) +| +$") 811 clean_regex = re.compile(r"^ +|(?<= ) +|(?<=,) +| +$")
664 CREATE_SQL = "CREATE TABLE %s (%s)" 812 CREATE_SQL = "CREATE TABLE %s (%s)"
665 INSERT_SQL = "INSERT INTO %s VALUES (%s)" 813 INSERT_SQL = "INSERT INTO %s VALUES (%s)"
666 DROP_SQL = "DROP TABLE %s" 814 DROP_SQL = "DROP TABLE %s"
667 ALTER_SQL = "ALTER TABLE %s ADD COLUMN %s" 815 ALTER_SQL = "ALTER TABLE %s ADD COLUMN %s"
704 852
705 @return: deferred which fire a list of SQL update statements, or None if no update is needed 853 @return: deferred which fire a list of SQL update statements, or None if no update is needed
706 """ 854 """
707 local_version = yield self.getLocalVersion() 855 local_version = yield self.getLocalVersion()
708 raw_local_sch = yield self.getLocalSchema() 856 raw_local_sch = yield self.getLocalSchema()
857
709 local_sch = self.rawStatements2data(raw_local_sch) 858 local_sch = self.rawStatements2data(raw_local_sch)
710 current_sch = DATABASE_SCHEMAS['current']['CREATE'] 859 current_sch = DATABASE_SCHEMAS['current']['CREATE']
711 local_hash = self.statementHash(local_sch) 860 local_hash = self.statementHash(local_sch)
712 current_hash = self.statementHash(current_sch) 861 current_hash = self.statementHash(current_sch)
713 862