Mercurial > libervia-pubsub
diff sat_pubsub/pgsql_storage.py @ 331:e93a9fd329d9
affiliations XMPP handling:
/!\ PostGreSQL minimal version raised to 9.5 (for upsert support)
- affiliations can now be handler after node creation using XMPP: affiliationsGet, affiliationsSet (both for node owner) and affiliations (for an entity to know with which nodes it is affiliated on the service) are implemented
- pgsql: getOrCreateEntities helping method has been implemented, it get entity_id and create missing entities when needed.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 26 Mar 2017 20:58:48 +0200 |
parents | 82d1259b3e36 |
children | 31cbd8b9fa7f |
line wrap: on
line diff
--- a/sat_pubsub/pgsql_storage.py Sun Mar 26 20:52:32 2017 +0200 +++ b/sat_pubsub/pgsql_storage.py Sun Mar 26 20:58:48 2017 +0200 @@ -291,16 +291,23 @@ if cursor.rowcount != 1: raise error.NodeNotFound() - + def getAffiliations(self, entity, nodeIdentifier, pep, recipient=None): + return self.dbpool.runInteraction(self._getAffiliations, entity, nodeIdentifier, pep, recipient) - def getAffiliations(self, entity, pep, recipient=None): - d = self.dbpool.runQuery(*withPEP("""SELECT node, affiliation FROM entities - NATURAL JOIN affiliations - NATURAL JOIN nodes - WHERE jid=%s""", - (entity.userhost(),), pep, recipient, 'nodes')) - d.addCallback(lambda results: [tuple(r) for r in results]) - return d + def _getAffiliations(self, cursor, entity, nodeIdentifier, pep, recipient=None): + query = ["""SELECT node, affiliation FROM entities + NATURAL JOIN affiliations + NATURAL JOIN nodes + WHERE jid=%s"""] + args = [entity.userhost()] + + if nodeIdentifier is not None: + query.append("AND node=%s") + args.append(nodeIdentifier) + + cursor.execute(*withPEP(' '.join(query), args, pep, recipient)) + rows = cursor.fetchall() + return [tuple(r) for r in rows] def getSubscriptions(self, entity, pep, recipient=None): @@ -578,6 +585,77 @@ return {jid.internJID(r[0]): r[1] for r in result} + def getOrCreateEntities(self, cursor, entities_jids): + """Get entity_id from entities in entities table + + Entities will be inserted it they don't exist + @param entities_jid(list[jid.JID]): entities to get or create + @return list[record(entity_jid,jid)]]: list of entity_id and jid (as plain string) + both existing and inserted entities are returned + """ + # cf. http://stackoverflow.com/a/35265559 + placeholders = ','.join(len(entities_jids) * ["(%s)"]) + query = ( + """ + WITH + jid_values (jid) AS ( + VALUES {placeholders} + ), + inserted (entity_id, jid) AS ( + INSERT INTO entities (jid) + SELECT jid + FROM jid_values + ON CONFLICT DO NOTHING + RETURNING entity_id, jid + ) + SELECT e.entity_id, e.jid + FROM entities e JOIN jid_values jv ON jv.jid = e.jid + UNION ALL + SELECT entity_id, jid + FROM inserted""".format(placeholders=placeholders)) + cursor.execute(query, [j.userhost() for j in entities_jids]) + return cursor.fetchall() + + def setAffiliations(self, affiliations): + return self.dbpool.runInteraction(self._setAffiliations, affiliations) + + + def _setAffiliations(self, cursor, affiliations): + self._checkNodeExists(cursor) + + entities = self.getOrCreateEntities(cursor, affiliations) + + # then we construct values for affiliations update according to entity_id we just got + placeholders = ','.join(len(affiliations) * ["(%s,%s,%s)"]) + values = [] + map(values.extend, ((e.entity_id, affiliations[jid.JID(e.jid)], self.nodeDbId) for e in entities)) + + # we use upsert so new values are inserted and existing one updated. This feature is only available for PostgreSQL >= 9.5 + cursor.execute("INSERT INTO affiliations(entity_id,affiliation,node_id) VALUES " + placeholders + " ON CONFLICT (entity_id,node_id) DO UPDATE SET affiliation=EXCLUDED.affiliation", values) + + + def deleteAffiliations(self, entities): + return self.dbpool.runInteraction(self._deleteAffiliations, entities) + + + def _deleteAffiliations(self, cursor, entities): + """delete affiliations and subscriptions for this entity""" + self._checkNodeExists(cursor) + placeholders = ','.join(len(entities) * ["%s"]) + cursor.execute("DELETE FROM affiliations WHERE node_id=%s AND entity_id in (SELECT entity_id FROM entities WHERE jid IN (" + placeholders + ")) RETURNING entity_id", [self.nodeDbId] + [e.userhost() for e in entities]) + + rows = cursor.fetchall() + placeholders = ','.join(len(rows) * ["%s"]) + cursor.execute("DELETE FROM subscriptions WHERE node_id=%s AND entity_id in (" + placeholders + ")", [self.nodeDbId] + [r[0] for r in rows]) + + def getAuthorizedGroups(self): + return self.dbpool.runInteraction(self._getNodeGroups) + + def _getAuthorizedGroups(self, cursor): + cursor.execute("SELECT groupname FROM node_groups_authorized NATURAL JOIN nodes WHERE node=%s", + (self.nodeDbId,)) + rows = cursor.fetchall() + return [row[0] for row in rows] class LeafNode(Node):