Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0313.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0313.py@524856bd7b19 |
children | 85f5e6225aa1 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 | |
4 # SAT plugin for Message Archive Management (XEP-0313) | |
5 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
6 # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org) | |
7 | |
8 # This program is free software: you can redistribute it and/or modify | |
9 # it under the terms of the GNU Affero General Public License as published by | |
10 # the Free Software Foundation, either version 3 of the License, or | |
11 # (at your option) any later version. | |
12 | |
13 # This program is distributed in the hope that it will be useful, | |
14 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
15 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
16 # GNU Affero General Public License for more details. | |
17 | |
18 # You should have received a copy of the GNU Affero General Public License | |
19 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
20 | |
21 from libervia.backend.core.constants import Const as C | |
22 from libervia.backend.core.i18n import _ | |
23 from libervia.backend.core.log import getLogger | |
24 from libervia.backend.core import exceptions | |
25 from libervia.backend.tools.common import data_format | |
26 from twisted.words.protocols.jabber import jid | |
27 from twisted.internet import defer | |
28 from zope.interface import implementer | |
29 from datetime import datetime | |
30 from dateutil import tz | |
31 from wokkel import disco | |
32 from wokkel import data_form | |
33 import uuid | |
34 | |
35 # XXX: mam and rsm come from sat_tmp.wokkel | |
36 from wokkel import rsm | |
37 from wokkel import mam | |
38 | |
39 | |
# Module-level logger, named after this module
log = getLogger(__name__)

# Plugin metadata (presumably consumed by the backend's plugin loader — the
# consumer is not visible in this file)
PLUGIN_INFO = {
    C.PI_NAME: "Message Archive Management",
    C.PI_IMPORT_NAME: "XEP-0313",
    C.PI_TYPE: "XEP",
    # XEP-0431 only defines a namespace, so we register it here
    C.PI_PROTOCOLS: ["XEP-0313", "XEP-0431"],
    C.PI_DEPENDENCIES: ["XEP-0059", "XEP-0359"],
    C.PI_MAIN: "XEP_0313",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("Implementation of Message Archive Management"),
}

