changeset 1773:6e867caf4621

plugin XEP-0060, tmp(wokkel.rsm): small refactoring: - don't use getRSMResponse anymore: rsm.PubSubClient use a different signature which return items and RSMResponse, this is better than the previous method as if the requested don't call getRSMResponse, there is no memory leak anymore. - use RSMResponse directly instead of using ext_data, as other extension (like MAM) will be probably be managed with an other workflow - don't do a useless deep copy of pubsub.PubSubRequest._parameters on each instance anymore - RSMResponse.parse do now manage parsing of <set/> element directly
author Goffi <goffi@goffi.org>
date Tue, 05 Jan 2016 23:20:22 +0100
parents 666ab06a9d81
children 0c21dafedd22
files src/plugins/plugin_xep_0060.py src/tmp/wokkel/rsm.py
diffstat 2 files changed, 37 insertions(+), 54 deletions(-) [+]
line wrap: on
line diff
--- a/src/plugins/plugin_xep_0060.py	Tue Jan 05 23:20:22 2016 +0100
+++ b/src/plugins/plugin_xep_0060.py	Tue Jan 05 23:20:22 2016 +0100
@@ -34,7 +34,7 @@
 from wokkel import rsm
 from zope.interface import implements
 from collections import namedtuple
-import uuid
+
 
 UNSPECIFIED = "unspecified error"
 
@@ -234,12 +234,11 @@
                 - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index value of RSMResponse
         """
         if rsm_request and item_ids:
-            raise ValueError("items_id can't be used with rsm")
+            raise ValueError(u"items_id can't be used with rsm")
         if extra is None:
             extra = {}
         client = self.host.getClient(profile_key)
-        ext_data = {'id': unicode(uuid.uuid4()), 'rsm': rsm_request} if rsm_request is not None else None
-        d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, ext_data)
+        d = client.pubsub_client.items(service, node, max_items, item_ids, sub_id, client.pubsub_client.parent.jid, rsm_request)
 
         try:
             subscribe = C.bool(extra['subscribe'])
@@ -248,7 +247,7 @@
 
         def subscribeEb(failure, service, node):
             failure.trap(error.StanzaError)
-            log.warning("Could not subscribe to node {} on service {}: {}".format(node, unicode(service), unicode(failure.value)))
+            log.warning(u"Could not subscribe to node {} on service {}: {}".format(node, unicode(service), unicode(failure.value)))
 
         def doSubscribe(items):
             self.subscribe(service, node, profile_key=profile_key).addErrback(subscribeEb, service, node)
@@ -257,11 +256,12 @@
         if subscribe:
             d.addCallback(doSubscribe)
 
-        def addMetadata(items):
-            metadata = {}
-            if rsm_request is not None:
-                rsm_data = client.pubsub_client.getRSMResponse(ext_data['id'])
-                metadata.update({'rsm_{}'.format(key): value for key, value in rsm_data.iteritems()})
+        def addMetadata(result):
+            items, rsm_response = result
+            if rsm_request is not None and rsm_response is not None:
+                metadata = {'rsm_{}'.format(key): value for key, value in rsm_response.toDict().iteritems()}
+            else:
+                metadata = {}
             return (items, metadata)
 
         d.addCallback(addMetadata)
--- a/src/tmp/wokkel/rsm.py	Tue Jan 05 23:20:22 2016 +0100
+++ b/src/tmp/wokkel/rsm.py	Tue Jan 05 23:20:22 2016 +0100
@@ -86,16 +86,20 @@
     def fromElement(cls, element):
         """Parse the given request element.
 
