comparison sat/memory/sqlite.py @ 2716:06160b529da6

core (memory/sqlite): changed history constraint /!\ Database schema change /!\ History was using a unique constraint on `profile_id, timestamp, source, dest, source_res, dest_res`, which can cause trouble because several messages sent quickly by the same person can have a common timestamp (especially with delayed messages, where precision is one second), resulting in message loss. The new constraint uses `profile_id, stanza_id, source, dest`, where `stanza_id` is the XEP-0359 stanza_id, so it is unique by definition, and no message should be lost anymore. Because SQLite doesn't support altering a table with a constraint change, we have to create new tables and copy the old data to the new ones, which can take a long time. The SQLite update mechanism with "specifics" has been fixed for the case where several updates are applied (e.g. moving from v5 to v7) and a specific is in the workflow.
author Goffi <goffi@goffi.org>
date Sun, 09 Dec 2018 14:07:26 +0100
parents b35c84ea73cf
children 453a12ff6f51
comparison
equal deleted inserted replaced
2715:b35c84ea73cf 2716:06160b529da6
34 import cPickle as pickle 34 import cPickle as pickle
35 import hashlib 35 import hashlib
36 import sqlite3 36 import sqlite3
37 import json 37 import json
38 38
39 CURRENT_DB_VERSION = 6 39 CURRENT_DB_VERSION = 7
40 40
41 # XXX: DATABASE schemas are used in the following way: 41 # XXX: DATABASE schemas are used in the following way:
42 # - 'current' key is for the actual database schema, for a new base 42 # - 'current' key is for the actual database schema, for a new base
43 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update 43 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update
44 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed 44 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed
58 ())), 58 ())),
59 ('history', (("uid TEXT PRIMARY KEY", "stanza_id TEXT", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", 59 ('history', (("uid TEXT PRIMARY KEY", "stanza_id TEXT", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT",
60 "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception) 60 "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception)
61 "type TEXT", "extra BLOB"), 61 "type TEXT", "extra BLOB"),
62 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)", 62 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)",
63 "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)" # avoid storing 2 time the same message (specially for delayed ones) 63 "UNIQUE (profile_id, stanza_id, source, dest)" # avoid storing 2 times the same message
64 ))), 64 ))),
65 ('message', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "message TEXT", "language TEXT"), 65 ('message', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "message TEXT", "language TEXT"),
66 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), 66 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
67 ('subject', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "subject TEXT", "language TEXT"), 67 ('subject', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "subject TEXT", "language TEXT"),
68 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), 68 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
96 ("'normal'",), 96 ("'normal'",),
97 ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC 97 ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC
98 )), 98 )),
99 )), 99 )),
100 }, 100 },
101 7: {'specific': 'update_v7'
102 },
101 6: {'cols create': {'history': ('stanza_id TEXT',)}, 103 6: {'cols create': {'history': ('stanza_id TEXT',)},
102 }, 104 },
103 5: {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL", 105 5: {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL",
104 "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format( 106 "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format(
105 file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY), 107 file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY),
178 database_creation.extend(Updater.createData2Raw(DATABASE_SCHEMAS['current']['CREATE'])) 180 database_creation.extend(Updater.createData2Raw(DATABASE_SCHEMAS['current']['CREATE']))
179 database_creation.extend(Updater.insertData2Raw(DATABASE_SCHEMAS['current']['INSERT'])) 181 database_creation.extend(Updater.insertData2Raw(DATABASE_SCHEMAS['current']['INSERT']))
180 return database_creation 182 return database_creation
181 183
182 def getUpdateSql(): 184 def getUpdateSql():
183 updater = Updater(self.dbpool, sat_version) 185 updater = Updater(self, sat_version)
184 return updater.checkUpdates() 186 return updater.checkUpdates()
185 187
186 def commitStatements(statements):
187
188 if statements is None:
189 return defer.succeed(None)
190 log.debug(u"\n===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements))
191 d = self.dbpool.runInteraction(self._updateDb, tuple(statements))
192 return d
193
194 # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done 188 # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done
195 189
196 init_defer = defer.succeed(None) 190 init_defer = defer.succeed(None)
197 191
198 init_defer.addCallback(lambda ignore: getNewBaseSql() if new_base else getUpdateSql()) 192 init_defer.addCallback(lambda ignore: getNewBaseSql() if new_base else getUpdateSql())
199 init_defer.addCallback(commitStatements) 193 init_defer.addCallback(self.commitStatements)
200 194
201 def fillProfileCache(ignore): 195 def fillProfileCache(ignore):
202 d = self.dbpool.runQuery("SELECT profile_id, entry_point FROM components").addCallback(self._cacheComponentsAndProfiles) 196 d = self.dbpool.runQuery("SELECT profile_id, entry_point FROM components").addCallback(self._cacheComponentsAndProfiles)
203 d.chainDeferred(self.initialized) 197 d.chainDeferred(self.initialized)
204 198
205 init_defer.addCallback(fillProfileCache) 199 init_defer.addCallback(fillProfileCache)
200
201 def commitStatements(self, statements):
202
203 if statements is None:
204 return defer.succeed(None)
205 log.debug(u"\n===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements))
206 d = self.dbpool.runInteraction(self._updateDb, tuple(statements))
207 return d
206 208
207 def _updateDb(self, interaction, statements): 209 def _updateDb(self, interaction, statements):
208 for statement in statements: 210 for statement in statements:
209 interaction.execute(statement) 211 interaction.execute(statement)
210 212
582 query_parts.append(u"AND stanza_id IS NOT NULL ORDER BY history.rowid DESC") 584 query_parts.append(u"AND stanza_id IS NOT NULL ORDER BY history.rowid DESC")
583 order = True 585 order = True
584 586
585 587
586 if not order: 588 if not order:
587 query_parts.append(u"ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList 589 # timestamp may be identical for 2 close message (specially when delay is used)
590 # that's why we order ties by rowid (which is in the same order as received_timestamp
591 # but has an index so is quick to order).
592
593 # We'll reverse the order in sqliteHistoryToList
594 query_parts.append(u"ORDER BY timestamp DESC, history.rowid DESC")
588 # we use DESC here so LIMIT keep the last messages 595 # we use DESC here so LIMIT keep the last messages
589 if limit is not None: 596 if limit is not None:
590 query_parts.append(u"LIMIT ?") 597 query_parts.append(u"LIMIT ?")
591 values.append(limit) 598 values.append(limit)
592 599
898 RENAME_TABLE_SQL = "ALTER TABLE %s RENAME TO %s" 905 RENAME_TABLE_SQL = "ALTER TABLE %s RENAME TO %s"
899 906
900 CONSTRAINTS = ('PRIMARY', 'UNIQUE', 'CHECK', 'FOREIGN') 907 CONSTRAINTS = ('PRIMARY', 'UNIQUE', 'CHECK', 'FOREIGN')
901 TMP_TABLE = "tmp_sat_update" 908 TMP_TABLE = "tmp_sat_update"
902 909
903 def __init__(self, dbpool, sat_version): 910 def __init__(self, sqlite_storage, sat_version):
904 self._sat_version = sat_version 911 self._sat_version = sat_version
905 self.dbpool = dbpool 912 self.sqlite_storage = sqlite_storage
913
914 @property
915 def dbpool(self):
916 return self.sqlite_storage.dbpool
906 917
907 def getLocalVersion(self): 918 def getLocalVersion(self):
908 """ Get local database version 919 """ Get local database version
909 920
910 @return: version (int) 921 @return: version (int)
977 for version in xrange(local_version + 1, CURRENT_DB_VERSION + 1): 988 for version in xrange(local_version + 1, CURRENT_DB_VERSION + 1):
978 try: 989 try:
979 update_data = DATABASE_SCHEMAS[version] 990 update_data = DATABASE_SCHEMAS[version]
980 except KeyError: 991 except KeyError:
981 raise exceptions.InternalError("Missing update definition (version %d)" % version) 992 raise exceptions.InternalError("Missing update definition (version %d)" % version)
993 if "specific" in update_data:
994 # if we have a specific, we must commit current statements
995 # because a specific may modify database itself, and the database
996 # must be in the expected state of the previous version.
997 yield self.sqlite_storage.commitStatements(update_raw)
998 del update_raw[:]
982 update_raw_step = yield self.update2raw(update_data) 999 update_raw_step = yield self.update2raw(update_data)
983 update_raw.extend(update_raw_step) 1000 if update_raw_step is not None:
1001 # can be None with specifics
1002 update_raw.extend(update_raw_step)
984 update_raw.append("PRAGMA user_version=%d" % CURRENT_DB_VERSION) 1003 update_raw.append("PRAGMA user_version=%d" % CURRENT_DB_VERSION)
985 defer.returnValue(update_raw) 1004 defer.returnValue(update_raw)
986 1005
987 @staticmethod 1006 @staticmethod
988 def createData2Raw(data): 1007 def createData2Raw(data):
1169 cmds = yield getattr(self, specific)() 1188 cmds = yield getattr(self, specific)()
1170 ret.extend(cmds or []) 1189 ret.extend(cmds or [])
1171 defer.returnValue(ret) 1190 defer.returnValue(ret)
1172 1191
1173 @defer.inlineCallbacks 1192 @defer.inlineCallbacks
1193 def update_v7(self):
1194 """Update database from v6 to v7 (history unique constraint change)"""
1195 log.info(u"Database update to v7, this may take a while")
1196 # we have to rename table we will replace
1197 # tables referencing history need to be replaced to, else reference would
1198 # be to the old table (which will be dropped at the end). This buggy behaviour
1199 # seems to be fixed in new version of Sqlite
1200 yield self.dbpool.runQuery("ALTER TABLE history RENAME TO history_old")
1201 yield self.dbpool.runQuery("ALTER TABLE message RENAME TO message_old")
1202 yield self.dbpool.runQuery("ALTER TABLE subject RENAME TO subject_old")
1203 yield self.dbpool.runQuery("ALTER TABLE thread RENAME TO thread_old")
1204
1205 # we need to fix duplicate stanza_id, as it can result in conflicts with the new schema
1206 # normally database should not contain any, but better safe than sorry.
1207 rows = yield self.dbpool.runQuery(
1208 u"SELECT stanza_id, COUNT(*) as c FROM history_old WHERE stanza_id is not NULL "
1209 u"GROUP BY stanza_id HAVING c>1")
1210 if rows:
1211 count = sum([r[1] for r in rows]) - len(rows)
1212 log.info(u"{count} duplicate stanzas found, cleaning".format(count=count))
1213 for stanza_id, count in rows:
1214 log.info(u"cleaning duplicate stanza {stanza_id}".format(stanza_id=stanza_id))
1215 row_uids = yield self.dbpool.runQuery(
1216 "SELECT uid FROM history_old WHERE stanza_id = ? LIMIT ?",
1217 (stanza_id, count-1))
1218 uids = [r[0] for r in row_uids]
1219 yield self.dbpool.runQuery(
1220 "DELETE FROM history_old WHERE uid IN ({})".format(u",".join(u"?"*len(uids))),
1221 uids)
1222 log.info(u"Cleaning done")
1223
1224 # history
1225 query = (u"CREATE TABLE history (uid TEXT PRIMARY KEY, stanza_id TEXT, "
1226 u"update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, "
1227 u"source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, "
1228 u"received_timestamp DATETIME, type TEXT, extra BLOB, "
1229 u"FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, "
1230 u"FOREIGN KEY(type) REFERENCES message_types(type), "
1231 u"UNIQUE (profile_id, stanza_id, source, dest))")
1232 yield self.dbpool.runQuery(query)
1233
1234 # message
1235 query = (u"CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER"
1236 u", message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES "
1237 u"history(uid) ON DELETE CASCADE)")
1238 yield self.dbpool.runQuery(query)
1239
1240 # subject
1241 query = (u"CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER"
1242 u", subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES "
1243 u"history(uid) ON DELETE CASCADE)")
1244 yield self.dbpool.runQuery(query)
1245
1246 # thread
1247 query = (u"CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER"
1248 u", thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES "
1249 u"history(uid) ON DELETE CASCADE)")
1250 yield self.dbpool.runQuery(query)
1251
1252 log.info(u"Now transfering old data to new tables, this can be really long "
1253 u"depending on your history size, please be patient.")
1254
1255 log.info(u"\nTransfering table history")
1256 query = (u"INSERT INTO history (uid, stanza_id, update_uid, profile_id, source, "
1257 u"dest, source_res, dest_res, timestamp, received_timestamp, type, extra"
1258 u") SELECT uid, stanza_id, update_uid, profile_id, source, dest, "
1259 u"source_res, dest_res, timestamp, received_timestamp, type, extra "
1260 u"FROM history_old")
1261 yield self.dbpool.runQuery(query)
1262
1263 log.info(u"\nTransfering table message")
1264 query = (u"INSERT INTO message (id, history_uid, message, language) SELECT id, "
1265 u"history_uid, message, language FROM message_old")
1266 yield self.dbpool.runQuery(query)
1267
1268 log.info(u"\nTransfering table subject")
1269 query = (u"INSERT INTO subject (id, history_uid, subject, language) SELECT id, "
1270 u"history_uid, subject, language FROM subject_old")
1271 yield self.dbpool.runQuery(query)
1272
1273 log.info(u"\nTransfering table thread")
1274 query = (u"INSERT INTO thread (id, history_uid, thread_id, parent_id) SELECT id"
1275 u", history_uid, thread_id, parent_id FROM thread_old")
1276 yield self.dbpool.runQuery(query)
1277
1278 yield self.dbpool.runQuery("DROP TABLE history_old")
1279 yield self.dbpool.runQuery("DROP TABLE message_old")
1280 yield self.dbpool.runQuery("DROP TABLE subject_old")
1281 yield self.dbpool.runQuery("DROP TABLE thread_old")
1282 log.info(u"Database update done :)")
1283
1284 @defer.inlineCallbacks
1174 def update_v3(self): 1285 def update_v3(self):
1175 """Update database from v2 to v3 (message refactoring)""" 1286 """Update database from v2 to v3 (message refactoring)"""
1176 # XXX: this update do all the messages in one huge transaction 1287 # XXX: this update do all the messages in one huge transaction
1177 # this is really memory consuming, but was OK on a reasonably 1288 # this is really memory consuming, but was OK on a reasonably
1178 # big database for tests. If issues are happening, we can cut it 1289 # big database for tests. If issues are happening, we can cut it