diff sat_pubsub/privilege.py @ 463:f520ac3164b0

privilege: improvment on last message sending on presence with `+notify`: - local entities subscribed to the presence of an other local entity which is connecting are now added to presence map. This helps getting their notification even if they didn't connect recently - nodes with `presence` access model are now also used for `+notify` - notifications are not sent anymore in case of status change if the resource was already present.
author Goffi <goffi@goffi.org>
date Fri, 15 Oct 2021 13:40:56 +0200
parents a017af61a32b
children d86e0f8a1405
line wrap: on
line diff
--- a/sat_pubsub/privilege.py	Fri Oct 15 09:32:07 2021 +0200
+++ b/sat_pubsub/privilege.py	Fri Oct 15 13:40:56 2021 +0200
@@ -19,8 +19,8 @@
 "This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and "
 "presences"
 
-# This module implements XEP-0356 (Privileged Entity) to manage rosters, messages and presences
-
+import time
+from typing import List, Set
 from wokkel import xmppim
 from wokkel.compat import IQ
 from wokkel import pubsub
@@ -31,7 +31,6 @@
 from twisted.internet import defer
 from twisted.words.xish import domish
 from twisted.words.protocols.jabber import jid, error
-import time
 
 FORWARDED_NS = 'urn:xmpp:forward:0'
 PRIV_ENT_NS = 'urn:xmpp:privilege:1'
@@ -43,7 +42,11 @@
 ALLOWED_ROSTER = ('none', 'get', 'set', 'both')
 ALLOWED_MESSAGE = ('none', 'outgoing')
 ALLOWED_PRESENCE = ('none', 'managed_entity', 'roster')
-TO_CHECK = {PERM_ROSTER:ALLOWED_ROSTER, PERM_MESSAGE:ALLOWED_MESSAGE, PERM_PRESENCE:ALLOWED_PRESENCE}
+TO_CHECK = {
+    PERM_ROSTER:ALLOWED_ROSTER,
+    PERM_MESSAGE:ALLOWED_MESSAGE,
+    PERM_PRESENCE:ALLOWED_PRESENCE
+}
 
 
 class InvalidStanza(Exception):
@@ -53,8 +56,8 @@
     pass
 
 class PrivilegesHandler(disco.DiscoClientProtocol):
-    #FIXME: need to manage updates, and database sync
-    #TODO: cache
+    # FIXME: need to manage updates, XEP-0356 must be updated to get roster pushes
+    # TODO: cache
 
     def __init__(self, service_jid):
         super(PrivilegesHandler, self).__init__()
@@ -68,7 +71,12 @@
         # notify (notify)
         self.hash_map = {}
         self.roster_cache = {}  # key: jid, value: dict with "timestamp" and "roster"
-        self.presence_map = {}  # inverted roster: key: jid, value: set of entities who has this jid in roster (with presence of "from" or "both")
+        # key: jid, value: set of entities who need to receive a notification when we
+        #   get a presence from them. All entities in value have a presence subscription
+        #   to the key entity.
+        self.presence_map = {}
+        # resource currently online
+        self.presences = set()
 
     @property
     def permissions(self):
@@ -97,18 +105,26 @@
                 perm_type = perm_elt['type']
                 try:
                     if perm_type not in TO_CHECK[perm_access]:
-                        raise InvalidStanza('bad type [{}] for permission {}'.format(perm_type, perm_access))
+                        raise InvalidStanza(
+                            'bad type [{}] for permission {}'
+                            .format(perm_type, perm_access)
+                        )
                 except KeyError:
                     raise InvalidStanza('bad permission [{}]'.format(perm_access))
             except InvalidStanza as e:
-                log.msg("Invalid stanza received ({}), setting permission to none".format(e))
+                log.msg(
+                    f"Invalid stanza received ({e}), setting permission to none"
+                )
                 for perm in self._permissions:
                     self._permissions[perm] = 'none'
                 break
 
             self._permissions[perm_access] = perm_type or 'none'
 
-        log.msg('Privileges updated: roster={roster}, message={message}, presence={presence}'.format(**self._permissions))
+        log.msg(
+            'Privileges updated: roster={roster}, message={message}, presence={presence}'
+            .format(**self._permissions)
+        )
 
     ## roster ##
 
