diff sat/memory/sqlite.py @ 2787:298408833ec2

memory (sqlite): optimizations indexes were missing on foreign keys for "message", "subject" and "thread" tables, seriously impacting performances. In addition to those indexes, two indexes have been added to speed ordering by timestamp on "history", and one on "files" table. history.rowid is not used anymore as there is an index on (profile_id, received_timestamp) which will speed up the query. Primary keys order has been changed to use automatic index in most common cases (filtering by profile_id then namespace).
author Goffi <goffi@goffi.org>
date Sat, 19 Jan 2019 22:49:32 +0100
parents 003b8b4b56a7
children a425c1ca51d0
line wrap: on
line diff
--- a/sat/memory/sqlite.py	Sat Jan 19 11:39:02 2019 +0100
+++ b/sat/memory/sqlite.py	Sat Jan 19 22:49:32 2019 +0100
@@ -36,7 +36,7 @@
 import sqlite3
 import json
 
-CURRENT_DB_VERSION = 7
+CURRENT_DB_VERSION = 8
 
 # XXX: DATABASE schemas are used in the following way:
 #      - 'current' key is for the actual database schema, for a new base
@@ -45,6 +45,7 @@
 #      a 'current' data dict can contains the keys:
 #      - 'CREATE': it contains an Ordered dict with table to create as keys, and a len 2 tuple as value, where value[0] are the columns definitions and value[1] are the table constraints
 #      - 'INSERT': it contains an Ordered dict with table where values have to be inserted, and many tuples containing values to insert in the order of the rows (#TODO: manage named columns)
+#      - 'INDEX':
 #      an update data dict (the ones with a number) can contains the keys 'create', 'delete', 'cols create', 'cols delete', 'cols modify', 'insert' or 'specific'. See Updater.generateUpdateData for more infos. This method can be used to autogenerate update_data, to ease the work of the developers.
 # TODO: indexes need to be improved
 
