changeset 2701:2ea2369ae7de

plugin XEP-0313: implementation of MAM for messages: - (core/xmpp): new messageGetBridgeArgs to easily retrieve arguments used in bridge from message data - : parseMessage is not static anymore - : new "message_parse" trigger point - (xep-0313) : new "MAMGet" bridge method to retrieve history from MAM instead of local one - : on profileConnected, if a previous MAM message is found (i.e. message with a stanza_id), messages received while offline are retrieved and injected in message workflow. In other words, one2one history is synchronised on connection. - : new "parseExtra" method which parses MAM (and optionally RSM) options from extra dictionary used in bridge.
author Goffi <goffi@goffi.org>
date Sat, 01 Dec 2018 10:33:43 +0100
parents 035901dc946d
children 6555e9835ff8
files sat/core/xmpp.py sat/plugins/plugin_xep_0313.py
diffstat 2 files changed, 224 insertions(+), 55 deletions(-) [+]
line wrap: on
line diff
--- a/sat/core/xmpp.py	Sat Dec 01 10:10:25 2018 +0100
+++ b/sat/core/xmpp.py	Sat Dec 01 10:33:43 2018 +0100
@@ -60,8 +60,8 @@
         self.profile = profile
         self.host_app = host_app
         self.cache = cache.Cache(host_app, profile)
-        self._mess_id_uid = {}  # map from message id to uid used in history.
-                                # Key: (full_jid,message_id) Value: uid
+        self.mess_id2uid = {}  # map from message id to uid used in history.
+                               # Key: (full_jid, message_id) Value: uid
         # this Deferred fire when entity is connected
         self.conn_deferred = defer.Deferred()
         self._progress_cb = {}  # callback called when a progress is requested
@@ -540,6 +540,13 @@
                 )  # empty body should be managed by plugins before this point
         return data
 
+    def messageGetBridgeArgs(self, data):
+        """Generate args to use with bridge from data dict
+
+        @param data(dict): message data, the "uid", "timestamp", "from", "to",
+            "message", "subject", "type" and "extra" keys are used
+        @return (tuple): values of those keys, in that order, where "from" and "to"
+            are replaced by the result of their full() method
+        """
+        return (data[u"uid"], data[u"timestamp"], data[u"from"].full(),
+                data[u"to"].full(), data[u"message"], data[u"subject"],
+                data[u"type"], data[u"extra"])
+
+
     def messageSendToBridge(self, data):
         """Send message to bridge, so frontends can display it
 
@@ -549,20 +556,13 @@
         if data[u"type"] != C.MESS_TYPE_GROUPCHAT:
             # we don't send groupchat message to bridge, as we get them back
             # and they will be added the
-            if (
-                data[u"message"] or data[u"subject"]
-            ):  # we need a message to send something
+            if (data[u"message"] or data[u"subject"]):  # we need a message to send
+                                                        # something
+
                 # We send back the message, so all frontends are aware of it
                 self.host_app.bridge.messageNew(
-                    data[u"uid"],
-                    data[u"timestamp"],
-                    data[u"from"].full(),
-                    data[u"to"].full(),
-                    data[u"message"],
-                    data[u"subject"],
-                    data[u"type"],
-                    data[u"extra"],
-                    profile=self.profile,
+                    *self.messageGetBridgeArgs(data),
+                    profile=self.profile
                 )
             else:
                 log.warning(_(u"No message found"))
@@ -843,8 +843,7 @@
         xmppim.MessageProtocol.__init__(self)
         self.host = host
 
-    @staticmethod
-    def parseMessage(message_elt, client=None):
+    def parseMessage(self, message_elt):
         """parse a message XML and return message_data
 
         @param message_elt(domish.Element): raw <message> xml
@@ -852,28 +851,33 @@
             if None, mapping will not be done
         @return(dict): message data
         """
+        if message_elt.name != u"message":
+            log.warning(_(
+                u"parseMessage used with a non <message/> stanza, ignoring: {xml}"
+                .format(xml=message_elt.toXml())))
+            return {}
+        client = self.parent
         message = {}
         subject = {}
         extra = {}
         data = {
-            "from": jid.JID(message_elt["from"]),
-            "to": jid.JID(message_elt["to"]),
-            "uid": message_elt.getAttribute(
-                "uid", unicode(uuid.uuid4())
+            u"from": jid.JID(message_elt["from"]),
+            u"to": jid.JID(message_elt["to"]),
+            u"uid": message_elt.getAttribute(
+                u"uid", unicode(uuid.uuid4())
             ),  # XXX: uid is not a standard attribute but may be added by plugins
-            "message": message,
-            "subject": subject,
-            "type": message_elt.getAttribute("type", "normal"),
-            "extra": extra,
+            u"message": message,
+            u"subject": subject,
+            u"type": message_elt.getAttribute(u"type", u"normal"),
+            u"extra": extra,
         }
 
-        if client is not None:
-            try:
-                data["stanza_id"] = message_elt["id"]
-            except KeyError:
-                pass
-            else:
-                client._mess_id_uid[(data["from"], data["stanza_id"])] = data["uid"]
+        try:
+            message_id = data[u"extra"][u"message_id"] = message_elt[u"id"]
+        except KeyError:
+            pass
+        else:
+            client.mess_id2uid[(data["from"], message_id)] = data["uid"]
 
         # message
         for e in message_elt.elements(C.NS_CLIENT, "body"):
@@ -894,6 +898,8 @@
             data["received_timestamp"] = unicode(time.time())
             if parsed_delay.sender:
                 data["delay_sender"] = parsed_delay.sender.full()
+
+        self.host.trigger.point("message_parse", client,  message_elt, data)
         return data
 
     def _onMessageStartWorkflow(self, cont, client, message_elt, post_treat):
@@ -907,7 +913,7 @@
         """
         if not cont:
             return
