Mercurial > libervia-backend
comparison sat/memory/sqlite.py @ 3028:ab2696e34d29
Python 3 port:
/!\ this is a huge commit
/!\ starting from this commit, SàT needs Python 3.6+
/!\ SàT may be unstable and some features may not work anymore; this will improve with time
This patch ports the backend, bridge and frontends to Python 3.
Roughly this has been done this way:
- the 2to3 tool has been applied (with Python 3.7)
- all references to python2 have been replaced with python3 (notably shebangs)
- fixed files not handled by 2to3 (notably the shell script)
- several manual fixes
- fixed issues reported by Python 3 that were not handled in Python 2
- replaced "async" with "async_" when needed (it's a reserved word since Python 3.7)
- replaced zope's "implements" with @implementer decorator
- temporary hack to handle data pickled in database, as str or bytes may be returned,
to be checked later
- fixed hash comparison for password
- removed some code which is not needed anymore with Python 3
- deactivated some code which needs to be checked (notably certificate validation)
- tested with jp, fixed reported issues until some basic commands worked
- ported Primitivus (after porting dependencies like urwid satext)
- more manual fixes
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 13 Aug 2019 19:08:41 +0200 |
parents | 860c550028d6 |
children | f8cc88c773c8 |
comparison
equal
deleted
inserted
replaced
3027:ff5bcb12ae60 | 3028:ab2696e34d29 |
---|---|
1 #!/usr/bin/env python2 | 1 #!/usr/bin/env python3 |
2 # -*- coding: utf-8 -*- | 2 # -*- coding: utf-8 -*- |
3 | 3 |
4 # SAT: a jabber client | 4 # SAT: a jabber client |
5 # Copyright (C) 2009-2019 Jérôme Poisson (goffi@goffi.org) | 5 # Copyright (C) 2009-2019 Jérôme Poisson (goffi@goffi.org) |
6 | 6 |
29 from twisted.python import failure | 29 from twisted.python import failure |
30 from collections import OrderedDict | 30 from collections import OrderedDict |
31 import sys | 31 import sys |
32 import re | 32 import re |
33 import os.path | 33 import os.path |
34 import cPickle as pickle | 34 import pickle as pickle |
35 import hashlib | 35 import hashlib |
36 import sqlite3 | 36 import sqlite3 |
37 import json | 37 import json |
38 | 38 |
39 log = getLogger(__name__) | 39 log = getLogger(__name__) |
150 # FIXME: in case of error, we retry a couple of times | 150 # FIXME: in case of error, we retry a couple of times |
151 # this is a workaround, we need to move to better | 151 # this is a workaround, we need to move to better |
152 # Sqlite integration, probably with high level library | 152 # Sqlite integration, probably with high level library |
153 retry -= 1 | 153 retry -= 1 |
154 if retry == 0: | 154 if retry == 0: |
155 log.error(_(u'too many db tries, we abandon! Error message: {msg}\n' | 155 log.error(_('too many db tries, we abandon! Error message: {msg}\n' |
156 u'query was {query}' | 156 'query was {query}' |
157 .format(msg=e, query=u' '.join([unicode(a) for a in args])))) | 157 .format(msg=e, query=' '.join([str(a) for a in args])))) |
158 raise e | 158 raise e |
159 log.warning( | 159 log.warning( |
160 _(u'exception while running query, retrying ({try_}): {msg}').format( | 160 _('exception while running query, retrying ({try_}): {msg}').format( |
161 try_ = 6 - retry, | 161 try_ = 6 - retry, |
162 msg = e)) | 162 msg = e)) |
163 kw['query_retry'] = retry | 163 kw['query_retry'] = retry |
164 return self._runQuery(trans, *args, **kw) | 164 return self._runQuery(trans, *args, **kw) |
165 return trans.fetchall() | 165 return trans.fetchall() |
173 return adbapi.ConnectionPool._runInteraction(self, interaction, *args, **kw) | 173 return adbapi.ConnectionPool._runInteraction(self, interaction, *args, **kw) |
174 except Exception as e: | 174 except Exception as e: |
175 retry -= 1 | 175 retry -= 1 |
176 if retry == 0: | 176 if retry == 0: |
177 log.error( | 177 log.error( |
178 _(u'too many interaction tries, we abandon! Error message: {msg}\n' | 178 _('too many interaction tries, we abandon! Error message: {msg}\n' |
179 u'interaction method was: {interaction}\n' | 179 'interaction method was: {interaction}\n' |
180 u'interaction arguments were: {args}' | 180 'interaction arguments were: {args}' |
181 .format(msg=e, interaction=interaction, | 181 .format(msg=e, interaction=interaction, |
182 args=u', '.join([unicode(a) for a in args])))) | 182 args=', '.join([str(a) for a in args])))) |
183 raise e | 183 raise e |
184 log.warning( | 184 log.warning( |
185 _(u'exception while running interaction, retrying ({try_}): {msg}') | 185 _('exception while running interaction, retrying ({try_}): {msg}') |
186 .format(try_ = 4 - retry, msg = e)) | 186 .format(try_ = 4 - retry, msg = e)) |
187 kw['interaction_retry'] = retry | 187 kw['interaction_retry'] = retry |
188 return self._runInteraction(interaction, *args, **kw) | 188 return self._runInteraction(interaction, *args, **kw) |
189 | 189 |
190 | 190 |
202 log.info(_("Connecting database")) | 202 log.info(_("Connecting database")) |
203 new_base = not os.path.exists(db_filename) # do we have to create the database ? | 203 new_base = not os.path.exists(db_filename) # do we have to create the database ? |
204 if new_base: # the dir may not exist if it's not the XDG recommended one | 204 if new_base: # the dir may not exist if it's not the XDG recommended one |
205 dir_ = os.path.dirname(db_filename) | 205 dir_ = os.path.dirname(db_filename) |
206 if not os.path.exists(dir_): | 206 if not os.path.exists(dir_): |
207 os.makedirs(dir_, 0700) | 207 os.makedirs(dir_, 0o700) |
208 | 208 |
209 def foreignKeysOn(sqlite): | 209 def foreignKeysOn(sqlite): |
210 sqlite.execute('PRAGMA foreign_keys = ON') | 210 sqlite.execute('PRAGMA foreign_keys = ON') |
211 | 211 |
212 self.dbpool = ConnectionPool("sqlite3", db_filename, cp_openfun=foreignKeysOn, check_same_thread=False, timeout=15) | 212 self.dbpool = ConnectionPool("sqlite3", db_filename, cp_openfun=foreignKeysOn, check_same_thread=False, timeout=15) |
238 | 238 |
239 def commitStatements(self, statements): | 239 def commitStatements(self, statements): |
240 | 240 |
241 if statements is None: | 241 if statements is None: |
242 return defer.succeed(None) | 242 return defer.succeed(None) |
243 log.debug(u"\n===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements)) | 243 log.debug("\n===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements)) |
244 d = self.dbpool.runInteraction(self._updateDb, tuple(statements)) | 244 d = self.dbpool.runInteraction(self._updateDb, tuple(statements)) |
245 return d | 245 return d |
246 | 246 |
247 def _updateDb(self, interaction, statements): | 247 def _updateDb(self, interaction, statements): |
248 for statement in statements: | 248 for statement in statements: |
268 name, id_ = profile | 268 name, id_ = profile |
269 self.profiles[name] = id_ | 269 self.profiles[name] = id_ |
270 | 270 |
271 def getProfilesList(self): | 271 def getProfilesList(self): |
272 """"Return list of all registered profiles""" | 272 """"Return list of all registered profiles""" |
273 return self.profiles.keys() | 273 return list(self.profiles.keys()) |
274 | 274 |
275 def hasProfile(self, profile_name): | 275 def hasProfile(self, profile_name): |
276 """return True if profile_name exists | 276 """return True if profile_name exists |
277 | 277 |
278 @param profile_name: name of the profile to check | 278 @param profile_name: name of the profile to check |
281 | 281 |
282 def profileIsComponent(self, profile_name): | 282 def profileIsComponent(self, profile_name): |
283 try: | 283 try: |
284 return self.profiles[profile_name] in self.components | 284 return self.profiles[profile_name] in self.components |
285 except KeyError: | 285 except KeyError: |
286 raise exceptions.NotFound(u"the requested profile doesn't exists") | 286 raise exceptions.NotFound("the requested profile doesn't exists") |
287 | 287 |
288 def getEntryPoint(self, profile_name): | 288 def getEntryPoint(self, profile_name): |
289 try: | 289 try: |
290 return self.components[self.profiles[profile_name]] | 290 return self.components[self.profiles[profile_name]] |
291 except KeyError: | 291 except KeyError: |
292 raise exceptions.NotFound(u"the requested profile doesn't exists or is not a component") | 292 raise exceptions.NotFound("the requested profile doesn't exists or is not a component") |
293 | 293 |
294 def createProfile(self, name, component=None): | 294 def createProfile(self, name, component=None): |
295 """Create a new profile | 295 """Create a new profile |
296 | 296 |
297 @param name(unicode): name of the profile | 297 @param name(unicode): name of the profile |
324 | 324 |
325 @param name: name of the profile | 325 @param name: name of the profile |
326 @return: deferred triggered once profile is actually deleted | 326 @return: deferred triggered once profile is actually deleted |
327 """ | 327 """ |
328 def deletionError(failure_): | 328 def deletionError(failure_): |
329 log.error(_(u"Can't delete profile [%s]") % name) | 329 log.error(_("Can't delete profile [%s]") % name) |
330 return failure_ | 330 return failure_ |
331 | 331 |
332 def delete(txn): | 332 def delete(txn): |
333 profile_id = self.profiles.pop(name) | 333 profile_id = self.profiles.pop(name) |
334 txn.execute("DELETE FROM profiles WHERE name = ?", (name,)) | 334 txn.execute("DELETE FROM profiles WHERE name = ?", (name,)) |
357 | 357 |
358 def fillParams(result): | 358 def fillParams(result): |
359 for param in result: | 359 for param in result: |
360 category, name, value = param | 360 category, name, value = param |
361 params_gen[(category, name)] = value | 361 params_gen[(category, name)] = value |
362 log.debug(_(u"loading general parameters from database")) | 362 log.debug(_("loading general parameters from database")) |
363 return self.dbpool.runQuery("SELECT category,name,value FROM param_gen").addCallback(fillParams) | 363 return self.dbpool.runQuery("SELECT category,name,value FROM param_gen").addCallback(fillParams) |
364 | 364 |
365 def loadIndParams(self, params_ind, profile): | 365 def loadIndParams(self, params_ind, profile): |
366 """Load individual parameters | 366 """Load individual parameters |
367 | 367 |
372 | 372 |
373 def fillParams(result): | 373 def fillParams(result): |
374 for param in result: | 374 for param in result: |
375 category, name, value = param | 375 category, name, value = param |
376 params_ind[(category, name)] = value | 376 params_ind[(category, name)] = value |
377 log.debug(_(u"loading individual parameters from database")) | 377 log.debug(_("loading individual parameters from database")) |
378 d = self.dbpool.runQuery("SELECT category,name,value FROM param_ind WHERE profile_id=?", (self.profiles[profile], )) | 378 d = self.dbpool.runQuery("SELECT category,name,value FROM param_ind WHERE profile_id=?", (self.profiles[profile], )) |
379 d.addCallback(fillParams) | 379 d.addCallback(fillParams) |
380 return d | 380 return d |
381 | 381 |
382 def getIndParam(self, category, name, profile): | 382 def getIndParam(self, category, name, profile): |
397 @param category: category of the parameter | 397 @param category: category of the parameter |
398 @param name: name of the parameter | 398 @param name: name of the parameter |
399 @param value: value to set | 399 @param value: value to set |
400 @return: deferred""" | 400 @return: deferred""" |
401 d = self.dbpool.runQuery("REPLACE INTO param_gen(category,name,value) VALUES (?,?,?)", (category, name, value)) | 401 d = self.dbpool.runQuery("REPLACE INTO param_gen(category,name,value) VALUES (?,?,?)", (category, name, value)) |
402 d.addErrback(lambda ignore: log.error(_(u"Can't set general parameter (%(category)s/%(name)s) in database" % {"category": category, "name": name}))) | 402 d.addErrback(lambda ignore: log.error(_("Can't set general parameter (%(category)s/%(name)s) in database" % {"category": category, "name": name}))) |
403 return d | 403 return d |
404 | 404 |
405 def setIndParam(self, category, name, value, profile): | 405 def setIndParam(self, category, name, value, profile): |
406 """Save the individual parameters in database | 406 """Save the individual parameters in database |
407 | 407 |
410 @param value: value to set | 410 @param value: value to set |
411 @param profile: a profile which *must* exist | 411 @param profile: a profile which *must* exist |
412 @return: deferred | 412 @return: deferred |
413 """ | 413 """ |
414 d = self.dbpool.runQuery("REPLACE INTO param_ind(category,name,profile_id,value) VALUES (?,?,?,?)", (category, name, self.profiles[profile], value)) | 414 d = self.dbpool.runQuery("REPLACE INTO param_ind(category,name,profile_id,value) VALUES (?,?,?,?)", (category, name, self.profiles[profile], value)) |
415 d.addErrback(lambda ignore: log.error(_(u"Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category": category, "name": name, "profile": profile}))) | 415 d.addErrback(lambda ignore: log.error(_("Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category": category, "name": name, "profile": profile}))) |
416 return d | 416 return d |
417 | 417 |
418 ## History | 418 ## History |
419 | 419 |
420 def _addToHistoryCb(self, __, data): | 420 def _addToHistoryCb(self, __, data): |
421 # Message metadata were successfuly added to history | 421 # Message metadata were successfuly added to history |
422 # now we can add message and subject | 422 # now we can add message and subject |
423 uid = data['uid'] | 423 uid = data['uid'] |
424 d_list = [] | 424 d_list = [] |
425 for key in ('message', 'subject'): | 425 for key in ('message', 'subject'): |
426 for lang, value in data[key].iteritems(): | 426 for lang, value in data[key].items(): |
427 d = self.dbpool.runQuery( | 427 d = self.dbpool.runQuery( |
428 "INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)" | 428 "INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)" |
429 .format(key=key), | 429 .format(key=key), |
430 (uid, value, lang or None)) | 430 (uid, value, lang or None)) |
431 d.addErrback(lambda __: log.error( | 431 d.addErrback(lambda __: log.error( |
432 _(u"Can't save following {key} in history (uid: {uid}, lang:{lang}):" | 432 _("Can't save following {key} in history (uid: {uid}, lang:{lang}):" |
433 u" {value}").format( | 433 " {value}").format( |
434 key=key, uid=uid, lang=lang, value=value))) | 434 key=key, uid=uid, lang=lang, value=value))) |
435 d_list.append(d) | 435 d_list.append(d) |
436 try: | 436 try: |
437 thread = data['extra']['thread'] | 437 thread = data['extra']['thread'] |
438 except KeyError: | 438 except KeyError: |
441 thread_parent = data['extra'].get('thread_parent') | 441 thread_parent = data['extra'].get('thread_parent') |
442 d = self.dbpool.runQuery( | 442 d = self.dbpool.runQuery( |
443 "INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)", | 443 "INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)", |
444 (uid, thread, thread_parent)) | 444 (uid, thread, thread_parent)) |
445 d.addErrback(lambda __: log.error( | 445 d.addErrback(lambda __: log.error( |
446 _(u"Can't save following thread in history (uid: {uid}): thread: " | 446 _("Can't save following thread in history (uid: {uid}): thread: " |
447 u"{thread}), parent:{parent}").format( | 447 "{thread}), parent:{parent}").format( |
448 uid=uid, thread=thread, parent=thread_parent))) | 448 uid=uid, thread=thread, parent=thread_parent))) |
449 d_list.append(d) | 449 d_list.append(d) |
450 return defer.DeferredList(d_list) | 450 return defer.DeferredList(d_list) |
451 | 451 |
452 def _addToHistoryEb(self, failure_, data): | 452 def _addToHistoryEb(self, failure_, data): |
453 failure_.trap(sqlite3.IntegrityError) | 453 failure_.trap(sqlite3.IntegrityError) |
454 sqlite_msg = failure_.value.args[0] | 454 sqlite_msg = failure_.value.args[0] |
455 if "UNIQUE constraint failed" in sqlite_msg: | 455 if "UNIQUE constraint failed" in sqlite_msg: |
456 log.debug(u"message {} is already in history, not storing it again" | 456 log.debug("message {} is already in history, not storing it again" |
457 .format(data['uid'])) | 457 .format(data['uid'])) |
458 if 'received_timestamp' not in data: | 458 if 'received_timestamp' not in data: |
459 log.warning( | 459 log.warning( |
460 u"duplicate message is not delayed, this is maybe a bug: data={}" | 460 "duplicate message is not delayed, this is maybe a bug: data={}" |
461 .format(data)) | 461 .format(data)) |
462 # we cancel message to avoid sending duplicate message to frontends | 462 # we cancel message to avoid sending duplicate message to frontends |
463 raise failure.Failure(exceptions.CancelError("Cancelled duplicated message")) | 463 raise failure.Failure(exceptions.CancelError("Cancelled duplicated message")) |
464 else: | 464 else: |
465 log.error(u"Can't store message in history: {}".format(failure_)) | 465 log.error("Can't store message in history: {}".format(failure_)) |
466 | 466 |
467 def _logHistoryError(self, failure_, from_jid, to_jid, data): | 467 def _logHistoryError(self, failure_, from_jid, to_jid, data): |
468 if failure_.check(exceptions.CancelError): | 468 if failure_.check(exceptions.CancelError): |
469 # we propagate CancelError to avoid sending message to frontends | 469 # we propagate CancelError to avoid sending message to frontends |
470 raise failure_ | 470 raise failure_ |
471 log.error(_( | 471 log.error(_( |
472 u"Can't save following message in history: from [{from_jid}] to [{to_jid}] " | 472 "Can't save following message in history: from [{from_jid}] to [{to_jid}] " |
473 u"(uid: {uid})") | 473 "(uid: {uid})") |
474 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])) | 474 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])) |
475 | 475 |
476 def addToHistory(self, data, profile): | 476 def addToHistory(self, data, profile): |
477 """Store a new message in history | 477 """Store a new message in history |
478 | 478 |
479 @param data(dict): message data as build by SatMessageProtocol.onMessage | 479 @param data(dict): message data as build by SatMessageProtocol.onMessage |
480 """ | 480 """ |
481 extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() | 481 extra = pickle.dumps({k: v for k, v in data['extra'].items() |
482 if k not in NOT_IN_EXTRA}, 0) | 482 if k not in NOT_IN_EXTRA}, 0) |
483 from_jid = data['from'] | 483 from_jid = data['from'] |
484 to_jid = data['to'] | 484 to_jid = data['to'] |
485 d = self.dbpool.runQuery( | 485 d = self.dbpool.runQuery( |
486 u"INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, " | 486 "INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, " |
487 u"source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES " | 487 "source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES " |
488 u"(?,?,?,?,?,?,?,?,?,?,?,?)", | 488 "(?,?,?,?,?,?,?,?,?,?,?,?)", |
489 (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), | 489 (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), |
490 self.profiles[profile], data['from'].userhost(), to_jid.userhost(), | 490 self.profiles[profile], data['from'].userhost(), to_jid.userhost(), |
491 from_jid.resource, to_jid.resource, data['timestamp'], | 491 from_jid.resource, to_jid.resource, data['timestamp'], |
492 data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) | 492 data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) |
493 d.addCallbacks(self._addToHistoryCb, | 493 d.addCallbacks(self._addToHistoryCb, |
506 received_timestamp, type_, extra, message, message_lang, subject, | 506 received_timestamp, type_, extra, message, message_lang, subject, |
507 subject_lang, thread, thread_parent) = row | 507 subject_lang, thread, thread_parent) = row |
508 if uid != current['uid']: | 508 if uid != current['uid']: |
509 # new message | 509 # new message |
510 try: | 510 try: |
511 extra = pickle.loads(str(extra or "")) | 511 extra = pickle.loads(extra or b"") |
512 except EOFError: | 512 except EOFError: |
513 extra = {} | 513 extra = {} |
514 current = { | 514 current = { |
515 'from': "%s/%s" % (source, source_res) if source_res else source, | 515 'from': "%s/%s" % (source, source_res) if source_res else source, |
516 'to': "%s/%s" % (dest, dest_res) if dest_res else dest, | 516 'to': "%s/%s" % (dest, dest_res) if dest_res else dest, |
541 if thread_parent is not None: | 541 if thread_parent is not None: |
542 current_extra['thread_parent'] = thread_parent | 542 current_extra['thread_parent'] = thread_parent |
543 else: | 543 else: |
544 if thread_parent is not None: | 544 if thread_parent is not None: |
545 log.error( | 545 log.error( |
546 u"Database inconsistency: thread parent without thread (uid: " | 546 "Database inconsistency: thread parent without thread (uid: " |
547 u"{uid}, thread_parent: {parent})" | 547 "{uid}, thread_parent: {parent})" |
548 .format(uid=uid, parent=thread_parent)) | 548 .format(uid=uid, parent=thread_parent)) |
549 | 549 |
550 return result | 550 return result |
551 | 551 |
552 def listDict2listTuple(self, messages_data): | 552 def listDict2listTuple(self, messages_data): |
573 if filters is None: | 573 if filters is None: |
574 filters = {} | 574 filters = {} |
575 if limit == 0: | 575 if limit == 0: |
576 return defer.succeed([]) | 576 return defer.succeed([]) |
577 | 577 |
578 query_parts = [u"SELECT uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ | 578 query_parts = ["SELECT uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ |
579 type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\ | 579 type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\ |
580 FROM history LEFT JOIN message ON history.uid = message.history_uid\ | 580 FROM history LEFT JOIN message ON history.uid = message.history_uid\ |
581 LEFT JOIN subject ON history.uid=subject.history_uid\ | 581 LEFT JOIN subject ON history.uid=subject.history_uid\ |
582 LEFT JOIN thread ON history.uid=thread.history_uid\ | 582 LEFT JOIN thread ON history.uid=thread.history_uid\ |
583 WHERE profile_id=?"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here | 583 WHERE profile_id=?"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here |
585 | 585 |
586 def test_jid(type_, jid_): | 586 def test_jid(type_, jid_): |
587 values.append(jid_.userhost()) | 587 values.append(jid_.userhost()) |
588 if jid_.resource: | 588 if jid_.resource: |
589 values.append(jid_.resource) | 589 values.append(jid_.resource) |
590 return u'({type_}=? AND {type_}_res=?)'.format(type_=type_) | 590 return '({type_}=? AND {type_}_res=?)'.format(type_=type_) |
591 return u'{type_}=?'.format(type_=type_) | 591 return '{type_}=?'.format(type_=type_) |
592 | 592 |
593 if not from_jid and not to_jid: | 593 if not from_jid and not to_jid: |
594 # not jid specified, we want all one2one communications | 594 # not jid specified, we want all one2one communications |
595 pass | 595 pass |
596 elif between: | 596 elif between: |
597 if not from_jid or not to_jid: | 597 if not from_jid or not to_jid: |
598 # we only have one jid specified, we check all messages | 598 # we only have one jid specified, we check all messages |
599 # from or to this jid | 599 # from or to this jid |
600 jid_ = from_jid or to_jid | 600 jid_ = from_jid or to_jid |
601 query_parts.append(u"AND ({source} OR {dest})".format( | 601 query_parts.append("AND ({source} OR {dest})".format( |
602 source=test_jid(u'source', jid_), | 602 source=test_jid('source', jid_), |
603 dest=test_jid(u'dest' , jid_))) | 603 dest=test_jid('dest' , jid_))) |
604 else: | 604 else: |
605 # we have 2 jids specified, we check all communications between | 605 # we have 2 jids specified, we check all communications between |
606 # those 2 jids | 606 # those 2 jids |
607 query_parts.append( | 607 query_parts.append( |
608 u"AND (({source_from} AND {dest_to}) " | 608 "AND (({source_from} AND {dest_to}) " |
609 u"OR ({source_to} AND {dest_from}))".format( | 609 "OR ({source_to} AND {dest_from}))".format( |
610 source_from=test_jid('source', from_jid), | 610 source_from=test_jid('source', from_jid), |
611 dest_to=test_jid('dest', to_jid), | 611 dest_to=test_jid('dest', to_jid), |
612 source_to=test_jid('source', to_jid), | 612 source_to=test_jid('source', to_jid), |
613 dest_from=test_jid('dest', from_jid))) | 613 dest_from=test_jid('dest', from_jid))) |
614 else: | 614 else: |
617 q = [] | 617 q = [] |
618 if from_jid is not None: | 618 if from_jid is not None: |
619 q.append(test_jid('source', from_jid)) | 619 q.append(test_jid('source', from_jid)) |
620 if to_jid is not None: | 620 if to_jid is not None: |
621 q.append(test_jid('dest', to_jid)) | 621 q.append(test_jid('dest', to_jid)) |
622 query_parts.append(u"AND " + u" AND ".join(q)) | 622 query_parts.append("AND " + " AND ".join(q)) |
623 | 623 |
624 if filters: | 624 if filters: |
625 if u'timestamp_start' in filters: | 625 if 'timestamp_start' in filters: |
626 query_parts.append(u"AND timestamp>= ?") | 626 query_parts.append("AND timestamp>= ?") |
627 values.append(float(filters[u'timestamp_start'])) | 627 values.append(float(filters['timestamp_start'])) |
628 if u'body' in filters: | 628 if 'body' in filters: |
629 # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html | 629 # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html |
630 query_parts.append(u"AND message LIKE ?") | 630 query_parts.append("AND message LIKE ?") |
631 values.append(u"%{}%".format(filters['body'])) | 631 values.append("%{}%".format(filters['body'])) |
632 if u'search' in filters: | 632 if 'search' in filters: |
633 query_parts.append(u"AND (message LIKE ? OR source_res LIKE ?)") | 633 query_parts.append("AND (message LIKE ? OR source_res LIKE ?)") |
634 values.extend([u"%{}%".format(filters['search'])] * 2) | 634 values.extend(["%{}%".format(filters['search'])] * 2) |
635 if u'types' in filters: | 635 if 'types' in filters: |
636 types = filters['types'].split() | 636 types = filters['types'].split() |
637 query_parts.append(u"AND type IN ({})".format(u','.join("?"*len(types)))) | 637 query_parts.append("AND type IN ({})".format(','.join("?"*len(types)))) |
638 values.extend(types) | 638 values.extend(types) |
639 if u'not_types' in filters: | 639 if 'not_types' in filters: |
640 types = filters['not_types'].split() | 640 types = filters['not_types'].split() |
641 query_parts.append(u"AND type NOT IN ({})".format(u','.join("?"*len(types)))) | 641 query_parts.append("AND type NOT IN ({})".format(','.join("?"*len(types)))) |
642 values.extend(types) | 642 values.extend(types) |
643 if u'last_stanza_id' in filters: | 643 if 'last_stanza_id' in filters: |
644 # this request get the last message with a "stanza_id" that we | 644 # this request get the last message with a "stanza_id" that we |
645 # have in history. This is mainly used to retrieve messages sent | 645 # have in history. This is mainly used to retrieve messages sent |
646 # while we were offline, using MAM (XEP-0313). | 646 # while we were offline, using MAM (XEP-0313). |
647 if (filters[u'last_stanza_id'] is not True | 647 if (filters['last_stanza_id'] is not True |
648 or limit != 1): | 648 or limit != 1): |
649 raise ValueError(u"Unexpected values for last_stanza_id filter") | 649 raise ValueError("Unexpected values for last_stanza_id filter") |
650 query_parts.append(u"AND stanza_id IS NOT NULL") | 650 query_parts.append("AND stanza_id IS NOT NULL") |
651 | 651 |
652 | 652 |
653 # timestamp may be identical for 2 close messages (specially when delay is | 653 # timestamp may be identical for 2 close messages (specially when delay is |
654 # used) that's why we order ties by received_timestamp | 654 # used) that's why we order ties by received_timestamp |
655 # We'll reverse the order in sqliteHistoryToList | 655 # We'll reverse the order in sqliteHistoryToList |
656 # we use DESC here so LIMIT keep the last messages | 656 # we use DESC here so LIMIT keep the last messages |
657 query_parts.append(u"ORDER BY timestamp DESC, history.received_timestamp DESC") | 657 query_parts.append("ORDER BY timestamp DESC, history.received_timestamp DESC") |
658 if limit is not None: | 658 if limit is not None: |
659 query_parts.append(u"LIMIT ?") | 659 query_parts.append("LIMIT ?") |
660 values.append(limit) | 660 values.append(limit) |
661 | 661 |
662 d = self.dbpool.runQuery(u" ".join(query_parts), values) | 662 d = self.dbpool.runQuery(" ".join(query_parts), values) |
663 d.addCallback(self.sqliteHistoryToList) | 663 d.addCallback(self.sqliteHistoryToList) |
664 d.addCallback(self.listDict2listTuple) | 664 d.addCallback(self.listDict2listTuple) |
665 return d | 665 return d |
666 | 666 |
667 ## Private values | 667 ## Private values |
668 | 668 |
669 def _privateDataEb(self, failure_, operation, namespace, key=None, profile=None): | 669 def _privateDataEb(self, failure_, operation, namespace, key=None, profile=None): |
670 """generic errback for data queries""" | 670 """generic errback for data queries""" |
671 log.error(_(u"Can't {operation} data in database for namespace {namespace}{and_key}{for_profile}: {msg}").format( | 671 log.error(_("Can't {operation} data in database for namespace {namespace}{and_key}{for_profile}: {msg}").format( |
672 operation = operation, | 672 operation = operation, |
673 namespace = namespace, | 673 namespace = namespace, |
674 and_key = (u" and key " + key) if key is not None else u"", | 674 and_key = (" and key " + key) if key is not None else "", |
675 for_profile = (u' [' + profile + u']') if profile is not None else u'', | 675 for_profile = (' [' + profile + ']') if profile is not None else '', |
676 msg = failure_)) | 676 msg = failure_)) |
677 | |
678 def _load_pickle(self, v): | |
679 # FIXME: workaround for Python 3 port, some pickled data are byte while other are strings | |
680 try: | |
681 return pickle.loads(v) | |
682 except TypeError: | |
683 data = pickle.loads(v.encode('utf-8')) | |
684 log.warning(f"encoding issue in pickled data: {data}") | |
685 return data | |
677 | 686 |
678 def _generateDataDict(self, query_result, binary): | 687 def _generateDataDict(self, query_result, binary): |
679 if binary: | 688 if binary: |
680 return {k: pickle.loads(str(v)) for k,v in query_result} | 689 return {k: self._load_pickle(v) for k,v in query_result} |
681 else: | 690 else: |
682 return dict(query_result) | 691 return dict(query_result) |
683 | 692 |
684 def _getPrivateTable(self, binary, profile): | 693 def _getPrivateTable(self, binary, profile): |
685 """Get table to use for private values""" | 694 """Get table to use for private values""" |
686 table = [u'private'] | 695 table = ['private'] |
687 | 696 |
688 if profile is None: | 697 if profile is None: |
689 table.append(u'gen') | 698 table.append('gen') |
690 else: | 699 else: |
691 table.append(u'ind') | 700 table.append('ind') |
692 | 701 |
693 if binary: | 702 if binary: |
694 table.append(u'bin') | 703 table.append('bin') |
695 | 704 |
696 return u'_'.join(table) | 705 return '_'.join(table) |
697 | 706 |
698 def getPrivates(self, namespace, keys=None, binary=False, profile=None): | 707 def getPrivates(self, namespace, keys=None, binary=False, profile=None): |
699 """Get private value(s) from databases | 708 """Get private value(s) from databases |
700 | 709 |
701 @param namespace(unicode): namespace of the values | 710 @param namespace(unicode): namespace of the values |
704 @param binary(bool): True to deserialise binary values | 713 @param binary(bool): True to deserialise binary values |
705 @param profile(unicode, None): profile to use for individual values | 714 @param profile(unicode, None): profile to use for individual values |
706 None to use general values | 715 None to use general values |
707 @return (dict[unicode, object]): gotten keys/values | 716 @return (dict[unicode, object]): gotten keys/values |
708 """ | 717 """ |
709 log.debug(_(u"getting {type}{binary} private values from database for namespace {namespace}{keys}".format( | 718 log.debug(_("getting {type}{binary} private values from database for namespace {namespace}{keys}".format( |
710 type = u"general" if profile is None else "individual", | 719 type = "general" if profile is None else "individual", |
711 binary = u" binary" if binary else u"", | 720 binary = " binary" if binary else "", |
712 namespace = namespace, | 721 namespace = namespace, |
713 keys = u" with keys {}".format(u", ".join(keys)) if keys is not None else u""))) | 722 keys = " with keys {}".format(", ".join(keys)) if keys is not None else ""))) |
714 table = self._getPrivateTable(binary, profile) | 723 table = self._getPrivateTable(binary, profile) |
715 query_parts = [u"SELECT key,value FROM", table, "WHERE namespace=?"] | 724 query_parts = ["SELECT key,value FROM", table, "WHERE namespace=?"] |
716 args = [namespace] | 725 args = [namespace] |
717 | 726 |
718 if keys is not None: | 727 if keys is not None: |
719 placeholders = u','.join(len(keys) * u'?') | 728 placeholders = ','.join(len(keys) * '?') |
720 query_parts.append(u'AND key IN (' + placeholders + u')') | 729 query_parts.append('AND key IN (' + placeholders + ')') |
721 args.extend(keys) | 730 args.extend(keys) |
722 | 731 |
723 if profile is not None: | 732 if profile is not None: |
724 query_parts.append(u'AND profile_id=?') | 733 query_parts.append('AND profile_id=?') |
725 args.append(self.profiles[profile]) | 734 args.append(self.profiles[profile]) |
726 | 735 |
727 d = self.dbpool.runQuery(u" ".join(query_parts), args) | 736 d = self.dbpool.runQuery(" ".join(query_parts), args) |
728 d.addCallback(self._generateDataDict, binary) | 737 d.addCallback(self._generateDataDict, binary) |
729 d.addErrback(self._privateDataEb, u"get", namespace, profile=profile) | 738 d.addErrback(self._privateDataEb, "get", namespace, profile=profile) |
730 return d | 739 return d |
731 | 740 |
732 def setPrivateValue(self, namespace, key, value, binary=False, profile=None): | 741 def setPrivateValue(self, namespace, key, value, binary=False, profile=None): |
733 """Set a private value in database | 742 """Set a private value in database |
734 | 743 |
739 binary values need to be serialised, used for everything but strings | 748 binary values need to be serialised, used for everything but strings |
740 @param profile(unicode, None): profile to use for individual value | 749 @param profile(unicode, None): profile to use for individual value |
741 if None, it's a general value | 750 if None, it's a general value |
742 """ | 751 """ |
743 table = self._getPrivateTable(binary, profile) | 752 table = self._getPrivateTable(binary, profile) |
744 query_values_names = [u'namespace', u'key', u'value'] | 753 query_values_names = ['namespace', 'key', 'value'] |
745 query_values = [namespace, key] | 754 query_values = [namespace, key] |
746 | 755 |
747 if binary: | 756 if binary: |
748 value = sqlite3.Binary(pickle.dumps(value, 0)) | 757 value = sqlite3.Binary(pickle.dumps(value, 0)) |
749 | 758 |
750 query_values.append(value) | 759 query_values.append(value) |
751 | 760 |
752 if profile is not None: | 761 if profile is not None: |
753 query_values_names.append(u'profile_id') | 762 query_values_names.append('profile_id') |
754 query_values.append(self.profiles[profile]) | 763 query_values.append(self.profiles[profile]) |
755 | 764 |
756 query_parts = [u"REPLACE INTO", table, u'(', u','.join(query_values_names), u')', | 765 query_parts = ["REPLACE INTO", table, '(', ','.join(query_values_names), ')', |
757 u"VALUES (", u",".join(u'?'*len(query_values_names)), u')'] | 766 "VALUES (", ",".join('?'*len(query_values_names)), ')'] |
758 | 767 |
759 d = self.dbpool.runQuery(u" ".join(query_parts), query_values) | 768 d = self.dbpool.runQuery(" ".join(query_parts), query_values) |
760 d.addErrback(self._privateDataEb, u"set", namespace, key, profile=profile) | 769 d.addErrback(self._privateDataEb, "set", namespace, key, profile=profile) |
761 return d | 770 return d |
762 | 771 |
763 def delPrivateValue(self, namespace, key, binary=False, profile=None): | 772 def delPrivateValue(self, namespace, key, binary=False, profile=None): |
764 """Delete private value from database | 773 """Delete private value from database |
765 | 774 |
768 @param binary(bool): True if it's a binary values | 777 @param binary(bool): True if it's a binary values |
769 @param profile(unicode, None): profile to use for individual value | 778 @param profile(unicode, None): profile to use for individual value |
770 if None, it's a general value | 779 if None, it's a general value |
771 """ | 780 """ |
772 table = self._getPrivateTable(binary, profile) | 781 table = self._getPrivateTable(binary, profile) |
773 query_parts = [u"DELETE FROM", table, u"WHERE namespace=? AND key=?"] | 782 query_parts = ["DELETE FROM", table, "WHERE namespace=? AND key=?"] |
774 args = [namespace, key] | 783 args = [namespace, key] |
775 if profile is not None: | 784 if profile is not None: |
776 query_parts.append(u"AND profile_id=?") | 785 query_parts.append("AND profile_id=?") |
777 args.append(self.profiles[profile]) | 786 args.append(self.profiles[profile]) |
778 d = self.dbpool.runQuery(u" ".join(query_parts), args) | 787 d = self.dbpool.runQuery(" ".join(query_parts), args) |
779 d.addErrback(self._privateDataEb, u"delete", namespace, key, profile=profile) | 788 d.addErrback(self._privateDataEb, "delete", namespace, key, profile=profile) |
780 return d | 789 return d |
781 | 790 |
782 def delPrivateNamespace(self, namespace, binary=False, profile=None): | 791 def delPrivateNamespace(self, namespace, binary=False, profile=None): |
783 """Delete all data from a private namespace | 792 """Delete all data from a private namespace |
784 | 793 |
785 Be really cautious when you use this method, as all data with given namespace are | 794 Be really cautious when you use this method, as all data with given namespace are |
786 removed. | 795 removed. |
787 Params are the same as for delPrivateValue | 796 Params are the same as for delPrivateValue |
788 """ | 797 """ |
789 table = self._getPrivateTable(binary, profile) | 798 table = self._getPrivateTable(binary, profile) |
790 query_parts = [u"DELETE FROM", table, u"WHERE namespace=?"] | 799 query_parts = ["DELETE FROM", table, "WHERE namespace=?"] |
791 args = [namespace] | 800 args = [namespace] |
792 if profile is not None: | 801 if profile is not None: |
793 query_parts.append(u"AND profile_id=?") | 802 query_parts.append("AND profile_id=?") |
794 args.append(self.profiles[profile]) | 803 args.append(self.profiles[profile]) |
795 d = self.dbpool.runQuery(u" ".join(query_parts), args) | 804 d = self.dbpool.runQuery(" ".join(query_parts), args) |
796 d.addErrback(self._privateDataEb, u"delete namespace", namespace, profile=profile) | 805 d.addErrback(self._privateDataEb, "delete namespace", namespace, profile=profile) |
797 return d | 806 return d |
798 | 807 |
799 ## Files | 808 ## Files |
800 | 809 |
801 @defer.inlineCallbacks | 810 @defer.inlineCallbacks |
802 def getFiles(self, client, file_id=None, version=u'', parent=None, type_=None, | 811 def getFiles(self, client, file_id=None, version='', parent=None, type_=None, |
803 file_hash=None, hash_algo=None, name=None, namespace=None, mime_type=None, | 812 file_hash=None, hash_algo=None, name=None, namespace=None, mime_type=None, |
804 owner=None, access=None, projection=None, unique=False): | 813 owner=None, access=None, projection=None, unique=False): |
805 """retrieve files with with given filters | 814 """retrieve files with with given filters |
806 | 815 |
807 @param file_id(unicode, None): id of the file | 816 @param file_id(unicode, None): id of the file |
829 query_parts.append("FROM files WHERE") | 838 query_parts.append("FROM files WHERE") |
830 filters = ['profile_id=?'] | 839 filters = ['profile_id=?'] |
831 args = [self.profiles[client.profile]] | 840 args = [self.profiles[client.profile]] |
832 | 841 |
833 if file_id is not None: | 842 if file_id is not None: |
834 filters.append(u'id=?') | 843 filters.append('id=?') |
835 args.append(file_id) | 844 args.append(file_id) |
836 if version is not None: | 845 if version is not None: |
837 filters.append(u'version=?') | 846 filters.append('version=?') |
838 args.append(version) | 847 args.append(version) |
839 if parent is not None: | 848 if parent is not None: |
840 filters.append(u'parent=?') | 849 filters.append('parent=?') |
841 args.append(parent) | 850 args.append(parent) |
842 if type_ is not None: | 851 if type_ is not None: |
843 filters.append(u'type=?') | 852 filters.append('type=?') |
844 args.append(type_) | 853 args.append(type_) |
845 if file_hash is not None: | 854 if file_hash is not None: |
846 filters.append(u'file_hash=?') | 855 filters.append('file_hash=?') |
847 args.append(file_hash) | 856 args.append(file_hash) |
848 if hash_algo is not None: | 857 if hash_algo is not None: |
849 filters.append(u'hash_algo=?') | 858 filters.append('hash_algo=?') |
850 args.append(hash_algo) | 859 args.append(hash_algo) |
851 if name is not None: | 860 if name is not None: |
852 filters.append(u'name=?') | 861 filters.append('name=?') |
853 args.append(name) | 862 args.append(name) |
854 if namespace is not None: | 863 if namespace is not None: |
855 filters.append(u'namespace=?') | 864 filters.append('namespace=?') |
856 args.append(namespace) | 865 args.append(namespace) |
857 if mime_type is not None: | 866 if mime_type is not None: |
858 filters.append(u'mime_type=?') | 867 filters.append('mime_type=?') |
859 args.append(mime_type) | 868 args.append(mime_type) |
860 if owner is not None: | 869 if owner is not None: |
861 filters.append(u'owner=?') | 870 filters.append('owner=?') |
862 args.append(owner.full()) | 871 args.append(owner.full()) |
863 if access is not None: | 872 if access is not None: |
864 raise NotImplementedError('Access check is not implemented yet') | 873 raise NotImplementedError('Access check is not implemented yet') |
865 # a JSON comparison is needed here | 874 # a JSON comparison is needed here |
866 | 875 |
867 filters = u' AND '.join(filters) | 876 filters = ' AND '.join(filters) |
868 query_parts.append(filters) | 877 query_parts.append(filters) |
869 query = u' '.join(query_parts) | 878 query = ' '.join(query_parts) |
870 | 879 |
871 result = yield self.dbpool.runQuery(query, args) | 880 result = yield self.dbpool.runQuery(query, args) |
872 files_data = [dict(zip(projection, row)) for row in result] | 881 files_data = [dict(list(zip(projection, row))) for row in result] |
873 to_parse = {'access', 'extra'}.intersection(projection) | 882 to_parse = {'access', 'extra'}.intersection(projection) |
874 to_filter = {'owner'}.intersection(projection) | 883 to_filter = {'owner'}.intersection(projection) |
875 if to_parse or to_filter: | 884 if to_parse or to_filter: |
876 for file_data in files_data: | 885 for file_data in files_data: |
877 for key in to_parse: | 886 for key in to_parse: |
880 owner = file_data.get('owner') | 889 owner = file_data.get('owner') |
881 if owner is not None: | 890 if owner is not None: |
882 file_data['owner'] = jid.JID(owner) | 891 file_data['owner'] = jid.JID(owner) |
883 defer.returnValue(files_data) | 892 defer.returnValue(files_data) |
884 | 893 |
885 def setFile(self, client, name, file_id, version=u'', parent=None, type_=C.FILE_TYPE_FILE, | 894 def setFile(self, client, name, file_id, version='', parent=None, type_=C.FILE_TYPE_FILE, |
886 file_hash=None, hash_algo=None, size=None, namespace=None, mime_type=None, | 895 file_hash=None, hash_algo=None, size=None, namespace=None, mime_type=None, |
887 created=None, modified=None, owner=None, access=None, extra=None): | 896 created=None, modified=None, owner=None, access=None, extra=None): |
888 """set a file metadata | 897 """set a file metadata |
889 | 898 |
890 @param client(SatXMPPClient): client owning the file | 899 @param client(SatXMPPClient): client owning the file |
919 mime_type, created, modified, | 928 mime_type, created, modified, |
920 owner.full() if owner is not None else None, | 929 owner.full() if owner is not None else None, |
921 json.dumps(access) if access else None, | 930 json.dumps(access) if access else None, |
922 json.dumps(extra) if extra else None, | 931 json.dumps(extra) if extra else None, |
923 self.profiles[client.profile])) | 932 self.profiles[client.profile])) |
924 d.addErrback(lambda failure: log.error(_(u"Can't save file metadata for [{profile}]: {reason}".format(profile=client.profile, reason=failure)))) | 933 d.addErrback(lambda failure: log.error(_("Can't save file metadata for [{profile}]: {reason}".format(profile=client.profile, reason=failure)))) |
925 return d | 934 return d |
926 | 935 |
927 def _fileUpdate(self, cursor, file_id, column, update_cb): | 936 def _fileUpdate(self, cursor, file_id, column, update_cb): |
928 query = 'SELECT {column} FROM files where id=?'.format(column=column) | 937 query = 'SELECT {column} FROM files where id=?'.format(column=column) |
929 for i in xrange(5): | 938 for i in range(5): |
930 cursor.execute(query, [file_id]) | 939 cursor.execute(query, [file_id]) |
931 try: | 940 try: |
932 older_value_raw = cursor.fetchone()[0] | 941 older_value_raw = cursor.fetchone()[0] |
933 except TypeError: | 942 except TypeError: |
934 raise exceptions.NotFound | 943 raise exceptions.NotFound |
949 except sqlite3.Error: | 958 except sqlite3.Error: |
950 pass | 959 pass |
951 else: | 960 else: |
952 if cursor.rowcount == 1: | 961 if cursor.rowcount == 1: |
953 break; | 962 break; |
954 log.warning(_(u"table not updated, probably due to race condition, trying again ({tries})").format(tries=i+1)) | 963 log.warning(_("table not updated, probably due to race condition, trying again ({tries})").format(tries=i+1)) |
955 else: | 964 else: |
956 log.error(_(u"Can't update file table")) | 965 log.error(_("Can't update file table")) |
957 | 966 |
958 def fileUpdate(self, file_id, column, update_cb): | 967 def fileUpdate(self, file_id, column, update_cb): |
959 """Update a column value using a method to avoid race conditions | 968 """Update a column value using a method to avoid race conditions |
960 | 969 |
961 the older value will be retrieved from database, then update_cb will be applied | 970 the older value will be retrieved from database, then update_cb will be applied |
1070 update_data = self.generateUpdateData(local_sch, current_sch, False) | 1079 update_data = self.generateUpdateData(local_sch, current_sch, False) |
1071 log.warning(_("There is a schema mismatch, but as we are on a dev version, database will be updated")) | 1080 log.warning(_("There is a schema mismatch, but as we are on a dev version, database will be updated")) |
1072 update_raw = yield self.update2raw(update_data, True) | 1081 update_raw = yield self.update2raw(update_data, True) |
1073 defer.returnValue(update_raw) | 1082 defer.returnValue(update_raw) |
1074 else: | 1083 else: |
1075 log.error(_(u"schema version is up-to-date, but local schema differ from expected current schema")) | 1084 log.error(_("schema version is up-to-date, but local schema differ from expected current schema")) |
1076 update_data = self.generateUpdateData(local_sch, current_sch, True) | 1085 update_data = self.generateUpdateData(local_sch, current_sch, True) |
1077 update_raw = yield self.update2raw(update_data) | 1086 update_raw = yield self.update2raw(update_data) |
1078 log.warning(_(u"Here are the commands that should fix the situation, use at your own risk (do a backup before modifying database), you can go to SàT's MUC room at sat@chat.jabberfr.org for help\n### SQL###\n%s\n### END SQL ###\n") % u'\n'.join("%s;" % statement for statement in update_raw)) | 1087 log.warning(_("Here are the commands that should fix the situation, use at your own risk (do a backup before modifying database), you can go to SàT's MUC room at sat@chat.jabberfr.org for help\n### SQL###\n%s\n### END SQL ###\n") % '\n'.join("%s;" % statement for statement in update_raw)) |
1079 raise exceptions.DatabaseError("Database mismatch") | 1088 raise exceptions.DatabaseError("Database mismatch") |
1080 else: | 1089 else: |
1081 if local_version > CURRENT_DB_VERSION: | 1090 if local_version > CURRENT_DB_VERSION: |
1082 log.error(_( | 1091 log.error(_( |
1083 u"You database version is higher than the one used in this SàT " | 1092 "You database version is higher than the one used in this SàT " |
1084 u"version, are you using several version at the same time? We " | 1093 "version, are you using several version at the same time? We " |
1085 u"can't run SàT with this database.")) | 1094 "can't run SàT with this database.")) |
1086 sys.exit(1) | 1095 sys.exit(1) |
1087 | 1096 |
1088 # Database is not up-to-date, we'll do the update | 1097 # Database is not up-to-date, we'll do the update |
1089 if force_update: | 1098 if force_update: |
1090 log.info(_("Database content needs a specific processing, local database will be updated")) | 1099 log.info(_("Database content needs a specific processing, local database will be updated")) |
1091 else: | 1100 else: |
1092 log.info(_("Database schema has changed, local database will be updated")) | 1101 log.info(_("Database schema has changed, local database will be updated")) |
1093 update_raw = [] | 1102 update_raw = [] |
1094 for version in xrange(local_version + 1, CURRENT_DB_VERSION + 1): | 1103 for version in range(local_version + 1, CURRENT_DB_VERSION + 1): |
1095 try: | 1104 try: |
1096 update_data = DATABASE_SCHEMAS[version] | 1105 update_data = DATABASE_SCHEMAS[version] |
1097 except KeyError: | 1106 except KeyError: |
1098 raise exceptions.InternalError("Missing update definition (version %d)" % version) | 1107 raise exceptions.InternalError("Missing update definition (version %d)" % version) |
1099 if "specific" in update_data and update_raw: | 1108 if "specific" in update_data and update_raw: |
1148 @return: list of strings with raw statements | 1157 @return: list of strings with raw statements |
1149 """ | 1158 """ |
1150 ret = [] | 1159 ret = [] |
1151 assert isinstance(data, tuple) | 1160 assert isinstance(data, tuple) |
1152 for table, col_data in data: | 1161 for table, col_data in data: |
1153 assert isinstance(table, basestring) | 1162 assert isinstance(table, str) |
1154 assert isinstance(col_data, tuple) | 1163 assert isinstance(col_data, tuple) |
1155 for cols in col_data: | 1164 for cols in col_data: |
1156 if isinstance(cols, tuple): | 1165 if isinstance(cols, tuple): |
1157 assert all([isinstance(c, basestring) for c in cols]) | 1166 assert all([isinstance(c, str) for c in cols]) |
1158 indexed_cols = u','.join(cols) | 1167 indexed_cols = ','.join(cols) |
1159 elif isinstance(cols, basestring): | 1168 elif isinstance(cols, str): |
1160 indexed_cols = cols | 1169 indexed_cols = cols |
1161 else: | 1170 else: |
1162 raise exceptions.InternalError(u"unexpected index columns value") | 1171 raise exceptions.InternalError("unexpected index columns value") |
1163 index_name = table + u'__' + indexed_cols.replace(u',', u'_') | 1172 index_name = table + '__' + indexed_cols.replace(',', '_') |
1164 ret.append(Updater.INDEX_SQL % (index_name, table, indexed_cols)) | 1173 ret.append(Updater.INDEX_SQL % (index_name, table, indexed_cols)) |
1165 return ret | 1174 return ret |
1166 | 1175 |
1167 def statementHash(self, data): | 1176 def statementHash(self, data): |
1168 """ Generate hash of template data | 1177 """ Generate hash of template data |
1171 @param data: dictionary of "CREATE" statement, with tables names as key, | 1180 @param data: dictionary of "CREATE" statement, with tables names as key, |
1172 and tuples of (col_defs, constraints) as values | 1181 and tuples of (col_defs, constraints) as values |
1173 @return: hash as string | 1182 @return: hash as string |
1174 """ | 1183 """ |
1175 hash_ = hashlib.sha1() | 1184 hash_ = hashlib.sha1() |
1176 tables = data.keys() | 1185 tables = list(data.keys()) |
1177 tables.sort() | 1186 tables.sort() |
1178 | 1187 |
1179 def stmnts2str(stmts): | 1188 def stmnts2str(stmts): |
1180 return ','.join([self.clean_regex.sub('',stmt) for stmt in sorted(stmts)]) | 1189 return ','.join([self.clean_regex.sub('',stmt) for stmt in sorted(stmts)]) |
1181 | 1190 |
1182 for table in tables: | 1191 for table in tables: |
1183 col_defs, col_constr = data[table] | 1192 col_defs, col_constr = data[table] |
1184 hash_.update("%s:%s:%s" % (table, stmnts2str(col_defs), stmnts2str(col_constr))) | 1193 hash_.update( |
1194 ("%s:%s:%s" % (table, stmnts2str(col_defs), stmnts2str(col_constr))) | |
1195 .encode('utf-8')) | |
1185 return hash_.digest() | 1196 return hash_.digest() |
1186 | 1197 |
1187 def rawStatements2data(self, raw_statements): | 1198 def rawStatements2data(self, raw_statements): |
1188 """ separate "CREATE" statements into dictionary/tuples data | 1199 """ separate "CREATE" statements into dictionary/tuples data |
1189 | 1200 |
1322 ret.extend(cmds or []) | 1333 ret.extend(cmds or []) |
1323 defer.returnValue(ret) | 1334 defer.returnValue(ret) |
1324 | 1335 |
1325 def update_v8(self): | 1336 def update_v8(self): |
1326 """Update database from v7 to v8 (primary keys order changes + indexes)""" | 1337 """Update database from v7 to v8 (primary keys order changes + indexes)""" |
1327 log.info(u"Database update to v8") | 1338 log.info("Database update to v8") |
1328 statements = ["PRAGMA foreign_keys = OFF"] | 1339 statements = ["PRAGMA foreign_keys = OFF"] |
1329 | 1340 |
1330 # here is a copy of create and index data, we can't use "current" table | 1341 # here is a copy of create and index data, we can't use "current" table |
1331 # because it may change in a future version, which would break the update | 1342 # because it may change in a future version, which would break the update |
1332 # when doing v8 | 1343 # when doing v8 |
1355 for table in ('param_gen', 'param_ind', 'private_ind', 'private_ind_bin'): | 1366 for table in ('param_gen', 'param_ind', 'private_ind', 'private_ind_bin'): |
1356 statements.append("ALTER TABLE {0} RENAME TO {0}_old".format(table)) | 1367 statements.append("ALTER TABLE {0} RENAME TO {0}_old".format(table)) |
1357 schema = {table: create[table]} | 1368 schema = {table: create[table]} |
1358 cols = [d.split()[0] for d in schema[table][0]] | 1369 cols = [d.split()[0] for d in schema[table][0]] |
1359 statements.extend(Updater.createData2Raw(schema)) | 1370 statements.extend(Updater.createData2Raw(schema)) |
1360 statements.append(u"INSERT INTO {table}({cols}) " | 1371 statements.append("INSERT INTO {table}({cols}) " |
1361 u"SELECT {cols} FROM {table}_old".format( | 1372 "SELECT {cols} FROM {table}_old".format( |
1362 table=table, | 1373 table=table, |
1363 cols=u','.join(cols))) | 1374 cols=','.join(cols))) |
1364 statements.append(u"DROP TABLE {}_old".format(table)) | 1375 statements.append("DROP TABLE {}_old".format(table)) |
1365 | 1376 |
1366 statements.extend(Updater.indexData2Raw(index)) | 1377 statements.extend(Updater.indexData2Raw(index)) |
1367 statements.append("PRAGMA foreign_keys = ON") | 1378 statements.append("PRAGMA foreign_keys = ON") |
1368 return statements | 1379 return statements |
1369 | 1380 |
1370 @defer.inlineCallbacks | 1381 @defer.inlineCallbacks |
1371 def update_v7(self): | 1382 def update_v7(self): |
1372 """Update database from v6 to v7 (history unique constraint change)""" | 1383 """Update database from v6 to v7 (history unique constraint change)""" |
1373 log.info(u"Database update to v7, this may be long depending on your history " | 1384 log.info("Database update to v7, this may be long depending on your history " |
1374 u"size, please be patient.") | 1385 "size, please be patient.") |
1375 | 1386 |
1376 log.info(u"Some cleaning first") | 1387 log.info("Some cleaning first") |
1377 # we need to fix duplicate stanza_id, as it can result in conflicts with the new schema | 1388 # we need to fix duplicate stanza_id, as it can result in conflicts with the new schema |
1378 # normally database should not contain any, but better safe than sorry. | 1389 # normally database should not contain any, but better safe than sorry. |
1379 rows = yield self.dbpool.runQuery( | 1390 rows = yield self.dbpool.runQuery( |
1380 u"SELECT stanza_id, COUNT(*) as c FROM history WHERE stanza_id is not NULL " | 1391 "SELECT stanza_id, COUNT(*) as c FROM history WHERE stanza_id is not NULL " |
1381 u"GROUP BY stanza_id HAVING c>1") | 1392 "GROUP BY stanza_id HAVING c>1") |
1382 if rows: | 1393 if rows: |
1383 count = sum([r[1] for r in rows]) - len(rows) | 1394 count = sum([r[1] for r in rows]) - len(rows) |
1384 log.info(u"{count} duplicate stanzas found, cleaning".format(count=count)) | 1395 log.info("{count} duplicate stanzas found, cleaning".format(count=count)) |
1385 for stanza_id, count in rows: | 1396 for stanza_id, count in rows: |
1386 log.info(u"cleaning duplicate stanza {stanza_id}".format(stanza_id=stanza_id)) | 1397 log.info("cleaning duplicate stanza {stanza_id}".format(stanza_id=stanza_id)) |
1387 row_uids = yield self.dbpool.runQuery( | 1398 row_uids = yield self.dbpool.runQuery( |
1388 "SELECT uid FROM history WHERE stanza_id = ? LIMIT ?", | 1399 "SELECT uid FROM history WHERE stanza_id = ? LIMIT ?", |
1389 (stanza_id, count-1)) | 1400 (stanza_id, count-1)) |
1390 uids = [r[0] for r in row_uids] | 1401 uids = [r[0] for r in row_uids] |
1391 yield self.dbpool.runQuery( | 1402 yield self.dbpool.runQuery( |
1392 "DELETE FROM history WHERE uid IN ({})".format(u",".join(u"?"*len(uids))), | 1403 "DELETE FROM history WHERE uid IN ({})".format(",".join("?"*len(uids))), |
1393 uids) | 1404 uids) |
1394 | 1405 |
1395 def deleteInfo(txn): | 1406 def deleteInfo(txn): |
1396 # with foreign_keys on, the delete takes ages, so we deactivate it here | 1407 # with foreign_keys on, the delete takes ages, so we deactivate it here |
1397 # the time to delete info messages from history. | 1408 # the time to delete info messages from history. |
1398 txn.execute("PRAGMA foreign_keys = OFF") | 1409 txn.execute("PRAGMA foreign_keys = OFF") |
1399 txn.execute(u"DELETE FROM message WHERE history_uid IN (SELECT uid FROM history WHERE " | 1410 txn.execute("DELETE FROM message WHERE history_uid IN (SELECT uid FROM history WHERE " |
1400 u"type='info')") | 1411 "type='info')") |
1401 txn.execute(u"DELETE FROM subject WHERE history_uid IN (SELECT uid FROM history WHERE " | 1412 txn.execute("DELETE FROM subject WHERE history_uid IN (SELECT uid FROM history WHERE " |
1402 u"type='info')") | 1413 "type='info')") |
1403 txn.execute(u"DELETE FROM thread WHERE history_uid IN (SELECT uid FROM history WHERE " | 1414 txn.execute("DELETE FROM thread WHERE history_uid IN (SELECT uid FROM history WHERE " |
1404 u"type='info')") | 1415 "type='info')") |
1405 txn.execute(u"DELETE FROM message WHERE history_uid IN (SELECT uid FROM history WHERE " | 1416 txn.execute("DELETE FROM message WHERE history_uid IN (SELECT uid FROM history WHERE " |
1406 u"type='info')") | 1417 "type='info')") |
1407 txn.execute(u"DELETE FROM history WHERE type='info'") | 1418 txn.execute("DELETE FROM history WHERE type='info'") |
1408 # not sure that is is necessary to reactivate here, but in doubt… | 1419 # not sure that is is necessary to reactivate here, but in doubt… |
1409 txn.execute("PRAGMA foreign_keys = ON") | 1420 txn.execute("PRAGMA foreign_keys = ON") |
1410 | 1421 |
1411 log.info(u'Deleting "info" messages (this can take a while)') | 1422 log.info('Deleting "info" messages (this can take a while)') |
1412 yield self.dbpool.runInteraction(deleteInfo) | 1423 yield self.dbpool.runInteraction(deleteInfo) |
1413 | 1424 |
1414 log.info(u"Cleaning done") | 1425 log.info("Cleaning done") |
1415 | 1426 |
1416 # we have to rename table we will replace | 1427 # we have to rename table we will replace |
1417 # tables referencing history need to be replaced to, else reference would | 1428 # tables referencing history need to be replaced to, else reference would |
1418 # be to the old table (which will be dropped at the end). This buggy behaviour | 1429 # be to the old table (which will be dropped at the end). This buggy behaviour |
1419 # seems to be fixed in new version of Sqlite | 1430 # seems to be fixed in new version of Sqlite |
1421 yield self.dbpool.runQuery("ALTER TABLE message RENAME TO message_old") | 1432 yield self.dbpool.runQuery("ALTER TABLE message RENAME TO message_old") |
1422 yield self.dbpool.runQuery("ALTER TABLE subject RENAME TO subject_old") | 1433 yield self.dbpool.runQuery("ALTER TABLE subject RENAME TO subject_old") |
1423 yield self.dbpool.runQuery("ALTER TABLE thread RENAME TO thread_old") | 1434 yield self.dbpool.runQuery("ALTER TABLE thread RENAME TO thread_old") |
1424 | 1435 |
1425 # history | 1436 # history |
1426 query = (u"CREATE TABLE history (uid TEXT PRIMARY KEY, stanza_id TEXT, " | 1437 query = ("CREATE TABLE history (uid TEXT PRIMARY KEY, stanza_id TEXT, " |
1427 u"update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, " | 1438 "update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, " |
1428 u"source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, " | 1439 "source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, " |
1429 u"received_timestamp DATETIME, type TEXT, extra BLOB, " | 1440 "received_timestamp DATETIME, type TEXT, extra BLOB, " |
1430 u"FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, " | 1441 "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, " |
1431 u"FOREIGN KEY(type) REFERENCES message_types(type), " | 1442 "FOREIGN KEY(type) REFERENCES message_types(type), " |
1432 u"UNIQUE (profile_id, stanza_id, source, dest))") | 1443 "UNIQUE (profile_id, stanza_id, source, dest))") |
1433 yield self.dbpool.runQuery(query) | 1444 yield self.dbpool.runQuery(query) |
1434 | 1445 |
1435 # message | 1446 # message |
1436 query = (u"CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" | 1447 query = ("CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" |
1437 u", message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES " | 1448 ", message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES " |
1438 u"history(uid) ON DELETE CASCADE)") | 1449 "history(uid) ON DELETE CASCADE)") |
1439 yield self.dbpool.runQuery(query) | 1450 yield self.dbpool.runQuery(query) |
1440 | 1451 |
1441 # subject | 1452 # subject |
1442 query = (u"CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" | 1453 query = ("CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" |
1443 u", subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES " | 1454 ", subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES " |
1444 u"history(uid) ON DELETE CASCADE)") | 1455 "history(uid) ON DELETE CASCADE)") |
1445 yield self.dbpool.runQuery(query) | 1456 yield self.dbpool.runQuery(query) |
1446 | 1457 |
1447 # thread | 1458 # thread |
1448 query = (u"CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" | 1459 query = ("CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" |
1449 u", thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES " | 1460 ", thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES " |
1450 u"history(uid) ON DELETE CASCADE)") | 1461 "history(uid) ON DELETE CASCADE)") |
1451 yield self.dbpool.runQuery(query) | 1462 yield self.dbpool.runQuery(query) |
1452 | 1463 |
1453 log.info(u"Now transfering old data to new tables, please be patient.") | 1464 log.info("Now transfering old data to new tables, please be patient.") |
1454 | 1465 |
1455 log.info(u"\nTransfering table history") | 1466 log.info("\nTransfering table history") |
1456 query = (u"INSERT INTO history (uid, stanza_id, update_uid, profile_id, source, " | 1467 query = ("INSERT INTO history (uid, stanza_id, update_uid, profile_id, source, " |
1457 u"dest, source_res, dest_res, timestamp, received_timestamp, type, extra" | 1468 "dest, source_res, dest_res, timestamp, received_timestamp, type, extra" |
1458 u") SELECT uid, stanza_id, update_uid, profile_id, source, dest, " | 1469 ") SELECT uid, stanza_id, update_uid, profile_id, source, dest, " |
1459 u"source_res, dest_res, timestamp, received_timestamp, type, extra " | 1470 "source_res, dest_res, timestamp, received_timestamp, type, extra " |
1460 u"FROM history_old") | 1471 "FROM history_old") |
1461 yield self.dbpool.runQuery(query) | 1472 yield self.dbpool.runQuery(query) |
1462 | 1473 |
1463 log.info(u"\nTransfering table message") | 1474 log.info("\nTransfering table message") |
1464 query = (u"INSERT INTO message (id, history_uid, message, language) SELECT id, " | 1475 query = ("INSERT INTO message (id, history_uid, message, language) SELECT id, " |
1465 u"history_uid, message, language FROM message_old") | 1476 "history_uid, message, language FROM message_old") |
1466 yield self.dbpool.runQuery(query) | 1477 yield self.dbpool.runQuery(query) |
1467 | 1478 |
1468 log.info(u"\nTransfering table subject") | 1479 log.info("\nTransfering table subject") |
1469 query = (u"INSERT INTO subject (id, history_uid, subject, language) SELECT id, " | 1480 query = ("INSERT INTO subject (id, history_uid, subject, language) SELECT id, " |
1470 u"history_uid, subject, language FROM subject_old") | 1481 "history_uid, subject, language FROM subject_old") |
1471 yield self.dbpool.runQuery(query) | 1482 yield self.dbpool.runQuery(query) |
1472 | 1483 |
1473 log.info(u"\nTransfering table thread") | 1484 log.info("\nTransfering table thread") |
1474 query = (u"INSERT INTO thread (id, history_uid, thread_id, parent_id) SELECT id" | 1485 query = ("INSERT INTO thread (id, history_uid, thread_id, parent_id) SELECT id" |
1475 u", history_uid, thread_id, parent_id FROM thread_old") | 1486 ", history_uid, thread_id, parent_id FROM thread_old") |
1476 yield self.dbpool.runQuery(query) | 1487 yield self.dbpool.runQuery(query) |
1477 | 1488 |
1478 log.info(u"\nRemoving old tables") | 1489 log.info("\nRemoving old tables") |
1479 # because of foreign keys, tables referencing history_old | 1490 # because of foreign keys, tables referencing history_old |
1480 # must be deleted first | 1491 # must be deleted first |
1481 yield self.dbpool.runQuery("DROP TABLE thread_old") | 1492 yield self.dbpool.runQuery("DROP TABLE thread_old") |
1482 yield self.dbpool.runQuery("DROP TABLE subject_old") | 1493 yield self.dbpool.runQuery("DROP TABLE subject_old") |
1483 yield self.dbpool.runQuery("DROP TABLE message_old") | 1494 yield self.dbpool.runQuery("DROP TABLE message_old") |
1484 yield self.dbpool.runQuery("DROP TABLE history_old") | 1495 yield self.dbpool.runQuery("DROP TABLE history_old") |
1485 log.info(u"\nReducing database size (this can take a while)") | 1496 log.info("\nReducing database size (this can take a while)") |
1486 yield self.dbpool.runQuery("VACUUM") | 1497 yield self.dbpool.runQuery("VACUUM") |
1487 log.info(u"Database update done :)") | 1498 log.info("Database update done :)") |
1488 | 1499 |
1489 @defer.inlineCallbacks | 1500 @defer.inlineCallbacks |
1490 def update_v3(self): | 1501 def update_v3(self): |
1491 """Update database from v2 to v3 (message refactoring)""" | 1502 """Update database from v2 to v3 (message refactoring)""" |
1492 # XXX: this update do all the messages in one huge transaction | 1503 # XXX: this update do all the messages in one huge transaction |
1493 # this is really memory consuming, but was OK on a reasonably | 1504 # this is really memory consuming, but was OK on a reasonably |
1494 # big database for tests. If issues are happening, we can cut it | 1505 # big database for tests. If issues are happening, we can cut it |
1495 # in smaller transactions using LIMIT and by deleting already updated | 1506 # in smaller transactions using LIMIT and by deleting already updated |
1496 # messages | 1507 # messages |
1497 log.info(u"Database update to v3, this may take a while") | 1508 log.info("Database update to v3, this may take a while") |
1498 | 1509 |
1499 # we need to fix duplicate timestamp, as it can result in conflicts with the new schema | 1510 # we need to fix duplicate timestamp, as it can result in conflicts with the new schema |
1500 rows = yield self.dbpool.runQuery("SELECT timestamp, COUNT(*) as c FROM history GROUP BY timestamp HAVING c>1") | 1511 rows = yield self.dbpool.runQuery("SELECT timestamp, COUNT(*) as c FROM history GROUP BY timestamp HAVING c>1") |
1501 if rows: | 1512 if rows: |
1502 log.info("fixing duplicate timestamp") | 1513 log.info("fixing duplicate timestamp") |
1504 for timestamp, __ in rows: | 1515 for timestamp, __ in rows: |
1505 ids_rows = yield self.dbpool.runQuery("SELECT id from history where timestamp=?", (timestamp,)) | 1516 ids_rows = yield self.dbpool.runQuery("SELECT id from history where timestamp=?", (timestamp,)) |
1506 for idx, (id_,) in enumerate(ids_rows): | 1517 for idx, (id_,) in enumerate(ids_rows): |
1507 fixed.append(id_) | 1518 fixed.append(id_) |
1508 yield self.dbpool.runQuery("UPDATE history SET timestamp=? WHERE id=?", (float(timestamp) + idx * 0.001, id_)) | 1519 yield self.dbpool.runQuery("UPDATE history SET timestamp=? WHERE id=?", (float(timestamp) + idx * 0.001, id_)) |
1509 log.info(u"fixed messages with ids {}".format(u', '.join([unicode(id_) for id_ in fixed]))) | 1520 log.info("fixed messages with ids {}".format(', '.join([str(id_) for id_ in fixed]))) |
1510 | 1521 |
1511 def historySchema(txn): | 1522 def historySchema(txn): |
1512 log.info(u"History schema update") | 1523 log.info("History schema update") |
1513 txn.execute("ALTER TABLE history RENAME TO tmp_sat_update") | 1524 txn.execute("ALTER TABLE history RENAME TO tmp_sat_update") |
1514 txn.execute("CREATE TABLE history (uid TEXT PRIMARY KEY, update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, received_timestamp DATETIME, type TEXT, extra BLOB, FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, FOREIGN KEY(type) REFERENCES message_types(type), UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res))") | 1525 txn.execute("CREATE TABLE history (uid TEXT PRIMARY KEY, update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, received_timestamp DATETIME, type TEXT, extra BLOB, FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, FOREIGN KEY(type) REFERENCES message_types(type), UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res))") |
1515 txn.execute("INSERT INTO history (uid, profile_id, source, dest, source_res, dest_res, timestamp, type, extra) SELECT id, profile_id, source, dest, source_res, dest_res, timestamp, type, extra FROM tmp_sat_update") | 1526 txn.execute("INSERT INTO history (uid, profile_id, source, dest, source_res, dest_res, timestamp, type, extra) SELECT id, profile_id, source, dest, source_res, dest_res, timestamp, type, extra FROM tmp_sat_update") |
1516 | 1527 |
1517 yield self.dbpool.runInteraction(historySchema) | 1528 yield self.dbpool.runInteraction(historySchema) |
1518 | 1529 |
1519 def newTables(txn): | 1530 def newTables(txn): |
1520 log.info(u"Creating new tables") | 1531 log.info("Creating new tables") |
1521 txn.execute("CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)") | 1532 txn.execute("CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)") |
1522 txn.execute("CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)") | 1533 txn.execute("CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)") |
1523 txn.execute("CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)") | 1534 txn.execute("CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)") |
1524 | 1535 |
1525 yield self.dbpool.runInteraction(newTables) | 1536 yield self.dbpool.runInteraction(newTables) |
1526 | 1537 |
1527 log.info(u"inserting new message type") | 1538 log.info("inserting new message type") |
1528 yield self.dbpool.runQuery("INSERT INTO message_types VALUES (?)", ('info',)) | 1539 yield self.dbpool.runQuery("INSERT INTO message_types VALUES (?)", ('info',)) |
1529 | 1540 |
1530 log.info(u"messages update") | 1541 log.info("messages update") |
1531 rows = yield self.dbpool.runQuery("SELECT id, timestamp, message, extra FROM tmp_sat_update") | 1542 rows = yield self.dbpool.runQuery("SELECT id, timestamp, message, extra FROM tmp_sat_update") |
1532 total = len(rows) | 1543 total = len(rows) |
1533 | 1544 |
1534 def updateHistory(txn, queries): | 1545 def updateHistory(txn, queries): |
1535 for query, args in iter(queries): | 1546 for query, args in iter(queries): |
1543 try: | 1554 try: |
1544 extra = pickle.loads(str(extra or "")) | 1555 extra = pickle.loads(str(extra or "")) |
1545 except EOFError: | 1556 except EOFError: |
1546 extra = {} | 1557 extra = {} |
1547 except Exception: | 1558 except Exception: |
1548 log.warning(u"Can't handle extra data for message id {}, ignoring it".format(id_)) | 1559 log.warning("Can't handle extra data for message id {}, ignoring it".format(id_)) |
1549 extra = {} | 1560 extra = {} |
1550 | 1561 |
1551 queries.append(("INSERT INTO message(history_uid, message) VALUES (?,?)", (id_, message))) | 1562 queries.append(("INSERT INTO message(history_uid, message) VALUES (?,?)", (id_, message))) |
1552 | 1563 |
1553 try: | 1564 try: |
1554 subject = extra.pop('subject') | 1565 subject = extra.pop('subject') |
1555 except KeyError: | 1566 except KeyError: |
1556 pass | 1567 pass |
1557 else: | 1568 else: |
1558 try: | 1569 try: |
1559 subject = subject.decode('utf-8') | 1570 subject = subject |
1560 except UnicodeEncodeError: | 1571 except UnicodeEncodeError: |
1561 log.warning(u"Error while decoding subject, ignoring it") | 1572 log.warning("Error while decoding subject, ignoring it") |
1562 del extra['subject'] | 1573 del extra['subject'] |
1563 else: | 1574 else: |
1564 queries.append(("INSERT INTO subject(history_uid, subject) VALUES (?,?)", (id_, subject))) | 1575 queries.append(("INSERT INTO subject(history_uid, subject) VALUES (?,?)", (id_, subject))) |
1565 | 1576 |
1566 received_timestamp = extra.pop('timestamp', None) | 1577 received_timestamp = extra.pop('timestamp', None) |
1595 | 1606 |
1596 def prepare_queries(result, xmpp_password): | 1607 def prepare_queries(result, xmpp_password): |
1597 try: | 1608 try: |
1598 id_ = result[0][0] | 1609 id_ = result[0][0] |
1599 except IndexError: | 1610 except IndexError: |
1600 log.error(u"Profile of id %d is referenced in 'param_ind' but it doesn't exist!" % profile_id) | 1611 log.error("Profile of id %d is referenced in 'param_ind' but it doesn't exist!" % profile_id) |
1601 return defer.succeed(None) | 1612 return defer.succeed(None) |
1602 | 1613 |
1603 sat_password = xmpp_password | 1614 sat_password = xmpp_password |
1604 d1 = PasswordHasher.hash(sat_password) | 1615 d1 = PasswordHasher.hash(sat_password) |
1605 personal_key = BlockCipher.getRandomKey(base64=True) | 1616 personal_key = BlockCipher.getRandomKey(base64=True) |