# HG changeset patch # User Goffi # Date 1556881208 -7200 # Node ID 32b6893240e0fe811a13d6a8aa9bc5b7e174a28e # Parent e0429ff7f6b666e7bd0beb8b945edeb662aa86f6 plugin XEP-0313: fixed archive retrieval on connection: last stanza-id is now saved for every message received, and not only retrieved from local message history anymore. If we are the sender of a message from archive, it is only added to local history, else it is injected to normal workflow. diff -r e0429ff7f6b6 -r 32b6893240e0 sat/plugins/plugin_xep_0313.py --- a/sat/plugins/plugin_xep_0313.py Sun Apr 28 09:00:51 2019 +0200 +++ b/sat/plugins/plugin_xep_0313.py Fri May 03 13:00:08 2019 +0200 @@ -39,9 +39,6 @@ log = getLogger(__name__) - -MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']" - PLUGIN_INFO = { C.PI_NAME: u"Message Archive Management", C.PI_IMPORT_NAME: u"XEP-0313", @@ -55,6 +52,9 @@ MAM_PREFIX = u"mam_" FILTER_PREFIX = MAM_PREFIX + "filter_" +KEY_LAST_STANZA_ID = u"last_stanza_id" +MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']" +MESSAGE_STANZA_ID = '/message/stanza-id[@xmlns="{ns_stanza_id}"]' class XEP_0313(object): @@ -64,6 +64,9 @@ self.host.registerNamespace(u"mam", mam.NS_MAM) self._rsm = host.plugins[u"XEP-0059"] self._sid = host.plugins[u"XEP-0359"] + # Deferred used to store last stanza id in order of reception + self._last_stanza_id_d = defer.Deferred() + self._last_stanza_id_d.callback(None) host.bridge.addMethod( "MAMGet", ".plugin", in_sign='sss', out_sign='(a(sdssa{ss}a{ss}sa{ss})a{ss}s)', method=self._getArchives, @@ -71,14 +74,19 @@ @defer.inlineCallbacks def profileConnected(self, client): - last_mess = yield self.host.memory.historyGet( - None, None, limit=1, filters={u'not_types': C.MESS_TYPE_GROUPCHAT, - u'last_stanza_id': True}, - profile=client.profile) - if not last_mess: - log.info(_(u"It seems that we have no MAM history yet")) - return - stanza_id = last_mess[0][-1][u'stanza_id'] + stanza_id_data = yield self.host.memory.storage.getPrivates( + mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile) + stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID) + if stanza_id is None: + log.info(u"can't retrieve last stanza ID, checking history") + last_mess = yield self.host.memory.historyGet( + None, None, limit=1, filters={u'not_types': C.MESS_TYPE_GROUPCHAT, + u'last_stanza_id': True}, + profile=client.profile) + if not last_mess: + log.info(_(u"It seems that we have no MAM history yet")) + return + stanza_id = last_mess[0][-1][u'stanza_id'] rsm_req = rsm.RSMRequest(max_=100, after=stanza_id) mam_req = mam.MAMRequest(rsm_=rsm_req) complete = False @@ -97,11 +105,36 @@ for mess_elt in elt_list: try: - fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req) + fwd_message_elt = self.getMessageFromResult( + client, mess_elt, mam_req) except exceptions.DataError: continue - client.messageProt.onMessage(fwd_message_elt) + try: + destinee = jid.JID(fwd_message_elt['to']) + except KeyError: + log.warning(_(u'missing "to" attribute in forwarded message')) + destinee = client.jid + if destinee.userhostJID() == client.jid.userhostJID(): + # message to use, we insert the forwarded message in the normal + # workflow + client.xmlstream.dispatch(fwd_message_elt) + else: + # this message should be from us, we just add it to history + try: + from_jid = jid.JID(fwd_message_elt['from']) + except KeyError: + log.warning(_(u'missing "from" attribute in forwarded message')) + from_jid = client.jid + if from_jid.userhostJID() != client.jid.userhostJID(): + log.warning(_( + u'was expecting a message sent by our jid, but this one if ' + u'from {from_jid}, ignoring\n{xml}').format( + from_jid=from_jid.full(), xml=mess_elt.toXml())) + continue + # adding message to history + mess_data = client.messageProt.parseMessage(fwd_message_elt) + yield client.messageProt.addToHistory(mess_data) if not count: log.info(_(u"We have received no message while offline")) @@ -110,7 +143,7 @@ .format(num_mess=count)) def getHandler(self, client): - mam_client = client._mam = SatMAMClient() + mam_client = client._mam = SatMAMClient(self) return mam_client def parseExtra(self, extra, with_rsm=True): @@ -198,7 +231,7 @@ @param mam_req(mam.MAMRequest): request used (needed to get query_id) @param service(jid.JID, None): MAM service where the request has been sent None if it's user server - @return (domish.Element): that can be used directly with onMessage + @return domish.Element): that can be used directly with onMessage """ if mess_elt.name != u"message": log.warning(u"unexpected stanza in archive: {xml}".format( @@ -376,10 +409,45 @@ # http://xmpp.org/extensions/xep-0313.html#prefs return client._mam.setPrefs(service, default, always, never) + def onMessageStanzaId(self, message_elt, client): + """Called when a message with a stanza-id is received + + the messages' stanza ids are stored when received, so the last one can be used + to retrieve missing history on next connection + @param message_elt(domish.Element): with a stanza-id + """ + service_jid = client.jid.userhostJID() + stanza_id = self._sid.getStanzaId(message_elt, service_jid) + if stanza_id is None: + log.debug(u"Ignoring , stanza id is not from our server") + else: + # we use self._last_stanza_id_d do be sure that last_stanza_id is stored in + # the order of reception + self._last_stanza_id_d.addCallback( + lambda __: self.host.memory.storage.setPrivateValue( + namespace=mam.NS_MAM, + key=KEY_LAST_STANZA_ID, + value=stanza_id, + profile=client.profile)) + class SatMAMClient(mam.MAMClient): implements(disco.IDisco) + def __init__(self, plugin_parent): + self.plugin_parent = plugin_parent + + @property + def host(self): + return self.parent.host_app + + def connectionInitialized(self): + observer_xpath = MESSAGE_STANZA_ID.format( + ns_stanza_id=self.host.ns_map[u'stanza_id']) + self.xmlstream.addObserver( + observer_xpath, self.plugin_parent.onMessageStanzaId, client=self.parent + ) + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(mam.NS_MAM)]