view sat_pubsub/delegation.py @ 287:61f92273fb69

implementation of XEP-0355 (Namespace Delegation) to use SàT Pubsub as PEP service, first draft
author Goffi <goffi@goffi.org>
date Thu, 16 Apr 2015 21:06:19 +0200
parents sat_pubsub/privilege.py@2f87fa282dfd
children 073161f6f143
line wrap: on
line source

#!/usr/bin/python
#-*- coding: utf-8 -*-
#
"""
Copyright (c) 2015 Jérôme Poisson


This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.

---

This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service
"""

from wokkel.subprotocols import XMPPHandler
from wokkel import pubsub
from twisted.python import log

DELEGATION_NS = 'urn:xmpp:delegation:1'
FORWARDED_NS = 'urn:xmpp:forward:0'
DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS)

class InvalidStanza(Exception):
    pass

class DelegationsHandler(XMPPHandler):

    def __init__(self):
        super(DelegationsHandler, self).__init__()

    def connectionInitialized(self):
        self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise)

    def onAdvertise(self, message):
        """Managage the <message/> advertising delegations"""
        delegation_elt = message.elements(DELEGATION_NS, 'delegation').next()
        delegated = {}
        for delegated_elt in delegation_elt.elements(DELEGATION_NS):
            try:
                if delegated_elt.name != 'delegated':
                    raise InvalidStanza(u'unexpected element {}'.format(delegated_elt.name))
                try:
                    namespace = delegated_elt['namespace']
                except KeyError:
                    raise InvalidStanza(u'was expecting a "namespace" attribute in delegated element')
                delegated[namespace] = []
                for attribute_elt in delegated_elt.elements(DELEGATION_NS, 'attribute'):
                    try:
                        delegated[namespace].append(attribute_elt["name"])
                    except KeyError:
                        raise InvalidStanza(u'was expecting a "name" attribute in attribute element')
            except InvalidStanza as e:
                log.msg("Invalid stanza received ({})".format(e))

        log.msg(u'delegations updated:\n{}'.format(
            u'\n'.join([u"    - namespace {}{}".format(ns,
            u"" if not attributes else u" with filtering on {} attribute(s)".format(
            u", ".join(attributes))) for ns, attributes in delegated.items()])))

        if not pubsub.NS_PUBSUB in delegated:
            log.msg(u"Didn't got pubsub delegation from server, can't act as a PEP service")