Mercurial > libervia-backend
diff sat/memory/sqlite.py @ 2699:310e41bd6666
core (memory/sqlite): added stanza_id:
/!\ database schema change /!\
stanza_id is a new field in history added to prepare the implementation of MAM for messages.
A new "last_stanza_id" can be used in filters to retrieve last message with a know stanza id (useful for history synchronisation).
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 01 Dec 2018 10:08:17 +0100 |
parents | 26edcf3a30eb |
children | 9adf44996e58 |
line wrap: on
line diff
--- a/sat/memory/sqlite.py Sat Dec 01 10:04:17 2018 +0100 +++ b/sat/memory/sqlite.py Sat Dec 01 10:08:17 2018 +0100 @@ -36,7 +36,7 @@ import sqlite3 import json -CURRENT_DB_VERSION = 5 +CURRENT_DB_VERSION = 6 # XXX: DATABASE schemas are used in the following way: # - 'current' key is for the actual database schema, for a new base @@ -56,7 +56,7 @@ ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE",))), ('message_types', (("type TEXT PRIMARY KEY",), ())), - ('history', (("uid TEXT PRIMARY KEY", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", + ('history', (("uid TEXT PRIMARY KEY", "stanza_id TEXT", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception) "type TEXT", "extra BLOB"), ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)", @@ -98,6 +98,8 @@ )), )), }, + 6: {'cols create': {'history': ('stanza_id TEXT',)}, + }, 5: {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL", "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format( file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY), @@ -117,7 +119,7 @@ }, } -NOT_IN_EXTRA = ('received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field +NOT_IN_EXTRA = ('stanza_id', 'received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field # this is specific to this sqlite storage and for now only used for received_timestamp # because this value is stored in a separate field @@ -423,8 +425,8 @@ extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0) from_jid = data['from'] to_jid = data['to'] - d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)", - (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) + d = self.dbpool.runQuery("INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", + (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data]) d.addErrback(self._logHistoryError, from_jid, to_jid, data) return d @@ -434,7 +436,7 @@ result = [] current = {'uid': None} for row in reversed(query_result): - uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ + uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row if uid != current['uid']: # new message @@ -452,6 +454,8 @@ 'extra': extra, 'timestamp': timestamp, } + if stanza_id is not None: + current['extra']['stanza_id'] = stanza_id if update_uid is not None: current['extra']['update_uid'] = update_uid if received_timestamp is not None: @@ -492,7 +496,7 @@ - 0 for no message (returns the empty list) - None for unlimited @param between (bool): confound source and dest (ignore the direction) - @param search (unicode): pattern to filter the history results + @param filters (dict[unicode, unicode]): pattern to filter the history results @param profile (unicode): %(doc_profile)s @return: list of tuple as in [messageNew] """ @@ -502,29 +506,54 @@ if limit == 0: return defer.succeed([]) - query_parts = [u"SELECT uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ + query_parts = [u"SELECT uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\ FROM history LEFT JOIN message ON history.uid = message.history_uid\ LEFT JOIN subject ON history.uid=subject.history_uid\ LEFT JOIN thread ON history.uid=thread.history_uid\ - WHERE profile_id=? AND"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here + WHERE profile_id=?"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here values = [self.profiles[profile]] - def test_jid(type_, _jid): - values.append(_jid.userhost()) - if _jid.resource: - values.append(_jid.resource) - return u'(%s=? AND %s_res=?)' % (type_, type_) - return u'%s=?' % (type_, ) + def test_jid(type_, jid_): + values.append(jid_.userhost()) + if jid_.resource: + values.append(jid_.resource) + return u'({type_}=? AND {type_}_res=?)'.format(type_=type_) + return u'{type_}=?'.format(type_=type_) - if between: - query_parts.append(u"((%s AND %s) OR (%s AND %s))" % (test_jid('source', from_jid), - test_jid('dest', to_jid), - test_jid('source', to_jid), - test_jid('dest', from_jid))) + if not from_jid and not to_jid: + # not jid specified, we want all one2one communications + pass + elif between: + if not from_jid or not to_jid: + # we only have one jid specified, we check all messages + # from or to this jid + jid_ = from_jid or to_jid + query_parts.append(u"AND ({source} OR {dest})".format( + source=test_jid(u'source', jid_), + dest=test_jid(u'dest' , jid_))) + else: + # we have 2 jids specified, we check all communications between + # those 2 jids + query_parts.append( + u"AND (({source_from} AND {dest_to}) " + u"OR ({source_to} AND {dest_from}))".format( + source_from=test_jid('source', from_jid), + dest_to=test_jid('dest', to_jid), + source_to=test_jid('source', to_jid), + dest_from=test_jid('dest', from_jid))) else: - query_parts.append(u"%s AND %s" % (test_jid('source', from_jid), - test_jid('dest', to_jid))) + # we want one communication in specific direction (from somebody or + # to somebody). + q = [] + if from_jid is not None: + q.append(test_jid('source', from_jid)) + if to_jid is not None: + q.append(test_jid('dest', to_jid)) + query_parts.append(u"AND " + u" AND ".join(q)) + + # set to True if "ORDER BY" is already added + order = False if filters: if 'body' in filters: @@ -542,10 +571,20 @@ types = filters['not_types'].split() query_parts.append(u"AND type NOT IN ({})".format(u','.join("?"*len(types)))) values.extend(types) - + if 'last_stanza_id' in filters: + # this request get the last message with a "stanza_id" that we + # have in history. This is mainly used to retrieve messages sent + # while we were offline, using MAM (XEP-0313). + if (filters[u'last_stanza_id'] is not True + or from_jid is not None or to_jid is not None + or limit != 1): + raise ValueError(u"Unexpected values for last_stanza_id filter") + query_parts.append(u"AND stanza_id IS NOT NULL ORDER BY history.rowid DESC") + order = True - query_parts.append(u"ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList - # we use DESC here so LIMIT keep the last messages + if not order: + query_parts.append(u"ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList + # we use DESC here so LIMIT keep the last messages if limit is not None: query_parts.append(u"LIMIT ?") values.append(limit)