comparison sat_pubsub/pgsql_storage.py @ 331:e93a9fd329d9

affiliations XMPP handling: /!\ PostgreSQL minimal version raised to 9.5 (for upsert support) - affiliations can now be handled after node creation using XMPP: affiliationsGet, affiliationsSet (both for node owner) and affiliations (for an entity to know with which nodes it is affiliated on the service) are implemented - pgsql: getOrCreateEntities helper method has been implemented, it gets entity_id and creates missing entities when needed.
author Goffi <goffi@goffi.org>
date Sun, 26 Mar 2017 20:58:48 +0200
parents 82d1259b3e36
children 31cbd8b9fa7f
comparison
equal deleted inserted replaced
330:82d1259b3e36 331:e93a9fd329d9
289 (nodeIdentifier,), pep, recipient)) 289 (nodeIdentifier,), pep, recipient))
290 290
291 if cursor.rowcount != 1: 291 if cursor.rowcount != 1:
292 raise error.NodeNotFound() 292 raise error.NodeNotFound()
293 293
294 294 def getAffiliations(self, entity, nodeIdentifier, pep, recipient=None):
295 295 return self.dbpool.runInteraction(self._getAffiliations, entity, nodeIdentifier, pep, recipient)
296 def getAffiliations(self, entity, pep, recipient=None): 296
297 d = self.dbpool.runQuery(*withPEP("""SELECT node, affiliation FROM entities 297 def _getAffiliations(self, cursor, entity, nodeIdentifier, pep, recipient=None):
298 NATURAL JOIN affiliations 298 query = ["""SELECT node, affiliation FROM entities
299 NATURAL JOIN nodes 299 NATURAL JOIN affiliations
300 WHERE jid=%s""", 300 NATURAL JOIN nodes
301 (entity.userhost(),), pep, recipient, 'nodes')) 301 WHERE jid=%s"""]
302 d.addCallback(lambda results: [tuple(r) for r in results]) 302 args = [entity.userhost()]
303 return d 303
304 if nodeIdentifier is not None:
305 query.append("AND node=%s")
306 args.append(nodeIdentifier)
307
308 cursor.execute(*withPEP(' '.join(query), args, pep, recipient))
309 rows = cursor.fetchall()
310 return [tuple(r) for r in rows]
304 311
305 312
306 def getSubscriptions(self, entity, pep, recipient=None): 313 def getSubscriptions(self, entity, pep, recipient=None):
307 def toSubscriptions(rows): 314 def toSubscriptions(rows):
308 subscriptions = [] 315 subscriptions = []
576 (self.nodeDbId,)) 583 (self.nodeDbId,))
577 result = cursor.fetchall() 584 result = cursor.fetchall()
578 585
579 return {jid.internJID(r[0]): r[1] for r in result} 586 return {jid.internJID(r[0]): r[1] for r in result}
580 587
def getOrCreateEntities(self, cursor, entities_jids):
    """Get entity_id of entities from the entities table

    Entities will be inserted if they don't exist.
    Only the bare jid (userhost) of each entity is used.

    @param cursor: database cursor used to run the query
    @param entities_jids(list[jid.JID]): entities to get or create
    @return list[record(entity_id,jid)]: list of entity_id and jid (as plain string)
        both existing and inserted entities are returned;
        an empty list is returned (without running any query) when
        entities_jids is empty
    """
    bare_jids = [j.userhost() for j in entities_jids]
    if not bare_jids:
        # "VALUES" followed by no row is a SQL syntax error, and there is
        # nothing to get or create anyway
        return []

    # cf. http://stackoverflow.com/a/35265559
    # only the "(%s)" placeholders are formatted into the query text,
    # the jids themselves are passed as query parameters
    placeholders = ','.join(len(bare_jids) * ["(%s)"])
    query = (
        """
        WITH
        jid_values (jid) AS (
            VALUES {placeholders}
        ),
        inserted (entity_id, jid) AS (
            INSERT INTO entities (jid)
            SELECT jid
            FROM jid_values
            ON CONFLICT DO NOTHING
            RETURNING entity_id, jid
        )
        SELECT e.entity_id, e.jid
        FROM entities e JOIN jid_values jv ON jv.jid = e.jid
        UNION ALL
        SELECT entity_id, jid
        FROM inserted""".format(placeholders=placeholders))
    cursor.execute(query, bare_jids)
    return cursor.fetchall()
618
def setAffiliations(self, affiliations):
    """Run _setAffiliations as a database interaction

    @param affiliations: passed unchanged to _setAffiliations
    @return: result of dbpool.runInteraction
    """
    interaction = self._setAffiliations
    return self.dbpool.runInteraction(interaction, affiliations)
621
622
623 def _setAffiliations(self, cursor, affiliations):
624 self._checkNodeExists(cursor)
625
626 entities = self.getOrCreateEntities(cursor, affiliations)
627
628 # then we construct values for affiliations update according to entity_id we just got
629 placeholders = ','.join(len(affiliations) * ["(%s,%s,%s)"])
630 values = []
631 map(values.extend, ((e.entity_id, affiliations[jid.JID(e.jid)], self.nodeDbId) for e in entities))
632
633 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5
634 cursor.execute("INSERT INTO affiliations(entity_id,affiliation,node_id) VALUES " + placeholders + " ON CONFLICT (entity_id,node_id) DO UPDATE SET affiliation=EXCLUDED.affiliation", values)
635
636
def deleteAffiliations(self, entities):
    """Run _deleteAffiliations as a database interaction

    @param entities: passed unchanged to _deleteAffiliations
    @return: result of dbpool.runInteraction
    """
    interaction = self._deleteAffiliations
    return self.dbpool.runInteraction(interaction, entities)
639
640
641 def _deleteAffiliations(self, cursor, entities):
642 """delete affiliations and subscriptions for this entity"""
643 self._checkNodeExists(cursor)
644 placeholders = ','.join(len(entities) * ["%s"])
645 cursor.execute("DELETE FROM affiliations WHERE node_id=%s AND entity_id in (SELECT entity_id FROM entities WHERE jid IN (" + placeholders + ")) RETURNING entity_id", [self.nodeDbId] + [e.userhost() for e in entities])
646
647 rows = cursor.fetchall()
648 placeholders = ','.join(len(rows) * ["%s"])
649 cursor.execute("DELETE FROM subscriptions WHERE node_id=%s AND entity_id in (" + placeholders + ")", [self.nodeDbId] + [r[0] for r in rows])
650
def getAuthorizedGroups(self):
    """Get the groups authorized on this node

    Runs _getAuthorizedGroups as a database interaction.

    @return: result of dbpool.runInteraction
    """
    # the interaction helper of this method is _getAuthorizedGroups
    # (the previously referenced name, _getNodeGroups, is not the helper
    # defined for it)
    return self.dbpool.runInteraction(self._getAuthorizedGroups)
653
654 def _getAuthorizedGroups(self, cursor):
655 cursor.execute("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s",
656 (self.nodeDbId,))
657 rows = cursor.fetchall()
658 return [row[0] for row in rows]
581 659
582 660
583 class LeafNode(Node): 661 class LeafNode(Node):
584 662
585 implements(iidavoll.ILeafNode) 663 implements(iidavoll.ILeafNode)