# Prefix of the MAM-related keys looked up in "extra" dictionaries
MAM_PREFIX = "mam_"
# Prefix of the keys which are turned into extra data form fields
FILTER_PREFIX = "{}filter_".format(MAM_PREFIX)
# Private storage key under which the last received stanza id is saved
KEY_LAST_STANZA_ID = "last_stanza_id"
# XPath-like observer templates, formatted before being registered on the stream
MESSAGE_RESULT = (
    "/message/result"
    "[@xmlns='{mam_ns}' and @queryid='{query_id}']"
)
MESSAGE_STANZA_ID = '/message/stanza-id[@xmlns="{ns_stanza_id}"]'
# Namespace registered as "fulltextmam" (see PI_PROTOCOLS comment: XEP-0431)
NS_FTS = "urn:xmpp:fulltext:0"
60 | |
61 | |
class XEP_0313(object):
    """Message Archive Management (XEP-0313) plugin

    Gives access to MAM archives (querying, preferences), exposes the
    ``mam_get`` bridge method, and keeps track of the last received stanza id
    so that messages missed while offline can be retrieved on next connection.
    """

    def __init__(self, host):
        """Register namespaces, plugin dependencies and the bridge method

        @param host: backend instance, used to reach plugins, bridge and memory
        """
        log.info(_("Message Archive Management plugin initialization"))
        self.host = host
        self.host.register_namespace("mam", mam.NS_MAM)
        host.register_namespace("fulltextmam", NS_FTS)
        self._rsm = host.plugins["XEP-0059"]
        self._sid = host.plugins["XEP-0359"]
        # Deferred used to store last stanza id in order of reception
        self._last_stanza_id_d = defer.Deferred()
        self._last_stanza_id_d.callback(None)
        host.bridge.add_method(
            "mam_get", ".plugin", in_sign='sss',
            out_sign='(a(sdssa{ss}a{ss}ss)ss)', method=self._get_archives,
            async_=True)

    async def resume(self, client):
        """Retrieve one2one messages received since the last we have in local storage

        The last known stanza id is read from private storage, or from local
        history if nothing has been stored. The user archive is then requested
        page by page: messages sent to us are dispatched in the normal
        workflow, messages sent by us are only added to history.
        @param client: client of the profile to resume
        """
        stanza_id_data = await self.host.memory.storage.get_privates(
            mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile)
        stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID)
        rsm_req = None
        if stanza_id is None:
            log.info("can't retrieve last stanza ID, checking history")
            last_mess = await self.host.memory.history_get(
                None, None, limit=1, filters={'not_types': C.MESS_TYPE_GROUPCHAT,
                                              'last_stanza_id': True},
                profile=client.profile)
            if not last_mess:
                log.info(_("It seems that we have no MAM history yet"))
                # no reference point at all: per XEP-0059 an empty "before"
                # requests the last page of the archive
                rsm_req = rsm.RSMRequest(max_=50, before="")
            else:
                stanza_id = last_mess[0][-1]['stanza_id']
        if rsm_req is None:
            rsm_req = rsm.RSMRequest(max_=100, after=stanza_id)
        mam_req = mam.MAMRequest(rsm_=rsm_req)
        complete = False
        count = 0
        while not complete:
            elt_list, rsm_response, mam_response = await self.get_archives(
                client, mam_req, service=client.jid.userhostJID())
            complete = mam_response["complete"]
            if rsm_response is None:
                # _query_finished returns None when the <fin/> element has no
                # RSM data: without it we can't request the next page, so we
                # handle the current page and stop instead of crashing or
                # sending the same request forever
                if not complete:
                    log.warning(
                        "no RSM data in MAM response, can't retrieve next page")
                complete = True
            else:
                # we update MAM request for next iteration
                mam_req.rsm.after = rsm_response.last
            # before may be set if we had no previous history
            mam_req.rsm.before = None
            if not elt_list:
                break
            count += len(elt_list)

            for mess_elt in elt_list:
                try:
                    fwd_message_elt = self.get_message_from_result(
                        client, mess_elt, mam_req)
                except exceptions.DataError:
                    # invalid result, already logged by get_message_from_result
                    continue

                try:
                    destinee = jid.JID(fwd_message_elt['to'])
                except KeyError:
                    log.warning(_('missing "to" attribute in forwarded message'))
                    destinee = client.jid
                if destinee.userhostJID() == client.jid.userhostJID():
                    # message to us, we insert the forwarded message in the normal
                    # workflow
                    client.xmlstream.dispatch(fwd_message_elt)
                else:
                    # this message should be from us, we just add it to history
                    try:
                        from_jid = jid.JID(fwd_message_elt['from'])
                    except KeyError:
                        log.warning(_('missing "from" attribute in forwarded message'))
                        from_jid = client.jid
                    if from_jid.userhostJID() != client.jid.userhostJID():
                        log.warning(_(
                            'was expecting a message sent by our jid, but this one if '
                            'from {from_jid}, ignoring\n{xml}').format(
                                from_jid=from_jid.full(), xml=mess_elt.toXml()))
                        continue
                    # adding message to history
                    mess_data = client.messageProt.parse_message(fwd_message_elt)
                    try:
                        await client.messageProt.add_to_history(mess_data)
                    except exceptions.CancelError as e:
                        log.warning(
                            "message has not been added to history: {e}".format(e=e))
                    except Exception as e:
                        # best effort: one failing message must not stop the resume
                        log.error(
                            "can't add message to history: {e}\n{xml}"
                            .format(e=e, xml=mess_elt.toXml()))

        if not count:
            log.info(_("We have received no message while offline"))
        else:
            log.info(_("We have received {num_mess} message(s) while offline.")
                .format(num_mess=count))

    def profile_connected(self, client):
        """Start retrieval of missed messages once the profile is connected

        @param client: client of the profile which has just been connected
        """
        defer.ensureDeferred(self.resume(client))

    def get_handler(self, client):
        """Create the MAM protocol handler, also stored in ``client._mam``

        @param client: client which will own the handler
        @return (SatMAMClient): the new handler
        """
        mam_client = client._mam = SatMAMClient(self)
        return mam_client

    def parse_extra(self, extra, with_rsm=True):
        """Parse extra dictionnary to retrieve MAM arguments

        MAM arguments found ("mam_start", "mam_end", "mam_with", "mam_node",
        "mam_query_id" and, when relevant, "order_by") are popped from
        ``extra``. Keys starting with FILTER_PREFIX are used as extra form
        fields and are left in place.
        @param extra(dict): data for parse
        @param with_rsm(bool): if True, RSM data will be parsed too
        @return (mam.MAMRequest, None): request with parsed arguments
            or None if no MAM arguments have been found
        """
        mam_args = {}
        form_args = {}
        for arg in ("start", "end"):
            try:
                value = extra.pop(MAM_PREFIX + arg)
            except KeyError:
                continue
            try:
                form_args[arg] = datetime.fromtimestamp(float(value), tz.tzutc())
            except (TypeError, ValueError, OverflowError, OSError):
                # OverflowError/OSError are raised by fromtimestamp for
                # timestamps out of the supported range
                log.warning("Bad value for {arg} filter ({value}), ignoring".format(
                    arg=arg, value=value))

        try:
            form_args["with_jid"] = jid.JID(extra.pop(
                MAM_PREFIX + "with"))
        except (jid.InvalidFormat):
            log.warning("Bad value for jid filter")
        except KeyError:
            pass

        for name, value in extra.items():
            if name.startswith(FILTER_PREFIX):
                var = name[len(FILTER_PREFIX):]
                extra_fields = form_args.setdefault("extra_fields", [])
                extra_fields.append(data_form.Field(var=var, value=value))

        for arg in ("node", "query_id"):
            try:
                value = extra.pop(MAM_PREFIX + arg)
                mam_args[arg] = value
            except KeyError:
                continue

        if with_rsm:
            rsm_request = self._rsm.parse_extra(extra)
            if rsm_request is not None:
                mam_args["rsm_"] = rsm_request

        if form_args:
            mam_args["form"] = mam.buildForm(**form_args)

        # we only set orderBy if we have other MAM args
        # else we would make a MAM query while it's not expected
        if "order_by" in extra and mam_args:
            order_by = extra.pop("order_by")
            assert isinstance(order_by, list)
            mam_args["orderBy"] = order_by

        return mam.MAMRequest(**mam_args) if mam_args else None

    def get_message_from_result(self, client, mess_elt, mam_req, service=None):
        """Extract usable <message/> from MAM query result

        The message will be validated, and stanza-id/delay will be added if necessary.
        @param client: client which has sent the request
        @param mess_elt(domish.Element): result <message/> element wrapping the message
            to retrieve
        @param mam_req(mam.MAMRequest): request used (needed to get query_id)
        @param service(jid.JID, None): MAM service where the request has been sent
            None if it's user server
        @return (domish.Element): <message/> that can be used directly with onMessage
        @raise exceptions.DataError: the result is invalid (unexpected element,
            sender or query id, or missing mandatory child/attribute)
        """
        if mess_elt.name != "message":
            log.warning("unexpected stanza in archive: {xml}".format(
                xml=mess_elt.toXml()))
            raise exceptions.DataError("Invalid element")
        service_jid = client.jid.userhostJID() if service is None else service
        # a server may omit "from" on stanzas it sends on behalf of the account
        # (RFC 6120 §8.1.2.1), in which case the stanza is to be treated as
        # coming from the account's bare jid
        mess_from = mess_elt.getAttribute("from") or client.jid.userhost()
        # we check that the message has been sent by the right service
        # if service is None (i.e. message expected from our own server)
        # from can be server jid or user's bare jid
        if (mess_from != service_jid.full()
            and not (service is None and mess_from == client.jid.host)):
            log.error("Message is not from our server, something went wrong: "
                      "{xml}".format(xml=mess_elt.toXml()))
            raise exceptions.DataError("Invalid element")
        try:
            result_elt = next(mess_elt.elements(mam.NS_MAM, "result"))
            forwarded_elt = next(result_elt.elements(C.NS_FORWARD, "forwarded"))
            try:
                delay_elt = next(forwarded_elt.elements(C.NS_DELAY, "delay"))
            except StopIteration:
                # delay_elt is not mandatory
                delay_elt = None
            fwd_message_elt = next(forwarded_elt.elements(C.NS_CLIENT, "message"))
        except StopIteration:
            log.warning("Invalid message received from MAM: {xml}".format(
                xml=mess_elt.toXml()))
            raise exceptions.DataError("Invalid element")
        else:
            # getAttribute is used so that a missing "queryid" is reported as a
            # DataError like any other invalid result
            if result_elt.getAttribute("queryid") != mam_req.query_id:
                log.error("Unexpected query id (was expecting {query_id}): {xml}"
                    .format(query_id=mam_req.query_id, xml=mess_elt.toXml()))
                raise exceptions.DataError("Invalid element")
            stanza_id = self._sid.get_stanza_id(fwd_message_elt,
                                              service_jid)
            if stanza_id is None:
                # not stanza-id element is present, we add one so message
                # will be archived with it, and we won't request several times
                # the same MAM achive
                try:
                    stanza_id = result_elt["id"]
                except KeyError:
                    # domish.Element raises KeyError for a missing attribute
                    log.warning('Invalid MAM result: missing "id" attribute: {xml}'
                                .format(xml=result_elt.toXml()))
                    raise exceptions.DataError("Invalid element")
                self._sid.add_stanza_id(client, fwd_message_elt, stanza_id, by=service_jid)

            if delay_elt is not None:
                fwd_message_elt.addChild(delay_elt)

            return fwd_message_elt

    def queryFields(self, client, service=None):
        """Ask the server about supported fields.

        @param client: client to use for the request
        @param service: entity offering the MAM service (None for user archives)
        @return (D(data_form.Form)): form with the implemented fields (cf XEP-0313 §4.1.5)
        """
        return client._mam.queryFields(service)

    def queryArchive(self, client, mam_req, service=None):
        """Query a user, MUC or pubsub archive.

        @param client: client to use for the request
        @param mam_req(mam.MAMRequest): MAM query instance
        @param service(jid.JID, None): entity offering the MAM service
            None for user server
        @return (D(domish.Element)): <IQ/> result
        """
        return client._mam.queryArchive(mam_req, service)

    def _append_message(self, elt_list, message_cb, message_elt):
        """Observer storing a received result <message/>

        @param elt_list(list): list where the result is appended
        @param message_cb(callable, None): if set, its return value for
            message_elt is stored instead of message_elt itself
        @param message_elt(domish.Element): received result <message/>
        """
        if message_cb is not None:
            elt_list.append(message_cb(message_elt))
        else:
            elt_list.append(message_elt)

    def _query_finished(self, iq_result, client, elt_list, event):
        """Remove the results observer and parse the <fin/> element

        @param iq_result(domish.Element): <IQ/> result of the MAM query
        @param client: client which has sent the request
        @param elt_list(list): elements gathered by the observer
        @param event(str): event on which the observer has been registered
        @return (tuple): (elt_list, RSM response or None, MAM response dict)
        @raise exceptions.DataError: there is no <fin/> element in the result
        """
        client.xmlstream.removeObserver(event, self._append_message)
        try:
            fin_elt = next(iq_result.elements(mam.NS_MAM, "fin"))
        except StopIteration:
            raise exceptions.DataError("Invalid MAM result")

        mam_response = {"complete": C.bool(fin_elt.getAttribute("complete", C.BOOL_FALSE)),
                        "stable": C.bool(fin_elt.getAttribute("stable", C.BOOL_TRUE))}

        try:
            rsm_response = rsm.RSMResponse.fromElement(fin_elt)
        except rsm.RSMNotFoundError:
            rsm_response = None

        return (elt_list, rsm_response, mam_response)

    def serialize_archive_result(self, data, client, mam_req, service):
        """Convert the result of get_archives to bridge compatible data

        @param data(tuple): result of get_archives
        @param client: client which has sent the request
        @param mam_req(mam.MAMRequest): request used
        @param service(jid.JID, None): MAM service where the request has been sent
        @return (tuple): messages as bridge arguments, serialised metadata
            ("rsm" and "mam" keys) and profile
        """
        elt_list, rsm_response, mam_response = data
        mess_list = []
        for elt in elt_list:
            fwd_message_elt = self.get_message_from_result(client, elt, mam_req,
                                                        service=service)
            mess_data = client.messageProt.parse_message(fwd_message_elt)
            mess_list.append(client.message_get_bridge_args(mess_data))
        metadata = {
            'rsm': self._rsm.response2dict(rsm_response),
            'mam': mam_response
        }
        return mess_list, data_format.serialise(metadata), client.profile

    def _get_archives(self, service, extra_ser, profile_key):
        """Bridge method to query an archive

        @param service(str): jid of the MAM service, empty to use our own server
        @param extra_ser(str): serialised extra data, parsed with parse_extra
        @param profile_key(str): key of the profile to use
        @return: tuple with:
            - list of message with same data as in bridge.message_new
            - response metadata with:
                - rsm data (first, last, count, index)
                - mam data (complete, stable)
            - profile
        """
        client = self.host.get_client(profile_key)
        service = jid.JID(service) if service else None
        extra = data_format.deserialise(extra_ser, {})
        mam_req = self.parse_extra(extra)
        if mam_req is None:
            # parse_extra returns None when no MAM argument is found, but a
            # query is explicitly requested here, so we use an unfiltered
            # request instead of failing on a None query in get_archives
            mam_req = mam.MAMRequest()

        d = self.get_archives(client, mam_req, service=service)
        d.addCallback(self.serialize_archive_result, client, mam_req, service)
        return d

    def get_archives(self, client, query, service=None, message_cb=None):
        """Query archive and gather page result

        @param client: client to use for the request
        @param query(mam.MAMRequest): MAM request
            a query_id is generated and set on it if there is none
        @param service(jid.JID, None): MAM service to use
            None to use our own server
        @param message_cb(callable, None): callback to use on each message
            this method can be used to unwrap messages
        @return (tuple[list[domish.Element], rsm.RSMResponse, dict): result data with:
            - list of found elements
            - RSM response
            - MAM response, which is a dict with following value:
                - complete: a boolean which is True if all items have been received
                - stable: a boolean which is False if items order may be changed
        """
        if query.query_id is None:
            query.query_id = str(uuid.uuid4())
        elt_list = []
        event = MESSAGE_RESULT.format(mam_ns=mam.NS_MAM, query_id=query.query_id)
        client.xmlstream.addObserver(event, self._append_message, 0, elt_list, message_cb)

        def remove_observer_on_failure(failure_):
            # _query_finished only runs on success, so the observer must be
            # removed here too or it would stay registered after a failed query
            xmlstream = client.xmlstream
            if xmlstream is not None:
                xmlstream.removeObserver(event, self._append_message)
            return failure_

        d = self.queryArchive(client, query, service)
        d.addCallbacks(
            self._query_finished, remove_observer_on_failure,
            callbackArgs=(client, elt_list, event))
        return d

    def get_prefs(self, client, service=None):
        """Retrieve the current user preferences.

        @param client: client to use for the request
        @param service: entity offering the MAM service (None for user archives)
        @return: the server response as a Deferred domish.Element
        """
        # http://xmpp.org/extensions/xep-0313.html#prefs
        return client._mam.queryPrefs(service)

    def _set_prefs(self, service_s=None, default="roster", always=None, never=None,
                  profile_key=C.PROF_KEY_NONE):
        """Convert string arguments then call setPrefs

        @param service_s(str, None): jid of the MAM service (None for user archives)
        @param default(str): a value in ('always', 'never', 'roster')
        @param always(list[str], None): jids as strings
        @param never(list[str], None): jids as strings
        @param profile_key(str): key of the profile to use
        @return: the server response as a Deferred domish.Element
        """
        client = self.host.get_client(profile_key)
        service = jid.JID(service_s) if service_s else None
        # "always" and "never" default to None, which is not iterable
        always_jid = [jid.JID(entity) for entity in always or []]
        never_jid = [jid.JID(entity) for entity in never or []]
        # TODO: why not build here a MAMPrefs object instead of passing the args separately?
        return self.setPrefs(client, service, default, always_jid, never_jid)

    def setPrefs(self, client, service=None, default="roster", always=None, never=None):
        """Set news user preferences.

        @param client: client to use for the request
        @param service: entity offering the MAM service (None for user archives)
        @param default (unicode): a value in ('always', 'never', 'roster')
        @param always (list): a list of JID instances
        @param never (list): a list of JID instances
        @return: the server response as a Deferred domish.Element
        """
        # http://xmpp.org/extensions/xep-0313.html#prefs
        return client._mam.setPrefs(service, default, always, never)

    def on_message_stanza_id(self, message_elt, client):
        """Called when a message with a stanza-id is received

        the messages' stanza ids are stored when received, so the last one can be used
        to retrieve missing history on next connection
        @param message_elt(domish.Element): <message> with a stanza-id
        @param client: client which has received the message
        """
        service_jid = client.jid.userhostJID()
        stanza_id = self._sid.get_stanza_id(message_elt, service_jid)
        if stanza_id is None:
            log.debug("Ignoring <message>, stanza id is not from our server")
            return

        def store_stanza_id(__):
            return self.host.memory.storage.set_private_value(
                namespace=mam.NS_MAM,
                key=KEY_LAST_STANZA_ID,
                value=stanza_id,
                profile=client.profile)

        def on_store_error(failure_):
            # the failure is consumed here: if the Deferred stayed in a failed
            # state, the callbacks added for next messages would never run
            log.error("can't store last stanza id: {reason}".format(
                reason=failure_.value))

        # we use self._last_stanza_id_d do be sure that last_stanza_id is stored in
        # the order of reception
        self._last_stanza_id_d.addCallback(store_stanza_id)
        self._last_stanza_id_d.addErrback(on_store_error)
436 | |
437 | |
@implementer(disco.IDisco)
class SatMAMClient(mam.MAMClient):
    """MAM protocol handler watching stanza ids and advertising MAM support"""

    def __init__(self, plugin_parent):
        """
        @param plugin_parent(XEP_0313): plugin instance which is notified
            when a message with a stanza-id is received
        """
        self.plugin_parent = plugin_parent

    @property
    def host(self):
        """Backend instance, as exposed by the parent of this handler"""
        return self.parent.host_app

    def connectionInitialized(self):
        """Observe every <message/> which has a <stanza-id/> child"""
        stanza_id_ns = self.host.ns_map['stanza_id']
        self.xmlstream.addObserver(
            MESSAGE_STANZA_ID.format(ns_stanza_id=stanza_id_ns),
            self.plugin_parent.on_message_stanza_id,
            client=self.parent,
        )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the MAM namespace as a supported feature"""
        mam_feature = disco.DiscoFeature(mam.NS_MAM)
        return [mam_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """This handler has no disco item to declare"""
        return list()