changeset 478:b544109ab4c4

Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription /!\ pgsql schema needs to be updated /!\ /!\ server conf needs to be updated for privileged entity: only the new `urn:xmpp:privilege:2` namespace is handled now /!\ Privileged entity has been updated to hanlde the new namespace and IQ permission. Roster pushes are not managed yet. XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully specified at the moment, and my messages on standard@ haven't seen any reply. Thus for now only "Subscribing", "Unsubscribing" and "Listing Subscriptions" is implemented, "Auto Subscriptions" and "Filtering" is not. Public Pubsub Subscription (https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented; the XEP has been accepted by council but is not yet published. It will be updated to use subscription options instead of the <public> element actually specified, I'm waiting for publication to update the XEP. unsubscribe has been updated to return the `<subscription>` element as expected by XEP-0060 (sat_tmp needs to be updated). database schema has been updated to add columns necessary to keep track of subscriptions to external nodes and to mark subscriptions as public.
author Goffi <goffi@goffi.org>
date Wed, 11 May 2022 13:39:08 +0200
parents 9125a6e440c0
children cfa40fa108a4
files db/libervia_pubsub_update_9_10.sql db/pubsub.sql sat_pubsub/backend.py sat_pubsub/const.py sat_pubsub/delegation.py sat_pubsub/pam.py sat_pubsub/pgsql_storage.py sat_pubsub/privilege.py sat_pubsub/pubsub_admin.py twisted/plugins/pubsub.py
diffstat 10 files changed, 859 insertions(+), 202 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/db/libervia_pubsub_update_9_10.sql	Wed May 11 13:39:08 2022 +0200
@@ -0,0 +1,24 @@
+-- we check version of the database before doing anything
+-- and stop execution if not good
+\set ON_ERROR_STOP
+DO $$
+DECLARE ver text;
+BEGIN
+    SELECT value INTO ver FROM metadata WHERE key='version';
+    IF NOT FOUND OR ver!='9' THEN
+        RAISE EXCEPTION 'This update file needs to be applied on database schema version 9, you use version %',ver;
+    END IF;
+END$$;
+\unset ON_ERROR_STOP
+-- end of version check
+
+/* subscriptions table updates */
+/* to handle external nodes */
+ALTER TABLE subscriptions ALTER COLUMN node_id DROP NOT NULL;
+ALTER TABLE subscriptions ADD COLUMN ext_service text;
+ALTER TABLE subscriptions ADD COLUMN ext_node text;
+/* and to mark a subscription as public */
+ALTER TABLE subscriptions ADD COLUMN public boolean NOT NULL DEFAULT FALSE;
+ALTER TABLE subscriptions ADD UNIQUE (entity_id, ext_service, ext_node);
+
+UPDATE metadata SET value='10' WHERE key='version';
--- a/db/pubsub.sql	Mon Jan 03 16:48:22 2022 +0100
+++ b/db/pubsub.sql	Wed May 11 13:39:08 2022 +0200
@@ -15,8 +15,8 @@
     deliver_payloads boolean NOT NULL DEFAULT TRUE,
     max_items integer NOT NULL DEFAULT 0
         CHECK (max_items >= 0),
-	overwrite_policy text NOT NULL DEFAULT 'original_publisher'
-		CHECK (overwrite_policy IN ('original_publisher', 'any_publisher')),
+    overwrite_policy text NOT NULL DEFAULT 'original_publisher'
+        CHECK (overwrite_policy IN ('original_publisher', 'any_publisher')),
     serial_ids boolean NOT NULL DEFAULT FALSE,
     consistent_publisher boolean NOT NULL DEFAULT FALSE,
     fts_language text NOT NULL DEFAULT 'generic',
@@ -53,14 +53,21 @@
     subscription_id serial PRIMARY KEY,
     entity_id integer NOT NULL REFERENCES entities ON DELETE CASCADE,
     resource text,
-    node_id integer NOT NULL REFERENCES nodes ON delete CASCADE,
+    node_id integer REFERENCES nodes ON DELETE CASCADE,
+    /* when we reference an external node (with PAM), node_id is NULL and service and node
+     * are set */
+    ext_service text,
+    ext_node text,
     state text NOT NULL DEFAULT 'subscribed'
         CHECK (state IN ('subscribed', 'pending', 'unconfigured')),
     subscription_type text
         CHECK (subscription_type IN (NULL, 'items', 'nodes')),
     subscription_depth text
         CHECK (subscription_depth IN (NULL, '1', 'all')),
-    UNIQUE (entity_id, resource, node_id));
+    public boolean NOT NULL DEFAULT FALSE,
+    UNIQUE (entity_id, resource, node_id)),
+    UNIQUE (entity_id, ext_service, ext_node)),
+    ;
 
 CREATE TABLE items (
     item_id serial PRIMARY KEY,
@@ -154,4 +161,4 @@
     value text
 );
 
-INSERT INTO metadata VALUES ('version', '9');
+INSERT INTO metadata VALUES ('version', '10');
--- a/sat_pubsub/backend.py	Mon Jan 03 16:48:22 2022 +0100
+++ b/sat_pubsub/backend.py	Wed May 11 13:39:08 2022 +0200
@@ -62,6 +62,7 @@
 
 import copy
 import uuid
+import hashlib
 from typing import Optional, List, Tuple
 
 from zope.interface import implementer
@@ -240,10 +241,14 @@
     def _getFTSLanguagesEb(self, failure_):
         log.msg(f"WARNING: can get FTS languages: {failure_}")
 
-    def isAdmin(self, entity_jid):
+    def isAdmin(self, entity_jid: jid.JID) -> bool:
         """Return True if an entity is an administrator"""
         return entity_jid.userhostJID() in self.admins
 
+    def isFromServer(self, entity_jid: jid.JID) -> bool:
+        """Return True if an entity come from our server"""
+        return entity_jid.host == self.server_jid.host
+
     def supportsPublishOptions(self):
         return True
 
@@ -595,18 +600,21 @@
     def registerPurgeNotifier(self, observerfn, *args, **kwargs):
         self.addObserver('//event/pubsub/purge', observerfn, *args, **kwargs)
 
-    def subscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
+    async def subscribe(
+        self,
+        nodeIdentifier: str,
+        subscriber: jid.JID,
+        requestor: jid.JID,
+        options: Optional[dict],
+        pep: bool,
+        recipient: jid.JID
+    ) -> pubsub.Subscription:
         subscriberEntity = subscriber.userhostJID()
         if subscriberEntity != requestor.userhostJID():
-            return defer.fail(error.Forbidden())
+            raise error.Forbidden()
 
-        d = self.storage.getNode(nodeIdentifier, pep, recipient)
-        d.addCallback(_getAffiliation, subscriberEntity)
-        d.addCallback(self._doSubscribe, subscriber, pep, recipient)
-        return d
-
-    def _doSubscribe(self, result, subscriber, pep, recipient):
-        node, affiliation = result
+        node = await self.storage.getNode(nodeIdentifier, pep, recipient)
+        __, affiliation = await _getAffiliation(node, subscriberEntity)
 
         if affiliation == 'outcast':
             raise error.Forbidden()
@@ -614,67 +622,61 @@
         access_model = node.getAccessModel()
 
         if access_model == const.VAL_AMODEL_OPEN:
-            d = defer.succeed(None)
+            pass
         elif access_model == const.VAL_AMODEL_PRESENCE:
-            d = self.checkPresenceSubscription(node, subscriber)
+            await self.checkPresenceSubscription(node, subscriber)
         elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
