changeset 2718:bb6adaa580ee

plugin XEP-0313, XEP-0045: loop MAM requests until whole archive is retrieved: - plugin xep-0313: new serialise method - : getArchives now returns an extra mam_response dict with "complete" and "stable" status - : MAMGet now return a new metadata dict before profile (with serialised RSM and MAM response)
author Goffi <goffi@goffi.org>
date Mon, 10 Dec 2018 20:34:45 +0100 (2018-12-10)
parents e3f6de6ce320
children 45189c8bd165
files sat/plugins/plugin_xep_0045.py sat/plugins/plugin_xep_0060.py sat/plugins/plugin_xep_0313.py
diffstat 3 files changed, 125 insertions(+), 62 deletions(-) [+]
line wrap: on
line diff
--- a/sat/plugins/plugin_xep_0045.py	Mon Dec 10 20:34:45 2018 +0100
+++ b/sat/plugins/plugin_xep_0045.py	Mon Dec 10 20:34:45 2018 +0100
@@ -870,43 +870,56 @@
                                                       profile=client.profile)
         if last_mess:
             stanza_id = last_mess[0][-1][u'stanza_id']
-            rsm_req = rsm.RSMRequest(after=stanza_id)
+            rsm_req = rsm.RSMRequest(max_=100, after=stanza_id)
         else:
             log.info(u"We have no MAM archive for room {room_jid}.".format(
                 room_jid=room_jid))
-            rsm_req = rsm.RSMRequest(max_=20)
+            # we don't want the whole archive if we have no archive yet
+            # as it can be huge
+            rsm_req = rsm.RSMRequest(max_=50)
 
         mam_req = mam.MAMRequest(rsm_=rsm_req)
-        mam_data = yield self._mam.getArchives(client, mam_req,
-                                               service=room_jid)
-        elt_list, rsm_response = mam_data
+        complete = False
+        count = 0
+        while not complete:
+            mam_data = yield self._mam.getArchives(client, mam_req,
+                                                   service=room_jid)
+            elt_list, rsm_response, mam_response = mam_data
+            complete = mam_response[u"complete"]
+            # we update MAM request for next iteration
+            mam_req.rsm.after = rsm_response.last
+
+            if not elt_list:
+                break
+            else:
+                count += len(elt_list)
 
-        if not elt_list:
+                for mess_elt in elt_list:
+                    try:
+                        fwd_message_elt = self._mam.getMessageFromResult(
+                            client, mess_elt, mam_req, service=room_jid)
+                    except exceptions.DataError:
+                        continue
+                    if fwd_message_elt.getAttribute(u"to"):
+                        log.warning(
+                            u'Forwarded message element has a "to" attribute while it is '
+                            u'forbidden by specifications')
+                    fwd_message_elt[u"to"] = client.jid.full()
+                    mess_data = client.messageProt.parseMessage(fwd_message_elt)
+                    # we attache parsed message data to element, to avoid parsing
+                    # again in _addToHistory
+                    fwd_message_elt._mess_data = mess_data
+                    # and we inject to MUC workflow
+                    client._muc_client._onGroupChat(fwd_message_elt)
+
+        if not count:
             log.info(_(u"No message received while offline in {room_jid}".format(
                 room_jid=room_jid)))
         else:
             log.info(
-                _(u"We have received {num_mess} message(s) in {room_jid} while offline.")
-                .format(num_mess=len(elt_list), room_jid=room_jid))
-
-            for mess_elt in elt_list:
-                try:
-                    fwd_message_elt = self._mam.getMessageFromResult(
-                        client, mess_elt, mam_req, service=room_jid)
-                except exceptions.DataError:
-                    continue
-                if fwd_message_elt.getAttribute(u"to"):
-                    log.warning(
-                        u'Forwarded message element has a "to" attribute while it is '
-                        u'forbidden by specifications')
-                fwd_message_elt[u"to"] = client.jid.full()
-                mess_data = client.messageProt.parseMessage(fwd_message_elt)
-                # we attache parsed message data to element, to avoid parsing
-                # again in _addToHistory
-                fwd_message_elt._mess_data = mess_data
-                # and we inject to MUC workflow
-                client._muc_client._onGroupChat(fwd_message_elt)
-
+                _(u"We have received {num_mess} message(s) in {room_jid} while "
+                  u"offline.")
+                .format(num_mess=count, room_jid=room_jid))
 
         # for legacy history, the following steps are done in receivedSubject but for MAM
         # the order is different (we have to join then get MAM archive, so subject
