diff sat_pubsub/delegation.py @ 291:61fb4817b77f

delegation: iq forwarded management: in addition to the onForward observer, the xmlstream.send message is monkey patched, so we can inject the forwarded stanza as if it was received normally, and intercept the result to send it back to the server.
author Goffi <goffi@goffi.org>
date Mon, 04 May 2015 18:40:47 +0200
parents f08f8536cab8
children 6918a0dad359
line wrap: on
line diff
--- a/sat_pubsub/delegation.py	Mon May 04 18:33:01 2015 +0200
+++ b/sat_pubsub/delegation.py	Mon May 04 18:40:47 2015 +0200
@@ -28,12 +28,15 @@
 from wokkel import data_form
 from wokkel import disco, iwokkel
 from twisted.python import log
-from twisted.words.protocols.jabber import error
+from twisted.words.protocols.jabber import jid, error
+from twisted.words.protocols.jabber.xmlstream import toResponse
+from twisted.words.xish import domish
 from zope.interface import implements
 
 DELEGATION_NS = 'urn:xmpp:delegation:1'
 FORWARDED_NS = 'urn:xmpp:forward:0'
 DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS)
+DELEGATION_FWD_XPATH = '/iq[@type="set"]/delegation[@xmlns="{}"]/forwarded[@xmlns="{}"]'.format(DELEGATION_NS, FORWARDED_NS)
 
 DELEGATION_MAIN_SEP = "::"
 DELEGATION_BARE_SEP = ":bare:"
@@ -49,9 +52,47 @@
 
     def connectionInitialized(self):
         self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise)
+        self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward)
+        self._current_iqs = {} # dict of iq being handler by delegation
+        self._xs_send = self.xmlstream.send
+        self.xmlstream.send = self._sendHack
+
+    def _sendHack(self, elt):
+        """This method is called instead of xmlstream to control sending
+
+        @param obj(domsish.Element, unicode, str): obj sent to real xmlstream
+        """
+        if isinstance(elt, domish.Element) and elt.name=='iq':
+            try:
+                ori_iq, managed_entity = self._current_iqs.pop(elt.getAttribute('id'))
+                if jid.JID(elt['to']) != managed_entity:
+                    log.msg("IQ id conflict: the managed entity doesn't match")
+                    raise KeyError
+            except KeyError:
+                # the iq is not a delegated one
+                self._xs_send(elt)
+            else:
+                iq_result_elt = toResponse(ori_iq, 'result')
+                fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS)
+                fwd_elt.addChild(elt)
+                self._xs_send(iq_result_elt)
+        else:
+            self._xs_send(elt)
+
+    def _obsWrapper(self, observer, stanza):
+        """Wrapper to observer which catch StanzaError
+
+        @param observer(callable): method to wrap
+        """
+        try:
+            observer(stanza)
+        except error.StanzaError as e:
+            error_elt = e.toResponse(stanza)
+            self._xs_send(error_elt)
+        stanza.handled = True
 
     def onAdvertise(self, message):
-        """Managage the <message/> advertising delegations"""
+        """Manage the <message/> advertising delegations"""
         delegation_elt = message.elements(DELEGATION_NS, 'delegation').next()
         delegated = {}
         for delegated_elt in delegation_elt.elements(DELEGATION_NS):
@@ -79,6 +120,29 @@
         if not pubsub.NS_PUBSUB in delegated:
             log.msg(u"Didn't got pubsub delegation from server, can't act as a PEP service")
 
+    def onForward(self, iq):
+        """Manage forwarded iq
+
+        @param iq(domish.Element): full delegation stanza
+        """
+        try:
+            fwd_iq = (iq.elements(DELEGATION_NS, 'delegation').next()
+                      .elements(FORWARDED_NS, 'forwarded').next()
+                      .elements('jabber:client', 'iq').next())
+        except StopIteration:
+            raise error.StanzaError('not-acceptable')
+
+        managed_entity = jid.JID(fwd_iq.getAttribute('to') or fwd_iq['from'])
+
+        if managed_entity.host != iq['from']:
+            log.msg((u"SECURITY WARNING: forwarded stanza doesn't come from the emitting server: {}"
+                     .format(iq.toXml())).encode('utf-8'))
+            raise error.StanzaError('not-allowed')
+
+        self._current_iqs[fwd_iq['id']] = (iq, managed_entity)
+        # we now inject the element in the stream
+        self.xmlstream.dispatch(fwd_iq)
+
     def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
         """Manage disco nesting