# HG changeset patch # User Goffi # Date 1543656823 -3600 # Node ID 2ea2369ae7de717382e01034e33929927ba38aff # Parent 035901dc946dd9943d91b727787b66943eaaa169 plugin XEP-0313: implementation of MAM for messages: - (core/xmpp): new messageGetBridgeArgs to easily retrieve arguments used in bridge from message data - : parseMessage is not static anymore - : new "message_parse" trigger point - (xep-0313) : new "MAMGet" bridge method to retrieve history from MAM instead of local one - : on profileConnected, if previous MAM message is found (i.e. message with a stanza_id), message received while offline are retrieved and injected in message workflow. In other words, one2one history is synchronised on connection. - : new "parseExtra" method which parse MAM (and optionally RSM) option from extra dictionary used in bridge. diff -r 035901dc946d -r 2ea2369ae7de sat/core/xmpp.py --- a/sat/core/xmpp.py Sat Dec 01 10:10:25 2018 +0100 +++ b/sat/core/xmpp.py Sat Dec 01 10:33:43 2018 +0100 @@ -60,8 +60,8 @@ self.profile = profile self.host_app = host_app self.cache = cache.Cache(host_app, profile) - self._mess_id_uid = {} # map from message id to uid used in history. - # Key: (full_jid,message_id) Value: uid + self.mess_id2uid = {} # map from message id to uid used in history. + # Key: (full_jid, message_id) Value: uid # this Deferred fire when entity is connected self.conn_deferred = defer.Deferred() self._progress_cb = {} # callback called when a progress is requested @@ -540,6 +540,13 @@ ) # empty body should be managed by plugins before this point return data + def messageGetBridgeArgs(self, data): + """Generate args to use with bridge from data dict""" + return (data[u"uid"], data[u"timestamp"], data[u"from"].full(), + data[u"to"].full(), data[u"message"], data[u"subject"], + data[u"type"], data[u"extra"]) + + def messageSendToBridge(self, data): """Send message to bridge, so frontends can display it @@ -549,20 +556,13 @@ if data[u"type"] != C.MESS_TYPE_GROUPCHAT: # we don't send groupchat message to bridge, as we get them back # and they will be added the - if ( - data[u"message"] or data[u"subject"] - ): # we need a message to send something + if (data[u"message"] or data[u"subject"]): # we need a message to send + # something + # We send back the message, so all frontends are aware of it self.host_app.bridge.messageNew( - data[u"uid"], - data[u"timestamp"], - data[u"from"].full(), - data[u"to"].full(), - data[u"message"], - data[u"subject"], - data[u"type"], - data[u"extra"], - profile=self.profile, + *self.messageGetBridgeArgs(data), + profile=self.profile ) else: log.warning(_(u"No message found")) @@ -843,8 +843,7 @@ xmppim.MessageProtocol.__init__(self) self.host = host - @staticmethod - def parseMessage(message_elt, client=None): + def parseMessage(self, message_elt): """parse a message XML and return message_data @param message_elt(domish.Element): raw xml @@ -852,28 +851,33 @@ if None, mapping will not be done @return(dict): message data """ + if message_elt.name != u"message": + log.warning(_( + u"parseMessage used with a non stanza, ignoring: {xml}" + .format(xml=message_elt.toXml()))) + return {} + client = self.parent message = {} subject = {} extra = {} data = { - "from": jid.JID(message_elt["from"]), - "to": jid.JID(message_elt["to"]), - "uid": message_elt.getAttribute( - "uid", unicode(uuid.uuid4()) + u"from": jid.JID(message_elt["from"]), + u"to": jid.JID(message_elt["to"]), + u"uid": message_elt.getAttribute( + u"uid", unicode(uuid.uuid4()) ), # XXX: uid is not a standard attribute but may be added by plugins - "message": message, - "subject": subject, - "type": message_elt.getAttribute("type", "normal"), - "extra": extra, + u"message": message, + u"subject": subject, + u"type": message_elt.getAttribute(u"type", u"normal"), + u"extra": extra, } - if client is not None: - try: - data["stanza_id"] = message_elt["id"] - except KeyError: - pass - else: - client._mess_id_uid[(data["from"], data["stanza_id"])] = data["uid"] + try: + message_id = data[u"extra"][u"message_id"] = message_elt[u"id"] + except KeyError: + pass + else: + client.mess_id2uid[(data["from"], message_id)] = data["uid"] # message for e in message_elt.elements(C.NS_CLIENT, "body"): @@ -894,6 +898,8 @@ data["received_timestamp"] = unicode(time.time()) if parsed_delay.sender: data["delay_sender"] = parsed_delay.sender.full() + + self.host.trigger.point("message_parse", client, message_elt, data) return data def _onMessageStartWorkflow(self, cont, client, message_elt, post_treat): @@ -907,7 +913,7 @@ """ if not cont: return - data = self.parseMessage(message_elt, client=client) + data = self.parseMessage(message_elt) post_treat.addCallback(self.skipEmptyMessage) post_treat.addCallback(self.addToHistory, client) post_treat.addCallback(self.bridgeSignal, client, data) diff -r 035901dc946d -r 2ea2369ae7de sat/plugins/plugin_xep_0313.py --- a/sat/plugins/plugin_xep_0313.py Sat Dec 01 10:10:25 2018 +0100 +++ b/sat/plugins/plugin_xep_0313.py Sat Dec 01 10:33:43 2018 +0100 @@ -21,15 +21,15 @@ from sat.core.constants import Const as C from sat.core.i18n import _ from sat.core.log import getLogger - -log = getLogger(__name__) from sat.core import exceptions - +from sat.tools.common import data_format from twisted.words.protocols.jabber import jid - +from twisted.internet import defer from zope.interface import implements - +from datetime import datetime +from dateutil import tz from wokkel import disco +from wokkel import data_form import uuid # XXX: mam and rsm come from sat_tmp.wokkel @@ -37,28 +37,178 @@ from wokkel import mam +log = getLogger(__name__) + + MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']" PLUGIN_INFO = { - C.PI_NAME: "Message Archive Management", - C.PI_IMPORT_NAME: "XEP-0313", - C.PI_TYPE: "XEP", - C.PI_PROTOCOLS: ["XEP-0313"], - C.PI_MAIN: "XEP_0313", - C.PI_HANDLER: "yes", - C.PI_DESCRIPTION: _("""Implementation of Message Archive Management"""), + C.PI_NAME: u"Message Archive Management", + C.PI_IMPORT_NAME: u"XEP-0313", + C.PI_TYPE: u"XEP", + C.PI_PROTOCOLS: [u"XEP-0313"], + C.PI_DEPENDENCIES: [u"XEP-0059", u"XEP-0359"], + C.PI_MAIN: u"XEP_0313", + C.PI_HANDLER: u"yes", + C.PI_DESCRIPTION: _(u"""Implementation of Message Archive Management"""), } +MAM_PREFIX = u"mam_" +FILTER_PREFIX = MAM_PREFIX + "filter_" + class XEP_0313(object): def __init__(self, host): log.info(_("Message Archive Management plugin initialization")) self.host = host + self.host.registerNamespace(u"mam", mam.NS_MAM) + self._rsm = host.plugins[u"XEP-0059"] + self._sid = host.plugins[u"XEP-0359"] + host.bridge.addMethod( + "MAMGet", ".plugin", in_sign='sss', out_sign='(a(sdssa{ss}a{ss}sa{ss})s)', method=self._getArchives, + async=True) + + @defer.inlineCallbacks + def profileConnected(self, client): + last_mess = yield self.host.memory.historyGet( + None, None, limit=1, filters={u'last_stanza_id': True}, + profile=client.profile) + if not last_mess: + log.info(_(u"It seems that we have no MAM history yet")) + return + stanza_id = last_mess[0][-1][u'stanza_id'] + # XXX: test + # stanza_id = "IIheJOfiIhkPYkw6" + rsm_req = rsm.RSMRequest(after=stanza_id) + mam_req = mam.MAMRequest(rsm_=rsm_req) + mam_data = yield self.getArchives(client, mam_req, + service=client.jid.userhostJID()) + elt_list, rsm_response = mam_data + if not elt_list: + log.info(_(u"We have received no message while offline")) + return + else: + log.info(_(u"We have received {num_mess} message(s) while offline.").format( + num_mess=len(elt_list))) + + for mess_elt in elt_list: + try: + fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req) + except exceptions.DataError: + continue + + client.messageProt.onMessage(fwd_message_elt) def getHandler(self, client): mam_client = client._mam = SatMAMClient() return mam_client + def parseExtra(self, extra, with_rsm=True): + """Parse extra dictionnary to retrieve MAM arguments + + @param extra(dict): data for parse + @param with_rsm(bool): if True, RSM data will be parsed too + @return (data_form, None): request with parsed arguments + or None if no MAM arguments have been found + """ + mam_args = {} + form_args = {} + for arg in (u"start", u"end"): + try: + value = extra.pop(MAM_PREFIX + arg) + form_args[arg] = datetime.fromtimestamp(float(value), tz.tzutc()) + except (TypeError, ValueError): + log.warning(u"Bad value for {arg} filter ({value}), ignoring".format( + arg=arg, value=value)) + except KeyError: + continue + + try: + form_args[u"with_jid"] = jid.JID(extra.pop( + MAM_PREFIX + u"with")) + except (jid.InvalidFormat): + log.warning(u"Bad value for jid filter") + except KeyError: + pass + + for name, value in extra.iteritems(): + if name.startswith(FILTER_PREFIX): + var = name[len(FILTER_PREFIX) :] + extra_fields = form_args.setdefault(u"extra_fields", []) + extra_fields.append(data_form.Field(var=var, value=value)) + + for arg in (u"node", u"query_id"): + try: + value = extra.pop(MAM_PREFIX + arg) + mam_args[arg] = value + except KeyError: + continue + + if with_rsm: + rsm_request = self._rsm.parseExtra(extra) + if rsm_request is not None: + mam_args["rsm_"] = rsm_request + + if form_args: + mam_args["form"] = mam.buildForm(**form_args) + + return mam.MAMRequest(**mam_args) if mam_args else None + + def getMessageFromResult(self, client, mess_elt, mam_req): + """Extract usable from MAM query result + + The message will be validated, and stanza-id/delay will be added if necessary. + @param mess_elt(domish.Element): result element wrapping the message + to retrieve + @param mam_req(mam.MAMRequest): request used + @return (domish.Element): that can be used directly with onMessage + """ + if mess_elt.name != u"message": + log.warning(u"unexpected stanza in archive: {xml}".format( + xml=mess_elt.toXml())) + raise exceptions.DataError(u"Invalid element") + mess_from = mess_elt[u"from"] + if mess_from != client.jid.host and mess_from != client.jid.userhost(): + log.error(u"Message is not from our server, something went wrong: " + u"{xml}".format(xml=mess_elt.toXml())) + raise exceptions.DataError(u"Invalid element") + try: + result_elt = next(mess_elt.elements(mam.NS_MAM, u"result")) + forwarded_elt = next(result_elt.elements(C.NS_FORWARD, u"forwarded")) + try: + delay_elt = next(forwarded_elt.elements(C.NS_DELAY, u"delay")) + except StopIteration: + # delay_elt is not mandatory + delay_elt = None + fwd_message_elt = next(forwarded_elt.elements(C.NS_CLIENT, u"message")) + except StopIteration: + log.warning(u"Invalid message received from MAM: {xml}".format( + xml=mess_elt.toXml())) + raise exceptions.DataError(u"Invalid element") + else: + if not result_elt[u"queryid"] == mam_req.query_id: + log.error(u"Unexpected query id (was expecting {query_id}): {xml}" + .format(query_id=mam.query_id, xml=mess_elt.toXml())) + raise exceptions.DataError(u"Invalid element") + stanza_id = self._sid.getStanzaId(fwd_message_elt, + client.jid.userhostJID()) + if stanza_id is None: + # not stanza-id element is present, we add one so message + # will be archived with it, and we won't request several times + # the same MAM achive + try: + stanza_id = result_elt[u"id"] + except AttributeError: + log.warning(u'Invalid MAM result: missing "id" attribute: {xml}' + .format(xml=result_elt.toXml())) + raise exceptions.DataError(u"Invalid element") + self._sid.addStanzaId(client, fwd_message_elt, stanza_id) + + if delay_elt is not None: + fwd_message_elt.addChild(delay_elt) + + return fwd_message_elt + def queryFields(self, client, service=None): """Ask the server about supported fields. @@ -67,15 +217,15 @@ """ return client._mam.queryFields(service) - def queryArchive(self, client, mam_query, service=None): + def queryArchive(self, client, mam_req, service=None): """Query a user, MUC or pubsub archive. - @param mam_query(mam.MAMRequest): MAM query instance + @param mam_req(mam.MAMRequest): MAM query instance @param service(jid.JID, None): entity offering the MAM service None for user server @return (D(domish.Element)): result """ - return client._mam.queryArchive(mam_query, service) + return client._mam.queryArchive(mam_req, service) def _appendMessage(self, elt_list, message_cb, message_elt): if message_cb is not None: @@ -97,6 +247,25 @@ return (elt_list, rsm_response) + def serializeArchiveResult(self, data, client, mam_req): + elt_list, rsm_response = data + mess_list = [] + for elt in elt_list: + fwd_message_elt = self.getMessageFromResult(client, elt, mam_req) + mess_data = client.messageProt.parseMessage(fwd_message_elt) + mess_list.append(client.messageGetBridgeArgs(mess_data)) + return mess_list, client.profile + + def _getArchives(self, service, extra_ser, profile_key): + client = self.host.getClient(profile_key) + service = jid.JID(service) if service else None + extra = data_format.deserialise(extra_ser, {}) + mam_req = self.parseExtra(extra) + + d = self.getArchives(client, mam_req, service=service) + d.addCallback(self.serializeArchiveResult, client, mam_req) + return d + def getArchives(self, client, query, service=None, message_cb=None): """Query archive then grab and return them all in the result @@ -119,14 +288,8 @@ # http://xmpp.org/extensions/xep-0313.html#prefs return client._mam.queryPrefs(service) - def _setPrefs( - self, - service_s=None, - default="roster", - always=None, - never=None, - profile_key=C.PROF_KEY_NONE, - ): + def _setPrefs(self, service_s=None, default="roster", always=None, never=None, + profile_key=C.PROF_KEY_NONE): service = jid.JID(service_s) if service_s else None always_jid = [jid.JID(entity) for entity in always] never_jid = [jid.JID(entity) for entity in never]