--- a/sat/plugins/plugin_xep_0060.py	Mon Dec 10 20:34:45 2018 +0100
+++ b/sat/plugins/plugin_xep_0060.py	Mon Dec 10 20:34:45 2018 +0100
@@ -475,18 +475,12 @@
     def _unwrapMAMMessage(self, message_elt):
         try:
             item_elt = (
-                message_elt.elements(mam.NS_MAM, "result")
-                .next()
-                .elements(C.NS_FORWARD, "forwarded")
-                .next()
-                .elements(C.NS_CLIENT, "message")
-                .next()
-                .elements("http://jabber.org/protocol/pubsub#event", "event")
-                .next()
-                .elements("http://jabber.org/protocol/pubsub#event", "items")
-                .next()
-                .elements("http://jabber.org/protocol/pubsub#event", "item")
-                .next()
+                message_elt.elements(mam.NS_MAM, "result").next()
+                .elements(C.NS_FORWARD, "forwarded").next()
+                .elements(C.NS_CLIENT, "message").next()
+                .elements("http://jabber.org/protocol/pubsub#event", "event").next()
+                .elements("http://jabber.org/protocol/pubsub#event", "items").next()
+                .elements("http://jabber.org/protocol/pubsub#event", "item").next()
             )
         except StopIteration:
             raise exceptions.DataError(u"Can't find Item in MAM message element")
@@ -528,7 +522,8 @@
         @return: a deferred couple (list[dict], dict) containing:
             - list of items
             - metadata with the following keys:
-                - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index value of RSMResponse
+                - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index
+                    value of RSMResponse
                 - service, node: service and node used
         """
         if item_ids and max_items is not None:
@@ -565,6 +560,8 @@
                         u"Conflict between RSM request and MAM's RSM request"
                     )
             d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage)
+            # FIXME: we only keep items for now, but RSM and MAM metadata should be used
+            d.addCallback(lambda archives: archives[0])
 
         try:
             subscribe = C.bool(extra["subscribe"])
--- a/sat/plugins/plugin_xep_0313.py	Mon Dec 10 20:34:45 2018 +0100
+++ b/sat/plugins/plugin_xep_0313.py	Mon Dec 10 20:34:45 2018 +0100
@@ -65,7 +65,8 @@
         self._rsm = host.plugins[u"XEP-0059"]
         self._sid = host.plugins[u"XEP-0359"]
         host.bridge.addMethod(
-            "MAMGet", ".plugin", in_sign='sss', out_sign='(a(sdssa{ss}a{ss}sa{ss})s)', method=self._getArchives,
+            "MAMGet", ".plugin", in_sign='sss',
+            out_sign='(a(sdssa{ss}a{ss}sa{ss})a{ss}s)', method=self._getArchives,
             async=True)
 
     @defer.inlineCallbacks
@@ -78,25 +79,35 @@
             log.info(_(u"It seems that we have no MAM history yet"))
             return
         stanza_id = last_mess[0][-1][u'stanza_id']
-        rsm_req = rsm.RSMRequest(after=stanza_id)
+        rsm_req = rsm.RSMRequest(max_=100, after=stanza_id)
         mam_req = mam.MAMRequest(rsm_=rsm_req)
-        mam_data = yield self.getArchives(client, mam_req,
-                                          service=client.jid.userhostJID())
-        elt_list, rsm_response = mam_data
-        if not elt_list:
+        complete = False
+        count = 0
+        while not complete:
+            mam_data = yield self.getArchives(client, mam_req,
+                                              service=client.jid.userhostJID())
+            elt_list, rsm_response, mam_response = mam_data
+            complete = mam_response[u"complete"]
+            # we update MAM request for next iteration
+            mam_req.rsm.after = rsm_response.last
+            if not elt_list:
+                break
+            else:
+                count += len(elt_list)
+
+            for mess_elt in elt_list:
+                try:
+                    fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req)
+                except exceptions.DataError:
+                    continue
+
+                client.messageProt.onMessage(fwd_message_elt)
+
+        if not count:
             log.info(_(u"We have received no message while offline"))
-            return
         else:
-            log.info(_(u"We have received {num_mess} message(s) while offline.").format(
-                num_mess=len(elt_list)))
-
-        for mess_elt in elt_list:
-            try:
-                fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req)
-            except exceptions.DataError:
-                continue
-
-            client.messageProt.onMessage(fwd_message_elt)
+            log.info(_(u"We have received {num_mess} message(s) while offline.")
+                .format(num_mess=count))
 
     def getHandler(self, client):
         mam_client = client._mam = SatMAMClient()
@@ -153,13 +164,31 @@
 
         return mam.MAMRequest(**mam_args) if mam_args else None
 
+    def serialise(self, mam_response, data=None):
+        """Serialise data for MAM
+
+        Key set in data can be:
+            - mam_complete: a bool const indicating if all items have been received
+            - mam_stable: a bool const which is False if items order may be changed
+        All values are set as strings.
+        @param mam_response(dict): response data to serialise
+        @param data(dict, None): dict to update with mam_* data.
+            If None, a new dict is created
+        @return (dict): data dict
+        """
+        if data is None:
+            data = {}
+        data[u"mam_complete"] = C.boolConst(mam_response[u'complete'])
+        data[u"mam_stable"] = C.boolConst(mam_response[u'stable'])
+        return data
+
     def getMessageFromResult(self, client, mess_elt, mam_req, service=None):
         """Extract usable <message/> from MAM query result
 
         The message will be validated, and stanza-id/delay will be added if necessary.
         @param mess_elt(domish.Element): result <message/> element wrapping the message
             to retrieve