-        data = self.parseMessage(message_elt, client=client)
+        data = self.parseMessage(message_elt)
         post_treat.addCallback(self.skipEmptyMessage)
         post_treat.addCallback(self.addToHistory, client)
         post_treat.addCallback(self.bridgeSignal, client, data)
--- a/sat/plugins/plugin_xep_0313.py	Sat Dec 01 10:10:25 2018 +0100
+++ b/sat/plugins/plugin_xep_0313.py	Sat Dec 01 10:33:43 2018 +0100
@@ -21,15 +21,15 @@
 from sat.core.constants import Const as C
 from sat.core.i18n import _
 from sat.core.log import getLogger
-
-log = getLogger(__name__)
 from sat.core import exceptions
-
+from sat.tools.common import data_format
 from twisted.words.protocols.jabber import jid
-
+from twisted.internet import defer
 from zope.interface import implements
-
+from datetime import datetime
+from dateutil import tz
 from wokkel import disco
+from wokkel import data_form
 import uuid
 
 # XXX: mam and rsm come from sat_tmp.wokkel
@@ -37,28 +37,178 @@
 from wokkel import mam
 
 
+log = getLogger(__name__)
+
+
 MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']"
 
 PLUGIN_INFO = {
-    C.PI_NAME: "Message Archive Management",
-    C.PI_IMPORT_NAME: "XEP-0313",
-    C.PI_TYPE: "XEP",
-    C.PI_PROTOCOLS: ["XEP-0313"],
-    C.PI_MAIN: "XEP_0313",
-    C.PI_HANDLER: "yes",
-    C.PI_DESCRIPTION: _("""Implementation of Message Archive Management"""),
+    C.PI_NAME: u"Message Archive Management",
+    C.PI_IMPORT_NAME: u"XEP-0313",
+    C.PI_TYPE: u"XEP",
+    C.PI_PROTOCOLS: [u"XEP-0313"],
+    C.PI_DEPENDENCIES: [u"XEP-0059", u"XEP-0359"],
+    C.PI_MAIN: u"XEP_0313",
+    C.PI_HANDLER: u"yes",
+    C.PI_DESCRIPTION: _(u"""Implementation of Message Archive Management"""),
 }
 
+MAM_PREFIX = u"mam_"
+FILTER_PREFIX = MAM_PREFIX + "filter_"
+
 
 class XEP_0313(object):
     def __init__(self, host):
         log.info(_("Message Archive Management plugin initialization"))
         self.host = host
+        self.host.registerNamespace(u"mam", mam.NS_MAM)
+        self._rsm = host.plugins[u"XEP-0059"]
+        self._sid = host.plugins[u"XEP-0359"]
+        host.bridge.addMethod(
+            "MAMGet", ".plugin", in_sign='sss', out_sign='(a(sdssa{ss}a{ss}sa{ss})s)', method=self._getArchives,
+            async=True)
+
+    @defer.inlineCallbacks
+    def profileConnected(self, client):
+        """Retrieve messages archived after the last known stanza-id
+
+        Local history is searched for the last message with a stanza_id; if one is
+        found, the archive of our own bare jid is queried for everything located
+        after it, and each forwarded message is given to client.messageProt.onMessage.
+        @param client: client of the profile (presumably the one which has just been
+            connected, according to the method name)
+        """
+        last_mess = yield self.host.memory.historyGet(
+            None, None, limit=1, filters={u'last_stanza_id': True},
+            profile=client.profile)
+        if not last_mess:
+            log.info(_(u"It seems that we have no MAM history yet"))
+            return
+        # NOTE(review): the last field of the history row is expected to be a dict
+        #   holding the stanza_id -- confirm against historyGet
+        stanza_id = last_mess[0][-1][u'stanza_id']
+        # XXX: test
+        # stanza_id = "IIheJOfiIhkPYkw6"
+        # RSM "after" is used so only what follows the last known message is requested
+        rsm_req = rsm.RSMRequest(after=stanza_id)
+        mam_req = mam.MAMRequest(rsm_=rsm_req)
+        mam_data = yield self.getArchives(client, mam_req,
+                                         service=client.jid.userhostJID())
+        # rsm_response is not used here
+        elt_list, rsm_response = mam_data
+        if not elt_list:
+            log.info(_(u"We have received no message while offline"))
+            return
+        else:
+            log.info(_(u"We have received {num_mess} message(s) while offline.").format(
+                num_mess=len(elt_list)))
+
+        for mess_elt in elt_list:
+            try:
+                fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req)
+            except exceptions.DataError:
+                # invalid results are skipped, the other ones are still handled
+                continue
+
+            # the forwarded message follows the same path as a freshly received one
+            client.messageProt.onMessage(fwd_message_elt)
 
     def getHandler(self, client):
         mam_client = client._mam = SatMAMClient()
         return mam_client
 
