Mercurial > libervia-backend
changeset 2891:6a0f42e9410a
core (xmpp): implemented roster versioning
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 07 Apr 2019 18:47:17 +0200 (2019-04-07) |
parents | c652d079a9a1 |
children | 82b781c46841 |
files | sat/core/xmpp.py |
diffstat | 1 files changed, 111 insertions(+), 33 deletions(-) [+] |
line wrap: on
line diff
--- a/sat/core/xmpp.py Sun Apr 07 18:44:57 2019 +0200 +++ b/sat/core/xmpp.py Sun Apr 07 18:47:17 2019 +0200 @@ -32,10 +32,9 @@ from wokkel import component from wokkel import delay from sat.core.log import getLogger - -log = getLogger(__name__) from sat.core import exceptions from sat.memory import encryption +from sat.memory import persistent from sat.tools import xml_tools from zope.interface import implements import time @@ -43,10 +42,16 @@ import uuid import sys +log = getLogger(__name__) + NS_X_DATA = u"jabber:x:data" NS_DISCO_INFO = u"http://jabber.org/protocol/disco#info" NS_XML_ELEMENT = u"urn:xmpp:xml-element" +NS_ROSTER_VER = u"urn:xmpp:features:rosterver" +# we use 2 "@" which is illegal in a jid, to be sure we are not mixing keys +# with roster jids +ROSTER_VER_KEY = u"@version@" class SatXMPPEntity(object): @@ -815,8 +820,8 @@ ) def _finish_connection(self, __): - self.roster.requestRoster() - super(SatXMPPClient, self)._finish_connection(__) + d = self.roster.requestRoster() + d.addCallback(lambda __: super(SatXMPPClient, self)._finish_connection(__)) class SatXMPPComponent(SatXMPPEntity, component.Component): @@ -1101,32 +1106,28 @@ class SatRosterProtocol(xmppim.RosterClientProtocol): + def __init__(self, host): xmppim.RosterClientProtocol.__init__(self) self.host = host self.got_roster = defer.Deferred() # called when roster is received and ready # XXX: the two following dicts keep a local copy of the roster + self._jids = {} # map from jids to RosterItem: key=jid value=RosterItem self._groups = {} # map from groups to jids: key=group value=set of jids - self._jids = None # map from jids to RosterItem: key=jid value=RosterItem + + @property + def versioning(self): + """True if server support roster versioning""" + return (NS_ROSTER_VER, u'ver') in self.parent.xmlstream.features - def rosterCb(self, roster): - assert roster is not None # FIXME: must be managed with roster versioning - self._groups.clear() - self._jids = roster - for item in roster.itervalues(): - if not item.subscriptionTo and not item.subscriptionFrom and not item.ask: - # XXX: current behaviour: we don't want contact in our roster list - # if there is no presence subscription - # may change in the future - log.info( - u"Removing contact {} from roster because there is no presence " - u"subscription".format( - item.jid - ) - ) - self.removeItem(item.entity) # FIXME: to be checked - else: - self._registerItem(item) + @property + def roster_cache(self): + """Cache of roster from storage + + This property return a new PersistentDict on each call, it must be loaded + manually if necessary + """ + return persistent.PersistentDict(NS_ROSTER_VER, self.parent.profile) def _registerItem(self, item): """Register item in local cache @@ -1155,11 +1156,76 @@ for group in item.groups: self._groups.setdefault(group, set()).add(item.entity) + @defer.inlineCallbacks + def _cacheRoster(self, version): + """Serialise local roster and save it to storage + + @param version(unicode): version of roster in local cache + """ + roster_cache = self.roster_cache + yield roster_cache.clear() + roster_cache[ROSTER_VER_KEY] = version + for roster_jid, roster_item in self._jids.iteritems(): + roster_jid_s = roster_jid.full() + roster_item_elt = roster_item.toElement().toXml() + roster_cache[roster_jid_s] = roster_item_elt + + @defer.inlineCallbacks def requestRoster(self): - """ ask the server for Roster list """ - log.debug("requestRoster") - d = self.getRoster().addCallback(self.rosterCb) - d.chainDeferred(self.got_roster) + """Ask the server for Roster list """ + if self.versioning: + log.info(_(u"our server support roster versioning, we use it")) + roster_cache = self.roster_cache + yield roster_cache.load() + try: + version = roster_cache[ROSTER_VER_KEY] + except KeyError: + log.info(_(u"no roster in cache, we start fresh")) + # u"" means we use versioning without valid roster in cache + version = u"" + else: + log.info(_(u"We have roster v{version} in cache").format(version=version)) + # we deserialise cached roster to our local cache + for roster_jid_s, roster_item_elt_s in roster_cache.iteritems(): + if roster_jid_s == ROSTER_VER_KEY: + continue + roster_jid = jid.JID(roster_jid_s) + roster_item_elt = generic.parseXml(roster_item_elt_s.encode('utf-8')) + roster_item = xmppim.RosterItem.fromElement(roster_item_elt) + self._jids[roster_jid] = roster_item + self._registerItem(roster_item) + else: + log.warning(_(u"our server doesn't support roster versioning")) + version = None + + log.debug("requesting roster") + roster = yield self.getRoster(version=version) + if roster is None: + log.debug(u"empty roster result received, we'll get roster item with roster " + u"pushes") + else: + # a full roster is received + self._groups.clear() + self._jids = roster + for item in roster.itervalues(): + if not item.subscriptionTo and not item.subscriptionFrom and not item.ask: + # XXX: current behaviour: we don't want contact in our roster list + # if there is no presence subscription + # may change in the future + log.info( + u"Removing contact {} from roster because there is no presence " + u"subscription".format( + item.jid + ) + ) + self.removeItem(item.entity) # FIXME: to be checked + else: + self._registerItem(item) + yield self._cacheRoster(roster.version) + + if not self.got_roster.called: + # got_roster may already be called if we use resync() + self.got_roster.callback(None) def removeItem(self, to_jid): """Remove a contact from roster list @@ -1184,26 +1250,38 @@ return item_attr def setReceived(self, request): - # TODO: implement roster versioning (cf RFC 6121 ยง2.6) item = request.item + entity = item.entity + log.info(_(u"adding {entity} to roster").format(entity=entity.full())) + if request.version is not None: + # we update the cache in storage + roster_cache = self.roster_cache + roster_cache[ROSTER_VER_KEY] = request.version + roster_cache[entity.full()] = item.toElement().toXml() + try: # update the cache for the groups the contact has been removed from - left_groups = set(self._jids[item.entity].groups).difference(item.groups) + left_groups = set(self._jids[entity].groups).difference(item.groups) for group in left_groups: jids_set = self._groups[group] - jids_set.remove(item.entity) + jids_set.remove(entity) if not jids_set: del self._groups[group] except KeyError: pass # no previous item registration (or it's been cleared) - self._jids[item.entity] = item + self._jids[entity] = item self._registerItem(item) self.host.bridge.newContact( - item.entity.full(), self.getAttributes(item), item.groups, self.parent.profile + entity.full(), self.getAttributes(item), item.groups, self.parent.profile ) def removeReceived(self, request): entity = request.item.entity - log.info(u"removing %s from roster list" % entity.full()) + log.info(_(u"removing {entity} from roster").format(entity=entity.full())) + if request.version is not None: + # we update the cache in storage + roster_cache = self.roster_cache + roster_cache[ROSTER_VER_KEY] = request.version + del roster_cache[request.item.entity.full()] # we first remove item from local cache (self._groups and self._jids) try: