diff sat_pubsub/privilege.py @ 338:6d059f07c2d3

privilege: added presence and +notify initial support: presence privilege is now used and capabilities are cached. When an entity is connected, last items are sent according to +notify nodes in disco. This is initial support, XEP-0356 doesn't allow yet to get roster updated, or synchronises contacts on startup. Only "open" access model is supported for now. "presence" should be added soon as it is trivial to support now. Only last items sending is handled for now, notifications support for new items/deletions should follow. Capabilities hash is not checked yet, with the security concerns that this imply. Check should be added in the future.
author Goffi <goffi@goffi.org>
date Sat, 12 Aug 2017 18:29:32 +0200
parents c7fe09894952
children 28c9579901d3
line wrap: on
line diff
--- a/sat_pubsub/privilege.py	Sat Aug 12 18:11:36 2017 +0200
+++ b/sat_pubsub/privilege.py	Sat Aug 12 18:29:32 2017 +0200
@@ -24,12 +24,14 @@
 from wokkel import xmppim
 from wokkel.compat import IQ
 from wokkel import pubsub
+from wokkel import disco
 from wokkel.iwokkel import IPubSubService
-from wokkel.subprotocols import XMPPHandler
 from twisted.python import log
 from twisted.python import failure
+from twisted.internet import defer
 from twisted.words.xish import domish
 from twisted.words.protocols.jabber import jid
+import time
 
 FORWARDED_NS = 'urn:xmpp:forward:0'
 PRIV_ENT_NS = 'urn:xmpp:privilege:1'
@@ -50,7 +52,7 @@
 class NotAllowedError(Exception):
     pass
 
-class PrivilegesHandler(XMPPHandler):
+class PrivilegesHandler(disco.DiscoClientProtocol):
     #FIXME: need to manage updates, and database sync
     #TODO: cache
 
@@ -60,11 +62,17 @@
                              PERM_MESSAGE: 'none',
                              PERM_PRESENCE: 'none'}
         self._pubsub_service = None
+        self._backend = None
         # FIXME: we use a hack supposing that our privilege come from hostname
         #        and we are a component named [name].hostname
         #        but we need to manage properly server
         # TODO: do proper server handling
         self.server_jid = jid.JID(service_jid.host.split('.', 1)[1])
+        self.caps_map = {}  # key: full jid, value: caps_hash
+        self.hash_map = {}  # key: (hash,version), value: DiscoInfo instance
+        self.roster_cache = {}  # key: jid, value: dict with "timestamp" and "roster"
+        self.presence_map = {}  # inverted roster: key: jid, value: set of entities who has this jid in roster (with presence of "from" or "both")
+        self.server = None
 
     @property
     def permissions(self):
@@ -75,7 +83,9 @@
             if IPubSubService.providedBy(handler):
                 self._pubsub_service = handler
                 break
+        self._backend = self.parent.parent.getServiceNamed('backend')
         self.xmlstream.addObserver(PRIV_ENT_ADV_XPATH, self.onAdvertise)
+        self.xmlstream.addObserver('/presence', self.onPresence)
 
     def onAdvertise(self, message):
         """Managage the <message/> advertising privileges
@@ -119,7 +129,7 @@
 
         def processRoster(result):
             roster = {}
-            for element in result.elements(ROSTER_NS, 'item'):
+            for element in result.query.elements(ROSTER_NS, 'item'):
                 item = xmppim.RosterItem.fromElement(element)
                 roster[item.entity] = item
 
@@ -190,3 +200,62 @@
     #             redirect = message.event.delete.addElement('redirect')
     #             redirect['uri'] = redirectURI
     #         self.send(message)
+
+
+    ## presence ##
+
+    @defer.inlineCallbacks
+    def onPresence(self, presence_elt):
+        if self.server is None:
+            # FIXME: we use a hack supposing that our delegation come from hostname
+            #        and we are a component named [name].hostname
+            #        but we need to manage properly allowed servers
+            # TODO: do proper origin security check
+            _, self.server = presence_elt['to'].split('.', 1)
+        from_jid = jid.JID(presence_elt['from'])
+        from_jid_bare = from_jid.userhostJID()
+        if from_jid.host == self.server and from_jid_bare not in self.roster_cache:
+            roster = yield self.getRoster(from_jid_bare)
+            timestamp = time.time()
+            self.roster_cache[from_jid_bare] = {'timestamp': timestamp,
+                                                'roster': roster,
+                                                }
+            for roster_jid, roster_item in roster.iteritems():
+                if roster_item.subscriptionFrom:
+                    self.presence_map.setdefault(roster_jid, set()).add(from_jid_bare)
+
+        presence_type = presence_elt.getAttribute('type')
+        if presence_type != "unavailable":
+            # new resource available, we check entity capabilities
+            try:
+                c_elt = next(presence_elt.elements('http://jabber.org/protocol/caps', 'c'))
+                hash_ = c_elt['hash']
+                ver = c_elt['ver']
+            except (StopIteration, KeyError):
+                # no capabilities, we don't go further
+                return
+
+            # FIXME: hash is not checked (cf. XEP-0115)
+            disco_tuple = (hash_, ver)
+            if from_jid not in self.caps_map:
+                self.caps_map[from_jid] = disco_tuple
+
+            if disco_tuple not in self.hash_map:
+                # first time we se this hash, what is behind it?
+                infos = yield self.requestInfo(from_jid)
+                self.hash_map[disco_tuple] = {
+                    'notify': {f[:-7] for f in infos.features if f.endswith('+notify')},
+                    'infos': infos
+                    }
+
+            # nodes are the nodes subscribed with +notify
+            nodes = tuple(self.hash_map[disco_tuple]['notify'])
+            if not nodes:
+                return
+            # publishers are entities which have granted presence access to our user + user itself
+            publishers = self.presence_map.get(from_jid_bare, ()) + (from_jid_bare,)
+
+            last_items = yield self._backend.storage.getLastItems(publishers, nodes, ('open',), ('open',), True)
+            # we send message with last item, as required by https://xmpp.org/extensions/xep-0163.html#notify-last
+            for pep_jid, node, item, item_access_model in last_items:
+                self.notifyPublish(pep_jid, node, [(from_jid, None, [item])])