diff src/memory/sqlite.py @ 1955:633b5c21aefd

backend, frontend: messages refactoring (huge commit, not finished)

/!\ the database schema has been modified, do a backup before updating

Messages have been refactored; the main changes are:
- languages are now handled
- all messages have a uid (internal to SàT)
- message updating is anticipated
- subject is now first class
- a new naming scheme is used: newMessage => messageNew, getHistory => historyGet, sendMessage => messageSend
- minimal compatibility refactoring in quick_frontend/Primitivus, a better refactoring should follow
- threads are handled
- delayed messages are saved into history
- info messages may also be saved in history (e.g. to keep track of people joining/leaving a room)
- duplicate messages should be avoided
- historyGet returns messages in the right order, no need to sort again
- plugins have been updated to follow the new features, some of them need to be reworked (e.g. OTR)
- XEP-0203 (Delayed Delivery) is now fully handled in core, the plugin just handles disco and the creation of the delay element
- /!\ jp and Libervia are currently broken, as are some features of Primitivus

Everything has been put in one huge commit to avoid breaking messaging between changes. This is the main part of the message refactoring; other commits will follow to take advantage of the new features/behaviour.
author Goffi <goffi@goffi.org>
date Tue, 24 May 2016 22:11:04 +0200
parents 2daf7b4c6756
children c73e08094a95
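The commit message describes the new message structure only in outline, so here is a minimal reading aid: a sketch of the per-message data dict that the new addToHistory/sqliteHistoryToList code below consumes and produces. Field names are taken from the diff itself; the authoritative structure is the one built by SatMessageProtocol.onMessage, and all example values are made up.

    data = {
        'uid': u'some-internal-uid',   # internal (SàT) uid, primary key of the history table
        'from': from_jid,              # full source JID
        'to': to_jid,                  # full destination JID
        'timestamp': 1464120664.0,     # declared emission time (seconds since epoch)
        'type': u'chat',               # one of the message_types rows, including the new 'info'
        'message': {u'': u'Hello', u'fr': u'Salut'},  # language => body ('' = unspecified language)
        'subject': {},                 # language => subject
        'extra': {},                   # pickled into history.extra, except the NOT_IN_EXTRA keys:
                                       #   'received_timestamp' and 'update_uid' get their own
                                       #   columns, 'thread'/'thread_parent' go to the thread table
    }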
--- a/src/memory/sqlite.py	Mon Apr 18 18:35:19 2016 +0200
+++ b/src/memory/sqlite.py	Tue May 24 22:11:04 2016 +0200
@@ -27,13 +27,12 @@
 from twisted.enterprise import adbapi
 from twisted.internet import defer
 from collections import OrderedDict
-from time import time
 import re
 import os.path
 import cPickle as pickle
 import hashlib
 
-CURRENT_DB_VERSION = 2
+CURRENT_DB_VERSION = 3
 
 # XXX: DATABASE schemas are used in the following way:
 #      - 'current' key is for the actual database schema, for a new base
@@ -49,9 +48,18 @@
                               ('profiles',        (("id INTEGER PRIMARY KEY ASC", "name TEXT"),
                                                    ("UNIQUE (name)",))),
                               ('message_types',   (("type TEXT PRIMARY KEY",),
-                                                   tuple())),
-                              ('history',         (("id INTEGER PRIMARY KEY ASC", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", "timestamp DATETIME", "message TEXT", "type TEXT", "extra BLOB"),
-                                                   ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)"))),
+                                  tuple())),
+                              ('history',         (("uid TEXT PRIMARY KEY", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT",
+                                                    "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received_timestamp is not NULL, the message was delayed: timestamp is then the declared emission time, and received_timestamp the time of reception
+                                                    "type TEXT", "extra BLOB"),
+                                                   ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)",
+                                                    "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)" # avoid storing the same message twice (especially for delayed ones)
+                                                    ))),
+                              ('message',        (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "message TEXT", "language TEXT"),
+                                                  ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
+                              ('subject',        (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "subject TEXT", "language TEXT"),
+                                                  ("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
+                              ('thread',          (("id INTEGER PRIMARY KEY ASC", "history_uid INTEGER", "thread_id TEXT", "parent_id TEXT"),("FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE",))),
                               ('param_gen',       (("category TEXT", "name TEXT", "value TEXT"),
                                                    ("PRIMARY KEY (category,name)",))),
                               ('param_ind',       (("category TEXT", "name TEXT", "profile_id INTEGER", "value TEXT"),
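A note on the UNIQUE constraint introduced above: it is what makes duplicate delivery idempotent. Inserting the same (profile, timestamp, source, dest, resources) combination a second time fails with an IntegrityError, which _addToHistoryEb (further down in this diff) recognizes by matching "UNIQUE constraint failed" and downgrades to a debug log. A self-contained toy illustration, with a made-up table that is not SàT's schema:

    import sqlite3

    con = sqlite3.connect(':memory:')
    con.execute("CREATE TABLE h (profile_id INTEGER, timestamp DATETIME, source TEXT, "
                "UNIQUE (profile_id, timestamp, source))")
    con.execute("INSERT INTO h VALUES (1, 1464120664.0, 'goffi@example.org')")
    try:
        # a delayed copy of the same message arrives a second time
        con.execute("INSERT INTO h VALUES (1, 1464120664.0, 'goffi@example.org')")
    except sqlite3.IntegrityError as e:
        print(e)  # "UNIQUE constraint failed: h.profile_id, ..." (exact wording depends on the SQLite version)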
@@ -66,15 +74,26 @@
                                                    ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE")))
                               )),
                     'INSERT': OrderedDict((
-                              ('message_types', (("'chat'",), ("'error'",), ("'groupchat'",), ("'headline'",), ("'normal'",))),
+                              ('message_types', (("'chat'",),
+                                                 ("'error'",),
+                                                 ("'groupchat'",),
+                                                 ("'headline'",),
+                                                 ("'normal'",),
+                                                 ("'info'",) # 'info' is not a standard type, but is used to keep track of events like joining/leaving a MUC
+                                                )),
                               )),
                     },
+        3:         {'specific': 'update_v3'
+                   },
         2:         {'specific': 'update2raw_v2'
                    },
-        1:         {'cols create': {'history': ('extra BLOB',)}
+        1:         {'cols create': {'history': ('extra BLOB',)},
                    },
         }
 
+NOT_IN_EXTRA = ('received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field
+                                                    # this is specific to this sqlite storage: these values are stored
+                                                    # in dedicated columns of the history table instead
 
 class SqliteStorage(object):
     """This class manage storage with Sqlite database"""
@@ -114,7 +133,7 @@
 
             if statements is None:
                 return defer.succeed(None)
-            log.debug(u"===== COMMITING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements))
+            log.debug(u"===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements))
             d = self.dbpool.runInteraction(self._updateDb, tuple(statements))
             return d
 
@@ -251,57 +270,131 @@
         return d
 
     #History
-    def addToHistory(self, from_jid, to_jid, message, _type='chat', extra=None, timestamp=None, profile=None):
+
+    def _addToHistoryCb(self, dummy, data):
+        # Message metadata were successfully added to history
+        # now we can add the per-language message bodies and subjects
+        uid = data['uid']
+        for key in ('message', 'subject'):
+            for lang, value in data[key].iteritems():
+                d = self.dbpool.runQuery("INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)".format(key=key),
+                    (uid, value, lang or None))
+                # bind the loop variables as default arguments: the errback fires
+                # asynchronously, after the loop has moved on
+                d.addErrback(lambda dummy, key=key, lang=lang, value=value: log.error(_(u"Can't save following {key} in history (uid: {uid}, lang:{lang}): {value}".format(
+                    key=key, uid=uid, lang=lang, value=value))))
+        try:
+            thread = data['extra']['thread']
+        except KeyError:
+            pass
+        else:
+            thread_parent = data['extra'].get('thread_parent')
+            d = self.dbpool.runQuery("INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)",
+                (uid, thread, thread_parent))
+            d.addErrback(lambda dummy: log.error(_(u"Can't save following thread in history (uid: {uid}): thread: {thread}, parent: {parent}".format(
+                uid=uid, thread=thread, parent=thread_parent))))
+
+    def _addToHistoryEb(self, failure, data):
+        try:
+            sqlite_msg = failure.value.args[0]
+        except (AttributeError, IndexError):
+            raise failure
+        else:
+            if "UNIQUE constraint failed" in sqlite_msg:
+                log.debug(u"Message is already in history, not storing it again")
+            else:
+                log.error(u"Can't store message in history: {}".format(failure))
+
+    def addToHistory(self, data, profile):
         """Store a new message in history
-        @param from_jid: full source JID
-        @param to_jid: full dest JID
-        @param message: message
-        @param _type: message type (see RFC 6121 §5.2.2)
-        @param extra: dictionary (keys and values are unicode) of extra message data
-        @param timestamp: timestamp in seconds since epoch, or None to use current time
+
+        @param data(dict): message data as built by SatMessageProtocol.onMessage
+        @param profile(unicode): %(doc_profile)s
         """
-        assert(profile)
-        if extra is None:
-            extra = {}
-        extra_ = pickle.dumps({k: v.encode('utf-8') for k, v in extra.items()}, 0).decode('utf-8')
-        d = self.dbpool.runQuery("INSERT INTO history(source, source_res, dest, dest_res, timestamp, message, type, extra, profile_id) VALUES (?,?,?,?,?,?,?,?,?)",
-                                 (from_jid.userhost(), from_jid.resource, to_jid.userhost(), to_jid.resource, timestamp or time(),
-                                  message, _type, extra_, self.profiles[profile]))
-        d.addErrback(lambda ignore: log.error(_(u"Can't save following message in history: from [%(from_jid)s] to [%(to_jid)s] ==> [%(message)s]" %
-                                          {"from_jid": from_jid.full(), "to_jid": to_jid.full(), "message": message})))
+        extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0)
+        from_jid = data['from']
+        to_jid = data['to']
+        d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)",
+                                 (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], extra))
+        d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data])
+        d.addErrback(lambda ignore: log.error(_(u"Can't save following message in history: from [{from_jid}] to [{to_jid}] (uid: {uid})"
+            .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))))
         return d
 
-    def getHistory(self, from_jid, to_jid, limit=None, between=True, search=None, profile=None):
+    def sqliteHistoryToList(self, query_result):
+        """Get SQL query result and return a list of message data dicts"""
+        result = []
+        current = {'uid': None}
+        for row in reversed(query_result):
+            uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
+            type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row
+            if uid != current['uid']:
+                # new message
+                try:
+                    extra = pickle.loads(str(extra or ""))
+                except EOFError:
+                    extra = {}
+                current = {
+                    'from': "%s/%s" % (source, source_res) if source_res else source,
+                    'to': "%s/%s" % (dest, dest_res) if dest_res else dest,
+                    'uid': uid,
+                    'message': {},
+                    'subject': {},
+                    'type': type_,
+                    'extra': extra,
+                    'timestamp': timestamp,
+                    }
+                if update_uid is not None:
+                    current['extra']['update_uid'] = update_uid
+                if received_timestamp is not None:
+                    current['extra']['received_timestamp'] = str(received_timestamp)
+                result.append(current)
+
+            if message is not None:
+                current['message'][message_lang or ''] = message
+
+            if subject is not None:
+                current['subject'][subject_lang or ''] = subject
+
+            if thread is not None:
+                current_extra = current['extra']
+                current_extra['thread'] = thread
+                if thread_parent is not None:
+                    current_extra['thread_parent'] = thread_parent
+            else:
+                if thread_parent is not None:
+                    log.error(u"Database inconsistency: thread parent without thread (uid: {uid}, thread_parent: {parent})"
+                        .format(uid=uid, parent=thread_parent))
+
+        return result
+
+    def listDict2listTuple(self, messages_data):
+        """Return a list of tuple as used in bridge from a list of messages data"""
+        ret = []
+        for m in messages_data:
+            ret.append((m['uid'], m['timestamp'], m['from'], m['to'], m['message'], m['subject'], m['type'], m['extra']))
+        return ret
+
+    def historyGet(self, from_jid, to_jid, limit=None, between=True, search=None, profile=None):
         """Retrieve messages in history
+
         @param from_jid (JID): source JID (full, or bare for catchall)
         @param to_jid (JID): dest JID (full, or bare for catchall)
         @param limit (int): maximum number of messages to get:
             - 0 for no message (returns the empty list)
             - None for unlimited
         @param between (bool): confound source and dest (ignore the direction)
-        @param search (str): pattern to filter the history results
-        @param profile (str): %(doc_profile)s
-        @return: list of tuple as in http://wiki.goffi.org/wiki/Bridge_API#getHistory
+        @param search (unicode): pattern to filter the history results
+        @param profile (unicode): %(doc_profile)s
+        @return: list of tuple as in [messageNew]
         """
-        assert(profile)
+        assert profile
         if limit == 0:
             return defer.succeed([])
 
-        def sqliteToList(query_result):
-            query_result.reverse()
-            result = []
-            for row in query_result:
-                timestamp, source, source_res, dest, dest_res, message, type_, extra_raw = row
-                try:
-                    extra = pickle.loads(str(extra_raw or ""))
-                except EOFError:
-                    extra = {}
-                result.append((timestamp, "%s/%s" % (source, source_res) if source_res else source,
-                                          "%s/%s" % (dest, dest_res) if dest_res else dest,
-                                          message, type_, extra))
-            return result
-
-        query_parts = ["SELECT timestamp, source, source_res, dest, dest_res, message, type, extra FROM history WHERE profile_id=? AND"]
+        query_parts = ["SELECT uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
+                        type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\
+                        FROM history LEFT JOIN message ON history.uid = message.history_uid\
+                        LEFT JOIN subject ON history.uid=subject.history_uid\
+                        LEFT JOIN thread ON history.uid=thread.history_uid\
+                        WHERE profile_id=? AND"] # FIXME: not sure this is the best query; messages and subjects can appear several times here
         values = [self.profiles[profile]]
 
         def test_jid(type_, _jid):
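Because the query built in historyGet above LEFT JOINs three tables, one logical message comes back as several rows, one per message body / subject / thread combination; sqliteHistoryToList collapses them, relying on rows with the same uid being adjacent after the timestamp sort. A reduced sketch of that grouping logic, with made-up rows carrying only (uid, language, body):

    rows = [
        ('uid1', '',   u'Hello'),
        ('uid1', 'fr', u'Salut'),
        ('uid2', '',   u'Bye'),
    ]
    messages = []
    current = {'uid': None}
    for uid, lang, body in rows:
        if uid != current['uid']:
            # first row of a new message: create its dict once
            current = {'uid': uid, 'message': {}}
            messages.append(current)
        # further rows with the same uid only contribute extra languages
        current['message'][lang or ''] = body
    assert messages == [
        {'uid': 'uid1', 'message': {'': u'Hello', 'fr': u'Salut'}},
        {'uid': 'uid2', 'message': {'': u'Bye'}},
    ]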
@@ -324,14 +417,16 @@
             query_parts.append("AND message GLOB ?")
             values.append("*%s*" % search)
 
-        query_parts.append("ORDER BY timestamp DESC")
-
+        query_parts.append("ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList
+                                                      # we use DESC here so LIMIT keep the last messages
         if limit is not None:
             query_parts.append("LIMIT ?")
             values.append(limit)
 
         d = self.dbpool.runQuery(" ".join(query_parts), values)
-        return d.addCallback(sqliteToList)
+        d.addCallback(self.sqliteHistoryToList)
+        d.addCallback(self.listDict2listTuple)
+        return d
 
     #Private values
     def loadGenPrivates(self, private_gen, namespace):
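The ORDER BY/LIMIT combination above deserves a word: ORDER BY timestamp DESC makes LIMIT keep the most recent messages, and sqliteHistoryToList then iterates in reverse to restore chronological order. A toy equivalent in plain Python, with hypothetical timestamps:

    history = [1, 2, 3, 4, 5]                         # all timestamps, oldest first
    query_result = sorted(history, reverse=True)[:2]  # ORDER BY timestamp DESC LIMIT 2
    assert query_result == [5, 4]                     # the two most recent messages
    assert list(reversed(query_result)) == [4, 5]     # back in chronological order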
@@ -665,6 +760,7 @@
 
     def generateUpdateData(self, old_data, new_data, modify=False):
         """ Generate data for automatic update between two schema data
+
         @param old_data: data of the former schema (which must be updated)
         @param new_data: data of the current schema
         @param modify: if True, always use "cols modify" table, else try to ALTER tables
@@ -728,10 +824,10 @@
     @defer.inlineCallbacks
     def update2raw(self, update, dev_version=False):
         """ Transform update data to raw SQLite statements
+
         @param update: update data as returned by generateUpdateData
         @param dev_version: if True, update will be done in dev mode: no deletion will be done, instead a message will be shown. This prevents accidental loss of data while working on the code/database.
         @return: list of string with SQL statements needed to update the base
-
         """
         ret = self.createData2Raw(update.get('create', {}))
         drop = []
@@ -767,11 +863,104 @@
         specific = update.get('specific', None)
         if specific:
             cmds = yield getattr(self, specific)()
-            ret.extend(cmds)
+            ret.extend(cmds or [])
         defer.returnValue(ret)
 
+    @defer.inlineCallbacks
+    def update_v3(self):
+        """Update database from v2 to v3 (message refactoring)"""
+        # XXX: this update processes all messages in one huge transaction
+        #      this is really memory consuming, but was OK on a reasonably
+        #      big database during tests. If issues arise, we can split it
+        #      into smaller transactions using LIMIT and by deleting already
+        #      updated messages
+        log.info(u"Database update to v3, this may take a while")
+
+        # we need to fix duplicate timestamps, as they would violate the new UNIQUE constraint
+        rows = yield self.dbpool.runQuery("SELECT timestamp, COUNT(*) as c FROM history GROUP BY timestamp HAVING c>1")
+        if rows:
+            log.info("fixing duplicate timestamps")
+            fixed = []
+            for timestamp, dummy in rows:
+                ids_rows = yield self.dbpool.runQuery("SELECT id from history where timestamp=?", (timestamp,))
+                for idx, (id_,) in enumerate(ids_rows):
+                    fixed.append(id_)
+                    yield self.dbpool.runQuery("UPDATE history SET timestamp=? WHERE id=?", (float(timestamp) + idx * 0.001, id_))
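+                    # with this offset, e.g. three rows sharing timestamp t end up at
+                    # t, t + 0.001 and t + 0.002, which satisfies the new UNIQUE constraint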
+            log.info(u"fixed messages with ids {}".format(u', '.join([unicode(id_) for id_ in fixed])))
+
+        def historySchema(txn):
+            log.info(u"History schema update")
+            txn.execute("ALTER TABLE history RENAME TO tmp_sat_update")
+            txn.execute("CREATE TABLE history (uid TEXT PRIMARY KEY, update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, received_timestamp DATETIME, type TEXT, extra BLOB, FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, FOREIGN KEY(type) REFERENCES message_types(type), UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res))")
+            txn.execute("INSERT INTO history (uid, profile_id, source, dest, source_res, dest_res, timestamp, type, extra) SELECT id, profile_id, source, dest, source_res, dest_res, timestamp, type, extra FROM tmp_sat_update")
+
+        yield self.dbpool.runInteraction(historySchema)
+
+        def newTables(txn):
+            log.info(u"Creating new tables")
+            txn.execute("CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
+            txn.execute("CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
+            txn.execute("CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
+
+        yield self.dbpool.runInteraction(newTables)
+
+        log.info(u"inserting new message type")
+        yield self.dbpool.runQuery("INSERT INTO message_types VALUES (?)", ('info',))
+
+        log.info(u"messages update")
+        rows = yield self.dbpool.runQuery("SELECT id, timestamp, message, extra FROM tmp_sat_update")
+        total = len(rows)
+
+        def updateHistory(txn, queries):
+            for query, args in queries:
+                txn.execute(query, args)
+
+        queries = []
+        for idx, row in enumerate(rows, 1):
+            if idx % 1000 == 0 or total - idx == 0:
+                log.info("preparing message {}/{}".format(idx, total))
+            id_, timestamp, message, extra = row
+            try:
+                extra = pickle.loads(str(extra or ""))
+            except EOFError:
+                extra = {}
+            except Exception:
+                log.warning(u"Can't handle extra data for message id {}, ignoring it".format(id_))
+                extra = {}
+
+            queries.append(("INSERT INTO message(history_uid, message) VALUES (?,?)", (id_, message)))
+
+            try:
+                subject = extra.pop('subject')
+            except KeyError:
+                pass
+            else:
+                try:
+                    subject = subject.decode('utf-8')
+                except UnicodeError:
+                    # decode can raise UnicodeDecodeError (or UnicodeEncodeError if
+                    # the subject is already unicode); it has already been popped
+                    # from extra above, so we just skip its insertion
+                    log.warning(u"Error while decoding subject, ignoring it")
+                else:
+                    queries.append(("INSERT INTO subject(history_uid, subject) VALUES (?,?)", (id_, subject)))
+
+            received_timestamp = extra.pop('timestamp', None)
+            try:
+                del extra['archive']
+            except KeyError:
+                # archive was not used
+                pass
+
+            queries.append(("UPDATE history SET received_timestamp=?,extra=? WHERE uid=?", (received_timestamp, pickle.dumps(extra, 0), id_)))  # parameter order must match the placeholders
+
+        yield self.dbpool.runInteraction(updateHistory, queries)
+
+        log.info("Dropping temporary table")
+        yield self.dbpool.runQuery("DROP TABLE tmp_sat_update")
+        log.info("Database update finished :)")
+
     def update2raw_v2(self):
         """Update the database from v1 to v2 (add passwords encryptions):
+
             - the XMPP password value is re-used for the profile password (new parameter)
             - the profile password is stored hashed
             - the XMPP password is stored encrypted, with the profile password as key