Mercurial > libervia-backend
view sat/plugins/plugin_xep_0060.py @ 3079:f8cc88c773c8
core (memory/sqlite): added `before_uid` filter to retrieve history before a message UID
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 05 Dec 2019 22:58:06 +0100 |
parents | 73db9db8b9e1 |
children | cea52c9ddfd9 |
line wrap: on
line source
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

# SAT plugin for Publish-Subscribe (xep-0060)
# Copyright (C) 2009-2019 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from collections import namedtuple
import urllib.request
import urllib.parse
import urllib.error
from functools import reduce
from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core.log import getLogger
from sat.core import exceptions
from sat.tools import sat_defer
from sat.tools.common import data_format
from zope.interface import implementer
from wokkel import disco
from wokkel import data_form
from wokkel import generic
# XXX: sat_tmp.wokkel.pubsub is actually used instead of wokkel version
#      mam and rsm come from sat_tmp.wokkel too
from wokkel import pubsub
from wokkel import rsm
from wokkel import mam
from twisted.words.protocols.jabber import jid, error
from twisted.internet import reactor, defer

log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "Publish-Subscribe",
    C.PI_IMPORT_NAME: "XEP-0060",
    C.PI_TYPE: "XEP",
    C.PI_PROTOCOLS: ["XEP-0060"],
    C.PI_DEPENDENCIES: [],
    C.PI_RECOMMENDATIONS: ["XEP-0059", "XEP-0313"],
    C.PI_MAIN: "XEP_0060",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol"""),
}

# fallback error message used when a failure has no usable text
UNSPECIFIED = "unspecified error"

Extra = namedtuple("Extra", ("rsm_request", "extra"))
# rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None
# extra is a potentially empty dict

# timeout passed to Deferred.addTimeout for items requests
TIMEOUT = 30


class XEP_0060(object):
    OPT_ACCESS_MODEL = "pubsub#access_model"
    OPT_PERSIST_ITEMS = "pubsub#persist_items"
    OPT_MAX_ITEMS = "pubsub#max_items"
    OPT_DELIVER_PAYLOADS = "pubsub#deliver_payloads"
    OPT_SEND_ITEM_SUBSCRIBE = "pubsub#send_item_subscribe"
    OPT_NODE_TYPE = "pubsub#node_type"
    OPT_SUBSCRIPTION_TYPE = "pubsub#subscription_type"
    OPT_SUBSCRIPTION_DEPTH = "pubsub#subscription_depth"
    OPT_ROSTER_GROUPS_ALLOWED = "pubsub#roster_groups_allowed"
    OPT_PUBLISH_MODEL = "pubsub#publish_model"
    ACCESS_OPEN = "open"
    ACCESS_PRESENCE = "presence"
    ACCESS_ROSTER = "roster"
    ACCESS_PUBLISHER_ROSTER = "publisher-roster"
    ACCESS_AUTHORIZE = "authorize"
    ACCESS_WHITELIST = "whitelist"
    ID_SINGLETON = "current"

    def __init__(self, host):
        """Register the bridge methods and signals of the plugin

        @param host: main host instance, giving access to plugins and bridge
        """
        log.info(_("PubSub plugin initialization"))
        self.host = host
        # optional plugins (see C.PI_RECOMMENDATIONS): None when not available
        self._rsm = host.plugins.get("XEP-0059")
        self._mam = host.plugins.get("XEP-0313")
        # callbacks for managed nodes
        # key: node, value: dict mapping event name to a list of callbacks
        self._node_cb = {}
        self.rt_sessions = sat_defer.RTDeferredSessions()
        host.bridge.addMethod(
            "psNodeCreate",
            ".plugin",
            in_sign="ssa{ss}s",
            out_sign="s",
            method=self._createNode,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeConfigurationGet",
            ".plugin",
            in_sign="sss",
            out_sign="a{ss}",
            method=self._getNodeConfiguration,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeConfigurationSet",
            ".plugin",
            in_sign="ssa{ss}s",
            out_sign="",
            method=self._setNodeConfiguration,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeAffiliationsGet",
            ".plugin",
            in_sign="sss",
            out_sign="a{ss}",
            method=self._getNodeAffiliations,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeAffiliationsSet",
            ".plugin",
            in_sign="ssa{ss}s",
            out_sign="",
            method=self._setNodeAffiliations,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeSubscriptionsGet",
            ".plugin",
            in_sign="sss",
            out_sign="a{ss}",
            method=self._getNodeSubscriptions,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeSubscriptionsSet",
            ".plugin",
            in_sign="ssa{ss}s",
            out_sign="",
            method=self._setNodeSubscriptions,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodePurge",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._purgeNode,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeDelete",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._deleteNode,
            async_=True,
        )
        host.bridge.addMethod(
            "psNodeWatchAdd",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._addWatch,
            async_=False,
        )
        host.bridge.addMethod(
            "psNodeWatchRemove",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._removeWatch,
            async_=False,
        )
        host.bridge.addMethod(
            "psAffiliationsGet",
            ".plugin",
            in_sign="sss",
            out_sign="a{ss}",
            method=self._getAffiliations,
            async_=True,
        )
        host.bridge.addMethod(
            "psItemsGet",
            ".plugin",
            in_sign="ssiassa{ss}s",
            out_sign="(asa{ss})",
            method=self._getItems,
            async_=True,
        )
        host.bridge.addMethod(
            "psItemSend",
            ".plugin",
            in_sign="ssssa{ss}s",
            out_sign="s",
            method=self._sendItem,
            async_=True,
        )
        host.bridge.addMethod(
            "psItemsSend",
            ".plugin",
            in_sign="ssasa{ss}s",
            out_sign="as",
            method=self._sendItems,
            async_=True,
        )
        host.bridge.addMethod(
            "psRetractItem",
            ".plugin",
            in_sign="sssbs",
            out_sign="",
            method=self._retractItem,
            async_=True,
        )
        host.bridge.addMethod(
            "psRetractItems",
            ".plugin",
            in_sign="ssasbs",
            out_sign="",
            method=self._retractItems,
            async_=True,
        )
        host.bridge.addMethod(
            "psSubscribe",
            ".plugin",
            in_sign="ssa{ss}s",
            out_sign="s",
            method=self._subscribe,
            async_=True,
        )
        host.bridge.addMethod(
            "psUnsubscribe",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._unsubscribe,
            async_=True,
        )
        host.bridge.addMethod(
            "psSubscriptionsGet",
            ".plugin",
            in_sign="sss",
            out_sign="aa{ss}",
            method=self._subscriptions,
            async_=True,
        )
        host.bridge.addMethod(
            "psSubscribeToMany",
            ".plugin",
            in_sign="a(ss)sa{ss}s",
            out_sign="s",
            method=self._subscribeToMany,
        )
        host.bridge.addMethod(
            "psGetSubscribeRTResult",
            ".plugin",
            in_sign="ss",
            out_sign="(ua(sss))",
            method=self._manySubscribeRTResult,
            async_=True,
        )
        host.bridge.addMethod(
            "psGetFromMany",
            ".plugin",
            in_sign="a(ss)ia{ss}s",
            out_sign="s",
            method=self._getFromMany,
        )
        host.bridge.addMethod(
            "psGetFromManyRTResult",
            ".plugin",
            in_sign="ss",
            out_sign="(ua(sssasa{ss}))",
            method=self._getFromManyRTResult,
            async_=True,
        )

        # high level observer method
        host.bridge.addSignal(
            "psEvent", ".plugin", signature="ssssss"
        )  # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile

        # low level observer method, used if service/node is in watching list (see psNodeWatch* methods)
        host.bridge.addSignal(
            "psEventRaw", ".plugin", signature="sssass"
        )  # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile

    def getHandler(self, client):
        """Create the pubsub handler of a client and attach it as client.pubsub_client"""
        client.pubsub_client = SatPubSubClient(self.host, self)
        return client.pubsub_client

    @defer.inlineCallbacks
    def profileConnected(self, client):
        """Initialise per-client pubsub data (watch list and default service)

        The service is read from the "pubsub_service" config value; if reading it
        raises RuntimeError, the result of host.findServiceEntity is used instead.
        """
        client.pubsub_watching = set()
        try:
            client.pubsub_service = jid.JID(
                self.host.memory.getConfig("", "pubsub_service")
            )
        except RuntimeError:
            log.info(
                _(
                    "Can't retrieve pubsub_service from conf, we'll use first one that we find"
                )
            )
            client.pubsub_service = yield self.host.findServiceEntity(
                client, "pubsub", "service"
            )

    def getFeatures(self, profile):
        """Return the pubsub service used by a profile

        @param profile: profile to get the features of
        @return (dict): {"service": full jid or empty string}, or an empty dict
            if the profile is not set or its service is not known yet
        """
        try:
            client = self.host.getClient(profile)
        except exceptions.ProfileNotSetError:
            return {}
        try:
            return {
                "service": client.pubsub_service.full()
                if client.pubsub_service is not None
                else ""
            }
        except AttributeError:
            # client.pubsub_service is only set in profileConnected
            # the original test was inverted: the "not connected" message was
            # logged when the profile *was* connected
            if not self.host.isConnected(profile):
                log.debug("Profile is not connected, service is not checked yet")
            else:
                log.error("Service should be available !")
            return {}

    def parseExtra(self, extra):
        """Parse the extra dictionary received from the bridge

        @param extra(dict, None): extra data used to configure request
            the dict is modified in place (order-by is wrapped in a list, and a
            "mam" key is added when a MAM request is found)
        @return(Extra): filled Extra instance
        """
        if extra is None:
            rsm_request = None
            extra = {}
        else:
            # order-by
            if C.KEY_ORDER_BY in extra:
                # FIXME: we temporarily manage only one level of ordering
                #        we need to switch to a fully serialised extra data
                #        to be able to encode a whole ordered list
                extra[C.KEY_ORDER_BY] = [extra.pop(C.KEY_ORDER_BY)]

            # rsm
            if self._rsm is None:
                rsm_request = None
            else:
                rsm_request = self._rsm.parseExtra(extra)

            # mam
            if self._mam is None:
                mam_request = None
            else:
                mam_request = self._mam.parseExtra(extra, with_rsm=False)

            if mam_request is not None:
                assert "mam" not in extra
                extra["mam"] = mam_request

        return Extra(rsm_request, extra)

    def addManagedNode(self, node, **kwargs):
        """Add a handler for a node

        @param node(unicode): node to monitor
            all node *prefixed* with this one will be triggered
        @param **kwargs: method(s) to call when the node is found
            the method must be named after PubSub constants in lower case
            and suffixed with "_cb"
            e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE
        """
        assert node is not None
        assert kwargs
        callbacks = self._node_cb.setdefault(node, {})
        for event, cb in kwargs.items():
            # strip the "_cb" suffix to get the event name
            event_name = event[:-3]
            assert event_name in C.PS_EVENTS
            callbacks.setdefault(event_name, []).append(cb)

    def removeManagedNode(self, node, *args):
        """Remove handler(s) previously added for a node

        Each callback is removed from the first event list containing it.
        Nothing is done if the node has no registered callback.
        @param node(unicode): node which was monitored
        @param *args: callback(s) to remove
        """
        assert args
        try:
            registred_cb = self._node_cb[node]
        except KeyError:
            return

        for callback in args:
            # we iterate on a copy because an event may be deleted in the loop
            for event, cb_list in list(registred_cb.items()):
                try:
                    cb_list.remove(callback)
                except ValueError:
                    continue
                log.debug(
                    "removed callback {cb} for event {event} on node {node}".format(
                        cb=callback, event=event, node=node
                    )
                )
                if not cb_list:
                    del registred_cb[event]
                if not registred_cb:
                    self._node_cb.pop(node, None)
                # we go to next callback (the original code returned here, and
                # the remaining callbacks were never removed)
                break
            else:
                log.error(
                    "Trying to remove inexistant callback {cb} for node {node}".format(
                        cb=callback, node=node
                    )
                )

    # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
    #     """Retrieve the name of the nodes that are accessible on the target service.
