changeset 463:f520ac3164b0

privilege: improvement on last message sending on presence with `+notify`: - local entities subscribed to the presence of another local entity which is connecting are now added to the presence map. This helps getting their notification even if they didn't connect recently - nodes with `presence` access model are now also used for `+notify` - notifications are not sent anymore in case of status change if the resource was already present.
author Goffi <goffi@goffi.org>
date Fri, 15 Oct 2021 13:40:56 +0200
parents a017af61a32b
children 391aa65f72b2
files CHANGELOG sat_pubsub/backend.py sat_pubsub/pgsql_storage.py sat_pubsub/privilege.py
diffstat 4 files changed, 165 insertions(+), 88 deletions(-) [+]
line wrap: on
line diff
--- a/CHANGELOG	Fri Oct 15 09:32:07 2021 +0200
+++ b/CHANGELOG	Fri Oct 15 13:40:56 2021 +0200
@@ -10,6 +10,7 @@
     - service name can now be specified with "service_name" parameter
     - PEP is now working for server itself, making it usable for Service Outage Status (XEP-0455)
     - namespace delegation update to v0.5 ("urn:xmpp:delegation:2" is now used)
+    - better handling of "+notify" for PEP
     - bug fixes
 
 v 0.3.0 (16/08/2019)
--- a/sat_pubsub/backend.py	Fri Oct 15 09:32:07 2021 +0200
+++ b/sat_pubsub/backend.py	Fri Oct 15 13:40:56 2021 +0200
@@ -62,7 +62,7 @@
 
 import copy
 import uuid
-from typing import Optional
+from typing import Optional, List, Tuple
 
 from zope.interface import implementer
 
@@ -277,12 +277,11 @@
             allowed_accesses = {'open', 'presence', 'whitelist'}
         return self.storage.getNodeIds(pep, recipient, allowed_accesses)
 
-    def getNodes(self, requestor, pep, recipient):
+    async def getNodes(self, requestor, pep, recipient):
         if pep:
-            d = self.privilege.isSubscribedFrom(requestor, recipient)
-            d.addCallback(self._getNodesIds, pep, recipient)
-            return d
-        return self.storage.getNodeIds(pep, recipient)
+            subscribed = await self.privilege.isSubscribedFrom(requestor, recipient)
+            return await self._getNodesIds(subscribed, pep, recipient)
+        return await self.storage.getNodeIds(pep, recipient)
 
     def getNodeMetaData(self, nodeIdentifier, pep, recipient=None):
         # FIXME: manage pep and recipient
@@ -1041,17 +1040,32 @@
 
         defer.returnValue((affiliation, owner, roster, access_model))
 
-    @defer.inlineCallbacks
-    def getItemsIds(self, nodeIdentifier, requestor, authorized_groups, unrestricted, maxItems=None, ext_data=None, pep=False, recipient=None):
+    async def getItemsIds(
+        self,
+        nodeIdentifier: str,
+        requestor: jid.JID,
+        authorized_groups: Optional[List[str]],
+        unrestricted: bool,
+        maxItems: Optional[int] = None,
+        ext_data: Optional[dict] = None,
+        pep: bool = False,
+        recipient: Optional[jid.JID] = None
+    ) -> List[str]:
+        """Retrieve IDs of items from a nodeIdentifier
+
+        Requestor permission is checked before retrieving items IDs
+        """
         # FIXME: items access model are not checked
         # TODO: check items access model
-        node = yield self.storage.getNode(nodeIdentifier, pep, recipient)
-        affiliation, owner, roster, access_model = yield self.checkNodeAccess(node, requestor)
-        ids = yield node.getItemsIds(authorized_groups,
-                                     unrestricted,
-                                     maxItems,
-                                     ext_data)
-        defer.returnValue(ids)
+        node = await self.storage.getNode(nodeIdentifier, pep, recipient)
+        await self.checkNodeAccess(node, requestor)
+        ids = await node.getItemsIds(
+            authorized_groups,
+            unrestricted,
+            maxItems,
+            ext_data
+        )
+        return ids
 
     def getItems(self, nodeIdentifier, requestor, recipient, maxItems=None,
                        itemIdentifiers=None, ext_data=None):
@@ -1455,14 +1469,18 @@
         else:
             return item
 
-    @defer.inlineCallbacks
-    def _notifyPublish(self, data):
+    def _notifyPublish(self, data: dict) -> defer.Deferred:
+        return defer.ensureDeferred(self.notifyPublish(data))
+
+    async def notifyPublish(self, data: dict) -> None:
         items_data = data['items_data']
         node = data['node']
         pep = data['pep']
         recipient = data['recipient']
 
-        owners, notifications_filtered = yield self._prepareNotify(items_data, node, data.get('subscription'), pep, recipient)
+        owners, notifications_filtered = await self._prepareNotify(
+            items_data, node, data.get('subscription'), pep, recipient
+        )
 
         # we notify the owners
         # FIXME: check if this comply with XEP-0060 (option needed ?)
@@ -1478,83 +1496,95 @@
                  [self.getFullItem(item_data) for item_data in items_data]))
 
         if pep:
-            defer.returnValue(self.backend.privilege.notifyPublish(
+            self.backend.privilege.notifyPublish(
                 recipient,
                 node.nodeIdentifier,
-                notifications_filtered))
+                notifications_filtered
+            )
 
         else:
-            defer.returnValue(self.pubsubService.notifyPublish(
+            self.pubsubService.notifyPublish(
                 self.serviceJID,
                 node.nodeIdentifier,
-                notifications_filtered))
+                notifications_filtered)
 
-    def _notifyRetract(self, data):
+    def _notifyRetract(self, data: dict) -> defer.Deferred:
+        return defer.ensureDeferred(self.notifyRetract(data))
+
+    async def notifyRetract(self, data: dict) -> None:
         items_data = data['items_data']
         node = data['node']
         pep = data['pep']
         recipient = data['recipient']
 
-        def afterPrepare(result):
-            owners, notifications_filtered = result
-            #we add the owners
+        owners, notifications_filtered = await self._prepareNotify(
+            items_data, node, data.get('subscription'), pep, recipient
+        )
+
+        #we add the owners
 
-            for owner_jid in owners:
-                notifications_filtered.append(
-                    (owner_jid,
-                     {pubsub.Subscription(node.nodeIdentifier,
-                                          owner_jid,
-                                          'subscribed')},
-                     [item_data.item for item_data in items_data]))
+        for owner_jid in owners:
+            notifications_filtered.append(
+                (owner_jid,
+                 {pubsub.Subscription(node.nodeIdentifier,
+                                      owner_jid,
+                                      'subscribed')},
+                 [item_data.item for item_data in items_data]))
 
-            if pep:
-                return self.backend.privilege.notifyRetract(
-                    recipient,
-                    node.nodeIdentifier,
-                    notifications_filtered)
+        if pep:
+            return self.backend.privilege.notifyRetract(
+                recipient,
+                node.nodeIdentifier,
+                notifications_filtered)
 
-            else:
-                return self.pubsubService.notifyRetract(
-                    self.serviceJID,
-                    node.nodeIdentifier,
-                    notifications_filtered)
+        else:
+            return self.pubsubService.notifyRetract(
+                self.serviceJID,
+                node.nodeIdentifier,
+                notifications_filtered)
 
-        d = self._prepareNotify(items_data, node, data.get('subscription'), pep, recipient)
-        d.addCallback(afterPrepare)
-        return d
-
-    @defer.inlineCallbacks
-    def _prepareNotify(self, items_data, node, subscription=None, pep=None, recipient=None):
+    async def _prepareNotify(
+        self,
+        items_data: Tuple[domish.Element, str, dict],
+        node,
+        subscription: Optional[pubsub.Subscription] = None,
+        pep: bool = False,
+        recipient: Optional[jid.JID] = None
+    ) -> Tuple:
         """Do a bunch of permissions check and filter notifications
 
         The owner is not added to these notifications,
         it must be added by the calling method
-        @param items_data(tuple): must contain:
-            - item (domish.Element)
-            - access_model (unicode)
+        @param items_data: must contain:
+            - item
+            - access_model
             - access_list (dict as returned getItemsById, or item_config)
         @param node(LeafNode): node hosting the items
-        @param subscription(pubsub.Subscription, None): TODO
+        @param subscription: TODO
 
-        @return (tuple): will contain:
+        @return: will contain:
             - notifications_filtered
             - node_owner_jid
             - items_data
         """
         if subscription is None:
