Mercurial > libervia-pubsub
comparison sat_pubsub/pgsql_storage.py @ 331:e93a9fd329d9
affiliations XMPP handling:
/!\ Minimum PostgreSQL version raised to 9.5 (for upsert support)
- affiliations can now be handled after node creation using XMPP: affiliationsGet, affiliationsSet (both for node owner) and affiliations (for an entity to know with which nodes it is affiliated on the service) are implemented
- pgsql: getOrCreateEntities helper method has been implemented; it gets entity_id and creates missing entities when needed.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 26 Mar 2017 20:58:48 +0200 |
parents | 82d1259b3e36 |
children | 31cbd8b9fa7f |
comparison
equal
deleted
inserted
replaced
330:82d1259b3e36 | 331:e93a9fd329d9 |
---|---|
289 (nodeIdentifier,), pep, recipient)) | 289 (nodeIdentifier,), pep, recipient)) |
290 | 290 |
291 if cursor.rowcount != 1: | 291 if cursor.rowcount != 1: |
292 raise error.NodeNotFound() | 292 raise error.NodeNotFound() |
293 | 293 |
294 | 294 def getAffiliations(self, entity, nodeIdentifier, pep, recipient=None): |
295 | 295 return self.dbpool.runInteraction(self._getAffiliations, entity, nodeIdentifier, pep, recipient) |
296 def getAffiliations(self, entity, pep, recipient=None): | 296 |
297 d = self.dbpool.runQuery(*withPEP("""SELECT node, affiliation FROM entities | 297 def _getAffiliations(self, cursor, entity, nodeIdentifier, pep, recipient=None): |
298 NATURAL JOIN affiliations | 298 query = ["""SELECT node, affiliation FROM entities |
299 NATURAL JOIN nodes | 299 NATURAL JOIN affiliations |
300 WHERE jid=%s""", | 300 NATURAL JOIN nodes |
301 (entity.userhost(),), pep, recipient, 'nodes')) | 301 WHERE jid=%s"""] |
302 d.addCallback(lambda results: [tuple(r) for r in results]) | 302 args = [entity.userhost()] |
303 return d | 303 |
304 if nodeIdentifier is not None: | |
305 query.append("AND node=%s") | |
306 args.append(nodeIdentifier) | |
307 | |
308 cursor.execute(*withPEP(' '.join(query), args, pep, recipient)) | |
309 rows = cursor.fetchall() | |
310 return [tuple(r) for r in rows] | |
304 | 311 |
305 | 312 |
306 def getSubscriptions(self, entity, pep, recipient=None): | 313 def getSubscriptions(self, entity, pep, recipient=None): |
307 def toSubscriptions(rows): | 314 def toSubscriptions(rows): |
308 subscriptions = [] | 315 subscriptions = [] |
576 (self.nodeDbId,)) | 583 (self.nodeDbId,)) |
577 result = cursor.fetchall() | 584 result = cursor.fetchall() |
578 | 585 |
579 return {jid.internJID(r[0]): r[1] for r in result} | 586 return {jid.internJID(r[0]): r[1] for r in result} |
580 | 587 |
588 def getOrCreateEntities(self, cursor, entities_jids): | |
589 """Get entity_id from entities in entities table | |
590 | |
591 Entities will be inserted it they don't exist | |
592 @param entities_jid(list[jid.JID]): entities to get or create | |
593 @return list[record(entity_jid,jid)]]: list of entity_id and jid (as plain string) | |
594 both existing and inserted entities are returned | |
595 """ | |
596 # cf. http://stackoverflow.com/a/35265559 | |
597 placeholders = ','.join(len(entities_jids) * ["(%s)"]) | |
598 query = ( | |
599 """ | |
600 WITH | |
601 jid_values (jid) AS ( | |
602 VALUES {placeholders} | |
603 ), | |
604 inserted (entity_id, jid) AS ( | |
605 INSERT INTO entities (jid) | |
606 SELECT jid | |
607 FROM jid_values | |
608 ON CONFLICT DO NOTHING | |
609 RETURNING entity_id, jid | |
610 ) | |
611 SELECT e.entity_id, e.jid | |
612 FROM entities e JOIN jid_values jv ON jv.jid = e.jid | |
613 UNION ALL | |
614 SELECT entity_id, jid | |
615 FROM inserted""".format(placeholders=placeholders)) | |
616 cursor.execute(query, [j.userhost() for j in entities_jids]) | |
617 return cursor.fetchall() | |
618 | |
619 def setAffiliations(self, affiliations): | |
620 return self.dbpool.runInteraction(self._setAffiliations, affiliations) | |
621 | |
622 | |
623 def _setAffiliations(self, cursor, affiliations): | |
624 self._checkNodeExists(cursor) | |
625 | |
626 entities = self.getOrCreateEntities(cursor, affiliations) | |
627 | |
628 # then we construct values for affiliations update according to entity_id we just got | |
629 placeholders = ','.join(len(affiliations) * ["(%s,%s,%s)"]) | |
630 values = [] | |
631 map(values.extend, ((e.entity_id, affiliations[jid.JID(e.jid)], self.nodeDbId) for e in entities)) | |
632 | |
633 # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 | |
634 cursor.execute("INSERT INTO affiliations(entity_id,affiliation,node_id) VALUES " + placeholders + " ON CONFLICT (entity_id,node_id) DO UPDATE SET affiliation=EXCLUDED.affiliation", values) | |
635 | |
636 | |
637 def deleteAffiliations(self, entities): | |
638 return self.dbpool.runInteraction(self._deleteAffiliations, entities) | |
639 | |
640 | |
641 def _deleteAffiliations(self, cursor, entities): | |
642 """delete affiliations and subscriptions for this entity""" | |
643 self._checkNodeExists(cursor) | |
644 placeholders = ','.join(len(entities) * ["%s"]) | |
645 cursor.execute("DELETE FROM affiliations WHERE node_id=%s AND entity_id in (SELECT entity_id FROM entities WHERE jid IN (" + placeholders + ")) RETURNING entity_id", [self.nodeDbId] + [e.userhost() for e in entities]) | |
646 | |
647 rows = cursor.fetchall() | |
648 placeholders = ','.join(len(rows) * ["%s"]) | |
649 cursor.execute("DELETE FROM subscriptions WHERE node_id=%s AND entity_id in (" + placeholders + ")", [self.nodeDbId] + [r[0] for r in rows]) | |
650 | |
651 def getAuthorizedGroups(self): | |
652 return self.dbpool.runInteraction(self._getNodeGroups) | |
653 | |
654 def _getAuthorizedGroups(self, cursor): | |
655 cursor.execute("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s", | |
656 (self.nodeDbId,)) | |
657 rows = cursor.fetchall() | |
658 return [row[0] for row in rows] | |
581 | 659 |
582 | 660 |
583 class LeafNode(Node): | 661 class LeafNode(Node): |
584 | 662 |
585 implements(iidavoll.ILeafNode) | 663 implements(iidavoll.ILeafNode) |