view sat_pubsub/delegation.py @ 302:b8b25efae0bc

fixed item_data issues after introduction of ItemData namedtuple
author Goffi <goffi@goffi.org>
date Wed, 25 Nov 2015 22:48:59 +0100
parents 6918a0dad359
children e6a9a3c93314
line wrap: on
line source

#!/usr/bin/python
#-*- coding: utf-8 -*-
#
"""
Copyright (c) 2015 Jérôme Poisson


This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.

---

This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service
"""

from wokkel.subprotocols import XMPPHandler
from wokkel import pubsub
from wokkel import data_form
from wokkel import disco, iwokkel
from wokkel.iwokkel import IPubSubService
from twisted.python import log
from twisted.words.protocols.jabber import jid, error
from twisted.words.protocols.jabber.xmlstream import toResponse
from twisted.words.xish import domish
from zope.interface import implements

DELEGATION_NS = 'urn:xmpp:delegation:1'
FORWARDED_NS = 'urn:xmpp:forward:0'
DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS)
DELEGATION_FWD_XPATH = '/iq[@type="set"]/delegation[@xmlns="{}"]/forwarded[@xmlns="{}"]'.format(DELEGATION_NS, FORWARDED_NS)

DELEGATION_MAIN_SEP = "::"
DELEGATION_BARE_SEP = ":bare:"

class InvalidStanza(Exception):
    pass