-            d = self.checkRosterGroups(node, subscriber)
+            await self.checkRosterGroups(node, subscriber)
         elif access_model == const.VAL_AMODEL_WHITELIST:
-            d = self.checkNodeAffiliations(node, subscriber)
+            await self.checkNodeAffiliations(node, subscriber)
         else:
             raise NotImplementedError
 
-        def trapExists(failure):
-            failure.trap(error.SubscriptionExists)
-            return False
+        config = {}
+        if options and options.get(f"{{{const.NS_PPS}}}public"):
+            config["public"] = True
+        try:
+            await node.addSubscription(subscriber, 'subscribed', config)
+        except error.SubscriptionExists:
+            send_last = False
+        else:
+            send_last = True
 
-        def cb(sendLast):
-            d = node.getSubscription(subscriber)
-            if sendLast:
-                d.addCallback(self._sendLastPublished, node, pep, recipient)
-            return d
-
-        d.addCallback(lambda _: node.addSubscription(subscriber, 'subscribed', {}))
-        d.addCallbacks(lambda _: True, trapExists)
-        d.addCallback(cb)
-
-        return d
-
-    def _sendLastPublished(self, subscription, node, pep, recipient):
+        subscription = await node.getSubscription(subscriber)
 
-        def notifyItem(items_data):
-            if items_data:
-                reactor.callLater(0, self.dispatch,
-                                     {'items_data': items_data,
-                                      'node': node,
-                                      'pep': pep,
-                                      'recipient': recipient,
-                                      'subscription': subscription,
-                                     },
-                                     '//event/pubsub/notify')
-
-        config = node.getConfiguration()
-        sendLastPublished = config.get('pubsub#send_last_published_item',
-                                       'never')
-        if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
-            entity = subscription.subscriber.userhostJID()
-            d = defer.ensureDeferred(
-                self.getItemsData(
-                    node.nodeIdentifier, entity, recipient, maxItems=1, ext_data={'pep': pep}
+        if send_last:
+            config = node.getConfiguration()
+            sendLastPublished = config.get(
+                'pubsub#send_last_published_item', 'never'
+            )
+            if sendLastPublished == 'on_sub' and node.nodeType == 'leaf':
+                entity = subscription.subscriber.userhostJID()
+                items_data, __ = await self.getItemsData(
+                    node.nodeIdentifier, entity, recipient, maxItems=1,
+                    ext_data={'pep': pep}
                 )
-            )
-            d.addCallback(notifyItem)
-            d.addErrback(log.err)
+                if items_data:
+                    reactor.callLater(
+                        0,
+                        self.dispatch,
+                        {'items_data': items_data,
+                         'node': node,
+                         'pep': pep,
+                         'recipient': recipient,
+                         'subscription': subscription,
+                         },
+                        '//event/pubsub/notify'
+                    )
 
         return subscription
 
-    def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
+    async def unsubscribe(self, nodeIdentifier, subscriber, requestor, pep, recipient):
         if subscriber.userhostJID() != requestor.userhostJID():
-            return defer.fail(error.Forbidden())
+            raise error.Forbidden()
 
-        d = self.storage.getNode(nodeIdentifier, pep, recipient)
-        d.addCallback(lambda node: node.removeSubscription(subscriber))
-        return d
+        node = await self.storage.getNode(nodeIdentifier, pep, recipient)
+        await node.removeSubscription(subscriber)
+        return pubsub.Subscription(nodeIdentifier, subscriber, "none")
 
     def getSubscriptions(self, requestor, nodeIdentifier, pep, recipient):
         """retrieve subscriptions of an entity
@@ -685,7 +687,9 @@
         @param pep(bool): True if it's a PEP request
         @param recipient(jid.JID, None): recipient of the PEP request
         """
-        return self.storage.getSubscriptions(requestor, nodeIdentifier, pep, recipient)
+        return self.storage.getSubscriptions(
+            requestor, nodeIdentifier, None, pep, recipient
+        )
 
     def supportsAutoCreate(self):
         return True
@@ -749,6 +753,10 @@
         if not nodeIdentifier:
             return defer.fail(error.NoRootNode())
 
+        if ((nodeIdentifier == const.NS_PPS_SUBSCRIPTIONS
+             or nodeIdentifier.startswith(const.PPS_SUBSCRIBERS_PREFIX))):
+            return defer.succeed({const.OPT_ACCESS_MODEL: const.VAL_AMODEL_OPEN})
+
         d = self.storage.getNode(nodeIdentifier, pep, recipient)
         d.addCallback(lambda node: node.getConfiguration())
 
@@ -1067,15 +1075,19 @@
         )
         return ids
 
-    def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None,
-                       itemIdentifiers=None, ext_data=None):
-        d = defer.ensureDeferred(
-            self.getItemsData(
-                nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data
-            )
+    async def getItems(
+        self,
+        nodeIdentifier: str,
+        requestor: jid.JID,
+        recipient: jid.JID,
+        maxItems: Optional[int] = None,
+        itemIdentifiers: Optional[List[str]] = None,
+        ext_data: Optional[dict] = None
+    ) -> Tuple[List[domish.Element], Optional[rsm.RSMResponse]]:
+        items_data, rsm_response = await self.getItemsData(
+            nodeIdentifier, requestor, recipient, maxItems, itemIdentifiers, ext_data
         )
-        d.addCallback(lambda items_data: [item_data.item for item_data in items_data])
-        return d
+        return [item_data.item for item_data in items_data], rsm_response
 
     async def getOwnerRoster(self, node, owners=None):
         # FIXME: roster of publisher, not owner, must be used
@@ -1099,20 +1111,40 @@
             return
         return roster
 
-    async def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None,
-                       itemIdentifiers=None, ext_data=None):
+    async def getItemsData(
+        self,
+        nodeIdentifier: str,
+        requestor: jid.JID,
+        recipient: jid.JID,
+        maxItems: Optional[int] = None,
+        itemIdentifiers: Optional[List[str]] = None,
+        ext_data: Optional[dict] = None
+    ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]:
         """like getItems but return the whole ItemData"""
         if maxItems == 0:
             log.msg("WARNING: maxItems=0 on items retrieval")
-            return []
+            return [], None
 
         if ext_data is None:
             ext_data = {}
+
+        if nodeIdentifier == const.NS_PPS_SUBSCRIPTIONS:
+            return await self.getPublicSubscriptions(
+                requestor, maxItems, itemIdentifiers, ext_data,
+                ext_data.pop("pep", False), recipient
+            )
+        elif nodeIdentifier.startswith(f"{const.NS_PPS_SUBSCRIBERS}/"):
+            target_node = nodeIdentifier[len(const.NS_PPS_SUBSCRIBERS)+1:]
+            return await self.getPublicNodeSubscriptions(
+                target_node, requestor, maxItems, itemIdentifiers, ext_data,
+                ext_data.pop("pep", False), recipient
+            )
+
         node = await self.storage.getNode(nodeIdentifier, ext_data.get('pep', False), recipient)
         try:
             affiliation, owner, roster, access_model = await self.checkNodeAccess(node, requestor)
         except error.NotLeafNodeError:
-            return []
+            return [], None
 
         # at this point node access is checked
 
@@ -1155,9 +1187,9 @@
         if schema is not None:
             self.filterItemsWithSchema(items_data, schema, owner)
 
-        await self._items_rsm(
-            items_data, node, requestor_groups, owner, itemIdentifiers, ext_data)
-        return items_data
+        return await self._items_rsm(
+            items_data, node, requestor_groups, owner, itemIdentifiers, ext_data
+        )
 
     def _setCount(self, value, response):
         response.count = value
@@ -1180,48 +1212,135 @@
             rsm_request = ext_data['rsm']
         except KeyError:
             # No RSM in this request, nothing to do
