view sat/plugins/plugin_xep_0313.py @ 2701:2ea2369ae7de

plugin XEP-0313: implementation of MAM for messages: - (core/xmpp): new messageGetBridgeArgs to easily retrieve arguments used in bridge from message data - : parseMessage is not static anymore - : new "message_parse" trigger point - (xep-0313) : new "MAMGet" bridge method to retrieve history from MAM instead of local one - : on profileConnected, if a previous MAM message is found (i.e. a message with a stanza_id), messages received while offline are retrieved and injected into the message workflow. In other words, one2one history is synchronised on connection. - : new "parseExtra" method which parses MAM (and optionally RSM) options from the extra dictionary used in bridge.
author Goffi <goffi@goffi.org>
date Sat, 01 Dec 2018 10:33:43 +0100
parents 56f94936df1e
children 19000c506d0c
line wrap: on
line source

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

# SAT plugin for Message Archive Management (XEP-0313)
# Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org)
# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from sat.core.constants import Const as C
from sat.core.i18n import _
from sat.core.log import getLogger
from sat.core import exceptions
from sat.tools.common import data_format
from twisted.words.protocols.jabber import jid
from twisted.internet import defer
from zope.interface import implements
from datetime import datetime
from dateutil import tz
from wokkel import disco
from wokkel import data_form
import uuid

# XXX: mam and rsm come from sat_tmp.wokkel
from wokkel import rsm
from wokkel import mam


log = getLogger(__name__)


# event used to observe the <message/> stanzas carrying the results of a given MAM
# query; it is formatted with the MAM namespace and the id of the query
# (see XEP_0313.getArchives)
MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']"

# metadata describing this plugin (name, type, dependencies, main class…)
PLUGIN_INFO = {
    C.PI_NAME: u"Message Archive Management",
    C.PI_IMPORT_NAME: u"XEP-0313",
    C.PI_TYPE: u"XEP",
    C.PI_PROTOCOLS: [u"XEP-0313"],
    C.PI_DEPENDENCIES: [u"XEP-0059", u"XEP-0359"],
    C.PI_MAIN: u"XEP_0313",
    C.PI_HANDLER: u"yes",
    C.PI_DESCRIPTION: _(u"""Implementation of Message Archive Management"""),
}

# prefix of the keys of the "extra" dictionary which hold MAM arguments
# (see XEP_0313.parseExtra)
MAM_PREFIX = u"mam_"
# prefix of the keys of the "extra" dictionary which are turned into additional
# fields of the MAM data form (see XEP_0313.parseExtra)
FILTER_PREFIX = MAM_PREFIX + "filter_"


class XEP_0313(object):
    def __init__(self, host):
        log.info(_("Message Archive Management plugin initialization"))
        self.host = host
        self.host.registerNamespace(u"mam", mam.NS_MAM)
        self._rsm = host.plugins[u"XEP-0059"]
        self._sid = host.plugins[u"XEP-0359"]
        host.bridge.addMethod(
            "MAMGet", ".plugin", in_sign='sss', out_sign='(a(sdssa{ss}a{ss}sa{ss})s)', method=self._getArchives,
            async=True)

    @defer.inlineCallbacks
    def profileConnected(self, client):
        """Retrieve from MAM the messages archived after the last known one

        The most recent entry of local history found with the "last_stanza_id" filter
        is used as a starting point: the archive of our own bare jid is requested for
        everything coming after its stanza_id, and each valid message retrieved is
        given to client.messageProt.onMessage.
        Nothing is requested if no such entry exists.
        @param client: session of the profile which has just been connected
        """
        history = yield self.host.memory.historyGet(
            None, None, limit=1, filters={u'last_stanza_id': True},
            profile=client.profile)
        if not history:
            log.info(_(u"It seems that we have no MAM history yet"))
            return

        # the stanza_id is read from the last item of the history entry
        last_stanza_id = history[0][-1][u'stanza_id']
        mam_req = mam.MAMRequest(rsm_=rsm.RSMRequest(after=last_stanza_id))
        own_archive = client.jid.userhostJID()
        result_elts, __ = yield self.getArchives(client, mam_req, service=own_archive)

        if not result_elts:
            log.info(_(u"We have received no message while offline"))
            return

        log.info(_(u"We have received {num_mess} message(s) while offline.").format(
            num_mess=len(result_elts)))

        for result_elt in result_elts:
            try:
                message_elt = self.getMessageFromResult(client, result_elt, mam_req)
            except exceptions.DataError:
                # invalid results are skipped, so the valid ones are still handled
                continue
            client.messageProt.onMessage(message_elt)

    def getHandler(self, client):
        """Create the MAM protocol handler of a session

        The handler is also stored in client._mam, where the other methods of this
        plugin are looking for it.
        @param client: session the handler is created for
        @return (SatMAMClient): the new handler
        """
        handler = SatMAMClient()
        client._mam = handler
        return handler

    def parseExtra(self, extra, with_rsm=True):
        """Build a MAM request from the "extra" dictionary used in bridge

        Following keys are used, and removed from extra when they are found:
            - mam_start, mam_end: timestamps, converted to UTC datetimes
            - mam_with: jid used as filter
            - mam_node, mam_query_id: given as is to the request
        Keys starting with "mam_filter_" are used as additional fields of the MAM
        form (the name of the field is the part following the prefix); those keys are
        left in extra.
        Invalid values of start, end and with are logged and ignored.

        @param extra(dict): data to parse, it is modified in place
        @param with_rsm(bool): if True, RSM data will be parsed too
        @return (mam.MAMRequest, None): request with parsed arguments
            or None if no MAM arguments have been found
        """
        form_kwargs = {}

        for arg in (u"start", u"end"):
            key = MAM_PREFIX + arg
            if key not in extra:
                continue
            value = extra.pop(key)
            try:
                form_kwargs[arg] = datetime.fromtimestamp(float(value), tz.tzutc())
            except (TypeError, ValueError):
                log.warning(u"Bad value for {arg} filter ({value}), ignoring".format(
                    arg=arg, value=value))

        with_key = MAM_PREFIX + u"with"
        if with_key in extra:
            try:
                form_kwargs[u"with_jid"] = jid.JID(extra.pop(with_key))
            except jid.InvalidFormat:
                log.warning(u"Bad value for jid filter")

        filter_fields = [
            data_form.Field(var=name[len(FILTER_PREFIX):], value=value)
            for name, value in extra.iteritems()
            if name.startswith(FILTER_PREFIX)
        ]
        if filter_fields:
            form_kwargs[u"extra_fields"] = filter_fields

        request_kwargs = {}

        for arg in (u"node", u"query_id"):
            key = MAM_PREFIX + arg
            if key in extra:
                request_kwargs[arg] = extra.pop(key)

        if with_rsm:
            rsm_request = self._rsm.parseExtra(extra)
            if rsm_request is not None:
                request_kwargs["rsm_"] = rsm_request

        if form_kwargs:
            request_kwargs["form"] = mam.buildForm(**form_kwargs)

        if not request_kwargs:
            return None
        return mam.MAMRequest(**request_kwargs)

    def getMessageFromResult(self, client, mess_elt, mam_req):
        """Extract usable <message/> from MAM query result

        The message will be validated, and stanza-id/delay will be added if necessary.
        @param client: session which has done the MAM request
        @param mess_elt(domish.Element): result <message/> element wrapping the message
            to retrieve
        @param mam_req(mam.MAMRequest): request used
        @return (domish.Element): <message/> that can be used directly with onMessage
        @raise exceptions.DataError: the element is not a valid result of the request
            (unexpected stanza, sender or query id, or missing mandatory child or
            attribute)
        """
        if mess_elt.name != u"message":
            log.warning(u"unexpected stanza in archive: {xml}".format(
                xml=mess_elt.toXml()))
            raise exceptions.DataError(u"Invalid element")

        # a stanza generated by our server on behalf of our account may have no
        # "from" attribute at all (RFC 6120 §8.1.2.1), it is then handled as if it
        # was coming from our bare jid instead of failing with a KeyError
        mess_from = mess_elt.getAttribute(u"from", client.jid.userhost())
        if mess_from != client.jid.host and mess_from != client.jid.userhost():
            log.error(u"Message is not from our server, something went wrong: "
                      u"{xml}".format(xml=mess_elt.toXml()))
            raise exceptions.DataError(u"Invalid element")

        try:
            result_elt = next(mess_elt.elements(mam.NS_MAM, u"result"))
            forwarded_elt = next(result_elt.elements(C.NS_FORWARD, u"forwarded"))
            fwd_message_elt = next(forwarded_elt.elements(C.NS_CLIENT, u"message"))
        except StopIteration:
            log.warning(u"Invalid message received from MAM: {xml}".format(
                xml=mess_elt.toXml()))
            raise exceptions.DataError(u"Invalid element")

        # delay_elt is not mandatory
        delay_elt = next(forwarded_elt.elements(C.NS_DELAY, u"delay"), None)

        # getAttribute is used so a missing "queryid" is reported as invalid data
        if result_elt.getAttribute(u"queryid") != mam_req.query_id:
            log.error(u"Unexpected query id (was expecting {query_id}): {xml}"
                .format(query_id=mam_req.query_id, xml=mess_elt.toXml()))
            raise exceptions.DataError(u"Invalid element")

        stanza_id = self._sid.getStanzaId(fwd_message_elt,
                                          client.jid.userhostJID())
        if stanza_id is None:
            # no stanza-id element is present, we add one so message
            # will be archived with it, and we won't request several times
            # the same MAM archive
            try:
                stanza_id = result_elt[u"id"]
            except KeyError:
                # domish.Element raises KeyError (not AttributeError) when an
                # attribute is missing
                log.warning(u'Invalid MAM result: missing "id" attribute: {xml}'
                            .format(xml=result_elt.toXml()))
                raise exceptions.DataError(u"Invalid element")
            self._sid.addStanzaId(client, fwd_message_elt, stanza_id)

        if delay_elt is not None:
            fwd_message_elt.addChild(delay_elt)

        return fwd_message_elt

    def queryFields(self, client, service=None):
        """Request the fields which can be used in MAM queries.

        The request is delegated to the MAM handler of the session (client._mam).
        @param client: session doing the request
        @param service: entity offering the MAM service (None for user archives)
        @return (D(data_form.Form)): form with the implemented fields (cf XEP-0313 §4.1.5)
        """
        mam_handler = client._mam
        return mam_handler.queryFields(service)

    def queryArchive(self, client, mam_req, service=None):
        """Send a query to a user, MUC or pubsub archive.

        The request is delegated to the MAM handler of the session (client._mam).
        @param client: session doing the request
        @param mam_req(mam.MAMRequest): MAM query instance
        @param service(jid.JID, None): entity offering the MAM service
            None for user server
        @return (D(domish.Element)): <IQ/> result
        """
        mam_handler = client._mam
        return mam_handler.queryArchive(mam_req, service)

    def _appendMessage(self, elt_list, message_cb, message_elt):
        if message_cb is not None:
            elt_list.append(message_cb(message_elt))
        else:
            elt_list.append(message_elt)

    def _queryFinished(self, iq_result, client, elt_list, event):
        """Finalise a query once the <iq/> result has been received

        The observer gathering the results is removed, then RSM metadata are parsed
        from the <fin/> element.
        @param iq_result(domish.Element): <iq/> result of the query
        @param client: session which has done the query
        @param elt_list(list): results gathered while the query was running
        @param event(str): event observed to gather the results
        @return (tuple): (elt_list, rsm_response), where rsm_response is None if no
            RSM data are found in <fin/>
        @raise exceptions.DataError: there is no <fin/> element in the result
        """
        client.xmlstream.removeObserver(event, self._appendMessage)

        fin_elt = next(iq_result.elements(mam.NS_MAM, "fin"), None)
        if fin_elt is None:
            raise exceptions.DataError(u"Invalid MAM result")

        try:
            rsm_response = rsm.RSMResponse.fromElement(fin_elt)
        except rsm.RSMNotFoundError:
            rsm_response = None

        return (elt_list, rsm_response)

    def serializeArchiveResult(self, data, client, mam_req):
        """Convert the result of getArchives to data usable with the bridge

        Each result is validated and unwrapped with getMessageFromResult, parsed with
        client.messageProt.parseMessage, then converted with
        client.messageGetBridgeArgs.
        @param data(tuple): (elt_list, rsm_response) as returned by getArchives
        @param client: session which has done the query
        @param mam_req(mam.MAMRequest): request used for the query
        @return (tuple): list of converted messages, and profile of the client
        """
        elt_list, _rsm_response = data
        parse = client.messageProt.parseMessage
        mess_list = [
            client.messageGetBridgeArgs(
                parse(self.getMessageFromResult(client, elt, mam_req)))
            for elt in elt_list
        ]
        return mess_list, client.profile

    def _getArchives(self, service, extra_ser, profile_key):
        """Bridge frontend of getArchives (method "MAMGet")

        @param service(unicode): jid of the entity offering the MAM service
            empty value to use the default one (cf. queryArchive)
        @param extra_ser(unicode): serialised "extra" dictionary, with arguments
            parsed by parseExtra
        @param profile_key(unicode): key of the profile doing the request
        @return (D(tuple)): messages and profile, cf. serializeArchiveResult
        """
        client = self.host.getClient(profile_key)
        service = jid.JID(service) if service else None
        extra = data_format.deserialise(extra_ser, {})
        mam_req = self.parseExtra(extra)
        if mam_req is None:
            # parseExtra has found neither MAM nor RSM argument: a request without
            # any filter is used, as getArchives and serializeArchiveResult both
            # need a request instance
            mam_req = mam.MAMRequest()

        d = self.getArchives(client, mam_req, service=service)
        d.addCallback(self.serializeArchiveResult, client, mam_req)
        return d

    def getArchives(self, client, query, service=None, message_cb=None):
        """Query archive then grab and return them all in the result

        Results are gathered with an observer on the stream, which is removed when
        the query is finished, whether it has succeeded or failed.
        @param client: session doing the query
        @param query(mam.MAMRequest): MAM request
            if its query_id is None, a random one is set on the instance
        @param service(jid.JID, None): entity offering the MAM service
            None for user server
        @param message_cb(callable, None): if not None, it is called with each result
            element, and its return value is stored instead of the element
        @return (D(tuple)): (elt_list, rsm_response), cf. _queryFinished
        """
        if query.query_id is None:
            query.query_id = unicode(uuid.uuid4())
        elt_list = []
        event = MESSAGE_RESULT.format(mam_ns=mam.NS_MAM, query_id=query.query_id)
        # a reference to the stream is kept, so the observer can be removed from the
        # same instance even if the query fails because the stream has been lost
        xmlstream = client.xmlstream
        xmlstream.addObserver(event, self._appendMessage, 0, elt_list, message_cb)

        def removeObserverOnFailure(failure_):
            # without this the observer (and elt_list with it) would stay registered
            # forever when the query fails; removing an observer which is already
            # removed is harmless
            xmlstream.removeObserver(event, self._appendMessage)
            return failure_

        d = self.queryArchive(client, query, service)
        d.addCallback(self._queryFinished, client, elt_list, event)
        d.addErrback(removeObserverOnFailure)
        return d

    def getPrefs(self, client, service=None):
        """Request the archiving preferences currently set for the user.

        The request is delegated to the MAM handler of the session (client._mam).
        @param client: session doing the request
        @param service: entity offering the MAM service (None for user archives)
        @return: the server response as a Deferred domish.Element
        """
        # http://xmpp.org/extensions/xep-0313.html#prefs
        mam_handler = client._mam
        return mam_handler.queryPrefs(service)

    def _setPrefs(self, service_s=None, default="roster", always=None, never=None,
                  profile_key=C.PROF_KEY_NONE):
        """Set user preferences, using serialised arguments

        @param service_s(unicode, None): jid of the entity offering the MAM service
            empty value or None for user archives
        @param default(unicode): a value in ('always', 'never', 'roster')
        @param always(list[unicode], None): jids to put in "always" list
        @param never(list[unicode], None): jids to put in "never" list
        @param profile_key(unicode): key of the profile setting its preferences
        @return: the server response as a Deferred domish.Element
        """
        # setPrefs is expecting the client session as first argument,
        # it must be resolved from the profile key
        client = self.host.getClient(profile_key)
        service = jid.JID(service_s) if service_s else None
        # None is kept as is (it is the default value of setPrefs too)
        always_jid = None if always is None else [jid.JID(entity) for entity in always]
        never_jid = None if never is None else [jid.JID(entity) for entity in never]
        # TODO: why not build here a MAMPrefs object instead of passing the args separately?
        return self.setPrefs(client, service, default, always_jid, never_jid)

    def setPrefs(self, client, service=None, default="roster", always=None, never=None):
        """Set new user preferences.

        The request is delegated to the MAM handler of the session (client._mam).
        @param client: session doing the request
        @param service: entity offering the MAM service (None for user archives)
        @param default (unicode): a value in ('always', 'never', 'roster')
        @param always (list): a list of JID instances
        @param never (list): a list of JID instances
        @return: the server response as a Deferred domish.Element
        """
        # http://xmpp.org/extensions/xep-0313.html#prefs
        mam_handler = client._mam
        return mam_handler.setPrefs(service, default, always, never)


class SatMAMClient(mam.MAMClient):
    """MAM protocol handler which answers to service discovery requests"""
    implements(disco.IDisco)

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the MAM namespace as a supported feature

        The same answer is returned whatever the arguments are.
        @return (list[disco.DiscoFeature]): the MAM feature
        """
        mam_feature = disco.DiscoFeature(mam.NS_MAM)
        return [mam_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return the discoverable items: there are none for this handler

        @return (list): always an empty list
        """
        no_items = []
        return no_items