libervia-backend: comparison of sat/memory/sqlite.py @ 2699:310e41bd6666
core (memory/sqlite): added stanza_id:
/!\ database schema change /!\
stanza_id is a new field in the history table, added to prepare the implementation of MAM for messages.
A new "last_stanza_id" filter can be used to retrieve the last message with a known stanza id (useful for history synchronisation).
author | Goffi <goffi@goffi.org>
date | Sat, 01 Dec 2018 10:08:17 +0100
parents | 26edcf3a30eb
children | 9adf44996e58
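The "last_stanza_id" filter described above is consumed by the history-retrieval method shown further down; its def line is outside the hunks of this comparison, so the method name in this caller-side sketch is a guess, but the argument constraints (no from/to JID, limit of 1, filter value literally True) come straight from the added checks:

# Hypothetical caller-side sketch: fetch the last message for which a
# stanza_id is known, e.g. to build a MAM (XEP-0313) catch-up request.
# getHistory is assumed to be the method carrying the docstring below.
d = storage.getHistory(
    from_jid=None,
    to_jid=None,
    limit=1,                              # the added check requires exactly 1
    filters={u'last_stanza_id': True},    # must be literally True
    profile=u'some_profile',              # hypothetical profile name
)
d.addCallback(lambda messages: messages[0] if messages else None)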
2698:5060cbeec01e | 2699:310e41bd6666 |
---|---|
34 import cPickle as pickle | 34 import cPickle as pickle |
35 import hashlib | 35 import hashlib |
36 import sqlite3 | 36 import sqlite3 |
37 import json | 37 import json |
38 | 38 |
39 CURRENT_DB_VERSION = 5 | 39 CURRENT_DB_VERSION = 6 |
40 | 40 |
41 # XXX: DATABASE schemas are used in the following way: | 41 # XXX: DATABASE schemas are used in the following way: |
42 # - 'current' key is for the actual database schema, for a new base | 42 # - 'current' key is for the actual database schema, for a new base |
43 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update | 43 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update |
44 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed | 44 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed |
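As the comment above explains, 'current' describes the full schema for a new database while each integer key holds the migration from the previous version; a minimal sketch of how the needed steps can be picked (the real updater code is outside these hunks, so the helper below is illustrative only):

CURRENT_DB_VERSION = 6

def updates_needed(found_version):
    """Return the update entries to apply, in order (illustrative helper)."""
    # e.g. a database found at version 3 needs entries 4, 5 and 6
    return range(found_version + 1, CURRENT_DB_VERSION + 1)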
54 ("UNIQUE (name)",))), | 54 ("UNIQUE (name)",))), |
55 ('components', (("profile_id INTEGER PRIMARY KEY", "entry_point TEXT NOT NULL"), | 55 ('components', (("profile_id INTEGER PRIMARY KEY", "entry_point TEXT NOT NULL"), |
56 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE",))), | 56 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE",))), |
57 ('message_types', (("type TEXT PRIMARY KEY",), | 57 ('message_types', (("type TEXT PRIMARY KEY",), |
58 ())), | 58 ())), |
59 ('history', (("uid TEXT PRIMARY KEY", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", | 59 ('history', (("uid TEXT PRIMARY KEY", "stanza_id TEXT", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", |
60 "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception) | 60 "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception) |
61 "type TEXT", "extra BLOB"), | 61 "type TEXT", "extra BLOB"), |
62 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)", | 62 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)", |
63 "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)" # avoid storing 2 time the same message (specially for delayed ones) | 63 "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)" # avoid storing 2 time the same message (specially for delayed ones) |
64 ))), | 64 ))), |
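Assembled by hand from the column and constraint tuples above, the 'history' table of a fresh version-6 database should look roughly like this (a sketch; the real schema builder may format the statement differently):

# Approximate DDL produced from the 'history' entry above (hand-assembled).
HISTORY_TABLE_SQL = u"""
CREATE TABLE history (
    uid TEXT PRIMARY KEY,
    stanza_id TEXT,
    update_uid TEXT,
    profile_id INTEGER,
    source TEXT,
    dest TEXT,
    source_res TEXT,
    dest_res TEXT,
    timestamp DATETIME NOT NULL,
    received_timestamp DATETIME,
    type TEXT,
    extra BLOB,
    FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE,
    FOREIGN KEY(type) REFERENCES message_types(type),
    UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)
)"""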
96 ("'normal'",), | 96 ("'normal'",), |
97 ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC | 97 ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC |
98 )), | 98 )), |
99 )), | 99 )), |
100 }, | 100 }, |
101 6: {'cols create': {'history': ('stanza_id TEXT',)}, | |
102 }, | |
101 5: {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL", | 103 5: {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL", |
102 "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format( | 104 "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format( |
103 file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY), | 105 file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY), |
104 "file_hash TEXT", "hash_algo TEXT", "name TEXT NOT NULL", "size INTEGER", | 106 "file_hash TEXT", "hash_algo TEXT", "name TEXT NOT NULL", "size INTEGER", |
105 "namespace TEXT", "mime_type TEXT", | 107 "namespace TEXT", "mime_type TEXT", |
115 }, | 117 }, |
116 1: {'cols create': {'history': ('extra BLOB',)}, | 118 1: {'cols create': {'history': ('extra BLOB',)}, |
117 }, | 119 }, |
118 } | 120 } |
119 | 121 |
120 NOT_IN_EXTRA = ('received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field | 122 NOT_IN_EXTRA = ('stanza_id', 'received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field |
121 # this is specific to this sqlite storage and for now only used for received_timestamp | 123 # this is specific to this sqlite storage and for now only used for received_timestamp |
122 # because this value is stored in a separate field | 124 # because this value is stored in a separate field |
123 | 125 |
124 | 126 |
125 class ConnectionPool(adbapi.ConnectionPool): | 127 class ConnectionPool(adbapi.ConnectionPool): |
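For databases already at version 5, the new 6: {'cols create': {'history': ('stanza_id TEXT',)}} entry presumably reduces to a single column addition; a sketch of the equivalent statement (the code that interprets 'cols create' lives elsewhere in this file and is not shown in this comparison):

import sqlite3

# Assumed equivalent of the version-6 'cols create' update for an existing
# database; SQLite adds one column per ALTER TABLE statement.
con = sqlite3.connect("sat.db")   # hypothetical database path
con.execute("ALTER TABLE history ADD COLUMN stanza_id TEXT")
con.commit()
con.close()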
421 @param data(dict): message data as build by SatMessageProtocol.onMessage | 423 @param data(dict): message data as build by SatMessageProtocol.onMessage |
422 """ | 424 """ |
423 extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0) | 425 extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0) |
424 from_jid = data['from'] | 426 from_jid = data['from'] |
425 to_jid = data['to'] | 427 to_jid = data['to'] |
426 d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)", | 428 d = self.dbpool.runQuery("INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)", |
427 (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) | 429 (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) |
428 d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data]) | 430 d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data]) |
429 d.addErrback(self._logHistoryError, from_jid, to_jid, data) | 431 d.addErrback(self._logHistoryError, from_jid, to_jid, data) |
430 return d | 432 return d |
431 | 433 |
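The widened INSERT above pulls the stanza id out of data['extra'] into its own column; a sketch of the kind of message dict it now expects (values are invented, keys match the ones read by the query):

from twisted.words.protocols.jabber import jid

# Illustrative message data for addToHistory; only the keys used by the
# INSERT above are shown.  The stanza_id travels inside 'extra' and ends up
# in the dedicated column (and is excluded from the pickled extra blob via
# NOT_IN_EXTRA).
data = {
    'uid': u'some-unique-uid',                        # hypothetical values
    'from': jid.JID(u'romeo@montague.lit/balcony'),
    'to': jid.JID(u'juliet@capulet.lit'),
    'timestamp': 1543655297.0,
    'type': u'chat',
    'extra': {u'stanza_id': u'server-assigned-id'},   # e.g. from XEP-0359
}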
432 def sqliteHistoryToList(self, query_result): | 434 def sqliteHistoryToList(self, query_result): |
433 """Get SQL query result and return a list of message data dicts""" | 435 """Get SQL query result and return a list of message data dicts""" |
434 result = [] | 436 result = [] |
435 current = {'uid': None} | 437 current = {'uid': None} |
436 for row in reversed(query_result): | 438 for row in reversed(query_result): |
437 uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ | 439 uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ |
438 type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row | 440 type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row |
439 if uid != current['uid']: | 441 if uid != current['uid']: |
440 # new message | 442 # new message |
441 try: | 443 try: |
442 extra = pickle.loads(str(extra or "")) | 444 extra = pickle.loads(str(extra or "")) |
450 'subject': {}, | 452 'subject': {}, |
451 'type': type_, | 453 'type': type_, |
452 'extra': extra, | 454 'extra': extra, |
453 'timestamp': timestamp, | 455 'timestamp': timestamp, |
454 } | 456 } |
457 if stanza_id is not None: | |
458 current['extra']['stanza_id'] = stanza_id | |
455 if update_uid is not None: | 459 if update_uid is not None: |
456 current['extra']['update_uid'] = update_uid | 460 current['extra']['update_uid'] = update_uid |
457 if received_timestamp is not None: | 461 if received_timestamp is not None: |
458 current['extra']['received_timestamp'] = str(received_timestamp) | 462 current['extra']['received_timestamp'] = str(received_timestamp) |
459 result.append(current) | 463 result.append(current) |
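On the way back out, sqliteHistoryToList mirrors this: a non-NULL stanza_id column is put back under extra; roughly the entry shape produced for one message (only keys visible in this hunk are listed, values invented):

# Illustrative result entry; stanza_id and received_timestamp are restored
# into 'extra' from their dedicated columns.
entry = {
    'uid': u'some-unique-uid',
    'subject': {},
    'type': u'chat',
    'timestamp': 1543655297.0,
    'extra': {
        u'stanza_id': u'server-assigned-id',
        u'received_timestamp': u'1543655298.0',
    },
}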
490 @param to_jid (JID): dest JID (full, or bare for catchall) | 494 @param to_jid (JID): dest JID (full, or bare for catchall) |
491 @param limit (int): maximum number of messages to get: | 495 @param limit (int): maximum number of messages to get: |
492 - 0 for no message (returns the empty list) | 496 - 0 for no message (returns the empty list) |
493 - None for unlimited | 497 - None for unlimited |
494 @param between (bool): confound source and dest (ignore the direction) | 498 @param between (bool): confound source and dest (ignore the direction) |
495 @param search (unicode): pattern to filter the history results | 499 @param filters (dict[unicode, unicode]): pattern to filter the history results |
496 @param profile (unicode): %(doc_profile)s | 500 @param profile (unicode): %(doc_profile)s |
497 @return: list of tuple as in [messageNew] | 501 @return: list of tuple as in [messageNew] |
498 """ | 502 """ |
499 assert profile | 503 assert profile |
500 if filters is None: | 504 if filters is None: |
501 filters = {} | 505 filters = {} |
502 if limit == 0: | 506 if limit == 0: |
503 return defer.succeed([]) | 507 return defer.succeed([]) |
504 | 508 |
505 query_parts = [u"SELECT uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ | 509 query_parts = [u"SELECT uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ |
506 type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\ | 510 type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\ |
507 FROM history LEFT JOIN message ON history.uid = message.history_uid\ | 511 FROM history LEFT JOIN message ON history.uid = message.history_uid\ |
508 LEFT JOIN subject ON history.uid=subject.history_uid\ | 512 LEFT JOIN subject ON history.uid=subject.history_uid\ |
509 LEFT JOIN thread ON history.uid=thread.history_uid\ | 513 LEFT JOIN thread ON history.uid=thread.history_uid\ |
510 WHERE profile_id=? AND"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here | 514 WHERE profile_id=?"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here |
511 values = [self.profiles[profile]] | 515 values = [self.profiles[profile]] |
512 | 516 |
513 def test_jid(type_, _jid): | 517 def test_jid(type_, jid_): |
514 values.append(_jid.userhost()) | 518 values.append(jid_.userhost()) |
515 if _jid.resource: | 519 if jid_.resource: |
516 values.append(_jid.resource) | 520 values.append(jid_.resource) |
517 return u'(%s=? AND %s_res=?)' % (type_, type_) | 521 return u'({type_}=? AND {type_}_res=?)'.format(type_=type_) |
518 return u'%s=?' % (type_, ) | 522 return u'{type_}=?'.format(type_=type_) |
519 | 523 |
520 if between: | 524 if not from_jid and not to_jid: |
521 query_parts.append(u"((%s AND %s) OR (%s AND %s))" % (test_jid('source', from_jid), | 525 # not jid specified, we want all one2one communications |
522 test_jid('dest', to_jid), | 526 pass |
523 test_jid('source', to_jid), | 527 elif between: |
524 test_jid('dest', from_jid))) | 528 if not from_jid or not to_jid: |
529 # we only have one jid specified, we check all messages | |
530 # from or to this jid | |
531 jid_ = from_jid or to_jid | |
532 query_parts.append(u"AND ({source} OR {dest})".format( | |
533 source=test_jid(u'source', jid_), | |
534 dest=test_jid(u'dest' , jid_))) | |
535 else: | |
536 # we have 2 jids specified, we check all communications between | |
537 # those 2 jids | |
538 query_parts.append( | |
539 u"AND (({source_from} AND {dest_to}) " | |
540 u"OR ({source_to} AND {dest_from}))".format( | |
541 source_from=test_jid('source', from_jid), | |
542 dest_to=test_jid('dest', to_jid), | |
543 source_to=test_jid('source', to_jid), | |
544 dest_from=test_jid('dest', from_jid))) | |
525 else: | 545 else: |
526 query_parts.append(u"%s AND %s" % (test_jid('source', from_jid), | 546 # we want one communication in specific direction (from somebody or |
527 test_jid('dest', to_jid))) | 547 # to somebody). |
548 q = [] | |
549 if from_jid is not None: | |
550 q.append(test_jid('source', from_jid)) | |
551 if to_jid is not None: | |
552 q.append(test_jid('dest', to_jid)) | |
553 query_parts.append(u"AND " + u" AND ".join(q)) | |
554 | |
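Depending on which JIDs are given and on between, the reworked branches above append one of a few WHERE fragments; reconstructed by hand (bare JIDs assumed, so each test_jid call yields a single source=? or dest=? test):

# Reconstructed fragments appended to query_parts by the branches above.
WHERE_FRAGMENTS = {
    u'no jid given': u'',  # nothing appended: all one2one history of the profile
    u'between, one jid': u'AND (source=? OR dest=?)',
    u'between, both jids': u'AND ((source=? AND dest=?) OR (source=? AND dest=?))',
    u'directional, both jids': u'AND source=? AND dest=?',
}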
555 # set to True if "ORDER BY" is already added | |
556 order = False | |
528 | 557 |
529 if filters: | 558 if filters: |
530 if 'body' in filters: | 559 if 'body' in filters: |
531 # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html | 560 # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html |
532 query_parts.append(u"AND message LIKE ?") | 561 query_parts.append(u"AND message LIKE ?") |
540 values.extend(types) | 569 values.extend(types) |
541 if 'not_types' in filters: | 570 if 'not_types' in filters: |
542 types = filters['not_types'].split() | 571 types = filters['not_types'].split() |
543 query_parts.append(u"AND type NOT IN ({})".format(u','.join("?"*len(types)))) | 572 query_parts.append(u"AND type NOT IN ({})".format(u','.join("?"*len(types)))) |
544 values.extend(types) | 573 values.extend(types) |
545 | 574 if 'last_stanza_id' in filters: |
546 | 575 # this request get the last message with a "stanza_id" that we |
547 query_parts.append(u"ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList | 576 # have in history. This is mainly used to retrieve messages sent |
548 # we use DESC here so LIMIT keep the last messages | 577 # while we were offline, using MAM (XEP-0313). |
578 if (filters[u'last_stanza_id'] is not True | |
579 or from_jid is not None or to_jid is not None | |
580 or limit != 1): | |
581 raise ValueError(u"Unexpected values for last_stanza_id filter") | |
582 query_parts.append(u"AND stanza_id IS NOT NULL ORDER BY history.rowid DESC") | |
583 order = True | |
584 | |
585 if not order: | |
586 query_parts.append(u"ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList | |
587 # we use DESC here so LIMIT keep the last messages | |
549 if limit is not None: | 588 if limit is not None: |
550 query_parts.append(u"LIMIT ?") | 589 query_parts.append(u"LIMIT ?") |
551 values.append(limit) | 590 values.append(limit) |
552 | 591 |
553 d = self.dbpool.runQuery(u" ".join(query_parts), values) | 592 d = self.dbpool.runQuery(u" ".join(query_parts), values) |
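Putting the getHistory pieces together for the last_stanza_id case, the query handed to runQuery should come out roughly as below (reconstructed from the query_parts above; whitespace in the real query differs):

# Approximate query for filters={u'last_stanza_id': True}, limit=1, no JIDs.
LAST_STANZA_ID_QUERY = (
    u"SELECT uid, stanza_id, update_uid, source, dest, source_res, dest_res,"
    u" timestamp, received_timestamp, type, extra, message, message.language,"
    u" subject, subject.language, thread_id, thread.parent_id"
    u" FROM history LEFT JOIN message ON history.uid = message.history_uid"
    u" LEFT JOIN subject ON history.uid=subject.history_uid"
    u" LEFT JOIN thread ON history.uid=thread.history_uid"
    u" WHERE profile_id=?"
    u" AND stanza_id IS NOT NULL ORDER BY history.rowid DESC"
    u" LIMIT ?"
)
# with values = [<profile_id>, 1]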