comparison sat/memory/sqlite.py @ 2699:310e41bd6666

core (memory/sqlite): added stanza_id: /!\ database schema change /!\ stanza_id is a new field in history, added to prepare the implementation of MAM for messages. A new "last_stanza_id" filter can be used to retrieve the last message with a known stanza id (useful for history synchronisation).
author Goffi <goffi@goffi.org>
date Sat, 01 Dec 2018 10:08:17 +0100
parents 26edcf3a30eb
children 9adf44996e58
comparison
equal deleted inserted replaced
2698:5060cbeec01e 2699:310e41bd6666
34 import cPickle as pickle 34 import cPickle as pickle
35 import hashlib 35 import hashlib
36 import sqlite3 36 import sqlite3
37 import json 37 import json
38 38
39 CURRENT_DB_VERSION = 5 39 CURRENT_DB_VERSION = 6
40 40
41 # XXX: DATABASE schemas are used in the following way: 41 # XXX: DATABASE schemas are used in the following way:
42 # - 'current' key is for the actual database schema, for a new base 42 # - 'current' key is for the actual database schema, for a new base
43 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update 43 # - x(int) is for update needed between x-1 and x. All number are needed between y and z to do an update
44 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed 44 # e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actuel DB, and to update from version 3, numbers 4, 5 and 6 are needed
54 ("UNIQUE (name)",))), 54 ("UNIQUE (name)",))),
55 ('components', (("profile_id INTEGER PRIMARY KEY", "entry_point TEXT NOT NULL"), 55 ('components', (("profile_id INTEGER PRIMARY KEY", "entry_point TEXT NOT NULL"),
56 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE",))), 56 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE",))),
57 ('message_types', (("type TEXT PRIMARY KEY",), 57 ('message_types', (("type TEXT PRIMARY KEY",),
58 ())), 58 ())),
59 ('history', (("uid TEXT PRIMARY KEY", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", 59 ('history', (("uid TEXT PRIMARY KEY", "stanza_id TEXT", "update_uid TEXT", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT",
60 "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception) 60 "timestamp DATETIME NOT NULL", "received_timestamp DATETIME", # XXX: timestamp is the time when the message was emitted. If received time stamp is not NULL, the message was delayed and timestamp is the declared value (and received_timestamp the time of reception)
61 "type TEXT", "extra BLOB"), 61 "type TEXT", "extra BLOB"),
62 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)", 62 ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)",
63 "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)" # avoid storing 2 time the same message (specially for delayed ones) 63 "UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res)" # avoid storing 2 time the same message (specially for delayed ones)
64 ))), 64 ))),
96 ("'normal'",), 96 ("'normal'",),
97 ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC 97 ("'info'",) # info is not standard, but used to keep track of info like join/leave in a MUC
98 )), 98 )),
99 )), 99 )),
100 }, 100 },
101 6: {'cols create': {'history': ('stanza_id TEXT',)},
102 },
101 5: {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL", 103 5: {'create': {'files': (("id TEXT NOT NULL", "version TEXT NOT NULL", "parent TEXT NOT NULL",
102 "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format( 104 "type TEXT CHECK(type in ('{file}', '{directory}')) NOT NULL DEFAULT '{file}'".format(
103 file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY), 105 file=C.FILE_TYPE_FILE, directory=C.FILE_TYPE_DIRECTORY),
104 "file_hash TEXT", "hash_algo TEXT", "name TEXT NOT NULL", "size INTEGER", 106 "file_hash TEXT", "hash_algo TEXT", "name TEXT NOT NULL", "size INTEGER",
105 "namespace TEXT", "mime_type TEXT", 107 "namespace TEXT", "mime_type TEXT",
115 }, 117 },
116 1: {'cols create': {'history': ('extra BLOB',)}, 118 1: {'cols create': {'history': ('extra BLOB',)},
117 }, 119 },
118 } 120 }
119 121
120 NOT_IN_EXTRA = ('received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field 122 NOT_IN_EXTRA = ('stanza_id', 'received_timestamp', 'update_uid') # keys which are in message data extra but not stored in sqlite's extra field
121 # this is specific to this sqlite storage and for now only used for received_timestamp 123 # this is specific to this sqlite storage and for now only used for received_timestamp
122 # because this value is stored in a separate field 124 # because this value is stored in a separate field
123 125
124 126
125 class ConnectionPool(adbapi.ConnectionPool): 127 class ConnectionPool(adbapi.ConnectionPool):
421 @param data(dict): message data as build by SatMessageProtocol.onMessage 423 @param data(dict): message data as build by SatMessageProtocol.onMessage
422 """ 424 """
423 extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0) 425 extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() if k not in NOT_IN_EXTRA}, 0)
424 from_jid = data['from'] 426 from_jid = data['from']
425 to_jid = data['to'] 427 to_jid = data['to']
426 d = self.dbpool.runQuery("INSERT INTO history(uid, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?)", 428 d = self.dbpool.runQuery("INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES (?,?,?,?,?,?,?,?,?,?,?,?)",
427 (data['uid'], data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) 429 (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), self.profiles[profile], data['from'].userhost(), to_jid.userhost(), from_jid.resource, to_jid.resource, data['timestamp'], data.get('received_timestamp'), data['type'], sqlite3.Binary(extra)))
428 d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data]) 430 d.addCallbacks(self._addToHistoryCb, self._addToHistoryEb, callbackArgs=[data], errbackArgs=[data])
429 d.addErrback(self._logHistoryError, from_jid, to_jid, data) 431 d.addErrback(self._logHistoryError, from_jid, to_jid, data)
430 return d 432 return d
431 433
432 def sqliteHistoryToList(self, query_result): 434 def sqliteHistoryToList(self, query_result):
433 """Get SQL query result and return a list of message data dicts""" 435 """Get SQL query result and return a list of message data dicts"""
434 result = [] 436 result = []
435 current = {'uid': None} 437 current = {'uid': None}
436 for row in reversed(query_result): 438 for row in reversed(query_result):
437 uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ 439 uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
438 type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row 440 type_, extra, message, message_lang, subject, subject_lang, thread, thread_parent = row
439 if uid != current['uid']: 441 if uid != current['uid']:
440 # new message 442 # new message
441 try: 443 try:
442 extra = pickle.loads(str(extra or "")) 444 extra = pickle.loads(str(extra or ""))
450 'subject': {}, 452 'subject': {},
451 'type': type_, 453 'type': type_,
452 'extra': extra, 454 'extra': extra,
453 'timestamp': timestamp, 455 'timestamp': timestamp,
454 } 456 }
457 if stanza_id is not None:
458 current['extra']['stanza_id'] = stanza_id
455 if update_uid is not None: 459 if update_uid is not None:
456 current['extra']['update_uid'] = update_uid 460 current['extra']['update_uid'] = update_uid
457 if received_timestamp is not None: 461 if received_timestamp is not None:
458 current['extra']['received_timestamp'] = str(received_timestamp) 462 current['extra']['received_timestamp'] = str(received_timestamp)
459 result.append(current) 463 result.append(current)
490 @param to_jid (JID): dest JID (full, or bare for catchall) 494 @param to_jid (JID): dest JID (full, or bare for catchall)
491 @param limit (int): maximum number of messages to get: 495 @param limit (int): maximum number of messages to get:
492 - 0 for no message (returns the empty list) 496 - 0 for no message (returns the empty list)
493 - None for unlimited 497 - None for unlimited
494 @param between (bool): confound source and dest (ignore the direction) 498 @param between (bool): confound source and dest (ignore the direction)
495 @param search (unicode): pattern to filter the history results 499 @param filters (dict[unicode, unicode]): pattern to filter the history results
496 @param profile (unicode): %(doc_profile)s 500 @param profile (unicode): %(doc_profile)s
497 @return: list of tuple as in [messageNew] 501 @return: list of tuple as in [messageNew]
498 """ 502 """
499 assert profile 503 assert profile
500 if filters is None: 504 if filters is None:
501 filters = {} 505 filters = {}
502 if limit == 0: 506 if limit == 0:
503 return defer.succeed([]) 507 return defer.succeed([])
504 508
505 query_parts = [u"SELECT uid, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ 509 query_parts = [u"SELECT uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
506 type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\ 510 type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\
507 FROM history LEFT JOIN message ON history.uid = message.history_uid\ 511 FROM history LEFT JOIN message ON history.uid = message.history_uid\
508 LEFT JOIN subject ON history.uid=subject.history_uid\ 512 LEFT JOIN subject ON history.uid=subject.history_uid\
509 LEFT JOIN thread ON history.uid=thread.history_uid\ 513 LEFT JOIN thread ON history.uid=thread.history_uid\
510 WHERE profile_id=? AND"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here 514 WHERE profile_id=?"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here
511 values = [self.profiles[profile]] 515 values = [self.profiles[profile]]
512 516
513 def test_jid(type_, _jid): 517 def test_jid(type_, jid_):
514 values.append(_jid.userhost()) 518 values.append(jid_.userhost())
515 if _jid.resource: 519 if jid_.resource:
516 values.append(_jid.resource) 520 values.append(jid_.resource)
517 return u'(%s=? AND %s_res=?)' % (type_, type_) 521 return u'({type_}=? AND {type_}_res=?)'.format(type_=type_)
518 return u'%s=?' % (type_, ) 522 return u'{type_}=?'.format(type_=type_)
519 523
520 if between: 524 if not from_jid and not to_jid:
521 query_parts.append(u"((%s AND %s) OR (%s AND %s))" % (test_jid('source', from_jid), 525 # not jid specified, we want all one2one communications
522 test_jid('dest', to_jid), 526 pass
523 test_jid('source', to_jid), 527 elif between:
524 test_jid('dest', from_jid))) 528 if not from_jid or not to_jid:
529 # we only have one jid specified, we check all messages
530 # from or to this jid
531 jid_ = from_jid or to_jid
532 query_parts.append(u"AND ({source} OR {dest})".format(
533 source=test_jid(u'source', jid_),
534 dest=test_jid(u'dest' , jid_)))
535 else:
536 # we have 2 jids specified, we check all communications between
537 # those 2 jids
538 query_parts.append(
539 u"AND (({source_from} AND {dest_to}) "
540 u"OR ({source_to} AND {dest_from}))".format(
541 source_from=test_jid('source', from_jid),
542 dest_to=test_jid('dest', to_jid),
543 source_to=test_jid('source', to_jid),
544 dest_from=test_jid('dest', from_jid)))
525 else: 545 else:
526 query_parts.append(u"%s AND %s" % (test_jid('source', from_jid), 546 # we want one communication in specific direction (from somebody or
527 test_jid('dest', to_jid))) 547 # to somebody).
548 q = []
549 if from_jid is not None:
550 q.append(test_jid('source', from_jid))
551 if to_jid is not None:
552 q.append(test_jid('dest', to_jid))
553 query_parts.append(u"AND " + u" AND ".join(q))
554
555 # set to True if "ORDER BY" is already added
556 order = False
528 557
529 if filters: 558 if filters:
530 if 'body' in filters: 559 if 'body' in filters:
531 # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html 560 # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html
532 query_parts.append(u"AND message LIKE ?") 561 query_parts.append(u"AND message LIKE ?")
540 values.extend(types) 569 values.extend(types)
541 if 'not_types' in filters: 570 if 'not_types' in filters:
542 types = filters['not_types'].split() 571 types = filters['not_types'].split()
543 query_parts.append(u"AND type NOT IN ({})".format(u','.join("?"*len(types)))) 572 query_parts.append(u"AND type NOT IN ({})".format(u','.join("?"*len(types))))
544 values.extend(types) 573 values.extend(types)
545 574 if 'last_stanza_id' in filters:
546 575 # this request get the last message with a "stanza_id" that we
547 query_parts.append(u"ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList 576 # have in history. This is mainly used to retrieve messages sent
548 # we use DESC here so LIMIT keep the last messages 577 # while we were offline, using MAM (XEP-0313).
578 if (filters[u'last_stanza_id'] is not True
579 or from_jid is not None or to_jid is not None
580 or limit != 1):
581 raise ValueError(u"Unexpected values for last_stanza_id filter")
582 query_parts.append(u"AND stanza_id IS NOT NULL ORDER BY history.rowid DESC")
583 order = True
584
585 if not order:
586 query_parts.append(u"ORDER BY timestamp DESC") # we reverse the order in sqliteHistoryToList
587 # we use DESC here so LIMIT keep the last messages
549 if limit is not None: 588 if limit is not None:
550 query_parts.append(u"LIMIT ?") 589 query_parts.append(u"LIMIT ?")
551 values.append(limit) 590 values.append(limit)
552 591
553 d = self.dbpool.runQuery(u" ".join(query_parts), values) 592 d = self.dbpool.runQuery(u" ".join(query_parts), values)