Mercurial > libervia-backend
view sat/plugins/plugin_xep_0313.py @ 2701:2ea2369ae7de
plugin XEP-0313: implementation of MAM for messages:
- (core/xmpp): new messageGetBridgeArgs to easily retrieve arguments used in bridge from message data
- : parseMessage is not static anymore
- : new "message_parse" trigger point
- (xep-0313) : new "MAMGet" bridge method to retrieve history from MAM instead of local one
- : on profileConnected, if previous MAM message is found (i.e. message with a stanza_id), message received while offline are retrieved and injected in message workflow.
In other words, one2one history is synchronised on connection.
- : new "parseExtra" method which parse MAM (and optionally RSM) option from extra dictionary used in bridge.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 01 Dec 2018 10:33:43 +0100 |
parents | 56f94936df1e |
children | 19000c506d0c |
line wrap: on
line source
#!/usr/bin/env python2 # -*- coding: utf-8 -*- # SAT plugin for Message Archive Management (XEP-0313) # Copyright (C) 2009-2018 Jérôme Poisson (goffi@goffi.org) # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. from sat.core.constants import Const as C from sat.core.i18n import _ from sat.core.log import getLogger from sat.core import exceptions from sat.tools.common import data_format from twisted.words.protocols.jabber import jid from twisted.internet import defer from zope.interface import implements from datetime import datetime from dateutil import tz from wokkel import disco from wokkel import data_form import uuid # XXX: mam and rsm come from sat_tmp.wokkel from wokkel import rsm from wokkel import mam log = getLogger(__name__) MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']" PLUGIN_INFO = { C.PI_NAME: u"Message Archive Management", C.PI_IMPORT_NAME: u"XEP-0313", C.PI_TYPE: u"XEP", C.PI_PROTOCOLS: [u"XEP-0313"], C.PI_DEPENDENCIES: [u"XEP-0059", u"XEP-0359"], C.PI_MAIN: u"XEP_0313", C.PI_HANDLER: u"yes", C.PI_DESCRIPTION: _(u"""Implementation of Message Archive Management"""), } MAM_PREFIX = u"mam_" FILTER_PREFIX = MAM_PREFIX + "filter_" class XEP_0313(object): def __init__(self, host): log.info(_("Message Archive Management plugin initialization")) self.host = host self.host.registerNamespace(u"mam", mam.NS_MAM) self._rsm = host.plugins[u"XEP-0059"] self._sid = host.plugins[u"XEP-0359"] host.bridge.addMethod( "MAMGet", ".plugin", in_sign='sss', out_sign='(a(sdssa{ss}a{ss}sa{ss})s)', method=self._getArchives, async=True) @defer.inlineCallbacks def profileConnected(self, client): last_mess = yield self.host.memory.historyGet( None, None, limit=1, filters={u'last_stanza_id': True}, profile=client.profile) if not last_mess: log.info(_(u"It seems that we have no MAM history yet")) return stanza_id = last_mess[0][-1][u'stanza_id'] # XXX: test # stanza_id = "IIheJOfiIhkPYkw6" rsm_req = rsm.RSMRequest(after=stanza_id) mam_req = mam.MAMRequest(rsm_=rsm_req) mam_data = yield self.getArchives(client, mam_req, service=client.jid.userhostJID()) elt_list, rsm_response = mam_data if not elt_list: log.info(_(u"We have received no message while offline")) return else: log.info(_(u"We have received {num_mess} message(s) while offline.").format( num_mess=len(elt_list))) for mess_elt in elt_list: try: fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req) except exceptions.DataError: continue client.messageProt.onMessage(fwd_message_elt) def getHandler(self, client): mam_client = client._mam = SatMAMClient() return mam_client def parseExtra(self, extra, with_rsm=True): """Parse extra dictionnary to retrieve MAM arguments @param extra(dict): data for parse @param with_rsm(bool): if True, RSM data will be parsed too @return (data_form, None): request with parsed arguments or None if no MAM arguments have been found """ mam_args = {} form_args = {} for arg in (u"start", u"end"): try: value = extra.pop(MAM_PREFIX + arg) form_args[arg] = datetime.fromtimestamp(float(value), tz.tzutc()) except (TypeError, ValueError): log.warning(u"Bad value for {arg} filter ({value}), ignoring".format( arg=arg, value=value)) except KeyError: continue try: form_args[u"with_jid"] = jid.JID(extra.pop( MAM_PREFIX + u"with")) except (jid.InvalidFormat): log.warning(u"Bad value for jid filter") except KeyError: pass for name, value in extra.iteritems(): if name.startswith(FILTER_PREFIX): var = name[len(FILTER_PREFIX) :] extra_fields = form_args.setdefault(u"extra_fields", []) extra_fields.append(data_form.Field(var=var, value=value)) for arg in (u"node", u"query_id"): try: value = extra.pop(MAM_PREFIX + arg) mam_args[arg] = value except KeyError: continue if with_rsm: rsm_request = self._rsm.parseExtra(extra) if rsm_request is not None: mam_args["rsm_"] = rsm_request if form_args: mam_args["form"] = mam.buildForm(**form_args) return mam.MAMRequest(**mam_args) if mam_args else None def getMessageFromResult(self, client, mess_elt, mam_req): """Extract usable <message/> from MAM query result The message will be validated, and stanza-id/delay will be added if necessary. @param mess_elt(domish.Element): result <message/> element wrapping the message to retrieve @param mam_req(mam.MAMRequest): request used @return (domish.Element): <message/> that can be used directly with onMessage """ if mess_elt.name != u"message": log.warning(u"unexpected stanza in archive: {xml}".format( xml=mess_elt.toXml())) raise exceptions.DataError(u"Invalid element") mess_from = mess_elt[u"from"] if mess_from != client.jid.host and mess_from != client.jid.userhost(): log.error(u"Message is not from our server, something went wrong: " u"{xml}".format(xml=mess_elt.toXml())) raise exceptions.DataError(u"Invalid element") try: result_elt = next(mess_elt.elements(mam.NS_MAM, u"result")) forwarded_elt = next(result_elt.elements(C.NS_FORWARD, u"forwarded")) try: delay_elt = next(forwarded_elt.elements(C.NS_DELAY, u"delay")) except StopIteration: # delay_elt is not mandatory delay_elt = None fwd_message_elt = next(forwarded_elt.elements(C.NS_CLIENT, u"message")) except StopIteration: log.warning(u"Invalid message received from MAM: {xml}".format( xml=mess_elt.toXml())) raise exceptions.DataError(u"Invalid element") else: if not result_elt[u"queryid"] == mam_req.query_id: log.error(u"Unexpected query id (was expecting {query_id}): {xml}" .format(query_id=mam.query_id, xml=mess_elt.toXml())) raise exceptions.DataError(u"Invalid element") stanza_id = self._sid.getStanzaId(fwd_message_elt, client.jid.userhostJID()) if stanza_id is None: # not stanza-id element is present, we add one so message # will be archived with it, and we won't request several times # the same MAM achive try: stanza_id = result_elt[u"id"] except AttributeError: log.warning(u'Invalid MAM result: missing "id" attribute: {xml}' .format(xml=result_elt.toXml())) raise exceptions.DataError(u"Invalid element") self._sid.addStanzaId(client, fwd_message_elt, stanza_id) if delay_elt is not None: fwd_message_elt.addChild(delay_elt) return fwd_message_elt def queryFields(self, client, service=None): """Ask the server about supported fields. @param service: entity offering the MAM service (None for user archives) @return (D(data_form.Form)): form with the implemented fields (cf XEP-0313 §4.1.5) """ return client._mam.queryFields(service) def queryArchive(self, client, mam_req, service=None): """Query a user, MUC or pubsub archive. @param mam_req(mam.MAMRequest): MAM query instance @param service(jid.JID, None): entity offering the MAM service None for user server @return (D(domish.Element)): <IQ/> result """ return client._mam.queryArchive(mam_req, service) def _appendMessage(self, elt_list, message_cb, message_elt): if message_cb is not None: elt_list.append(message_cb(message_elt)) else: elt_list.append(message_elt) def _queryFinished(self, iq_result, client, elt_list, event): client.xmlstream.removeObserver(event, self._appendMessage) try: fin_elt = iq_result.elements(mam.NS_MAM, "fin").next() except StopIteration: raise exceptions.DataError(u"Invalid MAM result") try: rsm_response = rsm.RSMResponse.fromElement(fin_elt) except rsm.RSMNotFoundError: rsm_response = None return (elt_list, rsm_response) def serializeArchiveResult(self, data, client, mam_req): elt_list, rsm_response = data mess_list = [] for elt in elt_list: fwd_message_elt = self.getMessageFromResult(client, elt, mam_req) mess_data = client.messageProt.parseMessage(fwd_message_elt) mess_list.append(client.messageGetBridgeArgs(mess_data)) return mess_list, client.profile def _getArchives(self, service, extra_ser, profile_key): client = self.host.getClient(profile_key) service = jid.JID(service) if service else None extra = data_format.deserialise(extra_ser, {}) mam_req = self.parseExtra(extra) d = self.getArchives(client, mam_req, service=service) d.addCallback(self.serializeArchiveResult, client, mam_req) return d def getArchives(self, client, query, service=None, message_cb=None): """Query archive then grab and return them all in the result """ if query.query_id is None: query.query_id = unicode(uuid.uuid4()) elt_list = [] event = MESSAGE_RESULT.format(mam_ns=mam.NS_MAM, query_id=query.query_id) client.xmlstream.addObserver(event, self._appendMessage, 0, elt_list, message_cb) d = self.queryArchive(client, query, service) d.addCallback(self._queryFinished, client, elt_list, event) return d def getPrefs(self, client, service=None): """Retrieve the current user preferences. @param service: entity offering the MAM service (None for user archives) @return: the server response as a Deferred domish.Element """ # http://xmpp.org/extensions/xep-0313.html#prefs return client._mam.queryPrefs(service) def _setPrefs(self, service_s=None, default="roster", always=None, never=None, profile_key=C.PROF_KEY_NONE): service = jid.JID(service_s) if service_s else None always_jid = [jid.JID(entity) for entity in always] never_jid = [jid.JID(entity) for entity in never] # TODO: why not build here a MAMPrefs object instead of passing the args separately? return self.setPrefs(service, default, always_jid, never_jid, profile_key) def setPrefs(self, client, service=None, default="roster", always=None, never=None): """Set news user preferences. @param service: entity offering the MAM service (None for user archives) @param default (unicode): a value in ('always', 'never', 'roster') @param always (list): a list of JID instances @param never (list): a list of JID instances @param profile_key (unicode): %(doc_profile_key)s @return: the server response as a Deferred domish.Element """ # http://xmpp.org/extensions/xep-0313.html#prefs return client._mam.setPrefs(service, default, always, never) class SatMAMClient(mam.MAMClient): implements(disco.IDisco) def getDiscoInfo(self, requestor, target, nodeIdentifier=""): return [disco.DiscoFeature(mam.NS_MAM)] def getDiscoItems(self, requestor, target, nodeIdentifier=""): return []