# HG changeset patch # User Goffi # Date 1502555372 -7200 # Node ID 6d059f07c2d3af8ad0c90014b017da9ec3f614a4 # Parent 57a3051ee43593268249a39683fdcb60a9ca1975 privilege: added presence and +notify initial support: presence privilege is now used and capabilities are cached. When an entity is connected, last items are sent according to +notify nodes in disco. This is initial support, XEP-0356 doesn't allow yet to get roster updated, or synchronises contacts on startup. Only "open" access model is supported for now. "presence" should be added soon as it is trivial to support now. Only last items sending is handled for now, notifications support for new items/deletions should follow. Capabilities hash is not checked yet, with the security concerns that this imply. Check should be added in the future. diff -r 57a3051ee435 -r 6d059f07c2d3 sat_pubsub/privilege.py --- a/sat_pubsub/privilege.py Sat Aug 12 18:11:36 2017 +0200 +++ b/sat_pubsub/privilege.py Sat Aug 12 18:29:32 2017 +0200 @@ -24,12 +24,14 @@ from wokkel import xmppim from wokkel.compat import IQ from wokkel import pubsub +from wokkel import disco from wokkel.iwokkel import IPubSubService -from wokkel.subprotocols import XMPPHandler from twisted.python import log from twisted.python import failure +from twisted.internet import defer from twisted.words.xish import domish from twisted.words.protocols.jabber import jid +import time FORWARDED_NS = 'urn:xmpp:forward:0' PRIV_ENT_NS = 'urn:xmpp:privilege:1' @@ -50,7 +52,7 @@ class NotAllowedError(Exception): pass -class PrivilegesHandler(XMPPHandler): +class PrivilegesHandler(disco.DiscoClientProtocol): #FIXME: need to manage updates, and database sync #TODO: cache @@ -60,11 +62,17 @@ PERM_MESSAGE: 'none', PERM_PRESENCE: 'none'} self._pubsub_service = None + self._backend = None # FIXME: we use a hack supposing that our privilege come from hostname # and we are a component named [name].hostname # but we need to manage properly server # TODO: do proper server handling self.server_jid = jid.JID(service_jid.host.split('.', 1)[1]) + self.caps_map = {} # key: full jid, value: caps_hash + self.hash_map = {} # key: (hash,version), value: DiscoInfo instance + self.roster_cache = {} # key: jid, value: dict with "timestamp" and "roster" + self.presence_map = {} # inverted roster: key: jid, value: set of entities who has this jid in roster (with presence of "from" or "both") + self.server = None @property def permissions(self): @@ -75,7 +83,9 @@ if IPubSubService.providedBy(handler): self._pubsub_service = handler break + self._backend = self.parent.parent.getServiceNamed('backend') self.xmlstream.addObserver(PRIV_ENT_ADV_XPATH, self.onAdvertise) + self.xmlstream.addObserver('/presence', self.onPresence) def onAdvertise(self, message): """Managage the advertising privileges @@ -119,7 +129,7 @@ def processRoster(result): roster = {} - for element in result.elements(ROSTER_NS, 'item'): + for element in result.query.elements(ROSTER_NS, 'item'): item = xmppim.RosterItem.fromElement(element) roster[item.entity] = item @@ -190,3 +200,62 @@ # redirect = message.event.delete.addElement('redirect') # redirect['uri'] = redirectURI # self.send(message) + + + ## presence ## + + @defer.inlineCallbacks + def onPresence(self, presence_elt): + if self.server is None: + # FIXME: we use a hack supposing that our delegation come from hostname + # and we are a component named [name].hostname + # but we need to manage properly allowed servers + # TODO: do proper origin security check + _, self.server = presence_elt['to'].split('.', 1) + from_jid = jid.JID(presence_elt['from']) + from_jid_bare = from_jid.userhostJID() + if from_jid.host == self.server and from_jid_bare not in self.roster_cache: + roster = yield self.getRoster(from_jid_bare) + timestamp = time.time() + self.roster_cache[from_jid_bare] = {'timestamp': timestamp, + 'roster': roster, + } + for roster_jid, roster_item in roster.iteritems(): + if roster_item.subscriptionFrom: + self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare) + + presence_type = presence_elt.getAttribute('type') + if presence_type != "unavailable": + # new resource available, we check entity capabilities + try: + c_elt = next(presence_elt.elements('http://jabber.org/protocol/caps', 'c')) + hash_ = c_elt['hash'] + ver = c_elt['ver'] + except (StopIteration, KeyError): + # no capabilities, we don't go further + return + + # FIXME: hash is not checked (cf. XEP-0115) + disco_tuple = (hash_, ver) + if from_jid not in self.caps_map: + self.caps_map[from_jid] = disco_tuple + + if disco_tuple not in self.hash_map: + # first time we se this hash, what is behind it? + infos = yield self.requestInfo(from_jid) + self.hash_map[disco_tuple] = { + 'notify': {f[:-7] for f in infos.features if f.endswith('+notify')}, + 'infos': infos + } + + # nodes are the nodes subscribed with +notify + nodes = tuple(self.hash_map[disco_tuple]['notify']) + if not nodes: + return + # publishers are entities which have granted presence access to our user + user itself + publishers = self.presence_map.get(from_jid_bare, ()) + (from_jid_bare,) + + last_items = yield self._backend.storage.getLastItems(publishers, nodes, ('open',), ('open',), True) + # we send message with last item, as required by https://xmpp.org/extensions/xep-0163.html#notify-last + for pep_jid, node, item, item_access_model in last_items: + self.notifyPublish(pep_jid, node, [(from_jid, None, [item])])