comparison sat/plugins/plugin_xep_0313.py @ 2701:2ea2369ae7de

plugin XEP-0313: implementation of MAM for messages: - (core/xmpp): new messageGetBridgeArgs to easily retrieve arguments used in bridge from message data - : parseMessage is not static anymore - : new "message_parse" trigger point - (xep-0313) : new "MAMGet" bridge method to retrieve history from MAM instead of local one - : on profileConnected, if previous MAM message is found (i.e. message with a stanza_id), message received while offline are retrieved and injected in message workflow. In other words, one2one history is synchronised on connection. - : new "parseExtra" method which parse MAM (and optionally RSM) option from extra dictionary used in bridge.
author Goffi <goffi@goffi.org>
date Sat, 01 Dec 2018 10:33:43 +0100
parents 56f94936df1e
children 19000c506d0c
comparison
equal deleted inserted replaced
2700:035901dc946d 2701:2ea2369ae7de
19 # along with this program. If not, see <http://www.gnu.org/licenses/>. 19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 20
21 from sat.core.constants import Const as C 21 from sat.core.constants import Const as C
22 from sat.core.i18n import _ 22 from sat.core.i18n import _
23 from sat.core.log import getLogger 23 from sat.core.log import getLogger
24
25 log = getLogger(__name__)
26 from sat.core import exceptions 24 from sat.core import exceptions
27 25 from sat.tools.common import data_format
28 from twisted.words.protocols.jabber import jid 26 from twisted.words.protocols.jabber import jid
29 27 from twisted.internet import defer
30 from zope.interface import implements 28 from zope.interface import implements
31 29 from datetime import datetime
30 from dateutil import tz
32 from wokkel import disco 31 from wokkel import disco
32 from wokkel import data_form
33 import uuid 33 import uuid
34 34
35 # XXX: mam and rsm come from sat_tmp.wokkel 35 # XXX: mam and rsm come from sat_tmp.wokkel
36 from wokkel import rsm 36 from wokkel import rsm
37 from wokkel import mam 37 from wokkel import mam
38 38
39 39
40 log = getLogger(__name__)
41
42
40 MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']" 43 MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']"
41 44
42 PLUGIN_INFO = { 45 PLUGIN_INFO = {
43 C.PI_NAME: "Message Archive Management", 46 C.PI_NAME: u"Message Archive Management",
44 C.PI_IMPORT_NAME: "XEP-0313", 47 C.PI_IMPORT_NAME: u"XEP-0313",
45 C.PI_TYPE: "XEP", 48 C.PI_TYPE: u"XEP",
46 C.PI_PROTOCOLS: ["XEP-0313"], 49 C.PI_PROTOCOLS: [u"XEP-0313"],
47 C.PI_MAIN: "XEP_0313", 50 C.PI_DEPENDENCIES: [u"XEP-0059", u"XEP-0359"],
48 C.PI_HANDLER: "yes", 51 C.PI_MAIN: u"XEP_0313",
49 C.PI_DESCRIPTION: _("""Implementation of Message Archive Management"""), 52 C.PI_HANDLER: u"yes",
53 C.PI_DESCRIPTION: _(u"""Implementation of Message Archive Management"""),
50 } 54 }
55
56 MAM_PREFIX = u"mam_"
57 FILTER_PREFIX = MAM_PREFIX + "filter_"
51 58
52 59
53 class XEP_0313(object): 60 class XEP_0313(object):
54 def __init__(self, host): 61 def __init__(self, host):
55 log.info(_("Message Archive Management plugin initialization")) 62 log.info(_("Message Archive Management plugin initialization"))
56 self.host = host 63 self.host = host
64 self.host.registerNamespace(u"mam", mam.NS_MAM)
65 self._rsm = host.plugins[u"XEP-0059"]
66 self._sid = host.plugins[u"XEP-0359"]
67 host.bridge.addMethod(
68 "MAMGet", ".plugin", in_sign='sss', out_sign='(a(sdssa{ss}a{ss}sa{ss})s)', method=self._getArchives,
69 async=True)
70
71 @defer.inlineCallbacks
72 def profileConnected(self, client):
73 last_mess = yield self.host.memory.historyGet(
74 None, None, limit=1, filters={u'last_stanza_id': True},
75 profile=client.profile)
76 if not last_mess:
77 log.info(_(u"It seems that we have no MAM history yet"))
78 return
79 stanza_id = last_mess[0][-1][u'stanza_id']
80 # XXX: test
81 # stanza_id = "IIheJOfiIhkPYkw6"
82 rsm_req = rsm.RSMRequest(after=stanza_id)
83 mam_req = mam.MAMRequest(rsm_=rsm_req)
84 mam_data = yield self.getArchives(client, mam_req,
85 service=client.jid.userhostJID())
86 elt_list, rsm_response = mam_data
87 if not elt_list:
88 log.info(_(u"We have received no message while offline"))
89 return
90 else:
91 log.info(_(u"We have received {num_mess} message(s) while offline.").format(
92 num_mess=len(elt_list)))
93
94 for mess_elt in elt_list:
95 try:
96 fwd_message_elt = self.getMessageFromResult(client, mess_elt, mam_req)
97 except exceptions.DataError:
98 continue
99
100 client.messageProt.onMessage(fwd_message_elt)
57 101
58 def getHandler(self, client): 102 def getHandler(self, client):
59 mam_client = client._mam = SatMAMClient() 103 mam_client = client._mam = SatMAMClient()
60 return mam_client 104 return mam_client
61 105
106 def parseExtra(self, extra, with_rsm=True):
107 """Parse extra dictionnary to retrieve MAM arguments
108
109 @param extra(dict): data for parse
110 @param with_rsm(bool): if True, RSM data will be parsed too
111 @return (data_form, None): request with parsed arguments
112 or None if no MAM arguments have been found
113 """
114 mam_args = {}
115 form_args = {}
116 for arg in (u"start", u"end"):
117 try:
118 value = extra.pop(MAM_PREFIX + arg)
119 form_args[arg] = datetime.fromtimestamp(float(value), tz.tzutc())
120 except (TypeError, ValueError):
121 log.warning(u"Bad value for {arg} filter ({value}), ignoring".format(
122 arg=arg, value=value))
123 except KeyError:
124 continue
125
126 try:
127 form_args[u"with_jid"] = jid.JID(extra.pop(
128 MAM_PREFIX + u"with"))
129 except (jid.InvalidFormat):
130 log.warning(u"Bad value for jid filter")
131 except KeyError:
132 pass
133
134 for name, value in extra.iteritems():
135 if name.startswith(FILTER_PREFIX):
136 var = name[len(FILTER_PREFIX) :]
137 extra_fields = form_args.setdefault(u"extra_fields", [])
138 extra_fields.append(data_form.Field(var=var, value=value))
139
140 for arg in (u"node", u"query_id"):
141 try:
142 value = extra.pop(MAM_PREFIX + arg)
143 mam_args[arg] = value
144 except KeyError:
145 continue
146
147 if with_rsm:
148 rsm_request = self._rsm.parseExtra(extra)
149 if rsm_request is not None:
150 mam_args["rsm_"] = rsm_request
151
152 if form_args:
153 mam_args["form"] = mam.buildForm(**form_args)
154
155 return mam.MAMRequest(**mam_args) if mam_args else None
156
157 def getMessageFromResult(self, client, mess_elt, mam_req):
158 """Extract usable <message/> from MAM query result
159
160 The message will be validated, and stanza-id/delay will be added if necessary.
161 @param mess_elt(domish.Element): result <message/> element wrapping the message
162 to retrieve
163 @param mam_req(mam.MAMRequest): request used
164 @return (domish.Element): <message/> that can be used directly with onMessage
165 """
166 if mess_elt.name != u"message":
167 log.warning(u"unexpected stanza in archive: {xml}".format(
168 xml=mess_elt.toXml()))
169 raise exceptions.DataError(u"Invalid element")
170 mess_from = mess_elt[u"from"]
171 if mess_from != client.jid.host and mess_from != client.jid.userhost():
172 log.error(u"Message is not from our server, something went wrong: "
173 u"{xml}".format(xml=mess_elt.toXml()))
174 raise exceptions.DataError(u"Invalid element")
175 try:
176 result_elt = next(mess_elt.elements(mam.NS_MAM, u"result"))
177 forwarded_elt = next(result_elt.elements(C.NS_FORWARD, u"forwarded"))
178 try:
179 delay_elt = next(forwarded_elt.elements(C.NS_DELAY, u"delay"))
180 except StopIteration:
181 # delay_elt is not mandatory
182 delay_elt = None
183 fwd_message_elt = next(forwarded_elt.elements(C.NS_CLIENT, u"message"))
184 except StopIteration:
185 log.warning(u"Invalid message received from MAM: {xml}".format(
186 xml=mess_elt.toXml()))
187 raise exceptions.DataError(u"Invalid element")
188 else:
189 if not result_elt[u"queryid"] == mam_req.query_id:
190 log.error(u"Unexpected query id (was expecting {query_id}): {xml}"
191 .format(query_id=mam.query_id, xml=mess_elt.toXml()))
192 raise exceptions.DataError(u"Invalid element")
193 stanza_id = self._sid.getStanzaId(fwd_message_elt,
194 client.jid.userhostJID())
195 if stanza_id is None:
196 # not stanza-id element is present, we add one so message
197 # will be archived with it, and we won't request several times
198 # the same MAM achive
199 try:
200 stanza_id = result_elt[u"id"]
201 except AttributeError:
202 log.warning(u'Invalid MAM result: missing "id" attribute: {xml}'
203 .format(xml=result_elt.toXml()))
204 raise exceptions.DataError(u"Invalid element")
205 self._sid.addStanzaId(client, fwd_message_elt, stanza_id)
206
207 if delay_elt is not None:
208 fwd_message_elt.addChild(delay_elt)
209
210 return fwd_message_elt
211
62 def queryFields(self, client, service=None): 212 def queryFields(self, client, service=None):
63 """Ask the server about supported fields. 213 """Ask the server about supported fields.
64 214
65 @param service: entity offering the MAM service (None for user archives) 215 @param service: entity offering the MAM service (None for user archives)
66 @return (D(data_form.Form)): form with the implemented fields (cf XEP-0313 §4.1.5) 216 @return (D(data_form.Form)): form with the implemented fields (cf XEP-0313 §4.1.5)
67 """ 217 """
68 return client._mam.queryFields(service) 218 return client._mam.queryFields(service)
69 219
70 def queryArchive(self, client, mam_query, service=None): 220 def queryArchive(self, client, mam_req, service=None):
71 """Query a user, MUC or pubsub archive. 221 """Query a user, MUC or pubsub archive.
72 222
73 @param mam_query(mam.MAMRequest): MAM query instance 223 @param mam_req(mam.MAMRequest): MAM query instance
74 @param service(jid.JID, None): entity offering the MAM service 224 @param service(jid.JID, None): entity offering the MAM service
75 None for user server 225 None for user server
76 @return (D(domish.Element)): <IQ/> result 226 @return (D(domish.Element)): <IQ/> result
77 """ 227 """
78 return client._mam.queryArchive(mam_query, service) 228 return client._mam.queryArchive(mam_req, service)
79 229
80 def _appendMessage(self, elt_list, message_cb, message_elt): 230 def _appendMessage(self, elt_list, message_cb, message_elt):
81 if message_cb is not None: 231 if message_cb is not None:
82 elt_list.append(message_cb(message_elt)) 232 elt_list.append(message_cb(message_elt))
83 else: 233 else:
94 rsm_response = rsm.RSMResponse.fromElement(fin_elt) 244 rsm_response = rsm.RSMResponse.fromElement(fin_elt)
95 except rsm.RSMNotFoundError: 245 except rsm.RSMNotFoundError:
96 rsm_response = None 246 rsm_response = None
97 247
98 return (elt_list, rsm_response) 248 return (elt_list, rsm_response)
249
250 def serializeArchiveResult(self, data, client, mam_req):
251 elt_list, rsm_response = data
252 mess_list = []
253 for elt in elt_list:
254 fwd_message_elt = self.getMessageFromResult(client, elt, mam_req)
255 mess_data = client.messageProt.parseMessage(fwd_message_elt)
256 mess_list.append(client.messageGetBridgeArgs(mess_data))
257 return mess_list, client.profile
258
259 def _getArchives(self, service, extra_ser, profile_key):
260 client = self.host.getClient(profile_key)
261 service = jid.JID(service) if service else None
262 extra = data_format.deserialise(extra_ser, {})
263 mam_req = self.parseExtra(extra)
264
265 d = self.getArchives(client, mam_req, service=service)
266 d.addCallback(self.serializeArchiveResult, client, mam_req)
267 return d
99 268
100 def getArchives(self, client, query, service=None, message_cb=None): 269 def getArchives(self, client, query, service=None, message_cb=None):
101 """Query archive then grab and return them all in the result 270 """Query archive then grab and return them all in the result
102 271
103 """ 272 """
117 @return: the server response as a Deferred domish.Element 286 @return: the server response as a Deferred domish.Element
118 """ 287 """
119 # http://xmpp.org/extensions/xep-0313.html#prefs 288 # http://xmpp.org/extensions/xep-0313.html#prefs
120 return client._mam.queryPrefs(service) 289 return client._mam.queryPrefs(service)
121 290
122 def _setPrefs( 291 def _setPrefs(self, service_s=None, default="roster", always=None, never=None,
123 self, 292 profile_key=C.PROF_KEY_NONE):
124 service_s=None,
125 default="roster",
126 always=None,
127 never=None,
128 profile_key=C.PROF_KEY_NONE,
129 ):
130 service = jid.JID(service_s) if service_s else None 293 service = jid.JID(service_s) if service_s else None
131 always_jid = [jid.JID(entity) for entity in always] 294 always_jid = [jid.JID(entity) for entity in always]
132 never_jid = [jid.JID(entity) for entity in never] 295 never_jid = [jid.JID(entity) for entity in never]
133 # TODO: why not build here a MAMPrefs object instead of passing the args separately? 296 # TODO: why not build here a MAMPrefs object instead of passing the args separately?
134 return self.setPrefs(service, default, always_jid, never_jid, profile_key) 297 return self.setPrefs(service, default, always_jid, never_jid, profile_key)