comparison src/memory/sqlite.py @ 853:c2f6ada7858f

core (sqlite): automatic database update: - new Updater class checks database consistency (by calculating a hash on the .schema), and updates the base if necessary - the database now has a version (1 for current; 0 will be for 0.3's database), and this version will be increased for each change - creation statements and update statements are in the form of dicts of dicts with tuples. There is a help text at the top of the module explaining how it works - if we are on a development version, the updater tries to update the database automatically (without deleting tables or columns). The Updater.generateUpdateData method can be used to ease the creation of update data (i.e. the dictionary at the top; see the one for key 1 for an example). - if there is an inconsistency, an exception is raised, and a message indicates the SQL statements that should fix the situation. - well... this is rather complicated; a KISS method would maybe have been better. The future will tell if we need to simplify it :-/ - new DatabaseError exception
author Goffi <goffi@goffi.org>
date Sun, 23 Feb 2014 23:30:32 +0100
parents f8681a7fd834
children 34dd9287dfe5
comparison
equal deleted inserted replaced
852:4cc55e05266d 853:c2f6ada7858f
16 16
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from sat.core.i18n import _ 20 from sat.core.i18n import _
21 from sat.core import exceptions
21 from logging import debug, info, warning, error 22 from logging import debug, info, warning, error
22 from twisted.enterprise import adbapi 23 from twisted.enterprise import adbapi
23 from twisted.internet import defer 24 from twisted.internet import defer
25 from collections import OrderedDict
24 from time import time 26 from time import time
27 import re
25 import os.path 28 import os.path
26 import cPickle as pickle 29 import cPickle as pickle
30 import hashlib
31
32 CURRENT_DB_VERSION = 1
33
# XXX: DATABASE schemas are used in the following way:
#      - the 'current' key is for the actual database schema, used for a new base
#      - x (an int) is for the update needed between version x-1 and x. All numbers between y and z are needed to update from y to z
#        e.g.: if CURRENT_DB_VERSION is 6, 'current' is the actual DB, and to update from version 3, numbers 4, 5 and 6 are needed
#      a 'current' data dict can contain the keys:
#      - 'CREATE': an OrderedDict with tables to create as keys, and a len 2 tuple as value, where value[0] are the column definitions and value[1] are the table constraints
#      - 'INSERT': an OrderedDict with tables where values have to be inserted as keys, and many tuples containing values to insert in the order of the rows (#TODO: manage named columns)
#      an update data dict (the ones with a number) can contain the keys 'create', 'delete', 'cols create', 'cols delete', 'cols modify' or 'insert'. See Updater.generateUpdateData for more info. This method can be used to autogenerate update_data, to ease the work of the developers.
42
# Schema definitions, keyed as described in the XXX note just above:
# "current" holds the full schema for a fresh database, integer keys hold the
# incremental update data needed to go from version key-1 to version key.
DATABASE_SCHEMAS = {
    "current": {
        # CREATE: table name -> (column definitions, table constraints)
        'CREATE': OrderedDict((
            ('profiles', (("id INTEGER PRIMARY KEY ASC", "name TEXT"),
                          ("UNIQUE (name)",))),
            ('message_types', (("type TEXT PRIMARY KEY",),
                               tuple())),
            ('history', (("id INTEGER PRIMARY KEY ASC", "profile_id INTEGER", "source TEXT", "dest TEXT", "source_res TEXT", "dest_res TEXT", "timestamp DATETIME", "message TEXT", "type TEXT", "extra BLOB"),
                         ("FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE", "FOREIGN KEY(type) REFERENCES message_types(type)"))),
            ('param_gen', (("category TEXT", "name TEXT", "value TEXT"),
                           ("PRIMARY KEY (category,name)",))),
            ('param_ind', (("category TEXT", "name TEXT", "profile_id INTEGER", "value TEXT"),
                           ("PRIMARY KEY (category,name,profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
            ('private_gen', (("namespace TEXT", "key TEXT", "value TEXT"),
                             ("PRIMARY KEY (namespace, key)",))),
            ('private_ind', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value TEXT"),
                             ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE"))),
            ('private_gen_bin', (("namespace TEXT", "key TEXT", "value BLOB"),
                                 ("PRIMARY KEY (namespace, key)",))),

            ('private_ind_bin', (("namespace TEXT", "key TEXT", "profile_id INTEGER", "value BLOB"),
                                 ("PRIMARY KEY (namespace, key, profile_id)", "FOREIGN KEY(profile_id) REFERENCES profiles(id) ON DELETE CASCADE")))
            )),
        # INSERT: table name -> tuple of value tuples, one per row to insert
        'INSERT': OrderedDict((
            ('message_types', (("'chat'",), ("'error'",), ("'groupchat'",), ("'headline'",), ("'normal'",))),
            )),
        },
    # update from version 0 (the 0.3 database) to version 1: the history
    # table gained an "extra" BLOB column
    1: {'cols create': {'history': ('extra BLOB',)}
        },
    }
27 72
28 73
class SqliteStorage(object):
    """This class manage storage with Sqlite database"""

    def __init__(self, db_filename, sat_version):
        """Connect to the given database
        @param db_filename: full path to the Sqlite database
        @param sat_version: SàT version string, passed to Updater to decide
            whether automatic schema updates are allowed"""
        self.initialized = defer.Deferred()  # triggered when memory is fully initialised and ready
        self.profiles = {}  # we keep cache for the profiles (key: profile name, value: profile id)

        info(_("Connecting database"))
        new_base = not os.path.exists(db_filename)  # do we have to create the database ?
        self.dbpool = adbapi.ConnectionPool("sqlite3", db_filename, check_same_thread=False)

        # init_defer is the initialisation deferred, initialisation is ok when all its callbacks have been done
        init_defer = self.dbpool.runOperation("PRAGMA foreign_keys = ON").addErrback(lambda x: error(_("Can't activate foreign keys")))

        def getNewBaseSql():
            # full list of statements to build a fresh database at the
            # current schema version
            info(_("The database is new, creating the tables"))
            database_creation = ["PRAGMA user_version=%d" % CURRENT_DB_VERSION]
            database_creation.extend(Updater.createData2Raw(DATABASE_SCHEMAS['current']['CREATE']))
            database_creation.extend(Updater.insertData2Raw(DATABASE_SCHEMAS['current']['INSERT']))
            return database_creation

        def getUpdateSql():
            # deferred firing the update statements needed by an existing
            # database, or None if it is already up-to-date
            updater = Updater(self.dbpool, sat_version)
            return updater.checkUpdates()

        def commitStatements(statements):
            # run all creation/update statements in a single transaction

            if statements is None:
                return defer.succeed(None)
            debug("===== COMMITING STATEMENTS =====\n%s\n============\n\n" % '\n'.join(statements))
            d = self.dbpool.runInteraction(self._updateDb, tuple(statements))
            return d

        init_defer.addCallback(lambda ignore: getNewBaseSql() if new_base else getUpdateSql())
        init_defer.addCallback(commitStatements)

        def fillProfileCache(ignore):
            # load the name -> id profile mapping, then fire self.initialized
            d = self.dbpool.runQuery("SELECT name,id FROM profiles").addCallback(self._profilesCache)
            d.chainDeferred(self.initialized)

        init_defer.addCallback(fillProfileCache)
117
118 def _updateDb(self, interaction, statements):
119 for statement in statements:
120 interaction.execute(statement)
121
72 122
73 #Profiles 123 #Profiles
74 def _profilesCache(self, profiles_result): 124 def _profilesCache(self, profiles_result):
75 """Fill the profiles cache 125 """Fill the profiles cache
76 @param profiles_result: result of the sql profiles query""" 126 @param profiles_result: result of the sql profiles query"""
400 450
401 def __getFirstResult(self, result): 451 def __getFirstResult(self, result):
402 """Return the first result of a database query 452 """Return the first result of a database query
403 Useful when we are looking for one specific value""" 453 Useful when we are looking for one specific value"""
404 return None if not result else result[0][0] 454 return None if not result else result[0][0]
455
456
class Updater(object):
    """Check database consistency and generate schema update statements"""

    # matches the successive column/constraint statements inside the
    # parentheses of a CREATE TABLE statement
    stmnt_regex = re.compile(r"(?:[\w ]+(?:\([\w, ]+\))?)+")
    # matches redundant spaces, to normalise a statement before hashing
    clean_regex = re.compile(r"^ +|(?<= ) +|(?<=,) +| +$")
    # raw SQL templates
    CREATE_SQL = "CREATE TABLE %s (%s)"
    INSERT_SQL = "INSERT INTO %s VALUES (%s)"
    DROP_SQL = "DROP TABLE %s"
    ALTER_SQL = "ALTER TABLE %s ADD COLUMN %s"
    RENAME_TABLE_SQL = "ALTER TABLE %s RENAME TO %s"

    # first keyword of the clauses treated as table constraints rather than
    # column definitions when parsing a CREATE statement
    CONSTRAINTS = ('PRIMARY', 'UNIQUE', 'CHECK', 'FOREIGN')
    # temporary table name used while transferring a modified table
    TMP_TABLE = "tmp_sat_update"

    def __init__(self, dbpool, sat_version):
        """
        @param dbpool: adbapi connection pool to the local database
        @param sat_version: SàT version string (used to detect dev versions)
        """
        self._sat_version = sat_version
        self.dbpool = dbpool
472
473 def getLocalVersion(self):
474 """ Get local database version
475 @return: version (int)
476
477 """
478 return self.dbpool.runQuery("PRAGMA user_version").addCallback(lambda ret: int(ret[0][0]))
479
480 def _setLocalVersion(self, version):
481 """ Set local database version
482 @param version: version (int)
483 @return: deferred
484
485 """
486 return self.dbpool.runOperation("PRAGMA user_version=%d" % version)
487
488 def getLocalSchema(self):
489 """ return raw local schema
490 @return: list of strings with CREATE sql statements for local database
491
492 """
493 d = self.dbpool.runQuery("select sql from sqlite_master where type = 'table'")
494 d.addCallback(lambda result: [row[0] for row in result])
495 return d
496
    @defer.inlineCallbacks
    def checkUpdates(self):
        """Check if a database schema update is needed, according to DATABASE_SCHEMAS
        @return: deferred which fires a list of SQL update statements, or None if no update is needed

        """
        local_version = yield self.getLocalVersion()
        raw_local_sch = yield self.getLocalSchema()
        # compare normalised hashes of the actual schema and the expected one
        local_sch = self.rawStatements2data(raw_local_sch)
        current_sch = DATABASE_SCHEMAS['current']['CREATE']
        local_hash = self.statementHash(local_sch)
        current_hash = self.statementHash(current_sch)

        if local_hash == current_hash:
            # schema is fine, just make sure user_version matches it
            if local_version != CURRENT_DB_VERSION:
                warning(_("Your local schema is up-to-date, but database versions mismatch, fixing it..."))
                yield self._setLocalVersion(CURRENT_DB_VERSION)
        else:
            # an update is needed

            if local_version == CURRENT_DB_VERSION:
                # Database mismatch and we have the latest version
                if self._sat_version.endswith('D'):
                    # we are in a development version
                    # NOTE(review): dev versions are assumed to end with 'D'
                    # (e.g. "0.4D") — confirm against the version constant
                    update_data = self.generateUpdateData(local_sch, current_sch, False)
                    warning(_("There is a schema mismatch, but as we are on a dev version, database will be updated"))
                    # dev mode: generate the statements but skip destructive ones
                    update_raw = self.update2raw(update_data, True)
                    defer.returnValue(update_raw)
                else:
                    # release version: never touch the data automatically, only
                    # show the statements that would fix the schema
                    error(_(u"schema version is up-to-date, but local schema differ from expected current schema"))
                    update_data = self.generateUpdateData(local_sch, current_sch, True)
                    warning(_(u"Here are the commands that should fix the situation, use at your own risk (do a backup before modifying database), you can go to SàT's MUC room at sat@chat.jabberfr.org for help\n### SQL###\n%s\n### END SQL ###\n") % u'\n'.join(("%s;" % statement for statement in self.update2raw(update_data))))
                    raise exceptions.DatabaseError("Database mismatch")
            else:
                # Database is not up-to-date, we'll do the update
                # chain every intermediate version's update data, in order
                info(_("Database schema has changed, local database will be updated"))
                update_raw = []
                for version in xrange(local_version+1, CURRENT_DB_VERSION+1):
                    try:
                        update_data = DATABASE_SCHEMAS[version]
                    except KeyError:
                        raise exceptions.InternalError("Missing update definition (version %d)" % version)
                    update_raw.extend(self.update2raw(update_data))
                update_raw.append("PRAGMA user_version=%d" % CURRENT_DB_VERSION)
                defer.returnValue(update_raw)
542
543 @staticmethod
544 def createData2Raw(data):
545 """ Generate SQL statements from statements data
546 @param data: dictionary with table as key, and statements data in tuples as value
547 @return: list of strings with raw statements
548
549 """
550 ret = []
551 for table in data:
552 defs, constraints = data[table]
553 assert isinstance(defs, tuple)
554 assert isinstance(constraints, tuple)
555 ret.append(Updater.CREATE_SQL % (table, ', '.join(defs + constraints)))
556 return ret
557
558 @staticmethod
559 def insertData2Raw(data):
560 """ Generate SQL statements from statements data
561 @param data: dictionary with table as key, and statements data in tuples as value
562 @return: list of strings with raw statements
563
564 """
565 ret = []
566 for table in data:
567 values_tuple = data[table]
568 assert isinstance(values_tuple, tuple)
569 for values in values_tuple:
570 assert isinstance(values, tuple)
571 ret.append(Updater.INSERT_SQL % (table, ', '.join(values)))
572 return ret
573
574 def statementHash(self, data):
575 """ Generate hash of template data
576 useful to compare schemas
577
578 @param data: dictionary of "CREATE" statement, with tables names as key,
579 and tuples of (col_defs, constraints) as values
580 @return: hash as string
581 """
582 hash_ = hashlib.sha1()
583 tables = data.keys()
584 tables.sort()
585
586 def stmnts2str(stmts):
587 return ','.join([self.clean_regex.sub('',stmt) for stmt in sorted(stmts)])
588
589 for table in tables:
590 col_defs, col_constr = data[table]
591 hash_.update("%s:%s:%s" % (table, stmnts2str(col_defs), stmnts2str(col_constr)))
592 return hash_.digest()
593
594 def rawStatements2data(self, raw_statements):
595 """ separate "CREATE" statements into dictionary/tuples data
596 @param raw_statements: list of CREATE statements as strings
597 @return: dictionary with table names as key, and a (col_defs, constraints) tuple
598
599 """
600 schema_dict = {}
601 for create_statement in raw_statements:
602 if not create_statement.startswith("CREATE TABLE "):
603 warning("Unexpected statement, ignoring it")
604 continue
605 _create_statement = create_statement[13:]
606 table, raw_col_stats = _create_statement.split(' ',1)
607 if raw_col_stats[0] != '(' or raw_col_stats[-1] != ')':
608 warning("Unexpected statement structure, ignoring it")
609 continue
610 col_stats = [stmt.strip() for stmt in self.stmnt_regex.findall(raw_col_stats[1:-1])]
611 col_defs = []
612 constraints = []
613 for col_stat in col_stats:
614 name = col_stat.split(' ',1)[0]
615 if name in self.CONSTRAINTS:
616 constraints.append(col_stat)
617 else:
618 col_defs.append(col_stat)
619 schema_dict[table] = (tuple(col_defs), tuple(constraints))
620 return schema_dict
621
622 def generateUpdateData(self, old_data, new_data, modify=False):
623 """ Generate data for automatic update between two schema data
624 @param old_data: data of the former schema (which must be updated)
625 @param new_data: data of the current schema
626 @param modify: if True, always use "cols modify" table, else try to ALTER tables
627 @return: update data, a dictionary with:
628 - 'create': dictionary of tables to create
629 - 'delete': tuple of tables to delete
630 - 'cols create': dictionary of columns to create (table as key, tuple of columns to create as value)
631 - 'cols delete': dictionary of columns to delete (table as key, tuple of columns to delete as value)
632 - 'cols modify': dictionary of columns to modify (table as key, tuple of old columns to transfert as value). With this table, a new table will be created, and content from the old table will be transfered to it, only cols specified in the tuple will be transfered.
633
634 """
635
636 create_tables_data = {}
637 create_cols_data = {}
638 modify_cols_data = {}
639 delete_cols_data = {}
640 old_tables = set(old_data.keys())
641 new_tables = set(new_data.keys())
642
643 def getChanges(set_olds, set_news):
644 to_create = set_news.difference(set_olds)
645 to_delete = set_olds.difference(set_news)
646 to_check = set_news.intersection(set_olds)
647 return tuple(to_create), tuple(to_delete), tuple(to_check)
648
649 tables_to_create, tables_to_delete, tables_to_check = getChanges(old_tables, new_tables)
650
651 for table in tables_to_create:
652 create_tables_data[table] = new_data[table]
653
654 for table in tables_to_check:
655 old_col_defs, old_constraints = old_data[table]
656 new_col_defs, new_constraints = new_data[table]
657 for obj in old_col_defs, old_constraints, new_col_defs, new_constraints:
658 if not isinstance(obj, tuple):
659 raise InternalError("Columns definitions must be tuples")
660 defs_create, defs_delete, ignore = getChanges(set(old_col_defs), set(new_col_defs))
661 constraints_create, constraints_delete, ignore = getChanges(set(old_constraints), set(new_constraints))
662 created_col_names = set([name.split(' ',1)[0] for name in defs_create])
663 deleted_col_names = set([name.split(' ',1)[0] for name in defs_delete])
664 if (created_col_names.intersection(deleted_col_names or constraints_create or constraints_delete) or
665 (modify and (defs_create or constraints_create or defs_delete or constraints_delete))):
666 # we have modified columns, we need to transfer table
667 # we determinate which columns are in both schema so we can transfer them
668 old_names = set([name.split(' ',1)[0] for name in old_col_defs])
669 new_names = set([name.split(' ',1)[0] for name in new_col_defs])
670 modify_cols_data[table] = tuple(old_names.intersection(new_names));
671 else:
672 if defs_create:
673 create_cols_data[table] = (defs_create)
674 if defs_delete or constraints_delete:
675 delete_cols_data[table] = (defs_delete)
676
677 return {'create': create_tables_data,
678 'delete': tables_to_delete,
679 'cols create': create_cols_data,
680 'cols delete': delete_cols_data,
681 'cols modify': modify_cols_data
682 }
683
684 def update2raw(self, update, dev_version=False):
685 """ Transform update data to raw SQLite statements
686 @param upadte: update data as returned by generateUpdateData
687 @param dev_version: if True, update will be done in dev mode: no deletion will be done, instead a message will be shown. This prevent accidental lost of data while working on the code/database.
688 @return: list of string with SQL statements needed to update the base
689
690 """
691 ret = self.createData2Raw(update.get('create', {}))
692 drop = []
693 for table in update.get('delete', tuple()):
694 drop.append(self.DROP_SQL % table)
695 if dev_version:
696 if drop:
697 info("Dev version, SQL NOT EXECUTED:\n--\n%s\n--\n" % "\n".join(drop))
698 else:
699 ret.extend(drop)
700
701 cols_create = update.get('cols create', {})
702 for table in cols_create:
703 for col_def in cols_create[table]:
704 ret.append(self.ALTER_SQL % (table, col_def))
705
706 cols_delete = update.get('cols delete', {})
707 for table in cols_delete:
708 info("Following columns in table [%s] are not needed anymore, but are kept for dev version: %s" % (table, ", ".join(cols_delete[table])))
709
710 cols_modify = update.get('cols modify', {})
711 for table in cols_modify:
712 ret.append(self.RENAME_TABLE_SQL % (table, self.TMP_TABLE))
713 main, extra = DATABASE_SCHEMAS['current']['CREATE'][table]
714 ret.append(self.CREATE_SQL % (table, ', '.join(main + extra)))
715 common_cols = ', '.join(cols_modify[table])
716 ret.append("INSERT INTO %s (%s) SELECT %s FROM %s" % (table, common_cols, common_cols, self.TMP_TABLE))
717 ret.append(self.DROP_SQL % self.TMP_TABLE)
718
719 insert = update.get('insert', {})
720 ret.extend(self.insertData2Raw(insert))
721
722 return ret