Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0313.py @ 4270:0d7bb4df2343
Reformatted code base using black.
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 19 Jun 2024 18:44:57 +0200 |
parents | 1c30d574df2b |
children |
comparison
equal
deleted
inserted
replaced
4269:64a85ce8be70 | 4270:0d7bb4df2343 |
---|---|
71 self._sid = host.plugins["XEP-0359"] | 71 self._sid = host.plugins["XEP-0359"] |
72 # Deferred used to store last stanza id in order of reception | 72 # Deferred used to store last stanza id in order of reception |
73 self._last_stanza_id_d = defer.Deferred() | 73 self._last_stanza_id_d = defer.Deferred() |
74 self._last_stanza_id_d.callback(None) | 74 self._last_stanza_id_d.callback(None) |
75 host.bridge.add_method( | 75 host.bridge.add_method( |
76 "mam_get", ".plugin", in_sign='sss', | 76 "mam_get", |
77 out_sign='(a(sdssa{ss}a{ss}ss)ss)', method=self._get_archives, | 77 ".plugin", |
78 async_=True) | 78 in_sign="sss", |
79 out_sign="(a(sdssa{ss}a{ss}ss)ss)", | |
80 method=self._get_archives, | |
81 async_=True, | |
82 ) | |
79 | 83 |
80 async def resume(self, client): | 84 async def resume(self, client): |
81 """Retrieve one2one messages received since the last we have in local storage""" | 85 """Retrieve one2one messages received since the last we have in local storage""" |
82 stanza_id_data = await self.host.memory.storage.get_privates( | 86 stanza_id_data = await self.host.memory.storage.get_privates( |
83 mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile) | 87 mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile |
88 ) | |
84 stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID) | 89 stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID) |
85 rsm_req = None | 90 rsm_req = None |
86 if stanza_id is None: | 91 if stanza_id is None: |
87 log.info("can't retrieve last stanza ID, checking history") | 92 log.info("can't retrieve last stanza ID, checking history") |
88 last_mess = await self.host.memory.history_get( | 93 last_mess = await self.host.memory.history_get( |
89 None, None, limit=1, filters={'not_types': C.MESS_TYPE_GROUPCHAT, | 94 None, |
90 'last_stanza_id': True}, | 95 None, |
91 profile=client.profile) | 96 limit=1, |
97 filters={"not_types": C.MESS_TYPE_GROUPCHAT, "last_stanza_id": True}, | |
98 profile=client.profile, | |
99 ) | |
92 if not last_mess: | 100 if not last_mess: |
93 log.info(_("It seems that we have no MAM history yet")) | 101 log.info(_("It seems that we have no MAM history yet")) |
94 stanza_id = None | 102 stanza_id = None |
95 rsm_req = rsm.RSMRequest(max_=50, before="") | 103 rsm_req = rsm.RSMRequest(max_=50, before="") |
96 else: | 104 else: |
97 stanza_id = last_mess[0][-1]['stanza_id'] | 105 stanza_id = last_mess[0][-1]["stanza_id"] |
98 if rsm_req is None: | 106 if rsm_req is None: |
99 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id) | 107 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id) |
100 mam_req = mam.MAMRequest(rsm_=rsm_req) | 108 mam_req = mam.MAMRequest(rsm_=rsm_req) |
101 complete = False | 109 complete = False |
102 count = 0 | 110 count = 0 |
103 while not complete: | 111 while not complete: |
104 try: | 112 try: |
105 mam_data = await self.get_archives(client, mam_req, | 113 mam_data = await self.get_archives( |
106 service=client.jid.userhostJID()) | 114 client, mam_req, service=client.jid.userhostJID() |
115 ) | |
107 except StanzaError as e: | 116 except StanzaError as e: |
108 log.warning( | 117 log.warning( |
109 f"Can't retrieve MAM archives: {e}\n" | 118 f"Can't retrieve MAM archives: {e}\n" |
110 f"{xml_tools.pp_elt(mam_req.toElement())}\n" | 119 f"{xml_tools.pp_elt(mam_req.toElement())}\n" |
111 f"{xml_tools.pp_elt(e.stanza)}" | 120 f"{xml_tools.pp_elt(e.stanza)}" |
112 ) | 121 ) |
113 return | 122 return |
114 except Exception as e: | 123 except Exception as e: |
115 log.exception( | 124 log.exception(f"Can't retrieve retrieve MAM archive") |
116 f"Can't retrieve retrieve MAM archive" | |
117 ) | |
118 return | 125 return |
119 elt_list, rsm_response, mam_response = mam_data | 126 elt_list, rsm_response, mam_response = mam_data |
120 complete = mam_response["complete"] | 127 complete = mam_response["complete"] |
121 # we update MAM request for next iteration | 128 # we update MAM request for next iteration |
122 mam_req.rsm.after = rsm_response.last | 129 mam_req.rsm.after = rsm_response.last |
128 count += len(elt_list) | 135 count += len(elt_list) |
129 | 136 |
130 for idx, mess_elt in enumerate(elt_list): | 137 for idx, mess_elt in enumerate(elt_list): |
131 try: | 138 try: |
132 fwd_message_elt = self.get_message_from_result( | 139 fwd_message_elt = self.get_message_from_result( |
133 client, mess_elt, mam_req) | 140 client, mess_elt, mam_req |
141 ) | |
134 except exceptions.DataError: | 142 except exceptions.DataError: |
135 continue | 143 continue |
136 | 144 |
137 try: | 145 try: |
138 destinee = jid.JID(fwd_message_elt['to']) | 146 destinee = jid.JID(fwd_message_elt["to"]) |
139 except KeyError: | 147 except KeyError: |
140 log.warning(_('missing "to" attribute in forwarded message')) | 148 log.warning(_('missing "to" attribute in forwarded message')) |
141 destinee = client.jid | 149 destinee = client.jid |
142 if destinee.userhostJID() == client.jid.userhostJID(): | 150 if destinee.userhostJID() == client.jid.userhostJID(): |
143 # message to use, we insert the forwarded message in the normal | 151 # message to use, we insert the forwarded message in the normal |
144 # workflow | 152 # workflow |
145 client.xmlstream.dispatch(fwd_message_elt) | 153 client.xmlstream.dispatch(fwd_message_elt) |
146 else: | 154 else: |
147 # this message should be from us, we just add it to history | 155 # this message should be from us, we just add it to history |
148 try: | 156 try: |
149 from_jid = jid.JID(fwd_message_elt['from']) | 157 from_jid = jid.JID(fwd_message_elt["from"]) |
150 except KeyError: | 158 except KeyError: |
151 log.warning(_('missing "from" attribute in forwarded message')) | 159 log.warning(_('missing "from" attribute in forwarded message')) |
152 from_jid = client.jid | 160 from_jid = client.jid |
153 if from_jid.userhostJID() != client.jid.userhostJID(): | 161 if from_jid.userhostJID() != client.jid.userhostJID(): |
154 log.warning(_( | 162 log.warning( |
155 'was expecting a message sent by our jid, but this one if ' | 163 _( |
156 'from {from_jid}, ignoring\n{xml}').format( | 164 "was expecting a message sent by our jid, but this one if " |
157 from_jid=from_jid.full(), xml=mess_elt.toXml())) | 165 "from {from_jid}, ignoring\n{xml}" |
166 ).format(from_jid=from_jid.full(), xml=mess_elt.toXml()) | |
167 ) | |
158 continue | 168 continue |
159 # adding message to history | 169 # adding message to history |
160 mess_data = client.messageProt.parse_message(fwd_message_elt) | 170 mess_data = client.messageProt.parse_message(fwd_message_elt) |
161 try: | 171 try: |
162 await client.messageProt.add_to_history(mess_data) | 172 await client.messageProt.add_to_history(mess_data) |
163 except exceptions.CancelError as e: | 173 except exceptions.CancelError as e: |
164 log.warning( | 174 log.warning( |
165 "message has not been added to history: {e}".format(e=e)) | 175 "message has not been added to history: {e}".format(e=e) |
176 ) | |
166 except Exception as e: | 177 except Exception as e: |
167 log.error( | 178 log.error( |
168 "can't add message to history: {e}\n{xml}" | 179 "can't add message to history: {e}\n{xml}".format( |
169 .format(e=e, xml=mess_elt.toXml())) | 180 e=e, xml=mess_elt.toXml() |
181 ) | |
182 ) | |
170 if complete and idx == len(elt_list) - 1: | 183 if complete and idx == len(elt_list) - 1: |
171 # We are at the last message from archive, we store the ID to not | 184 # We are at the last message from archive, we store the ID to not |
172 # ask again the same messages next time. | 185 # ask again the same messages next time. |
173 try: | 186 try: |
174 stanza_id = mess_elt.result["id"] | 187 stanza_id = mess_elt.result["id"] |
175 await self.host.memory.storage.set_private_value( | 188 await self.host.memory.storage.set_private_value( |
176 namespace=mam.NS_MAM, | 189 namespace=mam.NS_MAM, |
177 key=KEY_LAST_STANZA_ID, | 190 key=KEY_LAST_STANZA_ID, |
178 value=stanza_id, | 191 value=stanza_id, |
179 profile=client.profile | 192 profile=client.profile, |
180 ) | 193 ) |
181 except Exception: | 194 except Exception: |
182 log.exception("Can't store last stanza ID") | 195 log.exception("Can't store last stanza ID") |
183 | 196 |
184 if not count: | 197 if not count: |
185 log.info(_("We have received no message while offline")) | 198 log.info(_("We have received no message while offline")) |
186 else: | 199 else: |
187 log.info(_("We have received {num_mess} message(s) while offline.") | 200 log.info( |
188 .format(num_mess=count)) | 201 _("We have received {num_mess} message(s) while offline.").format( |
202 num_mess=count | |
203 ) | |
204 ) | |
189 | 205 |
190 def profile_connected(self, client): | 206 def profile_connected(self, client): |
191 defer.ensureDeferred(self.resume(client)) | 207 defer.ensureDeferred(self.resume(client)) |
192 | 208 |
193 def get_handler(self, client): | 209 def get_handler(self, client): |
207 for arg in ("start", "end"): | 223 for arg in ("start", "end"): |
208 try: | 224 try: |
209 value = extra.pop(MAM_PREFIX + arg) | 225 value = extra.pop(MAM_PREFIX + arg) |
210 form_args[arg] = datetime.fromtimestamp(float(value), tz.tzutc()) | 226 form_args[arg] = datetime.fromtimestamp(float(value), tz.tzutc()) |
211 except (TypeError, ValueError): | 227 except (TypeError, ValueError): |
212 log.warning("Bad value for {arg} filter ({value}), ignoring".format( | 228 log.warning( |
213 arg=arg, value=value)) | 229 "Bad value for {arg} filter ({value}), ignoring".format( |
230 arg=arg, value=value | |
231 ) | |
232 ) | |
214 except KeyError: | 233 except KeyError: |
215 continue | 234 continue |
216 | 235 |
217 try: | 236 try: |
218 form_args["with_jid"] = jid.JID(extra.pop( | 237 form_args["with_jid"] = jid.JID(extra.pop(MAM_PREFIX + "with")) |
219 MAM_PREFIX + "with")) | 238 except jid.InvalidFormat: |
220 except (jid.InvalidFormat): | |
221 log.warning("Bad value for jid filter") | 239 log.warning("Bad value for jid filter") |
222 except KeyError: | 240 except KeyError: |
223 pass | 241 pass |
224 | 242 |
225 for name, value in extra.items(): | 243 for name, value in extra.items(): |
226 if name.startswith(FILTER_PREFIX): | 244 if name.startswith(FILTER_PREFIX): |
227 var = name[len(FILTER_PREFIX):] | 245 var = name[len(FILTER_PREFIX) :] |
228 extra_fields = form_args.setdefault("extra_fields", []) | 246 extra_fields = form_args.setdefault("extra_fields", []) |
229 extra_fields.append(data_form.Field(var=var, value=value)) | 247 extra_fields.append(data_form.Field(var=var, value=value)) |
230 | 248 |
231 for arg in ("node", "query_id"): | 249 for arg in ("node", "query_id"): |
232 try: | 250 try: |
262 @param service(jid.JID, None): MAM service where the request has been sent | 280 @param service(jid.JID, None): MAM service where the request has been sent |
263 None if it's user server | 281 None if it's user server |
264 @return domish.Element): <message/> that can be used directly with onMessage | 282 @return domish.Element): <message/> that can be used directly with onMessage |
265 """ | 283 """ |
266 if mess_elt.name != "message": | 284 if mess_elt.name != "message": |
267 log.warning("unexpected stanza in archive: {xml}".format( | 285 log.warning( |
268 xml=mess_elt.toXml())) | 286 "unexpected stanza in archive: {xml}".format(xml=mess_elt.toXml()) |
287 ) | |
269 raise exceptions.DataError("Invalid element") | 288 raise exceptions.DataError("Invalid element") |
270 service_jid = client.jid.userhostJID() if service is None else service | 289 service_jid = client.jid.userhostJID() if service is None else service |
271 mess_from = mess_elt.getAttribute("from") or client.jid.userhost() | 290 mess_from = mess_elt.getAttribute("from") or client.jid.userhost() |
272 # we check that the message has been sent by the right service | 291 # we check that the message has been sent by the right service |
273 # if service is None (i.e. message expected from our own server) | 292 # if service is None (i.e. message expected from our own server) |
274 # from can be server jid or user's bare jid | 293 # from can be server jid or user's bare jid |
275 if (mess_from != service_jid.full() | 294 if mess_from != service_jid.full() and not ( |
276 and not (service is None and mess_from == client.jid.host)): | 295 service is None and mess_from == client.jid.host |
277 log.error("Message is not from our server, something went wrong: " | 296 ): |
278 "{xml}".format(xml=mess_elt.toXml())) | 297 log.error( |
298 "Message is not from our server, something went wrong: " | |
299 "{xml}".format(xml=mess_elt.toXml()) | |
300 ) | |
279 raise exceptions.DataError("Invalid element") | 301 raise exceptions.DataError("Invalid element") |
280 try: | 302 try: |
281 result_elt = next(mess_elt.elements(mam.NS_MAM, "result")) | 303 result_elt = next(mess_elt.elements(mam.NS_MAM, "result")) |
282 forwarded_elt = next(result_elt.elements(C.NS_FORWARD, "forwarded")) | 304 forwarded_elt = next(result_elt.elements(C.NS_FORWARD, "forwarded")) |
283 try: | 305 try: |
285 except StopIteration: | 307 except StopIteration: |
286 # delay_elt is not mandatory | 308 # delay_elt is not mandatory |
287 delay_elt = None | 309 delay_elt = None |
288 fwd_message_elt = next(forwarded_elt.elements(C.NS_CLIENT, "message")) | 310 fwd_message_elt = next(forwarded_elt.elements(C.NS_CLIENT, "message")) |
289 except StopIteration: | 311 except StopIteration: |
290 log.warning("Invalid message received from MAM: {xml}".format( | 312 log.warning( |
291 xml=mess_elt.toXml())) | 313 "Invalid message received from MAM: {xml}".format(xml=mess_elt.toXml()) |
314 ) | |
292 raise exceptions.DataError("Invalid element") | 315 raise exceptions.DataError("Invalid element") |
293 else: | 316 else: |
294 if not result_elt["queryid"] == mam_req.query_id: | 317 if not result_elt["queryid"] == mam_req.query_id: |
295 log.error("Unexpected query id (was expecting {query_id}): {xml}" | 318 log.error( |
296 .format(query_id=mam.query_id, xml=mess_elt.toXml())) | 319 "Unexpected query id (was expecting {query_id}): {xml}".format( |
320 query_id=mam.query_id, xml=mess_elt.toXml() | |
321 ) | |
322 ) | |
297 raise exceptions.DataError("Invalid element") | 323 raise exceptions.DataError("Invalid element") |
298 stanza_id = self._sid.get_stanza_id(fwd_message_elt, | 324 stanza_id = self._sid.get_stanza_id(fwd_message_elt, service_jid) |
299 service_jid) | |
300 if stanza_id is None: | 325 if stanza_id is None: |
301 # not stanza-id element is present, we add one so message | 326 # not stanza-id element is present, we add one so message |
302 # will be archived with it, and we won't request several times | 327 # will be archived with it, and we won't request several times |
303 # the same MAM achive | 328 # the same MAM achive |
304 try: | 329 try: |
305 stanza_id = result_elt["id"] | 330 stanza_id = result_elt["id"] |
306 except AttributeError: | 331 except AttributeError: |
307 log.warning('Invalid MAM result: missing "id" attribute: {xml}' | 332 log.warning( |
308 .format(xml=result_elt.toXml())) | 333 'Invalid MAM result: missing "id" attribute: {xml}'.format( |
334 xml=result_elt.toXml() | |
335 ) | |
336 ) | |
309 raise exceptions.DataError("Invalid element") | 337 raise exceptions.DataError("Invalid element") |
310 self._sid.add_stanza_id(client, fwd_message_elt, stanza_id, by=service_jid) | 338 self._sid.add_stanza_id( |
339 client, fwd_message_elt, stanza_id, by=service_jid | |
340 ) | |
311 | 341 |
312 if delay_elt is not None: | 342 if delay_elt is not None: |
313 fwd_message_elt.addChild(delay_elt) | 343 fwd_message_elt.addChild(delay_elt) |
314 | 344 |
315 return fwd_message_elt | 345 return fwd_message_elt |
343 try: | 373 try: |
344 fin_elt = next(iq_result.elements(mam.NS_MAM, "fin")) | 374 fin_elt = next(iq_result.elements(mam.NS_MAM, "fin")) |
345 except StopIteration: | 375 except StopIteration: |
346 raise exceptions.DataError("Invalid MAM result") | 376 raise exceptions.DataError("Invalid MAM result") |
347 | 377 |
348 mam_response = {"complete": C.bool(fin_elt.getAttribute("complete", C.BOOL_FALSE)), | 378 mam_response = { |
349 "stable": C.bool(fin_elt.getAttribute("stable", C.BOOL_TRUE))} | 379 "complete": C.bool(fin_elt.getAttribute("complete", C.BOOL_FALSE)), |
380 "stable": C.bool(fin_elt.getAttribute("stable", C.BOOL_TRUE)), | |
381 } | |
350 | 382 |
351 try: | 383 try: |
352 rsm_response = rsm.RSMResponse.fromElement(fin_elt) | 384 rsm_response = rsm.RSMResponse.fromElement(fin_elt) |
353 except rsm.RSMNotFoundError: | 385 except rsm.RSMNotFoundError: |
354 rsm_response = None | 386 rsm_response = None |
357 | 389 |
358 def serialize_archive_result(self, data, client, mam_req, service): | 390 def serialize_archive_result(self, data, client, mam_req, service): |
359 elt_list, rsm_response, mam_response = data | 391 elt_list, rsm_response, mam_response = data |
360 mess_list = [] | 392 mess_list = [] |
361 for elt in elt_list: | 393 for elt in elt_list: |
362 fwd_message_elt = self.get_message_from_result(client, elt, mam_req, | 394 fwd_message_elt = self.get_message_from_result( |
363 service=service) | 395 client, elt, mam_req, service=service |
396 ) | |
364 mess_data = client.messageProt.parse_message(fwd_message_elt) | 397 mess_data = client.messageProt.parse_message(fwd_message_elt) |
365 mess_list.append(client.message_get_bridge_args(mess_data)) | 398 mess_list.append(client.message_get_bridge_args(mess_data)) |
366 metadata = { | 399 metadata = {"rsm": self._rsm.response2dict(rsm_response), "mam": mam_response} |
367 'rsm': self._rsm.response2dict(rsm_response), | |
368 'mam': mam_response | |
369 } | |
370 return mess_list, data_format.serialise(metadata), client.profile | 400 return mess_list, data_format.serialise(metadata), client.profile |
371 | 401 |
372 def _get_archives(self, service, extra_ser, profile_key): | 402 def _get_archives(self, service, extra_ser, profile_key): |
373 """ | 403 """ |
374 @return: tuple with: | 404 @return: tuple with: |
418 @return: the server response as a Deferred domish.Element | 448 @return: the server response as a Deferred domish.Element |
419 """ | 449 """ |
420 # http://xmpp.org/extensions/xep-0313.html#prefs | 450 # http://xmpp.org/extensions/xep-0313.html#prefs |
421 return client._mam.queryPrefs(service) | 451 return client._mam.queryPrefs(service) |
422 | 452 |
423 def _set_prefs(self, service_s=None, default="roster", always=None, never=None, | 453 def _set_prefs( |
424 profile_key=C.PROF_KEY_NONE): | 454 self, |
455 service_s=None, | |
456 default="roster", | |
457 always=None, | |
458 never=None, | |
459 profile_key=C.PROF_KEY_NONE, | |
460 ): | |
425 service = jid.JID(service_s) if service_s else None | 461 service = jid.JID(service_s) if service_s else None |
426 always_jid = [jid.JID(entity) for entity in always] | 462 always_jid = [jid.JID(entity) for entity in always] |
427 never_jid = [jid.JID(entity) for entity in never] | 463 never_jid = [jid.JID(entity) for entity in never] |
428 # TODO: why not build here a MAMPrefs object instead of passing the args separately? | 464 # TODO: why not build here a MAMPrefs object instead of passing the args separately? |
429 return self.setPrefs(service, default, always_jid, never_jid, profile_key) | 465 return self.setPrefs(service, default, always_jid, never_jid, profile_key) |
461 self._last_stanza_id_d.addCallback( | 497 self._last_stanza_id_d.addCallback( |
462 lambda __: self.host.memory.storage.set_private_value( | 498 lambda __: self.host.memory.storage.set_private_value( |
463 namespace=mam.NS_MAM, | 499 namespace=mam.NS_MAM, |
464 key=KEY_LAST_STANZA_ID, | 500 key=KEY_LAST_STANZA_ID, |
465 value=stanza_id, | 501 value=stanza_id, |
466 profile=client.profile)) | 502 profile=client.profile, |
503 ) | |
504 ) | |
467 | 505 |
468 | 506 |
469 @implementer(disco.IDisco) | 507 @implementer(disco.IDisco) |
470 class LiberviaMAMClient(mam.MAMClient): | 508 class LiberviaMAMClient(mam.MAMClient): |
471 | 509 |
476 def host(self): | 514 def host(self): |
477 return self.parent.host_app | 515 return self.parent.host_app |
478 | 516 |
479 def connectionInitialized(self): | 517 def connectionInitialized(self): |
480 observer_xpath = MESSAGE_STANZA_ID.format( | 518 observer_xpath = MESSAGE_STANZA_ID.format( |
481 ns_stanza_id=self.host.ns_map['stanza_id']) | 519 ns_stanza_id=self.host.ns_map["stanza_id"] |
520 ) | |
482 self.xmlstream.addObserver( | 521 self.xmlstream.addObserver( |
483 observer_xpath, self.plugin_parent.on_message_stanza_id, client=self.parent | 522 observer_xpath, self.plugin_parent.on_message_stanza_id, client=self.parent |
484 ) | 523 ) |
485 | 524 |
486 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): | 525 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): |