-            return items_data
+            return items_data, None
 
         if itemIdentifiers:
             log.msg("WARNING, itemIdentifiers used with RSM, ignoring the RSM part")
-            return items_data
+            return items_data, None
 
-        response = rsm.RSMResponse()
+        rsm_response = rsm.RSMResponse()
 
         d_count = node.getItemsCount(authorized_groups, owner, ext_data)
-        d_count.addCallback(self._setCount, response)
+        d_count.addCallback(self._setCount, rsm_response)
         d_list = [d_count]
 
         if items_data:
-            response.first = items_data[0].item['id']
-            response.last = items_data[-1].item['id']
+            rsm_response.first = items_data[0].item['id']
+            rsm_response.last = items_data[-1].item['id']
 
             # index handling
             if rsm_request.index is not None:
-                response.index = rsm_request.index
+                rsm_response.index = rsm_request.index
             elif rsm_request.before:
                 # The last page case (before == '') is managed in render method
                 d_index = node.getItemsIndex(rsm_request.before, authorized_groups, owner, ext_data)
-                d_index.addCallback(self._setIndex, response, -len(items_data))
+                d_index.addCallback(self._setIndex, rsm_response, -len(items_data))
                 d_list.append(d_index)
             elif rsm_request.after is not None:
                 d_index = node.getItemsIndex(rsm_request.after, authorized_groups, owner, ext_data)
-                d_index.addCallback(self._setIndex, response, 1)
+                d_index.addCallback(self._setIndex, rsm_response, 1)
                 d_list.append(d_index)
             else:
                 # the first page was requested
-                response.index = 0
+                rsm_response.index = 0
 
 
         await defer.DeferredList(d_list)
 
         if rsm_request.before == '':
             # the last page was requested
-            response.index = response.count - len(items_data)
+            rsm_response.index = rsm_response.count - len(items_data)
+
+        return items_data, rsm_response
+
+    def addEltFromSubDict(
+        self,
+        parent_elt: domish.Element,
+        from_jid: Optional[jid.JID],
+        sub_dict: dict[str, str],
+        namespace: Optional[str] = None,
+    ) -> None:
+        """Generate <subscription> element from storage.getAllSubscriptions's dict
+
+        @param parent_elt: element where the new subscription element must be added
+        @param sub_dict: subscription data as returned by storage.getAllSubscriptions
+        @param namespace: if not None, namespace to use for <subscription> element
+        @param service_attribute: name of the attribute to use for the subscribed service
+        """
+        subscription_elt = parent_elt.addElement(
+            "subscription" if namespace is None else (namespace, "subscription")
+        )
+        if from_jid is not None:
+            subscription_elt["jid"] = from_jid.userhost()
+        if sub_dict["node"] is not None:
+            if sub_dict["pep"] is not None:
+                subscription_elt["service"] = sub_dict["pep"]
+            else:
+                subscription_elt["service"] = self.jid.full()
+            subscription_elt["node"] = sub_dict["node"]
+        else:
+            subscription_elt["service"] = sub_dict["ext_service"]
+            subscription_elt["node"] = sub_dict["ext_node"]
+        subscription_elt["subscription"] = sub_dict["state"]
+
+    async def getPublicSubscriptions(
+        self,
+        requestor: jid.JID,
+        maxItems: Optional[int],
+        itemIdentifiers: Optional[List[str]],
+        ext_data: dict,
+        pep: bool,
+        recipient: jid.JID
+    ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]:
 
-        items_data.append(container.ItemData(response.toElement()))
+        if itemIdentifiers or ext_data.get("rsm") or ext_data.get("mam"):
+            raise NotImplementedError(
+                "item identifiers, RSM and MAM are not implemented yet"
+            )
+
+        if not pep:
+            return [], None
+
+        subs = await self.storage.getAllSubscriptions(recipient, True)
+        items_data = []
+        for sub in subs:
+            if sub["state"] != "subscribed":
+                continue
+            item = domish.Element((pubsub.NS_PUBSUB, "item"))
+            item["id"] = sub["id"]
+            self.addEltFromSubDict(item, None, sub, const.NS_PPS)
+            items_data.append(container.ItemData(item))
+
+        return items_data, None
 
-        return items_data
+    async def getPublicNodeSubscriptions(
+        self,
+        nodeIdentifier: str,
+        requestor: jid.JID,
+        maxItems: Optional[int],
+        itemIdentifiers: Optional[List[str]],
+        ext_data: dict,
+        pep: bool,
+        recipient: jid.JID
+    ) -> Tuple[List[container.ItemData], Optional[rsm.RSMResponse]]:
+
+        if itemIdentifiers or ext_data.get("rsm") or ext_data.get("mam"):
+            raise NotImplementedError(
+                "item identifiers, RSM and MAM are not implemented yet"
+            )
+
+        node = await self.storage.getNode(nodeIdentifier, pep, recipient)
+
+        subs = await node.getSubscriptions(public=True)
+        items_data = []
+        for sub in subs:
+            item = domish.Element((pubsub.NS_PUBSUB, "item"))
+            item["id"] = sub.id
+            subscriber_elt = item.addElement((const.NS_PPS, "subscriber"))
+            subscriber_elt["jid"] = sub.subscriber.full()
+            items_data.append(container.ItemData(item))
+
+        return items_data, None
 
     async def retractItem(self, nodeIdentifier, itemIdentifiers, requestor, notify, pep, recipient):
         node = await self.storage.getNode(nodeIdentifier, pep, recipient)
@@ -1824,19 +1943,26 @@
         return d.addErrback(self._mapErrors)
 
     def subscribe(self, request):
-        d = self.backend.subscribe(request.nodeIdentifier,
-                                   request.subscriber,
-                                   request.sender,
-                                   self._isPep(request),
-                                   request.recipient)
+        d = defer.ensureDeferred(
+            self.backend.subscribe(
+                request.nodeIdentifier,
+                request.subscriber,
+                request.sender,
+                request.options,
+                self._isPep(request),
+                request.recipient
+            )
+        )
         return d.addErrback(self._mapErrors)
 
     def unsubscribe(self, request):
-        d = self.backend.unsubscribe(request.nodeIdentifier,
+        d = defer.ensureDeferred(
+            self.backend.unsubscribe(request.nodeIdentifier,
                                      request.subscriber,
                                      request.sender,
                                      self._isPep(request),
                                      request.recipient)
+        )
         return d.addErrback(self._mapErrors)
 
     def subscriptions(self, request):
@@ -1933,12 +2059,16 @@
         except AttributeError:
             pass
         ext_data['order_by'] = request.orderBy or []
-        d = self.backend.getItems(request.nodeIdentifier,
-                                  request.sender,
-                                  request.recipient,
-                                  request.maxItems,
-                                  request.itemIdentifiers,
-                                  ext_data)
+        d = defer.ensureDeferred(
+            self.backend.getItems(
+                request.nodeIdentifier,
+                request.sender,
+                request.recipient,
+                request.maxItems,
+                request.itemIdentifiers,
+                ext_data
+            )
+        )
         return d.addErrback(self._mapErrors)
 
     def retract(self, request):
@@ -1987,7 +2117,8 @@
             # cf. https://xmpp.org/extensions/xep-0060.html#subscriber-retrieve-returnsome
             disco.DiscoFeature(const.NS_PUBSUB_RSM),
             disco.DiscoFeature(pubsub.NS_ORDER_BY),
-            disco.DiscoFeature(const.NS_FDP)
+            disco.DiscoFeature(const.NS_FDP),
+            disco.DiscoFeature(const.NS_PPS)
         ]
 
     def getDiscoItems(self, requestor, service, nodeIdentifier=''):
