comparison libervia/backend/plugins/plugin_xep_0313.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0313.py@524856bd7b19
children 85f5e6225aa1
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3
4 # SAT plugin for Message Archive Management (XEP-0313)
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
6 # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)
7
8 # This program is free software: you can redistribute it and/or modify
9 # it under the terms of the GNU Affero General Public License as published by
10 # the Free Software Foundation, either version 3 of the License, or
11 # (at your option) any later version.
12
13 # This program is distributed in the hope that it will be useful,
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 # GNU Affero General Public License for more details.
17
18 # You should have received a copy of the GNU Affero General Public License
19 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20
21 from libervia.backend.core.constants import Const as C
22 from libervia.backend.core.i18n import _
23 from libervia.backend.core.log import getLogger
24 from libervia.backend.core import exceptions
25 from libervia.backend.tools.common import data_format
26 from twisted.words.protocols.jabber import jid
27 from twisted.internet import defer
28 from zope.interface import implementer
29 from datetime import datetime
30 from dateutil import tz
31 from wokkel import disco
32 from wokkel import data_form
33 import uuid
34
35 # XXX: mam and rsm come from sat_tmp.wokkel
36 from wokkel import rsm
37 from wokkel import mam
38
39
# module level logger
log = getLogger(__name__)

# metadata describing this plugin
PLUGIN_INFO = {
    C.PI_NAME: "Message Archive Management",
    C.PI_IMPORT_NAME: "XEP-0313",
    C.PI_TYPE: "XEP",
    # XEP-0431 only defines a namespace, so we register it here
    C.PI_PROTOCOLS: ["XEP-0313", "XEP-0431"],
    C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0359"],
    C.PI_MAIN: "XEP_0313",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of Message Archive Management"""),
}

# prefix of the keys holding MAM arguments in "extra" dictionaries
MAM_PREFIX = "mam_"
# prefix of the "extra" keys which are converted to additional form fields
FILTER_PREFIX = MAM_PREFIX + "filter_"
# private storage key under which the last received stanza id is saved
KEY_LAST_STANZA_ID = "last_stanza_id"
# XPath template matching the <message/> results of a given MAM query
MESSAGE_RESULT = "/message/result[@xmlns='{mam_ns}' and @queryid='{query_id}']"
# XPath template matching any <message/> holding a <stanza-id/> element
MESSAGE_STANZA_ID = '/message/stanza-id[@xmlns="{ns_stanza_id}"]'
# namespace registered as "fulltextmam" (full text search, XEP-0431)
NS_FTS = "urn:xmpp:fulltext:0"
60
61
class XEP_0313(object):
    """Plugin implementing Message Archive Management (XEP-0313)

    The plugin registers the ``mam_get`` bridge method, retrieves the one2one
    messages archived while the profile was offline (see L{resume}) and offers
    helpers to build and run MAM queries.
    """

    def __init__(self, host):
        """Register namespaces and the bridge method

        @param host: backend instance; its "XEP-0059" and "XEP-0359" plugins
            must already be loaded
        """
        log.info(_("Message Archive Management plugin initialization"))
        self.host = host
        self.host.register_namespace("mam", mam.NS_MAM)
        host.register_namespace("fulltextmam", NS_FTS)
        self._rsm = host.plugins["XEP-0059"]
        self._sid = host.plugins["XEP-0359"]
        # Deferred used to store last stanza id in order of reception
        self._last_stanza_id_d = defer.Deferred()
        self._last_stanza_id_d.callback(None)
        host.bridge.add_method(
            "mam_get", ".plugin", in_sign='sss',
            out_sign='(a(sdssa{ss}a{ss}ss)ss)', method=self._get_archives,
            async_=True)

    async def resume(self, client):
        """Retrieve one2one messages received since the last we have in local storage

        The last stanza id is looked for in private storage first, then in local
        history. The archive is then requested page by page until the server
        reports it as complete (or until an empty page is received).
        @param client: client of the profile to resume
        """
        stanza_id_data = await self.host.memory.storage.get_privates(
            mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile)
        stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID)
        rsm_req = None
        if stanza_id is None:
            log.info("can't retrieve last stanza ID, checking history")
            last_mess = await self.host.memory.history_get(
                None, None, limit=1, filters={'not_types': C.MESS_TYPE_GROUPCHAT,
                                              'last_stanza_id': True},
                profile=client.profile)
            if not last_mess:
                log.info(_("It seems that we have no MAM history yet"))
                # no reference point at all: we use an empty "before"
                # (presumably the last page of the archive, cf. XEP-0059)
                rsm_req = rsm.RSMRequest(max_=50, before="")
            else:
                stanza_id = last_mess[0][-1]['stanza_id']
        if rsm_req is None:
            rsm_req = rsm.RSMRequest(max_=100, after=stanza_id)
        mam_req = mam.MAMRequest(rsm_=rsm_req)
        complete = False
        count = 0
        while not complete:
            elt_list, rsm_response, mam_response = await self.get_archives(
                client, mam_req, service=client.jid.userhostJID())
            complete = mam_response["complete"]
            if rsm_response is None:
                # _query_finished returns None when the result has no RSM data:
                # we can't compute the next page, so this one is the last handled
                complete = True
            else:
                # we update MAM request for next iteration
                mam_req.rsm.after = rsm_response.last
            # before may be set if we had no previous history
            mam_req.rsm.before = None
            if not elt_list:
                break
            count += len(elt_list)

            for mess_elt in elt_list:
                await self._handle_archived_message(client, mess_elt, mam_req)

        if not count:
            log.info(_("We have received no message while offline"))
        else:
            log.info(_("We have received {num_mess} message(s) while offline.")
                     .format(num_mess=count))

    async def _handle_archived_message(self, client, mess_elt, mam_req):
        """Handle a single MAM result retrieved by L{resume}

        Messages addressed to us are injected in the normal workflow, messages
        sent by us are only added to history, anything else is ignored.
        @param mess_elt(domish.Element): result <message/> wrapping the message
        @param mam_req(mam.MAMRequest): request used to get the result
        """
        try:
            fwd_message_elt = self.get_message_from_result(
                client, mess_elt, mam_req)
        except exceptions.DataError:
            # invalid result, already logged by get_message_from_result
            return

        try:
            destinee = jid.JID(fwd_message_elt['to'])
        except KeyError:
            log.warning(_('missing "to" attribute in forwarded message'))
            destinee = client.jid
        if destinee.userhostJID() == client.jid.userhostJID():
            # message to us, we insert the forwarded message in the normal
            # workflow
            client.xmlstream.dispatch(fwd_message_elt)
            return

        # this message should be from us, we just add it to history
        try:
            from_jid = jid.JID(fwd_message_elt['from'])
        except KeyError:
            log.warning(_('missing "from" attribute in forwarded message'))
            from_jid = client.jid
        if from_jid.userhostJID() != client.jid.userhostJID():
            log.warning(_(
                'was expecting a message sent by our jid, but this one if '
                'from {from_jid}, ignoring\n{xml}').format(
                    from_jid=from_jid.full(), xml=mess_elt.toXml()))
            return
        # adding message to history
        mess_data = client.messageProt.parse_message(fwd_message_elt)
        try:
            await client.messageProt.add_to_history(mess_data)
        except exceptions.CancelError as e:
            log.warning(
                "message has not been added to history: {e}".format(e=e))
        except Exception as e:
            # best effort: a message which can't be stored must not stop the
            # retrieval of the following ones
            log.error(
                "can't add message to history: {e}\n{xml}"
                .format(e=e, xml=mess_elt.toXml()))

    def profile_connected(self, client):
        """Start archive retrieval for the profile which has just connected"""
        defer.ensureDeferred(self.resume(client))

    def get_handler(self, client):
        """Create the MAM protocol handler and attach it to the client as C{_mam}

        @return (SatMAMClient): the new handler
        """
        mam_client = client._mam = SatMAMClient(self)
        return mam_client

    def parse_extra(self, extra, with_rsm=True):
        """Parse extra dictionary to retrieve MAM arguments

        Keys used for "start", "end", "with", "node" and "query_id" (all
        prefixed with MAM_PREFIX) and "order_by" are removed from extra.
        @param extra(dict): data to parse
        @param with_rsm(bool): if True, RSM data will be parsed too
        @return (mam.MAMRequest, None): request with parsed arguments
            or None if no MAM arguments have been found
        """
        mam_args = {}
        form_args = {}
        for arg in ("start", "end"):
            try:
                value = extra.pop(MAM_PREFIX + arg)
                form_args[arg] = datetime.fromtimestamp(float(value), tz.tzutc())
            except (TypeError, ValueError):
                log.warning("Bad value for {arg} filter ({value}), ignoring".format(
                    arg=arg, value=value))
            except KeyError:
                continue

        try:
            form_args["with_jid"] = jid.JID(extra.pop(
                MAM_PREFIX + "with"))
        except (jid.InvalidFormat):
            log.warning("Bad value for jid filter")
        except KeyError:
            pass

        # every "mam_filter_<var>" key becomes an extra field of the form
        for name, value in extra.items():
            if name.startswith(FILTER_PREFIX):
                var = name[len(FILTER_PREFIX):]
                extra_fields = form_args.setdefault("extra_fields", [])
                extra_fields.append(data_form.Field(var=var, value=value))

        for arg in ("node", "query_id"):
            try:
                value = extra.pop(MAM_PREFIX + arg)
                mam_args[arg] = value
            except KeyError:
                continue

        if with_rsm:
            rsm_request = self._rsm.parse_extra(extra)
            if rsm_request is not None:
                mam_args["rsm_"] = rsm_request

        if form_args:
            mam_args["form"] = mam.buildForm(**form_args)

        # we only set orderBy if we have other MAM args
        # else we would make a MAM query while it's not expected
        if "order_by" in extra and mam_args:
            order_by = extra.pop("order_by")
            assert isinstance(order_by, list)
            mam_args["orderBy"] = order_by

        return mam.MAMRequest(**mam_args) if mam_args else None

    def get_message_from_result(self, client, mess_elt, mam_req, service=None):
        """Extract usable <message/> from MAM query result

        The message will be validated, and stanza-id/delay will be added if necessary.
        @param mess_elt(domish.Element): result <message/> element wrapping the message
            to retrieve
        @param mam_req(mam.MAMRequest): request used (needed to get query_id)
        @param service(jid.JID, None): MAM service where the request has been sent
            None if it's user server
        @return (domish.Element): <message/> that can be used directly with onMessage
        @raise exceptions.DataError: the result is invalid (unexpected element,
            sender or query id, or missing mandatory element/attribute)
        """
        if mess_elt.name != "message":
            log.warning("unexpected stanza in archive: {xml}".format(
                xml=mess_elt.toXml()))
            raise exceptions.DataError("Invalid element")
        service_jid = client.jid.userhostJID() if service is None else service
        # getAttribute is used so a missing "from" is rejected below as an invalid
        # element instead of raising a KeyError
        mess_from = mess_elt.getAttribute("from")
        # we check that the message has been sent by the right service
        # if service is None (i.e. message expected from our own server)
        # from can be server jid or user's bare jid
        if (mess_from != service_jid.full()
                and not (service is None and mess_from == client.jid.host)):
            log.error("Message is not from our server, something went wrong: "
                      "{xml}".format(xml=mess_elt.toXml()))
            raise exceptions.DataError("Invalid element")
        try:
            result_elt = next(mess_elt.elements(mam.NS_MAM, "result"))
            forwarded_elt = next(result_elt.elements(C.NS_FORWARD, "forwarded"))
            try:
                delay_elt = next(forwarded_elt.elements(C.NS_DELAY, "delay"))
            except StopIteration:
                # delay_elt is not mandatory
                delay_elt = None
            fwd_message_elt = next(forwarded_elt.elements(C.NS_CLIENT, "message"))
        except StopIteration:
            log.warning("Invalid message received from MAM: {xml}".format(
                xml=mess_elt.toXml()))
            raise exceptions.DataError("Invalid element")

        if result_elt.getAttribute("queryid") != mam_req.query_id:
            log.error("Unexpected query id (was expecting {query_id}): {xml}"
                      .format(query_id=mam_req.query_id, xml=mess_elt.toXml()))
            raise exceptions.DataError("Invalid element")
        stanza_id = self._sid.get_stanza_id(fwd_message_elt,
                                            service_jid)
        if stanza_id is None:
            # no stanza-id element is present, we add one so message
            # will be archived with it, and we won't request several times
            # the same MAM archive
            try:
                stanza_id = result_elt["id"]
            except KeyError:
                # a missing attribute raises KeyError on element item access
                log.warning('Invalid MAM result: missing "id" attribute: {xml}'
                            .format(xml=result_elt.toXml()))
                raise exceptions.DataError("Invalid element")
            self._sid.add_stanza_id(client, fwd_message_elt, stanza_id, by=service_jid)

        if delay_elt is not None:
            fwd_message_elt.addChild(delay_elt)

        return fwd_message_elt

    def queryFields(self, client, service=None):
        """Ask the server about supported fields.

        @param service: entity offering the MAM service (None for user archives)
        @return (D(data_form.Form)): form with the implemented fields (cf XEP-0313 §4.1.5)
        """
        return client._mam.queryFields(service)

    def queryArchive(self, client, mam_req, service=None):
        """Query a user, MUC or pubsub archive.

        @param mam_req(mam.MAMRequest): MAM query instance
        @param service(jid.JID, None): entity offering the MAM service
            None for user server
        @return (D(domish.Element)): <IQ/> result
        """
        return client._mam.queryArchive(mam_req, service)

    def _append_message(self, elt_list, message_cb, message_elt):
        """Observer storing a result message received during a query

        @param elt_list(list): list where the message is appended
        @param message_cb(callable, None): if set, its result is stored instead
            of the raw element
        @param message_elt(domish.Element): received result <message/>
        """
        if message_cb is not None:
            elt_list.append(message_cb(message_elt))
        else:
            elt_list.append(message_elt)

    def _query_finished(self, iq_result, client, elt_list, event):
        """Stop collecting results and parse the <fin/> element of the IQ result

        @param iq_result(domish.Element): <IQ/> result of the query
        @param elt_list(list): elements collected while the query was running
        @param event(str): XPath event used to collect the results
        @return (tuple): (elt_list, RSM response or None, MAM response dict)
        @raise exceptions.DataError: there is no <fin/> element in the result
        """
        client.xmlstream.removeObserver(event, self._append_message)
        try:
            fin_elt = next(iq_result.elements(mam.NS_MAM, "fin"))
        except StopIteration:
            raise exceptions.DataError("Invalid MAM result")

        mam_response = {"complete": C.bool(fin_elt.getAttribute("complete", C.BOOL_FALSE)),
                        "stable": C.bool(fin_elt.getAttribute("stable", C.BOOL_TRUE))}

        try:
            rsm_response = rsm.RSMResponse.fromElement(fin_elt)
        except rsm.RSMNotFoundError:
            rsm_response = None

        return (elt_list, rsm_response, mam_response)

    def _query_failed(self, failure_, client, event):
        """Stop collecting results when the query has failed

        Without this, the observer added by get_archives would never be removed.
        @param failure_: failure of the query, returned untouched
        @param event(str): XPath event used to collect the results
        """
        # NOTE(review): xmlstream is assumed to be possibly None if the failure
        #   comes from a disconnection, hence the guard; to be confirmed
        xmlstream = client.xmlstream
        if xmlstream is not None:
            xmlstream.removeObserver(event, self._append_message)
        return failure_

    def serialize_archive_result(self, data, client, mam_req, service):
        """Convert the result of get_archives to bridge compatible data

        @param data(tuple): result of L{get_archives}
        @param mam_req(mam.MAMRequest): request used for the query
        @param service(jid.JID, None): MAM service used for the query
        @return (tuple): messages bridge arguments, serialised metadata, profile
        """
        elt_list, rsm_response, mam_response = data
        mess_list = []
        for elt in elt_list:
            fwd_message_elt = self.get_message_from_result(client, elt, mam_req,
                                                           service=service)
            mess_data = client.messageProt.parse_message(fwd_message_elt)
            mess_list.append(client.message_get_bridge_args(mess_data))
        metadata = {
            'rsm': self._rsm.response2dict(rsm_response),
            'mam': mam_response
        }
        return mess_list, data_format.serialise(metadata), client.profile

    def _get_archives(self, service, extra_ser, profile_key):
        """Bridge method to query an archive

        @param service(str): jid of the MAM service, empty to use our own server
        @param extra_ser(str): serialised extra data (see L{parse_extra})
        @param profile_key(str): key of the profile doing the request
        @return: tuple with:
            - list of message with same data as in bridge.message_new
            - response metadata with:
                - rsm data (first, last, count, index)
                - mam data (complete, stable)
            - profile
        """
        client = self.host.get_client(profile_key)
        service = jid.JID(service) if service else None
        extra = data_format.deserialise(extra_ser, {})
        mam_req = self.parse_extra(extra)
        if mam_req is None:
            # parse_extra returns None when no MAM argument is set, but a query
            # is explicitly requested here, so we use a request without filter
            mam_req = mam.MAMRequest()

        d = self.get_archives(client, mam_req, service=service)
        d.addCallback(self.serialize_archive_result, client, mam_req, service)
        return d

    def get_archives(self, client, query, service=None, message_cb=None):
        """Query archive and gather page result

        @param query(mam.MAMRequest): MAM request
            a query_id is generated if none is set
        @param service(jid.JID, None): MAM service to use
            None to use our own server
        @param message_cb(callable, None): callback to use on each message
            this method can be used to unwrap messages
        @return (tuple[list[domish.Element], rsm.RSMResponse, dict): result data with:
            - list of found elements
            - RSM response
            - MAM response, which is a dict with following value:
                - complete: a boolean which is True if all items have been received
                - stable: a boolean which is False if items order may be changed
        """
        if query.query_id is None:
            query.query_id = str(uuid.uuid4())
        elt_list = []
        event = MESSAGE_RESULT.format(mam_ns=mam.NS_MAM, query_id=query.query_id)
        client.xmlstream.addObserver(event, self._append_message, 0, elt_list, message_cb)
        d = self.queryArchive(client, query, service)
        # the observer is removed in both cases, so it doesn't leak on failure
        d.addCallbacks(
            self._query_finished, self._query_failed,
            callbackArgs=(client, elt_list, event),
            errbackArgs=(client, event))
        return d

    def get_prefs(self, client, service=None):
        """Retrieve the current user preferences.

        @param service: entity offering the MAM service (None for user archives)
        @return: the server response as a Deferred domish.Element
        """
        # http://xmpp.org/extensions/xep-0313.html#prefs
        return client._mam.queryPrefs(service)

    def _set_prefs(self, service_s=None, default="roster", always=None, never=None,
                   profile_key=C.PROF_KEY_NONE):
        """Set user preferences from serialised arguments

        @param service_s(str, None): jid of the MAM service (None for user archives)
        @param default(str): a value in ('always', 'never', 'roster')
        @param always(list[str], None): jids of the entities to always archive
        @param never(list[str], None): jids of the entities to never archive
        @param profile_key(str): key of the profile setting the preferences
        @return: the server response as a Deferred domish.Element
        """
        client = self.host.get_client(profile_key)
        service = jid.JID(service_s) if service_s else None
        # always and never default to None, which is not iterable
        always_jid = [jid.JID(entity) for entity in always or []]
        never_jid = [jid.JID(entity) for entity in never or []]
        # TODO: why not build here a MAMPrefs object instead of passing the args separately?
        return self.setPrefs(client, service, default, always_jid, never_jid)

    def setPrefs(self, client, service=None, default="roster", always=None, never=None):
        """Set new user preferences.

        @param service: entity offering the MAM service (None for user archives)
        @param default (unicode): a value in ('always', 'never', 'roster')
        @param always (list): a list of JID instances
        @param never (list): a list of JID instances
        @return: the server response as a Deferred domish.Element
        """
        # http://xmpp.org/extensions/xep-0313.html#prefs
        return client._mam.setPrefs(service, default, always, never)

    def on_message_stanza_id(self, message_elt, client):
        """Called when a message with a stanza-id is received

        the messages' stanza ids are stored when received, so the last one can be used
        to retrieve missing history on next connection
        @param message_elt(domish.Element): <message> with a stanza-id
        """
        service_jid = client.jid.userhostJID()
        stanza_id = self._sid.get_stanza_id(message_elt, service_jid)
        if stanza_id is None:
            log.debug("Ignoring <message>, stanza id is not from our server")
        else:
            # we use self._last_stanza_id_d to be sure that last_stanza_id is stored in
            # the order of reception
            self._last_stanza_id_d.addCallback(
                lambda __: self.host.memory.storage.set_private_value(
                    namespace=mam.NS_MAM,
                    key=KEY_LAST_STANZA_ID,
                    value=stanza_id,
                    profile=client.profile))
            # a failure must not stay in the shared Deferred, otherwise all the
            # following stanza ids would silently be skipped
            self._last_stanza_id_d.addErrback(
                lambda failure_: log.error(
                    "can't store last stanza id: {failure_}".format(
                        failure_=failure_)))
436
437
@implementer(disco.IDisco)
class SatMAMClient(mam.MAMClient):
    """MAM protocol handler advertising the feature and watching stanza ids"""

    def __init__(self, plugin_parent):
        """
        @param plugin_parent(XEP_0313): plugin instance creating this handler
        """
        self.plugin_parent = plugin_parent

    @property
    def host(self):
        """Backend instance, retrieved from the parent of this handler"""
        parent = self.parent
        return parent.host_app

    def connectionInitialized(self):
        """Observe every <message/> holding a <stanza-id/> element

        Matching messages are given to the plugin's on_message_stanza_id, with the
        parent of this handler as "client" argument.
        """
        stanza_id_ns = self.host.ns_map['stanza_id']
        self.xmlstream.addObserver(
            MESSAGE_STANZA_ID.format(ns_stanza_id=stanza_id_ns),
            self.plugin_parent.on_message_stanza_id,
            client=self.parent,
        )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the MAM namespace as a disco feature"""
        mam_feature = disco.DiscoFeature(mam.NS_MAM)
        return [mam_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """No disco item is exposed by this handler"""
        return list()