comparison sat_pubsub/pgsql_storage.py @ 414:ccb2a22ea0fc

Python 3 port: /!\ Python 3.6+ is now needed to use SàT Pubsub /!\ instability may occur and features may not be working anymore, this will improve with time The same procedure as in backend has been applied (check backend commit ab2696e34d29 logs for details). Python minimal version has been updated in setup.py
author Goffi <goffi@goffi.org>
date Fri, 16 Aug 2019 12:53:33 +0200
parents a58610ab2983
children c21f31355ab9
comparison
equal deleted inserted replaced
413:a5edf5e1dd74 414:ccb2a22ea0fc
1 #!/usr/bin/python 1 #!/usr/bin/env python3
2 #-*- coding: utf-8 -*- 2 #-*- coding: utf-8 -*-
3 3
4 # Copyright (c) 2012-2019 Jérôme Poisson 4 # Copyright (c) 2012-2019 Jérôme Poisson
5 # Copyright (c) 2013-2016 Adrien Cossa 5 # Copyright (c) 2013-2016 Adrien Cossa
6 # Copyright (c) 2003-2011 Ralph Meijer 6 # Copyright (c) 2003-2011 Ralph Meijer
51 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 51 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
52 52
53 53
54 import copy, logging 54 import copy, logging
55 55
56 from zope.interface import implements 56 from zope.interface import implementer
57 57
58 from twisted.internet import reactor 58 from twisted.internet import reactor
59 from twisted.internet import defer 59 from twisted.internet import defer
60 from twisted.words.protocols.jabber import jid 60 from twisted.words.protocols.jabber import jid
61 from twisted.python import log 61 from twisted.python import log
75 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE) 75 psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
76 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY) 76 psycopg2.extensions.register_type(psycopg2.extensions.UNICODEARRAY)
77 77
78 # parseXml manage str, but we get unicode 78 # parseXml manage str, but we get unicode
79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8')) 79 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8'))
80 ITEMS_SEQ_NAME = u'node_{node_id}_seq' 80 ITEMS_SEQ_NAME = 'node_{node_id}_seq'
81 PEP_COL_NAME = 'pep' 81 PEP_COL_NAME = 'pep'
82 CURRENT_VERSION = '5' 82 CURRENT_VERSION = '5'
83 # retrieve the maximum integer item id + 1 83 # retrieve the maximum integer item id + 1
84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'" 84 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'"
85 85
100 else: 100 else:
101 pep_check="AND {} IS NULL".format(PEP_COL_NAME) 101 pep_check="AND {} IS NULL".format(PEP_COL_NAME)
102 return "{} {}".format(query, pep_check), values 102 return "{} {}".format(query, pep_check), values
103 103
104 104
105 @implementer(iidavoll.IStorage)
105 class Storage: 106 class Storage:
106 107
107 implements(iidavoll.IStorage)
108 108
109 defaultConfig = { 109 defaultConfig = {
110 'leaf': { 110 'leaf': {
111 const.OPT_PERSIST_ITEMS: True, 111 const.OPT_PERSIST_ITEMS: True,
112 const.OPT_DELIVER_PAYLOADS: True, 112 const.OPT_DELIVER_PAYLOADS: True,
433 if not entities or not nodes or not node_accesses or not item_accesses: 433 if not entities or not nodes or not node_accesses or not item_accesses:
434 raise ValueError("entities, nodes and accesses must not be empty") 434 raise ValueError("entities, nodes and accesses must not be empty")
435 if node_accesses != ('open',) or item_accesses != ('open',): 435 if node_accesses != ('open',) or item_accesses != ('open',):
436 raise NotImplementedError('only "open" access model is handled for now') 436 raise NotImplementedError('only "open" access model is handled for now')
437 if not pep: 437 if not pep:
438 raise NotImplementedError(u"getLastItems is only implemented for PEP at the moment") 438 raise NotImplementedError("getLastItems is only implemented for PEP at the moment")
439 d = self.dbpool.runQuery("""SELECT DISTINCT ON (node_id) pep, node, data::text, items.access_model 439 d = self.dbpool.runQuery("""SELECT DISTINCT ON (node_id) pep, node, data::text, items.access_model
440 FROM items 440 FROM items
441 NATURAL JOIN nodes 441 NATURAL JOIN nodes
442 WHERE nodes.pep IN %s 442 WHERE nodes.pep IN %s
443 AND node IN %s 443 AND node IN %s
450 item_accesses)) 450 item_accesses))
451 d.addCallback(self.formatLastItems) 451 d.addCallback(self.formatLastItems)
452 return d 452 return d
453 453
454 454
455 @implementer(iidavoll.INode)
455 class Node: 456 class Node:
456 457
457 implements(iidavoll.INode)
458 458
459 def __init__(self, nodeDbId, nodeIdentifier, config, schema): 459 def __init__(self, nodeDbId, nodeIdentifier, config, schema):
460 self.nodeDbId = nodeDbId 460 self.nodeDbId = nodeDbId
461 self.nodeIdentifier = nodeIdentifier 461 self.nodeIdentifier = nodeIdentifier
462 self._config = config 462 self._config = config
486 else an UUID will be returned 486 else an UUID will be returned
487 """ 487 """
488 if self._config[const.OPT_SERIAL_IDS]: 488 if self._config[const.OPT_SERIAL_IDS]:
489 d = self.dbpool.runQuery("SELECT nextval('{seq_name}')".format( 489 d = self.dbpool.runQuery("SELECT nextval('{seq_name}')".format(
490 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId))) 490 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId)))
491 d.addCallback(lambda rows: unicode(rows[0][0])) 491 d.addCallback(lambda rows: str(rows[0][0]))
492 return d 492 return d
493 else: 493 else:
494 return defer.succeed(unicode(uuid.uuid4())) 494 return defer.succeed(str(uuid.uuid4()))
495 495
496 @staticmethod 496 @staticmethod
497 def _configurationTriggers(cursor, node_id, old_config, new_config): 497 def _configurationTriggers(cursor, node_id, old_config, new_config):
498 """trigger database relative actions needed when a config is changed 498 """trigger database relative actions needed when a config is changed
499 499
643 cursor.execute(query, values) 643 cursor.execute(query, values)
644 rows = cursor.fetchall() 644 rows = cursor.fetchall()
645 645
646 subscriptions = [] 646 subscriptions = []
647 for row in rows: 647 for row in rows:
648 subscriber = jid.JID(u'%s/%s' % (row.jid, row.resource)) 648 subscriber = jid.JID('%s/%s' % (row.jid, row.resource))
649 649
650 options = {} 650 options = {}
651 if row.subscription_type: 651 if row.subscription_type:
652 options['pubsub#subscription_type'] = row.subscription_type; 652 options['pubsub#subscription_type'] = row.subscription_type;
653 if row.subscription_depth: 653 if row.subscription_depth:
728 # then we construct values for subscriptions update according to entity_id we just got 728 # then we construct values for subscriptions update according to entity_id we just got
729 placeholders = ','.join(len(subscriptions) * ["%s"]) 729 placeholders = ','.join(len(subscriptions) * ["%s"])
730 values = [] 730 values = []
731 for subscription in subscriptions: 731 for subscription in subscriptions:
732 entity_id = entities_map[subscription.subscriber].entity_id 732 entity_id = entities_map[subscription.subscriber].entity_id
733 resource = subscription.subscriber.resource or u'' 733 resource = subscription.subscriber.resource or ''
734 values.append((self.nodeDbId, entity_id, resource, subscription.state, None, None)) 734 values.append((self.nodeDbId, entity_id, resource, subscription.state, None, None))
735 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 735 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5
736 cursor.execute("INSERT INTO subscriptions(node_id, entity_id, resource, state, subscription_type, subscription_depth) VALUES " + placeholders + " ON CONFLICT (entity_id, resource, node_id) DO UPDATE SET state=EXCLUDED.state", [v for v in values]) 736 cursor.execute("INSERT INTO subscriptions(node_id, entity_id, resource, state, subscription_type, subscription_depth) VALUES " + placeholders + " ON CONFLICT (entity_id, resource, node_id) DO UPDATE SET state=EXCLUDED.state", [v for v in values])
737 737
738 def isSubscribed(self, entity): 738 def isSubscribed(self, entity):
806 entities = self.getOrCreateEntities(cursor, affiliations) 806 entities = self.getOrCreateEntities(cursor, affiliations)
807 807
808 # then we construct values for affiliations update according to entity_id we just got 808 # then we construct values for affiliations update according to entity_id we just got
809 placeholders = ','.join(len(affiliations) * ["(%s,%s,%s)"]) 809 placeholders = ','.join(len(affiliations) * ["(%s,%s,%s)"])
810 values = [] 810 values = []
811 map(values.extend, ((e.entity_id, affiliations[jid.JID(e.jid)], self.nodeDbId) for e in entities)) 811 for e in entities:
812 values.extend((e.entity_id, affiliations[jid.JID(e.jid)], self.nodeDbId))
812 813
813 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 814 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5
814 cursor.execute("INSERT INTO affiliations(entity_id,affiliation,node_id) VALUES " + placeholders + " ON CONFLICT (entity_id,node_id) DO UPDATE SET affiliation=EXCLUDED.affiliation", values) 815 cursor.execute("INSERT INTO affiliations(entity_id,affiliation,node_id) VALUES " + placeholders + " ON CONFLICT (entity_id,node_id) DO UPDATE SET affiliation=EXCLUDED.affiliation", values)
815 816
816 def deleteAffiliations(self, entities): 817 def deleteAffiliations(self, entities):
834 (self.nodeDbId,)) 835 (self.nodeDbId,))
835 rows = cursor.fetchall() 836 rows = cursor.fetchall()
836 return [row[0] for row in rows] 837 return [row[0] for row in rows]
837 838
838 839
840 @implementer(iidavoll.ILeafNode)
839 class LeafNode(Node): 841 class LeafNode(Node):
840 842
841 implements(iidavoll.ILeafNode)
842 843
843 nodeType = 'leaf' 844 nodeType = 'leaf'
844 845
845 def getOrderBy(self, ext_data, direction='DESC'): 846 def getOrderBy(self, ext_data, direction='DESC'):
846 """Return ORDER BY clause corresponding to Order By key in ext_data 847 """Return ORDER BY clause corresponding to Order By key in ext_data
849 @param direction (unicode): ORDER BY direction (ASC or DESC) 850 @param direction (unicode): ORDER BY direction (ASC or DESC)
850 @return (unicode): ORDER BY clause to use 851 @return (unicode): ORDER BY clause to use
851 """ 852 """
852 keys = ext_data.get('order_by') 853 keys = ext_data.get('order_by')
853 if not keys: 854 if not keys:
854 return u'ORDER BY updated ' + direction 855 return 'ORDER BY updated ' + direction
855 cols_statmnt = [] 856 cols_statmnt = []
856 for key in keys: 857 for key in keys:
857 if key == 'creation': 858 if key == 'creation':
858 column = 'item_id' # could work with items.created too 859 column = 'item_id' # could work with items.created too
859 elif key == 'modification': 860 elif key == 'modification':
860 column = 'updated' 861 column = 'updated'
861 else: 862 else:
862 log.msg(u"WARNING: Unknown order by key: {key}".format(key=key)) 863 log.msg("WARNING: Unknown order by key: {key}".format(key=key))
863 column = 'updated' 864 column = 'updated'
864 cols_statmnt.append(column + u' ' + direction) 865 cols_statmnt.append(column + ' ' + direction)
865 866
866 return u"ORDER BY " + u",".join([col for col in cols_statmnt]) 867 return "ORDER BY " + ",".join([col for col in cols_statmnt])
867 868
868 @defer.inlineCallbacks 869 @defer.inlineCallbacks
869 def storeItems(self, items_data, publisher): 870 def storeItems(self, items_data, publisher):
870 # XXX: runInteraction doesn't seem to work when there are several "insert" 871 # XXX: runInteraction doesn't seem to work when there are several "insert"
871 # or "update". 872 # or "update".
916 # this can happen with serial_ids, if a item has been stored 917 # this can happen with serial_ids, if a item has been stored
917 # with a future id (generated by XMPP client) 918 # with a future id (generated by XMPP client)
918 cursor.execute(NEXT_ITEM_ID_QUERY.format(node_id=self.nodeDbId)) 919 cursor.execute(NEXT_ITEM_ID_QUERY.format(node_id=self.nodeDbId))
919 next_id = cursor.fetchone()[0] 920 next_id = cursor.fetchone()[0]
920 # we update the sequence, so we can skip conflicting ids 921 # we update the sequence, so we can skip conflicting ids
921 cursor.execute(u"SELECT setval('{seq_name}', %s)".format( 922 cursor.execute("SELECT setval('{seq_name}', %s)".format(
922 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId)), [next_id]) 923 seq_name = ITEMS_SEQ_NAME.format(node_id=self.nodeDbId)), [next_id])
923 # and now we can retry the query with the new id 924 # and now we can retry the query with the new id
924 item['id'] = insert_data[1] = unicode(next_id) 925 item['id'] = insert_data[1] = str(next_id)
925 # item saved in DB must also be updated with the new id 926 # item saved in DB must also be updated with the new id
926 insert_data[3] = item.toXml() 927 insert_data[3] = item.toXml()
927 cursor.execute(insert_query, insert_data) 928 cursor.execute(insert_query, insert_data)
928 else: 929 else:
929 # but if we have not serial_ids, we have a real problem 930 # but if we have not serial_ids, we have a real problem
1057 if '/' in jid_s: 1058 if '/' in jid_s:
1058 query_filters.append("AND publisher=%s") 1059 query_filters.append("AND publisher=%s")
1059 args.append(filter_.value) 1060 args.append(filter_.value)
1060 else: 1061 else:
1061 query_filters.append("AND publisher LIKE %s") 1062 query_filters.append("AND publisher LIKE %s")
1062 args.append(u"{}%".format(filter_.value)) 1063 args.append("{}%".format(filter_.value))
1063 elif filter_.var == const.MAM_FILTER_CATEGORY: 1064 elif filter_.var == const.MAM_FILTER_CATEGORY:
1064 query.append("LEFT JOIN item_categories USING (item_id)") 1065 query.append("LEFT JOIN item_categories USING (item_id)")
1065 query_filters.append("AND category=%s") 1066 query_filters.append("AND category=%s")
1066 args.append(filter_.value) 1067 args.append(filter_.value)
1067 else: 1068 else: