Mercurial > libervia-pubsub
changeset 467:d86e0f8a1405
privilege: store roster cache in database:
- rosters are now stored on database and restored on startup. This way, presence map can
be restored without the need to wait for all contact to send presence again
- roster version are checked, if a new version is received, presence map is updated
accordingly
- roster are not retrieved if presence are received in a too short delay (see ROSTER_TTL),
to avoid using too much resources if a client connect/disconnect a lot
The current behaviour works around XEP-0356 limitations. An update of the XEP will be
needed to get roster pushes and roster version.
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 15 Oct 2021 15:30:18 +0200 |
parents | 0d38c3529972 |
children | 445a2f67be4a |
files | sat_pubsub/backend.py sat_pubsub/pgsql_storage.py sat_pubsub/privilege.py |
diffstat | 3 files changed, 193 insertions(+), 53 deletions(-) [+] |
line wrap: on
line diff
--- a/sat_pubsub/backend.py Fri Oct 15 13:40:59 2021 +0200 +++ b/sat_pubsub/backend.py Fri Oct 15 15:30:18 2021 +0200 @@ -955,7 +955,7 @@ if not roster[requestor].subscriptionFrom: raise error.Forbidden() - d = self.getOwnerRoster(node) + d = defer.ensureDeferred(self.getOwnerRoster(node)) d.addCallback(gotRoster) return d @@ -966,7 +966,7 @@ @param node(Node): node to check @param requestor(jid.JID): entity who want to access node """ - roster = yield self.getOwnerRoster(node) + roster = yield defer.ensureDeferred(self.getOwnerRoster(node)) if roster is None: raise error.Forbidden() @@ -1077,11 +1077,10 @@ d.addCallback(lambda items_data: [item_data.item for item_data in items_data]) return d - @defer.inlineCallbacks - def getOwnerRoster(self, node, owners=None): + async def getOwnerRoster(self, node, owners=None): # FIXME: roster of publisher, not owner, must be used if owners is None: - owners = yield node.getOwners() + owners = await node.getOwners() if len(owners) != 1: log.msg('publisher-roster access is not allowed with more than 1 owner') @@ -1090,13 +1089,13 @@ owner_jid = owners[0] try: - roster = yield self.privilege.getRoster(owner_jid) + roster = await self.privilege.getRoster(owner_jid) except Exception as e: log.msg("Error while getting roster of {owner_jid}: {msg}".format( owner_jid = owner_jid.full(), msg = e)) return - defer.returnValue(roster) + return roster async def getItemsData(self, nodeIdentifier, requestor, recipient, maxItems=None, itemIdentifiers=None, ext_data=None):
--- a/sat_pubsub/pgsql_storage.py Fri Oct 15 13:40:59 2021 +0200 +++ b/sat_pubsub/pgsql_storage.py Fri Oct 15 15:30:18 2021 +0200 @@ -51,12 +51,14 @@ import copy, logging +from datetime import datetime, timezone from zope.interface import implementer from twisted.internet import reactor from twisted.internet import defer from twisted.words.protocols.jabber import jid +from twisted.words.xish import domish from twisted.python import log from wokkel import generic @@ -478,6 +480,38 @@ d.addCallback(self.formatLastItems) return d + def getRosterCache(self): + return self.dbpool.runInteraction(self._getRosterCache) + + def _getRosterCache(self, cursor): + cursor.execute( + "SELECT roster_id, jid, version, updated, roster::text FROM roster" + ) + return [(r.roster_id, jid.JID(r.jid), r.version, r.updated.timestamp(), + parseXml(r.roster)) for r in cursor.fetchall()] + + def setRosterCache( + self, + jid_: jid.JID, + version: str, + updated: int, + roster: domish.Element + ): + return self.dbpool.runInteraction(self._setRosterCache, jid_, version, updated, roster) + + def _setRosterCache(self, cursor, jid_, version, updated, roster): + cursor.execute( + "INSERT INTO roster(jid, version, updated, roster) VALUES (%s, %s, %s, %s) " + "ON CONFLICT (jid) DO UPDATE SET version=EXCLUDED.version, " + "updated=EXCLUDED.updated, roster=EXCLUDED.roster", + ( + jid_.userhost(), + version, + datetime.fromtimestamp(updated, tz=timezone.utc), + roster.toXml() + ) + ) + @implementer(iidavoll.INode) class Node:
--- a/sat_pubsub/privilege.py Fri Oct 15 13:40:59 2021 +0200 +++ b/sat_pubsub/privilege.py Fri Oct 15 15:30:18 2021 +0200 @@ -20,7 +20,8 @@ "presences" import time -from typing import List, Set +from typing import Optional, Dict, List, Set +from datetime import datetime, timezone from wokkel import xmppim from wokkel.compat import IQ from wokkel import pubsub @@ -48,6 +49,14 @@ PERM_PRESENCE:ALLOWED_PRESENCE } +# Number of seconds before a roster cache is not considered valid anymore. +# We keep this delay to avoid requesting roster too much in a row if an entity is +# connecting/disconnecting often in a short time. +ROSTER_TTL = 3600 + + +Roster = Dict[jid.JID, xmppim.RosterItem] + class InvalidStanza(Exception): pass @@ -70,7 +79,9 @@ # key: (hash,version), value: dict with DiscoInfo instance (infos) and nodes to # notify (notify) self.hash_map = {} - self.roster_cache = {} # key: jid, value: dict with "timestamp" and "roster" + # dict which will be filled from database once connection is initialized, + # key: jid, value: dict with "timestamp" and "roster" + self.roster_cache = None # key: jid, value: set of entities who need to receive a notification when we # get a presence from them. All entities in value have a presence subscription # to the key entity. @@ -82,6 +93,17 @@ def permissions(self): return self._permissions + async def getRosterCacheFromDB(self): + rows = await self.backend.storage.getRosterCache() + for __, owner_jid, version, timestamp, roster_elt in rows: + roster = self.getRosterFromElement(roster_elt) + self.roster_cache[owner_jid] = { + "timestamp": timestamp, + "roster": roster, + "version": version + } + self.updatePresenceMap(owner_jid, roster, None) + def connectionInitialized(self): for handler in self.parent.handlers: if IPubSubService.providedBy(handler): @@ -90,6 +112,9 @@ self.backend = self.parent.parent.getServiceNamed('backend') self.xmlstream.addObserver(PRIV_ENT_ADV_XPATH, self.onAdvertise) self.xmlstream.addObserver('/presence', self._onPresence) + if self.roster_cache is None: + self.roster_cache = {} + defer.ensureDeferred(self.getRosterCacheFromDB()) def onAdvertise(self, message): """Managage the <message/> advertising privileges @@ -128,50 +153,143 @@ ## roster ## - def getRoster(self, to_jid): + def updatePresenceMap( + self, + owner_jid: jid.JID, + roster: Roster, + old_roster: Optional[Roster] + ) -> None: + """Update ``self.presence_map`` from roster + + @param owner_jid: jid of the owner of the roster + @param roster: roster dict as returned by self.getRoster + @param old_roster: previously cached roster if any """ - Retrieve contact list. + if old_roster is not None: + # we check if presence subscription have not been removed and update + # presence_map accordingly + for roster_jid, roster_item in old_roster.items(): + if ((roster_item.subscriptionFrom + and (roster_jid not in roster + or not roster[roster_jid].subscriptionFrom) + )): + try: + self.presence_map[roster_jid].discard(owner_jid) + except KeyError: + pass + if ((roster_item.subscriptionTo + and (roster_jid not in roster + or not roster[roster_jid].subscriptionTo) + )): + try: + self.presence_map[owner_jid].discard(roster_jid) + except KeyError: + pass + + for roster_jid, roster_item in roster.items(): + if roster_item.subscriptionFrom: + # we need to know who is subscribed to our user, to send them + # notifications when they send presence to us + self.presence_map.setdefault(roster_jid, set()).add(owner_jid) + if ((roster_item.subscriptionTo + and jid.JID(roster_jid.host) == self.backend.server_jid)): + # we also need to know who on this server we are subscribed to, so + # we can get their notifications even if they didn't connect so far. + self.presence_map.setdefault(owner_jid, set()).add(roster_jid) - @return: Roster as a mapping from L{JID} to L{RosterItem}. - @rtype: L{twisted.internet.defer.Deferred} + def serialiseRoster( + self, + roster: Roster, + version: Optional[str] = None + ) -> domish.Element: + """Reconstruct Query element of the roster""" + roster_elt = domish.Element((ROSTER_NS, "query")) + if version: + roster_elt["ver"] = version + for item in roster.values(): + roster_elt.addChild(item.toElement()) + return roster_elt + + async def updateRosterCache( + self, + owner_jid: jid.JID, + roster: Roster, + version: str + ) -> None: + """Update local roster cache and database""" + now = time.time() + self.roster_cache[owner_jid] = { + 'timestamp': now, + 'roster': roster, + 'version': version + } + roster_elt = self.serialiseRoster(roster, version) + await self.backend.storage.setRosterCache( + owner_jid, version, now, roster_elt + ) + + def getRosterFromElement(self, query_elt: domish.Element) -> Roster: + """Parse roster query result payload to get a Roster dict""" + roster = {} + for element in query_elt.elements(ROSTER_NS, 'item'): + item = xmppim.RosterItem.fromElement(element) + roster[item.entity] = item + return roster + + async def getRoster(self, to_jid: jid.JID) -> Roster: + """Retrieve contact list. + + @param to_jid: jid of the entity owning the roster + @return: roster data """ - # TODO: cache results if self._permissions[PERM_ROSTER] not in ('get', 'both'): log.msg("WARNING: permission not allowed to get roster") raise failure.Failure(NotAllowedError('roster get is not allowed')) - def processRoster(result): - roster = {} - for element in result.query.elements(ROSTER_NS, 'item'): - item = xmppim.RosterItem.fromElement(element) - roster[item.entity] = item - - return roster - iq = IQ(self.xmlstream, 'get') iq.addElement((ROSTER_NS, 'query')) iq["to"] = to_jid.userhost() - d = iq.send() - d.addCallback(processRoster) - return d + iq_result = await iq.send() + roster = self.getRosterFromElement(iq_result.query) - def _isSubscribedFrom(self, roster, entity, roster_owner_jid): + version = iq_result.query.getAttribute('ver') + cached_roster = self.roster_cache.get("to_jid") + if not cached_roster: + self.updatePresenceMap(to_jid, roster, None) + await self.updateRosterCache(to_jid, roster, version) + else: + # we already have a roster in cache, we have to check it if the new one is + # modified, and update presence_map and database + if version: + if cached_roster["version"] != version: + self.updatePresenceMap(to_jid, roster, cached_roster["roster"]) + await self.updateRosterCache(to_jid, roster, version) + else: + cached_roster["timestamp"] = time.time() + else: + # no version available, we have to compare the whole XML + if ((self.serialiseRoster(cached_roster["roster"]).toXml() != + self.serialiseRoster(roster))): + self.updatePresenceMap(to_jid, roster, cached_roster["roster"]) + await self.updateRosterCache(to_jid, roster, version) + else: + cached_roster["timestamp"] = time.time() + + return roster + + async def isSubscribedFrom(self, entity: jid.JID, roster_owner_jid: jid.JID) -> bool: + """Check if entity has presence subscription from roster_owner_jid + + @param entity: entity to check subscription to + @param roster_owner_jid: owner of the roster to check + @return: True if entity has a subscription from roster_owner_jid + """ + roster = await self.getRoster(roster_owner_jid) try: return roster[entity.userhostJID()].subscriptionFrom except KeyError: return False - def isSubscribedFrom(self, entity, roster_owner_jid): - """Check if entity has presence subscription from roster_owner_jid - - @param entity(jid.JID): entity to check subscription to - @param roster_owner_jid(jid.JID): owner of the roster to check - @return D(bool): True if entity has a subscription from roster_owner_jid - """ - d = self.getRoster(roster_owner_jid) - d.addCallback(self._isSubscribedFrom, entity, roster_owner_jid) - return d - ## message ## def sendMessage(self, priv_message, to_jid=None): @@ -257,22 +375,11 @@ from_jid = jid.JID(presence_elt['from']) from_jid_bare = from_jid.userhostJID() if ((jid.JID(from_jid.host) == self.backend.server_jid - and from_jid_bare not in self.roster_cache)): - roster = await self.getRoster(from_jid_bare) - timestamp = time.time() - self.roster_cache[from_jid_bare] = {'timestamp': timestamp, - 'roster': roster, - } - for roster_jid, roster_item in roster.items(): - if roster_item.subscriptionFrom: - # we need to know who is subscribed to our user, to send them - # notifications when they send presence to us - self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare) - if ((roster_item.subscriptionTo - and jid.JID(roster_jid.host) == self.backend.server_jid)): - # we also need to know who on this server we are subscribed to, so - # we can get their notifications even if they didn't connect so far. - self.presence_map.setdefault(from_jid_bare, set()).add(roster_jid) + and ( + from_jid_bare not in self.roster_cache + or time.time()-self.roster_cache[from_jid_bare]["timestamp"]>ROSTER_TTL + ))): + roster = await self.getRoster(from_jid) presence_type = presence_elt.getAttribute('type') if presence_type == "unavailable":