class DelegationsHandler(XMPPHandler):
    implements(iwokkel.IDisco)
    _service_hacked = False

    def __init__(self):
        super(DelegationsHandler, self).__init__()

    def _service_hack(self):
        """Patch the PubSubService to track delegated stanzas"""
        # XXX: we need to monkey patch to track origin of the stanza in PubSubRequest.
        #      As PubSubRequest from sat.tmp.wokkel.pubsub use _request_class while
        #      original wokkel.pubsub use directly pubsub.PubSubRequest, we need to
        #      check which version is used before monkeypatching
        for handler in self.parent.handlers:
            if IPubSubService.providedBy(handler):
                if hasattr(handler, '_request_class'):
                    request_base_class = handler._request_class
                else:
                    request_base_class = pubsub.PubSubRequest

                class PubSubRequestWithDelegation(request_base_class):
                    """A PubSubReques which put an indicator if the stanza comme from delegation"""

                    @classmethod
                    def fromElement(cls, element):
                        """Check if element comme from delegation, and set a delegated flags

                        delegated flaf is either False, or it's a jid of the delegating server
                        the delegated flag must be set on element before use
                        """
                        try:
                            # __getattr__ is overriden in domish.Element, so we use __getattribute__
                            delegated = element.__getattribute__('delegated')
                        except AttributeError:
                            delegated = False
                        instance = cls.__base__.fromElement(element)
                        instance.delegated = delegated
                        return instance

                if hasattr(handler, '_request_class'):
                    handler._request_class = PubSubRequestWithDelegation
                else:
                    pubsub.PubSubRequest = PubSubRequestWithDelegation
        DelegationsHandler._service_hacked = True

    def connectionInitialized(self):
        if not self._service_hacked:
            self._service_hack()
        self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise)
        self.xmlstream.addObserver(DELEGATION_FWD_XPATH, self._obsWrapper, 0, self.onForward)
        self._current_iqs = {} # dict of iq being handler by delegation
        self._xs_send = self.xmlstream.send
        self.xmlstream.send = self._sendHack

    def _sendHack(self, elt):
        """This method is called instead of xmlstream to control sending

        @param obj(domsish.Element, unicode, str): obj sent to real xmlstream
        """
        if isinstance(elt, domish.Element) and elt.name=='iq':
            try:
                id_ = elt.getAttribute('id')
                ori_iq, managed_entity = self._current_iqs[id_]
                if jid.JID(elt['to']) != managed_entity:
                    log.msg("IQ id conflict: the managed entity doesn't match (got {got} was expecting {expected})"
                            .format(got=jid.JID(elt['to']), expected=managed_entity))
                    raise KeyError
            except KeyError:
                # the iq is not a delegated one
                self._xs_send(elt)
            else:
                del self._current_iqs[id_]
                iq_result_elt = toResponse(ori_iq, 'result')
                fwd_elt = iq_result_elt.addElement('delegation', DELEGATION_NS).addElement('forwarded', FORWARDED_NS)
                fwd_elt.addChild(elt)
                elt.uri = elt.defaultUri = 'jabber:client'
                self._xs_send(iq_result_elt)
        else:
            self._xs_send(elt)

    def _obsWrapper(self, observer, stanza):
        """Wrapper to observer which catch StanzaError

        @param observer(callable): method to wrap
        """
        try:
            observer(stanza)
        except error.StanzaError as e:
            error_elt = e.toResponse(stanza)
            self._xs_send(error_elt)
        stanza.handled = True

    def onAdvertise(self, message):
        """Manage the <message/> advertising delegations"""
        delegation_elt = message.elements(DELEGATION_NS, 'delegation').next()
        delegated = {}
        for delegated_elt in delegation_elt.elements(DELEGATION_NS):
            try:
                if delegated_elt.name != 'delegated':
                    raise InvalidStanza(u'unexpected element {}'.format(delegated_elt.name))
                try:
                    namespace = delegated_elt['namespace']
                except KeyError:
                    raise InvalidStanza(u'was expecting a "namespace" attribute in delegated element')
                delegated[namespace] = []
                for attribute_elt in delegated_elt.elements(DELEGATION_NS, 'attribute'):
                    try:
                        delegated[namespace].append(attribute_elt["name"])
                    except KeyError:
                        raise InvalidStanza(u'was expecting a "name" attribute in attribute element')
            except InvalidStanza as e:
                log.msg("Invalid stanza received ({})".format(e))

        log.msg(u'delegations updated:\n{}'.format(
            u'\n'.join([u"    - namespace {}{}".format(ns,
            u"" if not attributes else u" with filtering on {} attribute(s)".format(
            u", ".join(attributes))) for ns, attributes in delegated.items()])))

        if not pubsub.NS_PUBSUB in delegated:
            log.msg(u"Didn't got pubsub delegation from server, can't act as a PEP service")

    def onForward(self, iq):
        """Manage forwarded iq

        @param iq(domish.Element): full delegation stanza
        """
        try:
            fwd_iq = (iq.elements(DELEGATION_NS, 'delegation').next()
                      .elements(FORWARDED_NS, 'forwarded').next()
                      .elements('jabber:client', 'iq').next())
        except StopIteration:
            raise error.StanzaError('not-acceptable')

        managed_entity = jid.JID(fwd_iq['from'])

        if managed_entity.host != iq['from']:
            log.msg((u"SECURITY WARNING: forwarded stanza doesn't come from the emitting server: {}"
                     .format(iq.toXml())).encode('utf-8'))
            raise error.StanzaError('not-allowed')

        self._current_iqs[fwd_iq['id']] = (iq, managed_entity)
        fwd_iq.delegated = True

        # we need a recipient in pubsub request for PEP
        # so we set "to" attribute if it doesn't exist
        if not fwd_iq.hasAttribute('to'):
            fwd_iq["to"] = jid.JID(fwd_iq["from"]).userhost()

        # we now inject the element in the stream
        self.xmlstream.dispatch(fwd_iq)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=''):
        """Manage disco nesting

        This method looks for DiscoHandler in sibling handlers and use it to
        collect main disco infos. It then filters by delegated namespace and return it.
        An identity is added for PEP if pubsub namespace is requested.

        The same features/identities are returned for main and bare nodes
        """
        if not nodeIdentifier.startswith(DELEGATION_NS):
            return []

        try:
            _, namespace = nodeIdentifier.split(DELEGATION_MAIN_SEP, 1)
        except ValueError:
            try:
                _, namespace = nodeIdentifier.split(DELEGATION_BARE_SEP, 1)
            except ValueError:
                log.msg("Unexpected disco node: {}".format(nodeIdentifier))
                raise error.StanzaError('not-acceptable')

        if not namespace:
            log.msg("No namespace found in node {}".format(nodeIdentifier))
            return []

        def gotInfos(infos):
            ns_features = []
            for info in infos:
                if isinstance(info, disco.DiscoFeature) and info.startswith(namespace):
                    ns_features.append(info)
                elif (isinstance(info, data_form.Form) and info.formNamespace
                    and info.formNamespace.startwith(namespace)):
                    # extensions management (XEP-0128)
                    ns_features.append(info)

            if namespace == pubsub.NS_PUBSUB:
                ns_features.append(disco.DiscoIdentity('pubsub', 'pep'))

            return ns_features

        for handler in self.parent.handlers:
            if isinstance(handler, disco.DiscoHandler):
                break

        if not isinstance(handler, disco.DiscoHandler):
            log.err("Can't find DiscoHandler")
            return []

        d = handler.info(requestor, target, '')
        d.addCallback(gotInfos)
        return d

    def getDiscoItems(self, requestor, target, nodeIdentifier=''):
        return []