view src/tmp/wokkel/rsm.py @ 1414:159d16336f87

core, bridge, jp: management of service discovery extensions (XEP-0128)
author Goffi <goffi@goffi.org>
date Fri, 17 Apr 2015 22:59:35 +0200
parents 7d9ff14a2d9d
children be2df1ddea8e
line wrap: on
line source

# -*- test-case-name: wokkel.test.test_rsm -*-
#
# Copyright (c) Adrien Cossa.
# See LICENSE for details.

"""
XMPP Result Set Management protocol.

This protocol is specified in
U{XEP-0059<http://xmpp.org/extensions/xep-0059.html>}.
"""

from twisted.words.xish import domish

import pubsub
import copy


# XML namespace of the <set/> element used by Result Set Management
# (XEP-0059); every RSM request and response element lives in it.
NS_RSM = 'http://jabber.org/protocol/rsm'


class RSMError(Exception):
    """
    Raised when a Result Set Management element is malformed or incomplete.
    """


class RSMNotFoundError(Exception):
    """
    Raised when the RSM <set/> element that was looked for is absent.
    """


class RSMRequest(object):
    """
    A Result Set Management request.

    @ivar max: limit on the number of retrieved items (defaults to 10).
    @type max: C{int}

    @ivar index: starting index of the requested page.
    @type index: C{int}

    @ivar after: ID of the element immediately preceding the page.
    @type after: C{unicode}

    @ivar before: ID of the element immediately following the page. An empty
        string requests the last page.
    @type before: C{unicode}
    """

    max = 10
    index = None
    after = None
    before = None

    def __init__(self, max=None, index=None, after=None, before=None):
        """
        Build a request; C{index}, C{after} and C{before} are mutually
        exclusive.

        @param max: limit on the number of retrieved items; when C{None} the
            class default is kept.
        @type max: C{int} or C{unicode}

        @param index: starting index of the requested page.
        @type index: C{int} or C{unicode}

        @param after: ID of the element immediately preceding the page.
        @type after: C{unicode}

        @param before: ID of the element immediately following the page.
        @type before: C{unicode}

        @raise AssertionError: on a negative number, a non-unicode ID or
            conflicting paging parameters.
        """
        if max is not None:
            max = int(max)
            assert(max >= 0)
            self.max = max

        if index is not None:
            assert(after is None and before is None)
            index = int(index)
            assert(index >= 0)
            self.index = index

        if after is not None:
            assert(before is None)
            assert(isinstance(after, unicode))
            self.after = after

        if before is not None:
            assert(isinstance(before, unicode))
            self.before = before

    @classmethod
    def parse(cls, element):
        """Parse the given request element.

        @param element: request containing a set element.
        @type element: L{domish.Element}

        @return: RSMRequest instance.
        @rtype: L{RSMRequest}

        @raise RSMNotFoundError: if C{element} has no RSM set child.
        @raise RSMError: if no 'max' value is available for the request.
        """
        # next() with a default works on every iterator, unlike the
        # Python-2-only ``.next()`` method.
        set_elt = next(domish.generateElementsQNamed(element.elements(),
                                                     name="set",
                                                     uri=NS_RSM), None)
        if set_elt is None:
            raise RSMNotFoundError()

        # Instantiate through cls so that subclasses get parsed into
        # instances of themselves.
        request = cls()
        for elt in set_elt.elements():
            if elt.name in ('before', 'after'):
                # An empty <before/> yields '', i.e. "last page".
                setattr(request, elt.name, ''.join(elt.children))
            elif elt.name in ('max', 'index'):
                setattr(request, elt.name, int(''.join(elt.children)))

        # With the stock class default (10) a missing <max/> silently falls
        # back to that default; this check only fires for a subclass whose
        # default 'max' is None.
        if request.max is None:
            raise RSMError("RSM request is missing its 'max' element")

        return request

    def toElement(self):
        """
        Return the DOM representation of this RSM request.

        @rtype: L{domish.Element}
        """
        set_elt = domish.Element((NS_RSM, 'set'))
        set_elt.addElement('max').addContent(unicode(self.max))

        if self.index is not None:
            set_elt.addElement('index').addContent(unicode(self.index))

        if self.before is not None:
            if self.before == '':  # request the last page
                set_elt.addElement('before')
            else:
                set_elt.addElement('before').addContent(self.before)

        if self.after is not None:
            set_elt.addElement('after').addContent(self.after)

        return set_elt

    def render(self, element):
        """Embed the DOM representation of this RSM request in the given element.

        If C{element} is a 'pubsub' element that has an 'items' child, the
        'max_items' attribute of that child is also set to C{self.max}.

        @param element: Element to contain the RSM request.
        @type element: L{domish.Element}

        @return: RSM request element.
        @rtype: L{domish.Element}
        """
        if element.name == 'pubsub':
            # domish.Element.__getattr__ returns None for a missing child
            # instead of raising AttributeError, so hasattr() is always true
            # and cannot be used to detect the <items/> child.
            items_elt = getattr(element, 'items', None)
            if items_elt is not None:
                items_elt.attributes['max_items'] = unicode(self.max)

        set_elt = self.toElement()
        element.addChild(set_elt)

        return set_elt


