view src/plugins/plugin_xep_0060.py @ 1422:be1fccf4854d

tmp (wokkel): licenses fixes: the licenses headers were wrong, it was fixed: original work from Adrien Cossa is directly under AGPL v3 (with his agreement), work derivated from Wokkel is sublicensed to AGPL v3 as allowed by the original license, to stay consistent with the rest of the code base. Theses files (and only these ones) can be relicensed again to fill Wokkel license if Ralph plan to merge them upstream...
author Goffi <goffi@goffi.org>
date Thu, 23 Apr 2015 10:57:40 +0200
parents 7c0acb966fd6
children e8c8e467964b
line wrap: on
line source

#!/usr/bin/python
# -*- coding: utf-8 -*-

# SAT plugin for Publish-Subscribe (xep-0060)
# Copyright (C) 2009, 2010, 2011, 2012, 2013, 2014, 2015 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.i18n import _
from sat.core.constants import Const as C
from sat.core.log import getLogger
log = getLogger(__name__)
from sat.memory.memory import Sessions

from wokkel import disco, pubsub, rsm
from zope.interface import implements
from twisted.internet import defer
import uuid


PLUGIN_INFO = {
    "name": "Publish-Subscribe",
    "import_name": "XEP-0060",
    "type": "XEP",
    "protocols": ["XEP-0060"],
    "dependencies": [],
    "recommendations": ["XEP-0059"],
    "main": "XEP_0060",
    "handler": "yes",
    "description": _("""Implementation of PubSub Protocol""")
}