-            notifications = yield self.backend.getNotifications(node, items_data)
+            notifications = await self.backend.getNotifications(node, items_data)
         else:
             notifications = [(subscription.subscriber, [subscription], items_data)]
 
-        if pep and node.getConfiguration()[const.OPT_ACCESS_MODEL] in ('open', 'presence'):
+        if ((pep
+             and node.getConfiguration()[const.OPT_ACCESS_MODEL] in ('open', 'presence')
+           )):
             # for PEP we need to manage automatic subscriptions (cf. XEP-0163 §4)
             explicit_subscribers = {subscriber for subscriber, _, _ in notifications}
-            auto_subscribers = yield self.backend.privilege.getAutoSubscribers(recipient, node.nodeIdentifier, explicit_subscribers)
+            auto_subscribers = await self.backend.privilege.getAutoSubscribers(
+                recipient, node.nodeIdentifier, explicit_subscribers
+            )
             for sub_jid in auto_subscribers:
                  sub = pubsub.Subscription(node.nodeIdentifier, sub_jid, 'subscribed')
                  notifications.append((sub_jid, [sub], items_data))
 
-        owners = yield node.getOwners()
+        owners = await node.getOwners()
         owner_roster = None
 
         # now we check access of subscriber for each item, and keep only allowed ones
@@ -1585,7 +1615,7 @@
                 elif access_model == const.VAL_AMODEL_PUBLISHER_ROSTER:
                     if owner_roster is None:
                         # FIXME: publisher roster should be used, not owner
-                        owner_roster= yield self.backend.getOwnerRoster(node, owners)
+                        owner_roster= await self.backend.getOwnerRoster(node, owners)
                     if owner_roster is None:
                         owner_roster = {}
                     if not subscriber_bare in owner_roster:
@@ -1601,7 +1631,7 @@
             if allowed_items:
                 notifications_filtered.append((subscriber, subscriptions, allowed_items))
 
-        defer.returnValue((owners, notifications_filtered))
+        return (owners, notifications_filtered)
 
     async def _aNotifyDelete(self, data):
         nodeIdentifier = data['node'].nodeIdentifier
@@ -1753,7 +1783,7 @@
 
     def _publish_errb(self, failure, request):
         if failure.type == error.NodeNotFound and self.backend.supportsAutoCreate():