-        @param mam_req(mam.MAMRequest): request used
+        @param mam_req(mam.MAMRequest): request used (needed to get query_id)
         @param service(jid.JID, None): MAM service where the request has been sent
             None if it's user server
         @return (domish.Element): <message/> that can be used directly with onMessage
@@ -246,24 +275,37 @@
         except StopIteration:
             raise exceptions.DataError(u"Invalid MAM result")
 
+        mam_response = {u"complete": C.bool(fin_elt.getAttribute(u"complete", C.BOOL_FALSE)),
+                        u"stable": C.bool(fin_elt.getAttribute(u"stable", C.BOOL_TRUE))}
+
         try:
             rsm_response = rsm.RSMResponse.fromElement(fin_elt)
         except rsm.RSMNotFoundError:
             rsm_response = None
 
-        return (elt_list, rsm_response)
+        return (elt_list, rsm_response, mam_response)
 
     def serializeArchiveResult(self, data, client, mam_req, service):
-        elt_list, rsm_response = data
+        elt_list, rsm_response, mam_response = data
         mess_list = []
         for elt in elt_list:
             fwd_message_elt = self.getMessageFromResult(client, elt, mam_req,
                                                         service=service)
             mess_data = client.messageProt.parseMessage(fwd_message_elt)
             mess_list.append(client.messageGetBridgeArgs(mess_data))
-        return mess_list, client.profile
+        metadata = self._rsm.serialise(rsm_response)
+        self.serialise(mam_response, metadata)
+        return mess_list, metadata, client.profile
 
     def _getArchives(self, service, extra_ser, profile_key):
+        """
+        @return: tuple with:
+            - list of message with same data as in bridge.messageNew
+            - response metadata with:
+                - rsm data (rsm_first, rsm_last, rsm_count, rsm_index)
+                - mam data (mam_complete, mam_stable)
+            - profile
+        """
         client = self.host.getClient(profile_key)
         service = jid.JID(service) if service else None
         extra = data_format.deserialise(extra_ser, {})
@@ -274,8 +316,19 @@
         return d
 
     def getArchives(self, client, query, service=None, message_cb=None):
-        """Query archive then grab and return them all in the result
+        """Query archive and gather page result
 
+        @param query(mam.MAMRequest): MAM request
+        @param service(jid.JID, None): MAM service to use
+            None to use our own server
+        @param message_cb(callable, None): callback to use on each message
+            this method can be used to unwrap messages
+        @return (tuple[list[domish.Element], rsm.RSMResponse, dict): result data with:
+            - list of found elements
+            - RSM response
+            - MAM response, which is a dict with following value:
+                - complete: a boolean which is True if all items have been received
+                - stable: a boolean which is False if items order may be changed
         """
         if query.query_id is None:
             query.query_id = unicode(uuid.uuid4())