comparison sat/memory/sqlite.py @ 3028:ab2696e34d29

Python 3 port: /!\ this is a huge commit /!\ starting from this commit, SàT needs Python 3.6+ /!\ SàT may be unstable or some features may not work anymore; this will improve with time. This patch ports the backend, bridge and frontends to Python 3. Roughly this has been done this way:
- the 2to3 tool has been applied (with Python 3.7)
- all references to python2 have been replaced with python3 (notably shebangs)
- fixed files not handled by 2to3 (notably the shell script)
- several manual fixes
- fixed issues reported by Python 3 that were not handled in Python 2
- replaced "async" with "async_" when needed (it's a reserved word since Python 3.7)
- replaced zope's "implements" with the @implementer decorator
- temporary hack to handle data pickled in database, as str or bytes may be returned, to be checked later
- fixed hash comparison for password
- removed some code which is not needed anymore with Python 3
- deactivated some code which needs to be checked (notably certificate validation)
- tested with jp, fixed reported issues until some basic commands worked
- ported Primitivus (after porting dependencies like urwid satext)
- more manual fixes
author Goffi <goffi@goffi.org>
date Tue, 13 Aug 2019 19:08:41 +0200
parents 860c550028d6
children f8cc88c773c8
3027:ff5bcb12ae60 3028:ab2696e34d29
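
One of the mechanical changes listed in the commit message is the move from zope's class-body implements(...) call to the @implementer class decorator, the only form that works on Python 3. A minimal sketch of that change (the interface and class names here are illustrative, not taken from this file):

    from zope.interface import Interface, implementer

    class IStorage(Interface):
        """Illustrative marker interface."""

    # Python 2 style (removed):
    #     class SqliteStorage(object):
    #         implements(IStorage)
    # Python 3 style (used from this commit on):
    @implementer(IStorage)
    class SqliteStorage(object):
        pass
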
1 #!/usr/bin/env python2 1 #!/usr/bin/env python3
2 # -*- coding: utf-8 -*- 2 # -*- coding: utf-8 -*-
3 3
4 # SAT: a jabber client 4 # SAT: a jabber client
5 # Copyright (C) 2009-2019 Jérôme Poisson (goffi@goffi.org) 5 # Copyright (C) 2009-2019 Jérôme Poisson (goffi@goffi.org)
6 6
29 from twisted.python import failure 29 from twisted.python import failure
30 from collections import OrderedDict 30 from collections import OrderedDict
31 import sys 31 import sys
32 import re 32 import re
33 import os.path 33 import os.path
34 import cPickle as pickle 34 import pickle
35 import hashlib 35 import hashlib
36 import sqlite3 36 import sqlite3
37 import json 37 import json
38 38
39 log = getLogger(__name__) 39 log = getLogger(__name__)
150 # FIXME: in case of error, we retry a couple of times 150 # FIXME: in case of error, we retry a couple of times
151 # this is a workaround, we need to move to better 151 # this is a workaround, we need to move to better
152 # Sqlite integration, probably with a high-level library 152 # Sqlite integration, probably with a high-level library
153 retry -= 1 153 retry -= 1
154 if retry == 0: 154 if retry == 0:
155 log.error(_(u'too many db tries, we abandon! Error message: {msg}\n' 155 log.error(_('too many db tries, we abandon! Error message: {msg}\n'
156 u'query was {query}' 156 'query was {query}'
157 .format(msg=e, query=u' '.join([unicode(a) for a in args])))) 157 .format(msg=e, query=' '.join([str(a) for a in args]))))
158 raise e 158 raise e
159 log.warning( 159 log.warning(
160 _(u'exception while running query, retrying ({try_}): {msg}').format( 160 _('exception while running query, retrying ({try_}): {msg}').format(
161 try_ = 6 - retry, 161 try_ = 6 - retry,
162 msg = e)) 162 msg = e))
163 kw['query_retry'] = retry 163 kw['query_retry'] = retry
164 return self._runQuery(trans, *args, **kw) 164 return self._runQuery(trans, *args, **kw)
165 return trans.fetchall() 165 return trans.fetchall()
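
The hunk above implements the retry workaround mentioned in the FIXME: the remaining try count is threaded through the keyword arguments (query_retry) and the method calls itself until the budget runs out. A condensed, self-contained sketch of the same pattern using only the standard library:

    import sqlite3

    def run_query(cursor, query, params=(), retry=6):
        # retry a few times on transient errors (e.g. "database is locked"),
        # re-raising once the retry budget is exhausted
        try:
            cursor.execute(query, params)
        except sqlite3.Error:
            retry -= 1
            if retry == 0:
                raise
            return run_query(cursor, query, params, retry=retry)
        return cursor.fetchall()
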
173 return adbapi.ConnectionPool._runInteraction(self, interaction, *args, **kw) 173 return adbapi.ConnectionPool._runInteraction(self, interaction, *args, **kw)
174 except Exception as e: 174 except Exception as e:
175 retry -= 1 175 retry -= 1
176 if retry == 0: 176 if retry == 0:
177 log.error( 177 log.error(
178 _(u'too many interaction tries, we abandon! Error message: {msg}\n' 178 _('too many interaction tries, we abandon! Error message: {msg}\n'
179 u'interaction method was: {interaction}\n' 179 'interaction method was: {interaction}\n'
180 u'interaction arguments were: {args}' 180 'interaction arguments were: {args}'
181 .format(msg=e, interaction=interaction, 181 .format(msg=e, interaction=interaction,
182 args=u', '.join([unicode(a) for a in args])))) 182 args=', '.join([str(a) for a in args]))))
183 raise e 183 raise e
184 log.warning( 184 log.warning(
185 _(u'exception while running interaction, retrying ({try_}): {msg}') 185 _('exception while running interaction, retrying ({try_}): {msg}')
186 .format(try_ = 4 - retry, msg = e)) 186 .format(try_ = 4 - retry, msg = e))
187 kw['interaction_retry'] = retry 187 kw['interaction_retry'] = retry
188 return self._runInteraction(interaction, *args, **kw) 188 return self._runInteraction(interaction, *args, **kw)
189 189
190 190
202 log.info(_("Connecting database")) 202 log.info(_("Connecting database"))
203 new_base = not os.path.exists(db_filename) # do we have to create the database? 203 new_base = not os.path.exists(db_filename) # do we have to create the database?
204 if new_base: # the dir may not exist if it's not the XDG recommended one 204 if new_base: # the dir may not exist if it's not the XDG recommended one
205 dir_ = os.path.dirname(db_filename) 205 dir_ = os.path.dirname(db_filename)
206 if not os.path.exists(dir_): 206 if not os.path.exists(dir_):
207 os.makedirs(dir_, 0700) 207 os.makedirs(dir_, 0o700)
208 208
209 def foreignKeysOn(sqlite): 209 def foreignKeysOn(sqlite):
210 sqlite.execute('PRAGMA foreign_keys = ON') 210 sqlite.execute('PRAGMA foreign_keys = ON')
211 211
212 self.dbpool = ConnectionPool("sqlite3", db_filename, cp_openfun=foreignKeysOn, check_same_thread=False, timeout=15) 212 self.dbpool = ConnectionPool("sqlite3", db_filename, cp_openfun=foreignKeysOn, check_same_thread=False, timeout=15)
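
SQLite disables foreign key enforcement by default, and the pragma only applies to the connection that issued it; that is why foreignKeysOn is passed as cp_openfun, which Twisted's adbapi calls on every new connection in the pool. The equivalent with the plain sqlite3 module, as a sketch:

    import sqlite3

    conn = sqlite3.connect("sat.db")
    # must be issued on every new connection, it is not a persistent setting
    conn.execute("PRAGMA foreign_keys = ON")
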
238 238
239 def commitStatements(self, statements): 239 def commitStatements(self, statements):
240 240
241 if statements is None: 241 if statements is None:
242 return defer.succeed(None) 242 return defer.succeed(None)
243 log.debug(u"\n===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements)) 243 log.debug("\n===== COMMITTING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements))
244 d = self.dbpool.runInteraction(self._updateDb, tuple(statements)) 244 d = self.dbpool.runInteraction(self._updateDb, tuple(statements))
245 return d 245 return d
246 246
247 def _updateDb(self, interaction, statements): 247 def _updateDb(self, interaction, statements):
248 for statement in statements: 248 for statement in statements:
268 name, id_ = profile 268 name, id_ = profile
269 self.profiles[name] = id_ 269 self.profiles[name] = id_
270 270
271 def getProfilesList(self): 271 def getProfilesList(self):
272 """"Return list of all registered profiles""" 272 """"Return list of all registered profiles"""
273 return self.profiles.keys() 273 return list(self.profiles.keys())
274 274
275 def hasProfile(self, profile_name): 275 def hasProfile(self, profile_name):
276 """return True if profile_name exists 276 """return True if profile_name exists
277 277
278 @param profile_name: name of the profile to check 278 @param profile_name: name of the profile to check
281 281
282 def profileIsComponent(self, profile_name): 282 def profileIsComponent(self, profile_name):
283 try: 283 try:
284 return self.profiles[profile_name] in self.components 284 return self.profiles[profile_name] in self.components
285 except KeyError: 285 except KeyError:
286 raise exceptions.NotFound(u"the requested profile doesn't exist") 286 raise exceptions.NotFound("the requested profile doesn't exist")
287 287
288 def getEntryPoint(self, profile_name): 288 def getEntryPoint(self, profile_name):
289 try: 289 try:
290 return self.components[self.profiles[profile_name]] 290 return self.components[self.profiles[profile_name]]
291 except KeyError: 291 except KeyError:
292 raise exceptions.NotFound(u"the requested profile doesn't exist or is not a component") 292 raise exceptions.NotFound("the requested profile doesn't exist or is not a component")
293 293
294 def createProfile(self, name, component=None): 294 def createProfile(self, name, component=None):
295 """Create a new profile 295 """Create a new profile
296 296
297 @param name(unicode): name of the profile 297 @param name(unicode): name of the profile
324 324
325 @param name: name of the profile 325 @param name: name of the profile
326 @return: deferred triggered once profile is actually deleted 326 @return: deferred triggered once profile is actually deleted
327 """ 327 """
328 def deletionError(failure_): 328 def deletionError(failure_):
329 log.error(_(u"Can't delete profile [%s]") % name) 329 log.error(_("Can't delete profile [%s]") % name)
330 return failure_ 330 return failure_
331 331
332 def delete(txn): 332 def delete(txn):
333 profile_id = self.profiles.pop(name) 333 profile_id = self.profiles.pop(name)
334 txn.execute("DELETE FROM profiles WHERE name = ?", (name,)) 334 txn.execute("DELETE FROM profiles WHERE name = ?", (name,))
357 357
358 def fillParams(result): 358 def fillParams(result):
359 for param in result: 359 for param in result:
360 category, name, value = param 360 category, name, value = param
361 params_gen[(category, name)] = value 361 params_gen[(category, name)] = value
362 log.debug(_(u"loading general parameters from database")) 362 log.debug(_("loading general parameters from database"))
363 return self.dbpool.runQuery("SELECT category,name,value FROM param_gen").addCallback(fillParams) 363 return self.dbpool.runQuery("SELECT category,name,value FROM param_gen").addCallback(fillParams)
364 364
365 def loadIndParams(self, params_ind, profile): 365 def loadIndParams(self, params_ind, profile):
366 """Load individual parameters 366 """Load individual parameters
367 367
372 372
373 def fillParams(result): 373 def fillParams(result):
374 for param in result: 374 for param in result:
375 category, name, value = param 375 category, name, value = param
376 params_ind[(category, name)] = value 376 params_ind[(category, name)] = value
377 log.debug(_(u"loading individual parameters from database")) 377 log.debug(_("loading individual parameters from database"))
378 d = self.dbpool.runQuery("SELECT category,name,value FROM param_ind WHERE profile_id=?", (self.profiles[profile], )) 378 d = self.dbpool.runQuery("SELECT category,name,value FROM param_ind WHERE profile_id=?", (self.profiles[profile], ))
379 d.addCallback(fillParams) 379 d.addCallback(fillParams)
380 return d 380 return d
381 381
382 def getIndParam(self, category, name, profile): 382 def getIndParam(self, category, name, profile):
397 @param category: category of the parameter 397 @param category: category of the parameter
398 @param name: name of the parameter 398 @param name: name of the parameter
399 @param value: value to set 399 @param value: value to set
400 @return: deferred""" 400 @return: deferred"""
401 d = self.dbpool.runQuery("REPLACE INTO param_gen(category,name,value) VALUES (?,?,?)", (category, name, value)) 401 d = self.dbpool.runQuery("REPLACE INTO param_gen(category,name,value) VALUES (?,?,?)", (category, name, value))
402 d.addErrback(lambda ignore: log.error(_(u"Can't set general parameter (%(category)s/%(name)s) in database" % {"category": category, "name": name}))) 402 d.addErrback(lambda ignore: log.error(_("Can't set general parameter (%(category)s/%(name)s) in database" % {"category": category, "name": name})))
403 return d 403 return d
404 404
405 def setIndParam(self, category, name, value, profile): 405 def setIndParam(self, category, name, value, profile):
406 """Save the individual parameters in database 406 """Save the individual parameters in database
407 407
410 @param value: value to set 410 @param value: value to set
411 @param profile: a profile which *must* exist 411 @param profile: a profile which *must* exist
412 @return: deferred 412 @return: deferred
413 """ 413 """
414 d = self.dbpool.runQuery("REPLACE INTO param_ind(category,name,profile_id,value) VALUES (?,?,?,?)", (category, name, self.profiles[profile], value)) 414 d = self.dbpool.runQuery("REPLACE INTO param_ind(category,name,profile_id,value) VALUES (?,?,?,?)", (category, name, self.profiles[profile], value))
415 d.addErrback(lambda ignore: log.error(_(u"Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category": category, "name": name, "profile": profile}))) 415 d.addErrback(lambda ignore: log.error(_("Can't set individual parameter (%(category)s/%(name)s) for [%(profile)s] in database" % {"category": category, "name": name, "profile": profile})))
416 return d 416 return d
417 417
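
The parameter setters above rely on SQLite's REPLACE INTO, which deletes any row violating a unique constraint before inserting the new one, so writing the same parameter twice keeps only the last value. A runnable sketch (the table mirrors param_gen, assuming a primary key on category and name):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE param_gen (category TEXT, name TEXT, value TEXT, "
                 "PRIMARY KEY (category, name))")
    conn.execute("REPLACE INTO param_gen(category,name,value) VALUES (?,?,?)",
                 ("General", "theme", "dark"))
    # same primary key: the first row is deleted, the new one inserted
    conn.execute("REPLACE INTO param_gen(category,name,value) VALUES (?,?,?)",
                 ("General", "theme", "light"))
    assert conn.execute("SELECT value FROM param_gen").fetchall() == [("light",)]
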
418 ## History 418 ## History
419 419
420 def _addToHistoryCb(self, __, data): 420 def _addToHistoryCb(self, __, data):
421 # Message metadata were successfully added to history 421 # Message metadata were successfully added to history
422 # now we can add message and subject 422 # now we can add message and subject
423 uid = data['uid'] 423 uid = data['uid']
424 d_list = [] 424 d_list = []
425 for key in ('message', 'subject'): 425 for key in ('message', 'subject'):
426 for lang, value in data[key].iteritems(): 426 for lang, value in data[key].items():
427 d = self.dbpool.runQuery( 427 d = self.dbpool.runQuery(
428 "INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)" 428 "INSERT INTO {key}(history_uid, {key}, language) VALUES (?,?,?)"
429 .format(key=key), 429 .format(key=key),
430 (uid, value, lang or None)) 430 (uid, value, lang or None))
431 d.addErrback(lambda __: log.error( 431 d.addErrback(lambda __: log.error(
432 _(u"Can't save following {key} in history (uid: {uid}, lang:{lang}):" 432 _("Can't save following {key} in history (uid: {uid}, lang:{lang}):"
433 u" {value}").format( 433 " {value}").format(
434 key=key, uid=uid, lang=lang, value=value))) 434 key=key, uid=uid, lang=lang, value=value)))
435 d_list.append(d) 435 d_list.append(d)
436 try: 436 try:
437 thread = data['extra']['thread'] 437 thread = data['extra']['thread']
438 except KeyError: 438 except KeyError:
441 thread_parent = data['extra'].get('thread_parent') 441 thread_parent = data['extra'].get('thread_parent')
442 d = self.dbpool.runQuery( 442 d = self.dbpool.runQuery(
443 "INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)", 443 "INSERT INTO thread(history_uid, thread_id, parent_id) VALUES (?,?,?)",
444 (uid, thread, thread_parent)) 444 (uid, thread, thread_parent))
445 d.addErrback(lambda __: log.error( 445 d.addErrback(lambda __: log.error(
446 _(u"Can't save following thread in history (uid: {uid}): thread: " 446 _("Can't save following thread in history (uid: {uid}): thread: "
447 u"{thread}), parent:{parent}").format( 447 "{thread}), parent:{parent}").format(
448 uid=uid, thread=thread, parent=thread_parent))) 448 uid=uid, thread=thread, parent=thread_parent)))
449 d_list.append(d) 449 d_list.append(d)
450 return defer.DeferredList(d_list) 450 return defer.DeferredList(d_list)
451 451
452 def _addToHistoryEb(self, failure_, data): 452 def _addToHistoryEb(self, failure_, data):
453 failure_.trap(sqlite3.IntegrityError) 453 failure_.trap(sqlite3.IntegrityError)
454 sqlite_msg = failure_.value.args[0] 454 sqlite_msg = failure_.value.args[0]
455 if "UNIQUE constraint failed" in sqlite_msg: 455 if "UNIQUE constraint failed" in sqlite_msg:
456 log.debug(u"message {} is already in history, not storing it again" 456 log.debug("message {} is already in history, not storing it again"
457 .format(data['uid'])) 457 .format(data['uid']))
458 if 'received_timestamp' not in data: 458 if 'received_timestamp' not in data:
459 log.warning( 459 log.warning(
460 u"duplicate message is not delayed, this is maybe a bug: data={}" 460 "duplicate message is not delayed, this is maybe a bug: data={}"
461 .format(data)) 461 .format(data))
462 # we cancel message to avoid sending duplicate message to frontends 462 # we cancel message to avoid sending duplicate message to frontends
463 raise failure.Failure(exceptions.CancelError("Cancelled duplicated message")) 463 raise failure.Failure(exceptions.CancelError("Cancelled duplicated message"))
464 else: 464 else:
465 log.error(u"Can't store message in history: {}".format(failure_)) 465 log.error("Can't store message in history: {}".format(failure_))
466 466
467 def _logHistoryError(self, failure_, from_jid, to_jid, data): 467 def _logHistoryError(self, failure_, from_jid, to_jid, data):
468 if failure_.check(exceptions.CancelError): 468 if failure_.check(exceptions.CancelError):
469 # we propagate CancelError to avoid sending message to frontends 469 # we propagate CancelError to avoid sending message to frontends
470 raise failure_ 470 raise failure_
471 log.error(_( 471 log.error(_(
472 u"Can't save following message in history: from [{from_jid}] to [{to_jid}] " 472 "Can't save following message in history: from [{from_jid}] to [{to_jid}] "
473 u"(uid: {uid})") 473 "(uid: {uid})")
474 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid'])) 474 .format(from_jid=from_jid.full(), to_jid=to_jid.full(), uid=data['uid']))
475 475
476 def addToHistory(self, data, profile): 476 def addToHistory(self, data, profile):
477 """Store a new message in history 477 """Store a new message in history
478 478
479 @param data(dict): message data as built by SatMessageProtocol.onMessage 479 @param data(dict): message data as built by SatMessageProtocol.onMessage
480 """ 480 """
481 extra = pickle.dumps({k: v for k, v in data['extra'].iteritems() 481 extra = pickle.dumps({k: v for k, v in data['extra'].items()
482 if k not in NOT_IN_EXTRA}, 0) 482 if k not in NOT_IN_EXTRA}, 0)
483 from_jid = data['from'] 483 from_jid = data['from']
484 to_jid = data['to'] 484 to_jid = data['to']
485 d = self.dbpool.runQuery( 485 d = self.dbpool.runQuery(
486 u"INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, " 486 "INSERT INTO history(uid, stanza_id, update_uid, profile_id, source, dest, "
487 u"source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES " 487 "source_res, dest_res, timestamp, received_timestamp, type, extra) VALUES "
488 u"(?,?,?,?,?,?,?,?,?,?,?,?)", 488 "(?,?,?,?,?,?,?,?,?,?,?,?)",
489 (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'), 489 (data['uid'], data['extra'].get('stanza_id'), data['extra'].get('update_uid'),
490 self.profiles[profile], data['from'].userhost(), to_jid.userhost(), 490 self.profiles[profile], data['from'].userhost(), to_jid.userhost(),
491 from_jid.resource, to_jid.resource, data['timestamp'], 491 from_jid.resource, to_jid.resource, data['timestamp'],
492 data.get('received_timestamp'), data['type'], sqlite3.Binary(extra))) 492 data.get('received_timestamp'), data['type'], sqlite3.Binary(extra)))
493 d.addCallbacks(self._addToHistoryCb, 493 d.addCallbacks(self._addToHistoryCb,
506 received_timestamp, type_, extra, message, message_lang, subject, 506 received_timestamp, type_, extra, message, message_lang, subject,
507 subject_lang, thread, thread_parent) = row 507 subject_lang, thread, thread_parent) = row
508 if uid != current['uid']: 508 if uid != current['uid']:
509 # new message 509 # new message
510 try: 510 try:
511 extra = pickle.loads(str(extra or "")) 511 extra = pickle.loads(extra or b"")
512 except EOFError: 512 except EOFError:
513 extra = {} 513 extra = {}
514 current = { 514 current = {
515 'from': "%s/%s" % (source, source_res) if source_res else source, 515 'from': "%s/%s" % (source, source_res) if source_res else source,
516 'to': "%s/%s" % (dest, dest_res) if dest_res else dest, 516 'to': "%s/%s" % (dest, dest_res) if dest_res else dest,
541 if thread_parent is not None: 541 if thread_parent is not None:
542 current_extra['thread_parent'] = thread_parent 542 current_extra['thread_parent'] = thread_parent
543 else: 543 else:
544 if thread_parent is not None: 544 if thread_parent is not None:
545 log.error( 545 log.error(
546 u"Database inconsistency: thread parent without thread (uid: " 546 "Database inconsistency: thread parent without thread (uid: "
547 u"{uid}, thread_parent: {parent})" 547 "{uid}, thread_parent: {parent})"
548 .format(uid=uid, parent=thread_parent)) 548 .format(uid=uid, parent=thread_parent))
549 549
550 return result 550 return result
551 551
552 def listDict2listTuple(self, messages_data): 552 def listDict2listTuple(self, messages_data):
573 if filters is None: 573 if filters is None:
574 filters = {} 574 filters = {}
575 if limit == 0: 575 if limit == 0:
576 return defer.succeed([]) 576 return defer.succeed([])
577 577
578 query_parts = [u"SELECT uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\ 578 query_parts = ["SELECT uid, stanza_id, update_uid, source, dest, source_res, dest_res, timestamp, received_timestamp,\
579 type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\ 579 type, extra, message, message.language, subject, subject.language, thread_id, thread.parent_id\
580 FROM history LEFT JOIN message ON history.uid = message.history_uid\ 580 FROM history LEFT JOIN message ON history.uid = message.history_uid\
581 LEFT JOIN subject ON history.uid=subject.history_uid\ 581 LEFT JOIN subject ON history.uid=subject.history_uid\
582 LEFT JOIN thread ON history.uid=thread.history_uid\ 582 LEFT JOIN thread ON history.uid=thread.history_uid\
583 WHERE profile_id=?"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here 583 WHERE profile_id=?"] # FIXME: not sure if it's the best request, messages and subjects can appear several times here
585 585
586 def test_jid(type_, jid_): 586 def test_jid(type_, jid_):
587 values.append(jid_.userhost()) 587 values.append(jid_.userhost())
588 if jid_.resource: 588 if jid_.resource:
589 values.append(jid_.resource) 589 values.append(jid_.resource)
590 return u'({type_}=? AND {type_}_res=?)'.format(type_=type_) 590 return '({type_}=? AND {type_}_res=?)'.format(type_=type_)
591 return u'{type_}=?'.format(type_=type_) 591 return '{type_}=?'.format(type_=type_)
592 592
593 if not from_jid and not to_jid: 593 if not from_jid and not to_jid:
594 # not jid specified, we want all one2one communications 594 # not jid specified, we want all one2one communications
595 pass 595 pass
596 elif between: 596 elif between:
597 if not from_jid or not to_jid: 597 if not from_jid or not to_jid:
598 # we only have one jid specified, we check all messages 598 # we only have one jid specified, we check all messages
599 # from or to this jid 599 # from or to this jid
600 jid_ = from_jid or to_jid 600 jid_ = from_jid or to_jid
601 query_parts.append(u"AND ({source} OR {dest})".format( 601 query_parts.append("AND ({source} OR {dest})".format(
602 source=test_jid(u'source', jid_), 602 source=test_jid('source', jid_),
603 dest=test_jid(u'dest' , jid_))) 603 dest=test_jid('dest' , jid_)))
604 else: 604 else:
605 # we have 2 jids specified, we check all communications between 605 # we have 2 jids specified, we check all communications between
606 # those 2 jids 606 # those 2 jids
607 query_parts.append( 607 query_parts.append(
608 u"AND (({source_from} AND {dest_to}) " 608 "AND (({source_from} AND {dest_to}) "
609 u"OR ({source_to} AND {dest_from}))".format( 609 "OR ({source_to} AND {dest_from}))".format(
610 source_from=test_jid('source', from_jid), 610 source_from=test_jid('source', from_jid),
611 dest_to=test_jid('dest', to_jid), 611 dest_to=test_jid('dest', to_jid),
612 source_to=test_jid('source', to_jid), 612 source_to=test_jid('source', to_jid),
613 dest_from=test_jid('dest', from_jid))) 613 dest_from=test_jid('dest', from_jid)))
614 else: 614 else:
617 q = [] 617 q = []
618 if from_jid is not None: 618 if from_jid is not None:
619 q.append(test_jid('source', from_jid)) 619 q.append(test_jid('source', from_jid))
620 if to_jid is not None: 620 if to_jid is not None:
621 q.append(test_jid('dest', to_jid)) 621 q.append(test_jid('dest', to_jid))
622 query_parts.append(u"AND " + u" AND ".join(q)) 622 query_parts.append("AND " + " AND ".join(q))
623 623
624 if filters: 624 if filters:
625 if u'timestamp_start' in filters: 625 if 'timestamp_start' in filters:
626 query_parts.append(u"AND timestamp>= ?") 626 query_parts.append("AND timestamp>= ?")
627 values.append(float(filters[u'timestamp_start'])) 627 values.append(float(filters['timestamp_start']))
628 if u'body' in filters: 628 if 'body' in filters:
629 # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html 629 # TODO: use REGEXP (function to be defined) instead of GLOB: https://www.sqlite.org/lang_expr.html
630 query_parts.append(u"AND message LIKE ?") 630 query_parts.append("AND message LIKE ?")
631 values.append(u"%{}%".format(filters['body'])) 631 values.append("%{}%".format(filters['body']))
632 if u'search' in filters: 632 if 'search' in filters:
633 query_parts.append(u"AND (message LIKE ? OR source_res LIKE ?)") 633 query_parts.append("AND (message LIKE ? OR source_res LIKE ?)")
634 values.extend([u"%{}%".format(filters['search'])] * 2) 634 values.extend(["%{}%".format(filters['search'])] * 2)
635 if u'types' in filters: 635 if 'types' in filters:
636 types = filters['types'].split() 636 types = filters['types'].split()
637 query_parts.append(u"AND type IN ({})".format(u','.join("?"*len(types)))) 637 query_parts.append("AND type IN ({})".format(','.join("?"*len(types))))
638 values.extend(types) 638 values.extend(types)
639 if u'not_types' in filters: 639 if 'not_types' in filters:
640 types = filters['not_types'].split() 640 types = filters['not_types'].split()
641 query_parts.append(u"AND type NOT IN ({})".format(u','.join("?"*len(types)))) 641 query_parts.append("AND type NOT IN ({})".format(','.join("?"*len(types))))
642 values.extend(types) 642 values.extend(types)
643 if u'last_stanza_id' in filters: 643 if 'last_stanza_id' in filters:
644 # this request gets the last message with a "stanza_id" that we 644 # this request gets the last message with a "stanza_id" that we
645 # have in history. This is mainly used to retrieve messages sent 645 # have in history. This is mainly used to retrieve messages sent
646 # while we were offline, using MAM (XEP-0313). 646 # while we were offline, using MAM (XEP-0313).
647 if (filters[u'last_stanza_id'] is not True 647 if (filters['last_stanza_id'] is not True
648 or limit != 1): 648 or limit != 1):
649 raise ValueError(u"Unexpected values for last_stanza_id filter") 649 raise ValueError("Unexpected values for last_stanza_id filter")
650 query_parts.append(u"AND stanza_id IS NOT NULL") 650 query_parts.append("AND stanza_id IS NOT NULL")
651 651
652 652
653 # timestamp may be identical for 2 close messages (especially when delay is 653 # timestamp may be identical for 2 close messages (especially when delay is
654 # used) that's why we order ties by received_timestamp 654 # used) that's why we order ties by received_timestamp
655 # We'll reverse the order in sqliteHistoryToList 655 # We'll reverse the order in sqliteHistoryToList
656 # we use DESC here so LIMIT keeps the last messages 656 # we use DESC here so LIMIT keeps the last messages
657 query_parts.append(u"ORDER BY timestamp DESC, history.received_timestamp DESC") 657 query_parts.append("ORDER BY timestamp DESC, history.received_timestamp DESC")
658 if limit is not None: 658 if limit is not None:
659 query_parts.append(u"LIMIT ?") 659 query_parts.append("LIMIT ?")
660 values.append(limit) 660 values.append(limit)
661 661
662 d = self.dbpool.runQuery(u" ".join(query_parts), values) 662 d = self.dbpool.runQuery(" ".join(query_parts), values)
663 d.addCallback(self.sqliteHistoryToList) 663 d.addCallback(self.sqliteHistoryToList)
664 d.addCallback(self.listDict2listTuple) 664 d.addCallback(self.listDict2listTuple)
665 return d 665 return d
666 666
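
The filter handling above builds the SQL dynamically but keeps every user-supplied value behind a "?" placeholder, sizing the placeholder list to the number of values (','.join("?"*len(types))). A condensed sketch of that idiom:

    import sqlite3

    def select_by_types(conn, types):
        # one "?" per value keeps the query parameterised whatever the list size
        placeholders = ",".join("?" * len(types))
        query = "SELECT uid FROM history WHERE type IN ({})".format(placeholders)
        return conn.execute(query, tuple(types)).fetchall()
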
667 ## Private values 667 ## Private values
668 668
669 def _privateDataEb(self, failure_, operation, namespace, key=None, profile=None): 669 def _privateDataEb(self, failure_, operation, namespace, key=None, profile=None):
670 """generic errback for data queries""" 670 """generic errback for data queries"""
671 log.error(_(u"Can't {operation} data in database for namespace {namespace}{and_key}{for_profile}: {msg}").format( 671 log.error(_("Can't {operation} data in database for namespace {namespace}{and_key}{for_profile}: {msg}").format(
672 operation = operation, 672 operation = operation,
673 namespace = namespace, 673 namespace = namespace,
674 and_key = (u" and key " + key) if key is not None else u"", 674 and_key = (" and key " + key) if key is not None else "",
675 for_profile = (u' [' + profile + u']') if profile is not None else u'', 675 for_profile = (' [' + profile + ']') if profile is not None else '',
676 msg = failure_)) 676 msg = failure_))
677
678 def _load_pickle(self, v):
679 # FIXME: workaround for Python 3 port, some pickled data are bytes while others are strings
680 try:
681 return pickle.loads(v)
682 except TypeError:
683 data = pickle.loads(v.encode('utf-8'))
684 log.warning(f"encoding issue in pickled data: {data}")
685 return data
677 686
678 def _generateDataDict(self, query_result, binary): 687 def _generateDataDict(self, query_result, binary):
679 if binary: 688 if binary:
680 return {k: pickle.loads(str(v)) for k,v in query_result} 689 return {k: self._load_pickle(v) for k,v in query_result}
681 else: 690 else:
682 return dict(query_result) 691 return dict(query_result)
683 692
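
_load_pickle is the temporary hack announced in the commit message: values pickled with protocol 0 under Python 2 are ASCII, so depending on how they were stored SQLite may hand them back as str instead of bytes, and pickle.loads on Python 3 only accepts bytes. A small demonstration of the failure mode and the fallback:

    import pickle

    blob = pickle.dumps({"k": 1}, 0)   # protocol 0 produces ASCII-compatible bytes
    text = blob.decode('utf-8')        # what the database may return as TEXT

    try:
        data = pickle.loads(text)      # TypeError: a bytes-like object is required
    except TypeError:
        data = pickle.loads(text.encode('utf-8'))
    assert data == {"k": 1}
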
684 def _getPrivateTable(self, binary, profile): 693 def _getPrivateTable(self, binary, profile):
685 """Get table to use for private values""" 694 """Get table to use for private values"""
686 table = [u'private'] 695 table = ['private']
687 696
688 if profile is None: 697 if profile is None:
689 table.append(u'gen') 698 table.append('gen')
690 else: 699 else:
691 table.append(u'ind') 700 table.append('ind')
692 701
693 if binary: 702 if binary:
694 table.append(u'bin') 703 table.append('bin')
695 704
696 return u'_'.join(table) 705 return '_'.join(table)
697 706
698 def getPrivates(self, namespace, keys=None, binary=False, profile=None): 707 def getPrivates(self, namespace, keys=None, binary=False, profile=None):
699 """Get private value(s) from databases 708 """Get private value(s) from databases
700 709
701 @param namespace(unicode): namespace of the values 710 @param namespace(unicode): namespace of the values
704 @param binary(bool): True to deserialise binary values 713 @param binary(bool): True to deserialise binary values
705 @param profile(unicode, None): profile to use for individual values 714 @param profile(unicode, None): profile to use for individual values
706 None to use general values 715 None to use general values
707 @return (dict[unicode, object]): gotten keys/values 716 @return (dict[unicode, object]): gotten keys/values
708 """ 717 """
709 log.debug(_(u"getting {type}{binary} private values from database for namespace {namespace}{keys}".format( 718 log.debug(_("getting {type}{binary} private values from database for namespace {namespace}{keys}".format(
710 type = u"general" if profile is None else "individual", 719 type = "general" if profile is None else "individual",
711 binary = u" binary" if binary else u"", 720 binary = " binary" if binary else "",
712 namespace = namespace, 721 namespace = namespace,
713 keys = u" with keys {}".format(u", ".join(keys)) if keys is not None else u""))) 722 keys = " with keys {}".format(", ".join(keys)) if keys is not None else "")))
714 table = self._getPrivateTable(binary, profile) 723 table = self._getPrivateTable(binary, profile)
715 query_parts = [u"SELECT key,value FROM", table, "WHERE namespace=?"] 724 query_parts = ["SELECT key,value FROM", table, "WHERE namespace=?"]
716 args = [namespace] 725 args = [namespace]
717 726
718 if keys is not None: 727 if keys is not None:
719 placeholders = u','.join(len(keys) * u'?') 728 placeholders = ','.join(len(keys) * '?')
720 query_parts.append(u'AND key IN (' + placeholders + u')') 729 query_parts.append('AND key IN (' + placeholders + ')')
721 args.extend(keys) 730 args.extend(keys)
722 731
723 if profile is not None: 732 if profile is not None:
724 query_parts.append(u'AND profile_id=?') 733 query_parts.append('AND profile_id=?')
725 args.append(self.profiles[profile]) 734 args.append(self.profiles[profile])
726 735
727 d = self.dbpool.runQuery(u" ".join(query_parts), args) 736 d = self.dbpool.runQuery(" ".join(query_parts), args)
728 d.addCallback(self._generateDataDict, binary) 737 d.addCallback(self._generateDataDict, binary)
729 d.addErrback(self._privateDataEb, u"get", namespace, profile=profile) 738 d.addErrback(self._privateDataEb, "get", namespace, profile=profile)
730 return d 739 return d
731 740
732 def setPrivateValue(self, namespace, key, value, binary=False, profile=None): 741 def setPrivateValue(self, namespace, key, value, binary=False, profile=None):
733 """Set a private value in database 742 """Set a private value in database
734 743
739 binary values need to be serialised, used for everything but strings 748 binary values need to be serialised, used for everything but strings
740 @param profile(unicode, None): profile to use for individual value 749 @param profile(unicode, None): profile to use for individual value
741 if None, it's a general value 750 if None, it's a general value
742 """ 751 """
743 table = self._getPrivateTable(binary, profile) 752 table = self._getPrivateTable(binary, profile)
744 query_values_names = [u'namespace', u'key', u'value'] 753 query_values_names = ['namespace', 'key', 'value']
745 query_values = [namespace, key] 754 query_values = [namespace, key]
746 755
747 if binary: 756 if binary:
748 value = sqlite3.Binary(pickle.dumps(value, 0)) 757 value = sqlite3.Binary(pickle.dumps(value, 0))
749 758
750 query_values.append(value) 759 query_values.append(value)
751 760
752 if profile is not None: 761 if profile is not None:
753 query_values_names.append(u'profile_id') 762 query_values_names.append('profile_id')
754 query_values.append(self.profiles[profile]) 763 query_values.append(self.profiles[profile])
755 764
756 query_parts = [u"REPLACE INTO", table, u'(', u','.join(query_values_names), u')', 765 query_parts = ["REPLACE INTO", table, '(', ','.join(query_values_names), ')',
757 u"VALUES (", u",".join(u'?'*len(query_values_names)), u')'] 766 "VALUES (", ",".join('?'*len(query_values_names)), ')']
758 767
759 d = self.dbpool.runQuery(u" ".join(query_parts), query_values) 768 d = self.dbpool.runQuery(" ".join(query_parts), query_values)
760 d.addErrback(self._privateDataEb, u"set", namespace, key, profile=profile) 769 d.addErrback(self._privateDataEb, "set", namespace, key, profile=profile)
761 return d 770 return d
762 771
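
For binary private values the write path is the mirror image of _load_pickle: the value is pickled (protocol 0, kept for compatibility with existing rows) and wrapped in sqlite3.Binary so it is stored as a BLOB and comes back as bytes. A runnable sketch with an illustrative table mirroring private_gen_bin (the primary key is an assumption here):

    import pickle
    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE private_gen_bin (namespace TEXT, key TEXT, value BLOB, "
                 "PRIMARY KEY (namespace, key))")
    value = sqlite3.Binary(pickle.dumps({"muted": True}, 0))
    conn.execute("REPLACE INTO private_gen_bin(namespace,key,value) VALUES (?,?,?)",
                 ("test_ns", "settings", value))
    raw = conn.execute("SELECT value FROM private_gen_bin").fetchone()[0]
    assert pickle.loads(raw) == {"muted": True}   # BLOB comes back as bytes
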
763 def delPrivateValue(self, namespace, key, binary=False, profile=None): 772 def delPrivateValue(self, namespace, key, binary=False, profile=None):
764 """Delete private value from database 773 """Delete private value from database
765 774
768 @param binary(bool): True if it's a binary values 777 @param binary(bool): True if it's a binary values
769 @param profile(unicode, None): profile to use for individual value 778 @param profile(unicode, None): profile to use for individual value
770 if None, it's a general value 779 if None, it's a general value
771 """ 780 """
772 table = self._getPrivateTable(binary, profile) 781 table = self._getPrivateTable(binary, profile)
773 query_parts = [u"DELETE FROM", table, u"WHERE namespace=? AND key=?"] 782 query_parts = ["DELETE FROM", table, "WHERE namespace=? AND key=?"]
774 args = [namespace, key] 783 args = [namespace, key]
775 if profile is not None: 784 if profile is not None:
776 query_parts.append(u"AND profile_id=?") 785 query_parts.append("AND profile_id=?")
777 args.append(self.profiles[profile]) 786 args.append(self.profiles[profile])
778 d = self.dbpool.runQuery(u" ".join(query_parts), args) 787 d = self.dbpool.runQuery(" ".join(query_parts), args)
779 d.addErrback(self._privateDataEb, u"delete", namespace, key, profile=profile) 788 d.addErrback(self._privateDataEb, "delete", namespace, key, profile=profile)
780 return d 789 return d
781 790
782 def delPrivateNamespace(self, namespace, binary=False, profile=None): 791 def delPrivateNamespace(self, namespace, binary=False, profile=None):
783 """Delete all data from a private namespace 792 """Delete all data from a private namespace
784 793
785 Be really cautious when you use this method, as all data with given namespace are 794 Be really cautious when you use this method, as all data with given namespace are
786 removed. 795 removed.
787 Params are the same as for delPrivateValue 796 Params are the same as for delPrivateValue
788 """ 797 """
789 table = self._getPrivateTable(binary, profile) 798 table = self._getPrivateTable(binary, profile)
790 query_parts = [u"DELETE FROM", table, u"WHERE namespace=?"] 799 query_parts = ["DELETE FROM", table, "WHERE namespace=?"]
791 args = [namespace] 800 args = [namespace]
792 if profile is not None: 801 if profile is not None:
793 query_parts.append(u"AND profile_id=?") 802 query_parts.append("AND profile_id=?")
794 args.append(self.profiles[profile]) 803 args.append(self.profiles[profile])
795 d = self.dbpool.runQuery(u" ".join(query_parts), args) 804 d = self.dbpool.runQuery(" ".join(query_parts), args)
796 d.addErrback(self._privateDataEb, u"delete namespace", namespace, profile=profile) 805 d.addErrback(self._privateDataEb, "delete namespace", namespace, profile=profile)
797 return d 806 return d
798 807
799 ## Files 808 ## Files
800 809
801 @defer.inlineCallbacks 810 @defer.inlineCallbacks
802 def getFiles(self, client, file_id=None, version=u'', parent=None, type_=None, 811 def getFiles(self, client, file_id=None, version='', parent=None, type_=None,
803 file_hash=None, hash_algo=None, name=None, namespace=None, mime_type=None, 812 file_hash=None, hash_algo=None, name=None, namespace=None, mime_type=None,
804 owner=None, access=None, projection=None, unique=False): 813 owner=None, access=None, projection=None, unique=False):
805 """retrieve files with with given filters 814 """retrieve files with with given filters
806 815
807 @param file_id(unicode, None): id of the file 816 @param file_id(unicode, None): id of the file
829 query_parts.append("FROM files WHERE") 838 query_parts.append("FROM files WHERE")
830 filters = ['profile_id=?'] 839 filters = ['profile_id=?']
831 args = [self.profiles[client.profile]] 840 args = [self.profiles[client.profile]]
832 841
833 if file_id is not None: 842 if file_id is not None:
834 filters.append(u'id=?') 843 filters.append('id=?')
835 args.append(file_id) 844 args.append(file_id)
836 if version is not None: 845 if version is not None:
837 filters.append(u'version=?') 846 filters.append('version=?')
838 args.append(version) 847 args.append(version)
839 if parent is not None: 848 if parent is not None:
840 filters.append(u'parent=?') 849 filters.append('parent=?')
841 args.append(parent) 850 args.append(parent)
842 if type_ is not None: 851 if type_ is not None:
843 filters.append(u'type=?') 852 filters.append('type=?')
844 args.append(type_) 853 args.append(type_)
845 if file_hash is not None: 854 if file_hash is not None:
846 filters.append(u'file_hash=?') 855 filters.append('file_hash=?')
847 args.append(file_hash) 856 args.append(file_hash)
848 if hash_algo is not None: 857 if hash_algo is not None:
849 filters.append(u'hash_algo=?') 858 filters.append('hash_algo=?')
850 args.append(hash_algo) 859 args.append(hash_algo)
851 if name is not None: 860 if name is not None:
852 filters.append(u'name=?') 861 filters.append('name=?')
853 args.append(name) 862 args.append(name)
854 if namespace is not None: 863 if namespace is not None:
855 filters.append(u'namespace=?') 864 filters.append('namespace=?')
856 args.append(namespace) 865 args.append(namespace)
857 if mime_type is not None: 866 if mime_type is not None:
858 filters.append(u'mime_type=?') 867 filters.append('mime_type=?')
859 args.append(mime_type) 868 args.append(mime_type)
860 if owner is not None: 869 if owner is not None:
861 filters.append(u'owner=?') 870 filters.append('owner=?')
862 args.append(owner.full()) 871 args.append(owner.full())
863 if access is not None: 872 if access is not None:
864 raise NotImplementedError('Access check is not implemented yet') 873 raise NotImplementedError('Access check is not implemented yet')
865 # a JSON comparison is needed here 874 # a JSON comparison is needed here
866 875
867 filters = u' AND '.join(filters) 876 filters = ' AND '.join(filters)
868 query_parts.append(filters) 877 query_parts.append(filters)
869 query = u' '.join(query_parts) 878 query = ' '.join(query_parts)
870 879
871 result = yield self.dbpool.runQuery(query, args) 880 result = yield self.dbpool.runQuery(query, args)
872 files_data = [dict(zip(projection, row)) for row in result] 881 files_data = [dict(list(zip(projection, row))) for row in result]
873 to_parse = {'access', 'extra'}.intersection(projection) 882 to_parse = {'access', 'extra'}.intersection(projection)
874 to_filter = {'owner'}.intersection(projection) 883 to_filter = {'owner'}.intersection(projection)
875 if to_parse or to_filter: 884 if to_parse or to_filter:
876 for file_data in files_data: 885 for file_data in files_data:
877 for key in to_parse: 886 for key in to_parse:
880 owner = file_data.get('owner') 889 owner = file_data.get('owner')
881 if owner is not None: 890 if owner is not None:
882 file_data['owner'] = jid.JID(owner) 891 file_data['owner'] = jid.JID(owner)
883 defer.returnValue(files_data) 892 defer.returnValue(files_data)
884 893
885 def setFile(self, client, name, file_id, version=u'', parent=None, type_=C.FILE_TYPE_FILE, 894 def setFile(self, client, name, file_id, version='', parent=None, type_=C.FILE_TYPE_FILE,
886 file_hash=None, hash_algo=None, size=None, namespace=None, mime_type=None, 895 file_hash=None, hash_algo=None, size=None, namespace=None, mime_type=None,
887 created=None, modified=None, owner=None, access=None, extra=None): 896 created=None, modified=None, owner=None, access=None, extra=None):
888 """set a file metadata 897 """set a file metadata
889 898
890 @param client(SatXMPPClient): client owning the file 899 @param client(SatXMPPClient): client owning the file
919 mime_type, created, modified, 928 mime_type, created, modified,
920 owner.full() if owner is not None else None, 929 owner.full() if owner is not None else None,
921 json.dumps(access) if access else None, 930 json.dumps(access) if access else None,
922 json.dumps(extra) if extra else None, 931 json.dumps(extra) if extra else None,
923 self.profiles[client.profile])) 932 self.profiles[client.profile]))
924 d.addErrback(lambda failure: log.error(_(u"Can't save file metadata for [{profile}]: {reason}".format(profile=client.profile, reason=failure)))) 933 d.addErrback(lambda failure: log.error(_("Can't save file metadata for [{profile}]: {reason}".format(profile=client.profile, reason=failure))))
925 return d 934 return d
926 935
927 def _fileUpdate(self, cursor, file_id, column, update_cb): 936 def _fileUpdate(self, cursor, file_id, column, update_cb):
928 query = 'SELECT {column} FROM files where id=?'.format(column=column) 937 query = 'SELECT {column} FROM files where id=?'.format(column=column)
929 for i in xrange(5): 938 for i in range(5):
930 cursor.execute(query, [file_id]) 939 cursor.execute(query, [file_id])
931 try: 940 try:
932 older_value_raw = cursor.fetchone()[0] 941 older_value_raw = cursor.fetchone()[0]
933 except TypeError: 942 except TypeError:
934 raise exceptions.NotFound 943 raise exceptions.NotFound
949 except sqlite3.Error: 958 except sqlite3.Error:
950 pass 959 pass
951 else: 960 else:
952 if cursor.rowcount == 1: 961 if cursor.rowcount == 1:
953 break 962 break
954 log.warning(_(u"table not updated, probably due to race condition, trying again ({tries})").format(tries=i+1)) 963 log.warning(_("table not updated, probably due to race condition, trying again ({tries})").format(tries=i+1))
955 else: 964 else:
956 log.error(_(u"Can't update file table")) 965 log.error(_("Can't update file table"))
957 966
958 def fileUpdate(self, file_id, column, update_cb): 967 def fileUpdate(self, file_id, column, update_cb):
959 """Update a column value using a method to avoid race conditions 968 """Update a column value using a method to avoid race conditions
960 969
961 the older value will be retrieved from database, then update_cb will be applied 970 the older value will be retrieved from database, then update_cb will be applied
1070 update_data = self.generateUpdateData(local_sch, current_sch, False) 1079 update_data = self.generateUpdateData(local_sch, current_sch, False)
1071 log.warning(_("There is a schema mismatch, but as we are on a dev version, database will be updated")) 1080 log.warning(_("There is a schema mismatch, but as we are on a dev version, database will be updated"))
1072 update_raw = yield self.update2raw(update_data, True) 1081 update_raw = yield self.update2raw(update_data, True)
1073 defer.returnValue(update_raw) 1082 defer.returnValue(update_raw)
1074 else: 1083 else:
1075 log.error(_(u"schema version is up-to-date, but local schema differ from expected current schema")) 1084 log.error(_("schema version is up-to-date, but local schema differ from expected current schema"))
1076 update_data = self.generateUpdateData(local_sch, current_sch, True) 1085 update_data = self.generateUpdateData(local_sch, current_sch, True)
1077 update_raw = yield self.update2raw(update_data) 1086 update_raw = yield self.update2raw(update_data)
1078 log.warning(_(u"Here are the commands that should fix the situation, use at your own risk (do a backup before modifying database), you can go to SàT's MUC room at sat@chat.jabberfr.org for help\n### SQL###\n%s\n### END SQL ###\n") % u'\n'.join("%s;" % statement for statement in update_raw)) 1087 log.warning(_("Here are the commands that should fix the situation, use at your own risk (do a backup before modifying database), you can go to SàT's MUC room at sat@chat.jabberfr.org for help\n### SQL###\n%s\n### END SQL ###\n") % '\n'.join("%s;" % statement for statement in update_raw))
1079 raise exceptions.DatabaseError("Database mismatch") 1088 raise exceptions.DatabaseError("Database mismatch")
1080 else: 1089 else:
1081 if local_version > CURRENT_DB_VERSION: 1090 if local_version > CURRENT_DB_VERSION:
1082 log.error(_( 1091 log.error(_(
1083 u"You database version is higher than the one used in this SàT " 1092 "You database version is higher than the one used in this SàT "
1084 u"version, are you using several version at the same time? We " 1093 "version, are you using several version at the same time? We "
1085 u"can't run SàT with this database.")) 1094 "can't run SàT with this database."))
1086 sys.exit(1) 1095 sys.exit(1)
1087 1096
1088 # Database is not up-to-date, we'll do the update 1097 # Database is not up-to-date, we'll do the update
1089 if force_update: 1098 if force_update:
1090 log.info(_("Database content needs a specific processing, local database will be updated")) 1099 log.info(_("Database content needs a specific processing, local database will be updated"))
1091 else: 1100 else:
1092 log.info(_("Database schema has changed, local database will be updated")) 1101 log.info(_("Database schema has changed, local database will be updated"))
1093 update_raw = [] 1102 update_raw = []
1094 for version in xrange(local_version + 1, CURRENT_DB_VERSION + 1): 1103 for version in range(local_version + 1, CURRENT_DB_VERSION + 1):
1095 try: 1104 try:
1096 update_data = DATABASE_SCHEMAS[version] 1105 update_data = DATABASE_SCHEMAS[version]
1097 except KeyError: 1106 except KeyError:
1098 raise exceptions.InternalError("Missing update definition (version %d)" % version) 1107 raise exceptions.InternalError("Missing update definition (version %d)" % version)
1099 if "specific" in update_data and update_raw: 1108 if "specific" in update_data and update_raw:
1148 @return: list of strings with raw statements 1157 @return: list of strings with raw statements
1149 """ 1158 """
1150 ret = [] 1159 ret = []
1151 assert isinstance(data, tuple) 1160 assert isinstance(data, tuple)
1152 for table, col_data in data: 1161 for table, col_data in data:
1153 assert isinstance(table, basestring) 1162 assert isinstance(table, str)
1154 assert isinstance(col_data, tuple) 1163 assert isinstance(col_data, tuple)
1155 for cols in col_data: 1164 for cols in col_data:
1156 if isinstance(cols, tuple): 1165 if isinstance(cols, tuple):
1157 assert all([isinstance(c, basestring) for c in cols]) 1166 assert all([isinstance(c, str) for c in cols])
1158 indexed_cols = u','.join(cols) 1167 indexed_cols = ','.join(cols)
1159 elif isinstance(cols, basestring): 1168 elif isinstance(cols, str):
1160 indexed_cols = cols 1169 indexed_cols = cols
1161 else: 1170 else:
1162 raise exceptions.InternalError(u"unexpected index columns value") 1171 raise exceptions.InternalError("unexpected index columns value")
1163 index_name = table + u'__' + indexed_cols.replace(u',', u'_') 1172 index_name = table + '__' + indexed_cols.replace(',', '_')
1164 ret.append(Updater.INDEX_SQL % (index_name, table, indexed_cols)) 1173 ret.append(Updater.INDEX_SQL % (index_name, table, indexed_cols))
1165 return ret 1174 return ret
1166 1175
1167 def statementHash(self, data): 1176 def statementHash(self, data):
1168 """ Generate hash of template data 1177 """ Generate hash of template data
1171 @param data: dictionary of "CREATE" statement, with tables names as key, 1180 @param data: dictionary of "CREATE" statement, with tables names as key,
1172 and tuples of (col_defs, constraints) as values 1181 and tuples of (col_defs, constraints) as values
1173 @return: hash as string 1182 @return: hash as string
1174 """ 1183 """
1175 hash_ = hashlib.sha1() 1184 hash_ = hashlib.sha1()
1176 tables = data.keys() 1185 tables = list(data.keys())
1177 tables.sort() 1186 tables.sort()
1178 1187
1179 def stmnts2str(stmts): 1188 def stmnts2str(stmts):
1180 return ','.join([self.clean_regex.sub('',stmt) for stmt in sorted(stmts)]) 1189 return ','.join([self.clean_regex.sub('',stmt) for stmt in sorted(stmts)])
1181 1190
1182 for table in tables: 1191 for table in tables:
1183 col_defs, col_constr = data[table] 1192 col_defs, col_constr = data[table]
1184 hash_.update("%s:%s:%s" % (table, stmnts2str(col_defs), stmnts2str(col_constr))) 1193 hash_.update(
1194 ("%s:%s:%s" % (table, stmnts2str(col_defs), stmnts2str(col_constr)))
1195 .encode('utf-8'))
1185 return hash_.digest() 1196 return hash_.digest()
1186 1197
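
The .encode('utf-8') added in statementHash is another Python 3 requirement: hashlib update methods only accept bytes. A two-line illustration:

    import hashlib

    hash_ = hashlib.sha1()
    # passing a str here would raise TypeError on Python 3
    hash_.update("profiles:id INTEGER PRIMARY KEY:".encode('utf-8'))
    digest = hash_.digest()
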
1187 def rawStatements2data(self, raw_statements): 1198 def rawStatements2data(self, raw_statements):
1188 """ separate "CREATE" statements into dictionary/tuples data 1199 """ separate "CREATE" statements into dictionary/tuples data
1189 1200
1322 ret.extend(cmds or []) 1333 ret.extend(cmds or [])
1323 defer.returnValue(ret) 1334 defer.returnValue(ret)
1324 1335
1325 def update_v8(self): 1336 def update_v8(self):
1326 """Update database from v7 to v8 (primary keys order changes + indexes)""" 1337 """Update database from v7 to v8 (primary keys order changes + indexes)"""
1327 log.info(u"Database update to v8") 1338 log.info("Database update to v8")
1328 statements = ["PRAGMA foreign_keys = OFF"] 1339 statements = ["PRAGMA foreign_keys = OFF"]
1329 1340
1330 # here is a copy of create and index data, we can't use "current" table 1341 # here is a copy of create and index data, we can't use "current" table
1331 # because it may change in a future version, which would break the update 1342 # because it may change in a future version, which would break the update
1332 # when doing v8 1343 # when doing v8
1355 for table in ('param_gen', 'param_ind', 'private_ind', 'private_ind_bin'): 1366 for table in ('param_gen', 'param_ind', 'private_ind', 'private_ind_bin'):
1356 statements.append("ALTER TABLE {0} RENAME TO {0}_old".format(table)) 1367 statements.append("ALTER TABLE {0} RENAME TO {0}_old".format(table))
1357 schema = {table: create[table]} 1368 schema = {table: create[table]}
1358 cols = [d.split()[0] for d in schema[table][0]] 1369 cols = [d.split()[0] for d in schema[table][0]]
1359 statements.extend(Updater.createData2Raw(schema)) 1370 statements.extend(Updater.createData2Raw(schema))
1360 statements.append(u"INSERT INTO {table}({cols}) " 1371 statements.append("INSERT INTO {table}({cols}) "
1361 u"SELECT {cols} FROM {table}_old".format( 1372 "SELECT {cols} FROM {table}_old".format(
1362 table=table, 1373 table=table,
1363 cols=u','.join(cols))) 1374 cols=','.join(cols)))
1364 statements.append(u"DROP TABLE {}_old".format(table)) 1375 statements.append("DROP TABLE {}_old".format(table))
1365 1376
1366 statements.extend(Updater.indexData2Raw(index)) 1377 statements.extend(Updater.indexData2Raw(index))
1367 statements.append("PRAGMA foreign_keys = ON") 1378 statements.append("PRAGMA foreign_keys = ON")
1368 return statements 1379 return statements
1369 1380
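
update_v8 uses the standard SQLite recipe for changing constraints or key order, since ALTER TABLE cannot modify them in place: rename the old table, recreate it with the new definition, copy the rows over, then drop the renamed table, all with foreign keys switched off. A schematic helper showing the statement sequence (the helper name is hypothetical; the CREATE statement itself comes from the schema data and is elided here):

    def rename_copy_drop(table, cols):
        # SQLite can't alter primary keys in place: rename, recreate, copy, drop
        copied = ",".join(cols)
        return [
            "ALTER TABLE {0} RENAME TO {0}_old".format(table),
            # ... CREATE TABLE statement for the new definition of `table` ...
            "INSERT INTO {0}({1}) SELECT {1} FROM {0}_old".format(table, copied),
            "DROP TABLE {0}_old".format(table),
        ]
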
1370 @defer.inlineCallbacks 1381 @defer.inlineCallbacks
1371 def update_v7(self): 1382 def update_v7(self):
1372 """Update database from v6 to v7 (history unique constraint change)""" 1383 """Update database from v6 to v7 (history unique constraint change)"""
1373 log.info(u"Database update to v7, this may be long depending on your history " 1384 log.info("Database update to v7, this may be long depending on your history "
1374 u"size, please be patient.") 1385 "size, please be patient.")
1375 1386
1376 log.info(u"Some cleaning first") 1387 log.info("Some cleaning first")
1377 # we need to fix duplicate stanza_id, as it can result in conflicts with the new schema 1388 # we need to fix duplicate stanza_id, as it can result in conflicts with the new schema
1378 # normally the database should not contain any, but better safe than sorry. 1389 # normally the database should not contain any, but better safe than sorry.
1379 rows = yield self.dbpool.runQuery( 1390 rows = yield self.dbpool.runQuery(
1380 u"SELECT stanza_id, COUNT(*) as c FROM history WHERE stanza_id is not NULL " 1391 "SELECT stanza_id, COUNT(*) as c FROM history WHERE stanza_id is not NULL "
1381 u"GROUP BY stanza_id HAVING c>1") 1392 "GROUP BY stanza_id HAVING c>1")
1382 if rows: 1393 if rows:
1383 count = sum([r[1] for r in rows]) - len(rows) 1394 count = sum([r[1] for r in rows]) - len(rows)
1384 log.info(u"{count} duplicate stanzas found, cleaning".format(count=count)) 1395 log.info("{count} duplicate stanzas found, cleaning".format(count=count))
1385 for stanza_id, count in rows: 1396 for stanza_id, count in rows:
1386 log.info(u"cleaning duplicate stanza {stanza_id}".format(stanza_id=stanza_id)) 1397 log.info("cleaning duplicate stanza {stanza_id}".format(stanza_id=stanza_id))
1387 row_uids = yield self.dbpool.runQuery( 1398 row_uids = yield self.dbpool.runQuery(
1388 "SELECT uid FROM history WHERE stanza_id = ? LIMIT ?", 1399 "SELECT uid FROM history WHERE stanza_id = ? LIMIT ?",
1389 (stanza_id, count-1)) 1400 (stanza_id, count-1))
1390 uids = [r[0] for r in row_uids] 1401 uids = [r[0] for r in row_uids]
1391 yield self.dbpool.runQuery( 1402 yield self.dbpool.runQuery(
1392 "DELETE FROM history WHERE uid IN ({})".format(u",".join(u"?"*len(uids))), 1403 "DELETE FROM history WHERE uid IN ({})".format(",".join("?"*len(uids))),
1393 uids) 1404 uids)
1394 1405
1395 def deleteInfo(txn): 1406 def deleteInfo(txn):
1396 # with foreign_keys on, the delete takes ages, so we deactivate it here 1407 # with foreign_keys on, the delete takes ages, so we deactivate it here
1397 # the time to delete info messages from history. 1408 # the time to delete info messages from history.
1398 txn.execute("PRAGMA foreign_keys = OFF") 1409 txn.execute("PRAGMA foreign_keys = OFF")
1399 txn.execute(u"DELETE FROM message WHERE history_uid IN (SELECT uid FROM history WHERE " 1410 txn.execute("DELETE FROM message WHERE history_uid IN (SELECT uid FROM history WHERE "
1400 u"type='info')") 1411 "type='info')")
1401 txn.execute(u"DELETE FROM subject WHERE history_uid IN (SELECT uid FROM history WHERE " 1412 txn.execute("DELETE FROM subject WHERE history_uid IN (SELECT uid FROM history WHERE "
1402 u"type='info')") 1413 "type='info')")
1403 txn.execute(u"DELETE FROM thread WHERE history_uid IN (SELECT uid FROM history WHERE " 1414 txn.execute("DELETE FROM thread WHERE history_uid IN (SELECT uid FROM history WHERE "
1404 u"type='info')") 1415 "type='info')")
1405 txn.execute(u"DELETE FROM message WHERE history_uid IN (SELECT uid FROM history WHERE " 1416 txn.execute("DELETE FROM message WHERE history_uid IN (SELECT uid FROM history WHERE "
1406 u"type='info')") 1417 "type='info')")
1407 txn.execute(u"DELETE FROM history WHERE type='info'") 1418 txn.execute("DELETE FROM history WHERE type='info'")
1408 # not sure that it is necessary to reactivate here, but in doubt… 1419 # not sure that it is necessary to reactivate here, but in doubt…
1409 txn.execute("PRAGMA foreign_keys = ON") 1420 txn.execute("PRAGMA foreign_keys = ON")
1410 1421
1411 log.info(u'Deleting "info" messages (this can take a while)') 1422 log.info('Deleting "info" messages (this can take a while)')
1412 yield self.dbpool.runInteraction(deleteInfo) 1423 yield self.dbpool.runInteraction(deleteInfo)
1413 1424
1414 log.info(u"Cleaning done") 1425 log.info("Cleaning done")
1415 1426
1416 # we have to rename the tables we will replace 1427 # we have to rename the tables we will replace
1417 # tables referencing history need to be replaced too, else references would 1428 # tables referencing history need to be replaced too, else references would
1418 # still point to the old table (which will be dropped at the end). This buggy 1429 # still point to the old table (which will be dropped at the end). This buggy
1419 # behaviour seems to be fixed in newer versions of SQLite 1430 # behaviour seems to be fixed in newer versions of SQLite
1420 yield self.dbpool.runQuery("ALTER TABLE history RENAME TO history_old") 1431 yield self.dbpool.runQuery("ALTER TABLE history RENAME TO history_old")
1421 yield self.dbpool.runQuery("ALTER TABLE message RENAME TO message_old") 1432 yield self.dbpool.runQuery("ALTER TABLE message RENAME TO message_old")
1422 yield self.dbpool.runQuery("ALTER TABLE subject RENAME TO subject_old") 1433 yield self.dbpool.runQuery("ALTER TABLE subject RENAME TO subject_old")
1423 yield self.dbpool.runQuery("ALTER TABLE thread RENAME TO thread_old") 1434 yield self.dbpool.runQuery("ALTER TABLE thread RENAME TO thread_old")
1424 1435
1425 # history 1436 # history
1426 query = (u"CREATE TABLE history (uid TEXT PRIMARY KEY, stanza_id TEXT, " 1437 query = ("CREATE TABLE history (uid TEXT PRIMARY KEY, stanza_id TEXT, "
1427 u"update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, " 1438 "update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, "
1428 u"source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, " 1439 "source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, "
1429 u"received_timestamp DATETIME, type TEXT, extra BLOB, " 1440 "received_timestamp DATETIME, type TEXT, extra BLOB, "
1430 u"FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, " 1441 "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, "
1431 u"FOREIGN KEY(type) REFERENCES message_types(type), " 1442 "FOREIGN KEY(type) REFERENCES message_types(type), "
1432 u"UNIQUE (profile_id, stanza_id, source, dest))") 1443 "UNIQUE (profile_id, stanza_id, source, dest))")
1433 yield self.dbpool.runQuery(query) 1444 yield self.dbpool.runQuery(query)
1434 1445
1435 # message 1446 # message
1436 query = (u"CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" 1447 query = ("CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER"
1437 u", message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES " 1448 ", message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES "
1438 u"history(uid) ON DELETE CASCADE)") 1449 "history(uid) ON DELETE CASCADE)")
1439 yield self.dbpool.runQuery(query) 1450 yield self.dbpool.runQuery(query)
1440 1451
1441 # subject 1452 # subject
1442 query = (u"CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" 1453 query = ("CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER"
1443 u", subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES " 1454 ", subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES "
1444 u"history(uid) ON DELETE CASCADE)") 1455 "history(uid) ON DELETE CASCADE)")
1445 yield self.dbpool.runQuery(query) 1456 yield self.dbpool.runQuery(query)
1446 1457
1447 # thread 1458 # thread
1448 query = (u"CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER" 1459 query = ("CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER"
1449 u", thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES " 1460 ", thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES "
1450 u"history(uid) ON DELETE CASCADE)") 1461 "history(uid) ON DELETE CASCADE)")
1451 yield self.dbpool.runQuery(query) 1462 yield self.dbpool.runQuery(query)
1452 1463
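
The rename-before-recreate order matters because, on the SQLite versions this code works around, ALTER TABLE ... RENAME also rewrites the foreign keys of referencing tables to follow the renamed table. A quick sanity check that freshly created tables reference history (and not history_old), sketched with the stdlib sqlite3 module on a reduced hypothetical schema:

    import sqlite3

    conn = sqlite3.connect(":memory:")  # stands in for the real sat database
    conn.execute("CREATE TABLE history (uid TEXT PRIMARY KEY)")
    conn.execute("CREATE TABLE message (id INTEGER PRIMARY KEY, history_uid TEXT "
                 "REFERENCES history(uid) ON DELETE CASCADE)")
    for fk in conn.execute("PRAGMA foreign_key_list(message)"):
        # fk[2] is the table the foreign key points at
        assert fk[2] == "history", fk
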
1453 log.info(u"Now transfering old data to new tables, please be patient.") 1464 log.info("Now transfering old data to new tables, please be patient.")
1454 1465
1455 log.info(u"\nTransfering table history") 1466 log.info("\nTransfering table history")
1456 query = (u"INSERT INTO history (uid, stanza_id, update_uid, profile_id, source, " 1467 query = ("INSERT INTO history (uid, stanza_id, update_uid, profile_id, source, "
1457 u"dest, source_res, dest_res, timestamp, received_timestamp, type, extra" 1468 "dest, source_res, dest_res, timestamp, received_timestamp, type, extra"
1458 u") SELECT uid, stanza_id, update_uid, profile_id, source, dest, " 1469 ") SELECT uid, stanza_id, update_uid, profile_id, source, dest, "
1459 u"source_res, dest_res, timestamp, received_timestamp, type, extra " 1470 "source_res, dest_res, timestamp, received_timestamp, type, extra "
1460 u"FROM history_old") 1471 "FROM history_old")
1461 yield self.dbpool.runQuery(query) 1472 yield self.dbpool.runQuery(query)
1462 1473
1463 log.info(u"\nTransfering table message") 1474 log.info("\nTransfering table message")
1464 query = (u"INSERT INTO message (id, history_uid, message, language) SELECT id, " 1475 query = ("INSERT INTO message (id, history_uid, message, language) SELECT id, "
1465 u"history_uid, message, language FROM message_old") 1476 "history_uid, message, language FROM message_old")
1466 yield self.dbpool.runQuery(query) 1477 yield self.dbpool.runQuery(query)
1467 1478
1468 log.info(u"\nTransfering table subject") 1479 log.info("\nTransfering table subject")
1469 query = (u"INSERT INTO subject (id, history_uid, subject, language) SELECT id, " 1480 query = ("INSERT INTO subject (id, history_uid, subject, language) SELECT id, "
1470 u"history_uid, subject, language FROM subject_old") 1481 "history_uid, subject, language FROM subject_old")
1471 yield self.dbpool.runQuery(query) 1482 yield self.dbpool.runQuery(query)
1472 1483
1473 log.info(u"\nTransfering table thread") 1484 log.info("\nTransfering table thread")
1474 query = (u"INSERT INTO thread (id, history_uid, thread_id, parent_id) SELECT id" 1485 query = ("INSERT INTO thread (id, history_uid, thread_id, parent_id) SELECT id"
1475 u", history_uid, thread_id, parent_id FROM thread_old") 1486 ", history_uid, thread_id, parent_id FROM thread_old")
1476 yield self.dbpool.runQuery(query) 1487 yield self.dbpool.runQuery(query)
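
Each block above is the same INSERT ... SELECT copy from a *_old table into its stricter replacement. The whole rename / recreate / copy / drop sequence, reduced to a single hypothetical table:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT)")
    conn.execute("INSERT INTO t VALUES (1, 'a')")
    conn.execute("ALTER TABLE t RENAME TO t_old")  # keep the old data around
    conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, v TEXT NOT NULL)")  # new schema
    conn.execute("INSERT INTO t (id, v) SELECT id, v FROM t_old")  # copy the rows over
    conn.execute("DROP TABLE t_old")  # drop only once the copy is done
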
1477 1488
1478 log.info(u"\nRemoving old tables") 1489 log.info("\nRemoving old tables")
1479 # because of foreign keys, tables referencing history_old 1490 # because of foreign keys, tables referencing history_old
1480 # must be deleted first 1491 # must be deleted first
1481 yield self.dbpool.runQuery("DROP TABLE thread_old") 1492 yield self.dbpool.runQuery("DROP TABLE thread_old")
1482 yield self.dbpool.runQuery("DROP TABLE subject_old") 1493 yield self.dbpool.runQuery("DROP TABLE subject_old")
1483 yield self.dbpool.runQuery("DROP TABLE message_old") 1494 yield self.dbpool.runQuery("DROP TABLE message_old")
1484 yield self.dbpool.runQuery("DROP TABLE history_old") 1495 yield self.dbpool.runQuery("DROP TABLE history_old")
1485 log.info(u"\nReducing database size (this can take a while)") 1496 log.info("\nReducing database size (this can take a while)")
1486 yield self.dbpool.runQuery("VACUUM") 1497 yield self.dbpool.runQuery("VACUUM")
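
VACUUM rewrites the database file to reclaim the space freed by the DROPs above, and it refuses to run inside a transaction. With the stdlib sqlite3 module, the safe pattern is autocommit mode, roughly:

    import sqlite3

    conn = sqlite3.connect(":memory:")  # a real on-disk database shrinks on VACUUM
    conn.isolation_level = None  # autocommit, so no implicit transaction is left open
    conn.execute("VACUUM")  # can take a while on a large history
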
1487 log.info(u"Database update done :)") 1498 log.info("Database update done :)")
1488 1499
1489 @defer.inlineCallbacks 1500 @defer.inlineCallbacks
1490 def update_v3(self): 1501 def update_v3(self):
1491 """Update database from v2 to v3 (message refactoring)""" 1502 """Update database from v2 to v3 (message refactoring)"""
1492 # XXX: this update does all the messages in one huge transaction 1503 # XXX: this update does all the messages in one huge transaction
1493 # this is really memory-consuming, but was OK on a reasonably 1504 # this is really memory-consuming, but was OK on a reasonably
1494 # big database during tests. If issues happen, we can split it 1505 # big database during tests. If issues happen, we can split it
1495 # into smaller transactions using LIMIT and by deleting already 1506 # into smaller transactions using LIMIT and by deleting already
1496 # updated messages 1507 # updated messages
1497 log.info(u"Database update to v3, this may take a while") 1508 log.info("Database update to v3, this may take a while")
1498 1509
1499 # we need to fix duplicate timestamps, as they can conflict with the new schema 1510 # we need to fix duplicate timestamps, as they can conflict with the new schema
1500 rows = yield self.dbpool.runQuery("SELECT timestamp, COUNT(*) as c FROM history GROUP BY timestamp HAVING c>1") 1511 rows = yield self.dbpool.runQuery("SELECT timestamp, COUNT(*) as c FROM history GROUP BY timestamp HAVING c>1")
1501 if rows: 1512 if rows:
1502 log.info("fixing duplicate timestamp") 1513 log.info("fixing duplicate timestamp")
1504 for timestamp, __ in rows: 1515 for timestamp, __ in rows:
1505 ids_rows = yield self.dbpool.runQuery("SELECT id from history where timestamp=?", (timestamp,)) 1516 ids_rows = yield self.dbpool.runQuery("SELECT id from history where timestamp=?", (timestamp,))
1506 for idx, (id_,) in enumerate(ids_rows): 1517 for idx, (id_,) in enumerate(ids_rows):
1507 fixed.append(id_) 1518 fixed.append(id_)
1508 yield self.dbpool.runQuery("UPDATE history SET timestamp=? WHERE id=?", (float(timestamp) + idx * 0.001, id_)) 1519 yield self.dbpool.runQuery("UPDATE history SET timestamp=? WHERE id=?", (float(timestamp) + idx * 0.001, id_))
1509 log.info(u"fixed messages with ids {}".format(u', '.join([unicode(id_) for id_ in fixed]))) 1520 log.info("fixed messages with ids {}".format(', '.join([str(id_) for id_ in fixed])))
1510 1521
1511 def historySchema(txn): 1522 def historySchema(txn):
1512 log.info(u"History schema update") 1523 log.info("History schema update")
1513 txn.execute("ALTER TABLE history RENAME TO tmp_sat_update") 1524 txn.execute("ALTER TABLE history RENAME TO tmp_sat_update")
1514 txn.execute("CREATE TABLE history (uid TEXT PRIMARY KEY, update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, received_timestamp DATETIME, type TEXT, extra BLOB, FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, FOREIGN KEY(type) REFERENCES message_types(type), UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res))") 1525 txn.execute("CREATE TABLE history (uid TEXT PRIMARY KEY, update_uid TEXT, profile_id INTEGER, source TEXT, dest TEXT, source_res TEXT, dest_res TEXT, timestamp DATETIME NOT NULL, received_timestamp DATETIME, type TEXT, extra BLOB, FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE, FOREIGN KEY(type) REFERENCES message_types(type), UNIQUE (profile_id, timestamp, source, dest, source_res, dest_res))")
1515 txn.execute("INSERT INTO history (uid, profile_id, source, dest, source_res, dest_res, timestamp, type, extra) SELECT id, profile_id, source, dest, source_res, dest_res, timestamp, type, extra FROM tmp_sat_update") 1526 txn.execute("INSERT INTO history (uid, profile_id, source, dest, source_res, dest_res, timestamp, type, extra) SELECT id, profile_id, source, dest, source_res, dest_res, timestamp, type, extra FROM tmp_sat_update")
1516 1527
1517 yield self.dbpool.runInteraction(historySchema) 1528 yield self.dbpool.runInteraction(historySchema)
1518 1529
1519 def newTables(txn): 1530 def newTables(txn):
1520 log.info(u"Creating new tables") 1531 log.info("Creating new tables")
1521 txn.execute("CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)") 1532 txn.execute("CREATE TABLE message (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, message TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
1522 txn.execute("CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)") 1533 txn.execute("CREATE TABLE thread (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, thread_id TEXT, parent_id TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
1523 txn.execute("CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)") 1534 txn.execute("CREATE TABLE subject (id INTEGER PRIMARY KEY ASC, history_uid INTEGER, subject TEXT, language TEXT, FOREIGN KEY(history_uid) REFERENCES history(uid) ON DELETE CASCADE)")
1524 1535
1525 yield self.dbpool.runInteraction(newTables) 1536 yield self.dbpool.runInteraction(newTables)
1526 1537
1527 log.info(u"inserting new message type") 1538 log.info("inserting new message type")
1528 yield self.dbpool.runQuery("INSERT INTO message_types VALUES (?)", ('info',)) 1539 yield self.dbpool.runQuery("INSERT INTO message_types VALUES (?)", ('info',))
1529 1540
1530 log.info(u"messages update") 1541 log.info("messages update")
1531 rows = yield self.dbpool.runQuery("SELECT id, timestamp, message, extra FROM tmp_sat_update") 1542 rows = yield self.dbpool.runQuery("SELECT id, timestamp, message, extra FROM tmp_sat_update")
1532 total = len(rows) 1543 total = len(rows)
1533 1544
1534 def updateHistory(txn, queries): 1545 def updateHistory(txn, queries):
1535 for query, args in iter(queries): 1546 for query, args in queries:
1543 try: 1554 try:
1544 extra = pickle.loads(str(extra or "")) 1555 extra = pickle.loads(extra or b"")
1545 except EOFError: 1556 except EOFError:
1546 extra = {} 1557 extra = {}
1547 except Exception: 1558 except Exception:
1548 log.warning(u"Can't handle extra data for message id {}, ignoring it".format(id_)) 1559 log.warning("Can't handle extra data for message id {}, ignoring it".format(id_))
1549 extra = {} 1560 extra = {}
1550 1561
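
Since old databases may hand the extra column back as str or bytes depending on how it was stored, a tolerant helper along these lines (purely illustrative; load_extra is not part of this code) keeps the unpickling in one place:

    import pickle

    def load_extra(raw):
        """Best-effort unpickling of a column that may be None, str or bytes."""
        if raw is None:
            return {}
        if isinstance(raw, str):
            # pickle protocol 0 is ASCII, so a TEXT round-trip should survive this
            raw = raw.encode('utf-8')
        try:
            return pickle.loads(raw)
        except Exception:
            return {}
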
1551 queries.append(("INSERT INTO message(history_uid, message) VALUES (?,?)", (id_, message))) 1562 queries.append(("INSERT INTO message(history_uid, message) VALUES (?,?)", (id_, message)))
1552 1563
1553 try: 1564 try:
1554 subject = extra.pop('subject') 1565 subject = extra.pop('subject')
1555 except KeyError: 1566 except KeyError:
1556 pass 1567 pass
1557 else: 1568 else:
1558 try: 1569 try:
1559 subject = subject.decode('utf-8') 1570 subject = subject.decode('utf-8') if isinstance(subject, bytes) else subject
1560 except UnicodeEncodeError: 1571 except UnicodeDecodeError:
1561 log.warning(u"Error while decoding subject, ignoring it") 1572 log.warning("Error while decoding subject, ignoring it")
1562 del extra['subject'] 1573 # subject was already popped from extra, nothing left to delete
1563 else: 1574 else:
1564 queries.append(("INSERT INTO subject(history_uid, subject) VALUES (?,?)", (id_, subject))) 1575 queries.append(("INSERT INTO subject(history_uid, subject) VALUES (?,?)", (id_, subject)))
1565 1576
1566 received_timestamp = extra.pop('timestamp', None) 1577 received_timestamp = extra.pop('timestamp', None)
1595 1606
1596 def prepare_queries(result, xmpp_password): 1607 def prepare_queries(result, xmpp_password):
1597 try: 1608 try:
1598 id_ = result[0][0] 1609 id_ = result[0][0]
1599 except IndexError: 1610 except IndexError:
1600 log.error(u"Profile of id %d is referenced in 'param_ind' but it doesn't exist!" % profile_id) 1611 log.error("Profile of id %d is referenced in 'param_ind' but it doesn't exist!" % profile_id)
1601 return defer.succeed(None) 1612 return defer.succeed(None)
1602 1613
1603 sat_password = xmpp_password 1614 sat_password = xmpp_password
1604 d1 = PasswordHasher.hash(sat_password) 1615 d1 = PasswordHasher.hash(sat_password)
1605 personal_key = BlockCipher.getRandomKey(base64=True) 1616 personal_key = BlockCipher.getRandomKey(base64=True)