--- a/sat_pubsub/const.py	Mon Jan 03 16:48:22 2022 +0100
+++ b/sat_pubsub/const.py	Wed May 11 13:39:08 2022 +0200
@@ -59,8 +59,13 @@
 NS_PUBSUB_RSM = "http://jabber.org/protocol/pubsub#rsm"
 NS_FORWARD = 'urn:xmpp:forward:0'
 NS_FDP = 'urn:xmpp:fdp:0'
+NS_PPS = 'urn:xmpp:pps:0'
+NS_PPS_SUBSCRIPTIONS = "urn:xmpp:pps:subscriptions:0"
+NS_PPS_SUBSCRIBERS = "urn:xmpp:pps:subscribers:0"
 NS_SCHEMA_RESTRICT = 'https://salut-a-toi/protocol/schema#restrict:0'
 
+PPS_SUBSCRIBERS_PREFIX = f"{NS_PPS_SUBSCRIBERS}/"
+
 FDP_TEMPLATE_PREFIX = "fdp/template/"
 FDP_SUBMITTED_PREFIX = "fdp/submitted/"
 
--- a/sat_pubsub/delegation.py	Mon Jan 03 16:48:22 2022 +0100
+++ b/sat_pubsub/delegation.py	Wed May 11 13:39:08 2022 +0200
@@ -20,16 +20,19 @@
 
 # This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service
 
-from wokkel.subprotocols import XMPPHandler
+from typing import Callable, Any
+
+from twisted.python import log
+from twisted.internet import reactor, defer
+from twisted.words.protocols.jabber import error, jid
+from twisted.words.protocols.jabber import xmlstream
+from twisted.words.xish import domish
 from wokkel import pubsub
 from wokkel import data_form
-from wokkel import disco, iwokkel, generic
-from wokkel.iwokkel import IPubSubService
+from wokkel import disco, iwokkel
 from wokkel import mam
-from twisted.python import log
-from twisted.words.protocols.jabber import ijabber, jid, error
-from twisted.words.protocols.jabber.xmlstream import toResponse
-from twisted.words.xish import domish
+from wokkel.iwokkel import IPubSubService
+from wokkel.subprotocols import XMPPHandler
 from zope.interface import implementer
 
 DELEGATION_NS = 'urn:xmpp:delegation:2'
@@ -40,6 +43,7 @@
 DELEGATION_MAIN_SEP = "::"
 DELEGATION_BARE_SEP = ":bare:"
 
+SEND_HOOK_TIMEOUT = 300
 TO_HACK = ((IPubSubService, pubsub, "PubSubRequest"),
            (mam.IMAMService, mam, "MAMRequest"),
            (None, disco, "_DiscoRequest"))
@@ -108,10 +112,28 @@
             self._service_hack()
         self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise)
         self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward)
-        self._current_iqs = {} # dict of iq being handler by delegation
-        self._xs_send = self.xmlstream.send
+        self._current_iqs = {} # dict of iq being handled by delegation
+        self.xs_send = self.xmlstream.send
         self.xmlstream.send = self._sendHack
 
+    def delegatedResult(
+        self,
+        iq_req_elt: domish.Element,
+        iq_resp_elt: domish.Element,
+        wrapping_iq_elt: domish.Element
+    ) -> None:
+        """Method called when a result to a delegated stanza has been received
+
+        The result is wrapped and sent back to server
+        """
+        iq_result_elt = xmlstream.toResponse(wrapping_iq_elt, 'result')
+        fwd_elt = iq_result_elt.addElement(
+            'delegation', DELEGATION_NS
+        ).addElement('forwarded', FORWARDED_NS)
+        fwd_elt.addChild(iq_resp_elt)
+        iq_resp_elt.uri = iq_resp_elt.defaultUri = 'jabber:client'
+        self.xs_send(iq_result_elt)
+
     def _sendHack(self, elt):
         """This method is called instead of xmlstream to control sending
 
@@ -119,24 +141,24 @@
         """
         if isinstance(elt, domish.Element) and elt.name=='iq':
             try:
-                id_ = elt.getAttribute('id')
-                ori_iq, managed_entity = self._current_iqs[id_]
-                if jid.JID(elt['to']) != managed_entity:
-                    log.msg("IQ id conflict: the managed entity doesn't match (got {got} was expecting {expected})"
-                            .format(got=jid.JID(elt['to']), expected=managed_entity))
+                iq_id = elt["id"]
+                iq_req_elt, callback, cb_args, timeout = self._current_iqs[iq_id]
+                if elt['to'] != iq_req_elt["from"]:
+                    log.err(
+                        "IQ id conflict: the managed entity doesn't match (got "
+                        f"{elt['to']!r} and was expecting {iq_req_elt['from']!r})"
+                    )
                     raise KeyError
             except KeyError:
                 # the iq is not a delegated one
-                self._xs_send(elt)
+                self.xs_send(elt)
             else:
-                del self._current_iqs[id_]
-                iq_result_elt = toResponse(ori_iq, 'result')
-                fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS)
-                fwd_elt.addChild(elt)
-                elt.uri = elt.defaultUri = 'jabber:client'
-                self._xs_send(iq_result_elt)
+                if not timeout.called:
+                    timeout.cancel()
+                    del self._current_iqs[iq_id]
+                callback(iq_req_elt, elt, *cb_args)
         else:
-            self._xs_send(elt)
+            self.xs_send(elt)
 
     def _obsWrapper(self, observer, stanza):
         """Wrapper to observer which catch StanzaError
@@ -147,7 +169,7 @@
             observer(stanza)
         except error.StanzaError as e:
             error_elt = e.toResponse(stanza)
-            self._xs_send(error_elt)
+            self.xs_send(error_elt)
         stanza.handled = True
 
     def onAdvertise(self, message):
@@ -186,12 +208,39 @@
                 log.msg("Invalid stanza received ({})".format(e))
 
         log.msg('delegations updated:\n{}'.format(
-            '\n'.join(["    - namespace {}{}".format(ns,
+            '\n'.join(["- namespace {}{}".format(ns,
             "" if not attributes else " with filtering on {} attribute(s)".format(
             ", ".join(attributes))) for ns, attributes in list(delegated.items())])))
 
         if not pubsub.NS_PUBSUB in delegated:
-            log.msg("Didn't got pubsub delegation from server, can't act as a PEP service")
+            log.msg(
+                "Didn't got pubsub delegation from server, can't act as a PEP service"
+            )
+
+    def registerSendHook(
+        self,
+        iq_elt: domish.Element,
+        callback: Callable[[domish.Element, domish.Element, ...], None],
+        *args
+    ) -> None:
+        """Register a methode to call when an IQ element response is received
+
+        If no result is received before SEND_HOOK_TIMEOUT seconds, the hook is deleted
+        @param iq_elt: source IQ element sent. Its "id" attribute will be used to track
+            response
+        @param callback: method to call when answer is seen
+            Will be called with:
+                - original IQ request
+                - received IQ result (or error)
+                - optional extra arguments
+            self.xs_send should be used to send final result
+        @param *args: argument to use with callback
+        """
+        iq_id = iq_elt["id"]
+        timeout = reactor.callLater(
+            SEND_HOOK_TIMEOUT, self._current_iqs.pop, (iq_id, None)
+        )
+        self._current_iqs[iq_id] = (iq_elt, callback, args, timeout)
 
     def onForward(self, iq):
         """Manage forwarded iq
@@ -210,9 +259,7 @@
         except StopIteration:
             raise error.StanzaError('not-acceptable')
 
