# HG changeset patch # User Goffi # Date 1544470485 -3600 # Node ID bb6adaa580ee60dfa332af97970e5347401c50b5 # Parent e3f6de6ce3209654623aaedb0d6487652d948e61 plugin XEP-0313, XEP-0045: loop MAM requests until whole archive is retrieved: - plugin xep-0313: new serialise method - : getArchives now returns an extra mam_response dict with "complete" and "stable" status - : MAMGet now return a new metadata dict before profile (with serialised RSM and MAM response) diff -r e3f6de6ce320 -r bb6adaa580ee sat/plugins/plugin_xep_0045.py --- a/sat/plugins/plugin_xep_0045.py Mon Dec 10 20:34:45 2018 +0100 +++ b/sat/plugins/plugin_xep_0045.py Mon Dec 10 20:34:45 2018 +0100 @@ -870,43 +870,56 @@ profile=client.profile) if last_mess: stanza_id = last_mess[0][-1][u'stanza_id'] - rsm_req = rsm.RSMRequest(after=stanza_id) + rsm_req = rsm.RSMRequest(max_=100, after=stanza_id) else: log.info(u"We have no MAM archive for room {room_jid}.".format( room_jid=room_jid)) - rsm_req = rsm.RSMRequest(max_=20) + # we don't want the whole archive if we have no archive yet + # as it can be huge + rsm_req = rsm.RSMRequest(max_=50) mam_req = mam.MAMRequest(rsm_=rsm_req) - mam_data = yield self._mam.getArchives(client, mam_req, - service=room_jid) - elt_list, rsm_response = mam_data + complete = False + count = 0 + while not complete: + mam_data = yield self._mam.getArchives(client, mam_req, + service=room_jid) + elt_list, rsm_response, mam_response = mam_data + complete = mam_response[u"complete"] + # we update MAM request for next iteration + mam_req.rsm.after = rsm_response.last + + if not elt_list: + break + else: + count += len(elt_list) - if not elt_list: + for mess_elt in elt_list: + try: + fwd_message_elt = self._mam.getMessageFromResult( + client, mess_elt, mam_req, service=room_jid) + except exceptions.DataError: + continue + if fwd_message_elt.getAttribute(u"to"): + log.warning( + u'Forwarded message element has a "to" attribute while it is ' + u'forbidden by specifications') + fwd_message_elt[u"to"] = client.jid.full() + mess_data = client.messageProt.parseMessage(fwd_message_elt) + # we attache parsed message data to element, to avoid parsing + # again in _addToHistory + fwd_message_elt._mess_data = mess_data + # and we inject to MUC workflow + client._muc_client._onGroupChat(fwd_message_elt) + + if not count: log.info(_(u"No message received while offline in {room_jid}".format( room_jid=room_jid))) else: log.info( - _(u"We have received {num_mess} message(s) in {room_jid} while offline.") - .format(num_mess=len(elt_list), room_jid=room_jid)) - - for mess_elt in elt_list: - try: - fwd_message_elt = self._mam.getMessageFromResult( - client, mess_elt, mam_req, service=room_jid) - except exceptions.DataError: - continue - if fwd_message_elt.getAttribute(u"to"): - log.warning( - u'Forwarded message element has a "to" attribute while it is ' - u'forbidden by specifications') - fwd_message_elt[u"to"] = client.jid.full() - mess_data = client.messageProt.parseMessage(fwd_message_elt) - # we attache parsed message data to element, to avoid parsing - # again in _addToHistory - fwd_message_elt._mess_data = mess_data - # and we inject to MUC workflow - client._muc_client._onGroupChat(fwd_message_elt) - + _(u"We have received {num_mess} message(s) in {room_jid} while " + u"offline.") + .format(num_mess=count, room_jid=room_jid)) # for legacy history, the following steps are done in receivedSubject but for MAM # the order is different (we have to join then get MAM archive, so subject diff -r e3f6de6ce320 -r bb6adaa580ee sat/plugins/plugin_xep_0060.py --- a/sat/plugins/plugin_xep_0060.py Mon Dec 10 20:34:45 2018 +0100 +++ b/sat/plugins/plugin_xep_0060.py Mon Dec 10 20:34:45 2018 +0100 @@ -475,18 +475,12 @@ def _unwrapMAMMessage(self, message_elt): try: item_elt = ( - message_elt.elements(mam.NS_MAM, "result") - .next() - .elements(C.NS_FORWARD, "forwarded") - .next() - .elements(C.NS_CLIENT, "message") - .next() - .elements("http://jabber.org/protocol/pubsub#event", "event") - .next() - .elements("http://jabber.org/protocol/pubsub#event", "items") - .next() - .elements("http://jabber.org/protocol/pubsub#event", "item") - .next() + message_elt.elements(mam.NS_MAM, "result").next() + .elements(C.NS_FORWARD, "forwarded").next() + .elements(C.NS_CLIENT, "message").next() + .elements("http://jabber.org/protocol/pubsub#event", "event").next() + .elements("http://jabber.org/protocol/pubsub#event", "items").next() + .elements("http://jabber.org/protocol/pubsub#event", "item").next() ) except StopIteration: raise exceptions.DataError(u"Can't find Item in MAM message element") @@ -528,7 +522,8 @@ @return: a deferred couple (list[dict], dict) containing: - list of items - metadata with the following keys: - - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index value of RSMResponse + - rsm_first, rsm_last, rsm_count, rsm_index: first, last, count and index + value of RSMResponse - service, node: service and node used """ if item_ids and max_items is not None: @@ -565,6 +560,8 @@ u"Conflict between RSM request and MAM's RSM request" ) d = self._mam.getArchives(client, mam_query, service, self._unwrapMAMMessage) + # FIXME: we only keep items for now, but RSM and MAM metadata should be used + d.addCallback(lambda archives: archives[0]) try: subscribe = C.bool(extra["subscribe"]) diff -r e3f6de6ce320 -r bb6adaa580ee sat/plugins/plugin_xep_0313.py --- a/sat/plugins/plugin_xep_0313.py Mon Dec 10 20:34:45 2018 +0100 +++ b/sat/plugins/plugin_xep_0313.py Mon Dec 10 20:34:45 2018 +0100 @@ -65,7 +65,8 @@ self._rsm = host.plugins[u"XEP-0059"] self._sid = host.plugins[u"XEP-0359"] host.bridge.addMethod( - "MAMGet", ".plugin", in_sign='sss', out_sign='(a(sdssa{ss}a{ss}sa{ss})s)', method=self._getArchives, + "MAMGet", ".plugin", in_sign='sss', + out_sign='(a(sdssa{ss}a{ss}sa{ss})a{ss}s)', method=self._getArchives, async=True) @defer.inlineCallbacks @@ -78,25 +79,35 @@ log.info(_(u"It seems that we have no MAM history yet")) return stanza_id = last_mess[0][-1][u'stanza_id'] - rsm_req = rsm.RSMRequest(after=stanza_id) + rsm_req = rsm.RSMRequest(max_=100, after=stanza_id) mam_req = mam.MAMRequest(rsm_=rsm_req) - mam_data = yield self.getArchives(client, mam_req, - service=client.jid.userhostJID()) - elt_list, rsm_response = mam_data - if not elt_list: + complete = False + count = 0 + while not complete: + mam_data = yield self.getArchives(client, mam_req, + service=client.jid.userhostJID()) + elt_list, rsm_response, mam_response = mam_data + complete = mam_response[u"complete"] + # we update MAM request for next iteration + mam_req.rsm.after = rsm_response.last + if not elt_list: + break + else: + count += len(elt_list) + + for mess_elt in elt_list: + try: + fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req) + except exceptions.DataError: + continue + + client.messageProt.onMessage(fwd_message_elt) + + if not count: log.info(_(u"We have received no message while offline")) - return else: - log.info(_(u"We have received {num_mess} message(s) while offline.").format( - num_mess=len(elt_list))) - - for mess_elt in elt_list: - try: - fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req) - except exceptions.DataError: - continue - - client.messageProt.onMessage(fwd_message_elt) + log.info(_(u"We have received {num_mess} message(s) while offline.") + .format(num_mess=count)) def getHandler(self, client): mam_client = client._mam = SatMAMClient() @@ -153,13 +164,31 @@ return mam.MAMRequest(**mam_args) if mam_args else None + def serialise(self, mam_response, data=None): + """Serialise data for MAM + + Key set in data can be: + - mam_complete: a bool const indicating if all items have been received + - mam_stable: a bool const which is False if items order may be changed + All values are set as strings. + @param mam_response(dict): response data to serialise + @param data(dict, None): dict to update with mam_* data. + If None, a new dict is created + @return (dict): data dict + """ + if data is None: + data = {} + data[u"mam_complete"] = C.boolConst(mam_response[u'complete']) + data[u"mam_stable"] = C.boolConst(mam_response[u'stable']) + return data + def getMessageFromResult(self, client, mess_elt, mam_req, service=None): """Extract usable from MAM query result The message will be validated, and stanza-id/delay will be added if necessary. @param mess_elt(domish.Element): result element wrapping the message to retrieve - @param mam_req(mam.MAMRequest): request used + @param mam_req(mam.MAMRequest): request used (needed to get query_id) @param service(jid.JID, None): MAM service where the request has been sent None if it's user server @return (domish.Element): that can be used directly with onMessage @@ -246,24 +275,37 @@ except StopIteration: raise exceptions.DataError(u"Invalid MAM result") + mam_response = {u"complete": C.bool(fin_elt.getAttribute(u"complete", C.BOOL_FALSE)), + u"stable": C.bool(fin_elt.getAttribute(u"stable", C.BOOL_TRUE))} + try: rsm_response = rsm.RSMResponse.fromElement(fin_elt) except rsm.RSMNotFoundError: rsm_response = None - return (elt_list, rsm_response) + return (elt_list, rsm_response, mam_response) def serializeArchiveResult(self, data, client, mam_req, service): - elt_list, rsm_response = data + elt_list, rsm_response, mam_response = data mess_list = [] for elt in elt_list: fwd_message_elt = self.getMessageFromResult(client, elt, mam_req, service=service) mess_data = client.messageProt.parseMessage(fwd_message_elt) mess_list.append(client.messageGetBridgeArgs(mess_data)) - return mess_list, client.profile + metadata = self._rsm.serialise(rsm_response) + self.serialise(mam_response, metadata) + return mess_list, metadata, client.profile def _getArchives(self, service, extra_ser, profile_key): + """ + @return: tuple with: + - list of message with same data as in bridge.messageNew + - response metadata with: + - rsm data (rsm_first, rsm_last, rsm_count, rsm_index) + - mam data (mam_complete, mam_stable) + - profile + """ client = self.host.getClient(profile_key) service = jid.JID(service) if service else None extra = data_format.deserialise(extra_ser, {}) @@ -274,8 +316,19 @@ return d def getArchives(self, client, query, service=None, message_cb=None): - """Query archive then grab and return them all in the result + """Query archive and gather page result + @param query(mam.MAMRequest): MAM request + @param service(jid.JID, None): MAM service to use + None to use our own server + @param message_cb(callable, None): callback to use on each message + this method can be used to unwrap messages + @return (tuple[list[domish.Element], rsm.RSMResponse, dict): result data with: + - list of found elements + - RSM response + - MAM response, which is a dict with following value: + - complete: a boolean which is True if all items have been received + - stable: a boolean which is False if items order may be changed """ if query.query_id is None: query.query_id = unicode(uuid.uuid4())