class RSMResponse(object):
    """
    A Result Set Management response.

    @ivar count: total number of items (defaults to 0).
    @type count: C{int}

    @ivar index: starting index of the returned page.
    @type index: C{int}

    @ivar first: ID of the first element of the returned page.
    @type first: C{unicode}

    @ivar last: ID of the last element of the returned page.
    @type last: C{unicode}
    """

    count = 0
    index = None
    first = None
    last = None

    def __init__(self, count=None, index=None, first=None, last=None):
        """
        Build a response; C{first} and C{last} are required when C{index} is
        given and must be omitted otherwise.

        @param count: total number of items; when C{None} the class default
            is kept.
        @type count: C{int}

        @param index: starting index of the returned page.
        @type index: C{int}

        @param first: ID of the first element of the returned page.
        @type first: C{unicode}

        @param last: ID of the last element of the returned page.
        @type last: C{unicode}

        @raise AssertionError: on a wrongly typed or negative value, or when
            C{first}/C{last} do not match the presence of C{index}.
        """
        if count is not None:
            assert(isinstance(count, int) and count >= 0)
            self.count = count

        if index is not None:
            assert(isinstance(index, int) and index >= 0)
            self.index = index
            assert(isinstance(first, unicode))
            self.first = first
            assert(isinstance(last, unicode))
            self.last = last
        else:
            assert(first is None and last is None)

    @classmethod
    def parse(cls, element):
        """Parse the given response element.

        @param element: response element.
        @type element: L{domish.Element}

        @return: RSMResponse instance, or C{None} if C{element} has no RSM
            set child.
        @rtype: L{RSMResponse} or C{None}

        @raise RSMError: if no 'count' value is available for the response.
        """
        # next() with a default works on every iterator, unlike the
        # Python-2-only ``.next()`` method.
        set_elt = next(domish.generateElementsQNamed(element.elements(),
                                                     name="set",
                                                     uri=NS_RSM), None)
        if set_elt is None:
            # Signal "no RSM data" with None. Handing back an exception
            # *instance* (as this method used to) looks like a valid result
            # to callers that only test ``is not None``.
            return None

        response = cls()
        for elt in set_elt.elements():
            if elt.name in ('first', 'last'):
                setattr(response, elt.name, ''.join(elt.children))
                if elt.name == 'first':
                    # The 'index' attribute may be absent; int(None) would
                    # raise TypeError, so only convert when it is there.
                    index = elt.getAttribute("index")
                    if index is not None:
                        response.index = int(index)
            elif elt.name == 'count':
                response.count = int(''.join(elt.children))

        # With the stock class default (0) a missing <count/> silently falls
        # back to that default; this check only fires for a subclass whose
        # default 'count' is None.
        if response.count is None:
            raise RSMError("RSM response is missing its 'count' element")

        return response

    def toElement(self):
        """
        Return the DOM representation of this RSM response.

        @rtype: L{domish.Element}
        """
        set_elt = domish.Element((NS_RSM, 'set'))
        set_elt.addElement('count').addContent(unicode(self.count))

        # Emit <first/> and <last/> whenever they are known, so a parsed
        # response that lacked the 'index' attribute still round-trips.
        if self.first is not None:
            first_elt = set_elt.addElement('first')
            first_elt.addContent(self.first)
            if self.index is not None:
                first_elt['index'] = unicode(self.index)

        if self.last is not None:
            set_elt.addElement('last').addContent(self.last)

        return set_elt

    def render(self, element):
        """Embed the DOM representation of this RSM response in the given element.

        @param element: Element to contain the RSM response.
        @type element:  L{domish.Element}

        @return: RSM response element.
        @rtype: L{domish.Element}
        """
        set_elt = self.toElement()
        element.addChild(set_elt)
        return set_elt

    def toDict(self):
        """Return a dict representation of the object.

        Only the attributes that are not C{None} are included.

        @return: a dict of strings.
        @rtype: C{dict} binding C{unicode} to C{unicode}
        """
        result = {}
        for attr in ('count', 'index', 'first', 'last'):
            value = getattr(self, attr)
            if value is not None:
                result[attr] = unicode(value)
        return result