+    def parseExtra(self, extra, with_rsm=True):
+        """Parse extra dictionary to retrieve MAM arguments
+
+        @param extra(dict): data to parse, parsed MAM keys (except filter_*) are popped
+        @param with_rsm(bool): if True, RSM data will be parsed too
+        @return (mam.MAMRequest, None): request with parsed arguments
+            or None if no MAM arguments have been found
+        """
+        mam_args = {}
+        form_args = {}
+        # "mam_start" and "mam_end" are timestamps, converted to UTC aware datetimes
+        for arg in (u"start", u"end"):
+            try:
+                value = extra.pop(MAM_PREFIX + arg)
+                form_args[arg] = datetime.fromtimestamp(float(value), tz.tzutc())
+            except (TypeError, ValueError):
+                # value is always set here: a missing key raises KeyError instead
+                log.warning(u"Bad value for {arg} filter ({value}), ignoring".format(
+                    arg=arg, value=value))
+            except KeyError:
+                continue
+
+        try:
+            form_args[u"with_jid"] = jid.JID(extra.pop(
+                MAM_PREFIX + u"with"))
+        except (jid.InvalidFormat):
+            log.warning(u"Bad value for jid filter")
+        except KeyError:
+            pass
+
+        # every "mam_filter_*" key becomes an extra field of the form, these keys
+        # are left in extra (no pop here)
+        for name, value in extra.iteritems():
+            if name.startswith(FILTER_PREFIX):
+                var = name[len(FILTER_PREFIX) :]
+                extra_fields = form_args.setdefault(u"extra_fields", [])
+                extra_fields.append(data_form.Field(var=var, value=value))
+
+        # arguments used directly by MAMRequest, without going through the form
+        for arg in (u"node", u"query_id"):
+            try:
+                value = extra.pop(MAM_PREFIX + arg)
+                mam_args[arg] = value
+            except KeyError:
+                continue
+
+        if with_rsm:
+            rsm_request = self._rsm.parseExtra(extra)
+            if rsm_request is not None:
+                mam_args["rsm_"] = rsm_request
+
+        if form_args:
+            mam_args["form"] = mam.buildForm(**form_args)
+
+        return mam.MAMRequest(**mam_args) if mam_args else None
+
+    def getMessageFromResult(self, client, mess_elt, mam_req):
+        """Extract usable <message/> from MAM query result
+
+        The message will be validated, and stanza-id/delay will be added if necessary.
+        @param mess_elt(domish.Element): result <message/> element wrapping the message
+            to retrieve
+        @param mam_req(mam.MAMRequest): request used
+        @return (domish.Element): <message/> that can be used directly with onMessage
+        """
+        if mess_elt.name != u"message":
+            log.warning(u"unexpected stanza in archive: {xml}".format(
+                xml=mess_elt.toXml()))
+            raise exceptions.DataError(u"Invalid element")
+        mess_from = mess_elt[u"from"]
+        if mess_from != client.jid.host and mess_from != client.jid.userhost():
+            log.error(u"Message is not from our server, something went wrong: "
+                      u"{xml}".format(xml=mess_elt.toXml()))
+            raise exceptions.DataError(u"Invalid element")
+        try:
+            result_elt = next(mess_elt.elements(mam.NS_MAM, u"result"))
+            forwarded_elt = next(result_elt.elements(C.NS_FORWARD, u"forwarded"))
+            try:
+                delay_elt = next(forwarded_elt.elements(C.NS_DELAY, u"delay"))
+            except StopIteration:
+                # delay_elt is not mandatory
+                delay_elt = None
+            fwd_message_elt = next(forwarded_elt.elements(C.NS_CLIENT, u"message"))
+        except StopIteration:
+            log.warning(u"Invalid message received from MAM: {xml}".format(
+                xml=mess_elt.toXml()))
+            raise exceptions.DataError(u"Invalid element")
+        else:
+            if not result_elt[u"queryid"] == mam_req.query_id:
+                log.error(u"Unexpected query id (was expecting {query_id}): {xml}"
+                    .format(query_id=mam.query_id, xml=mess_elt.toXml()))
+                raise exceptions.DataError(u"Invalid element")
+            stanza_id = self._sid.getStanzaId(fwd_message_elt,
+                                              client.jid.userhostJID())
+            if stanza_id is None:
+                # not stanza-id element is present, we add one so message
+                # will be archived with it, and we won't request several times
+                # the same MAM achive
+                try:
+                    stanza_id = result_elt[u"id"]
+                except AttributeError:
+                    log.warning(u'Invalid MAM result: missing "id" attribute: {xml}'
+                                .format(xml=result_elt.toXml()))
+                    raise exceptions.DataError(u"Invalid element")
+                self._sid.addStanzaId(client, fwd_message_elt, stanza_id)
+
+            if delay_elt is not None:
+                fwd_message_elt.addChild(delay_elt)
+
+            return fwd_message_elt
+
     def queryFields(self, client, service=None):
         """Ask the server about supported fields.
 
@@ -67,15 +217,15 @@
         """
         return client._mam.queryFields(service)
 