class XEP_0060(object):
    OPT_ACCESS_MODEL = 'pubsub#access_model'
    OPT_PERSIST_ITEMS = 'pubsub#persist_items'
    OPT_MAX_ITEMS = 'pubsub#max_items'
    OPT_DELIVER_PAYLOADS = 'pubsub#deliver_payloads'
    OPT_SEND_ITEM_SUBSCRIBE = 'pubsub#send_item_subscribe'
    OPT_NODE_TYPE = 'pubsub#node_type'
    OPT_SUBSCRIPTION_TYPE = 'pubsub#subscription_type'
    OPT_SUBSCRIPTION_DEPTH = 'pubsub#subscription_depth'
    OPT_ROSTER_GROUPS_ALLOWED = 'pubsub#roster_groups_allowed'
    OPT_PUBLISH_MODEL = 'pubsub#publish_model'

    def __init__(self, host):
        log.info(_(u"PubSub plugin initialization"))
        self.host = host
        self.managedNodes = []
        self.node_cache = Sessions(timeout=60, resettable_timeout=False)

    def getHandler(self, profile):
        client = self.host.getClient(profile)
        client.pubsub_client = SatPubSubClient(self.host, self)
        return client.pubsub_client

    def addManagedNode(self, node_name, callback):
        """Add a handler for a namespace

        @param namespace: NS of the handler (will appear in disco info)
        @param callback: method to call when the handler is found
        @param profile: profile which manage this handler"""
        self.managedNodes.append((node_name, callback))

    def _getDeferredNodeCache(self, session_id, init, profile):
        """Manage a node cache with deferred initialisation and concurrent access.

        @param session_id (string): node cache session ID
        @param init (Deferred): deferred list of strings to initialise the cache.
        @param profile (str): %(doc_profile)s
        @return: Deferred list[str]
        """
        if session_id in self.node_cache:
            cache = self.node_cache.profileGet(session_id, profile)
            if cache['nodes'] is None:
                # init is still running
                d = defer.Deferred()
                cache['waiting'].append(d)
                return d
            return defer.succeed(cache['nodes'])

        cache = {'init': init, 'waiting': [], 'nodes': None}
        self.node_cache.newSession(cache, session_id=session_id, profile=profile)

        def cb(nodes):
            cache['nodes'] = nodes
            for d in cache['waiting']:
                d.callback(nodes)
            return nodes

        return init.addCallback(cb)

    def listNodes(self, service, nodeIdentifier='', profile=C.PROF_KEY_NONE):
        """Retrieve the name of the nodes that are accessible on the target service.

        @param service (JID): target service
        @param nodeIdentifier (str): the parent node name (leave empty to retrieve first-level nodes)
        @param profile (str): %(doc_profile)s
        @return: Deferred list[str]
        """
        session_id = profile + '@found@' + service.userhost()
        d = self.host.getDiscoItems(service, nodeIdentifier, profile_key=profile)
        d.addCallback(lambda result: [item.getAttribute('node') for item in result.toElement().children if item.hasAttribute('node')])
        return self._getDeferredNodeCache(session_id, d, profile)

    def listSubscribedNodes(self, service, nodeIdentifier='', filter_='subscribed', profile=C.PROF_KEY_NONE):
        """Retrieve the name of the nodes to which the profile is subscribed on the target service.

        @param service (JID): target service
        @param nodeIdentifier (str): the parent node name (leave empty to retrieve all subscriptions)
        @param filter_ (str): filter the result according to the given subscription type:
            - None: do not filter
            - 'pending': subscription has not been approved yet by the node owner
            - 'unconfigured': subscription options have not been configured yet
            - 'subscribed': subscription is complete
        @param profile (str): %(doc_profile)s
        @return: Deferred list[str]
        """
        session_id = profile + '@subscriptions@' + service.userhost()
        d = self.subscriptions(service, nodeIdentifier, profile_key=profile)
        d.addCallback(lambda subs: [sub.getAttribute('node') for sub in subs if sub.getAttribute('subscription') == filter_])
        return self._getDeferredNodeCache(session_id, d, profile)

    def publish(self, service, nodeIdentifier, items=None, profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        return client.pubsub_client.publish(service, nodeIdentifier, items, client.pubsub_client.parent.jid)

    def getItems(self, service, node, max_items=None, item_ids=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE):
        """Retrieve pubsub items from a node.

        @param service (JID): target service.
        @param node (str): node id.
        @param max_items (int): optional limit on the number of retrieved items.
        @param item_ids (list[str]): identifiers of the items to be retrieved (should not be used).
        @param sub_id (str): optional subscription identifier.
        @param rsm (dict): RSM request data
        @param profile_key (str): %(doc_profile_key)s
        @return: a deferred couple (list[dict], dict) containing:
            - list of items
            - RSM response data
        """
        client = self.host.getClient(profile_key)
        ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm} if rsm else None
        d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, ext_data)
        d.addCallback(lambda items: (items, client.pubsub_client.getRSMResponse(ext_data['id']) if rsm else {}))
        return d

    @defer.inlineCallbacks
    def getItemsFromMany(self, service, data, max_items=None, sub_id=None, rsm=None, profile_key=C.PROF_KEY_NONE):
        """Massively retrieve pubsub items from many nodes.

        @param service (JID): target service.
        @param data (dict): dictionnary binding some arbitrary keys to the node identifiers.
        @param max_items (int): optional limit on the number of retrieved items *per node*.
        @param sub_id (str): optional subscription identifier.
        @param rsm (dict): RSM request data
        @param profile_key (str): %(doc_profile_key)s
        @return: a deferred dict with:
            - key: a value in (a subset of) data.keys()
            - couple (list[dict], dict) containing:
                - list of items
                - RSM response data
        """
        client = self.host.getClient(profile_key)
        found_nodes = yield self.listNodes(service, profile=client.profile)
        d_dict = {}
        for publisher, node in data.items():
            if node not in found_nodes:
                log.debug(u"Skip the items retrieval for [{node}]: node doesn't exist".format(node=node))
                continue  # avoid pubsub "item-not-found" error
            d_dict[publisher] = self.getItems(service, node, max_items, None, sub_id, rsm, client.profile)
        defer.returnValue(d_dict)

    def getOptions(self, service, nodeIdentifier, subscriber, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        return client.pubsub_client.getOptions(service, nodeIdentifier, subscriber, subscriptionIdentifier)

    def setOptions(self, service, nodeIdentifier, subscriber, options, subscriptionIdentifier=None, profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        return client.pubsub_client.setOptions(service, nodeIdentifier, subscriber, options, subscriptionIdentifier)

    def createNode(self, service, nodeIdentifier, options, profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        return client.pubsub_client.createNode(service, nodeIdentifier, options)

    def deleteNode(self, service, nodeIdentifier, profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        return client.pubsub_client.deleteNode(service, nodeIdentifier)

    def retractItems(self, service, nodeIdentifier, itemIdentifiers, profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        return client.pubsub_client.retractItems(service, nodeIdentifier, itemIdentifiers)

    def subscribe(self, service, nodeIdentifier, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        return client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options)

    @defer.inlineCallbacks
    def subscribeToMany(self, service, nodeIdentifiers, sub_jid=None, options=None, profile_key=C.PROF_KEY_NONE):
        """Massively subscribe to many nodes.

        @param service (JID): target service.
        @param nodeIdentifiers (list): the list of node identifiers to subscribe to.
        @param sub_id (str): optional subscription identifier.
        @param options (list): optional list of subscription options
        @param profile_key (str): %(doc_profile_key)s
        @return: list of Deferred instances.
        """
        client = self.host.getClient(profile_key)
        found_nodes = yield self.listNodes(service, profile=client.profile)
        subscribed_nodes = yield self.listSubscribedNodes(service, profile=client.profile)
        d_list = []
        for nodeIdentifier in (set(nodeIdentifiers) - set(subscribed_nodes)):
            if nodeIdentifier not in found_nodes:
                log.debug(u"Skip the subscription to [{node}]: node doesn't exist".format(node=nodeIdentifier))
                continue  # avoid sat-pubsub "SubscriptionExists" error
            d_list.append(client.pubsub_client.subscribe(service, nodeIdentifier, sub_jid or client.pubsub_client.parent.jid.userhostJID(), options=options))
        defer.returnValue(d_list)

    def subscriptions(self, service, nodeIdentifier='', profile_key=C.PROF_KEY_NONE):
        client = self.host.getClient(profile_key)
        return client.pubsub_client.subscriptions(service, nodeIdentifier)


class SatPubSubClient(rsm.PubSubClient):
    implements(disco.IDisco)

    def __init__(self, host, parent_plugin):
        self.host = host
        self.parent_plugin = parent_plugin
        rsm.PubSubClient.__init__(self)

    def connectionInitialized(self):
        rsm.PubSubClient.connectionInitialized(self)

    def itemsReceived(self, event):
        if not self.host.trigger.point("PubSubItemsReceived", event, self.parent.profile):
            return
        for node in self.parent_plugin.managedNodes:
            if event.nodeIdentifier == node[0]:
                node[1](event, self.parent.profile)

    def deleteReceived(self, event):
        #TODO: manage delete event
        log.debug(_(u"Publish node deleted"))

    # def purgeReceived(self, event):

    def subscriptions(self, service, nodeIdentifier, sender=None):
        """Return the list of subscriptions to the given service and node.

        @param service: The publish subscribe service to retrieve the subscriptions from.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}
        @param nodeIdentifier: The identifier of the node (leave empty to retrieve all subscriptions).
        @type nodeIdentifier: C{unicode}
        """
        request = pubsub.PubSubRequest('subscriptions')
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        request.sender = sender
        d = request.send(self.xmlstream)

        def cb(iq):
            return [sub for sub in iq.pubsub.subscriptions.elements() if
                    (sub.uri == pubsub.NS_PUBSUB and sub.name == 'subscription')]

        return d.addCallback(cb)

    def getDiscoInfo(self, requestor, service, nodeIdentifier=''):
        disco_info = []
        self.host.trigger.point("PubSub Disco Info", disco_info, self.parent.profile)
        return disco_info

    def getDiscoItems(self, requestor, service, nodeIdentifier=''):
        return []