diff sat/plugins/plugin_xep_0313.py @ 2718:bb6adaa580ee

plugin XEP-0313, XEP-0045: loop MAM requests until the whole archive is retrieved: - plugin xep-0313: new serialise method - : getArchives now returns an extra mam_response dict with "complete" and "stable" status - : MAMGet now returns a new metadata dict before profile (with serialised RSM and MAM response)
author Goffi <goffi@goffi.org>
date Mon, 10 Dec 2018 20:34:45 +0100
parents 19000c506d0c
children 3480d4fdf83a
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0313.py	Mon Dec 10 20:34:45 2018 +0100
+++ b/sat/plugins/plugin_xep_0313.py	Mon Dec 10 20:34:45 2018 +0100
@@ -65,7 +65,8 @@
         self._rsm = host.plugins[u"XEP-0059"]
         self._sid = host.plugins[u"XEP-0359"]
         host.bridge.addMethod(
-            "MAMGet", ".plugin", in_sign='sss', out_sign='(a(sdssa{ss}a{ss}sa{ss})s)', method=self._getArchives,
+            "MAMGet", ".plugin", in_sign='sss',
+            out_sign='(a(sdssa{ss}a{ss}sa{ss})a{ss}s)', method=self._getArchives,
             async=True)
 
     @defer.inlineCallbacks
@@ -78,25 +79,35 @@
             log.info(_(u"It seems that we have no MAM history yet"))
             return
         stanza_id = last_mess[0][-1][u'stanza_id']
-        rsm_req = rsm.RSMRequest(after=stanza_id)
+        rsm_req = rsm.RSMRequest(max_=100, after=stanza_id)
         mam_req = mam.MAMRequest(rsm_=rsm_req)
-        mam_data = yield self.getArchives(client, mam_req,
-                                          service=client.jid.userhostJID())
-        elt_list, rsm_response = mam_data
-        if not elt_list:
+        complete = False
+        count = 0
+        while not complete:
+            mam_data = yield self.getArchives(client, mam_req,
+                                              service=client.jid.userhostJID())
+            elt_list, rsm_response, mam_response = mam_data
+            complete = mam_response[u"complete"]
+            # we update MAM request for next iteration
+            mam_req.rsm.after = rsm_response.last
+            if not elt_list:
+                break
+            else:
+                count += len(elt_list)
+
+            for mess_elt in elt_list:
+                try:
+                    fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req)
+                except exceptions.DataError:
+                    continue
+
+                client.messageProt.onMessage(fwd_message_elt)
+
+        if not count:
             log.info(_(u"We have received no message while offline"))
-            return
         else:
-            log.info(_(u"We have received {num_mess} message(s) while offline.").format(
-                num_mess=len(elt_list)))
-
-        for mess_elt in elt_list:
-            try:
-                fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req)
-            except exceptions.DataError:
-                continue
-
-            client.messageProt.onMessage(fwd_message_elt)
+            log.info(_(u"We have received {num_mess} message(s) while offline.")
+                .format(num_mess=count))
 
     def getHandler(self, client):
         mam_client = client._mam = SatMAMClient()
@@ -153,13 +164,31 @@
 
         return mam.MAMRequest(**mam_args) if mam_args else None
 
+    def serialise(self, mam_response, data=None):
+        """Serialise data for MAM
+
+        Key set in data can be:
+            - mam_complete: a bool const indicating if all items have been received
+            - mam_stable: a bool const which is False if items order may be changed
+        All values are set as strings.
+        @param mam_response(dict): response data to serialise
+        @param data(dict, None): dict to update with mam_* data.
+            If None, a new dict is created
+        @return (dict): data dict
+        """
+        if data is None:
+            data = {}
+        data[u"mam_complete"] = C.boolConst(mam_response[u'complete'])
+        data[u"mam_stable"] = C.boolConst(mam_response[u'stable'])
+        return data
+
     def getMessageFromResult(self, client, mess_elt, mam_req, service=None):
         """Extract usable <message/> from MAM query result
 
         The message will be validated, and stanza-id/delay will be added if necessary.
         @param mess_elt(domish.Element): result <message/> element wrapping the message
             to retrieve
-        @param mam_req(mam.MAMRequest): request used
+        @param mam_req(mam.MAMRequest): request used (needed to get query_id)
         @param service(jid.JID, None): MAM service where the request has been sent
             None if it's user server
         @return (domish.Element): <message/> that can be used directly with onMessage
@@ -246,24 +275,37 @@
         except StopIteration:
             raise exceptions.DataError(u"Invalid MAM result")
 
+        mam_response = {u"complete": C.bool(fin_elt.getAttribute(u"complete", C.BOOL_FALSE)),
+                        u"stable": C.bool(fin_elt.getAttribute(u"stable", C.BOOL_TRUE))}
+
         try:
             rsm_response = rsm.RSMResponse.fromElement(fin_elt)
         except rsm.RSMNotFoundError:
             rsm_response = None
 
-        return (elt_list, rsm_response)
+        return (elt_list, rsm_response, mam_response)
 
     def serializeArchiveResult(self, data, client, mam_req, service):
-        elt_list, rsm_response = data
+        elt_list, rsm_response, mam_response = data
         mess_list = []
         for elt in elt_list:
             fwd_message_elt = self.getMessageFromResult(client, elt, mam_req,
                                                         service=service)
             mess_data = client.messageProt.parseMessage(fwd_message_elt)
             mess_list.append(client.messageGetBridgeArgs(mess_data))
-        return mess_list, client.profile
+        metadata = self._rsm.serialise(rsm_response)
+        self.serialise(mam_response, metadata)
+        return mess_list, metadata, client.profile
 
     def _getArchives(self, service, extra_ser, profile_key):
+        """
+        @return: tuple with:
+            - list of messages with the same data as in bridge.messageNew
+            - response metadata with:
+                - rsm data (rsm_first, rsm_last, rsm_count, rsm_index)
+                - mam data (mam_complete, mam_stable)
+            - profile
+        """
         client = self.host.getClient(profile_key)
         service = jid.JID(service) if service else None
         extra = data_format.deserialise(extra_ser, {})
@@ -274,8 +316,19 @@
         return d
 
     def getArchives(self, client, query, service=None, message_cb=None):
-        """Query archive then grab and return them all in the result
+        """Query archive and gather page result
 
+        @param query(mam.MAMRequest): MAM request
+        @param service(jid.JID, None): MAM service to use
+            None to use our own server
+        @param message_cb(callable, None): callback to use on each message
+            this method can be used to unwrap messages
+        @return (tuple[list[domish.Element], rsm.RSMResponse, dict]): result data with:
+            - list of found elements
+            - RSM response
+            - MAM response, which is a dict with the following values:
+                - complete: a boolean which is True if all items have been received
+                - stable: a boolean which is False if items order may be changed
         """
         if query.query_id is None:
             query.query_id = unicode(uuid.uuid4())