Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0033.py @ 4306:94e0968987cd
plugin XEP-0033: code modernisation, improve delivery, data validation:
- Code has been rewritten using Pydantic models and `async` coroutines for data validation
and cleaner element parsing/generation.
- Delivery has been completely rewritten. It now works even if the server doesn't support
multicast, and sends to the local multicast service first. Delivering to the local multicast
service first is due to poor support of XEP-0033 in servers (notably Prosody, which has an
incomplete implementation), and the current impossibility to detect whether a sub-domain
service fully handles multicast or only handles it for local domains. This is a workaround to
have a good balance between backward compatibility and use of bandwidth, and to make it work
with the incoming email gateway implementation (the gateway will only deliver to
entities of its own domain).
- disco feature checking now uses `async` coroutines. The `host` implementation still uses
Deferred return values for compatibility with legacy code.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | 0d7bb4df2343 |
children | 530f86f078cc |
comparison
equal
deleted
inserted
replaced
4305:4cd4922de876 | 4306:94e0968987cd |
---|---|
1 #!/usr/bin/env python3 | 1 #!/usr/bin/env python3 |
2 | 2 |
3 | 3 # Libervia plugin for Extended Stanza Addressing (XEP-0033) |
4 # SAT plugin for Extended Stanza Addressing (xep-0033) | 4 # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) |
5 # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org) | 5 # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org) |
6 | 6 |
7 # This program is free software: you can redistribute it and/or modify | 7 # This program is free software: you can redistribute it and/or modify |
8 # it under the terms of the GNU Affero General Public License as published by | 8 # it under the terms of the GNU Affero General Public License as published by |
9 # the Free Software Foundation, either version 3 of the License, or | 9 # the Free Software Foundation, either version 3 of the License, or |
15 # GNU Affero General Public License for more details. | 15 # GNU Affero General Public License for more details. |
16 | 16 |
17 # You should have received a copy of the GNU Affero General Public License | 17 # You should have received a copy of the GNU Affero General Public License |
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 18 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
19 | 19 |
20 from libervia.backend.core.i18n import _ | 20 from typing import Iterator, Literal, Self |
21 from libervia.backend.core.constants import Const as C | 21 |
22 from libervia.backend.core.log import getLogger | 22 from pydantic import BaseModel, model_validator |
23 | 23 from twisted.internet import defer |
24 log = getLogger(__name__) | 24 from twisted.words.protocols.jabber import jid |
25 from libervia.backend.core import exceptions | 25 from twisted.words.protocols.jabber.xmlstream import XMPPHandler |
26 from twisted.words.xish import domish | |
26 from wokkel import disco, iwokkel | 27 from wokkel import disco, iwokkel |
27 from zope.interface import implementer | 28 from zope.interface import implementer |
28 from twisted.words.protocols.jabber.jid import JID | 29 |
29 from twisted.python import failure | 30 from libervia.backend.core import exceptions |
30 import copy | 31 from libervia.backend.core.constants import Const as C |
31 | 32 from libervia.backend.core.core_types import SatXMPPEntity |
32 try: | 33 from libervia.backend.core.i18n import _ |
33 from twisted.words.protocols.xmlstream import XMPPHandler | 34 from libervia.backend.core.log import getLogger |
34 except ImportError: | 35 from libervia.backend.models.core import MessageData |
35 from wokkel.subprotocols import XMPPHandler | 36 from libervia.backend.models.types import JIDType |
36 from twisted.words.xish import domish | |
37 from twisted.internet import defer | |
38 | |
39 from libervia.backend.tools import trigger | 37 from libervia.backend.tools import trigger |
40 from time import time | 38 from libervia.backend.tools.xml_tools import element_copy |
39 | |
40 log = getLogger(__name__) | |
41 | |
41 | 42 |
42 # TODO: fix Prosody "addressing" plugin to leave the concerned bcc according to the spec: | 43 # TODO: fix Prosody "addressing" plugin to leave the concerned bcc according to the spec: |
44 # http://xmpp.org/extensions/xep-0033.html#addr-type-bcc "This means that the server | |
45 # MUST remove these addresses before the stanza is delivered to anyone other than the | |
46 # given bcc addressee or the multicast service of the bcc addressee." | |
43 # | 47 # |
44 # http://xmpp.org/extensions/xep-0033.html#addr-type-bcc | 48 # http://xmpp.org/extensions/xep-0033.html#multicast "Each 'bcc' recipient MUST receive |
45 # "This means that the server MUST remove these addresses before the stanza is delivered to anyone other than the given bcc addressee or the multicast service of the bcc addressee." | 49 # only the <address type='bcc'/> associated with that addressee." |
46 # | 50 |
47 # http://xmpp.org/extensions/xep-0033.html#multicast | 51 # TODO: fix Prosody "addressing" plugin to determine itself if remote servers supports |
48 # "Each 'bcc' recipient MUST receive only the <address type='bcc'/> associated with that addressee." | 52 # this XEP |
49 | 53 |
50 # TODO: fix Prosody "addressing" plugin to determine itself if remote servers supports this XEP | |
51 | |
52 | |
53 NS_XMPP_CLIENT = "jabber:client" | |
54 NS_ADDRESS = "http://jabber.org/protocol/address" | |
55 ATTRIBUTES = ["jid", "uri", "node", "desc", "delivered", "type"] | |
56 ADDRESS_TYPES = ["to", "cc", "bcc", "replyto", "replyroom", "noreply"] | |
57 | 54 |
58 PLUGIN_INFO = { | 55 PLUGIN_INFO = { |
59 C.PI_NAME: "Extended Stanza Addressing Protocol Plugin", | 56 C.PI_NAME: "Extended Stanza Addressing Protocol Plugin", |
60 C.PI_IMPORT_NAME: "XEP-0033", | 57 C.PI_IMPORT_NAME: "XEP-0033", |
61 C.PI_TYPE: "XEP", | 58 C.PI_TYPE: "XEP", |
59 C.PI_MODES: C.PLUG_MODE_BOTH, | |
62 C.PI_PROTOCOLS: ["XEP-0033"], | 60 C.PI_PROTOCOLS: ["XEP-0033"], |
63 C.PI_DEPENDENCIES: [], | 61 C.PI_DEPENDENCIES: [], |
64 C.PI_MAIN: "XEP_0033", | 62 C.PI_MAIN: "XEP_0033", |
65 C.PI_HANDLER: "yes", | 63 C.PI_HANDLER: "yes", |
66 C.PI_DESCRIPTION: _("""Implementation of Extended Stanza Addressing"""), | 64 C.PI_DESCRIPTION: _( |
65 "Efficiently send messages to several recipients, using metadata to transmit " | |
66 "them with main recipients (to), carbon copies (cc), and blind carbon copies " | |
67 "(bcc) fields in a similar manner as for email." | |
68 ), | |
67 } | 69 } |
68 | 70 |
69 | 71 NS_ADDRESS = "http://jabber.org/protocol/address" |
70 class XEP_0033(object): | 72 RECIPIENT_FIELDS = ("to", "cc", "bcc") |
73 | |
74 | |
75 class AddressType(BaseModel): | |
76 jid: JIDType | None = None | |
77 desc: str | None = None | |
78 delivered: bool | None = None | |
79 | |
80 def set_attribute(self, address_elt: domish.Element) -> None: | |
81 """Set <address> element attribute from this instance's data.""" | |
82 if self.jid: | |
83 address_elt["jid"] = str(self.jid) | |
84 if self.desc: | |
85 address_elt["desc"] = self.desc | |
86 if self.delivered is not None: | |
87 address_elt["delivered"] = "true" if self.delivered else "false" | |
88 | |
89 @classmethod | |
90 def from_element(cls, address_elt: domish.Element) -> Self: | |
91 """Create an AddressType instance from an <address> element. | |
92 | |
93 @param address_elt: The <address> element. | |
94 @return: AddressType instance. | |
95 """ | |
96 if address_elt.uri != NS_ADDRESS or address_elt.name != "address": | |
97 raise ValueError("Element is not an <address> element") | |
98 | |
99 kwargs = {} | |
100 if address_elt.hasAttribute("jid"): | |
101 kwargs["jid"] = jid.JID(address_elt["jid"]) | |
102 if address_elt.hasAttribute("desc"): | |
103 kwargs["desc"] = address_elt["desc"] | |
104 if address_elt.hasAttribute("delivered"): | |
105 kwargs["delivered"] = address_elt["delivered"] == "true" | |
106 return cls(**kwargs) | |
107 | |
108 def to_element(self) -> domish.Element: | |
109 """Build the <address> element from this instance's data. | |
110 | |
111 @return: <address> element. | |
112 """ | |
113 address_elt = domish.Element((NS_ADDRESS, "address")) | |
114 self.set_attribute(address_elt) | |
115 return address_elt | |
116 | |
117 | |
118 class AddressesData(BaseModel): | |
119 to: list[AddressType] | None = None | |
120 cc: list[AddressType] | None = None | |
121 bcc: list[AddressType] | None = None | |
122 replyto: list[AddressType] | None = None | |
123 replyroom: list[AddressType] | None = None | |
124 noreply: bool | None = None | |
125 ofrom: JIDType | None = None | |
126 | |
127 @model_validator(mode="after") | |
128 def check_minimal_data(self) -> Self: | |
129 assert self.to or self.cc or self.bcc, "At least one recipent must be set" | |
130 if self.noreply and (self.replyto is not None or self.replyroom is not None): | |
131 log.warning( | |
132 '"noreply" can\'t be used with "replyto" or "replyroom". Ignoring reply ' | |
133 f'fields ({self.replyto=}, {self.replyroom=}).' | |
134 ) | |
135 # We reset instead of raising a ValueError, because this can happen in | |
136 # incoming messages and we should not discard them. | |
137 self.replyto = self.replyroom = None | |
138 return self | |
139 | |
140 @property | |
141 def addresses(self) -> Iterator[AddressType]: | |
142 """Iterator over all recipient addresses.""" | |
143 for field in RECIPIENT_FIELDS: | |
144 addresses = getattr(self, field) | |
145 if not addresses: | |
146 continue | |
147 yield from addresses | |
148 | |
149 @staticmethod | |
150 def add_address_element( | |
151 addresses_elt: domish.Element, type_: str, address: AddressType | None | |
152 ) -> None: | |
153 """Add <address> element to parent <addresses> element. | |
154 | |
155 @param addresses_elt: Parent <addresses> element. | |
156 @param type_: Value of "type" attribute. | |
157 @param address: Address data. | |
158 """ | |
159 | |
160 address_elt = addresses_elt.addElement("address") | |
161 address_elt["type"] = type_ | |
162 if address is not None: | |
163 address.set_attribute(address_elt) | |
164 | |
165 @classmethod | |
166 def from_element(cls, addresses_elt: domish.Element) -> Self: | |
167 """Create an AddressesData instance from an <addresses> element. | |
168 | |
169 @param addresses_elt: The <addresses> element or its direct parent. | |
170 @return: AddressesData instance. | |
171 @raise NotFound: No <addresses> element found. | |
172 """ | |
173 if addresses_elt.uri != NS_ADDRESS or addresses_elt.name != "addresses": | |
174 child_addresses_elt = next( | |
175 addresses_elt.elements(NS_ADDRESS, "addresses"), None | |
176 ) | |
177 if child_addresses_elt is None: | |
178 raise exceptions.NotFound("<addresses> element not found") | |
179 else: | |
180 addresses_elt = child_addresses_elt | |
181 | |
182 kwargs = {} | |
183 for address_elt in addresses_elt.elements(NS_ADDRESS, "address"): | |
184 address_type = address_elt.getAttribute("type") | |
185 if address_type in ("to", "cc", "bcc", "replyto", "replyroom"): | |
186 try: | |
187 address = AddressType.from_element(address_elt) | |
188 except Exception as e: | |
189 log.warning(f"Invalid <address> element: {e}\n{address_elt.toXml()}") | |
190 else: | |
191 kwargs.setdefault(address_type, []).append(address) | |
192 elif address_type == "noreply": | |
193 kwargs["noreply"] = True | |
194 elif address_type == "ofrom": | |
195 kwargs["ofrom"] = jid.JID(address_elt["jid"]) | |
196 else: | |
197 log.warning( | |
198 f"Invalid <address> element: unknonwn type {address_type!r}\n" | |
199 f"{address_elt.toXml()}" | |
200 ) | |
201 return cls(**kwargs) | |
202 | |
203 def to_element(self) -> domish.Element: | |
204 """Build the <addresses> element from this instance's data. | |
205 | |
206 @return: <addresses> element. | |
207 """ | |
208 addresses_elt = domish.Element((NS_ADDRESS, "addresses")) | |
209 | |
210 if self.to: | |
211 for address in self.to: | |
212 self.add_address_element(addresses_elt, "to", address) | |
213 if self.cc: | |
214 for address in self.cc: | |
215 self.add_address_element(addresses_elt, "cc", address) | |
216 if self.bcc: | |
217 for address in self.bcc: | |
218 self.add_address_element(addresses_elt, "bcc", address) | |
219 if self.replyto: | |
220 for address in self.replyto: | |
221 self.add_address_element(addresses_elt, "replyto", address) | |
222 if self.replyroom: | |
223 for address in self.replyroom: | |
224 self.add_address_element(addresses_elt, "replyroom", address) | |
225 if self.noreply: | |
226 self.add_address_element(addresses_elt, "noreply", None) | |
227 if self.ofrom is not None: | |
228 address_elt = addresses_elt.addElement("address") | |
229 address_elt["type"] = "ofrom" | |
230 address_elt["jid"] = self.ofrom.full() | |
231 | |
232 return addresses_elt | |
233 | |
234 | |
235 class XEP_0033: | |
71 """ | 236 """ |
72 Implementation for XEP 0033 | 237 Implementation for XEP-0033 |
73 """ | 238 """ |
74 | 239 |
75 def __init__(self, host): | 240 def __init__(self, host): |
76 log.info(_("Extended Stanza Addressing plugin initialization")) | 241 log.info(_("Extended Stanza Addressing plugin initialization")) |
77 self.host = host | 242 self.host = host |
243 host.register_namespace("address", NS_ADDRESS) | |
78 self.internal_data = {} | 244 self.internal_data = {} |
79 host.trigger.add( | 245 host.trigger.add( |
80 "sendMessage", self.send_message_trigger, trigger.TriggerManager.MIN_PRIORITY | 246 "sendMessage", |
247 self.send_message_trigger, | |
248 # We want this trigger to be the last one, as it may send messages. | |
249 trigger.TriggerManager.MIN_PRIORITY, | |
250 ) | |
251 host.trigger.add( | |
252 "sendMessageComponent", | |
253 self.send_message_trigger, | |
254 # We want this trigger to be the last one, as it may send messages. | |
255 trigger.TriggerManager.MIN_PRIORITY, | |
81 ) | 256 ) |
82 host.trigger.add("message_received", self.message_received_trigger) | 257 host.trigger.add("message_received", self.message_received_trigger) |
83 | 258 |
259 async def _stop_if_all_delivered( | |
260 self, client: SatXMPPEntity, mess_data: MessageData, addr_data: AddressesData | |
261 ) -> None: | |
262 """Check if all message have been delivered, and stop workflow in this case. | |
263 | |
264 If workflow is stopped, message will be added to history and a signal will be sent | |
265 to bridge. | |
266 @param client: Client session. | |
267 @param mess_data: Message data. | |
268 @param addr_data: Addresses data. | |
269 | |
270 @raise exceptions.CancelError: All message have been delivered and workflow is | |
271 terminated. | |
272 """ | |
273 if all(a.delivered for a in addr_data.addresses): | |
274 await client.message_add_to_history(mess_data) | |
275 await client.message_send_to_bridge(mess_data) | |
276 raise exceptions.CancelError( | |
277 f"Message has been delivered by {PLUGIN_INFO['C.PI_NAME']}." | |
278 ) | |
279 | |
280 async def _handle_addresses(self, client, mess_data: MessageData) -> MessageData: | |
281 """Handle Extended Stanza Addressing metadata for outgoing messages.""" | |
282 if not "addresses" in mess_data["extra"]: | |
283 return mess_data | |
284 | |
285 if mess_data["extra"].get(C.MESS_KEY_ENCRYPTED, False): | |
286 # TODO: Message must be encrypted for all recipients, and "to" correspond to | |
287 # multicast service in this case. | |
288 raise NotImplementedError( | |
289 "End-to-end encryption is not supported yet with multicast addressing." | |
290 ) | |
291 | |
292 data = AddressesData(**mess_data["extra"]["addresses"]) | |
293 recipients = set() | |
294 domains: dict[str, list[AddressType]] = {} | |
295 for address in data.addresses: | |
296 if address.jid is None: | |
297 raise NotImplementedError("Non JID addresses are not supported yet.") | |
298 recipients.add(address.jid) | |
299 try: | |
300 domains[address.jid.host].append(address) | |
301 except KeyError: | |
302 domains[address.jid.host] = [address] | |
303 | |
304 to_recipient_jid = mess_data["to"] | |
305 | |
306 if to_recipient_jid.user and to_recipient_jid not in recipients: | |
307 # If the main recipient is not a service (i.e. it has a "user" part), we want | |
308 # to move it to the XEP-0033's "to" addresses, so we can use the multicast | |
309 # service for <message> "to" attribute. | |
310 to_recipient_addr = AddressType(jid=to_recipient_jid) | |
311 if data.to is None: | |
312 data.to = [to_recipient_addr] | |
313 else: | |
314 data.to.insert(0, to_recipient_addr) | |
315 recipients.add(to_recipient_jid) | |
316 domains.setdefault(to_recipient_jid.host, []).append(to_recipient_addr) | |
317 | |
318 # XXX: If our server doesn't handle multicast, we don't check sub-services as | |
319 # requested in §2.2, because except if there is a special arrangement with the | |
320 # server, a service at a sub-domain can't send message in the name of the main | |
321 # domain (e.g. "multicast.example.org" can't send message from | |
322 # "juliet@example.org"). So the specification is a bit dubious here, and we only | |
323 # use the main server multicast feature if it's present. | |
324 if not await self.host.memory.disco.has_feature( | |
325 client, NS_ADDRESS, client.server_jid | |
326 ): | |
327 # No multicast service | |
328 log.warning( | |
329 _( | |
330 f"Server of {client.profile} does not support XEP-0033 " | |
331 f"({PLUGIN_INFO[C.PI_IMPORT_NAME]}). We will send all messages ourselves." | |
332 ) | |
333 ) | |
334 await self.deliver_messages(client, mess_data, data, domains) | |
335 await self._stop_if_all_delivered(client, mess_data, data) | |
336 else: | |
337 # XXX: We delived ourself to multicast services because it's not correctly | |
338 # handled by some multicast services, notably by Prosody mod_addresses. | |
339 # FIXME: Only do this workaround for known incomplete implementations. | |
340 # TODO: remove this workaround when known implementations have been completed. | |
341 if mess_data["to"] != client.server_jid: | |
342 # We send the message to our server which will distribute it to the right | |
343 # locations. The initial ``to`` jid has been moved to ``data.to`` above. | |
344 # FIXME: When sub-services issue is properly handler, a sub-service JID | |
345 # supporting multicast should be allowed here. | |
346 mess_data["to"] = client.server_jid | |
347 await self.deliver_messages( | |
348 client, mess_data, data, domains, multicast_only=True | |
349 ) | |
350 await self._stop_if_all_delivered(client, mess_data, data) | |
351 | |
352 message_elt = mess_data["xml"] | |
353 message_elt["to"] = str(mess_data["to"]) | |
354 message_elt.addChild(data.to_element()) | |
355 return mess_data | |
356 | |
357 async def deliver_messages( | |
358 self, | |
359 client, | |
360 mess_data: MessageData, | |
361 addr_data: AddressesData, | |
362 domains: dict[str, list[AddressType]], | |
363 multicast_only: bool = False, | |
364 ) -> None: | |
365 """Send messages to requested recipients. | |
366 | |
367 If a domain handles multicast, a single message will be send there. | |
368 @param client: Client session. | |
369 @param mess_data: Messsa data. | |
370 @param addr_data: XEP-0033 addresses data. | |
371 @param domains: Domain to addresses map. | |
372 Note that that the addresses instances in this argument must be the same as in | |
373 ``addr_data`` (There ``delivered`` status will be manipulated). | |
374 @param multicast_only: if True, only multicast domain will be delivered. | |
375 """ | |
376 # We'll modify delivered status, so we keep track here of addresses which have | |
377 # already be delivered. | |
378 already_delivered = [a for a in addr_data.addresses if a.delivered] | |
379 multicast_domains = set() | |
380 for domain, domain_addresses in domains.items(): | |
381 if domain == client.server_jid.host: | |
382 | |
383 # ``client.server_jid`` is discarded to avoid sending twice the same | |
384 # message. ``multicast_only`` flag is set when the server supports | |
385 # multicast, so the message will be sent to it at the end of the workflow. | |
386 continue | |
387 if len(domain_addresses) > 1: | |
388 # For domains with multiple recipients, we check if we they support | |
389 # multicast and so if we can deliver to them directly. | |
390 if await self.host.memory.disco.has_feature( | |
391 client, NS_ADDRESS, jid.JID(domain) | |
392 ): | |
393 multicast_domains.add(domain) | |
394 | |
395 # We remove bcc, they have a special handling. | |
396 bcc = addr_data.bcc or [] | |
397 addr_data.bcc = None | |
398 | |
399 # Mark all addresses as "delivered" upfront, even if some won't actually be sent | |
400 # by us (when multicast_only is set). This flag signals to multicast services that | |
401 # they shouldn't handle these addresses. We'll remove the "delivered" status from | |
402 # undelivered addresses post-delivery. | |
403 for address in addr_data.addresses: | |
404 address.delivered = True | |
405 | |
406 # First, we send multicast messages. | |
407 for domain in multicast_domains: | |
408 something_to_deliver = False | |
409 for address in domains[domain]: | |
410 if address in already_delivered: | |
411 continue | |
412 # We need to mark as non delivered, so the multicast service will deliver | |
413 # itself. | |
414 address.delivered = False | |
415 something_to_deliver = True | |
416 | |
417 if not something_to_deliver: | |
418 continue | |
419 | |
420 domain_bcc = [a for a in bcc if a.jid and a.jid.host == domain] | |
421 message_elt = element_copy(mess_data["xml"]) | |
422 # The service must only see BCC from its own domain. | |
423 addr_data.bcc = domain_bcc | |
424 message_elt.addChild(addr_data.to_element()) | |
425 message_elt["to"] = domain | |
426 await client.a_send(message_elt) | |
427 for address in domains[domain] + domain_bcc: | |
428 # Those addresses have now been delivered. | |
429 address.delivered = True | |
430 | |
431 if multicast_only: | |
432 # Only addresses from multicast domains must be marked as delivered. | |
433 for address in addr_data.addresses: | |
434 if ( | |
435 address.jid is not None | |
436 and address.jid.host not in multicast_domains | |
437 and address not in already_delivered | |
438 ): | |
439 address.delivered = None | |
440 | |
441 # We have delivered to all multicast services, we stop here. | |
442 # But first we need to restore BCC, without the delivered ones. | |
443 addr_data.bcc = [a for a in bcc if not a.delivered] | |
444 return | |
445 | |
446 # Then BCC | |
447 for address in bcc: | |
448 if address in already_delivered: | |
449 continue | |
450 if address.jid is None: | |
451 raise NotImplementedError( | |
452 f"Sending to non JID address is not supported yet" | |
453 ) | |
454 if address.jid.host in multicast_domains: | |
455 # Address has already be handled by a multicast domain | |
456 continue | |
457 message_elt = element_copy(mess_data["xml"]) | |
458 # The recipient must only get its own BCC | |
459 addr_data.bcc = [address] | |
460 message_elt.addChild(addr_data.to_element()) | |
461 message_elt["to"] = address.jid.full() | |
462 await client.a_send(message_elt) | |
463 | |
464 # BCC address must be removed. | |
465 addr_data.bcc = None | |
466 | |
467 # and finally, other ones. | |
468 message_elt = mess_data["xml"] | |
469 message_elt.addChild(addr_data.to_element()) | |
470 non_bcc_addresses = (addr_data.to or []) + (addr_data.cc or []) | |
471 for address in non_bcc_addresses: | |
472 if address in already_delivered: | |
473 continue | |
474 if address.jid is None: | |
475 raise NotImplementedError( | |
476 f"Sending to non JID address is not supported yet" | |
477 ) | |
478 if address.jid.host in multicast_domains: | |
479 # Multicast domains have already been delivered. | |
480 continue | |
481 message_elt["to"] = address.jid.full() | |
482 await client.a_send(message_elt) | |
483 | |
84 def send_message_trigger( | 484 def send_message_trigger( |
85 self, client, mess_data, pre_xml_treatments, post_xml_treatments | 485 self, client, mess_data, pre_xml_treatments, post_xml_treatments |
86 ): | 486 ) -> Literal[True]: |
87 """Process the XEP-0033 related data to be sent""" | 487 """Process the XEP-0033 related data to be sent""" |
88 profile = client.profile | 488 post_xml_treatments.addCallback( |
89 | 489 lambda mess_data: defer.ensureDeferred( |
90 def treatment(mess_data): | 490 self._handle_addresses(client, mess_data) |
91 if not "address" in mess_data["extra"]: | 491 ) |
492 ) | |
493 return True | |
494 | |
495 def message_received_trigger( | |
496 self, | |
497 client: SatXMPPEntity, | |
498 message_elt: domish.Element, | |
499 post_treat: defer.Deferred, | |
500 ) -> Literal[True]: | |
501 """Parse addresses information and add them to message data.""" | |
502 | |
503 try: | |
504 addresses = AddressesData.from_element(message_elt) | |
505 except exceptions.NotFound: | |
506 pass | |
507 else: | |
508 | |
509 def post_treat_addr(mess_data: MessageData): | |
510 mess_data["extra"]["addresses"] = addresses.model_dump( | |
511 mode="json", exclude_none=True | |
512 ) | |
92 return mess_data | 513 return mess_data |
93 | 514 |
94 def disco_callback(entities): | 515 post_treat.addCallback(post_treat_addr) |
95 if not entities: | |
96 log.warning( | |
97 _("XEP-0033 is being used but the server doesn't support it!") | |
98 ) | |
99 raise failure.Failure(exceptions.CancelError("Cancelled by XEP-0033")) | |
100 if mess_data["to"] not in entities: | |
101 expected = _(" or ").join([entity.userhost() for entity in entities]) | |
102 log.warning( | |
103 _( | |
104 "Stanzas using XEP-0033 should be addressed to %(expected)s, not %(current)s!" | |
105 ) | |
106 % {"expected": expected, "current": mess_data["to"]} | |
107 ) | |
108 log.warning( | |
109 _( | |
110 "TODO: addressing has been fixed by the backend... fix it in the frontend!" | |
111 ) | |
112 ) | |
113 mess_data["to"] = list(entities)[0].userhostJID() | |
114 element = mess_data["xml"].addElement("addresses", NS_ADDRESS) | |
115 entries = [ | |
116 entry.split(":") | |
117 for entry in mess_data["extra"]["address"].split("\n") | |
118 if entry != "" | |
119 ] | |
120 for type_, jid_ in entries: | |
121 element.addChild( | |
122 domish.Element( | |
123 (None, "address"), None, {"type": type_, "jid": jid_} | |
124 ) | |
125 ) | |
126 # when the prosody plugin is completed, we can immediately return mess_data from here | |
127 self.send_and_store_message(mess_data, entries, profile) | |
128 log.debug("XEP-0033 took over") | |
129 raise failure.Failure(exceptions.CancelError("Cancelled by XEP-0033")) | |
130 | |
131 d = self.host.find_features_set(client, [NS_ADDRESS]) | |
132 d.addCallbacks(disco_callback, lambda __: disco_callback(None)) | |
133 return d | |
134 | |
135 post_xml_treatments.addCallback(treatment) | |
136 return True | |
137 | |
138 def send_and_store_message(self, mess_data, entries, profile): | |
139 """Check if target servers support XEP-0033, send and store the messages | |
140 @return: a friendly failure to let the core know that we sent the message already | |
141 | |
142 Later we should be able to remove this method because: | |
143 # XXX: sending the messages should be done by the local server | |
144 # FIXME: for now we duplicate the messages in the history for each recipient, this should change | |
145 # FIXME: for now we duplicate the echoes to the sender, this should also change | |
146 Ideas: | |
147 - fix Prosody plugin to check if target server support the feature | |
148 - redesign the database to save only one entry to the database | |
149 - change the message_new signal to eventually pass more than one recipient | |
150 """ | |
151 client = self.host.get_client(profile) | |
152 | |
153 def send(mess_data, skip_send=False): | |
154 d = defer.Deferred() | |
155 if not skip_send: | |
156 d.addCallback( | |
157 lambda ret: defer.ensureDeferred(client.send_message_data(ret)) | |
158 ) | |
159 d.addCallback( | |
160 lambda ret: defer.ensureDeferred(client.message_add_to_history(ret)) | |
161 ) | |
162 d.addCallback(client.message_send_to_bridge) | |
163 d.addErrback(lambda failure: failure.trap(exceptions.CancelError)) | |
164 return d.callback(mess_data) | |
165 | |
166 def disco_callback(entities, to_jid_s): | |
167 history_data = copy.deepcopy(mess_data) | |
168 history_data["to"] = JID(to_jid_s) | |
169 history_data["xml"]["to"] = to_jid_s | |
170 if entities: | |
171 if entities not in self.internal_data[timestamp]: | |
172 sent_data = copy.deepcopy(mess_data) | |
173 sent_data["to"] = JID(JID(to_jid_s).host) | |
174 sent_data["xml"]["to"] = JID(to_jid_s).host | |
175 send(sent_data) | |
176 self.internal_data[timestamp].append(entities) | |
177 # we still need to fill the history and signal the echo... | |
178 send(history_data, skip_send=True) | |
179 else: | |
180 # target server misses the addressing feature | |
181 send(history_data) | |
182 | |
183 def errback(failure, to_jid): | |
184 disco_callback(None, to_jid) | |
185 | |
186 timestamp = time() | |
187 self.internal_data[timestamp] = [] | |
188 defer_list = [] | |
189 for type_, jid_ in entries: | |
190 d = defer.Deferred() | |
191 d.addCallback( | |
192 self.host.find_features_set, client=client, jid_=JID(JID(jid_).host) | |
193 ) | |
194 d.addCallbacks( | |
195 disco_callback, errback, callbackArgs=[jid_], errbackArgs=[jid_] | |
196 ) | |
197 d.callback([NS_ADDRESS]) | |
198 defer_list.append(d) | |
199 d = defer.Deferred().addCallback(lambda __: self.internal_data.pop(timestamp)) | |
200 defer.DeferredList(defer_list).chainDeferred(d) | |
201 | |
202 def message_received_trigger(self, client, message, post_treat): | |
203 """In order to save the addressing information in the history""" | |
204 | |
205 def post_treat_addr(data, addresses): | |
206 data["extra"]["addresses"] = "" | |
207 for address in addresses: | |
208 # Depending how message has been constructed, we could get here | |
209 # some noise like "\n " instead of an address element. | |
210 if isinstance(address, domish.Element): | |
211 data["extra"]["addresses"] += "%s:%s\n" % ( | |
212 address["type"], | |
213 address["jid"], | |
214 ) | |
215 return data | |
216 | |
217 try: | |
218 addresses = next(message.elements(NS_ADDRESS, "addresses")) | |
219 except StopIteration: | |
220 pass # no addresses | |
221 else: | |
222 post_treat.addCallback(post_treat_addr, addresses.children) | |
223 return True | 516 return True |
224 | 517 |
225 def get_handler(self, client): | 518 def get_handler(self, client): |
226 return XEP_0033_handler(self, client.profile) | 519 return XEP_0033_handler(self, client.profile) |
227 | 520 |
232 def __init__(self, plugin_parent, profile): | 525 def __init__(self, plugin_parent, profile): |
233 self.plugin_parent = plugin_parent | 526 self.plugin_parent = plugin_parent |
234 self.host = plugin_parent.host | 527 self.host = plugin_parent.host |
235 self.profile = profile | 528 self.profile = profile |
236 | 529 |
237 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): | 530 def getDiscoInfo( |
531 self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" | |
532 ) -> list[disco.DiscoFeature]: | |
238 return [disco.DiscoFeature(NS_ADDRESS)] | 533 return [disco.DiscoFeature(NS_ADDRESS)] |
239 | 534 |
240 def getDiscoItems(self, requestor, target, nodeIdentifier=""): | 535 def getDiscoItems( |
536 self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" | |
537 ) -> list[disco.DiscoItem]: | |
241 return [] | 538 return [] |