# @param service (JID): target service # @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) # @param profile (str): %(doc_profile)s # @return: deferred which fire a list of nodes # """ # client = self.host.getClient(profile) # d = self.host.getDiscoItems(client, service, nodeIdentifier) # d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]) # return d # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE): # """Retrieve the name of the nodes to which the profile is subscribed on the target service. # @param service (JID): target service # @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) # @param filter_ (str): filter the result according to the given subscription type: # - None: do not filter # - 'pending': subscription has not been approved yet by the node owner # - 'unconfigured': subscription options have not been configured yet # - 'subscribed': subscription is complete # @param profile (str): %(doc_profile)s # @return: Deferred list[str] # """ # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) # return d def _sendItem(self, service, nodeIdentifier, payload, item_id=None, extra=None, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) service = None if not service else jid.JID(service) d = self.sendItem( client, service, nodeIdentifier, payload, item_id or None, extra ) d.addCallback(lambda ret: ret or "") return d def _sendItems(self, service, nodeIdentifier, items, extra=None, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) service = None if not service else jid.JID(service) try: items = [generic.parseXml(item.encode('utf-8')) for item in items] except Exception as e: 
raise exceptions.DataError(_("Can't parse items: {msg}").format( msg=e)) d = self.sendItems( client, service, nodeIdentifier, items, extra ) return d def _getPublishedItemId(self, iq_elt, original_id): """return item of published id if found in answer if not found original_id is returned, or empty string if it is None or empty string """ try: item_id = iq_elt.pubsub.publish.item["id"] except (AttributeError, KeyError): item_id = None return item_id or original_id def sendItem(self, client, service, nodeIdentifier, payload, item_id=None, extra=None): """High level method to send one item @param service(jid.JID, None): service to send the item to None to use PEP @param NodeIdentifier(unicode): PubSub node to use @param payload(domish.Element, unicode): payload of the item to send @param item_id(unicode, None): id to use or None to create one @param extra(dict, None): extra option, not used yet @return (unicode, None): id of the created item """ item_elt = pubsub.Item(id=item_id, payload=payload) d = self.publish(client, service, nodeIdentifier, [item_elt]) d.addCallback(self._getPublishedItemId, item_id) return d def _publishCb(self, iq_result): """Parse publish result, and return ids given by pubsub service""" try: item_ids = [item['id'] for item in iq_result.pubsub.publish.elements(pubsub.NS_PUBSUB, 'item')] except AttributeError: return [] return item_ids def sendItems(self, client, service, nodeIdentifier, items, extra=None): """High level method to send several items at once @param service(jid.JID, None): service to send the item to None to use PEP @param NodeIdentifier(unicode): PubSub node to use @param items(list[domish.Element]): whole item elements to send, "id" will be used if set @param extra(dict, None): extra option, not used yet @return (list[unicode]): ids of the created items """ parsed_items = [] for item in items: if item.name != 'item': raise exceptions.DataError(_("Invalid item: {xml}").format(item.toXml())) item_id = item.getAttribute("id") 
parsed_items.append(pubsub.Item(id=item_id, payload=item.firstChildElement())) d = self.publish(client, service, nodeIdentifier, parsed_items) d.addCallback(self._publishCb) return d def publish(self, client, service, nodeIdentifier, items=None): return client.pubsub_client.publish( service, nodeIdentifier, items, client.pubsub_client.parent.jid ) def _unwrapMAMMessage(self, message_elt): try: item_elt = reduce( lambda elt, ns_name: next(elt.elements(*ns_name)), (message_elt, (mam.NS_MAM, "result"), (C.NS_FORWARD, "forwarded"), (C.NS_CLIENT, "message"), ("http://jabber.org/protocol/pubsub#event", "event"), ("http://jabber.org/protocol/pubsub#event", "items"), ("http://jabber.org/protocol/pubsub#event", "item"), )) except StopIteration: raise exceptions.DataError("Can't find Item in MAM message element") return item_elt def _getItems(self, service="", node="", max_items=10, item_ids=None, sub_id=None, extra_dict=None, profile_key=C.PROF_KEY_NONE): """Get items from pubsub node @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit """ client = self.host.getClient(profile_key) service = jid.JID(service) if service else None max_items = None if max_items == C.NO_LIMIT else max_items extra = self.parseExtra(extra_dict) d = self.getItems( client, service, node or None, max_items or None, item_ids, sub_id or None, extra.rsm_request, extra.extra, ) d.addCallback(self.transItemsData) return d def getItems(self, client, service, node, max_items=None, item_ids=None, sub_id=None, rsm_request=None, extra=None): """Retrieve pubsub items from a node. @param service (JID, None): pubsub service. @param node (str): node id. @param max_items (int): optional limit on the number of retrieved items. @param item_ids (list[str]): identifiers of the items to be retrieved (can't be used with rsm_request). @param sub_id (str): optional subscription identifier. 
@param rsm_request (rsm.RSMRequest): RSM request data @return: a deferred couple (list[dict], dict) containing: - list of items - metadata with the following keys: - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index value of RSMResponse - service, node: service and node used """ if item_ids and max_items is not None: max_items = None if rsm_request and item_ids: raise ValueError("items_id can't be used with rsm") if extra is None: extra = {} try: mam_query = extra["mam"] except KeyError: d = client.pubsub_client.items( service = service, nodeIdentifier = node, maxItems = max_items, subscriptionIdentifier = sub_id, sender = None, itemIdentifiers = item_ids, orderBy = extra.get(C.KEY_ORDER_BY), rsm_request = rsm_request ) # we have no MAM data here, so we add None d.addCallback(lambda data: data + (None,)) d.addErrback(sat_defer.stanza2NotFound) d.addTimeout(TIMEOUT, reactor) else: # if mam is requested, we have to do a totally different query if self._mam is None: raise exceptions.NotFound("MAM (XEP-0313) plugin is not available") if max_items is not None: raise exceptions.DataError("max_items parameter can't be used with MAM") if item_ids: raise exceptions.DataError("items_ids parameter can't be used with MAM") if mam_query.node is None: mam_query.node = node elif mam_query.node != node: raise exceptions.DataError( "MAM query node is incoherent with getItems's node" ) if mam_query.rsm is None: mam_query.rsm = rsm_request else: if mam_query.rsm != rsm_request: raise exceptions.DataError( "Conflict between RSM request and MAM's RSM request" ) d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage) try: subscribe = C.bool(extra["subscribe"]) except KeyError: subscribe = False def subscribeEb(failure, service, node): failure.trap(error.StanzaError) log.warning( "Could not subscribe to node {} on service {}: {}".format( node, str(service), str(failure.value) ) ) def doSubscribe(data): self.subscribe(client, service, 
node).addErrback( subscribeEb, service, node ) return data if subscribe: d.addCallback(doSubscribe) def addMetadata(result): # TODO: handle the third argument (mam_response) items, rsm_response, mam_response = result service_jid = service if service else client.jid.userhostJID() metadata = { "service": service_jid, "node": node, "uri": self.getNodeURI(service_jid, node), } if rsm_request is not None and rsm_response is not None: metadata.update( { "rsm_" + key: value for key, value in rsm_response.toDict().items() } ) if mam_response is not None: for key, value in mam_response.items(): metadata["mam_" + key] = value return (items, metadata) d.addCallback(addMetadata) return d # @defer.inlineCallbacks # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): # """Massively retrieve pubsub items from many nodes. # @param service (JID): target service. # @param data (dict): dictionnary binding some arbitrary keys to the node identifiers. # @param max_items (int): optional limit on the number of retrieved items *per node*. # @param sub_id (str): optional subscription identifier. 
# @param rsm (dict): RSM request data # @param profile_key (str): %(doc_profile_key)s # @return: a deferred dict with: # - key: a value in (a subset of) data.keys() # - couple (list[dict], dict) containing: # - list of items # - RSM response data # """ # client = self.host.getClient(profile_key) # found_nodes = yield self.listNodes(service, profile=client.profile) # d_dict = {} # for publisher, node in data.items(): # if node not in found_nodes: # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) # continue # avoid pubsub "item-not-found" error # d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile) # defer.returnValue(d_dict) def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) return client.pubsub_client.getOptions( service, nodeIdentifier, subscriber, subscriptionIdentifier ) def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) return client.pubsub_client.setOptions( service, nodeIdentifier, subscriber, options, subscriptionIdentifier ) def _createNode(self, service_s, nodeIdentifier, options, profile_key): client = self.host.getClient(profile_key) return self.createNode( client, jid.JID(service_s) if service_s else None, nodeIdentifier, options ) def createNode(self, client, service, nodeIdentifier=None, options=None): """Create a new node @param service(jid.JID): PubSub service, @param NodeIdentifier(unicode, None): node name use None to create instant node (identifier will be returned by this method) @param option(dict[unicode, unicode], None): node configuration options @return (unicode): identifier of the created node (may be different from requested name) """ # TODO: if pubsub service doesn't hande publish-options, configure it in a second time return 
client.pubsub_client.createNode(service, nodeIdentifier, options) @defer.inlineCallbacks def createIfNewNode(self, client, service, nodeIdentifier, options=None): """Helper method similar to createNode, but will not fail in case of conflict""" try: yield self.createNode(client, service, nodeIdentifier, options) except error.StanzaError as e: if e.condition == "conflict": pass else: raise e def _getNodeConfiguration(self, service_s, nodeIdentifier, profile_key): client = self.host.getClient(profile_key) d = self.getConfiguration( client, jid.JID(service_s) if service_s else None, nodeIdentifier ) def serialize(form): # FIXME: better more generic dataform serialisation should be available in SàT return {f.var: str(f.value) for f in list(form.fields.values())} d.addCallback(serialize) return d def getConfiguration(self, client, service, nodeIdentifier): request = pubsub.PubSubRequest("configureGet") request.recipient = service request.nodeIdentifier = nodeIdentifier def cb(iq): form = data_form.findForm(iq.pubsub.configure, pubsub.NS_PUBSUB_NODE_CONFIG) form.typeCheck() return form d = request.send(client.xmlstream) d.addCallback(cb) return d def _setNodeConfiguration(self, service_s, nodeIdentifier, options, profile_key): client = self.host.getClient(profile_key) d = self.setConfiguration( client, jid.JID(service_s) if service_s else None, nodeIdentifier, options ) return d def setConfiguration(self, client, service, nodeIdentifier, options): request = pubsub.PubSubRequest("configureSet") request.recipient = service request.nodeIdentifier = nodeIdentifier form = data_form.Form( formType="submit", formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG ) form.makeFields(options) request.options = form d = request.send(client.xmlstream) return d def _getAffiliations(self, service_s, nodeIdentifier, profile_key): client = self.host.getClient(profile_key) d = self.getAffiliations( client, jid.JID(service_s) if service_s else None, nodeIdentifier or None ) return d def 
getAffiliations(self, client, service, nodeIdentifier=None): """Retrieve affiliations of an entity @param nodeIdentifier(unicode, None): node to get affiliation from None to get all nodes affiliations for this service """ request = pubsub.PubSubRequest("affiliations") request.recipient = service request.nodeIdentifier = nodeIdentifier def cb(iq_elt): try: affiliations_elt = next( iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "affiliations")) ) except StopIteration: raise ValueError( _("Invalid result: missing <affiliations> element: {}").format( iq_elt.toXml ) ) try: return { e["node"]: e["affiliation"] for e in affiliations_elt.elements((pubsub.NS_PUBSUB, "affiliation")) } except KeyError: raise ValueError( _("Invalid result: bad <affiliation> element: {}").format( iq_elt.toXml ) ) d = request.send(client.xmlstream) d.addCallback(cb) return d def _getNodeAffiliations(self, service_s, nodeIdentifier, profile_key): client = self.host.getClient(profile_key) d = self.getNodeAffiliations( client, jid.JID(service_s) if service_s else None, nodeIdentifier ) d.addCallback( lambda affiliations: {j.full(): a for j, a in affiliations.items()} ) return d def getNodeAffiliations(self, client, service, nodeIdentifier): """Retrieve affiliations of a node owned by profile""" request = pubsub.PubSubRequest("affiliationsGet") request.recipient = service request.nodeIdentifier = nodeIdentifier def cb(iq_elt): try: affiliations_elt = next( iq_elt.pubsub.elements((pubsub.NS_PUBSUB_OWNER, "affiliations")) ) except StopIteration: raise ValueError( _("Invalid result: missing <affiliations> element: {}").format( iq_elt.toXml ) ) try: return { jid.JID(e["jid"]): e["affiliation"] for e in affiliations_elt.elements( (pubsub.NS_PUBSUB_OWNER, "affiliation") ) } except KeyError: raise ValueError( _("Invalid result: bad <affiliation> element: {}").format( iq_elt.toXml ) ) d = request.send(client.xmlstream) d.addCallback(cb) return d def _setNodeAffiliations( self, service_s, nodeIdentifier, 
affiliations, profile_key=C.PROF_KEY_NONE ): client = self.host.getClient(profile_key) affiliations = { jid.JID(jid_): affiliation for jid_, affiliation in affiliations.items() } d = self.setNodeAffiliations( client, jid.JID(service_s) if service_s else None, nodeIdentifier, affiliations, ) return d def setNodeAffiliations(self, client, service, nodeIdentifier, affiliations): """Update affiliations of a node owned by profile @param affiliations(dict[jid.JID, unicode]): affiliations to set check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations """ request = pubsub.PubSubRequest("affiliationsSet") request.recipient = service request.nodeIdentifier = nodeIdentifier request.affiliations = affiliations d = request.send(client.xmlstream) return d def _purgeNode(self, service_s, nodeIdentifier, profile_key): client = self.host.getClient(profile_key) return self.purgeNode( client, jid.JID(service_s) if service_s else None, nodeIdentifier ) def purgeNode(self, client, service, nodeIdentifier): return client.pubsub_client.purgeNode(service, nodeIdentifier) def _deleteNode(self, service_s, nodeIdentifier, profile_key): client = self.host.getClient(profile_key) return self.deleteNode( client, jid.JID(service_s) if service_s else None, nodeIdentifier ) def deleteNode(self, client, service, nodeIdentifier): return client.pubsub_client.deleteNode(service, nodeIdentifier) def _addWatch(self, service_s, node, profile_key): """watch modifications on a node This method should only be called from bridge """ client = self.host.getClient(profile_key) service = jid.JID(service_s) if service_s else client.jid.userhostJID() client.pubsub_watching.add((service, node)) def _removeWatch(self, service_s, node, profile_key): """remove a node watch This method should only be called from bridge """ client = self.host.getClient(profile_key) service = jid.JID(service_s) if service_s else client.jid.userhostJID() client.pubsub_watching.remove((service, 
node)) def _retractItem( self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key ): return self._retractItems( service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key ) def _retractItems( self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key ): return self.retractItems( jid.JID(service_s) if service_s else None, nodeIdentifier, itemIdentifiers, notify, profile_key, ) def retractItems( self, service, nodeIdentifier, itemIdentifiers, notify=True, profile_key=C.PROF_KEY_NONE, ): client = self.host.getClient(profile_key) return client.pubsub_client.retractItems( service, nodeIdentifier, itemIdentifiers, notify=True ) def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) service = None if not service else jid.JID(service) d = self.subscribe(client, service, nodeIdentifier, options=options or None) d.addCallback(lambda subscription: subscription.subscriptionIdentifier or "") return d def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None): # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe return client.pubsub_client.subscribe( service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options ) def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) service = None if not service else jid.JID(service) return self.unsubscribe(client, service, nodeIdentifier) def unsubscribe( self, client, service, nodeIdentifier, sub_jid=None, subscriptionIdentifier=None, sender=None, ): return client.pubsub_client.unsubscribe( service, nodeIdentifier, sub_jid or client.jid.userhostJID(), subscriptionIdentifier, sender, ) def _subscriptions(self, service, nodeIdentifier="", profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) service = None if not service else jid.JID(service) def 
gotSubscriptions(subscriptions): # we replace pubsub.Subscription instance by dict that we can serialize for idx, sub in enumerate(subscriptions): sub_dict = { "node": sub.nodeIdentifier, "subscriber": sub.subscriber.full(), "state": sub.state, } if sub.subscriptionIdentifier is not None: sub_dict["id"] = sub.subscriptionIdentifier subscriptions[idx] = sub_dict return subscriptions d = self.subscriptions(client, service, nodeIdentifier or None) d.addCallback(gotSubscriptions) return d def subscriptions(self, client, service, nodeIdentifier=None): """retrieve subscriptions from a service @param service(jid.JID): PubSub service @param nodeIdentifier(unicode, None): node to check None to get all subscriptions """ return client.pubsub_client.subscriptions(service, nodeIdentifier) ## misc tools ## def getNodeURI(self, service, node, item=None): """Return XMPP URI of a PubSub node @param service(jid.JID): PubSub service @param node(unicode): node @return (unicode): URI of the node """ assert service is not None # XXX: urllib.urlencode use "&" to separate value, while XMPP URL (cf. RFC 5122) # use ";" as a separator. So if more than one value is used in query_data, # urlencode MUST NOT BE USED. query_data = [("node", node.encode("utf-8"))] if item is not None: query_data.append(("item", item.encode("utf-8"))) return "xmpp:{service}?;{query}".format( service=service.userhost(), query=urllib.parse.urlencode(query_data) ) ## methods to manage several stanzas/jids at once ## # generic # def getRTResults( self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE ): return self.rt_sessions.getResults(session_id, on_success, on_error, profile) def transItemsData(self, items_data, item_cb=lambda item: item.toXml(), serialise=False): """Helper method to transform result from [getItems] the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) as returned by [getItems]. 
metadata values are then casted to unicode and each item is passed to items_cb then optionally serialised with data_format.serialise. @param items_data(tuple): tuple returned by [getItems] @param item_cb(callable): method to transform each item @param serialise(bool): if True do a data_format.serialise after applying item_cb @return (tuple): a serialised form ready to go throught bridge """ items, metadata = items_data if serialise: items = [data_format.serialise(item_cb(item)) for item in items] else: items = [item_cb(item) for item in items] return ( items, {key: str(value) for key, value in metadata.items()}, ) def transItemsDataD(self, items_data, item_cb, serialise=False): """Helper method to transform result from [getItems], deferred version the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) as returned by [getItems]. metadata values are then casted to unicode and each item is passed to items_cb then optionally serialised with data_format.serialise. An errback is added to item_cb, and when it is fired the value is filtered from final items @param items_data(tuple): tuple returned by [getItems] @param item_cb(callable): method to transform each item (must return a deferred) @param serialise(bool): if True do a data_format.serialise after applying item_cb @return (tuple): a deferred which fire a serialised form ready to go throught bridge """ items, metadata = items_data def eb(failure_): log.warning(f"Error while serialising/parsing item: {failure_.value}") d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) def finishSerialisation(parsed_items): if serialise: items = [data_format.serialise(i) for i in parsed_items if i is not None] else: items = [i for i in parsed_items if i is not None] return ( items, {key: str(value) for key, value in metadata.items()}, ) d.addCallback(finishSerialisation) return d def serDList(self, results, failure_result=None): """Serialise a DeferredList result @param results: 
DeferredList results @param failure_result: value to use as value for failed Deferred (default: empty tuple) @return (list): list with: - failure: empty in case of success, else error message - result """ if failure_result is None: failure_result = () return [ ("", result) if success else (str(result.result) or UNSPECIFIED, failure_result) for success, result in results ] # subscribe # def _getNodeSubscriptions(self, service_s, nodeIdentifier, profile_key): client = self.host.getClient(profile_key) d = self.getNodeSubscriptions( client, jid.JID(service_s) if service_s else None, nodeIdentifier ) d.addCallback( lambda subscriptions: {j.full(): a for j, a in subscriptions.items()} ) return d def getNodeSubscriptions(self, client, service, nodeIdentifier): """Retrieve subscriptions to a node @param nodeIdentifier(unicode): node to get subscriptions from """ if not nodeIdentifier: raise exceptions.DataError("node identifier can't be empty") request = pubsub.PubSubRequest("subscriptionsGet") request.recipient = service request.nodeIdentifier = nodeIdentifier def cb(iq_elt): try: subscriptions_elt = next( iq_elt.pubsub.elements((pubsub.NS_PUBSUB, "subscriptions")) ) except StopIteration: raise ValueError( _("Invalid result: missing <subscriptions> element: {}").format( iq_elt.toXml ) ) except AttributeError as e: raise ValueError(_("Invalid result: {}").format(e)) try: return { jid.JID(s["jid"]): s["subscription"] for s in subscriptions_elt.elements( (pubsub.NS_PUBSUB, "subscription") ) } except KeyError: raise ValueError( _("Invalid result: bad <subscription> element: {}").format( iq_elt.toXml ) ) d = request.send(client.xmlstream) d.addCallback(cb) return d def _setNodeSubscriptions( self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE ): client = self.host.getClient(profile_key) subscriptions = { jid.JID(jid_): subscription for jid_, subscription in subscriptions.items() } d = self.setNodeSubscriptions( client, jid.JID(service_s) if service_s 
else None, nodeIdentifier, subscriptions, ) return d def setNodeSubscriptions(self, client, service, nodeIdentifier, subscriptions): """Set or update subscriptions of a node owned by profile @param subscriptions(dict[jid.JID, unicode]): subscriptions to set check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions """ request = pubsub.PubSubRequest("subscriptionsSet") request.recipient = service request.nodeIdentifier = nodeIdentifier request.subscriptions = { pubsub.Subscription(nodeIdentifier, jid_, state) for jid_, state in subscriptions.items() } d = request.send(client.xmlstream) return d def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): """Get real-time results for subcribeToManu session @param session_id: id of the real-time deferred session @param return (tuple): (remaining, results) where: - remaining is the number of still expected results - results is a list of tuple(unicode, unicode, bool, unicode) with: - service: pubsub service - and node: pubsub node - failure(unicode): empty string in case of success, error message else @param profile_key: %(doc_profile_key)s """ profile = self.host.getClient(profile_key).profile d = self.rt_sessions.getResults( session_id, on_success=lambda result: "", on_error=lambda failure: str(failure.value), profile=profile, ) # we need to convert jid.JID to unicode with full() to serialise it for the bridge d.addCallback( lambda ret: ( ret[0], [ (service.full(), node, "" if success else failure or UNSPECIFIED) for (service, node), (success, failure) in ret[1].items() ], ) ) return d def _subscribeToMany( self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE ): return self.subscribeToMany( [(jid.JID(service), str(node)) for service, node in node_data], jid.JID(subscriber), options, profile_key, ) def subscribeToMany( self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE ): """Subscribe to several nodes at once. 
@param node_data (iterable[tuple]): iterable of tuple (service, node) where: - service (jid.JID) is the pubsub service - node (unicode) is the node to subscribe to @param subscriber (jid.JID): optional subscription identifier. @param options (dict): subscription options @param profile_key (str): %(doc_profile_key)s @return (str): RT Deferred session id """ client = self.host.getClient(profile_key) deferreds = {} for service, node in node_data: deferreds[(service, node)] = client.pubsub_client.subscribe( service, node, subscriber, options=options ) return self.rt_sessions.newSession(deferreds, client.profile) # found_nodes = yield self.listNodes(service, profile=client.profile) # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) # d_list = [] # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): # if nodeIdentifier not in found_nodes: # log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) # continue # avoid sat-pubsub "SubscriptionExists" error # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) # defer.returnValue(d_list) # get # def _getFromManyRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): """Get real-time results for getFromMany session @param session_id: id of the real-time deferred session @param profile_key: %(doc_profile_key)s @param return (tuple): (remaining, results) where: - remaining is the number of still expected results - results is a list of tuple with - service (unicode): pubsub service - node (unicode): pubsub node - failure (unicode): empty string in case of success, error message else - items (list[s]): raw XML of items - metadata(dict): serialised metadata """ profile = self.host.getClient(profile_key).profile d = self.rt_sessions.getResults( session_id, on_success=lambda result: ("", self.transItemsData(result)), on_error=lambda failure: 
(str(failure.value) or UNSPECIFIED, ([], {})), profile=profile, ) d.addCallback( lambda ret: ( ret[0], [ (service.full(), node, failure, items, metadata) for (service, node), (success, (failure, (items, metadata))) in ret[ 1 ].items() ], ) ) return d def _getFromMany( self, node_data, max_item=10, extra_dict=None, profile_key=C.PROF_KEY_NONE ): """ @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit """ max_item = None if max_item == C.NO_LIMIT else max_item extra = self.parseExtra(extra_dict) return self.getFromMany( [(jid.JID(service), str(node)) for service, node in node_data], max_item, extra.rsm_request, extra.extra, profile_key, ) def getFromMany(self, node_data, max_item=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE): """Get items from many nodes at once @param node_data (iterable[tuple]): iterable of tuple (service, node) where: - service (jid.JID) is the pubsub service - node (unicode) is the node to get items from @param max_items (int): optional limit on the number of retrieved items. 
@param rsm_request (RSMRequest): RSM request data @param profile_key (unicode): %(doc_profile_key)s @return (str): RT Deferred session id """ client = self.host.getClient(profile_key) deferreds = {} for service, node in node_data: deferreds[(service, node)] = self.getItems( client, service, node, max_item, rsm_request=rsm_request, extra=extra ) return self.rt_sessions.newSession(deferreds, client.profile) @implementer(disco.IDisco) class SatPubSubClient(rsm.PubSubClient): def __init__(self, host, parent_plugin): self.host = host self.parent_plugin = parent_plugin rsm.PubSubClient.__init__(self) def connectionInitialized(self): rsm.PubSubClient.connectionInitialized(self) def _getNodeCallbacks(self, node, event): """Generate callbacks from given node and event @param node(unicode): node used for the item any registered node which prefix the node will match @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE @return (iterator[callable]): callbacks for this node/event """ for registered_node, callbacks_dict in self.parent_plugin._node_cb.items(): if not node.startswith(registered_node): continue try: for callback in callbacks_dict[event]: yield callback except KeyError: continue def itemsReceived(self, event): log.debug("Pubsub items received") for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): callback(self.parent, event) client = self.parent if (event.sender, event.nodeIdentifier) in client.pubsub_watching: raw_items = [i.toXml() for i in event.items] self.host.bridge.psEventRaw( event.sender.full(), event.nodeIdentifier, C.PS_ITEMS, raw_items, client.profile, ) def deleteReceived(self, event): log.debug(("Publish node deleted")) for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE): callback(self.parent, event) client = self.parent if (event.sender, event.nodeIdentifier) in client.pubsub_watching: self.host.bridge.psEventRaw( event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile ) def 
subscriptions(self, service, nodeIdentifier, sender=None): """Return the list of subscriptions to the given service and node. @param service: The publish subscribe service to retrieve the subscriptions from. @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions). @type nodeIdentifier: C{unicode} @return (list[pubsub.Subscription]): list of subscriptions """ request = pubsub.PubSubRequest("subscriptions") request.recipient = service request.nodeIdentifier = nodeIdentifier request.sender = sender d = request.send(self.xmlstream) def cb(iq): subs = [] for subscription_elt in iq.pubsub.subscriptions.elements( pubsub.NS_PUBSUB, "subscription" ): subscription = pubsub.Subscription( subscription_elt["node"], jid.JID(subscription_elt["jid"]), subscription_elt["subscription"], subscriptionIdentifier=subscription_elt.getAttribute("subid"), ) subs.append(subscription) return subs return d.addCallback(cb) def purgeNode(self, service, nodeIdentifier): """Purge a node (i.e. delete all items from it) @param service(jid.JID, None): service to send the item to None to use PEP @param NodeIdentifier(unicode): PubSub node to use """ # TODO: propose this upstream and remove it once merged request = pubsub.PubSubRequest('purge') request.recipient = service request.nodeIdentifier = nodeIdentifier return request.send(self.xmlstream) def getDiscoInfo(self, requestor, service, nodeIdentifier=""): disco_info = [] self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile) return disco_info def getDiscoItems(self, requestor, service, nodeIdentifier=""): return []