Mercurial > libervia-pubsub
comparison sat_pubsub/pgsql_storage.py @ 414:ccb2a22ea0fc
Python 3 port:
/!\ Python 3.6+ is now needed to use SàT Pubsub
/!\ instability may occur and features may not be working anymore, this will improve with time
The same procedure as in backend has been applied (check backend commit ab2696e34d29 logs
for details).
Python minimal version has been updated in setup.py
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 16 Aug 2019 12:53:33 +0200 |
parents | a58610ab2983 |
children | c21f31355ab9 |
comparison
equal
deleted
inserted
replaced
413:a5edf5e1dd74 | 414:ccb2a22ea0fc |
---|---|
1 #!/usr/bin/python | 1 #!/usr/bin/env python3 |
2 #-*- coding: utf-8 -*- | 2 #-*- coding: utf-8 -*- |
3 | 3 |
4 # Copyright (c) 2012-2019 Jérôme Poisson | 4 # Copyright (c) 2012-2019 Jérôme Poisson |
5 # Copyright (c) 2013-2016 Adrien Cossa | 5 # Copyright (c) 2013-2016 Adrien Cossa |
6 # Copyright (c) 2003-2011 Ralph Meijer | 6 # Copyright (c) 2003-2011 Ralph Meijer |
51 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. | 51 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
52 | 52 |
53 | 53 |
54 import copy, logging | 54 import copy, logging |
55 | 55 |
56 from zope.interface import implements | 56 from zope.interface import implementer |
57 | 57 |
58 from twisted.internet import reactor | 58 from twisted.internet import reactor |
59 from twisted.internet import defer | 59 from twisted.internet import defer |
60 from twisted.words.protocols.jabber import jid | 60 from twisted.words.protocols.jabber import jid |
61 from twisted.python import log | 61 from twisted.python import log |
75 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) | 75 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) |
76 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY) | 76 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY) |
77 | 77 |
78 # parseXml manage str, but we get unicode | 78 # parseXml manage str, but we get unicode |
79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) | 79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) |
80 ITEMS_SEQ_NAME = u'node_{node_id}_seq' | 80 ITEMS_SEQ_NAME = 'node_{node_id}_seq' |
81 PEP_COL_NAME = 'pep' | 81 PEP_COL_NAME = 'pep' |
82 CURRENT_VERSION = '5' | 82 CURRENT_VERSION = '5' |
83 # retrieve the maximum integer item id + 1 | 83 # retrieve the maximum integer item id + 1 |
84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'" | 84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'" |
85 | 85 |
100 else: | 100 else: |
101 pep_check="AND {} IS NULL".format(PEP_COL_NAME) | 101 pep_check="AND {} IS NULL".format(PEP_COL_NAME) |
102 return "{} {}".format(query, pep_check), values | 102 return "{} {}".format(query, pep_check), values |
103 | 103 |
104 | 104 |
105 @implementer(iidavoll.IStorage) | |
105 class Storage: | 106 class Storage: |
106 | 107 |
107 implements(iidavoll.IStorage) | |
108 | 108 |
109 defaultConfig = { | 109 defaultConfig = { |
110 'leaf': { | 110 'leaf': { |
111 const.OPT_PERSIST_ITEMS: True, | 111 const.OPT_PERSIST_ITEMS: True, |
112 const.OPT_DELIVER_PAYLOADS: True, | 112 const.OPT_DELIVER_PAYLOADS: True, |
433 if not entities or not nodes or not node_accesses or not item_accesses: | 433 if not entities or not nodes or not node_accesses or not item_accesses: |
434 raise ValueError("entities, nodes and accesses must not be empty") | 434 raise ValueError("entities, nodes and accesses must not be empty") |
435 if node_accesses != ('open',) or item_accesses != ('open',): | 435 if node_accesses != ('open',) or item_accesses != ('open',): |
436 raise NotImplementedError('only "open" access model is handled for now') | 436 raise NotImplementedError('only "open" access model is handled for now') |
437 if not pep: | 437 if not pep: |
438 raise NotImplementedError(u"getLastItems is only implemented for PEP at the moment") | 438 raise NotImplementedError("getLastItems is only implemented for PEP at the moment") |
439 d = self.dbpool.runQuery("""SELECT DISTINCT ON (node_id) pep, node, data::text, items.access_model | 439 d = self.dbpool.runQuery("""SELECT DISTINCT ON (node_id) pep, node, data::text, items.access_model |
440 FROM items | 440 FROM items |
441 NATURAL JOIN nodes | 441 NATURAL JOIN nodes |
442 WHERE nodes.pep IN %s | 442 WHERE nodes.pep IN %s |
443 AND node IN %s | 443 AND node IN %s |
450 item_accesses)) | 450 item_accesses)) |
451 d.addCallback(self.formatLastItems) | 451 d.addCallback(self.formatLastItems) |
452 return d | 452 return d |
453 | 453 |
454 | 454 |
455 @implementer(iidavoll.INode) | |
455 class Node: | 456 class Node: |
456 | 457 |
457 implements(iidavoll.INode) | |
458 | 458 |
459 def __init__(self, nodeDbId, nodeIdentifier, config, schema): | 459 def __init__(self, nodeDbId, nodeIdentifier, config, schema): |
460 self.nodeDbId = nodeDbId | 460 self.nodeDbId = nodeDbId |
461 self.nodeIdentifier = nodeIdentifier | 461 self.nodeIdentifier = nodeIdentifier |
462 self._config = config | 462 self._config = config |
486 else an UUID will be returned | 486 else an UUID will be returned |
487 """ | 487 """ |
488 if self._config[const.OPT_SERIAL_IDS]: | 488 if self._config[const.OPT_SERIAL_IDS]: |
489 d = self.dbpool.runQuery("SELECT nextval('{seq_name}')".format( | 489 d = self.dbpool.runQuery("SELECT nextval('{seq_name}')".format( |
490 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId))) | 490 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId))) |
491 d.addCallback(lambda rows: unicode(rows[0][0])) | 491 d.addCallback(lambda rows: str(rows[0][0])) |
492 return d | 492 return d |
493 else: | 493 else: |
494 return defer.succeed(unicode(uuid.uuid4())) | 494 return defer.succeed(str(uuid.uuid4())) |
495 | 495 |
496 @staticmethod | 496 @staticmethod |
497 def _configurationTriggers(cursor, node_id, old_config, new_config): | 497 def _configurationTriggers(cursor, node_id, old_config, new_config): |
498 """trigger database relative actions needed when a config is changed | 498 """trigger database relative actions needed when a config is changed |
499 | 499 |
643 cursor.execute(query, values) | 643 cursor.execute(query, values) |
644 rows = cursor.fetchall() | 644 rows = cursor.fetchall() |
645 | 645 |
646 subscriptions = [] | 646 subscriptions = [] |
647 for row in rows: | 647 for row in rows: |
648 subscriber = jid.JID(u'%s/%s' % (row.jid, row.resource)) | 648 subscriber = jid.JID('%s/%s' % (row.jid, row.resource)) |
649 | 649 |
650 options = {} | 650 options = {} |
651 if row.subscription_type: | 651 if row.subscription_type: |
652 options['pubsub#subscription_type'] = row.subscription_type; | 652 options['pubsub#subscription_type'] = row.subscription_type; |
653 if row.subscription_depth: | 653 if row.subscription_depth: |
728 # then we construct values for subscriptions update according to entity_id we just got | 728 # then we construct values for subscriptions update according to entity_id we just got |
729 placeholders = ','.join(len(subscriptions) * ["%s"]) | 729 placeholders = ','.join(len(subscriptions) * ["%s"]) |
730 values = [] | 730 values = [] |
731 for subscription in subscriptions: | 731 for subscription in subscriptions: |
732 entity_id = entities_map[subscription.subscriber].entity_id | 732 entity_id = entities_map[subscription.subscriber].entity_id |
733 resource = subscription.subscriber.resource or u'' | 733 resource = subscription.subscriber.resource or '' |
734 values.append((self.nodeDbId, entity_id, resource, subscription.state, None, None)) | 734 values.append((self.nodeDbId, entity_id, resource, subscription.state, None, None)) |
735 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 | 735 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 |
736 cursor.execute("INSERT INTO subscriptions(node_id, entity_id, resource, state, subscription_type, subscription_depth) VALUES " + placeholders + " ON CONFLICT (entity_id, resource, node_id) DO UPDATE SET state=EXCLUDED.state", [v for v in values]) | 736 cursor.execute("INSERT INTO subscriptions(node_id, entity_id, resource, state, subscription_type, subscription_depth) VALUES " + placeholders + " ON CONFLICT (entity_id, resource, node_id) DO UPDATE SET state=EXCLUDED.state", [v for v in values]) |
737 | 737 |
738 def isSubscribed(self, entity): | 738 def isSubscribed(self, entity): |
806 entities = self.getOrCreateEntities(cursor, affiliations) | 806 entities = self.getOrCreateEntities(cursor, affiliations) |
807 | 807 |
808 # then we construct values for affiliations update according to entity_id we just got | 808 # then we construct values for affiliations update according to entity_id we just got |
809 placeholders = ','.join(len(affiliations) * ["(%s,%s,%s)"]) | 809 placeholders = ','.join(len(affiliations) * ["(%s,%s,%s)"]) |
810 values = [] | 810 values = [] |
811 map(values.extend, ((e.entity_id, affiliations[jid.JID(e.jid)], self.nodeDbId) for e in entities)) | 811 for e in entities: |
812 values.extend((e.entity_id, affiliations[jid.JID(e.jid)], self.nodeDbId)) | |
812 | 813 |
813 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 | 814 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 |
814 cursor.execute("INSERT INTO affiliations(entity_id,affiliation,node_id) VALUES " + placeholders + " ON CONFLICT (entity_id,node_id) DO UPDATE SET affiliation=EXCLUDED.affiliation", values) | 815 cursor.execute("INSERT INTO affiliations(entity_id,affiliation,node_id) VALUES " + placeholders + " ON CONFLICT (entity_id,node_id) DO UPDATE SET affiliation=EXCLUDED.affiliation", values) |
815 | 816 |
816 def deleteAffiliations(self, entities): | 817 def deleteAffiliations(self, entities): |
834 (self.nodeDbId,)) | 835 (self.nodeDbId,)) |
835 rows = cursor.fetchall() | 836 rows = cursor.fetchall() |
836 return [row[0] for row in rows] | 837 return [row[0] for row in rows] |
837 | 838 |
838 | 839 |
840 @implementer(iidavoll.ILeafNode) | |
839 class LeafNode(Node): | 841 class LeafNode(Node): |
840 | 842 |
841 implements(iidavoll.ILeafNode) | |
842 | 843 |
843 nodeType = 'leaf' | 844 nodeType = 'leaf' |
844 | 845 |
845 def getOrderBy(self, ext_data, direction='DESC'): | 846 def getOrderBy(self, ext_data, direction='DESC'): |
846 """Return ORDER BY clause corresponding to Order By key in ext_data | 847 """Return ORDER BY clause corresponding to Order By key in ext_data |
849 @param direction (unicode): ORDER BY direction (ASC or DESC) | 850 @param direction (unicode): ORDER BY direction (ASC or DESC) |
850 @return (unicode): ORDER BY clause to use | 851 @return (unicode): ORDER BY clause to use |
851 """ | 852 """ |
852 keys = ext_data.get('order_by') | 853 keys = ext_data.get('order_by') |
853 if not keys: | 854 if not keys: |
854 return u'ORDER BY updated ' + direction | 855 return 'ORDER BY updated ' + direction |
855 cols_statmnt = [] | 856 cols_statmnt = [] |
856 for key in keys: | 857 for key in keys: |
857 if key == 'creation': | 858 if key == 'creation': |
858 column = 'item_id' # could work with items.created too | 859 column = 'item_id' # could work with items.created too |
859 elif key == 'modification': | 860 elif key == 'modification': |
860 column = 'updated' | 861 column = 'updated' |
861 else: | 862 else: |
862 log.msg(u"WARNING: Unknown order by key: {key}".format(key=key)) | 863 log.msg("WARNING: Unknown order by key: {key}".format(key=key)) |
863 column = 'updated' | 864 column = 'updated' |
864 cols_statmnt.append(column + u' ' + direction) | 865 cols_statmnt.append(column + ' ' + direction) |
865 | 866 |
866 return u"ORDER BY " + u",".join([col for col in cols_statmnt]) | 867 return "ORDER BY " + ",".join([col for col in cols_statmnt]) |
867 | 868 |
868 @defer.inlineCallbacks | 869 @defer.inlineCallbacks |
869 def storeItems(self, items_data, publisher): | 870 def storeItems(self, items_data, publisher): |
870 # XXX: runInteraction doesn't seem to work when there are several "insert" | 871 # XXX: runInteraction doesn't seem to work when there are several "insert" |
871 # or "update". | 872 # or "update". |
916 # this can happen with serial_ids, if a item has been stored | 917 # this can happen with serial_ids, if a item has been stored |
917 # with a future id (generated by XMPP client) | 918 # with a future id (generated by XMPP client) |
918 cursor.execute(NEXT_ITEM_ID_QUERY.format(node_id=self.nodeDbId)) | 919 cursor.execute(NEXT_ITEM_ID_QUERY.format(node_id=self.nodeDbId)) |
919 next_id = cursor.fetchone()[0] | 920 next_id = cursor.fetchone()[0] |
920 # we update the sequence, so we can skip conflicting ids | 921 # we update the sequence, so we can skip conflicting ids |
921 cursor.execute(u"SELECT setval('{seq_name}', %s)".format( | 922 cursor.execute("SELECT setval('{seq_name}', %s)".format( |
922 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId)), [next_id]) | 923 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId)), [next_id]) |
923 # and now we can retry the query with the new id | 924 # and now we can retry the query with the new id |
924 item['id'] = insert_data[1] = unicode(next_id) | 925 item['id'] = insert_data[1] = str(next_id) |
925 # item saved in DB must also be updated with the new id | 926 # item saved in DB must also be updated with the new id |
926 insert_data[3] = item.toXml() | 927 insert_data[3] = item.toXml() |
927 cursor.execute(insert_query, insert_data) | 928 cursor.execute(insert_query, insert_data) |
928 else: | 929 else: |
929 # but if we have not serial_ids, we have a real problem | 930 # but if we have not serial_ids, we have a real problem |
1057 if '/' in jid_s: | 1058 if '/' in jid_s: |
1058 query_filters.append("AND publisher=%s") | 1059 query_filters.append("AND publisher=%s") |
1059 args.append(filter_.value) | 1060 args.append(filter_.value) |
1060 else: | 1061 else: |
1061 query_filters.append("AND publisher LIKE %s") | 1062 query_filters.append("AND publisher LIKE %s") |
1062 args.append(u"{}%".format(filter_.value)) | 1063 args.append("{}%".format(filter_.value)) |
1063 elif filter_.var == const.MAM_FILTER_CATEGORY: | 1064 elif filter_.var == const.MAM_FILTER_CATEGORY: |
1064 query.append("LEFT JOIN item_categories USING (item_id)") | 1065 query.append("LEFT JOIN item_categories USING (item_id)") |
1065 query_filters.append("AND category=%s") | 1066 query_filters.append("AND category=%s") |
1066 args.append(filter_.value) | 1067 args.append(filter_.value) |
1067 else: | 1068 else: |