class PubSubRequest(pubsub.PubSubRequest):
    """A L{pubsub.PubSubRequest} that also carries an optional RSM request.

    @ivar rsm: RSM request instance, or C{None} when the request has none.
    @type rsm: L{RSMRequest}
    """

    rsm = None

    def __init__(self, verb=None):
        pubsub.PubSubRequest.__init__(self, verb)
        # Work on a private deep copy so that adding 'rsm' to the 'items'
        # parameters never touches the table of the base class.
        # NOTE(review): presumably the base class maps each parameter name to
        # the matching _parse_<name>/_render_<name> method -- confirm in
        # pubsub.PubSubRequest.
        parameters = copy.deepcopy(pubsub.PubSubRequest._parameters)
        parameters['items'].append('rsm')
        self._parameters = parameters

    def _parse_rsm(self, verbElement):
        """Read the RSM request from the parent of C{verbElement}, if any."""
        container = verbElement.parent
        try:
            parsed = RSMRequest.parse(container)
        except RSMNotFoundError:
            parsed = None
        self.rsm = parsed

    def _render_rsm(self, verbElement):
        """Add the RSM request, when set, to the parent of C{verbElement}."""
        if not self.rsm:
            return
        self.rsm.render(verbElement.parent)


class PubSubClient(pubsub.PubSubClient):
    """PubSubClient extension to handle RSM."""

    # RSM responses waiting to be collected through getRSMResponse(), keyed
    # by the 'id' entry of the ext_data passed to items().
    # NOTE(review): this dict lives on the class, so it is shared by every
    # PubSubClient instance, and entries that are never collected stay in it.
    _rsm_responses = {}

    def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None,
              subscriptionIdentifier=None, sender=None, ext_data=None):
        """
        Retrieve previously published items from a publish subscribe node.

        @param service: The publish subscribe service that keeps the node.
        @type service: L{JID<twisted.words.protocols.jabber.jid.JID>}

        @param nodeIdentifier: The identifier of the node.
        @type nodeIdentifier: C{unicode}

        @param maxItems: Optional limit on the number of retrieved items.
        @type maxItems: C{int}

        @param itemIdentifiers: Identifiers of the items to be retrieved.
        @type itemIdentifiers: C{set}

        @param subscriptionIdentifier: Optional subscription identifier. In
            case the node has been subscribed to multiple times, this narrows
            the results to the specific subscription.
        @type subscriptionIdentifier: C{unicode}

        @param sender: Optional sender, stored as-is on the request.

        @param ext_data: extension data. Its 'rsm' entry, if present, is used
            as the RSM request; its 'id' entry is the key under which the RSM
            response can later be fetched with L{getRSMResponse}.
        @type ext_data: L{dict}

        @return: a Deferred that fires a C{list} of L{domish.Element}.
        @rtype: L{defer.Deferred}
        """
        request = PubSubRequest('items')  # that's a rsm.PubSubRequest instance
        request.recipient = service
        request.nodeIdentifier = nodeIdentifier
        if maxItems:
            request.maxItems = str(int(maxItems))
        request.subscriptionIdentifier = subscriptionIdentifier
        request.sender = sender
        request.itemIdentifiers = itemIdentifiers
        if ext_data and 'rsm' in ext_data:
            request.rsm = ext_data['rsm']

        def cb(iq):
            items = []
            if iq.pubsub.items:
                for element in iq.pubsub.items.elements():
                    if element.uri == pubsub.NS_PUBSUB and element.name == 'item':
                        items.append(element)

            if request.rsm:
                # A reply without an RSM <set/> may be reported either by an
                # RSMNotFoundError or by a value that is not an RSMResponse;
                # in both cases there is nothing to remember.
                try:
                    response = RSMResponse.parse(iq.pubsub)
                except RSMNotFoundError:
                    response = None
                # Without an 'id' the response could never be collected, and
                # failing here would throw away the items already retrieved.
                if isinstance(response, RSMResponse) and 'id' in ext_data:
                    self._rsm_responses[ext_data['id']] = response
            return items

        d = request.send(self.xmlstream)
        d.addCallback(cb)
        return d

    def getRSMResponse(self, id):
        """
        Post-retrieve the RSM response data after items retrieval is done.

        The stored response is forgotten once it has been returned, so a
        second call with the same C{id} yields an empty dict.

        @param id: extension data ID
        @type id: C{unicode}

        @return: dict representation of the RSM response, empty if no
            response is stored for C{id}.
        @rtype: C{dict} of C{unicode}
        """
        # This method exists to not modify the return value of self.items.
        response = self._rsm_responses.pop(id, None)
        if response is None:
            return {}
        return response.toDict()


class PubSubService(pubsub.PubSubService):
    """A L{pubsub.PubSubService} whose requests and item responses carry RSM."""

    _request_class = PubSubRequest

    def _toResponse_items(self, result, resource, request):
        """
        Build the items response and attach the RSM set element to it.

        The response itself comes from the base class; any element of
        C{result} named 'set' is then added as a child of that response.
        At most one such element is expected.
        """
        response = pubsub.PubSubService._toResponse_items(self, result,
                                                          resource, request)
        rsm_sets = []
        for candidate in result:
            if candidate.name == 'set':
                rsm_sets.append(candidate)

        if not rsm_sets:
            return response

        assert(len(rsm_sets) == 1)
        response.addChild(rsm_sets[0])
        return response