diff sat_pubsub/delegation.py @ 478:b544109ab4c4

Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription /!\ pgsql schema needs to be updated /!\ /!\ server conf needs to be updated for privileged entity: only the new `urn:xmpp:privilege:2` namespace is handled now /!\ Privileged entity has been updated to hanlde the new namespace and IQ permission. Roster pushes are not managed yet. XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully specified at the moment, and my messages on standard@ haven't seen any reply. Thus for now only "Subscribing", "Unsubscribing" and "Listing Subscriptions" is implemented, "Auto Subscriptions" and "Filtering" is not. Public Pubsub Subscription (https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented; the XEP has been accepted by council but is not yet published. It will be updated to use subscription options instead of the <public> element actually specified, I'm waiting for publication to update the XEP. unsubscribe has been updated to return the `<subscription>` element as expected by XEP-0060 (sat_tmp needs to be updated). database schema has been updated to add columns necessary to keep track of subscriptions to external nodes and to mark subscriptions as public.
author Goffi <goffi@goffi.org>
date Wed, 11 May 2022 13:39:08 +0200
parents 607616f9ef5b
children
line wrap: on
line diff
--- a/sat_pubsub/delegation.py	Mon Jan 03 16:48:22 2022 +0100
+++ b/sat_pubsub/delegation.py	Wed May 11 13:39:08 2022 +0200
@@ -20,16 +20,19 @@
 
 # This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service
 
-from wokkel.subprotocols import XMPPHandler
+from typing import Callable, Any
+
+from twisted.python import log
+from twisted.internet import reactor, defer
+from twisted.words.protocols.jabber import error, jid
+from twisted.words.protocols.jabber import xmlstream
+from twisted.words.xish import domish
 from wokkel import pubsub
 from wokkel import data_form
-from wokkel import disco, iwokkel, generic
-from wokkel.iwokkel import IPubSubService
+from wokkel import disco, iwokkel
 from wokkel import mam
-from twisted.python import log
-from twisted.words.protocols.jabber import ijabber, jid, error
-from twisted.words.protocols.jabber.xmlstream import toResponse
-from twisted.words.xish import domish
+from wokkel.iwokkel import IPubSubService
+from wokkel.subprotocols import XMPPHandler
 from zope.interface import implementer
 
 DELEGATION_NS = 'urn:xmpp:delegation:2'
@@ -40,6 +43,7 @@
 DELEGATION_MAIN_SEP = "::"
 DELEGATION_BARE_SEP = ":bare:"
 
+SEND_HOOK_TIMEOUT = 300
 TO_HACK = ((IPubSubService, pubsub, "PubSubRequest"),
            (mam.IMAMService, mam, "MAMRequest"),
            (None, disco, "_DiscoRequest"))
@@ -108,10 +112,28 @@
             self._service_hack()
         self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise)
         self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward)
-        self._current_iqs = {} # dict of iq being handler by delegation
-        self._xs_send = self.xmlstream.send
+        self._current_iqs = {} # dict of iq being handled by delegation
+        self.xs_send = self.xmlstream.send
         self.xmlstream.send = self._sendHack
 
+    def delegatedResult(
+        self,
+        iq_req_elt: domish.Element,
+        iq_resp_elt: domish.Element,
+        wrapping_iq_elt: domish.Element
+    ) -> None:
+        """Method called when a result to a delegated stanza has been received
+
+        The result is wrapped and sent back to server
+        """
+        iq_result_elt = xmlstream.toResponse(wrapping_iq_elt, 'result')
+        fwd_elt = iq_result_elt.addElement(
+            'delegation', DELEGATION_NS
+        ).addElement('forwarded', FORWARDED_NS)
+        fwd_elt.addChild(iq_resp_elt)
+        iq_resp_elt.uri = iq_resp_elt.defaultUri = 'jabber:client'
+        self.xs_send(iq_result_elt)
+
     def _sendHack(self, elt):
         """This method is called instead of xmlstream to control sending
 
@@ -119,24 +141,24 @@
         """
         if isinstance(elt, domish.Element) and elt.name=='iq':
             try:
