Mercurial > libervia-pubsub
diff sat_pubsub/delegation.py @ 287:61f92273fb69
implementation of XEP-0355 (Namespace Delegation) to use SàT Pubsub as PEP service, first draft
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 16 Apr 2015 21:06:19 +0200 |
parents | sat_pubsub/privilege.py@2f87fa282dfd |
children | 073161f6f143 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat_pubsub/delegation.py Thu Apr 16 21:06:19 2015 +0200 @@ -0,0 +1,73 @@ +#!/usr/bin/python +#-*- coding: utf-8 -*- +# +""" +Copyright (c) 2015 Jérôme Poisson + + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as published by +the Free Software Foundation, either version 3 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see <http://www.gnu.org/licenses/>. + +--- + +This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service +""" + +from wokkel.subprotocols import XMPPHandler +from wokkel import pubsub +from twisted.python import log + +DELEGATION_NS = 'urn:xmpp:delegation:1' +FORWARDED_NS = 'urn:xmpp:forward:0' +DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS) + +class InvalidStanza(Exception): + pass + +class DelegationsHandler(XMPPHandler): + + def __init__(self): + super(DelegationsHandler, self).__init__() + + def connectionInitialized(self): + self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) + + def onAdvertise(self, message): + """Managage the <message/> advertising delegations""" + delegation_elt = message.elements(DELEGATION_NS, 'delegation').next() + delegated = {} + for delegated_elt in delegation_elt.elements(DELEGATION_NS): + try: + if delegated_elt.name != 'delegated': + raise InvalidStanza(u'unexpected element {}'.format(delegated_elt.name)) + try: + namespace = delegated_elt['namespace'] + except KeyError: + raise InvalidStanza(u'was expecting a "namespace" attribute in delegated element') + delegated[namespace] = [] + for attribute_elt in delegated_elt.elements(DELEGATION_NS, 'attribute'): + try: + delegated[namespace].append(attribute_elt["name"]) + except KeyError: + raise InvalidStanza(u'was expecting a "name" attribute in attribute element') + except InvalidStanza as e: + log.msg("Invalid stanza received ({})".format(e)) + + log.msg(u'delegations updated:\n{}'.format( + u'\n'.join([u" - namespace {}{}".format(ns, + u"" if not attributes else u" with filtering on {} attribute(s)".format( + u", ".join(attributes))) for ns, attributes in delegated.items()]))) + + if not pubsub.NS_PUBSUB in delegated: + log.msg(u"Didn't got pubsub delegation from server, can't act as a PEP service") +