Mercurial > libervia-backend
view src/plugins/plugin_xep_0060.py @ 2451:d1153ce68ca0
tools (config): complex data can now be set using json and the "_json" suffix
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 30 Nov 2017 20:53:47 +0100 |
parents | 30278ea1ca7c |
children | a9c092bf4ee9 |
line wrap: on
line source
#!/usr/bin/env python2 # -*- coding: utf-8 -*- # SAT plugin for Publish-Subscribe (xep-0060) # Copyright (C) 2009-2017 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sat.core.i18n import _ from sat.core.constants import Const as C from sat.core.log import getLogger log = getLogger(__name__) from sat.core import exceptions from sat.tools import sat_defer from twisted.words.protocols.jabber import jid, error from twisted.internet import defer from wokkel import disco from wokkel import data_form from zope.interface import implements from collections import namedtuple import urllib import datetime from dateutil import tz # XXX: sat_tmp.wokkel.pubsub is actually use instead of wokkel version # mam and rsm come from sat_tmp.wokkel too from wokkel import pubsub from wokkel import rsm from wokkel import mam PLUGIN_INFO = { C.PI_NAME: "Publish-Subscribe", C.PI_IMPORT_NAME: "XEP-0060", C.PI_TYPE: "XEP", C.PI_PROTOCOLS: ["XEP-0060"], C.PI_DEPENDENCIES: [], C.PI_RECOMMENDATIONS: ["XEP-0313"], C.PI_MAIN: "XEP_0060", C.PI_HANDLER: "yes", C.PI_DESCRIPTION: _("""Implementation of PubSub Protocol""") } UNSPECIFIED = "unspecified error" MAM_FILTER = "mam_filter_" Extra = namedtuple('Extra', ('rsm_request', 'extra')) # rsm_request is the rsm.RSMRequest build with rsm_ prefixed keys, or None # extra is a potentially empty dict class XEP_0060(object): 
OPT_ACCESS_MODEL = 'pubsub#access_model' OPT_PERSIST_ITEMS = 'pubsub#persist_items' OPT_MAX_ITEMS = 'pubsub#max_items' OPT_DELIVER_PAYLOADS = 'pubsub#deliver_payloads' OPT_SEND_ITEM_SUBSCRIBE = 'pubsub#send_item_subscribe' OPT_NODE_TYPE = 'pubsub#node_type' OPT_SUBSCRIPTION_TYPE = 'pubsub#subscription_type' OPT_SUBSCRIPTION_DEPTH = 'pubsub#subscription_depth' OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed' OPT_PUBLISH_MODEL = 'pubsub#publish_model' ACCESS_OPEN = 'open' ACCESS_PRESENCE = 'presence' ACCESS_ROSTER = 'roster' ACCESS_PUBLISHER_ROSTER = 'publisher-roster' ACCESS_AUTHORIZE = 'authorize' ACCESS_WHITELIST = 'whitelist' def __init__(self, host): log.info(_(u"PubSub plugin initialization")) self.host = host self._mam = host.plugins.get('XEP-0313') self._node_cb = {} # dictionnary of callbacks for node (key: node, value: list of callbacks) self.rt_sessions = sat_defer.RTDeferredSessions() host.bridge.addMethod("psNodeCreate", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self._createNode, async=True) host.bridge.addMethod("psNodeConfigurationGet", ".plugin", in_sign='sss', out_sign='a{ss}', method=self._getNodeConfiguration, async=True) host.bridge.addMethod("psNodeConfigurationSet", ".plugin", in_sign='ssa{ss}s', out_sign='', method=self._setNodeConfiguration, async=True) host.bridge.addMethod("psNodeAffiliationsGet", ".plugin", in_sign='sss', out_sign='a{ss}', method=self._getNodeAffiliations, async=True) host.bridge.addMethod("psNodeAffiliationsSet", ".plugin", in_sign='ssa{ss}s', out_sign='', method=self._setNodeAffiliations, async=True) host.bridge.addMethod("psNodeSubscriptionsGet", ".plugin", in_sign='sss', out_sign='a{ss}', method=self._getNodeSubscriptions, async=True) host.bridge.addMethod("psNodeSubscriptionsSet", ".plugin", in_sign='ssa{ss}s', out_sign='', method=self._setNodeSubscriptions, async=True) host.bridge.addMethod("psNodeDelete", ".plugin", in_sign='sss', out_sign='', method=self._deleteNode, async=True) 
host.bridge.addMethod("psNodeWatchAdd", ".plugin", in_sign='sss', out_sign='', method=self._addWatch, async=False) host.bridge.addMethod("psNodeWatchRemove", ".plugin", in_sign='sss', out_sign='', method=self._removeWatch, async=False) host.bridge.addMethod("psAffiliationsGet", ".plugin", in_sign='sss', out_sign='a{ss}', method=self._getAffiliations, async=True) host.bridge.addMethod("psItemsGet", ".plugin", in_sign='ssiassa{ss}s', out_sign='(asa{ss})', method=self._getItems, async=True) host.bridge.addMethod("psItemSend", ".plugin", in_sign='ssssa{ss}s', out_sign='s', method=self._sendItem, async=True) host.bridge.addMethod("psRetractItem", ".plugin", in_sign='sssbs', out_sign='', method=self._retractItem, async=True) host.bridge.addMethod("psRetractItems", ".plugin", in_sign='ssasbs', out_sign='', method=self._retractItems, async=True) host.bridge.addMethod("psSubscribe", ".plugin", in_sign='ssa{ss}s', out_sign='s', method=self._subscribe, async=True) host.bridge.addMethod("psUnsubscribe", ".plugin", in_sign='sss', out_sign='', method=self._unsubscribe, async=True) host.bridge.addMethod("psSubscriptionsGet", ".plugin", in_sign='sss', out_sign='aa{ss}', method=self._subscriptions, async=True) host.bridge.addMethod("psSubscribeToMany", ".plugin", in_sign='a(ss)sa{ss}s', out_sign='s', method=self._subscribeToMany) host.bridge.addMethod("psGetSubscribeRTResult", ".plugin", in_sign='ss', out_sign='(ua(sss))', method=self._manySubscribeRTResult, async=True) host.bridge.addMethod("psGetFromMany", ".plugin", in_sign='a(ss)ia{ss}s', out_sign='s', method=self._getFromMany) host.bridge.addMethod("psGetFromManyRTResult", ".plugin", in_sign='ss', out_sign='(ua(sssasa{ss}))', method=self._getFromManyRTResult, async=True) # high level observer method host.bridge.addSignal("psEvent", ".plugin", signature='ssssa{ss}s') # args: category, service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), data, profile # low level observer method, used if service/node is in watching list (see 
psNodeWatch* methods) host.bridge.addSignal("psEventRaw", ".plugin", signature='sssass') # args: service(jid), node, type (C.PS_ITEMS, C.PS_DELETE), list of item_xml, profile def getHandler(self, client): client.pubsub_client = SatPubSubClient(self.host, self) return client.pubsub_client @defer.inlineCallbacks def profileConnected(self, client): client.pubsub_watching = set() client.pubsub_service = yield self.host.findServiceEntity(client, "pubsub", "service") def getFeatures(self, profile): try: client = self.host.getClient(profile) except exceptions.ProfileNotSetError: return {} try: return {'service': client.pubsub_service.full() if client.pubsub_service is not None else ''} except AttributeError: if self.host.isConnected(profile): log.debug("Profile is not connected, service is not checked yet") else: log.error("Service should be available !") return {} def parseExtra(self, extra): """Parse extra dictionnary used bridge's extra dictionnaries @param extra(dict): extra data used to configure request @return(Extra): filled Extra instance """ if extra is not None: # rsm rsm_args = {} for arg in ('max', 'after', 'before', 'index'): try: argname = "max_" if arg == 'max' else arg rsm_args[argname] = extra.pop('rsm_{}'.format(arg)) except KeyError: continue if rsm_args: rsm_request = rsm.RSMRequest(**rsm_args) else: rsm_request = None # mam mam_args = {} for arg in ('start', 'end'): try: mam_args[arg] = datetime.datetime.fromtimestamp(int(extra.pop('{}{}'.format(MAM_FILTER, arg))), tz.tzutc()) except (TypeError, ValueError): log.warning(u"Bad value for {} filter".format(arg)) except KeyError: continue try: mam_args['with_jid'] = jid.JID(extra.pop('{}jid'.format(MAM_FILTER))) except (jid.InvalidFormat): log.warning(u"Bad value for jid filter") except KeyError: pass for name, value in extra.iteritems(): if name.startswith(MAM_FILTER): var = name[len(MAM_FILTER):] extra_fields = mam_args.setdefault('extra_fields', []) extra_fields.append(data_form.Field(var=var, 
value=value)) if mam_args: assert 'mam' not in extra extra['mam'] = mam.MAMRequest(mam.buildForm(**mam_args)) else: rsm_request = None extra = {} return Extra(rsm_request, extra) def addManagedNode(self, node, **kwargs): """Add a handler for a node @param node(unicode): node to monitor all node *prefixed* with this one will be triggered @param **kwargs: method(s) to call when the node is found the method must be named after PubSub constants in lower case and suffixed with "_cb" e.g.: "items_cb" for C.PS_ITEMS, "delete_cb" for C.PS_DELETE """ assert node is not None assert kwargs callbacks = self._node_cb.setdefault(node, {}) for event, cb in kwargs.iteritems(): event_name = event[:-3] assert event_name in C.PS_EVENTS callbacks.setdefault(event_name,[]).append(cb) def removeManagedNode(self, node, *args): """Add a handler for a node @param node(unicode): node to monitor @param *args: callback(s) to remove """ assert args try: registred_cb = self._node_cb[node] except KeyError: pass else: for callback in args: for event, cb_list in registred_cb.iteritems(): try: cb_list.remove(callback) except ValueError: pass else: log.debug(u"removed callback {cb} for event {event} on node {node}".format( cb=callback, event=event, node=node)) if not cb_list: del registred_cb[event] if not registred_cb: del self._node_cb[node] return log.error(u"Trying to remove inexistant callback {cb} for node {node}".format(cb=callback, node=node)) # def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE): # """Retrieve the name of the nodes that are accessible on the target service. 
# @param service (JID): target service # @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes) # @param profile (str): %(doc_profile)s # @return: deferred which fire a list of nodes # """ # client = self.host.getClient(profile) # d = self.host.getDiscoItems(client, service, nodeIdentifier) # d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')]) # return d # def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE): # """Retrieve the name of the nodes to which the profile is subscribed on the target service. # @param service (JID): target service # @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions) # @param filter_ (str): filter the result according to the given subscription type: # - None: do not filter # - 'pending': subscription has not been approved yet by the node owner # - 'unconfigured': subscription options have not been configured yet # - 'subscribed': subscription is complete # @param profile (str): %(doc_profile)s # @return: Deferred list[str] # """ # d = self.subscriptions(service, nodeIdentifier, profile_key=profile) # d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_]) # return d def _sendItem(self, service, nodeIdentifier, payload, item_id=None, extra=None, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) service = None if not service else jid.JID(service) d = self.sendItem(client, service, nodeIdentifier, payload, item_id or None, extra) d.addCallback(lambda ret: ret or u'') return d def _getPublishedItemId(self, iq_elt, original_id): """return item of published id if found in answer if not found original_id is returned, or empty string if it is None or empty string """ try: item_id = iq_elt.pubsub.publish.item['id'] except (AttributeError, KeyError): item_id = 
None return item_id or original_id def sendItem(self, client, service, nodeIdentifier, payload, item_id=None, extra=None): """high level method to send one item @param service(jid.JID, None): service to send the item to None to use PEP @param NodeIdentifier(unicode): PubSub node to use @param item_id(unicode, None): id to use or None to create one @param payload(domish.Element, unicode): payload of the item to send @param extra(dict, None): extra option, not used yet @return (unicode, None): id of the created item """ item_elt = pubsub.Item(id=item_id, payload=payload) d = self.publish(client, service, nodeIdentifier, [item_elt]) d.addCallback(self._getPublishedItemId, item_id) return d def publish(self, client, service, nodeIdentifier, items=None): return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid) def _unwrapMAMMessage(self, message_elt): try: item_elt = (message_elt.elements(mam.NS_MAM, 'result').next() .elements(C.NS_FORWARD, 'forwarded').next() .elements(C.NS_CLIENT, 'message').next() .elements('http://jabber.org/protocol/pubsub#event', 'event').next() .elements('http://jabber.org/protocol/pubsub#event', 'items').next() .elements('http://jabber.org/protocol/pubsub#event', 'item').next()) except StopIteration: raise exceptions.DataError(u"Can't find Item in MAM message element") return item_elt def _getItems(self, service='', node='', max_items=10, item_ids=None, sub_id=None, extra_dict=None, profile_key=C.PROF_KEY_NONE): """Get items from pubsub node @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit """ client = self.host.getClient(profile_key) service = jid.JID(service) if service else None max_items = None if max_items == C.NO_LIMIT else max_items extra = self.parseExtra(extra_dict) d = self.getItems(client, service, node or None, max_items or None, item_ids, sub_id or None, extra.rsm_request, extra.extra) d.addCallback(self.serItemsData) return d def getItems(self, client, 
service, node, max_items=None, item_ids=None, sub_id=None, rsm_request=None, extra=None): """Retrieve pubsub items from a node. @param service (JID, None): pubsub service. @param node (str): node id. @param max_items (int): optional limit on the number of retrieved items. @param item_ids (list[str]): identifiers of the items to be retrieved (can't be used with rsm_request). @param sub_id (str): optional subscription identifier. @param rsm_request (rsm.RSMRequest): RSM request data @return: a deferred couple (list[dict], dict) containing: - list of items - metadata with the following keys: - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index value of RSMResponse - service, node: service and node used """ if item_ids and max_items is not None: max_items = None if rsm_request and item_ids: raise ValueError(u"items_id can't be used with rsm") if extra is None: extra = {} try: mam_query = extra['mam'] except KeyError: d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, None, rsm_request) else: # if mam is requested, we have to do a totally different query if self._mam is None: raise exceptions.NotFound(u"MAM (XEP-0313) plugin is not available") if max_items is not None: raise exceptions.DataError(u"max_items parameter can't be used with MAM") if item_ids: raise exceptions.DataError(u"items_ids parameter can't be used with MAM") if mam_query.node is None: mam_query.node = node elif mam_query.node != node: raise exceptions.DataError(u"MAM query node is incoherent with getItems's node") if mam_query.rsm is None: mam_query.rsm = rsm_request else: if mam_query.rsm != rsm_request: raise exceptions.DataError(u"Conflict between RSM request and MAM's RSM request") d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage) try: subscribe = C.bool(extra['subscribe']) except KeyError: subscribe = False def subscribeEb(failure, service, node): failure.trap(error.StanzaError) log.warning(u"Could not subscribe to node {} 
on service {}: {}".format(node, unicode(service), unicode(failure.value))) def doSubscribe(items): self.subscribe(service, node, profile_key=client.profile).addErrback(subscribeEb, service, node) return items if subscribe: d.addCallback(doSubscribe) def addMetadata(result): items, rsm_response = result service_jid = service if service else client.jid.userhostJID() metadata = {'service': service_jid, 'node': node, 'uri': self.getNodeURI(service_jid, node), } if rsm_request is not None and rsm_response is not None: metadata.update({'rsm_{}'.format(key): value for key, value in rsm_response.toDict().iteritems()}) return (items, metadata) d.addCallback(addMetadata) return d # @defer.inlineCallbacks # def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE): # """Massively retrieve pubsub items from many nodes. # @param service (JID): target service. # @param data (dict): dictionnary binding some arbitrary keys to the node identifiers. # @param max_items (int): optional limit on the number of retrieved items *per node*. # @param sub_id (str): optional subscription identifier. 
# @param rsm (dict): RSM request data # @param profile_key (str): %(doc_profile_key)s # @return: a deferred dict with: # - key: a value in (a subset of) data.keys() # - couple (list[dict], dict) containing: # - list of items # - RSM response data # """ # client = self.host.getClient(profile_key) # found_nodes = yield self.listNodes(service, profile=client.profile) # d_dict = {} # for publisher, node in data.items(): # if node not in found_nodes: # log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node)) # continue # avoid pubsub "item-not-found" error # d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile) # defer.returnValue(d_dict) def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) return client.pubsub_client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier) def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) return client.pubsub_client.setOptions(service, nodeIdentifier, subscriber, options, subscriptionIdentifier) def _createNode(self, service_s, nodeIdentifier, options, profile_key): client = self.host.getClient(profile_key) return self.createNode(client, jid.JID(service_s) if service_s else None, nodeIdentifier, options) def createNode(self, client, service, nodeIdentifier=None, options=None): """Create a new node @param service(jid.JID): PubSub service, @param NodeIdentifier(unicode, None): node name use None to create instant node (identifier will be returned by this method) @param option(dict[unicode, unicode], None): node configuration options @return (unicode): identifier of the created node (may be different from requested name) """ # TODO: if pubsub service doesn't hande publish-options, configure it in a second time return 
client.pubsub_client.createNode(service, nodeIdentifier, options) @defer.inlineCallbacks def createIfNewNode(self, client, service, nodeIdentifier, options=None): """Helper method similar to createNode, but will not fail in case of conflict""" try: yield self.createNode(client, service, nodeIdentifier, options) except error.StanzaError as e: if e.condition == 'conflict': pass else: raise e def _getNodeConfiguration(self, service_s, nodeIdentifier, profile_key): client = self.host.getClient(profile_key) d = self.getConfiguration(client, jid.JID(service_s) if service_s else None, nodeIdentifier) def serialize(form): # FIXME: better more generic dataform serialisation should be available in SàT return {f.var: unicode(f.value) for f in form.fields.values()} d.addCallback(serialize) return d def getConfiguration(self, client, service, nodeIdentifier): request = pubsub.PubSubRequest('configureGet') request.recipient = service request.nodeIdentifier = nodeIdentifier def cb(iq): form = data_form.findForm(iq.pubsub.configure, pubsub.NS_PUBSUB_NODE_CONFIG) form.typeCheck() return form d = request.send(client.xmlstream) d.addCallback(cb) return d def _setNodeConfiguration(self, service_s, nodeIdentifier, options, profile_key): client = self.host.getClient(profile_key) d = self.setConfiguration(client, jid.JID(service_s) if service_s else None, nodeIdentifier, options) return d def setConfiguration(self, client, service, nodeIdentifier, options): request = pubsub.PubSubRequest('configureSet') request.recipient = service request.nodeIdentifier = nodeIdentifier form = data_form.Form(formType='submit', formNamespace=pubsub.NS_PUBSUB_NODE_CONFIG) form.makeFields(options) request.options = form d = request.send(client.xmlstream) return d def _getAffiliations(self, service_s, nodeIdentifier, profile_key): client = self.host.getClient(profile_key) d = self.getAffiliations(client, jid.JID(service_s) if service_s else None, nodeIdentifier or None) return d def getAffiliations(self, 
client, service, nodeIdentifier=None): """Retrieve affiliations of an entity @param nodeIdentifier(unicode, None): node to get affiliation from None to get all nodes affiliations for this service """ request = pubsub.PubSubRequest('affiliations') request.recipient = service request.nodeIdentifier = nodeIdentifier def cb(iq_elt): try: affiliations_elt = next(iq_elt.pubsub.elements((pubsub.NS_PUBSUB, 'affiliations'))) except StopIteration: raise ValueError(_(u"Invalid result: missing <affiliations> element: {}").format(iq_elt.toXml)) try: return {e['node']: e['affiliation'] for e in affiliations_elt.elements((pubsub.NS_PUBSUB, 'affiliation'))} except KeyError: raise ValueError(_(u"Invalid result: bad <affiliation> element: {}").format(iq_elt.toXml)) d = request.send(client.xmlstream) d.addCallback(cb) return d def _getNodeAffiliations(self, service_s, nodeIdentifier, profile_key): client = self.host.getClient(profile_key) d = self.getNodeAffiliations(client, jid.JID(service_s) if service_s else None, nodeIdentifier) d.addCallback(lambda affiliations: {j.full(): a for j, a in affiliations.iteritems()}) return d def getNodeAffiliations(self, client, service, nodeIdentifier): """Retrieve affiliations of a node owned by profile""" request = pubsub.PubSubRequest('affiliationsGet') request.recipient = service request.nodeIdentifier = nodeIdentifier def cb(iq_elt): try: affiliations_elt = next(iq_elt.pubsub.elements((pubsub.NS_PUBSUB_OWNER, 'affiliations'))) except StopIteration: raise ValueError(_(u"Invalid result: missing <affiliations> element: {}").format(iq_elt.toXml)) try: return {jid.JID(e['jid']): e['affiliation'] for e in affiliations_elt.elements((pubsub.NS_PUBSUB_OWNER, 'affiliation'))} except KeyError: raise ValueError(_(u"Invalid result: bad <affiliation> element: {}").format(iq_elt.toXml)) d = request.send(client.xmlstream) d.addCallback(cb) return d def _setNodeAffiliations(self, service_s, nodeIdentifier, affiliations, profile_key=C.PROF_KEY_NONE): client = 
self.host.getClient(profile_key) affiliations = {jid.JID(jid_): affiliation for jid_, affiliation in affiliations.iteritems()} d = self.setNodeAffiliations(client, jid.JID(service_s) if service_s else None, nodeIdentifier, affiliations) return d def setNodeAffiliations(self, client, service, nodeIdentifier, affiliations): """Update affiliations of a node owned by profile @param affiliations(dict[jid.JID, unicode]): affiliations to set check https://xmpp.org/extensions/xep-0060.html#affiliations for a list of possible affiliations """ request = pubsub.PubSubRequest('affiliationsSet') request.recipient = service request.nodeIdentifier = nodeIdentifier request.affiliations = affiliations d = request.send(client.xmlstream) return d def _deleteNode(self, service_s, nodeIdentifier, profile_key): client = self.host.getClient(profile_key) return self.deleteNode(client, jid.JID(service_s) if service_s else None, nodeIdentifier) def deleteNode(self, client, service, nodeIdentifier): return client.pubsub_client.deleteNode(service, nodeIdentifier) def _addWatch(self, service_s, node, profile_key): """watch modifications on a node This method should only be called from bridge """ client = self.host.getClient(profile_key) service = jid.JID(service_s) if service_s else client.jid.userhostJID() client.pubsub_watching.add((service, node)) def _removeWatch(self, service_s, node, profile_key): """remove a node watch This method should only be called from bridge """ client = self.host.getClient(profile_key) service = jid.JID(service_s) if service_s else client.jid.userhostJID() client.pubsub_watching.remove((service, node)) def _retractItem(self, service_s, nodeIdentifier, itemIdentifier, notify, profile_key): return self._retractItems(service_s, nodeIdentifier, (itemIdentifier,), notify, profile_key) def _retractItems(self, service_s, nodeIdentifier, itemIdentifiers, notify, profile_key): return self.retractItems(jid.JID(service_s) if service_s else None, nodeIdentifier, 
itemIdentifiers, notify, profile_key) def retractItems(self, service, nodeIdentifier, itemIdentifiers, notify=True, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) return client.pubsub_client.retractItems(service, nodeIdentifier, itemIdentifiers, notify=True) def _subscribe(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) service = None if not service else jid.JID(service) d = self.subscribe(client, service, nodeIdentifier, options=options or None) d.addCallback(lambda subscription: subscription.subscriptionIdentifier or u'') return d def subscribe(self, client, service, nodeIdentifier, sub_jid=None, options=None): # TODO: reimplement a subscribtion cache, checking that we have not subscription before trying to subscribe return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.jid.userhostJID(), options=options) def _unsubscribe(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) service = None if not service else jid.JID(service) return self.unsubscribe(client, service, nodeIdentifier) def unsubscribe(self, client, service, nodeIdentifier, sub_jid=None, subscriptionIdentifier=None, sender=None): return client.pubsub_client.unsubscribe(service, nodeIdentifier, sub_jid or client.jid.userhostJID(), subscriptionIdentifier, sender) def _subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) service = None if not service else jid.JID(service) def gotSubscriptions(subscriptions): # we replace pubsub.Subscription instance by dict that we can serialize for idx, sub in enumerate(subscriptions): sub_dict = {'node': sub.nodeIdentifier, 'subscriber': sub.subscriber.full(), 'state': sub.state } if sub.subscriptionIdentifier is not None: sub_dict['id'] = sub.subscriptionIdentifier subscriptions[idx] = sub_dict return subscriptions d = 
self.subscriptions(client, service, nodeIdentifier or None) d.addCallback(gotSubscriptions) return d def subscriptions(self, client, service, nodeIdentifier=None): """retrieve subscriptions from a service @param service(jid.JID): PubSub service @param nodeIdentifier(unicode, None): node to check None to get all subscriptions """ return client.pubsub_client.subscriptions(service, nodeIdentifier) ## misc tools ## def getNodeURI(self, service, node, item=None): """Return XMPP URI of a PubSub node @param service(jid.JID): PubSub service @param node(unicode): node @return (unicode): URI of the node """ assert service is not None # XXX: urllib.urlencode use "&" to separate value, while XMPP URL (cf. RFC 5122) # use ";" as a separator. So if more than one value is used in query_data, # urlencode MUST NOT BE USED. query_data = [('node', node.encode('utf-8'))] if item is not None: query_data.append(('item', item.encode('utf-8'))) return "xmpp:{service}?;{query}".format( service=service.userhost(), query=urllib.urlencode(query_data) ).decode('utf-8') ## methods to manage several stanzas/jids at once ## # generic # def getRTResults(self, session_id, on_success=None, on_error=None, profile=C.PROF_KEY_NONE): return self.rt_sessions.getResults(session_id, on_success, on_error, profile) def serItemsData(self, items_data, item_cb=lambda item: item.toXml()): """Helper method to serialise result from [getItems] the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) as returned by [getItems]. 
metadata values are then casted to unicode and each item is passed to items_cb @param items_data(tuple): tuple returned by [getItems] @param item_cb(callable): method to transform each item @return (tuple): a serialised form ready to go throught bridge """ items, metadata = items_data return [item_cb(item) for item in items], {key: unicode(value) for key, value in metadata.iteritems()} def serItemsDataD(self, items_data, item_cb): """Helper method to serialise result from [getItems], deferred version the items_data must be a tuple(list[domish.Element], dict[unicode, unicode]) as returned by [getItems]. metadata values are then casted to unicode and each item is passed to items_cb An errback is added to item_cb, and when it is fired the value is filtered from final items @param items_data(tuple): tuple returned by [getItems] @param item_cb(callable): method to transform each item (must return a deferred) @return (tuple): a deferred which fire a serialised form ready to go throught bridge """ items, metadata = items_data def eb(failure): log.warning("Error while serialising/parsing item: {}".format(unicode(failure.value))) d = defer.gatherResults([item_cb(item).addErrback(eb) for item in items]) def finishSerialisation(serialised_items): return [item for item in serialised_items if item is not None], {key: unicode(value) for key, value in metadata.iteritems()} d.addCallback(finishSerialisation) return d def serDList(self, results, failure_result=None): """Serialise a DeferredList result @param results: DeferredList results @param failure_result: value to use as value for failed Deferred (default: empty tuple) @return (list): list with: - failure: empty in case of success, else error message - result """ if failure_result is None: failure_result = () return [('', result) if success else (unicode(result.result) or UNSPECIFIED, failure_result) for success, result in results] # subscribe # def _getNodeSubscriptions(self, service_s, nodeIdentifier, profile_key): client = 
self.host.getClient(profile_key) d = self.getNodeSubscriptions(client, jid.JID(service_s) if service_s else None, nodeIdentifier) d.addCallback(lambda subscriptions: {j.full(): a for j, a in subscriptions.iteritems()}) return d def getNodeSubscriptions(self, client, service, nodeIdentifier): """Retrieve subscriptions to a node @param nodeIdentifier(unicode): node to get subscriptions from """ if not nodeIdentifier: raise exceptions.DataError("node identifier can't be empty") request = pubsub.PubSubRequest('subscriptionsGet') request.recipient = service request.nodeIdentifier = nodeIdentifier def cb(iq_elt): try: subscriptions_elt = next(iq_elt.pubsub.elements((pubsub.NS_PUBSUB, 'subscriptions'))) except StopIteration: raise ValueError(_(u"Invalid result: missing <subscriptions> element: {}").format(iq_elt.toXml)) except AttributeError as e: raise ValueError(_(u"Invalid result: {}").format(e)) try: return {jid.JID(s['jid']): s['subscription'] for s in subscriptions_elt.elements((pubsub.NS_PUBSUB, 'subscription'))} except KeyError: raise ValueError(_(u"Invalid result: bad <subscription> element: {}").format(iq_elt.toXml)) d = request.send(client.xmlstream) d.addCallback(cb) return d def _setNodeSubscriptions(self, service_s, nodeIdentifier, subscriptions, profile_key=C.PROF_KEY_NONE): client = self.host.getClient(profile_key) subscriptions = {jid.JID(jid_): subscription for jid_, subscription in subscriptions.iteritems()} d = self.setNodeSubscriptions(client, jid.JID(service_s) if service_s else None, nodeIdentifier, subscriptions) return d def setNodeSubscriptions(self, client, service, nodeIdentifier, subscriptions): """Set or update subscriptions of a node owned by profile @param subscriptions(dict[jid.JID, unicode]): subscriptions to set check https://xmpp.org/extensions/xep-0060.html#substates for a list of possible subscriptions """ request = pubsub.PubSubRequest('subscriptionsSet') request.recipient = service request.nodeIdentifier = nodeIdentifier 
request.subscriptions = {pubsub.Subscription(nodeIdentifier, jid_, state) for jid_, state in subscriptions.iteritems()} d = request.send(client.xmlstream) return d def _manySubscribeRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): """Get real-time results for subcribeToManu session @param session_id: id of the real-time deferred session @param return (tuple): (remaining, results) where: - remaining is the number of still expected results - results is a list of tuple(unicode, unicode, bool, unicode) with: - service: pubsub service - and node: pubsub node - failure(unicode): empty string in case of success, error message else @param profile_key: %(doc_profile_key)s """ profile = self.host.getClient(profile_key).profile d = self.rt_sessions.getResults(session_id, on_success=lambda result:'', on_error=lambda failure:unicode(failure.value), profile=profile) # we need to convert jid.JID to unicode with full() to serialise it for the bridge d.addCallback(lambda ret: (ret[0], [(service.full(), node, '' if success else failure or UNSPECIFIED) for (service, node), (success, failure) in ret[1].iteritems()])) return d def _subscribeToMany(self, node_data, subscriber=None, options=None, profile_key=C.PROF_KEY_NONE): return self.subscribeToMany([(jid.JID(service), unicode(node)) for service, node in node_data], jid.JID(subscriber), options, profile_key) def subscribeToMany(self, node_data, subscriber, options=None, profile_key=C.PROF_KEY_NONE): """Subscribe to several nodes at once. @param node_data (iterable[tuple]): iterable of tuple (service, node) where: - service (jid.JID) is the pubsub service - node (unicode) is the node to subscribe to @param subscriber (jid.JID): optional subscription identifier. 
@param options (dict): subscription options @param profile_key (str): %(doc_profile_key)s @return (str): RT Deferred session id """ client = self.host.getClient(profile_key) deferreds = {} for service, node in node_data: deferreds[(service, node)] = client.pubsub_client.subscribe(service, node, subscriber, options=options) return self.rt_sessions.newSession(deferreds, client.profile) # found_nodes = yield self.listNodes(service, profile=client.profile) # subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile) # d_list = [] # for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)): # if nodeIdentifier not in found_nodes: # log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier)) # continue # avoid sat-pubsub "SubscriptionExists" error # d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)) # defer.returnValue(d_list) # get # def _getFromManyRTResult(self, session_id, profile_key=C.PROF_KEY_DEFAULT): """Get real-time results for getFromMany session @param session_id: id of the real-time deferred session @param profile_key: %(doc_profile_key)s @param return (tuple): (remaining, results) where: - remaining is the number of still expected results - results is a list of tuple with - service (unicode): pubsub service - node (unicode): pubsub node - failure (unicode): empty string in case of success, error message else - items (list[s]): raw XML of items - metadata(dict): serialised metadata """ profile = self.host.getClient(profile_key).profile d = self.rt_sessions.getResults(session_id, on_success=lambda result: ('', self.serItemsData(result)), on_error=lambda failure: (unicode(failure.value) or UNSPECIFIED, ([],{})), profile=profile) d.addCallback(lambda ret: (ret[0], [(service.full(), node, failure, items, metadata) for (service, node), (success, (failure, (items, metadata))) in ret[1].iteritems()])) 
return d def _getFromMany(self, node_data, max_item=10, extra_dict=None, profile_key=C.PROF_KEY_NONE): """ @param max_item(int): maximum number of item to get, C.NO_LIMIT for no limit """ max_item = None if max_item == C.NO_LIMIT else max_item extra = self.parseExtra(extra_dict) return self.getFromMany([(jid.JID(service), unicode(node)) for service, node in node_data], max_item, extra.rsm_request, extra.extra, profile_key) def getFromMany(self, node_data, max_item=None, rsm_request=None, extra=None, profile_key=C.PROF_KEY_NONE): """Get items from many nodes at once @param node_data (iterable[tuple]): iterable of tuple (service, node) where: - service (jid.JID) is the pubsub service - node (unicode) is the node to get items from @param max_items (int): optional limit on the number of retrieved items. @param rsm_request (RSMRequest): RSM request data @param profile_key (unicode): %(doc_profile_key)s @return (str): RT Deferred session id """ client = self.host.getClient(profile_key) deferreds = {} for service, node in node_data: deferreds[(service, node)] = self.getItems(client, service, node, max_item, rsm_request=rsm_request, extra=extra) return self.rt_sessions.newSession(deferreds, client.profile) class SatPubSubClient(rsm.PubSubClient): implements(disco.IDisco) def __init__(self, host, parent_plugin): self.host = host self.parent_plugin = parent_plugin rsm.PubSubClient.__init__(self) def connectionInitialized(self): rsm.PubSubClient.connectionInitialized(self) def _getNodeCallbacks(self, node, event): """Generate callbacks from given node and event @param node(unicode): node used for the item any registered node which prefix the node will match @param event(unicode): one of C.PS_ITEMS, C.PS_RETRACT, C.PS_DELETE @return (iterator[callable]): callbacks for this node/event """ for registered_node, callbacks_dict in self.parent_plugin._node_cb.iteritems(): if not node.startswith(registered_node): continue try: for callback in callbacks_dict[event]: yield callback 
except KeyError: continue def itemsReceived(self, event): log.debug(u"Pubsub items received") for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_ITEMS): callback(self.parent, event) client = self.parent if (event.sender, event.nodeIdentifier) in client.pubsub_watching: raw_items = [i.toXml() for i in event.items] self.host.bridge.psEventRaw(event.sender.full(), event.nodeIdentifier, C.PS_ITEMS, raw_items, client.profile) def deleteReceived(self, event): log.debug((u"Publish node deleted")) for callback in self._getNodeCallbacks(event.nodeIdentifier, C.PS_DELETE): callback(self.parent, event) client = self.parent if (event.sender, event.nodeIdentifier) in client.pubsub_watching: self.host.bridge.psEventRaw(event.sender.full(), event.nodeIdentifier, C.PS_DELETE, [], client.profile) def subscriptions(self, service, nodeIdentifier, sender=None): """Return the list of subscriptions to the given service and node. @param service: The publish subscribe service to retrieve the subscriptions from. @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions). 
@type nodeIdentifier: C{unicode} @return (list[pubsub.Subscription]): list of subscriptions """ request = pubsub.PubSubRequest('subscriptions') request.recipient = service request.nodeIdentifier = nodeIdentifier request.sender = sender d = request.send(self.xmlstream) def cb(iq): subs = [] for subscription_elt in iq.pubsub.subscriptions.elements(pubsub.NS_PUBSUB, 'subscription'): subscription = pubsub.Subscription(subscription_elt['node'], jid.JID(subscription_elt['jid']), subscription_elt['subscription'], subscriptionIdentifier=subscription_elt.getAttribute('subid')) subs.append(subscription) return subs return d.addCallback(cb) def getDiscoInfo(self, requestor, service, nodeIdentifier=''): disco_info = [] self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile) return disco_info def getDiscoItems(self, requestor, service, nodeIdentifier=''): return []