-        managed_entity = jid.JID(fwd_iq['from'])
-
-        self._current_iqs[fwd_iq['id']] = (iq, managed_entity)
+        self.registerSendHook(fwd_iq, self.delegatedResult, iq)
         fwd_iq.delegated = True
 
         # we need a recipient in pubsub request for PEP
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat_pubsub/pam.py	Wed May 11 13:39:08 2022 +0200
@@ -0,0 +1,197 @@
+#!/usr/bin/env python3
+#
+# Copyright (c) 2015-2022 Jérôme Poisson
+
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+"This module implements XEP-0376 (Pubsub Account Management)"
+
+from typing import Optional
+
+from twisted.internet import defer
+from twisted.python import log
+from twisted.words.protocols.jabber import jid, xmlstream, error as jabber_error
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel, pubsub, data_form
+from wokkel.iwokkel import IPubSubService
+from zope.interface import implementer
+
+from sat_pubsub import error
+
+NS_PAM = "urn:xmpp:pam:0"
+PAM_SUB_XPATH = (
+    f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/subscribe[@xmlns="{pubsub.NS_PUBSUB}"]'
+)
+PAM_UNSUB_XPATH = (
+    f'/iq[@type="set"]/pam[@xmlns="{NS_PAM}"]/unsubscribe[@xmlns="{pubsub.NS_PUBSUB}"]'
+)
+PAM_SUBSCRIPTIONS_XPATH = (
+    f'/iq[@type="get"]/subscriptions[@xmlns="{NS_PAM}"]'
+)
+
+
+@implementer(iwokkel.IDisco)
+class PAMHandler(disco.DiscoClientProtocol):
+
+    def __init__(self, service_jid):
+        super(PAMHandler, self).__init__()
+        self.backend = None
+
+    def connectionInitialized(self):
+        for handler in self.parent.handlers:
+            if IPubSubService.providedBy(handler):
+                self._pubsub_service = handler
+                break
+        self.backend = self.parent.parent.getServiceNamed('backend')
+        self.xmlstream.addObserver(PAM_SUB_XPATH, self._onSubscribe)
+        self.xmlstream.addObserver(PAM_UNSUB_XPATH, self._onUnsubscribe)
+        self.xmlstream.addObserver(PAM_SUBSCRIPTIONS_XPATH, self._onSubscriptions)
+
+    def getServerUser(self, iq_elt: domish.Element) -> Optional[jid.JID]:
+        """Get JID of sender if it's a user from our server
+
+        If it's a user from an external server, None is returned and a message is log
+        """
+        from_jid = jid.JID(iq_elt["from"])
+        if not self.backend.isFromServer(from_jid):
+            log.msg(f"ignoring PAM request from external user: {iq_elt.toXml()}")
+        else:
+            return jid.JID(iq_elt["from"])
+
+    def onSubscribeResult(self, iq_req_elt, iq_result_elt, pam_iq_elt):
+        destinee_jid = jid.JID(iq_req_elt["from"])
+        sender_jid = jid.JID(iq_req_elt["to"])
+        message_elt = domish.Element((None, "message"))
+        message_elt["to"] = destinee_jid.userhost()
+        message_elt["from"] = destinee_jid.userhost()
+        # XXX: we explicitely store the notification to be sure that all clients get it
+        message_elt.addElement(("urn:xmpp:hints", "store"))
+        notify_elt = message_elt.addElement((NS_PAM, "notify"))
+        notify_elt["service"] = sender_jid.full()
+        notify_elt.addChild(iq_result_elt.pubsub.subscription)
+        self.backend.privilege.sendMessage(message_elt)
+        pam_iq_result_elt = xmlstream.toResponse(pam_iq_elt, 'result')
+        self.xmlstream.send(pam_iq_result_elt)
+
+    async def onSubRequest(self, from_jid, iq_elt, subscribe=True):
+        try:
+            service_jid = jid.JID(iq_elt.pam.getAttribute("jid"))
+        except RuntimeError:
+            log.msg(
+                f'Invalid PAM element (missing "jid" attribute): {iq_elt.toXml()}'
+            )
+            return
+        iq_elt.handled = True
+        new_iq_elt = domish.Element((None, "iq"))
+        new_iq_elt["from"] = from_jid.userhost()
+        new_iq_elt["to"] = service_jid.full()
+        new_iq_elt.addUniqueId()
+        new_iq_elt["type"] = "set"
+        new_pubsub_elt = new_iq_elt.addElement((pubsub.NS_PUBSUB, "pubsub"))
+        new_pubsub_elt.addChild(
+            iq_elt.pam.subscribe if subscribe else iq_elt.pam.unsubscribe
+        )
+        try:
+            options_elt = next(iq_elt.pam.elements(pubsub.NS_PUBSUB, "options"))
+        except StopIteration:
+            options_elt = None
+        else:
+            new_pubsub_elt.addChild(options_elt)
+
+        if self.backend.isFromServer(service_jid):
+            # the request is handled locally
+            new_iq_elt.delegated = True
+            self.backend.delegation.registerSendHook(
+                new_iq_elt, self.onSubscribeResult, iq_elt
+            )
+            self.xmlstream.dispatch(new_iq_elt)
+        else:
+            # this is a (un)subscribe request to an external server
+            sub_result_elt = await self.backend.privilege.sendIQ(new_iq_elt)
+            if sub_result_elt["type"] == "result":
+                if subscribe:
+                    node = new_iq_elt.pubsub.subscribe["node"]
+                    state = sub_result_elt.pubsub.subscription.getAttribute(
+                        "subscription", "subscribed"
+                    )
+                    public = False
+                    if options_elt is not None:
+                        options_form = data_form.findForm(
+                            options_elt, pubsub.NS_PUBSUB_SUBSCRIBE_OPTIONS
+                        )
+                        if options_form is not None:
+                            public = options_form.get(f"{{{const.NS_PPS}}}public", False)
+                    await self.backend.storage.addExternalSubscription(
+                        from_jid.userhostJID(),
+                        service_jid,
+                        node,
+                        state,
+                        public
+                    )
+                else:
+                    node = new_iq_elt.pubsub.unsubscribe["node"]
+                    try:
+                        await self.backend.storage.removeExternalSubscription(
+                            from_jid.userhostJID(),
+                            service_jid,
+                            node,
+                        )
+                    except error.NotSubscribed:
+                        pass
+            self.onSubscribeResult(new_iq_elt, sub_result_elt, iq_elt)
+
+    def _onSubscribe(self, iq_elt: domish.Element) -> None:
+        if not iq_elt.delegated:
+            return
+        from_jid = self.getServerUser(iq_elt)
+        if from_jid is not None:
+            defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt))
+
+    def _onUnsubscribe(self, iq_elt: domish.Element) -> None:
+        if not iq_elt.delegated:
+            return
+        from_jid = self.getServerUser(iq_elt)
+        if from_jid is not None:
+            defer.ensureDeferred(self.onSubRequest(from_jid, iq_elt, subscribe=False))
+
+    async def onSubscriptions(self, from_jid: jid.JID, iq_elt: domish.Element) -> None:
+        iq_elt.handled = True
+        try:
+            subs = await self.backend.storage.getAllSubscriptions(from_jid)
+        except Exception as e:
+            error_elt = jabber_error.StanzaError(
+                "internal-server-error",
+                text=str(e)
+            ).toResponse(iq_elt)
+            self.xmlstream.send(error_elt)
+        else:
+            result_elt = xmlstream.toResponse(iq_elt, "result")
+            subscriptions_elt = result_elt.addElement((NS_PAM, "subscriptions"))
+            for sub in subs:
+                self.backend.addEltFromSubDict(subscriptions_elt, from_jid, sub)
+            self.xmlstream.send(result_elt)
+
+    def _onSubscriptions(self, iq_elt: domish.Element) -> None:
+        if not iq_elt.delegated:
+            return
+        from_jid = self.getServerUser(iq_elt)
+        if from_jid is not None:
+            defer.ensureDeferred(self.onSubscriptions(from_jid, iq_elt))
+
+    def getDiscoInfo(self, requestor, service, nodeIdentifier=''):
+        return [disco.DiscoFeature(NS_PAM)]
+
+    def getDiscoItems(self, requestor, service, nodeIdentifier=''):
+        return []
--- a/sat_pubsub/pgsql_storage.py	Mon Jan 03 16:48:22 2022 +0100
+++ b/sat_pubsub/pgsql_storage.py	Wed May 11 13:39:08 2022 +0200
@@ -49,7 +49,7 @@
 # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 
