Mercurial > libervia-backend
view src/tmp/wokkel/rsm.py @ 1422:be1fccf4854d
tmp (wokkel): licenses fixes:
the licenses headers were wrong, it was fixed: original work from Adrien Cossa is directly under AGPL v3 (with his agreement), work derivated from Wokkel is sublicensed to AGPL v3 as allowed by the original license, to stay consistent with the rest of the code base.
Theses files (and only these ones) can be relicensed again to fill Wokkel license if Ralph plan to merge them upstream...
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 23 Apr 2015 10:57:40 +0200 |
parents | be2df1ddea8e |
children | 882e5fabf68c |
line wrap: on
line source
# -*- test-case-name: wokkel.test.test_rsm -*- # # SàT Wokkel extension for Result Set Management (XEP-0059) # Copyright (C) 2015 Adien Cossa (souliane@mailoo.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. """ XMPP Result Set Management protocol. This protocol is specified in U{XEP-0059<http://xmpp.org/extensions/xep-0059.html>}. """ from twisted.words.xish import domish import pubsub import copy # RSM namespace NS_RSM = 'http://jabber.org/protocol/rsm' class RSMError(Exception): """ RSM error. """ class RSMNotFoundError(Exception): """ An expected RSM element has not been found. """ class RSMRequest(): """ A Result Set Management request. @ivar max_: limit on the number of retrieved items. @itype max_: C{int} or C{unicode} @ivar index: starting index of the requested page. @itype index: C{int} or C{unicode} @ivar after: ID of the element immediately preceding the page. @itype after: C{unicode} @ivar before: ID of the element immediately following the page. @itype before: C{unicode} """ max_ = 10 index = None after = None before = None def __init__(self, max_=None, index=None, after=None, before=None): if max_ is not None: max_ = int(max_) assert max_ >= 0 self.max_ = max_ if index is not None: assert after is None and before is None index = int(index) assert index >= 0 self.index = index if after is not None: assert before is None assert isinstance(after, unicode) self.after = after if before is not None: assert isinstance(before, unicode) self.before = before @classmethod def parse(cls, element): """Parse the given request element. @param element: request containing a set element. @type element: L{domish.Element} @return: RSMRequest instance. @rtype: L{RSMRequest} """ try: set_elt = domish.generateElementsQNamed(element.elements(), name="set", uri=NS_RSM).next() except StopIteration: raise RSMNotFoundError() request = RSMRequest() for elt in list(set_elt.elements()): if elt.name in ('before', 'after'): setattr(request, elt.name, ''.join(elt.children)) elif elt.name in ('max', 'index'): setattr(request, elt.name, int(''.join(elt.children))) if request.max_ is None: raise RSMError("RSM request is missing its 'max_' element") return request def toElement(self): """ Return the DOM representation of this RSM request. @rtype: L{domish.Element} """ set_elt = domish.Element((NS_RSM, 'set')) set_elt.addElement('max').addContent(unicode(self.max_)) if self.index is not None: set_elt.addElement('index').addContent(unicode(self.index)) if self.before is not None: if self.before == '': # request the last page set_elt.addElement('before') else: set_elt.addElement('before').addContent(self.before) if self.after is not None: set_elt.addElement('after').addContent(self.after) return set_elt def render(self, element): """Embed the DOM representation of this RSM request in the given element. @param element: Element to contain the RSM request. @type element: L{domish.Element} @return: RSM request element. @rtype: L{domish.Element} """ if element.name == 'pubsub' and hasattr(element, 'items'): element.items.attributes['max_items'] = unicode(self.max_) set_elt = self.toElement() element.addChild(set_elt) return set_elt class RSMResponse(): """ A Result Set Management response. @ivar count: total number of items. @itype count: C{int} @ivar index: starting index of the returned page. @itype index: C{int} @ivar first: ID of the first element of the returned page. @itype first: C{unicode} @ivar last: ID of the last element of the returned page. @itype last: C{unicode} """ count = 0 index = None first = None last = None def __init__(self, count=None, index=None, first=None, last=None): if count is not None: assert isinstance(count, int) and count >= 0 self.count = count if index is not None: assert isinstance(index, int) and index >= 0 self.index = index assert isinstance(first, unicode) self.first = first assert isinstance(last, unicode) self.last = last else: assert first is None and last is None @classmethod def parse(cls, element): """Parse the given response element. @param element: response element. @type element: L{domish.Element} @return: RSMResponse instance. @rtype: L{RSMResponse} """ try: set_elt = domish.generateElementsQNamed(element.elements(), name="set", uri=NS_RSM).next() except StopIteration: return RSMNotFoundError() response = RSMResponse() for elt in list(set_elt.elements()): if elt.name in ('first', 'last'): setattr(response, elt.name, ''.join(elt.children)) if elt.name == 'first': response.index = int(elt.getAttribute("index")) elif elt.name == 'count': response.count = int(''.join(elt.children)) if response.count is None: raise RSMError("RSM response is missing its 'count' element") return response def toElement(self): """ Return the DOM representation of this RSM request. @rtype: L{domish.Element} """ set_elt = domish.Element((NS_RSM, 'set')) set_elt.addElement('count').addContent(unicode(self.count)) if self.index is not None: first_elt = set_elt.addElement('first') first_elt.addContent(self.first) first_elt['index'] = unicode(self.index) set_elt.addElement('last').addContent(self.last) return set_elt def render(self, element): """Embed the DOM representation of this RSM response in the given element. @param element: Element to contain the RSM response. @type element: L{domish.Element} @return: RSM request element. @rtype: L{domish.Element} """ set_elt = self.toElement() element.addChild(set_elt) return set_elt def toDict(self): """Return a dict representation of the object. @return: a dict of strings. @rtype: C{dict} binding C{unicode} to C{unicode} """ result = {} for attr in ('count', 'index', 'first', 'last'): value = getattr(self, attr) if value is not None: result[attr] = unicode(value) return result class PubSubRequest(pubsub.PubSubRequest): """PubSubRequest extension to handle RSM. @ivar rsm: RSM request instance. @type rsm: L{RSMRequest} """ rsm = None def __init__(self, verb=None): pubsub.PubSubRequest.__init__(self, verb) self._parameters = copy.deepcopy(pubsub.PubSubRequest._parameters) self._parameters['items'].append('rsm') def _parse_rsm(self, verbElement): try: self.rsm = RSMRequest.parse(verbElement.parent) except RSMNotFoundError: self.rsm = None def _render_rsm(self, verbElement): if self.rsm: self.rsm.render(verbElement.parent) class PubSubClient(pubsub.PubSubClient): """PubSubClient extension to handle RSM.""" _rsm_responses = {} def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None, subscriptionIdentifier=None, sender=None, ext_data=None): """ Retrieve previously published items from a publish subscribe node. @param service: The publish subscribe service that keeps the node. @type service: L{JID<twisted.words.protocols.jabber.jid.JID>} @param nodeIdentifier: The identifier of the node. @type nodeIdentifier: C{unicode} @param maxItems: Optional limit on the number of retrieved items. @type maxItems: C{int} @param itemIdentifiers: Identifiers of the items to be retrieved. @type itemIdentifiers: C{set} @param subscriptionIdentifier: Optional subscription identifier. In case the node has been subscribed to multiple times, this narrows the results to the specific subscription. @type subscriptionIdentifier: C{unicode} @param ext_data: extension data. @type ext_data: L{dict} @return: a Deferred that fires a C{list} of L{domish.Element}. @rtype: L{defer.Deferred} """ request = PubSubRequest('items') # that's a rsm.PubSubRequest instance request.recipient = service request.nodeIdentifier = nodeIdentifier if maxItems: request.maxItems = str(int(maxItems)) request.subscriptionIdentifier = subscriptionIdentifier request.sender = sender request.itemIdentifiers = itemIdentifiers if ext_data and 'rsm' in ext_data: request.rsm = ext_data['rsm'] def cb(iq): items = [] if iq.pubsub.items: for element in iq.pubsub.items.elements(): if element.uri == pubsub.NS_PUBSUB and element.name == 'item': items.append(element) if request.rsm: response = RSMResponse.parse(iq.pubsub) if response is not None: self._rsm_responses[ext_data['id']] = response return items d = request.send(self.xmlstream) d.addCallback(cb) return d def getRSMResponse(self, id): """ Post-retrieve the RSM response data after items retrieval is done. @param id: extension data ID @type id: C{unicode} @return: dict representation of the RSM response. @rtype: C{dict} of C{unicode} """ # This method exists to not modify the return value of self.items. if id not in self._rsm_responses: return {} result = self._rsm_responses[id].toDict() del self._rsm_responses[id] return result class PubSubService(pubsub.PubSubService): """PubSubService extension to handle RSM.""" _request_class = PubSubRequest def _toResponse_items(self, result, resource, request): response = pubsub.PubSubService._toResponse_items(self, result, resource, request) set_elts = [elt for elt in result if elt.name == 'set'] if set_elts: assert len(set_elts) == 1 response.addChild(set_elts[0]) return response