-    def queryArchive(self, client, mam_query, service=None):
+    def queryArchive(self, client, mam_req, service=None):
         """Query a user, MUC or pubsub archive.
 
-        @param mam_query(mam.MAMRequest): MAM query instance
+        @param mam_req(mam.MAMRequest): MAM query instance
         @param service(jid.JID, None): entity offering the MAM service
             None for user server
         @return (D(domish.Element)): <IQ/> result
         """
-        return client._mam.queryArchive(mam_query, service)
+        return client._mam.queryArchive(mam_req, service)
 
     def _appendMessage(self, elt_list, message_cb, message_elt):
         if message_cb is not None:
@@ -97,6 +247,25 @@
 
         return (elt_list, rsm_response)
 
+    def serializeArchiveResult(self, data, client, mam_req):
+        """Convert the result of getArchives to data usable with the bridge
+
+        @param data(tuple): (elt_list, rsm_response), result of getArchives
+        @param client: client used for the request
+        @param mam_req(mam.MAMRequest): request used
+        @return (tuple): list of bridge arguments (one tuple per message, built with
+            client.messageGetBridgeArgs) and client.profile
+        """
+        # rsm_response is not used here
+        elt_list, rsm_response = data
+        mess_list = []
+        for elt in elt_list:
+            # NOTE(review): exceptions.DataError raised by getMessageFromResult is not
+            #   caught here, so a single invalid result makes the whole call fail,
+            #   while profileConnected skips such results -- confirm this is intended
+            fwd_message_elt = self.getMessageFromResult(client, elt, mam_req)
+            mess_data = client.messageProt.parseMessage(fwd_message_elt)
+            mess_list.append(client.messageGetBridgeArgs(mess_data))
+        return mess_list, client.profile
+
+    def _getArchives(self, service, extra_ser, profile_key):
+        """Bridge frontend of getArchives (registered as "MAMGet" in __init__)
+
+        @param service: jid of the entity offering the MAM service
+            an empty value means None
+        @param extra_ser: serialised extra dictionary, parsed with parseExtra
+        @param profile_key: key used to retrieve the client
+        @return (D(tuple)): result of getArchives converted with
+            serializeArchiveResult
+        """
+        client = self.host.getClient(profile_key)
+        service = jid.JID(service) if service else None
+        extra = data_format.deserialise(extra_ser, {})
+        # NOTE(review): parseExtra returns None when no MAM argument is found, and
+        #   mam_req.query_id is read in getMessageFromResult -- confirm that None is
+        #   handled by getArchives or cannot happen here
+        mam_req = self.parseExtra(extra)
+
+        d = self.getArchives(client, mam_req, service=service)
+        d.addCallback(self.serializeArchiveResult, client, mam_req)
+        return d
+
     def getArchives(self, client, query, service=None, message_cb=None):
         """Query archive then grab and return them all in the result
 
@@ -119,14 +288,8 @@
         # http://xmpp.org/extensions/xep-0313.html#prefs
         return client._mam.queryPrefs(service)
 
-    def _setPrefs(
-        self,
-        service_s=None,
-        default="roster",
-        always=None,
-        never=None,
-        profile_key=C.PROF_KEY_NONE,
-    ):
+    def _setPrefs(self, service_s=None, default="roster", always=None, never=None,
+                  profile_key=C.PROF_KEY_NONE):
         service = jid.JID(service_s) if service_s else None
         always_jid = [jid.JID(entity) for entity in always]
         never_jid = [jid.JID(entity) for entity in never]