@@ -68,17 +69,17 @@
                                                   ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
                               ('thread',          (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "thread_id TEXT", "parent_id TEXT"),("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
                               ('param_gen',       (("category TEXT", "name TEXT", "value TEXT"),
-                                                   ("PRIMARY KEY (category,name)",))),
+                                                   ("PRIMARY KEY (category, name)",))),
                               ('param_ind',       (("category TEXT", "name TEXT", "profile_id INTEGER", "value TEXT"),
-                                                   ("PRIMARY KEY (category,name,profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
+                                                   ("PRIMARY KEY (profile_id, category, name)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
                               ('private_gen',     (("namespace TEXT", "key TEXT", "value TEXT"),
                                                    ("PRIMARY KEY (namespace, key)",))),
                               ('private_ind',     (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value TEXT"),
-                                                   ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
+                                                   ("PRIMARY KEY (profile_id, namespace, key)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
                               ('private_gen_bin', (("namespace TEXT", "key TEXT", "value BLOB"),
                                                    ("PRIMARY KEY (namespace, key)",))),
                               ('private_ind_bin', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value BLOB"),
-                                                   ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
+                                                   ("PRIMARY KEY (profile_id, namespace, key)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
                               ('files',           (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL",
                                                     "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format(
                                                         file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY),
@@ -97,7 +98,15 @@
                                                  ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC
                                                 )),
                               )),
+                    'INDEX': (('history', (('profile_id', 'timestamp'),
+                                           ('profile_id', 'received_timestamp'))),
+                              ('message', ('history_uid',)),
+                              ('subject', ('history_uid',)),
+                              ('thread', ('history_uid',)),
+                              ('files', ('profile_id', 'mime_type', 'owner', 'parent'))),
                     },
+        8:         {'specific': 'update_v8'
+                   },
         7:         {'specific': 'update_v7'
                    },
         6:         {'cols create': {'history': ('stanza_id TEXT',)},
@@ -201,6 +210,7 @@
             database_creation = ["PRAGMA user_version=%d" % CURRENT_DB_VERSION]
             database_creation.extend(Updater.createData2Raw(DATABASE_SCHEMAS['current']['CREATE']))
             database_creation.extend(Updater.insertData2Raw(DATABASE_SCHEMAS['current']['INSERT']))
+            database_creation.extend(Updater.indexData2Raw(DATABASE_SCHEMAS['current']['INDEX']))
             return database_creation
 
         def getUpdateSql():
@@ -603,17 +613,15 @@
                 if (filters[u'last_stanza_id'] is not True
                     or limit != 1):
                     raise ValueError(u"Unexpected values for last_stanza_id filter")
-                query_parts.append(u"AND stanza_id IS NOT NULL ORDER BY history.rowid DESC")
+                query_parts.append(u"AND stanza_id IS NOT NULL ORDER BY history.received_timestamp DESC")
                 order = True
 
 
         if not order:
-            # timestamp may be identical for 2 close message (specially when delay is used)
-            # that's why we order ties by rowid (which is in the same order as received_timestamp
-            # but has an index so is quick to order).
-
+            # timestamp may be identical for 2 close messages (specially when delay is
+            # used) that's why we order ties by received_timestamp
             # We'll reverse the order in sqliteHistoryToList
-            query_parts.append(u"ORDER BY timestamp DESC, history.rowid DESC")
+            query_parts.append(u"ORDER BY timestamp DESC, history.received_timestamp DESC")
                                                            # we use DESC here so LIMIT keep the last messages
         if limit is not None:
             query_parts.append(u"LIMIT ?")
@@ -939,6 +947,7 @@
     clean_regex = re.compile(r"^ +|(?<= ) +|(?<=,) +| +$")
     CREATE_SQL = "CREATE TABLE %s (%s)"
     INSERT_SQL = "INSERT INTO %s VALUES (%s)"
+    INDEX_SQL = "CREATE INDEX %s ON %s(%s)"
     DROP_SQL = "DROP TABLE %s"
     ALTER_SQL = "ALTER TABLE %s ADD COLUMN %s"
     RENAME_TABLE_SQL = "ALTER TABLE %s RENAME TO %s"
@@ -984,6 +993,9 @@
 
         @return: deferred which fire a list of SQL update statements, or None if no update is needed
         """
+        # TODO: only "table" type (i.e. "CREATE" statements) is checked,
+        #       "index" should be checked too.
+        #       This may be not relevant is we move to a higher level library (alchimia?)
         local_version = yield self.getLocalVersion()
         raw_local_sch = yield self.getLocalSchema()
 
@@ -994,7 +1006,7 @@
 
         # Force the update if the schemas are unchanged but a specific update is needed
         force_update = local_hash == current_hash and local_version < CURRENT_DB_VERSION \
-                        and 'specific' in DATABASE_SCHEMAS[CURRENT_DB_VERSION]
+                        and {'index', 'specific'}.intersection(DATABASE_SCHEMAS[CURRENT_DB_VERSION])
 
         if local_hash == current_hash and not force_update:
             if local_version != CURRENT_DB_VERSION:
@@ -1073,6 +1085,30 @@
                 ret.append(Updater.INSERT_SQL % (table, ', '.join(values)))
         return ret
 
+    @staticmethod
+    def indexData2Raw(data):
+        """ Generate SQL statements from statements data
+
+        @param data: dictionary with table as key, and statements data in tuples as value
+        @return: list of strings with raw statements
+        """
+        ret = []
+        assert isinstance(data, tuple)
+        for table, col_data in data:
+            assert isinstance(table, basestring)
+            assert isinstance(col_data, tuple)
+            for cols in col_data:
+                if isinstance(cols, tuple):
+                    assert all([isinstance(c, basestring) for c in cols])
+                    indexed_cols = u','.join(cols)
+                elif isinstance(cols, basestring):
+                    indexed_cols = cols
+                else:
+                    raise exceptions.InternalError(u"unexpected index columns value")
+                index_name = table + u'__' + indexed_cols.replace(u',', u'_')
+                ret.append(Updater.INDEX_SQL % (index_name, table, indexed_cols))
+        return ret
+
     def statementHash(self, data):
         """ Generate hash of template data
 
@@ -1222,12 +1258,60 @@
         insert = update.get('insert', {})
         ret.extend(self.insertData2Raw(insert))
 
+        index = update.get('index', tuple())
+        ret.extend(self.indexData2Raw(index))
+
         specific = update.get('specific', None)
         if specific:
             cmds = yield getattr(self, specific)()
             ret.extend(cmds or [])
         defer.returnValue(ret)
 
+    def update_v8(self):
+        """Update database from v7 to v8 (primary keys order changes + indexes)"""
+        log.info(u"Database update to v8")
+        statements = ["PRAGMA foreign_keys = OFF"]
+
+        # here is a copy of create and index data, we can't use "current" table
+        # because it may change in a future version, which would break the update
+        # when doing v8
+        create = {
+            'param_gen': (
+                ("category TEXT", "name TEXT", "value TEXT"),
+                ("PRIMARY KEY (category, name)",)),
+            'param_ind': (
+                ("category TEXT", "name TEXT", "profile_id INTEGER", "value TEXT"),
+                ("PRIMARY KEY (profile_id, category, name)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE")),
+            'private_ind': (
+                ("namespace TEXT", "key TEXT", "profile_id INTEGER", "value TEXT"),
+                ("PRIMARY KEY (profile_id, namespace, key)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE")),
+            'private_ind_bin': (
+                ("namespace TEXT", "key TEXT", "profile_id INTEGER", "value BLOB"),
+                ("PRIMARY KEY (profile_id, namespace, key)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE")),
+        }
+        index = (
+            ('history', (('profile_id', 'timestamp'),
+            ('profile_id', 'received_timestamp'))),
+            ('message', ('history_uid',)),
+            ('subject', ('history_uid',)),
+            ('thread', ('history_uid',)),
+            ('files', ('profile_id', 'mime_type', 'owner', 'parent')))
+
+        for table in ('param_gen', 'param_ind', 'private_ind', 'private_ind_bin'):
+            statements.append("ALTER TABLE {0} RENAME TO {0}_old".format(table))
+            schema = {table: create[table]}
+            cols = [d.split()[0] for d in schema[table][0]]
+            statements.extend(Updater.createData2Raw(schema))
+            statements.append(u"INSERT INTO {table}({cols}) "
+                              u"SELECT {cols} FROM {table}_old".format(
+                              table=table,
+                              cols=u','.join(cols)))
+            statements.append(u"DROP TABLE {}_old".format(table))
+
+        statements.extend(Updater.indexData2Raw(index))
+        statements.append("PRAGMA foreign_keys = ON")
+        return statements
+
     @defer.inlineCallbacks
     def update_v7(self):
         """Update database from v6 to v7 (history unique constraint change)"""