changeset 467:d86e0f8a1405

privilege: store roster cache in database: - rosters are now stored on database and restored on startup. This way, presence map can be restored without the need to wait for all contact to send presence again - roster version are checked, if a new version is received, presence map is updated accordingly - roster are not retrieved if presence are received in a too short delay (see ROSTER_TTL), to avoid using too much resources if a client connect/disconnect a lot The current behaviour works around XEP-0356 limitations. An update of the XEP will be needed to get roster pushes and roster version.
author Goffi <goffi@goffi.org>
date Fri, 15 Oct 2021 15:30:18 +0200
parents 0d38c3529972
children 445a2f67be4a
files sat_pubsub/backend.py sat_pubsub/pgsql_storage.py sat_pubsub/privilege.py
diffstat 3 files changed, 193 insertions(+), 53 deletions(-) [+]
line wrap: on
line diff
--- a/sat_pubsub/backend.py	Fri Oct 15 13:40:59 2021 +0200
+++ b/sat_pubsub/backend.py	Fri Oct 15 15:30:18 2021 +0200
@@ -955,7 +955,7 @@
             if not roster[requestor].subscriptionFrom:
                 raise error.Forbidden()
 
-        d = self.getOwnerRoster(node)
+        d = defer.ensureDeferred(self.getOwnerRoster(node))
         d.addCallback(gotRoster)
         return d
 
@@ -966,7 +966,7 @@
         @param node(Node): node to check
         @param requestor(jid.JID): entity who want to access node
         """
-        roster = yield self.getOwnerRoster(node)
+        roster = yield defer.ensureDeferred(self.getOwnerRoster(node))
 
         if roster is None:
             raise error.Forbidden()
@@ -1077,11 +1077,10 @@
         d.addCallback(lambda items_data: [item_data.item for item_data in items_data])
         return d
 
-    @defer.inlineCallbacks
-    def getOwnerRoster(self, node, owners=None):
+    async def getOwnerRoster(self, node, owners=None):
         # FIXME: roster of publisher, not owner, must be used
         if owners is None:
-            owners = yield node.getOwners()
+            owners = await node.getOwners()
 
         if len(owners) != 1:
             log.msg('publisher-roster access is not allowed with more than 1 owner')
@@ -1090,13 +1089,13 @@
         owner_jid = owners[0]
 
         try:
-            roster = yield self.privilege.getRoster(owner_jid)
+            roster = await self.privilege.getRoster(owner_jid)
         except Exception as e:
             log.msg("Error while getting roster of {owner_jid}: {msg}".format(
                 owner_jid = owner_jid.full(),
                 msg = e))
             return
-        defer.returnValue(roster)
+        return roster
 
     async def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None,
                        itemIdentifiers=None, ext_data=None):
--- a/sat_pubsub/pgsql_storage.py	Fri Oct 15 13:40:59 2021 +0200
+++ b/sat_pubsub/pgsql_storage.py	Fri Oct 15 15:30:18 2021 +0200
@@ -51,12 +51,14 @@
 
 
 import copy, logging
+from datetime import datetime, timezone
 
 from zope.interface import implementer
 
 from twisted.internet import reactor
 from twisted.internet import defer
 from twisted.words.protocols.jabber import jid
+from twisted.words.xish import domish
 from twisted.python import log
 
 from wokkel import generic
@@ -478,6 +480,38 @@
         d.addCallback(self.formatLastItems)
         return d
 
+    def getRosterCache(self):
+        return self.dbpool.runInteraction(self._getRosterCache)
+
+    def _getRosterCache(self, cursor):
+        cursor.execute(
+            "SELECT roster_id, jid, version, updated, roster::text FROM roster"
+        )
+        return [(r.roster_id, jid.JID(r.jid), r.version, r.updated.timestamp(),
+                parseXml(r.roster)) for r in cursor.fetchall()]
+
+    def setRosterCache(
+            self,
+            jid_: jid.JID,
+            version: str,
+            updated: int,
+            roster: domish.Element
+    ):
+        return self.dbpool.runInteraction(self._setRosterCache, jid_, version, updated, roster)
+
+    def _setRosterCache(self, cursor, jid_, version, updated, roster):
+        cursor.execute(
+            "INSERT INTO roster(jid, version, updated, roster) VALUES (%s, %s, %s, %s) "
+            "ON CONFLICT (jid) DO UPDATE SET version=EXCLUDED.version, "
+            "updated=EXCLUDED.updated, roster=EXCLUDED.roster",
+            (
+                jid_.userhost(),
+                version,
+                datetime.fromtimestamp(updated, tz=timezone.utc),
+                roster.toXml()
+            )
+        )
+
 
 @implementer(iidavoll.INode)
 class Node:
--- a/sat_pubsub/privilege.py	Fri Oct 15 13:40:59 2021 +0200
+++ b/sat_pubsub/privilege.py	Fri Oct 15 15:30:18 2021 +0200
@@ -20,7 +20,8 @@
 "presences"
 
 import time
-from typing import List, Set
+from typing import Optional, Dict, List, Set
+from datetime import datetime, timezone
 from wokkel import xmppim
 from wokkel.compat import IQ
 from wokkel import pubsub
@@ -48,6 +49,14 @@
     PERM_PRESENCE:ALLOWED_PRESENCE
 }
 
+# Number of seconds before a roster cache is not considered valid anymore.
+# We keep this delay to avoid requesting roster too much in a row if an entity is
+# connecting/disconnecting often in a short time.
+ROSTER_TTL = 3600
+
+
+Roster = Dict[jid.JID, xmppim.RosterItem]
+
 
 class InvalidStanza(Exception):
     pass
@@ -70,7 +79,9 @@
         # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to
         # notify (notify)
         self.hash_map = {}
-        self.roster_cache = {}  # key: jid, value: dict with "timestamp" and "roster"
+        # dict which will be filled from database once connection is initialized,
+        # key: jid, value: dict with "timestamp" and "roster"
+        self.roster_cache = None
         # key: jid, value: set of entities who need to receive a notification when we
         #   get a presence from them. All entities in value have a presence subscription
         #   to the key entity.
@@ -82,6 +93,17 @@
     def permissions(self):
         return self._permissions
 
+    async def getRosterCacheFromDB(self):
+        rows = await self.backend.storage.getRosterCache()
+        for __, owner_jid, version, timestamp, roster_elt in rows:
+            roster = self.getRosterFromElement(roster_elt)
+            self.roster_cache[owner_jid] = {
+                "timestamp": timestamp,
+                "roster": roster,
+                "version": version
+            }
+            self.updatePresenceMap(owner_jid, roster, None)
+
     def connectionInitialized(self):
         for handler in self.parent.handlers:
             if IPubSubService.providedBy(handler):
@@ -90,6 +112,9 @@
         self.backend = self.parent.parent.getServiceNamed('backend')
         self.xmlstream.addObserver(PRIV_ENT_ADV_XPATH, self.onAdvertise)
         self.xmlstream.addObserver('/presence', self._onPresence)
+        if self.roster_cache is None:
+            self.roster_cache = {}
+            defer.ensureDeferred(self.getRosterCacheFromDB())
 
     def onAdvertise(self, message):
         """Managage the <message/> advertising privileges
@@ -128,50 +153,143 @@
 
     ## roster ##
 
-    def getRoster(self, to_jid):
+    def updatePresenceMap(
+        self,
+        owner_jid: jid.JID,
+        roster: Roster,
+        old_roster: Optional[Roster]
+    ) -> None:
+        """Update ``self.presence_map`` from roster
+
+        @param owner_jid: jid of the owner of the roster
+        @param roster: roster dict as returned by self.getRoster
+        @param old_roster: previously cached roster if any
         """
-        Retrieve contact list.
+        if old_roster is not None:
+            # we check if presence subscription have not been removed and update
+            # presence_map accordingly
+            for roster_jid, roster_item in old_roster.items():
+                if ((roster_item.subscriptionFrom
+                     and (roster_jid not in roster
+                          or not roster[roster_jid].subscriptionFrom)
+                     )):
+                    try:
+                        self.presence_map[roster_jid].discard(owner_jid)
+                    except KeyError:
+                        pass
+                if ((roster_item.subscriptionTo
+                     and (roster_jid not in roster
+                          or not roster[roster_jid].subscriptionTo)
+                     )):
+                    try:
+                        self.presence_map[owner_jid].discard(roster_jid)
+                    except KeyError:
+                        pass
+
+        for roster_jid, roster_item in roster.items():
+            if roster_item.subscriptionFrom:
+                # we need to know who is subscribed to our user, to send them
+                # notifications when they send presence to us
+                self.presence_map.setdefault(roster_jid, set()).add(owner_jid)
+            if ((roster_item.subscriptionTo
+                 and jid.JID(roster_jid.host) == self.backend.server_jid)):
+                # we also need to know who on this server we are subscribed to, so
+                # we can get their notifications even if they didn't connect so far.
+                self.presence_map.setdefault(owner_jid, set()).add(roster_jid)
 
-        @return: Roster as a mapping from L{JID} to L{RosterItem}.
-        @rtype: L{twisted.internet.defer.Deferred}
+    def serialiseRoster(
+        self,
+        roster: Roster,
+        version: Optional[str] = None
+    ) -> domish.Element:
+        """Reconstruct Query element of the roster"""
+        roster_elt = domish.Element((ROSTER_NS, "query"))
+        if version:
+            roster_elt["ver"] = version
+        for item in roster.values():
+            roster_elt.addChild(item.toElement())
+        return roster_elt
+
+    async def updateRosterCache(
+        self,
+        owner_jid: jid.JID,
+        roster: Roster,
+        version: str
+    ) -> None:
+        """Update local roster cache and database"""
+        now = time.time()
+        self.roster_cache[owner_jid] = {
+            'timestamp': now,
+            'roster': roster,
+            'version': version
+        }
+        roster_elt = self.serialiseRoster(roster, version)
+        await self.backend.storage.setRosterCache(
+            owner_jid, version, now, roster_elt
+        )
+
+    def getRosterFromElement(self, query_elt: domish.Element) -> Roster:
+        """Parse roster query result payload to get a Roster dict"""
+        roster = {}
+        for element in query_elt.elements(ROSTER_NS, 'item'):
+            item = xmppim.RosterItem.fromElement(element)
+            roster[item.entity] = item
+        return roster
+
+    async def getRoster(self, to_jid: jid.JID) -> Roster:
+        """Retrieve contact list.
+
+        @param to_jid: jid of the entity owning the roster
+        @return: roster data
         """
-        # TODO: cache results
         if self._permissions[PERM_ROSTER] not in ('get', 'both'):
             log.msg("WARNING: permission not allowed to get roster")
             raise failure.Failure(NotAllowedError('roster get is not allowed'))
 
-        def processRoster(result):
-            roster = {}
-            for element in result.query.elements(ROSTER_NS, 'item'):
-                item = xmppim.RosterItem.fromElement(element)
-                roster[item.entity] = item
-
-            return roster
-
         iq = IQ(self.xmlstream, 'get')
         iq.addElement((ROSTER_NS, 'query'))
         iq["to"] = to_jid.userhost()
-        d = iq.send()
-        d.addCallback(processRoster)
-        return d
+        iq_result = await iq.send()
+        roster = self.getRosterFromElement(iq_result.query)
 
-    def _isSubscribedFrom(self, roster, entity, roster_owner_jid):
+        version = iq_result.query.getAttribute('ver')
+        cached_roster = self.roster_cache.get("to_jid")
+        if not cached_roster:
+            self.updatePresenceMap(to_jid, roster, None)
+            await self.updateRosterCache(to_jid, roster, version)
+        else:
+            # we already have a roster in cache, we have to check it if the new one is
+            # modified, and update presence_map and database
+            if version:
+                if cached_roster["version"] != version:
+                    self.updatePresenceMap(to_jid, roster, cached_roster["roster"])
+                    await self.updateRosterCache(to_jid, roster, version)
+                else:
+                    cached_roster["timestamp"] = time.time()
+            else:
+                # no version available, we have to compare the whole XML
+                if ((self.serialiseRoster(cached_roster["roster"]).toXml() !=
+                     self.serialiseRoster(roster))):
+                    self.updatePresenceMap(to_jid, roster, cached_roster["roster"])
+                    await self.updateRosterCache(to_jid, roster, version)
+                else:
+                    cached_roster["timestamp"] = time.time()
+
+        return roster
+
+    async def isSubscribedFrom(self, entity: jid.JID, roster_owner_jid: jid.JID) -> bool:
+        """Check if entity has presence subscription from roster_owner_jid
+
+        @param entity: entity to check subscription to
+        @param roster_owner_jid: owner of the roster to check
+        @return: True if entity has a subscription from roster_owner_jid
+        """
+        roster = await self.getRoster(roster_owner_jid)
         try:
             return roster[entity.userhostJID()].subscriptionFrom
         except KeyError:
             return False
 
-    def isSubscribedFrom(self, entity, roster_owner_jid):
-        """Check if entity has presence subscription from roster_owner_jid
-
-        @param entity(jid.JID): entity to check subscription to
-        @param roster_owner_jid(jid.JID): owner of the roster to check
-        @return D(bool): True if entity has a subscription from roster_owner_jid
-        """
-        d = self.getRoster(roster_owner_jid)
-        d.addCallback(self._isSubscribedFrom, entity, roster_owner_jid)
-        return d
-
     ## message ##
 
     def sendMessage(self, priv_message, to_jid=None):
@@ -257,22 +375,11 @@
         from_jid = jid.JID(presence_elt['from'])
         from_jid_bare = from_jid.userhostJID()
         if ((jid.JID(from_jid.host) == self.backend.server_jid
-             and from_jid_bare not in self.roster_cache)):
-            roster = await self.getRoster(from_jid_bare)
-            timestamp = time.time()
-            self.roster_cache[from_jid_bare] = {'timestamp': timestamp,
-                                                'roster': roster,
-                                                }
-            for roster_jid, roster_item in roster.items():
-                if roster_item.subscriptionFrom:
-                    # we need to know who is subscribed to our user, to send them
-                    # notifications when they send presence to us
-                    self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare)
-                if ((roster_item.subscriptionTo
-                     and jid.JID(roster_jid.host) == self.backend.server_jid)):
-                    # we also need to know who on this server we are subscribed to, so
-                    # we can get their notifications even if they didn't connect so far.
-                    self.presence_map.setdefault(from_jid_bare, set()).add(roster_jid)
+             and (
+                 from_jid_bare not in self.roster_cache
+                 or time.time()-self.roster_cache[from_jid_bare]["timestamp"]>ROSTER_TTL
+             ))):
+            roster = await self.getRoster(from_jid)
 
         presence_type = presence_elt.getAttribute('type')
         if presence_type == "unavailable":