@@ -249,13 +265,30 @@
                                                 }
             for roster_jid, roster_item in roster.items():
                 if roster_item.subscriptionFrom:
+                    # we need to know who is subscribed to our user, to send them
+                    # notifications when they send presence to us
                     self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare)
+                if ((roster_item.subscriptionTo
+                     and jid.JID(roster_jid.host) == self.backend.server_jid)):
+                    # we also need to know who on this server we are subscribed to, so
+                    # we can get their notifications even if they didn't connect so far.
+                    self.presence_map.setdefault(from_jid_bare, set()).add(roster_jid)
 
         presence_type = presence_elt.getAttribute('type')
-        if presence_type != "unavailable":
-            # new resource available, we check entity capabilities
+        if presence_type == "unavailable":
+            self.presences.discard(from_jid)
+        elif from_jid not in self.presences:
+            # new resource available
+
+            # we keep resources present in cache to avoid sending notifications on each
+            # status change
+            self.presences.add(from_jid)
+
+            # we check entity capabilities
             try:
-                c_elt = next(presence_elt.elements('http://jabber.org/protocol/caps', 'c'))
+                c_elt = next(
+                    presence_elt.elements('http://jabber.org/protocol/caps', 'c')
+                )
                 hash_ = c_elt['hash']
                 ver = c_elt['ver']
             except (StopIteration, KeyError):
@@ -268,7 +301,7 @@
             if disco_tuple not in self.hash_map:
                 # first time we se this hash, what is behind it?
                 try:
-                    infos = yield self.requestInfo(from_jid)
+                    infos = await self.requestInfo(from_jid)
                 except error.StanzaError as e:
                     log.msg(
                         f"WARNING: can't request disco info for {from_jid!r} (presence: "
@@ -300,24 +333,35 @@
             )
 
             # FIXME: add "presence" access_model (for node) for getLastItems
-            last_items = yield self._backend.storage.getLastItems(publishers, nodes, ('open',), ('open',), True)
-            # we send message with last item, as required by https://xmpp.org/extensions/xep-0163.html#notify-last
+            # TODO: manage other access model (whitelist, …)
+            last_items = await self.backend.storage.getLastItems(
+                publishers,
+                nodes,
+                ('open', 'presence'), ('open', 'presence'), True
+            )
+            # we send message with last item, as required by
+            # https://xmpp.org/extensions/xep-0163.html#notify-last
             for pep_jid, node, item, item_access_model in last_items:
                 self.notifyPublish(pep_jid, node, [(from_jid, None, [item])])
 
     ## misc ##
 
-    @defer.inlineCallbacks
-    def getAutoSubscribers(self, recipient, nodeIdentifier, explicit_subscribers):
-        """get automatic subscribers, i.e. subscribers with presence subscription and +notify for this node
+    async def getAutoSubscribers(
+        self,
+        recipient: jid.JID,
+        nodeIdentifier: str,
+        explicit_subscribers: Set[jid.JID]
+    ) -> List[jid.JID]:
+        """Get automatic subscribers
 
-        @param recipient(jid.JID): jid of the PEP owner of this node
-        @param nodeIdentifier(unicode): node
-        @param explicit_subscribers(set(jid.JID}: jids of people which have an explicit subscription
-        @return (list[jid.JID]): full jid of automatically subscribed entities
+        Get subscribers with presence subscription and +notify for this node
+        @param recipient: jid of the PEP owner of this node
+        @param nodeIdentifier: node
+        @param explicit_subscribers: jids of people which have an explicit subscription
+        @return: full jid of automatically subscribed entities
         """
         auto_subscribers = []
-        roster = yield self.getRoster(recipient)
+        roster = await self.getRoster(recipient)
         for roster_jid, roster_item in roster.items():
             if roster_jid in explicit_subscribers:
                 continue
@@ -331,4 +375,4 @@
                      if nodeIdentifier in notify:
                          full_jid = jid.JID(tuple=(roster_jid.user, roster_jid.host, res))
                          auto_subscribers.append(full_jid)
-        defer.returnValue(auto_subscribers)
+        return auto_subscribers