Mercurial > libervia-pubsub
view sat_pubsub/delegation.py @ 287:61f92273fb69
implementation of XEP-0355 (Namespace Delegation) to use SàT Pubsub as PEP service, first draft
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 16 Apr 2015 21:06:19 +0200 |
parents | sat_pubsub/privilege.py@2f87fa282dfd |
children | 073161f6f143 |
line wrap: on
line source
#!/usr/bin/python #-*- coding: utf-8 -*- # """ Copyright (c) 2015 Jérôme Poisson This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. --- This module implements XEP-0355 (Namespace delegation) to use SàT Pubsub as PEP service """ from wokkel.subprotocols import XMPPHandler from wokkel import pubsub from twisted.python import log DELEGATION_NS = 'urn:xmpp:delegation:1' FORWARDED_NS = 'urn:xmpp:forward:0' DELEGATION_ADV_XPATH = '/message/delegation[@xmlns="{}"]'.format(DELEGATION_NS) class InvalidStanza(Exception): pass class DelegationsHandler(XMPPHandler): def __init__(self): super(DelegationsHandler, self).__init__() def connectionInitialized(self): self.xmlstream.addObserver(DELEGATION_ADV_XPATH, self.onAdvertise) def onAdvertise(self, message): """Managage the <message/> advertising delegations""" delegation_elt = message.elements(DELEGATION_NS, 'delegation').next() delegated = {} for delegated_elt in delegation_elt.elements(DELEGATION_NS): try: if delegated_elt.name != 'delegated': raise InvalidStanza(u'unexpected element {}'.format(delegated_elt.name)) try: namespace = delegated_elt['namespace'] except KeyError: raise InvalidStanza(u'was expecting a "namespace" attribute in delegated element') delegated[namespace] = [] for attribute_elt in delegated_elt.elements(DELEGATION_NS, 'attribute'): try: delegated[namespace].append(attribute_elt["name"]) except KeyError: raise InvalidStanza(u'was expecting a "name" attribute in attribute element') except InvalidStanza as e: log.msg("Invalid stanza received ({})".format(e)) log.msg(u'delegations updated:\n{}'.format( u'\n'.join([u" - namespace {}{}".format(ns, u"" if not attributes else u" with filtering on {} attribute(s)".format( u", ".join(attributes))) for ns, attributes in delegated.items()]))) if not pubsub.NS_PUBSUB in delegated: log.msg(u"Didn't got pubsub delegation from server, can't act as a PEP service")