Mercurial > libervia-backend
comparison sat/memory/sqlite.py @ 2716:06160b529da6
core (memory/sqlite): changed history constraint
/!\ Database schema change /!\
History was using a unique constraint on `profile_id, timestamp, source, dest, source_res, dest_res`, which can cause trouble because several messages sent in quick succession by the same person can share a timestamp (especially with delayed messages, where the precision is one second), resulting in message loss.
The new constraint uses `profile_id, stanza_id, source, dest`, where `stanza_id` is the XEP-0359 stanza ID, so it's unique by definition, and no message should be lost anymore.
Because SQLite doesn't support altering a table to change a constraint, we have to create new tables and copy the old data into the new ones, which can take quite a long time.
The SQLite update mechanism with "specifics" has been fixed for the case where several updates are applied at once (e.g. moving from v5 to v7) and a specific is part of the workflow.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 09 Dec 2018 14:07:26 +0100 |
parents | b35c84ea73cf |
children | 453a12ff6f51 |
comparison
equal
deleted
inserted
replaced
2715:b35c84ea73cf | 2716:06160b529da6 |
---|---|
34 import cPickle as pickle | 34 import cPickle as pickle |
35 import hashlib | 35 import hashlib |
36 import sqlite3 | 36 import sqlite3 |
37 import json | 37 import json |
38 | 38 |
39 CURRENT_DB_VERSION = 6 | 39 CURRENT_DB_VERSION = 7 |
40 | 40 |
41 # XXX: DATABASE schemas are used in the following way: | 41 # XXX: DATABASE schemas are used in the following way: |
42 # - 'current' key is for the actual database schema, for a new base | 42 # - 'current' key is for the actual database schema, for a new base |
43 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update | 43 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update |
44 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed | 44 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed |
58 ())), | 58 ())), |
59 ('history', (("uid TEXT PRIMARY KEY", "stanza_id TEXT", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", | 59 ('history', (("uid TEXT PRIMARY KEY", "stanza_id TEXT", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", |
60 "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception) | 60 "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception) |
61 "type TEXT", "extra BLOB"), | 61 "type TEXT", "extra BLOB"), |
62 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)", | 62 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)", |
63 "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)" # avoid storing 2 time the same message (specially for delayed ones) | 63 "UNIQUE (profile_id, stanza_id, source, dest)" # avoid storing 2 times the same message |
64 ))), | 64 ))), |
65 ('message', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "message TEXT", "language TEXT"), | 65 ('message', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "message TEXT", "language TEXT"), |
66 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), | 66 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), |
67 ('subject', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "subject TEXT", "language TEXT"), | 67 ('subject', (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "subject TEXT", "language TEXT"), |
68 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), | 68 ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))), |
96 ("'normal'",), | 96 ("'normal'",), |
97 ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC | 97 ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC |
98 )), | 98 )), |
99 )), | 99 )), |
100 }, | 100 }, |
101 7: {'specific': 'update_v7' | |
102 }, | |
101 6: {'cols create': {'history': ('stanza_id TEXT',)}, | 103 6: {'cols create': {'history': ('stanza_id TEXT',)}, |
102 }, | 104 }, |
103 5: {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL", | 105 5: {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL", |
104 "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format( | 106 "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format( |
105 file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY), | 107 file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY), |
178 database_creation.extend(Updater.createData2Raw(DATABASE_SCHEMAS['current']['CREATE'])) | 180 database_creation.extend(Updater.createData2Raw(DATABASE_SCHEMAS['current']['CREATE'])) |
179 database_creation.extend(Updater.insertData2Raw(DATABASE_SCHEMAS['current']['INSERT'])) | 181 database_creation.extend(Updater.insertData2Raw(DATABASE_SCHEMAS['current']['INSERT'])) |
180 return database_creation | 182 return database_creation |
181 | 183 |
182 def getUpdateSql(): | 184 def getUpdateSql(): |
183 updater = Updater(self.dbpool, sat_version) | 185 updater = Updater(self, sat_version) |
184 return updater.checkUpdates() | 186 return updater.checkUpdates() |
185 | 187 |
186 def commitStatements(statements): | |
187 | |
188 if statements is None: | |
189 return defer.succeed(None) | |
190 log.debug(u"\n===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements)) | |
191 d = self.dbpool.runInteraction(self._updateDb, tuple(statements)) | |
192 return d | |
193 | |
194 # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done | 188 # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done |
195 | 189 |
196 init_defer = defer.succeed(None) | 190 init_defer = defer.succeed(None) |
197 | 191 |
198 init_defer.addCallback(lambda ignore: getNewBaseSql() if new_base else getUpdateSql()) | 192 init_defer.addCallback(lambda ignore: getNewBaseSql() if new_base else getUpdateSql()) |
199 init_defer.addCallback(commitStatements) | 193 init_defer.addCallback(self.commitStatements) |
200 | 194 |
201 def fillProfileCache(ignore): | 195 def fillProfileCache(ignore): |
202 d = self.dbpool.runQuery("SELECT profile_id, entry_point FROM components").addCallback(self._cacheComponentsAndProfiles) | 196 d = self.dbpool.runQuery("SELECT profile_id, entry_point FROM components").addCallback(self._cacheComponentsAndProfiles) |
203 d.chainDeferred(self.initialized) | 197 d.chainDeferred(self.initialized) |
204 | 198 |
205 init_defer.addCallback(fillProfileCache) | 199 init_defer.addCallback(fillProfileCache) |
200 | |
201 def commitStatements(self, statements): | |
202 | |
203 if statements is None: | |
204 return defer.succeed(None) | |
205 log.debug(u"\n===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements)) | |
206 d = self.dbpool.runInteraction(self._updateDb, tuple(statements)) | |
207 return d | |
206 | 208 |
207 def _updateDb(self, interaction, statements): | 209 def _updateDb(self, interaction, statements): |
208 for statement in statements: | 210 for statement in statements: |
209 interaction.execute(statement) | 211 interaction.execute(statement) |
210 | 212 |
582 query_parts.append(u"AND stanza_id IS NOT NULL ORDER BY history.rowid DESC") | 584 query_parts.append(u"AND stanza_id IS NOT NULL ORDER BY history.rowid DESC") |
583 order = True | 585 order = True |
584 | 586 |
585 | 587 |
586 if not order: | 588 if not order: |
587 query_parts.append(u"ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList | 589 # timestamp may be identical for 2 close message (specially when delay is used) |
590 # that's why we order ties by rowid (which is in the same order as received_timestamp | |
591 # but has an index so is quick to order). | |
592 | |
593 # We'll reverse the order in sqliteHistoryToList | |
594 query_parts.append(u"ORDER BY timestamp DESC, history.rowid DESC") | |
588 # we use DESC here so LIMIT keep the last messages | 595 # we use DESC here so LIMIT keep the last messages |
589 if limit is not None: | 596 if limit is not None: |
590 query_parts.append(u"LIMIT ?") | 597 query_parts.append(u"LIMIT ?") |
591 values.append(limit) | 598 values.append(limit) |
592 | 599 |
898 RENAME_TABLE_SQL = "ALTER TABLE %s RENAME TO %s" | 905 RENAME_TABLE_SQL = "ALTER TABLE %s RENAME TO %s" |
899 | 906 |
900 CONSTRAINTS = ('PRIMARY', 'UNIQUE', 'CHECK', 'FOREIGN') | 907 CONSTRAINTS = ('PRIMARY', 'UNIQUE', 'CHECK', 'FOREIGN') |
901 TMP_TABLE = "tmp_sat_update" | 908 TMP_TABLE = "tmp_sat_update" |
902 | 909 |
903 def __init__(self, dbpool, sat_version): | 910 def __init__(self, sqlite_storage, sat_version): |
904 self._sat_version = sat_version | 911 self._sat_version = sat_version |
905 self.dbpool = dbpool | 912 self.sqlite_storage = sqlite_storage |
913 | |
914 @property | |
915 def dbpool(self): | |
916 return self.sqlite_storage.dbpool | |
906 | 917 |
907 def getLocalVersion(self): | 918 def getLocalVersion(self): |
908 """ Get local database version | 919 """ Get local database version |
909 | 920 |
910 @return: version (int) | 921 @return: version (int) |
977 for version in xrange(local_version + 1, CURRENT_DB_VERSION + 1): | 988 for version in xrange(local_version + 1, CURRENT_DB_VERSION + 1): |
978 try: | 989 try: |
979 update_data = DATABASE_SCHEMAS[version] | 990 update_data = DATABASE_SCHEMAS[version] |
980 except KeyError: | 991 except KeyError: |
981 raise exceptions.InternalError("Missing update definition (version %d)" % version) | 992 raise exceptions.InternalError("Missing update definition (version %d)" % version) |
993 if "specific" in update_data: | |
994 # if we have a specific, we must commit current statements | |
995 # because a specific may modify database itself, and the database | |
996 # must be in the expected state of the previous version. | |
997 yield self.sqlite_storage.commitStatements(update_raw) | |
998 del update_raw[:] | |
982 update_raw_step = yield self.update2raw(update_data) | 999 update_raw_step = yield self.update2raw(update_data) |
983 update_raw.extend(update_raw_step) | 1000 if update_raw_step is not None: |
1001 # can be None with specifics | |
1002 update_raw.extend(update_raw_step) | |
984 update_raw.append("PRAGMA user_version=%d" % CURRENT_DB_VERSION) | 1003 update_raw.append("PRAGMA user_version=%d" % CURRENT_DB_VERSION) |
985 defer.returnValue(update_raw) | 1004 defer.returnValue(update_raw) |
986 | 1005 |
987 @staticmethod | 1006 @staticmethod |
988 def createData2Raw(data): | 1007 def createData2Raw(data): |
1169 cmds = yield getattr(self, specific)() | 1188 cmds = yield getattr(self, specific)() |
1170 ret.extend(cmds or []) | 1189 ret.extend(cmds or []) |
1171 defer.returnValue(ret) | 1190 defer.returnValue(ret) |
1172 | 1191 |
1173 @defer.inlineCallbacks | 1192 @defer.inlineCallbacks |
1193 def update_v7(self): | |
1194 """Update database from v6 to v7 (history unique constraint change)""" | |
1195 log.info(u"Database update to v7, this may take a while") | |
1196 # we have to rename table we will replace | |
1197 # tables referencing history need to be replaced to, else reference would | |
1198 # be to the old table (which will be dropped at the end). This buggy behaviour | |
1199 # seems to be fixed in new version of Sqlite | |
1200 yield self.dbpool.runQuery("ALTER TABLE history RENAME TO history_old") | |
1201 yield self.dbpool.runQuery("ALTER TABLE message RENAME TO message_old") | |
1202 yield self.dbpool.runQuery("ALTER TABLE subject RENAME TO subject_old") | |
1203 yield self.dbpool.runQuery("ALTER TABLE thread RENAME TO thread_old") | |
1204 | |
1205 # we need to fix duplicate stanza_id, as it can result in conflicts with the new schema | |
1206 # normally database should not contain any, but better safe than sorry. | |
1207 rows = yield self.dbpool.runQuery( | |
1208 u"SELECT stanza_id, COUNT(*) as c FROM history_old WHERE stanza_id is not NULL " | |
1209 u"GROUP BY stanza_id HAVING c>1") | |
1210 if rows: | |
1211 count = sum([r[1] for r in rows]) - len(rows) | |
1212 log.info(u"{count} duplicate stanzas found, cleaning".format(count=count)) | |
1213 for stanza_id, count in rows: | |
1214 log.info(u"cleaning duplicate stanza {stanza_id}".format(stanza_id=stanza_id)) | |
1215 row_uids = yield self.dbpool.runQuery( | |
1216 "SELECT uid FROM history_old WHERE stanza_id = ? LIMIT ?", | |
1217 (stanza_id, count-1)) | |
1218 uids = [r[0] for r in row_uids] | |
1219 yield self.dbpool.runQuery( | |
1220 "DELETE FROM history_old WHERE uid IN ({})".format(u",".join(u"?"*len(uids))), | |
1221 uids) | |
1222 log.info(u"Cleaning done") | |
1223 | |
1224 # history | |
1225 query = (u"CREATE TABLE history (uid TEXT PRIMARY KEY, stanza_id TEXT, " | |
1226 u"update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, " | |
1227 u"source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, " | |
1228 u"received_timestamp DATETIME, type TEXT, extra BLOB, " | |
1229 u"FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, " | |
1230 u"FOREIGN KEY(type) REFERENCES message_types(type), " | |
1231 u"UNIQUE (profile_id, stanza_id, source, dest))") | |
1232 yield self.dbpool.runQuery(query) | |
1233 | |
1234 # message | |
1235 query = (u"CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" | |
1236 u", message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES " | |
1237 u"history(uid) ON DELETE CASCADE)") | |
1238 yield self.dbpool.runQuery(query) | |
1239 | |
1240 # subject | |
1241 query = (u"CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" | |
1242 u", subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES " | |
1243 u"history(uid) ON DELETE CASCADE)") | |
1244 yield self.dbpool.runQuery(query) | |
1245 | |
1246 # thread | |
1247 query = (u"CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" | |
1248 u", thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES " | |
1249 u"history(uid) ON DELETE CASCADE)") | |
1250 yield self.dbpool.runQuery(query) | |
1251 | |
1252 log.info(u"Now transfering old data to new tables, this can be really long " | |
1253 u"depending on your history size, please be patient.") | |
1254 | |
1255 log.info(u"\nTransfering table history") | |
1256 query = (u"INSERT INTO history (uid, stanza_id, update_uid, profile_id, source, " | |
1257 u"dest, source_res, dest_res, timestamp, received_timestamp, type, extra" | |
1258 u") SELECT uid, stanza_id, update_uid, profile_id, source, dest, " | |
1259 u"source_res, dest_res, timestamp, received_timestamp, type, extra " | |
1260 u"FROM history_old") | |
1261 yield self.dbpool.runQuery(query) | |
1262 | |
1263 log.info(u"\nTransfering table message") | |
1264 query = (u"INSERT INTO message (id, history_uid, message, language) SELECT id, " | |
1265 u"history_uid, message, language FROM message_old") | |
1266 yield self.dbpool.runQuery(query) | |
1267 | |
1268 log.info(u"\nTransfering table subject") | |
1269 query = (u"INSERT INTO subject (id, history_uid, subject, language) SELECT id, " | |
1270 u"history_uid, subject, language FROM subject_old") | |
1271 yield self.dbpool.runQuery(query) | |
1272 | |
1273 log.info(u"\nTransfering table thread") | |
1274 query = (u"INSERT INTO thread (id, history_uid, thread_id, parent_id) SELECT id" | |
1275 u", history_uid, thread_id, parent_id FROM thread_old") | |
1276 yield self.dbpool.runQuery(query) | |
1277 | |
1278 yield self.dbpool.runQuery("DROP TABLE history_old") | |
1279 yield self.dbpool.runQuery("DROP TABLE message_old") | |
1280 yield self.dbpool.runQuery("DROP TABLE subject_old") | |
1281 yield self.dbpool.runQuery("DROP TABLE thread_old") | |
1282 log.info(u"Database update done :)") | |
1283 | |
1284 @defer.inlineCallbacks | |
1174 def update_v3(self): | 1285 def update_v3(self): |
1175 """Update database from v2 to v3 (message refactoring)""" | 1286 """Update database from v2 to v3 (message refactoring)""" |
1176 # XXX: this update do all the messages in one huge transaction | 1287 # XXX: this update do all the messages in one huge transaction |
1177 # this is really memory consuming, but was OK on a reasonably | 1288 # this is really memory consuming, but was OK on a reasonably |
1178 # big database for tests. If issues are happening, we can cut it | 1289 # big database for tests. If issues are happening, we can cut it |