Mercurial > libervia-pubsub
view sat_pubsub/delegation.py @ 291:61fb4817b77f
delegation: iq forwarded management:
In addition to the onForward observer, the xmlstream.send method is monkey-patched, so we can inject the forwarded stanza as if it had been received normally, and intercept the result to send it back to the server.
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 04 May 2015 18:40:47 +0200 |
parents | f08f8536cab8 |
children | 6918a0dad359 |
line wrap: on
line source
#!/usr/bin/python
#-*- coding: utf-8 -*-
#
"""
Copyright (c) 2015 Jérôme Poisson


This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.

---

This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service
"""

from wokkel.subprotocols import XMPPHandler
from wokkel import pubsub
from wokkel import data_form
from wokkel import disco, iwokkel
from twisted.python import log
from twisted.words.protocols.jabber import jid, error
from twisted.words.protocols.jabber.xmlstream import toResponse
from twisted.words.xish import domish
from zope.interface import implements

DELEGATION_NS = 'urn:xmpp:delegation:1'
FORWARDED_NS = 'urn:xmpp:forward:0'
DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS)
DELEGATION_FWD_XPATH = '/iq[@type="set"]/delegation[@xmlns="{}"]/forwarded[@xmlns="{}"]'.format(DELEGATION_NS, FORWARDED_NS)

# separators used in disco node identifiers, between the delegation namespace
# and the delegated namespace (see getDiscoInfo)
DELEGATION_MAIN_SEP = "::"
DELEGATION_BARE_SEP = ":bare:"


class InvalidStanza(Exception):
    """Raised while parsing a delegation advertisement which is malformed"""