-
+from typing import Optional, List
 import copy, logging
 from datetime import datetime, timezone
 
@@ -80,7 +80,7 @@
 parseXml = lambda unicode_data: generic.parseXml(unicode_data.encode('utf-8'))
 ITEMS_SEQ_NAME = 'node_{node_id}_seq'
 PEP_COL_NAME = 'pep'
-CURRENT_VERSION = '9'
+CURRENT_VERSION = '10'
 # retrieve the maximum integer item id + 1
 NEXT_ITEM_ID_QUERY = r"SELECT COALESCE(max(item::integer)+1,1) as val from items where node_id={node_id} and item ~ E'^\\d+$'"
 
@@ -406,8 +406,15 @@
         rows = cursor.fetchall()
         return [tuple(r) for r in rows]
 
-    def getSubscriptions(self, entity, nodeIdentifier=None, pep=False, recipient=None):
-        """retrieve subscriptions of an entity
+    def getSubscriptions(
+        self,
+        entity: jid.JID,
+        nodeIdentifier: Optional[str] = None,
+        public: Optional[bool] = None,
+        pep: bool = False,
+        recipient: Optional[jid.JID]=None
+    ) -> List[Subscription]:
+        """Retrieve local subscriptions of an entity
 
         @param entity(jid.JID): entity to check
         @param nodeIdentifier(unicode, None): node identifier
@@ -425,25 +432,136 @@
                 subscriptions.append(subscription)
             return subscriptions
 
-        query = ["""SELECT node,
+        query = ["""SELECT nodes.node,
                            jid,
                            resource,
                            state
                     FROM entities
                     NATURAL JOIN subscriptions
-                    NATURAL JOIN nodes
-                    WHERE jid=%s"""]
-
+                    LEFT JOIN nodes ON nodes.node_id=subscriptions.node_id
+                    WHERE jid=%s AND subscriptions.node_id IS NOT NULL"""]
         args = [entity.userhost()]
 
+        if public is not None:
+            query.append("AND subscriptions.public=%s")
+            args.append(public)
+
         if nodeIdentifier is not None:
-            query.append("AND node=%s")
+            query.append("AND nodes.node=%s")
             args.append(nodeIdentifier)
 
         d = self.dbpool.runQuery(*withPEP(' '.join(query), args, pep, recipient))
         d.addCallback(toSubscriptions)
         return d
 
+    async def getAllSubscriptions(
+        self,
+        entity: jid.JID,
+        public: Optional[bool] = None
+    ):
+        query = """SELECT  subscription_id::text as id,
+                           node,
+                           pep,
+                           ext_service,
+                           ext_node,
+                           state
+                    FROM entities
+                    NATURAL JOIN subscriptions
+                    LEFT JOIN nodes ON nodes.node_id=subscriptions.node_id
+                    WHERE jid=%s"""
+        args = [entity.userhost()]
+        if public is not None:
+            query += "AND public=%s"
+            args.append(public)
+        rows = await self.dbpool.runQuery(query, args)
+        return [r._asdict() for r in rows]
+
+    def addExternalSubscription(
+        self,
+        entity: jid.JID,
+        service: jid.JID,
+        node: str,
+        state: str,
+        public: bool = False
+    ) -> defer.Deferred:
+        """Store a subscription to an external node
+
+        @param entity: entity being subscribed
+        @param service: pubsub service hosting the node
+        @param node: pubsub node being subscribed to
+        @param state: state of the subscription
+        @param public: True if the subscription is publicly visible
+        """
+        return self.dbpool.runInteraction(
+            self._addExternalSubscription,
+            entity, service, node, state, public
+        )
+
+    def _addExternalSubscription(
+        self,
+        cursor,
+        entity: jid.JID,
+        service: jid.JID,
+        node: str,
+        state: str,
+        public: bool
+    ) -> None:
+
+        try:
+            cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
+                           (entity.userhost(),))
+        except cursor._pool.dbapi.IntegrityError:
+            cursor.connection.rollback()
+
+        cursor.execute("""INSERT INTO subscriptions
+                          (ext_service, ext_node, entity_id, state, public)
+                          SELECT %s, %s, entity_id, %s, %s FROM
+                          (SELECT entity_id FROM entities
+                                            WHERE jid=%s) AS ent_id
+                       ON CONFLICT(entity_id, ext_service, ext_node) DO UPDATE SET public=EXCLUDED.public""",
+                       (service.full(),
+                        node,
+                        state,
+                        public,
+                        entity.userhost()
+                        ))
+
+    def removeExternalSubscription(
+        self,
+        entity: jid.JID,
+        service: jid.JID,
+        node: str,
+    ) -> defer.Deferred:
+        """Remove a subscription from an external node
+
+        @param entity: entity being unsubscribed
+        @param service: pubsub service hosting the node
+        @param node: pubsub node being unsubscribed to
+        """
+        return self.dbpool.runInteraction(
+            self._removeExternalSubscription,
+            entity, service, node
+        )
+
+    def _removeExternalSubscription(
+        self,
+        cursor,
+        entity: jid.JID,
+        service: jid.JID,
+        node: str,
+    ) -> None:
+        cursor.execute("""DELETE FROM subscriptions WHERE
+                          ext_service=%s AND
+                          ext_node=%s AND
+                          entity_id=(SELECT entity_id FROM entities
+                                                      WHERE jid=%s)
+                          """,
+                       (service.full(),
+                        node,
+                        entity.userhost()))
+        if cursor.rowcount != 1:
+            raise error.NotSubscribed()
+
     def getDefaultConfiguration(self, nodeType):
         return self.defaultConfig[nodeType].copy()
 