-        @param element: request containing a set element.
+        @param element: request containing a set element, or set element itself.
         @type element: L{domish.Element}
 
         @return: RSMRequest instance.
         @rtype: L{RSMRequest}
         """
-        try:
-            set_elt = element.elements(NS_RSM, 'set').next()
-        except StopIteration:
-            raise RSMNotFoundError()
+
+        if element.name == 'set' and element.uri == NS_RSM:
+            set_elt = element
+        else:
+            try:
+                set_elt = element.elements(NS_RSM, 'set').next()
+            except StopIteration:
+                raise RSMNotFoundError()
 
         try:
             before_elt = set_elt.elements(NS_RSM, 'before').next()
@@ -320,11 +324,8 @@
     """
 
     rsm = None
-
-    def __init__(self, verb=None):
-        super(PubSubRequest, self).__init__(verb)
-        self._parameters = copy.deepcopy(pubsub.PubSubRequest._parameters)
-        self._parameters['items'].append('rsm')
+    _parameters = copy.deepcopy(pubsub.PubSubRequest._parameters)
+    _parameters['items'].append('rsm')
 
     def _parse_rsm(self, verbElement):
         try:
@@ -340,10 +341,10 @@
 class PubSubClient(pubsub.PubSubClient):
     """PubSubClient extension to handle RSM."""
 
-    _rsm_responses = {}
+    _request_class = PubSubRequest
 
     def items(self, service, nodeIdentifier, maxItems=None, itemIdentifiers=None,
-              subscriptionIdentifier=None, sender=None, ext_data=None):
+              subscriptionIdentifier=None, sender=None, rsm_request=None):
         """
         Retrieve previously published items from a publish subscribe node.
 
@@ -367,55 +368,37 @@
         @param ext_data: extension data.
         @type ext_data: L{dict}
 
-        @return: a Deferred that fires a C{list} of L{domish.Element}.
+        @return: a Deferred that fires a C{list} of C{tuple} of L{domish.Element}, L{RSMResponse}.
         @rtype: L{defer.Deferred}
         """
-        request = PubSubRequest('items')  # that's a rsm.PubSubRequest instance
+        # XXX: we have to copy initial method instead of calling it,
+        #      as original cb remove all non item elements
+        request = self._request_class('items')
         request.recipient = service
         request.nodeIdentifier = nodeIdentifier
-        if maxItems is not None:
+        if maxItems:
             request.maxItems = str(int(maxItems))
         request.subscriptionIdentifier = subscriptionIdentifier
         request.sender = sender
         request.itemIdentifiers = itemIdentifiers
-        if ext_data and 'rsm' in ext_data:
-            request.rsm = ext_data['rsm']
+        request.rsm = rsm_request
 
         def cb(iq):
             items = []
-            if iq.pubsub.items:
-                for element in iq.pubsub.items.elements(pubsub.NS_PUBSUB, 'item'):
-                    items.append(element)
+            pubsub_elt = iq.pubsub
+            for element in pubsub_elt.items.elements(pubsub.NS_PUBSUB, 'item'):
+                items.append(element)
 
-            if request.rsm:
-                try:
-                    response = RSMResponse.fromElement(iq.pubsub)
-                    if response is not None:
-                        self._rsm_responses[ext_data['id']] = response
-                except RSMNotFoundError:  # target pubsub server doesn't support RSM
-                    pass
-            return items
+            try:
+                rsm_response = RSMResponse.parse(pubsub_elt)
+            except RSMNotFoundError:
+                rsm_response = None
+            return (items, rsm_response)
 
         d = request.send(self.xmlstream)
         d.addCallback(cb)
         return d
 
-    def getRSMResponse(self, id_):
-        """Post-retrieve the RSM response data after items retrieval is done.
-
-        @param id_: extension data ID
-        @type id_: C{unicode}
-
-        @return: dict representation of the RSM response.
-        @rtype: C{dict} of C{unicode}
-        """
-        # This method exists to not modify the return value of self.items.
-        if id_ not in self._rsm_responses:
-            return {}
-        result = self._rsm_responses[id_].toDict()
-        del self._rsm_responses[id_]
-        return result
-
 
 class PubSubService(pubsub.PubSubService):
     """PubSubService extension to handle RSM."""