class DelegationsHandler(XMPPHandler):
    """Handler for delegation advertisements, forwarded iq and nested disco"""
    implements(iwokkel.IDisco)

    def __init__(self):
        super(DelegationsHandler, self).__init__()

    def connectionInitialized(self):
        """Register the delegation observers and take control of xmlstream.send"""
        self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise)
        # self.onForward is given as extra argument, so _obsWrapper receives it
        # as its "observer" parameter
        self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward)
        # dict of iq being handled by delegation:
        # forwarded iq id => (original delegation iq, managed entity jid)
        self._current_iqs = {}
        # the original send method is kept, and replaced by our own one, so
        # answers to forwarded iq can be intercepted
        self._xs_send = self.xmlstream.send
        self.xmlstream.send = self._sendHack

    def _sendHack(self, elt):
        """This method is called instead of xmlstream.send to control sending

        If elt is an <iq/> whose id and recipient match a forwarded iq
        registered by onForward, it is wrapped in a delegation result and sent
        in answer to the original delegation iq. Anything else is sent
        unmodified.
        @param elt(domish.Element, unicode, str): obj sent to real xmlstream
        """
        if isinstance(elt, domish.Element) and elt.name == 'iq':
            iq_id = elt.getAttribute('id')
            try:
                # we only look up here: the entry is removed once we are sure
                # that this iq is the answer to the delegated one, so an
                # unrelated iq using the same id can't discard it
                ori_iq, managed_entity = self._current_iqs[iq_id]
                if jid.JID(elt['to']) != managed_entity:
                    log.msg("IQ id conflict: the managed entity doesn't match")
                    raise KeyError
            except KeyError:
                # the iq is not a delegated one
                self._xs_send(elt)
            else:
                del self._current_iqs[iq_id]
                iq_result_elt = toResponse(ori_iq, 'result')
                fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS)
                fwd_elt.addChild(elt)
                self._xs_send(iq_result_elt)
        else:
            self._xs_send(elt)

    def _obsWrapper(self, observer, stanza):
        """Wrapper to observer which catch StanzaError

        When the observer raises a StanzaError, the corresponding error
        response is sent with the original (non patched) send method.
        The stanza is marked as handled in every case.
        @param observer(callable): method to wrap
        @param stanza(domish.Element): stanza given to the observer
        """
        try:
            observer(stanza)
        except error.StanzaError as e:
            error_elt = e.toResponse(stanza)
            self._xs_send(error_elt)
        stanza.handled = True

    def onAdvertise(self, message):
        """Manage the <message/> advertising delegations

        The delegated namespaces (and their filtering attributes) are
        collected and logged, invalid <delegated/> elements are logged and
        skipped.
        @param message(domish.Element): message containing the <delegation/>
            element
        """
        delegation_elt = next(message.elements(DELEGATION_NS, 'delegation'))
        delegated = {}  # namespace => list of filtering attribute names
        for delegated_elt in delegation_elt.elements(DELEGATION_NS):
            try:
                if delegated_elt.name != 'delegated':
                    raise InvalidStanza(u'unexpected element {}'.format(delegated_elt.name))
                try:
                    namespace = delegated_elt['namespace']
                except KeyError:
                    raise InvalidStanza(u'was expecting a "namespace" attribute in delegated element')
                delegated[namespace] = []
                for attribute_elt in delegated_elt.elements(DELEGATION_NS, 'attribute'):
                    try:
                        delegated[namespace].append(attribute_elt["name"])
                    except KeyError:
                        raise InvalidStanza(u'was expecting a "name" attribute in attribute element')
            except InvalidStanza as e:
                log.msg("Invalid stanza received ({})".format(e))

        log.msg(u'delegations updated:\n{}'.format(
            u'\n'.join([u"    - namespace {}{}".format(
                ns,
                u"" if not attributes else u" with filtering on {} attribute(s)".format(
                    u", ".join(attributes)))
                for ns, attributes in delegated.items()])))

        if pubsub.NS_PUBSUB not in delegated:
            log.msg(u"Didn't got pubsub delegation from server, can't act as a PEP service")

    def onForward(self, iq):
        """Manage forwarded iq

        The forwarded <iq/> is registered in self._current_iqs, then
        dispatched on the xmlstream as if it was received normally.
        @param iq(domish.Element): full delegation stanza
        @raise error.StanzaError: "not-acceptable" if there is no forwarded
            <iq/>, "not-allowed" if the host of the managed entity is not the
            sender of the delegation stanza
        """
        try:
            delegation_elt = next(iq.elements(DELEGATION_NS, 'delegation'))
            forwarded_elt = next(delegation_elt.elements(FORWARDED_NS, 'forwarded'))
            fwd_iq = next(forwarded_elt.elements('jabber:client', 'iq'))
        except StopIteration:
            raise error.StanzaError('not-acceptable')

        managed_entity = jid.JID(fwd_iq.getAttribute('to') or fwd_iq['from'])

        if managed_entity.host != iq['from']:
            log.msg((u"SECURITY WARNING: forwarded stanza doesn't come from the emitting server: {}"
                     .format(iq.toXml())).encode('utf-8'))
            raise error.StanzaError('not-allowed')

        # the entry is used by _sendHack to recognise the answer to this iq
        self._current_iqs[fwd_iq['id']] = (iq, managed_entity)
        # we now inject the element in the stream
        self.xmlstream.dispatch(fwd_iq)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        """Manage disco nesting

        This method looks for DiscoHandler in sibling handlers and use it to
        collect main disco infos. It then filters by delegated namespace and
        return it.
        An identity is added for PEP if pubsub namespace is requested.

        The same features/identities are returned for main and bare nodes
        @param requestor: entity doing the request, given to DiscoHandler.info
        @param target: entity requested, given to DiscoHandler.info
        @param nodeIdentifier(unicode): requested node, nothing is returned if
            it doesn't start with the delegation namespace
        @return: empty list, or deferred firing with the filtered infos
        @raise error.StanzaError: "not-acceptable" if the node contains none
            of the expected separators
        """
        if not nodeIdentifier.startswith(DELEGATION_NS):
            return []

        try:
            _, namespace = nodeIdentifier.split(DELEGATION_MAIN_SEP, 1)
        except ValueError:
            try:
                _, namespace = nodeIdentifier.split(DELEGATION_BARE_SEP, 1)
            except ValueError:
                log.msg("Unexpected disco node: {}".format(nodeIdentifier))
                raise error.StanzaError('not-acceptable')

        if not namespace:
            log.msg("No namespace found in node {}".format(nodeIdentifier))
            return []

        def gotInfos(infos):
            """Keep only the infos matching the requested namespace"""
            ns_features = []
            for info in infos:
                if isinstance(info, disco.DiscoFeature) and info.startswith(namespace):
                    ns_features.append(info)
                elif (isinstance(info, data_form.Form) and info.formNamespace
                      and info.formNamespace.startswith(namespace)):
                    # extensions management (XEP-0128)
                    ns_features.append(info)

            if namespace == pubsub.NS_PUBSUB:
                ns_features.append(disco.DiscoIdentity('pubsub', 'pep'))

            return ns_features

        for handler in self.parent.handlers:
            if isinstance(handler, disco.DiscoHandler):
                break
        else:
            # for/else: no DiscoHandler at all (including no handler at all)
            log.err("Can't find DiscoHandler")
            return []

        # the main node ('') is requested, infos are then filtered in gotInfos
        d = handler.info(requestor, target, '')
        d.addCallback(gotInfos)
        return d

    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        """No item is returned by this handler"""
        return []