@@ -683,9 +801,9 @@
         resource = subscriber.resource or ''
 
         cursor.execute("""SELECT state FROM subscriptions
-                          NATURAL JOIN nodes
+                          LEFT JOIN nodes ON nodes.node_id=subscriptions.node_id
                           NATURAL JOIN entities
-                          WHERE node_id=%s AND jid=%s AND resource=%s""",
+                          WHERE subscriptions.node_id=%s AND jid=%s AND resource=%s""",
                        (self.nodeDbId,
                         userhost,
                         resource))
@@ -696,25 +814,38 @@
         else:
             return Subscription(self.nodeIdentifier, subscriber, row[0])
 
-    def getSubscriptions(self, state=None):
-        return self.dbpool.runInteraction(self._getSubscriptions, state)
+    def getSubscriptions(
+        self,
+        state: Optional[str]=None,
+        public: Optional[bool] = None
+    ) -> List[Subscription]:
+        return self.dbpool.runInteraction(self._getSubscriptions, state, public)
 
-    def _getSubscriptions(self, cursor, state):
+    def _getSubscriptions(
+        self,
+        cursor,
+        state: Optional[str],
+        public: Optional[bool] = None,
+    ) -> List[Subscription]:
         self._checkNodeExists(cursor)
 
-        query = """SELECT node, jid, resource, state,
+        query = ["""SELECT subscription_id::text, nodes.node, jid, resource, state,
                           subscription_type, subscription_depth
-                   FROM subscriptions
-                   NATURAL JOIN nodes
-                   NATURAL JOIN entities
-                   WHERE node_id=%s"""
+                    FROM subscriptions
+                    LEFT JOIN nodes ON nodes.node_id=subscriptions.node_id
+                    NATURAL JOIN entities
+                    WHERE subscriptions.node_id=%s"""]
         values = [self.nodeDbId]
 
         if state:
-            query += " AND state=%s"
+            query.append("AND state=%s")
             values.append(state)
 
-        cursor.execute(query, values)
+        if public is not None:
+            query.append("AND public=%s")
+            values.append(public)
+
+        cursor.execute(" ".join(query), values)
         rows = cursor.fetchall()
 
         subscriptions = []
@@ -727,8 +858,9 @@
             if row.subscription_depth:
                 options['pubsub#subscription_depth'] = row.subscription_depth;
 
-            subscriptions.append(Subscription(row.node, subscriber,
-                                              row.state, options))
+            subscription = Subscription(row.node, subscriber, row.state, options)
+            subscription.id = row.subscription_id
+            subscriptions.append(subscription)
 
         return subscriptions
 
@@ -744,6 +876,7 @@
 
         subscription_type = config.get('pubsub#subscription_type')
         subscription_depth = config.get('pubsub#subscription_depth')
+        public = config.get("public", False)
 
         try:
             cursor.execute("""INSERT INTO entities (jid) VALUES (%s)""",
@@ -751,20 +884,31 @@
         except cursor._pool.dbapi.IntegrityError:
             cursor.connection.rollback()
 
-        try:
-            cursor.execute("""INSERT INTO subscriptions
-                              (node_id, entity_id, resource, state,
-                               subscription_type, subscription_depth)
-                              SELECT %s, entity_id, %s, %s, %s, %s FROM
-                              (SELECT entity_id FROM entities
-                                                WHERE jid=%s) AS ent_id""",
-                           (self.nodeDbId,
-                            resource,
-                            state,
-                            subscription_type,
-                            subscription_depth,
-                            userhost))
-        except cursor._pool.dbapi.IntegrityError:
+        # the RETURNING trick to detect INSERT vs UPDATE comes from
+        # https://stackoverflow.com/a/47001830/4188764 thanks!
+        cursor.execute("""INSERT INTO subscriptions
+                          (node_id, entity_id, resource, state,
+                           subscription_type, subscription_depth, public)
+                          SELECT %s, entity_id, %s, %s, %s, %s, %s FROM
+                          (SELECT entity_id FROM entities
+                                            WHERE jid=%s) AS ent_id
+                       ON CONFLICT (entity_id, node_id, resource) DO UPDATE SET public=EXCLUDED.public
+                       RETURNING (xmax = 0) AS inserted""",
+                       (self.nodeDbId,
+                        resource,
+                        state,
+                        subscription_type,
+                        subscription_depth,
+                        public,
+                        userhost))
+
+        rows = cursor.fetchone()
+        if not rows.inserted:
+            # this was an update, the subscription was already existing
+
+            # we have to explicitly commit, otherwise the exception raised rollbacks the
+            # transation
+            cursor.connection.commit()
             raise error.SubscriptionExists()
 
     def removeSubscription(self, subscriber):
--- a/sat_pubsub/privilege.py	Mon Jan 03 16:48:22 2022 +0100
+++ b/sat_pubsub/privilege.py	Wed May 11 13:39:08 2022 +0200
@@ -19,13 +19,13 @@
 "This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and "
 "presences"
 
-from typing import Dict, List, Optional, Set
+from typing import Dict, List, Optional, Union, Set
 import time
 
 from twisted.internet import defer
 from twisted.python import log
-from twisted.python import failure
 from twisted.words.protocols.jabber import error, jid
+from twisted.words.protocols.jabber import xmlstream
 from twisted.words.xish import domish
 from wokkel import xmppim
 from wokkel import pubsub
@@ -35,13 +35,14 @@
 
 from .error import NotAllowedError
 
-FORWARDED_NS = 'urn:xmpp:forward:0'
-PRIV_ENT_NS = 'urn:xmpp:privilege:1'
-PRIV_ENT_ADV_XPATH = '/message/privilege[@xmlns="{}"]'.format(PRIV_ENT_NS)
+NS_FORWARDED = 'urn:xmpp:forward:0'
+NS_PRIV_ENT = 'urn:xmpp:privilege:2'
+PRIV_ENT_ADV_XPATH = '/message/privilege[@xmlns="{}"]'.format(NS_PRIV_ENT)
 ROSTER_NS = 'jabber:iq:roster'
 PERM_ROSTER = 'roster'
 PERM_MESSAGE = 'message'
 PERM_PRESENCE = 'presence'
+PERM_IQ = 'iq'
 ALLOWED_ROSTER = ('none', 'get', 'set', 'both')
 ALLOWED_MESSAGE = ('none', 'outgoing')
 ALLOWED_PRESENCE = ('none', 'managed_entity', 'roster')
@@ -50,6 +51,13 @@
     PERM_MESSAGE:ALLOWED_MESSAGE,
     PERM_PRESENCE:ALLOWED_PRESENCE
 }
+PERMS_BASE : dict[str, Optional[Union[str, Dict[str, Union[str, bool]]]]]= {
+    PERM_ROSTER: None,
+    PERM_MESSAGE: None,
+    PERM_PRESENCE: None,
+    PERM_IQ: None,
+}
+
 
 # Number of seconds before a roster cache is not considered valid anymore.
 # We keep this delay to avoid requesting roster too much in a row if an entity is
@@ -70,9 +78,7 @@
     def __init__(self, service_jid):
         super(PrivilegesHandler, self).__init__()
         self.backend = None
-        self._permissions = {PERM_ROSTER: 'none',
-                             PERM_MESSAGE: 'none',
-                             PERM_PRESENCE: 'none'}
+        self._permissions = PERMS_BASE.copy()
         self._pubsub_service = None
         self.caps_map = {}  # key: bare jid, value: dict of resources with caps hash
         # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to
@@ -120,34 +126,69 @@
 
         self._permissions will be updated according to advertised privileged
         """
-        privilege_elt = next(message.elements(PRIV_ENT_NS, 'privilege'))
-        for perm_elt in privilege_elt.elements(PRIV_ENT_NS):
+        self._permissions = PERMS_BASE.copy()
+        privilege_elt = next(message.elements(NS_PRIV_ENT, 'privilege'))
+        for perm_elt in privilege_elt.elements(NS_PRIV_ENT, 'perm'):
             try:
-                if perm_elt.name != 'perm':
-                    raise InvalidStanza('unexpected element {}'.format(perm_elt.name))
-                perm_access = perm_elt['access']
-                perm_type = perm_elt['type']
+                perm_access = perm_elt["access"]
+            except KeyError:
+                log.err(f"missing 'access' attribute in perm element: {perm_elt.toXml()}")
+                continue
+            if perm_access in (PERM_ROSTER, PERM_MESSAGE, PERM_PRESENCE):
                 try:
+                    perm_type = perm_elt["type"]
+                except KeyError:
+                    log.err(
+                        "missing 'type' attribute in perm element: "
+                        f"{perm_elt.toXml()}"
+                    )
+                    continue
+                else:
                     if perm_type not in TO_CHECK[perm_access]:
-                        raise InvalidStanza(
-                            'bad type [{}] for permission {}'
-                            .format(perm_type, perm_access)
+                        log.err(
+                            f'bad type {perm_type!r}: {perm_elt.toXml()}'
+                        )
+                        continue
+                self._permissions[perm_access] = perm_type or None
+            elif perm_access == "iq":
+                iq_perms = self._permissions["iq"] = {}
+                for namespace_elt in perm_elt.elements(NS_PRIV_ENT, "namespace"):
+                    ns = namespace_elt.getAttribute("ns")
+                    perm_type = namespace_elt.getAttribute("type")
+                    if not ns or not perm_type:
+                        log.err(
+                            f"invalid namespace element: {namespace_elt.toXml()}"
                         )
-                except KeyError:
-                    raise InvalidStanza('bad permission [{}]'.format(perm_access))
-            except InvalidStanza as e:
-                log.msg(
-                    f"Invalid stanza received ({e}), setting permission to none"
-                )
-                for perm in self._permissions:
-                    self._permissions[perm] = 'none'
-                break
+                    else:
+                        if perm_type not in ("get", "set", "both"):
+                            log.err(
+                                f"invalid namespace type: {namespace_elt.toXml()}"
+                            )
+                        else:
+                            ns_perms = iq_perms[ns] = {"type": perm_type}
+                            ns_perms["get"] = perm_type in ("get", "both")
+                            ns_perms["set"] = perm_type in ("set", "both")
+            else:
+                log.err(f"unknown {perm_access!r} access: {perm_elt.toXml()}'")
 
-            self._permissions[perm_access] = perm_type or 'none'
+        perms = self._permissions
+        perms_iq = perms["iq"]
+        if perms_iq is None:
+            iq_perm_txt = "    no iq perm advertised"
+        elif not isinstance(perms_iq, dict):
+            raise ValueError('INTERNAL ERROR: "iq" perm should a dict')
+        else:
+            iq_perm_txt = "\n".join(
+                f"    - {ns}: {perms['type']}"
+                for ns, perms in perms_iq.items()
+            )
 
         log.msg(
-            'Privileges updated: roster={roster}, message={message}, presence={presence}'
-            .format(**self._permissions)
+            "Privileges updated:\n"
+            f"roster: {perms[PERM_ROSTER]}\n"
+            f"message: {perms[PERM_MESSAGE]}\n"
+            f"presence: {perms[PERM_PRESENCE]}\n"
+            f"iq:\n{iq_perm_txt}"
         )
 
     ## roster ##
@@ -308,8 +349,8 @@
         if to_jid is None:
             to_jid = self.backend.server_jid
         main_message['to'] = to_jid.full()
-        privilege_elt = main_message.addElement((PRIV_ENT_NS, 'privilege'))
-        forwarded_elt = privilege_elt.addElement((FORWARDED_NS, 'forwarded'))
+        privilege_elt = main_message.addElement((NS_PRIV_ENT, 'privilege'))
+        forwarded_elt = privilege_elt.addElement((NS_FORWARDED, 'forwarded'))
         priv_message['xmlns'] = 'jabber:client'
         forwarded_elt.addChild(priv_message)
         self.send(main_message)
@@ -452,6 +493,62 @@
             for pep_jid, node, item, item_access_model in last_items:
                 self.notifyPublish(pep_jid, node, [(from_jid, None, [item])])
 
+    ## IQ ##
+
+    async def sendIQ(
+        self,
+        priv_iq: domish.Element,
+        to: Optional[jid.JID] = None
+    ) -> domish.Element:
+        """Send privileged IQ stanza
+
+        @param priv_iq: privileged IQ stanza
+        @param to: bare jid of user on behalf of who the stanza is sent
+        The stanza will be wrapped and sent to the server. Result/Error stanza will sent
+        back as return value.
+        """
+        if to is None:
+            try:
+                to = jid.JID(priv_iq["from"])
+            except (KeyError, RuntimeError):
+                raise ValueError(
+                    'no "to" specified, and invalid "to" attribute in priv_iq'
+                )
+        if not to.user or to.resource or to.host != self.backend.server_jid.userhost():
+            raise NotAllowedError(
+                f'"to" attribute must be set to a bare jid of the server, {to} is invalid'
+            )
+        iq_type = priv_iq.getAttribute("type")
+        if iq_type not in ("get", "set"):
+            raise ValueError(f"invalid IQ type: {priv_iq.toXml()}")
+        first_child = priv_iq.firstChildElement()
+        iq_perms: Optional[dict] = self._permissions[PERM_IQ]
+
+        if ((not iq_perms or first_child is None or first_child.uri is None
+             or not iq_perms.get(first_child.uri, {}).get(iq_type, False))):
+            raise NotAllowedError(
+                "privileged IQ stanza not allowed for this namespace/type combination "
+                f"{priv_iq.toXml()}"
+            )
+
+        main_iq = xmlstream.IQ(self.xmlstream, iq_type)
+        main_iq.timeout = 120
+        privileged_iq_elt = main_iq.addElement((NS_PRIV_ENT, "privileged_iq"))
+        priv_iq['xmlns'] = 'jabber:client'
+        privileged_iq_elt.addChild(priv_iq)
+        ret_elt = await main_iq.send(to.full())
+        # we unwrap the result
+        for name, ns in (
+                ("privilege", NS_PRIV_ENT),
+                ("forwarded", NS_FORWARDED),
+                ("iq", "jabber:client")
+        ):
+            try:
+                ret_elt = next(ret_elt.elements(ns, name))
+            except StopIteration:
+                raise ValueError(f"Invalid privileged IQ result: {ret_elt.toXml()}")
+        return ret_elt
+
     ## misc ##
 
     async def getAutoSubscribers(
--- a/sat_pubsub/pubsub_admin.py	Mon Jan 03 16:48:22 2022 +0100
+++ b/sat_pubsub/pubsub_admin.py	Wed May 11 13:39:08 2022 +0200
@@ -92,13 +92,13 @@
         for item in publish_elt.elements(pubsub.NS_PUBSUB, 'item'):
             try:
                 requestor = jid.JID(item.attributes.pop('publisher'))
+            except KeyError:
+                requestor = from_jid
             except Exception as e:
                 log.msg("WARNING: invalid jid in publisher ({requestor}): {msg}"
                     .format(requestor=requestor, msg=e))
                 self.sendError(iq_elt)
                 return
-            except KeyError:
-                requestor = from_jid
 
             # we don't use a DeferredList because we want to be sure that
             # each request is done in order
--- a/twisted/plugins/pubsub.py	Mon Jan 03 16:48:22 2022 +0100
+++ b/twisted/plugins/pubsub.py	Wed May 11 13:39:08 2022 +0200
@@ -232,6 +232,7 @@
         from sat_pubsub.backend import BackendService, ExtraDiscoHandler
         from sat_pubsub.privilege import PrivilegesHandler
         from sat_pubsub.delegation import DelegationsHandler
+        from sat_pubsub.pam import PAMHandler
 
         if not config['jid'] or not config['xmpp_pwd']:
             raise usage.UsageError("You must specify jid and xmpp_pwd")
@@ -290,6 +291,10 @@
         ph.setHandlerParent(cs)
         bs.privilege = ph
 
+        pam = PAMHandler(config["jid"])
+        pam.setHandlerParent(cs)
+        bs.pam = pam
+
         resource = IPubSubResource(bs)
         resource.hideNodes = config["hide-nodes"]
         resource.serviceJID = config["jid"]