Mercurial > libervia-pubsub
diff sat_pubsub/delegation.py @ 478:b544109ab4c4
Privileged Entity update + Pubsub Account Management partial implementation + Public Pubsub Subscription
/!\ pgsql schema needs to be updated /!\
/!\ server conf needs to be updated for privileged entity: only the new
`urn:xmpp:privilege:2` namespace is handled now /!\
Privileged entity has been updated to hanlde the new namespace and IQ permission. Roster
pushes are not managed yet.
XEP-0376 (Pubsub Account Management) is partially implemented. The XEP is not fully
specified at the moment, and my messages on standard@ haven't seen any reply. Thus for now
only "Subscribing", "Unsubscribing" and "Listing Subscriptions" is implemented, "Auto
Subscriptions" and "Filtering" is not.
Public Pubsub Subscription
(https://xmpp.org/extensions/inbox/pubsub-public-subscriptions.html) is implemented;
the XEP has been accepted by council but is not yet published. It will be updated to use
subscription options instead of the <public> element actually specified, I'm waiting
for publication to update the XEP.
unsubscribe has been updated to return the `<subscription>` element as expected by
XEP-0060 (sat_tmp needs to be updated).
database schema has been updated to add columns necessary to keep track of subscriptions
to external nodes and to mark subscriptions as public.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 11 May 2022 13:39:08 +0200 |
parents | 607616f9ef5b |
children |
line wrap: on
line diff
--- a/sat_pubsub/delegation.py Mon Jan 03 16:48:22 2022 +0100 +++ b/sat_pubsub/delegation.py Wed May 11 13:39:08 2022 +0200 @@ -20,16 +20,19 @@ # This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service -from wokkel.subprotocols import XMPPHandler +from typing import Callable, Any + +from twisted.python import log +from twisted.internet import reactor, defer +from twisted.words.protocols.jabber import error, jid +from twisted.words.protocols.jabber import xmlstream +from twisted.words.xish import domish from wokkel import pubsub from wokkel import data_form -from wokkel import disco, iwokkel, generic -from wokkel.iwokkel import IPubSubService +from wokkel import disco, iwokkel from wokkel import mam -from twisted.python import log -from twisted.words.protocols.jabber import ijabber, jid, error -from twisted.words.protocols.jabber.xmlstream import toResponse -from twisted.words.xish import domish +from wokkel.iwokkel import IPubSubService +from wokkel.subprotocols import XMPPHandler from zope.interface import implementer DELEGATION_NS = 'urn:xmpp:delegation:2' @@ -40,6 +43,7 @@ DELEGATION_MAIN_SEP = "::" DELEGATION_BARE_SEP = ":bare:" +SEND_HOOK_TIMEOUT = 300 TO_HACK = ((IPubSubService, pubsub, "PubSubRequest"), (mam.IMAMService, mam, "MAMRequest"), (None, disco, "_DiscoRequest")) @@ -108,10 +112,28 @@ self._service_hack() self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward) - self._current_iqs = {} # dict of iq being handler by delegation - self._xs_send = self.xmlstream.send + self._current_iqs = {} # dict of iq being handled by delegation + self.xs_send = self.xmlstream.send self.xmlstream.send = self._sendHack + def delegatedResult( + self, + iq_req_elt: domish.Element, + iq_resp_elt: domish.Element, + wrapping_iq_elt: domish.Element + ) -> None: + """Method called when a result to a delegated stanza has been received + + The result is wrapped and sent back to server + """ + iq_result_elt = xmlstream.toResponse(wrapping_iq_elt, 'result') + fwd_elt = iq_result_elt.addElement( + 'delegation', DELEGATION_NS + ).addElement('forwarded', FORWARDED_NS) + fwd_elt.addChild(iq_resp_elt) + iq_resp_elt.uri = iq_resp_elt.defaultUri = 'jabber:client' + self.xs_send(iq_result_elt) + def _sendHack(self, elt): """This method is called instead of xmlstream to control sending @@ -119,24 +141,24 @@ """ if isinstance(elt, domish.Element) and elt.name=='iq': try: - id_ = elt.getAttribute('id') - ori_iq, managed_entity = self._current_iqs[id_] - if jid.JID(elt['to']) != managed_entity: - log.msg("IQ id conflict: the managed entity doesn't match (got {got} was expecting {expected})" - .format(got=jid.JID(elt['to']), expected=managed_entity)) + iq_id = elt["id"] + iq_req_elt, callback, cb_args, timeout = self._current_iqs[iq_id] + if elt['to'] != iq_req_elt["from"]: + log.err( + "IQ id conflict: the managed entity doesn't match (got " + f"{elt['to']!r} and was expecting {iq_req_elt['from']!r})" + ) raise KeyError except KeyError: # the iq is not a delegated one - self._xs_send(elt) + self.xs_send(elt) else: - del self._current_iqs[id_] - iq_result_elt = toResponse(ori_iq, 'result') - fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS) - fwd_elt.addChild(elt) - elt.uri = elt.defaultUri = 'jabber:client' - self._xs_send(iq_result_elt) + if not timeout.called: + timeout.cancel() + del self._current_iqs[iq_id] + callback(iq_req_elt, elt, *cb_args) else: - self._xs_send(elt) + self.xs_send(elt) def _obsWrapper(self, observer, stanza): """Wrapper to observer which catch StanzaError @@ -147,7 +169,7 @@ observer(stanza) except error.StanzaError as e: error_elt = e.toResponse(stanza) - self._xs_send(error_elt) + self.xs_send(error_elt) stanza.handled = True def onAdvertise(self, message): @@ -186,12 +208,39 @@ log.msg("Invalid stanza received ({})".format(e)) log.msg('delegations updated:\n{}'.format( - '\n'.join([" - namespace {}{}".format(ns, + '\n'.join(["- namespace {}{}".format(ns, "" if not attributes else " with filtering on {} attribute(s)".format( ", ".join(attributes))) for ns, attributes in list(delegated.items())]))) if not pubsub.NS_PUBSUB in delegated: - log.msg("Didn't got pubsub delegation from server, can't act as a PEP service") + log.msg( + "Didn't got pubsub delegation from server, can't act as a PEP service" + ) + + def registerSendHook( + self, + iq_elt: domish.Element, + callback: Callable[[domish.Element, domish.Element, ...], None], + *args + ) -> None: + """Register a methode to call when an IQ element response is received + + If no result is received before SEND_HOOK_TIMEOUT seconds, the hook is deleted + @param iq_elt: source IQ element sent. Its "id" attribute will be used to track + response + @param callback: method to call when answer is seen + Will be called with: + - original IQ request + - received IQ result (or error) + - optional extra arguments + self.xs_send should be used to send final result + @param *args: argument to use with callback + """ + iq_id = iq_elt["id"] + timeout = reactor.callLater( + SEND_HOOK_TIMEOUT, self._current_iqs.pop, (iq_id, None) + ) + self._current_iqs[iq_id] = (iq_elt, callback, args, timeout) def onForward(self, iq): """Manage forwarded iq @@ -210,9 +259,7 @@ except StopIteration: raise error.StanzaError('not-acceptable') - managed_entity = jid.JID(fwd_iq['from']) - - self._current_iqs[fwd_iq['id']] = (iq, managed_entity) + self.registerSendHook(fwd_iq, self.delegatedResult, iq) fwd_iq.delegated = True # we need a recipient in pubsub request for PEP