comparison libervia/backend/plugins/plugin_xep_0313.py @ 4270:0d7bb4df2343

Reformatted code base using black.
author Goffi <goffi@goffi.org>
date Wed, 19 Jun 2024 18:44:57 +0200
parents 1c30d574df2b
children
comparison
equal deleted inserted replaced
4269:64a85ce8be70 4270:0d7bb4df2343
71 self._sid = host.plugins["XEP-0359"] 71 self._sid = host.plugins["XEP-0359"]
72 # Deferred used to store last stanza id in order of reception 72 # Deferred used to store last stanza id in order of reception
73 self._last_stanza_id_d = defer.Deferred() 73 self._last_stanza_id_d = defer.Deferred()
74 self._last_stanza_id_d.callback(None) 74 self._last_stanza_id_d.callback(None)
75 host.bridge.add_method( 75 host.bridge.add_method(
76 "mam_get", ".plugin", in_sign='sss', 76 "mam_get",
77 out_sign='(a(sdssa{ss}a{ss}ss)ss)', method=self._get_archives, 77 ".plugin",
78 async_=True) 78 in_sign="sss",
79 out_sign="(a(sdssa{ss}a{ss}ss)ss)",
80 method=self._get_archives,
81 async_=True,
82 )
79 83
80 async def resume(self, client): 84 async def resume(self, client):
81 """Retrieve one2one messages received since the last we have in local storage""" 85 """Retrieve one2one messages received since the last we have in local storage"""
82 stanza_id_data = await self.host.memory.storage.get_privates( 86 stanza_id_data = await self.host.memory.storage.get_privates(
83 mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile) 87 mam.NS_MAM, [KEY_LAST_STANZA_ID], profile=client.profile
88 )
84 stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID) 89 stanza_id = stanza_id_data.get(KEY_LAST_STANZA_ID)
85 rsm_req = None 90 rsm_req = None
86 if stanza_id is None: 91 if stanza_id is None:
87 log.info("can't retrieve last stanza ID, checking history") 92 log.info("can't retrieve last stanza ID, checking history")
88 last_mess = await self.host.memory.history_get( 93 last_mess = await self.host.memory.history_get(
89 None, None, limit=1, filters={'not_types': C.MESS_TYPE_GROUPCHAT, 94 None,
90 'last_stanza_id': True}, 95 None,
91 profile=client.profile) 96 limit=1,
97 filters={"not_types": C.MESS_TYPE_GROUPCHAT, "last_stanza_id": True},
98 profile=client.profile,
99 )
92 if not last_mess: 100 if not last_mess:
93 log.info(_("It seems that we have no MAM history yet")) 101 log.info(_("It seems that we have no MAM history yet"))
94 stanza_id = None 102 stanza_id = None
95 rsm_req = rsm.RSMRequest(max_=50, before="") 103 rsm_req = rsm.RSMRequest(max_=50, before="")
96 else: 104 else:
97 stanza_id = last_mess[0][-1]['stanza_id'] 105 stanza_id = last_mess[0][-1]["stanza_id"]
98 if rsm_req is None: 106 if rsm_req is None:
99 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id) 107 rsm_req = rsm.RSMRequest(max_=100, after=stanza_id)
100 mam_req = mam.MAMRequest(rsm_=rsm_req) 108 mam_req = mam.MAMRequest(rsm_=rsm_req)
101 complete = False 109 complete = False
102 count = 0 110 count = 0
103 while not complete: 111 while not complete:
104 try: 112 try:
105 mam_data = await self.get_archives(client, mam_req, 113 mam_data = await self.get_archives(
106 service=client.jid.userhostJID()) 114 client, mam_req, service=client.jid.userhostJID()
115 )
107 except StanzaError as e: 116 except StanzaError as e:
108 log.warning( 117 log.warning(
109 f"Can't retrieve MAM archives: {e}\n" 118 f"Can't retrieve MAM archives: {e}\n"
110 f"{xml_tools.pp_elt(mam_req.toElement())}\n" 119 f"{xml_tools.pp_elt(mam_req.toElement())}\n"
111 f"{xml_tools.pp_elt(e.stanza)}" 120 f"{xml_tools.pp_elt(e.stanza)}"
112 ) 121 )
113 return 122 return
114 except Exception as e: 123 except Exception as e:
115 log.exception( 124 log.exception(f"Can't retrieve retrieve MAM archive")
116 f"Can't retrieve retrieve MAM archive"
117 )
118 return 125 return
119 elt_list, rsm_response, mam_response = mam_data 126 elt_list, rsm_response, mam_response = mam_data
120 complete = mam_response["complete"] 127 complete = mam_response["complete"]
121 # we update MAM request for next iteration 128 # we update MAM request for next iteration
122 mam_req.rsm.after = rsm_response.last 129 mam_req.rsm.after = rsm_response.last
128 count += len(elt_list) 135 count += len(elt_list)
129 136
130 for idx, mess_elt in enumerate(elt_list): 137 for idx, mess_elt in enumerate(elt_list):
131 try: 138 try:
132 fwd_message_elt = self.get_message_from_result( 139 fwd_message_elt = self.get_message_from_result(
133 client, mess_elt, mam_req) 140 client, mess_elt, mam_req
141 )
134 except exceptions.DataError: 142 except exceptions.DataError:
135 continue 143 continue
136 144
137 try: 145 try:
138 destinee = jid.JID(fwd_message_elt['to']) 146 destinee = jid.JID(fwd_message_elt["to"])
139 except KeyError: 147 except KeyError:
140 log.warning(_('missing "to" attribute in forwarded message')) 148 log.warning(_('missing "to" attribute in forwarded message'))
141 destinee = client.jid 149 destinee = client.jid
142 if destinee.userhostJID() == client.jid.userhostJID(): 150 if destinee.userhostJID() == client.jid.userhostJID():
143 # message to use, we insert the forwarded message in the normal 151 # message to use, we insert the forwarded message in the normal
144 # workflow 152 # workflow
145 client.xmlstream.dispatch(fwd_message_elt) 153 client.xmlstream.dispatch(fwd_message_elt)
146 else: 154 else:
147 # this message should be from us, we just add it to history 155 # this message should be from us, we just add it to history
148 try: 156 try:
149 from_jid = jid.JID(fwd_message_elt['from']) 157 from_jid = jid.JID(fwd_message_elt["from"])
150 except KeyError: 158 except KeyError:
151 log.warning(_('missing "from" attribute in forwarded message')) 159 log.warning(_('missing "from" attribute in forwarded message'))
152 from_jid = client.jid 160 from_jid = client.jid
153 if from_jid.userhostJID() != client.jid.userhostJID(): 161 if from_jid.userhostJID() != client.jid.userhostJID():
154 log.warning(_( 162 log.warning(
155 'was expecting a message sent by our jid, but this one if ' 163 _(
156 'from {from_jid}, ignoring\n{xml}').format( 164 "was expecting a message sent by our jid, but this one if "
157 from_jid=from_jid.full(), xml=mess_elt.toXml())) 165 "from {from_jid}, ignoring\n{xml}"
166 ).format(from_jid=from_jid.full(), xml=mess_elt.toXml())
167 )
158 continue 168 continue
159 # adding message to history 169 # adding message to history
160 mess_data = client.messageProt.parse_message(fwd_message_elt) 170 mess_data = client.messageProt.parse_message(fwd_message_elt)
161 try: 171 try:
162 await client.messageProt.add_to_history(mess_data) 172 await client.messageProt.add_to_history(mess_data)
163 except exceptions.CancelError as e: 173 except exceptions.CancelError as e:
164 log.warning( 174 log.warning(
165 "message has not been added to history: {e}".format(e=e)) 175 "message has not been added to history: {e}".format(e=e)
176 )
166 except Exception as e: 177 except Exception as e:
167 log.error( 178 log.error(
168 "can't add message to history: {e}\n{xml}" 179 "can't add message to history: {e}\n{xml}".format(
169 .format(e=e, xml=mess_elt.toXml())) 180 e=e, xml=mess_elt.toXml()
181 )
182 )
170 if complete and idx == len(elt_list) - 1: 183 if complete and idx == len(elt_list) - 1:
171 # We are at the last message from archive, we store the ID to not 184 # We are at the last message from archive, we store the ID to not
172 # ask again the same messages next time. 185 # ask again the same messages next time.
173 try: 186 try:
174 stanza_id = mess_elt.result["id"] 187 stanza_id = mess_elt.result["id"]
175 await self.host.memory.storage.set_private_value( 188 await self.host.memory.storage.set_private_value(
176 namespace=mam.NS_MAM, 189 namespace=mam.NS_MAM,
177 key=KEY_LAST_STANZA_ID, 190 key=KEY_LAST_STANZA_ID,
178 value=stanza_id, 191 value=stanza_id,
179 profile=client.profile 192 profile=client.profile,
180 ) 193 )
181 except Exception: 194 except Exception:
182 log.exception("Can't store last stanza ID") 195 log.exception("Can't store last stanza ID")
183 196
184 if not count: 197 if not count:
185 log.info(_("We have received no message while offline")) 198 log.info(_("We have received no message while offline"))
186 else: 199 else:
187 log.info(_("We have received {num_mess} message(s) while offline.") 200 log.info(
188 .format(num_mess=count)) 201 _("We have received {num_mess} message(s) while offline.").format(
202 num_mess=count
203 )
204 )
189 205
190 def profile_connected(self, client): 206 def profile_connected(self, client):
191 defer.ensureDeferred(self.resume(client)) 207 defer.ensureDeferred(self.resume(client))
192 208
193 def get_handler(self, client): 209 def get_handler(self, client):
207 for arg in ("start", "end"): 223 for arg in ("start", "end"):
208 try: 224 try:
209 value = extra.pop(MAM_PREFIX + arg) 225 value = extra.pop(MAM_PREFIX + arg)
210 form_args[arg] = datetime.fromtimestamp(float(value), tz.tzutc()) 226 form_args[arg] = datetime.fromtimestamp(float(value), tz.tzutc())
211 except (TypeError, ValueError): 227 except (TypeError, ValueError):
212 log.warning("Bad value for {arg} filter ({value}), ignoring".format( 228 log.warning(
213 arg=arg, value=value)) 229 "Bad value for {arg} filter ({value}), ignoring".format(
230 arg=arg, value=value
231 )
232 )
214 except KeyError: 233 except KeyError:
215 continue 234 continue
216 235
217 try: 236 try:
218 form_args["with_jid"] = jid.JID(extra.pop( 237 form_args["with_jid"] = jid.JID(extra.pop(MAM_PREFIX + "with"))
219 MAM_PREFIX + "with")) 238 except jid.InvalidFormat:
220 except (jid.InvalidFormat):
221 log.warning("Bad value for jid filter") 239 log.warning("Bad value for jid filter")
222 except KeyError: 240 except KeyError:
223 pass 241 pass
224 242
225 for name, value in extra.items(): 243 for name, value in extra.items():
226 if name.startswith(FILTER_PREFIX): 244 if name.startswith(FILTER_PREFIX):
227 var = name[len(FILTER_PREFIX):] 245 var = name[len(FILTER_PREFIX) :]
228 extra_fields = form_args.setdefault("extra_fields", []) 246 extra_fields = form_args.setdefault("extra_fields", [])
229 extra_fields.append(data_form.Field(var=var, value=value)) 247 extra_fields.append(data_form.Field(var=var, value=value))
230 248
231 for arg in ("node", "query_id"): 249 for arg in ("node", "query_id"):
232 try: 250 try:
262 @param service(jid.JID, None): MAM service where the request has been sent 280 @param service(jid.JID, None): MAM service where the request has been sent
263 None if it's user server 281 None if it's user server
264 @return domish.Element): <message/> that can be used directly with onMessage 282 @return domish.Element): <message/> that can be used directly with onMessage
265 """ 283 """
266 if mess_elt.name != "message": 284 if mess_elt.name != "message":
267 log.warning("unexpected stanza in archive: {xml}".format( 285 log.warning(
268 xml=mess_elt.toXml())) 286 "unexpected stanza in archive: {xml}".format(xml=mess_elt.toXml())
287 )
269 raise exceptions.DataError("Invalid element") 288 raise exceptions.DataError("Invalid element")
270 service_jid = client.jid.userhostJID() if service is None else service 289 service_jid = client.jid.userhostJID() if service is None else service
271 mess_from = mess_elt.getAttribute("from") or client.jid.userhost() 290 mess_from = mess_elt.getAttribute("from") or client.jid.userhost()
272 # we check that the message has been sent by the right service 291 # we check that the message has been sent by the right service
273 # if service is None (i.e. message expected from our own server) 292 # if service is None (i.e. message expected from our own server)
274 # from can be server jid or user's bare jid 293 # from can be server jid or user's bare jid
275 if (mess_from != service_jid.full() 294 if mess_from != service_jid.full() and not (
276 and not (service is None and mess_from == client.jid.host)): 295 service is None and mess_from == client.jid.host
277 log.error("Message is not from our server, something went wrong: " 296 ):
278 "{xml}".format(xml=mess_elt.toXml())) 297 log.error(
298 "Message is not from our server, something went wrong: "
299 "{xml}".format(xml=mess_elt.toXml())
300 )
279 raise exceptions.DataError("Invalid element") 301 raise exceptions.DataError("Invalid element")
280 try: 302 try:
281 result_elt = next(mess_elt.elements(mam.NS_MAM, "result")) 303 result_elt = next(mess_elt.elements(mam.NS_MAM, "result"))
282 forwarded_elt = next(result_elt.elements(C.NS_FORWARD, "forwarded")) 304 forwarded_elt = next(result_elt.elements(C.NS_FORWARD, "forwarded"))
283 try: 305 try:
285 except StopIteration: 307 except StopIteration:
286 # delay_elt is not mandatory 308 # delay_elt is not mandatory
287 delay_elt = None 309 delay_elt = None
288 fwd_message_elt = next(forwarded_elt.elements(C.NS_CLIENT, "message")) 310 fwd_message_elt = next(forwarded_elt.elements(C.NS_CLIENT, "message"))
289 except StopIteration: 311 except StopIteration:
290 log.warning("Invalid message received from MAM: {xml}".format( 312 log.warning(
291 xml=mess_elt.toXml())) 313 "Invalid message received from MAM: {xml}".format(xml=mess_elt.toXml())
314 )
292 raise exceptions.DataError("Invalid element") 315 raise exceptions.DataError("Invalid element")
293 else: 316 else:
294 if not result_elt["queryid"] == mam_req.query_id: 317 if not result_elt["queryid"] == mam_req.query_id:
295 log.error("Unexpected query id (was expecting {query_id}): {xml}" 318 log.error(
296 .format(query_id=mam.query_id, xml=mess_elt.toXml())) 319 "Unexpected query id (was expecting {query_id}): {xml}".format(
320 query_id=mam.query_id, xml=mess_elt.toXml()
321 )
322 )
297 raise exceptions.DataError("Invalid element") 323 raise exceptions.DataError("Invalid element")
298 stanza_id = self._sid.get_stanza_id(fwd_message_elt, 324 stanza_id = self._sid.get_stanza_id(fwd_message_elt, service_jid)
299 service_jid)
300 if stanza_id is None: 325 if stanza_id is None:
301 # not stanza-id element is present, we add one so message 326 # not stanza-id element is present, we add one so message
302 # will be archived with it, and we won't request several times 327 # will be archived with it, and we won't request several times
303 # the same MAM achive 328 # the same MAM achive
304 try: 329 try:
305 stanza_id = result_elt["id"] 330 stanza_id = result_elt["id"]
306 except AttributeError: 331 except AttributeError:
307 log.warning('Invalid MAM result: missing "id" attribute: {xml}' 332 log.warning(
308 .format(xml=result_elt.toXml())) 333 'Invalid MAM result: missing "id" attribute: {xml}'.format(
334 xml=result_elt.toXml()
335 )
336 )
309 raise exceptions.DataError("Invalid element") 337 raise exceptions.DataError("Invalid element")
310 self._sid.add_stanza_id(client, fwd_message_elt, stanza_id, by=service_jid) 338 self._sid.add_stanza_id(
339 client, fwd_message_elt, stanza_id, by=service_jid
340 )
311 341
312 if delay_elt is not None: 342 if delay_elt is not None:
313 fwd_message_elt.addChild(delay_elt) 343 fwd_message_elt.addChild(delay_elt)
314 344
315 return fwd_message_elt 345 return fwd_message_elt
343 try: 373 try:
344 fin_elt = next(iq_result.elements(mam.NS_MAM, "fin")) 374 fin_elt = next(iq_result.elements(mam.NS_MAM, "fin"))
345 except StopIteration: 375 except StopIteration:
346 raise exceptions.DataError("Invalid MAM result") 376 raise exceptions.DataError("Invalid MAM result")
347 377
348 mam_response = {"complete": C.bool(fin_elt.getAttribute("complete", C.BOOL_FALSE)), 378 mam_response = {
349 "stable": C.bool(fin_elt.getAttribute("stable", C.BOOL_TRUE))} 379 "complete": C.bool(fin_elt.getAttribute("complete", C.BOOL_FALSE)),
380 "stable": C.bool(fin_elt.getAttribute("stable", C.BOOL_TRUE)),
381 }
350 382
351 try: 383 try:
352 rsm_response = rsm.RSMResponse.fromElement(fin_elt) 384 rsm_response = rsm.RSMResponse.fromElement(fin_elt)
353 except rsm.RSMNotFoundError: 385 except rsm.RSMNotFoundError:
354 rsm_response = None 386 rsm_response = None
357 389
358 def serialize_archive_result(self, data, client, mam_req, service): 390 def serialize_archive_result(self, data, client, mam_req, service):
359 elt_list, rsm_response, mam_response = data 391 elt_list, rsm_response, mam_response = data
360 mess_list = [] 392 mess_list = []
361 for elt in elt_list: 393 for elt in elt_list:
362 fwd_message_elt = self.get_message_from_result(client, elt, mam_req, 394 fwd_message_elt = self.get_message_from_result(
363 service=service) 395 client, elt, mam_req, service=service
396 )
364 mess_data = client.messageProt.parse_message(fwd_message_elt) 397 mess_data = client.messageProt.parse_message(fwd_message_elt)
365 mess_list.append(client.message_get_bridge_args(mess_data)) 398 mess_list.append(client.message_get_bridge_args(mess_data))
366 metadata = { 399 metadata = {"rsm": self._rsm.response2dict(rsm_response), "mam": mam_response}
367 'rsm': self._rsm.response2dict(rsm_response),
368 'mam': mam_response
369 }
370 return mess_list, data_format.serialise(metadata), client.profile 400 return mess_list, data_format.serialise(metadata), client.profile
371 401
372 def _get_archives(self, service, extra_ser, profile_key): 402 def _get_archives(self, service, extra_ser, profile_key):
373 """ 403 """
374 @return: tuple with: 404 @return: tuple with:
418 @return: the server response as a Deferred domish.Element 448 @return: the server response as a Deferred domish.Element
419 """ 449 """
420 # http://xmpp.org/extensions/xep-0313.html#prefs 450 # http://xmpp.org/extensions/xep-0313.html#prefs
421 return client._mam.queryPrefs(service) 451 return client._mam.queryPrefs(service)
422 452
423 def _set_prefs(self, service_s=None, default="roster", always=None, never=None, 453 def _set_prefs(
424 profile_key=C.PROF_KEY_NONE): 454 self,
455 service_s=None,
456 default="roster",
457 always=None,
458 never=None,
459 profile_key=C.PROF_KEY_NONE,
460 ):
425 service = jid.JID(service_s) if service_s else None 461 service = jid.JID(service_s) if service_s else None
426 always_jid = [jid.JID(entity) for entity in always] 462 always_jid = [jid.JID(entity) for entity in always]
427 never_jid = [jid.JID(entity) for entity in never] 463 never_jid = [jid.JID(entity) for entity in never]
428 # TODO: why not build here a MAMPrefs object instead of passing the args separately? 464 # TODO: why not build here a MAMPrefs object instead of passing the args separately?
429 return self.setPrefs(service, default, always_jid, never_jid, profile_key) 465 return self.setPrefs(service, default, always_jid, never_jid, profile_key)
461 self._last_stanza_id_d.addCallback( 497 self._last_stanza_id_d.addCallback(
462 lambda __: self.host.memory.storage.set_private_value( 498 lambda __: self.host.memory.storage.set_private_value(
463 namespace=mam.NS_MAM, 499 namespace=mam.NS_MAM,
464 key=KEY_LAST_STANZA_ID, 500 key=KEY_LAST_STANZA_ID,
465 value=stanza_id, 501 value=stanza_id,
466 profile=client.profile)) 502 profile=client.profile,
503 )
504 )
467 505
468 506
469 @implementer(disco.IDisco) 507 @implementer(disco.IDisco)
470 class LiberviaMAMClient(mam.MAMClient): 508 class LiberviaMAMClient(mam.MAMClient):
471 509
476 def host(self): 514 def host(self):
477 return self.parent.host_app 515 return self.parent.host_app
478 516
479 def connectionInitialized(self): 517 def connectionInitialized(self):
480 observer_xpath = MESSAGE_STANZA_ID.format( 518 observer_xpath = MESSAGE_STANZA_ID.format(
481 ns_stanza_id=self.host.ns_map['stanza_id']) 519 ns_stanza_id=self.host.ns_map["stanza_id"]
520 )
482 self.xmlstream.addObserver( 521 self.xmlstream.addObserver(
483 observer_xpath, self.plugin_parent.on_message_stanza_id, client=self.parent 522 observer_xpath, self.plugin_parent.on_message_stanza_id, client=self.parent
484 ) 523 )
485 524
486 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): 525 def getDiscoInfo(self, requestor, target, nodeIdentifier=""):