-                id_ = elt.getAttribute('id')
-                ori_iq, managed_entity = self._current_iqs[id_]
-                if jid.JID(elt['to']) != managed_entity:
-                    log.msg("IQ id conflict: the managed entity doesn't match (got {got} was expecting {expected})"
-                            .format(got=jid.JID(elt['to']), expected=managed_entity))
+                iq_id = elt["id"]
+                iq_req_elt, callback, cb_args, timeout = self._current_iqs[iq_id]
+                if elt['to'] != iq_req_elt["from"]:
+                    log.err(
+                        "IQ id conflict: the managed entity doesn't match (got "
+                        f"{elt['to']!r} and was expecting {iq_req_elt['from']!r})"
+                    )
                     raise KeyError
             except KeyError:
                 # the iq is not a delegated one
-                self._xs_send(elt)
+                self.xs_send(elt)
             else:
-                del self._current_iqs[id_]
-                iq_result_elt = toResponse(ori_iq, 'result')
-                fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS)
-                fwd_elt.addChild(elt)
-                elt.uri = elt.defaultUri = 'jabber:client'
-                self._xs_send(iq_result_elt)
+                if not timeout.called:
+                    timeout.cancel()
+                    del self._current_iqs[iq_id]
+                callback(iq_req_elt, elt, *cb_args)
         else:
-            self._xs_send(elt)
+            self.xs_send(elt)
 
     def _obsWrapper(self, observer, stanza):
         """Wrapper to observer which catch StanzaError
@@ -147,7 +169,7 @@
             observer(stanza)
         except error.StanzaError as e:
             error_elt = e.toResponse(stanza)
-            self._xs_send(error_elt)
+            self.xs_send(error_elt)
         stanza.handled = True
 
     def onAdvertise(self, message):
@@ -186,12 +208,39 @@
                 log.msg("Invalid stanza received ({})".format(e))
 
         log.msg('delegations updated:\n{}'.format(
-            '\n'.join(["    - namespace {}{}".format(ns,
+            '\n'.join(["- namespace {}{}".format(ns,
             "" if not attributes else " with filtering on {} attribute(s)".format(
             ", ".join(attributes))) for ns, attributes in list(delegated.items())])))
 
         if not pubsub.NS_PUBSUB in delegated:
-            log.msg("Didn't got pubsub delegation from server, can't act as a PEP service")
+            log.msg(
+                "Didn't got pubsub delegation from server, can't act as a PEP service"
+            )
+
+    def registerSendHook(
+        self,
+        iq_elt: domish.Element,
+        callback: Callable[[domish.Element, domish.Element, ...], None],
+        *args
+    ) -> None:
+        """Register a methode to call when an IQ element response is received
+
+        If no result is received before SEND_HOOK_TIMEOUT seconds, the hook is deleted
+        @param iq_elt: source IQ element sent. Its "id" attribute will be used to track
+            response
+        @param callback: method to call when answer is seen
+            Will be called with:
+                - original IQ request
+                - received IQ result (or error)
+                - optional extra arguments
+            self.xs_send should be used to send final result
+        @param *args: argument to use with callback
+        """
+        iq_id = iq_elt["id"]
+        timeout = reactor.callLater(
+            SEND_HOOK_TIMEOUT, self._current_iqs.pop, (iq_id, None)
+        )
+        self._current_iqs[iq_id] = (iq_elt, callback, args, timeout)
 
     def onForward(self, iq):
         """Manage forwarded iq
@@ -210,9 +259,7 @@
         except StopIteration:
             raise error.StanzaError('not-acceptable')
 
-        managed_entity = jid.JID(fwd_iq['from'])
-
-        self._current_iqs[fwd_iq['id']] = (iq, managed_entity)
+        self.registerSendHook(fwd_iq, self.delegatedResult, iq)
         fwd_iq.delegated = True
 
         # we need a recipient in pubsub request for PEP