changeset 2716:06160b529da6

core (memory/sqlite): changed history constraint /!\ Database schema change /!\ History was using a unique constraint on `profile_id, timestamp, source, dest, source_res, dest_res`, which can cause trouble because several messages sent in quick succession by the same person can share a timestamp (especially with delayed messages, where the precision is one second), resulting in message loss. The new constraint uses `profile_id, stanza_id, source, dest`, where `stanza_id` is the XEP-0359 stanza_id, so it is unique by definition and no message should be lost anymore. Because SQLite doesn't support altering a table to change a constraint, we have to create new tables and copy the old data to the new ones, which can take a long time. The SQLite update mechanism with "specifics" has been fixed for the case where several updates are applied at once (e.g. moving from v5 to v7) and a specific is part of the workflow.
author Goffi <goffi@goffi.org>
date Sun, 09 Dec 2018 14:07:26 +0100
parents b35c84ea73cf
children e3f6de6ce320
files sat/memory/sqlite.py
diffstat 1 files changed, 127 insertions(+), 16 deletions(-) [+]
line wrap: on
line diff
--- a/sat/memory/sqlite.py	Fri Dec 07 19:13:28 2018 +0100
+++ b/sat/memory/sqlite.py	Sun Dec 09 14:07:26 2018 +0100
@@ -36,7 +36,7 @@
 import sqlite3
 import json
 
-CURRENT_DB_VERSION = 6
+CURRENT_DB_VERSION = 7
 
 # XXX: DATABASE schemas are used in the following way:
 #      - 'current' key is for the actual database schema, for a new base
@@ -60,7 +60,7 @@
                                                     "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception)
                                                     "type TEXT", "extra BLOB"),
                                                    ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)",
-                                                    "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)" # avoid storing 2 time the same message (specially for delayed ones)
+                                                    "UNIQUE (profile_id, stanza_id, source, dest)" # avoid storing 2 times the same message
                                                     ))),
                               ('message',        (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "message TEXT", "language TEXT"),
                                                   ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
@@ -98,6 +98,8 @@
                                                 )),
                               )),
                     },
+        7:         {'specific': 'update_v7'
+                   },
         6:         {'cols create': {'history': ('stanza_id TEXT',)},
                    },
         5:         {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL",
@@ -180,23 +182,15 @@
             return database_creation
 
         def getUpdateSql():
-            updater = Updater(self.dbpool, sat_version)
+            updater = Updater(self, sat_version)
             return updater.checkUpdates()
 
-        def commitStatements(statements):
-
-            if statements is None:
-                return defer.succeed(None)
-            log.debug(u"\n===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements))
-            d = self.dbpool.runInteraction(self._updateDb, tuple(statements))
-            return d
-
         # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done
 
         init_defer = defer.succeed(None)
 
         init_defer.addCallback(lambda ignore: getNewBaseSql() if new_base else getUpdateSql())
-        init_defer.addCallback(commitStatements)
+        init_defer.addCallback(self.commitStatements)
 
         def fillProfileCache(ignore):
             d = self.dbpool.runQuery("SELECT profile_id, entry_point FROM components").addCallback(self._cacheComponentsAndProfiles)
@@ -204,6 +198,14 @@
 
         init_defer.addCallback(fillProfileCache)
 
+    def commitStatements(self, statements):
+
+        if statements is None:
+            return defer.succeed(None)
+        log.debug(u"\n===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements))
+        d = self.dbpool.runInteraction(self._updateDb, tuple(statements))
+        return d
+
     def _updateDb(self, interaction, statements):
         for statement in statements:
             interaction.execute(statement)
@@ -584,7 +586,12 @@
 
 
         if not order:
-            query_parts.append(u"ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList
+            # timestamps may be identical for 2 close messages (especially when delay is used),
+            # that's why we break ties by rowid (which is in the same order as received_timestamp
+            # but has an index, so it is quick to order by).
+
+            # We'll reverse the order in sqliteHistoryToList
+            query_parts.append(u"ORDER BY timestamp DESC, history.rowid DESC")
                                                            # we use DESC here so LIMIT keep the last messages
         if limit is not None:
             query_parts.append(u"LIMIT ?")
@@ -900,9 +907,13 @@
     CONSTRAINTS = ('PRIMARY', 'UNIQUE', 'CHECK', 'FOREIGN')
     TMP_TABLE = "tmp_sat_update"
 
-    def __init__(self, dbpool, sat_version):
+    def __init__(self, sqlite_storage, sat_version):
         self._sat_version = sat_version
-        self.dbpool = dbpool
+        self.sqlite_storage = sqlite_storage
+
+    @property
+    def dbpool(self):
+        return self.sqlite_storage.dbpool
 
     def getLocalVersion(self):
         """ Get local database version
@@ -979,8 +990,16 @@
                         update_data = DATABASE_SCHEMAS[version]
                     except KeyError:
                         raise exceptions.InternalError("Missing update definition (version %d)" % version)
+                    if "specific" in update_data:
+                        # if we have a specific, we must commit current statements
+                        # because a specific may modify database itself, and the database
+                        # must be in the expected state of the previous version.
+                        yield self.sqlite_storage.commitStatements(update_raw)
+                        del update_raw[:]
                     update_raw_step = yield self.update2raw(update_data)
-                    update_raw.extend(update_raw_step)
+                    if update_raw_step is not None:
+                        # can be None with specifics
+                        update_raw.extend(update_raw_step)
                 update_raw.append("PRAGMA user_version=%d" % CURRENT_DB_VERSION)
                 defer.returnValue(update_raw)
 
@@ -1171,6 +1190,98 @@
         defer.returnValue(ret)
 
     @defer.inlineCallbacks
+    def update_v7(self):
+        """Update database from v6 to v7 (history unique constraint change)"""
+        log.info(u"Database update to v7, this may take a while")
+        # we have to rename the tables we will replace
+        # tables referencing history need to be replaced too, else their references would
+        # point to the old table (which will be dropped at the end). This buggy behaviour
+        # seems to be fixed in newer versions of SQLite
+        yield self.dbpool.runQuery("ALTER TABLE history RENAME TO history_old")
+        yield self.dbpool.runQuery("ALTER TABLE message RENAME TO message_old")
+        yield self.dbpool.runQuery("ALTER TABLE subject RENAME TO subject_old")
+        yield self.dbpool.runQuery("ALTER TABLE thread RENAME TO thread_old")
+
+        # we need to fix duplicate stanza_ids, as they can result in conflicts with the new schema
+        # normally the database should not contain any, but better safe than sorry.
+        rows = yield self.dbpool.runQuery(
+            u"SELECT stanza_id, COUNT(*) as c FROM history_old WHERE stanza_id is not NULL "
+            u"GROUP BY stanza_id HAVING c>1")
+        if rows:
+            count = sum([r[1] for r in rows]) - len(rows)
+            log.info(u"{count} duplicate stanzas found, cleaning".format(count=count))
+            for stanza_id, count in rows:
+                log.info(u"cleaning duplicate stanza {stanza_id}".format(stanza_id=stanza_id))
+                row_uids = yield self.dbpool.runQuery(
+                    "SELECT uid FROM history_old WHERE stanza_id = ? LIMIT ?",
+                    (stanza_id, count-1))
+                uids = [r[0] for r in row_uids]
+                yield self.dbpool.runQuery(
+                    "DELETE FROM history_old WHERE uid IN ({})".format(u",".join(u"?"*len(uids))),
+                    uids)
+            log.info(u"Cleaning done")
+
+        # history
+        query = (u"CREATE TABLE history (uid TEXT PRIMARY KEY, stanza_id TEXT, "
+                 u"update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, "
+                 u"source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, "
+                 u"received_timestamp DATETIME, type TEXT, extra BLOB, "
+                 u"FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, "
+                 u"FOREIGN KEY(type) REFERENCES message_types(type), "
+                 u"UNIQUE (profile_id, stanza_id, source, dest))")
+        yield self.dbpool.runQuery(query)
+
+        # message
+        query = (u"CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER"
+                 u", message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES "
+                 u"history(uid) ON DELETE CASCADE)")
+        yield self.dbpool.runQuery(query)
+
+        # subject
+        query = (u"CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER"
+                 u", subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES "
+                 u"history(uid) ON DELETE CASCADE)")
+        yield self.dbpool.runQuery(query)
+
+        # thread
+        query = (u"CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER"
+                 u", thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES "
+                 u"history(uid) ON DELETE CASCADE)")
+        yield self.dbpool.runQuery(query)
+
+        log.info(u"Now transfering old data to new tables, this can be really long "
+                 u"depending on your history size, please be patient.")
+
+        log.info(u"\nTransfering table history")
+        query = (u"INSERT INTO history (uid, stanza_id, update_uid, profile_id, source, "
+                 u"dest, source_res, dest_res, timestamp, received_timestamp, type, extra"
+                 u") SELECT uid, stanza_id, update_uid, profile_id, source, dest, "
+                 u"source_res, dest_res, timestamp, received_timestamp, type, extra "
+                 u"FROM history_old")
+        yield self.dbpool.runQuery(query)
+
+        log.info(u"\nTransfering table message")
+        query = (u"INSERT INTO message (id, history_uid, message, language) SELECT id, "
+                 u"history_uid, message, language FROM message_old")
+        yield self.dbpool.runQuery(query)
+
+        log.info(u"\nTransfering table subject")
+        query = (u"INSERT INTO subject (id, history_uid, subject, language) SELECT id, "
+                 u"history_uid, subject, language FROM subject_old")
+        yield self.dbpool.runQuery(query)
+
+        log.info(u"\nTransfering table thread")
+        query = (u"INSERT INTO thread (id, history_uid, thread_id, parent_id) SELECT id"
+                 u", history_uid, thread_id, parent_id FROM thread_old")
+        yield self.dbpool.runQuery(query)
+
+        yield self.dbpool.runQuery("DROP TABLE history_old")
+        yield self.dbpool.runQuery("DROP TABLE message_old")
+        yield self.dbpool.runQuery("DROP TABLE subject_old")
+        yield self.dbpool.runQuery("DROP TABLE thread_old")
+        log.info(u"Database update done :)")
+
+    @defer.inlineCallbacks
     def update_v3(self):
         """Update database from v2 to v3 (message refactoring)"""
         # XXX: this update do all the messages in one huge transaction