-            print("Auto-creating node %s" % (request.nodeIdentifier,))
+            print(f"Auto-creating node {request.nodeIdentifier}")
             d = self.backend.createNode(request.nodeIdentifier,
                                         request.sender,
                                         request.options,
--- a/sat_pubsub/pgsql_storage.py	Fri Oct 15 09:32:07 2021 +0200
+++ b/sat_pubsub/pgsql_storage.py	Fri Oct 15 13:40:56 2021 +0200
@@ -455,10 +455,12 @@
 
     def getLastItems(self, entities, nodes, node_accesses, item_accesses, pep):
         """get last item for several nodes and entities in a single request"""
+        # TODO: manage other access model (whitelist, …)
         if not entities or not nodes or not node_accesses or not item_accesses:
             raise BadRequest(text="entities, nodes and accesses must not be empty")
-        if node_accesses != ('open',) or item_accesses != ('open',):
-            raise NotImplementedError('only "open" access model is handled for now')
+        if any(not {"open", "presence"}.issuperset(a)
+               for a in (node_accesses, item_accesses)):
+            raise NotImplementedError('only "open" and "presence" access model are handled for now')
         if not pep:
             raise NotImplementedError("getLastItems is only implemented for PEP at the moment")
         d = self.dbpool.runQuery("""SELECT DISTINCT ON (node_id) pep, node, data::text, items.access_model
--- a/sat_pubsub/privilege.py	Fri Oct 15 09:32:07 2021 +0200
+++ b/sat_pubsub/privilege.py	Fri Oct 15 13:40:56 2021 +0200
@@ -19,8 +19,8 @@
 "This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and "
 "presences"
 
-# This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and presences
-
+import time
+from typing import List, Set
 from wokkel import xmppim
 from wokkel.compat import IQ
 from wokkel import pubsub
@@ -31,7 +31,6 @@
 from twisted.internet import defer
 from twisted.words.xish import domish
 from twisted.words.protocols.jabber import jid, error
-import time
 
 FORWARDED_NS = 'urn:xmpp:forward:0'
 PRIV_ENT_NS = 'urn:xmpp:privilege:1'
@@ -43,7 +42,11 @@
 ALLOWED_ROSTER = ('none', 'get', 'set', 'both')
 ALLOWED_MESSAGE = ('none', 'outgoing')
 ALLOWED_PRESENCE = ('none', 'managed_entity', 'roster')
-TO_CHECK = {PERM_ROSTER:ALLOWED_ROSTER, PERM_MESSAGE:ALLOWED_MESSAGE, PERM_PRESENCE:ALLOWED_PRESENCE}
+TO_CHECK = {
+    PERM_ROSTER:ALLOWED_ROSTER,
+    PERM_MESSAGE:ALLOWED_MESSAGE,
+    PERM_PRESENCE:ALLOWED_PRESENCE
+}
 
 
 class InvalidStanza(Exception):
@@ -53,8 +56,8 @@
     pass
 
 class PrivilegesHandler(disco.DiscoClientProtocol):
-    #FIXME: need to manage updates, and database sync
-    #TODO: cache
+    # FIXME: need to manage updates, XEP-0356 must be updated to get roster pushes
+    # TODO: cache
 
     def __init__(self, service_jid):
         super(PrivilegesHandler, self).__init__()
@@ -68,7 +71,12 @@
         # notify (notify)
         self.hash_map = {}
         self.roster_cache = {}  # key: jid, value: dict with "timestamp" and "roster"
-        self.presence_map = {}  # inverted roster: key: jid, value: set of entities who has this jid in roster (with presence of "from" or "both")
+        # key: jid, value: set of entities who need to receive a notification when we
+        #   get a presence from them. All entities in value have a presence subscription
+        #   to the key entity.
+        self.presence_map = {}
+        # resource currently online
+        self.presences = set()
 
     @property
     def permissions(self):
@@ -97,18 +105,26 @@
                 perm_type = perm_elt['type']
                 try:
                     if perm_type not in TO_CHECK[perm_access]:
-                        raise InvalidStanza('bad type [{}] for permission {}'.format(perm_type, perm_access))
+                        raise InvalidStanza(
+                            'bad type [{}] for permission {}'
+                            .format(perm_type, perm_access)
+                        )
                 except KeyError:
                     raise InvalidStanza('bad permission [{}]'.format(perm_access))
             except InvalidStanza as e:
-                log.msg("Invalid stanza received ({}), setting permission to none".format(e))
+                log.msg(
+                    f"Invalid stanza received ({e}), setting permission to none"
+                )
                 for perm in self._permissions:
                     self._permissions[perm] = 'none'
                 break
 
             self._permissions[perm_access] = perm_type or 'none'
 
-        log.msg('Privileges updated: roster={roster}, message={message}, presence={presence}'.format(**self._permissions))
+        log.msg(
+            'Privileges updated: roster={roster}, message={message}, presence={presence}'
+            .format(**self._permissions)
+        )
 
     ## roster ##
 
@@ -249,13 +265,30 @@
                                                 }
             for roster_jid, roster_item in roster.items():
                 if roster_item.subscriptionFrom:
+                    # we need to know who is subscribed to our user, to send them
+                    # notifications when they send presence to us
                     self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare)
+                if ((roster_item.subscriptionTo
+                     and jid.JID(roster_jid.host) == self.backend.server_jid)):
+                    # we also need to know who on this server we are subscribed to, so
+                    # we can get their notifications even if they didn't connect so far.
+                    self.presence_map.setdefault(from_jid_bare, set()).add(roster_jid)
 
         presence_type = presence_elt.getAttribute('type')
-        if presence_type != "unavailable":
-            # new resource available, we check entity capabilities
+        if presence_type == "unavailable":
+            self.presences.discard(from_jid)
+        elif from_jid not in self.presences:
+            # new resource available
+
+            # we keep resources present in cache to avoid sending notifications on each
+            # status change
+            self.presences.add(from_jid)
+
+            # we check entity capabilities
             try:
-                c_elt = next(presence_elt.elements('http://jabber.org/protocol/caps', 'c'))
+                c_elt = next(
+                    presence_elt.elements('http://jabber.org/protocol/caps', 'c')
+                )
                 hash_ = c_elt['hash']
                 ver = c_elt['ver']
             except (StopIteration, KeyError):
@@ -268,7 +301,7 @@
             if disco_tuple not in self.hash_map:
                 # first time we se this hash, what is behind it?
                 try:
-                    infos = yield self.requestInfo(from_jid)
+                    infos = await self.requestInfo(from_jid)
                 except error.StanzaError as e:
                     log.msg(
                         f"WARNING: can't request disco info for {from_jid!r} (presence: "
@@ -300,24 +333,35 @@
             )
 
             # FIXME: add "presence" access_model (for node) for getLastItems
-            last_items = yield self._backend.storage.getLastItems(publishers, nodes, ('open',), ('open',), True)
-            # we send message with last item, as required by https://xmpp.org/extensions/xep-0163.html#notify-last
+            # TODO: manage other access model (whitelist, …)
+            last_items = await self.backend.storage.getLastItems(
+                publishers,
+                nodes,
+                ('open', 'presence'), ('open', 'presence'), True
+            )
+            # we send message with last item, as required by
+            # https://xmpp.org/extensions/xep-0163.html#notify-last
             for pep_jid, node, item, item_access_model in last_items:
                 self.notifyPublish(pep_jid, node, [(from_jid, None, [item])])
 
     ## misc ##
 
-    @defer.inlineCallbacks
-    def getAutoSubscribers(self, recipient, nodeIdentifier, explicit_subscribers):
-        """get automatic subscribers, i.e. subscribers with presence subscription and +notify for this node
+    async def getAutoSubscribers(
+        self,
+        recipient: jid.JID,
+        nodeIdentifier: str,
+        explicit_subscribers: Set[jid.JID]
+    ) -> List[jid.JID]:
+        """Get automatic subscribers
 
-        @param recipient(jid.JID): jid of the PEP owner of this node
-        @param nodeIdentifier(unicode): node
-        @param explicit_subscribers(set(jid.JID}: jids of people which have an explicit subscription
-        @return (list[jid.JID]): full jid of automatically subscribed entities
+        Get subscribers with presence subscription and +notify for this node
+        @param recipient: jid of the PEP owner of this node
+        @param nodeIdentifier: node
+        @param explicit_subscribers: jids of people which have an explicit subscription
+        @return: full jid of automatically subscribed entities
         """
         auto_subscribers = []
-        roster = yield self.getRoster(recipient)
+        roster = await self.getRoster(recipient)
         for roster_jid, roster_item in roster.items():
             if roster_jid in explicit_subscribers:
                 continue
@@ -331,4 +375,4 @@
                      if nodeIdentifier in notify:
                          full_jid = jid.JID(tuple=(roster_jid.user, roster_jid.host, res))
                          auto_subscribers.append(full_jid)
-        defer.returnValue(auto_subscribers)
+        return auto_subscribers