comparison libervia/backend/plugins/plugin_xep_0033.py @ 4306:94e0968987cd

plugin XEP-0033: code modernisation, improve delivery, data validation: - Code has been rewritten using Pydantic models and `async` coroutines for data validation and cleaner element parsing/generation. - Delivery has been completely rewritten. It now works even if server doesn't support multicast, and send to local multicast service first. Delivering to local multicast service first is due to bad support of XEP-0033 in server (notably Prosody which has an incomplete implementation), and the current impossibility to detect if a sub-domain service handles fully multicast or only for local domains. This is a workaround to have a good balance between backward compatilibity and use of bandwith, and to make it work with the incoming email gateway implementation (the gateway will only deliver to entities of its own domain). - disco feature checking now uses `async` corountines. `host` implementation still use Deferred return values for compatibility with legacy code. rel 450
author Goffi <goffi@goffi.org>
date Thu, 26 Sep 2024 16:12:01 +0200
parents 0d7bb4df2343
children 530f86f078cc
comparison
equal deleted inserted replaced
4305:4cd4922de876 4306:94e0968987cd
1 #!/usr/bin/env python3 1 #!/usr/bin/env python3
2 2
3 3 # Libervia plugin for Extended Stanza Addressing (XEP-0033)
4 # SAT plugin for Extended Stanza Addressing (xep-0033) 4 # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)
5 # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org) 5 # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)
6 6
7 # This program is free software: you can redistribute it and/or modify 7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by 8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or 9 # the Free Software Foundation, either version 3 of the License, or
15 # GNU Affero General Public License for more details. 15 # GNU Affero General Public License for more details.
16 16
17 # You should have received a copy of the GNU Affero General Public License 17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>. 18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19 19
20 from libervia.backend.core.i18n import _ 20 from typing import Iterator, Literal, Self
21 from libervia.backend.core.constants import Const as C 21
22 from libervia.backend.core.log import getLogger 22 from pydantic import BaseModel, model_validator
23 23 from twisted.internet import defer
24 log = getLogger(__name__) 24 from twisted.words.protocols.jabber import jid
25 from libervia.backend.core import exceptions 25 from twisted.words.protocols.jabber.xmlstream import XMPPHandler
26 from twisted.words.xish import domish
26 from wokkel import disco, iwokkel 27 from wokkel import disco, iwokkel
27 from zope.interface import implementer 28 from zope.interface import implementer
28 from twisted.words.protocols.jabber.jid import JID 29
29 from twisted.python import failure 30 from libervia.backend.core import exceptions
30 import copy 31 from libervia.backend.core.constants import Const as C
31 32 from libervia.backend.core.core_types import SatXMPPEntity
32 try: 33 from libervia.backend.core.i18n import _
33 from twisted.words.protocols.xmlstream import XMPPHandler 34 from libervia.backend.core.log import getLogger
34 except ImportError: 35 from libervia.backend.models.core import MessageData
35 from wokkel.subprotocols import XMPPHandler 36 from libervia.backend.models.types import JIDType
36 from twisted.words.xish import domish
37 from twisted.internet import defer
38
39 from libervia.backend.tools import trigger 37 from libervia.backend.tools import trigger
40 from time import time 38 from libervia.backend.tools.xml_tools import element_copy
39
40 log = getLogger(__name__)
41
41 42
42 # TODO: fix Prosody "addressing" plugin to leave the concerned bcc according to the spec: 43 # TODO: fix Prosody "addressing" plugin to leave the concerned bcc according to the spec:
44 # http://xmpp.org/extensions/xep-0033.html#addr-type-bcc "This means that the server
45 # MUST remove these addresses before the stanza is delivered to anyone other than the
46 # given bcc addressee or the multicast service of the bcc addressee."
43 # 47 #
44 # http://xmpp.org/extensions/xep-0033.html#addr-type-bcc 48 # http://xmpp.org/extensions/xep-0033.html#multicast "Each 'bcc' recipient MUST receive
45 # "This means that the server MUST remove these addresses before the stanza is delivered to anyone other than the given bcc addressee or the multicast service of the bcc addressee." 49 # only the <address type='bcc'/> associated with that addressee."
46 # 50
47 # http://xmpp.org/extensions/xep-0033.html#multicast 51 # TODO: fix Prosody "addressing" plugin to determine itself if remote servers supports
48 # "Each 'bcc' recipient MUST receive only the <address type='bcc'/> associated with that addressee." 52 # this XEP
49 53
50 # TODO: fix Prosody "addressing" plugin to determine itself if remote servers supports this XEP
51
52
53 NS_XMPP_CLIENT = "jabber:client"
54 NS_ADDRESS = "http://jabber.org/protocol/address"
55 ATTRIBUTES = ["jid", "uri", "node", "desc", "delivered", "type"]
56 ADDRESS_TYPES = ["to", "cc", "bcc", "replyto", "replyroom", "noreply"]
57 54
58 PLUGIN_INFO = { 55 PLUGIN_INFO = {
59 C.PI_NAME: "Extended Stanza Addressing Protocol Plugin", 56 C.PI_NAME: "Extended Stanza Addressing Protocol Plugin",
60 C.PI_IMPORT_NAME: "XEP-0033", 57 C.PI_IMPORT_NAME: "XEP-0033",
61 C.PI_TYPE: "XEP", 58 C.PI_TYPE: "XEP",
59 C.PI_MODES: C.PLUG_MODE_BOTH,
62 C.PI_PROTOCOLS: ["XEP-0033"], 60 C.PI_PROTOCOLS: ["XEP-0033"],
63 C.PI_DEPENDENCIES: [], 61 C.PI_DEPENDENCIES: [],
64 C.PI_MAIN: "XEP_0033", 62 C.PI_MAIN: "XEP_0033",
65 C.PI_HANDLER: "yes", 63 C.PI_HANDLER: "yes",
66 C.PI_DESCRIPTION: _("""Implementation of Extended Stanza Addressing"""), 64 C.PI_DESCRIPTION: _(
65 "Efficiently send messages to several recipients, using metadata to transmit "
66 "them with main recipients (to), carbon copies (cc), and blind carbon copies "
67 "(bcc) fields in a similar manner as for email."
68 ),
67 } 69 }
68 70
69 71 NS_ADDRESS = "http://jabber.org/protocol/address"
70 class XEP_0033(object): 72 RECIPIENT_FIELDS = ("to", "cc", "bcc")
73
74
75 class AddressType(BaseModel):
76 jid: JIDType | None = None
77 desc: str | None = None
78 delivered: bool | None = None
79
80 def set_attribute(self, address_elt: domish.Element) -> None:
81 """Set <address> element attribute from this instance's data."""
82 if self.jid:
83 address_elt["jid"] = str(self.jid)
84 if self.desc:
85 address_elt["desc"] = self.desc
86 if self.delivered is not None:
87 address_elt["delivered"] = "true" if self.delivered else "false"
88
89 @classmethod
90 def from_element(cls, address_elt: domish.Element) -> Self:
91 """Create an AddressType instance from an <address> element.
92
93 @param address_elt: The <address> element.
94 @return: AddressType instance.
95 """
96 if address_elt.uri != NS_ADDRESS or address_elt.name != "address":
97 raise ValueError("Element is not an <address> element")
98
99 kwargs = {}
100 if address_elt.hasAttribute("jid"):
101 kwargs["jid"] = jid.JID(address_elt["jid"])
102 if address_elt.hasAttribute("desc"):
103 kwargs["desc"] = address_elt["desc"]
104 if address_elt.hasAttribute("delivered"):
105 kwargs["delivered"] = address_elt["delivered"] == "true"
106 return cls(**kwargs)
107
108 def to_element(self) -> domish.Element:
109 """Build the <address> element from this instance's data.
110
111 @return: <address> element.
112 """
113 address_elt = domish.Element((NS_ADDRESS, "address"))
114 self.set_attribute(address_elt)
115 return address_elt
116
117
118 class AddressesData(BaseModel):
119 to: list[AddressType] | None = None
120 cc: list[AddressType] | None = None
121 bcc: list[AddressType] | None = None
122 replyto: list[AddressType] | None = None
123 replyroom: list[AddressType] | None = None
124 noreply: bool | None = None
125 ofrom: JIDType | None = None
126
127 @model_validator(mode="after")
128 def check_minimal_data(self) -> Self:
129 assert self.to or self.cc or self.bcc, "At least one recipent must be set"
130 if self.noreply and (self.replyto is not None or self.replyroom is not None):
131 log.warning(
132 '"noreply" can\'t be used with "replyto" or "replyroom". Ignoring reply '
133 f'fields ({self.replyto=}, {self.replyroom=}).'
134 )
135 # We reset instead of raising a ValueError, because this can happen in
136 # incoming messages and we should not discard them.
137 self.replyto = self.replyroom = None
138 return self
139
140 @property
141 def addresses(self) -> Iterator[AddressType]:
142 """Iterator over all recipient addresses."""
143 for field in RECIPIENT_FIELDS:
144 addresses = getattr(self, field)
145 if not addresses:
146 continue
147 yield from addresses
148
149 @staticmethod
150 def add_address_element(
151 addresses_elt: domish.Element, type_: str, address: AddressType | None
152 ) -> None:
153 """Add <address> element to parent <addresses> element.
154
155 @param addresses_elt: Parent <addresses> element.
156 @param type_: Value of "type" attribute.
157 @param address: Address data.
158 """
159
160 address_elt = addresses_elt.addElement("address")
161 address_elt["type"] = type_
162 if address is not None:
163 address.set_attribute(address_elt)
164
165 @classmethod
166 def from_element(cls, addresses_elt: domish.Element) -> Self:
167 """Create an AddressesData instance from an <addresses> element.
168
169 @param addresses_elt: The <addresses> element or its direct parent.
170 @return: AddressesData instance.
171 @raise NotFound: No <addresses> element found.
172 """
173 if addresses_elt.uri != NS_ADDRESS or addresses_elt.name != "addresses":
174 child_addresses_elt = next(
175 addresses_elt.elements(NS_ADDRESS, "addresses"), None
176 )
177 if child_addresses_elt is None:
178 raise exceptions.NotFound("<addresses> element not found")
179 else:
180 addresses_elt = child_addresses_elt
181
182 kwargs = {}
183 for address_elt in addresses_elt.elements(NS_ADDRESS, "address"):
184 address_type = address_elt.getAttribute("type")
185 if address_type in ("to", "cc", "bcc", "replyto", "replyroom"):
186 try:
187 address = AddressType.from_element(address_elt)
188 except Exception as e:
189 log.warning(f"Invalid <address> element: {e}\n{address_elt.toXml()}")
190 else:
191 kwargs.setdefault(address_type, []).append(address)
192 elif address_type == "noreply":
193 kwargs["noreply"] = True
194 elif address_type == "ofrom":
195 kwargs["ofrom"] = jid.JID(address_elt["jid"])
196 else:
197 log.warning(
198 f"Invalid <address> element: unknonwn type {address_type!r}\n"
199 f"{address_elt.toXml()}"
200 )
201 return cls(**kwargs)
202
203 def to_element(self) -> domish.Element:
204 """Build the <addresses> element from this instance's data.
205
206 @return: <addresses> element.
207 """
208 addresses_elt = domish.Element((NS_ADDRESS, "addresses"))
209
210 if self.to:
211 for address in self.to:
212 self.add_address_element(addresses_elt, "to", address)
213 if self.cc:
214 for address in self.cc:
215 self.add_address_element(addresses_elt, "cc", address)
216 if self.bcc:
217 for address in self.bcc:
218 self.add_address_element(addresses_elt, "bcc", address)
219 if self.replyto:
220 for address in self.replyto:
221 self.add_address_element(addresses_elt, "replyto", address)
222 if self.replyroom:
223 for address in self.replyroom:
224 self.add_address_element(addresses_elt, "replyroom", address)
225 if self.noreply:
226 self.add_address_element(addresses_elt, "noreply", None)
227 if self.ofrom is not None:
228 address_elt = addresses_elt.addElement("address")
229 address_elt["type"] = "ofrom"
230 address_elt["jid"] = self.ofrom.full()
231
232 return addresses_elt
233
234
235 class XEP_0033:
71 """ 236 """
72 Implementation for XEP 0033 237 Implementation for XEP-0033
73 """ 238 """
74 239
75 def __init__(self, host): 240 def __init__(self, host):
76 log.info(_("Extended Stanza Addressing plugin initialization")) 241 log.info(_("Extended Stanza Addressing plugin initialization"))
77 self.host = host 242 self.host = host
243 host.register_namespace("address", NS_ADDRESS)
78 self.internal_data = {} 244 self.internal_data = {}
79 host.trigger.add( 245 host.trigger.add(
80 "sendMessage", self.send_message_trigger, trigger.TriggerManager.MIN_PRIORITY 246 "sendMessage",
247 self.send_message_trigger,
248 # We want this trigger to be the last one, as it may send messages.
249 trigger.TriggerManager.MIN_PRIORITY,
250 )
251 host.trigger.add(
252 "sendMessageComponent",
253 self.send_message_trigger,
254 # We want this trigger to be the last one, as it may send messages.
255 trigger.TriggerManager.MIN_PRIORITY,
81 ) 256 )
82 host.trigger.add("message_received", self.message_received_trigger) 257 host.trigger.add("message_received", self.message_received_trigger)
83 258
259 async def _stop_if_all_delivered(
260 self, client: SatXMPPEntity, mess_data: MessageData, addr_data: AddressesData
261 ) -> None:
262 """Check if all message have been delivered, and stop workflow in this case.
263
264 If workflow is stopped, message will be added to history and a signal will be sent
265 to bridge.
266 @param client: Client session.
267 @param mess_data: Message data.
268 @param addr_data: Addresses data.
269
270 @raise exceptions.CancelError: All message have been delivered and workflow is
271 terminated.
272 """
273 if all(a.delivered for a in addr_data.addresses):
274 await client.message_add_to_history(mess_data)
275 await client.message_send_to_bridge(mess_data)
276 raise exceptions.CancelError(
277 f"Message has been delivered by {PLUGIN_INFO['C.PI_NAME']}."
278 )
279
280 async def _handle_addresses(self, client, mess_data: MessageData) -> MessageData:
281 """Handle Extended Stanza Addressing metadata for outgoing messages."""
282 if not "addresses" in mess_data["extra"]:
283 return mess_data
284
285 if mess_data["extra"].get(C.MESS_KEY_ENCRYPTED, False):
286 # TODO: Message must be encrypted for all recipients, and "to" correspond to
287 # multicast service in this case.
288 raise NotImplementedError(
289 "End-to-end encryption is not supported yet with multicast addressing."
290 )
291
292 data = AddressesData(**mess_data["extra"]["addresses"])
293 recipients = set()
294 domains: dict[str, list[AddressType]] = {}
295 for address in data.addresses:
296 if address.jid is None:
297 raise NotImplementedError("Non JID addresses are not supported yet.")
298 recipients.add(address.jid)
299 try:
300 domains[address.jid.host].append(address)
301 except KeyError:
302 domains[address.jid.host] = [address]
303
304 to_recipient_jid = mess_data["to"]
305
306 if to_recipient_jid.user and to_recipient_jid not in recipients:
307 # If the main recipient is not a service (i.e. it has a "user" part), we want
308 # to move it to the XEP-0033's "to" addresses, so we can use the multicast
309 # service for <message> "to" attribute.
310 to_recipient_addr = AddressType(jid=to_recipient_jid)
311 if data.to is None:
312 data.to = [to_recipient_addr]
313 else:
314 data.to.insert(0, to_recipient_addr)
315 recipients.add(to_recipient_jid)
316 domains.setdefault(to_recipient_jid.host, []).append(to_recipient_addr)
317
318 # XXX: If our server doesn't handle multicast, we don't check sub-services as
319 # requested in §2.2, because except if there is a special arrangement with the
320 # server, a service at a sub-domain can't send message in the name of the main
321 # domain (e.g. "multicast.example.org" can't send message from
322 # "juliet@example.org"). So the specification is a bit dubious here, and we only
323 # use the main server multicast feature if it's present.
324 if not await self.host.memory.disco.has_feature(
325 client, NS_ADDRESS, client.server_jid
326 ):
327 # No multicast service
328 log.warning(
329 _(
330 f"Server of {client.profile} does not support XEP-0033 "
331 f"({PLUGIN_INFO[C.PI_IMPORT_NAME]}). We will send all messages ourselves."
332 )
333 )
334 await self.deliver_messages(client, mess_data, data, domains)
335 await self._stop_if_all_delivered(client, mess_data, data)
336 else:
337 # XXX: We delived ourself to multicast services because it's not correctly
338 # handled by some multicast services, notably by Prosody mod_addresses.
339 # FIXME: Only do this workaround for known incomplete implementations.
340 # TODO: remove this workaround when known implementations have been completed.
341 if mess_data["to"] != client.server_jid:
342 # We send the message to our server which will distribute it to the right
343 # locations. The initial ``to`` jid has been moved to ``data.to`` above.
344 # FIXME: When sub-services issue is properly handler, a sub-service JID
345 # supporting multicast should be allowed here.
346 mess_data["to"] = client.server_jid
347 await self.deliver_messages(
348 client, mess_data, data, domains, multicast_only=True
349 )
350 await self._stop_if_all_delivered(client, mess_data, data)
351
352 message_elt = mess_data["xml"]
353 message_elt["to"] = str(mess_data["to"])
354 message_elt.addChild(data.to_element())
355 return mess_data
356
357 async def deliver_messages(
358 self,
359 client,
360 mess_data: MessageData,
361 addr_data: AddressesData,
362 domains: dict[str, list[AddressType]],
363 multicast_only: bool = False,
364 ) -> None:
365 """Send messages to requested recipients.
366
367 If a domain handles multicast, a single message will be send there.
368 @param client: Client session.
369 @param mess_data: Messsa data.
370 @param addr_data: XEP-0033 addresses data.
371 @param domains: Domain to addresses map.
372 Note that that the addresses instances in this argument must be the same as in
373 ``addr_data`` (There ``delivered`` status will be manipulated).
374 @param multicast_only: if True, only multicast domain will be delivered.
375 """
376 # We'll modify delivered status, so we keep track here of addresses which have
377 # already be delivered.
378 already_delivered = [a for a in addr_data.addresses if a.delivered]
379 multicast_domains = set()
380 for domain, domain_addresses in domains.items():
381 if domain == client.server_jid.host:
382
383 # ``client.server_jid`` is discarded to avoid sending twice the same
384 # message. ``multicast_only`` flag is set when the server supports
385 # multicast, so the message will be sent to it at the end of the workflow.
386 continue
387 if len(domain_addresses) > 1:
388 # For domains with multiple recipients, we check if we they support
389 # multicast and so if we can deliver to them directly.
390 if await self.host.memory.disco.has_feature(
391 client, NS_ADDRESS, jid.JID(domain)
392 ):
393 multicast_domains.add(domain)
394
395 # We remove bcc, they have a special handling.
396 bcc = addr_data.bcc or []
397 addr_data.bcc = None
398
399 # Mark all addresses as "delivered" upfront, even if some won't actually be sent
400 # by us (when multicast_only is set). This flag signals to multicast services that
401 # they shouldn't handle these addresses. We'll remove the "delivered" status from
402 # undelivered addresses post-delivery.
403 for address in addr_data.addresses:
404 address.delivered = True
405
406 # First, we send multicast messages.
407 for domain in multicast_domains:
408 something_to_deliver = False
409 for address in domains[domain]:
410 if address in already_delivered:
411 continue
412 # We need to mark as non delivered, so the multicast service will deliver
413 # itself.
414 address.delivered = False
415 something_to_deliver = True
416
417 if not something_to_deliver:
418 continue
419
420 domain_bcc = [a for a in bcc if a.jid and a.jid.host == domain]
421 message_elt = element_copy(mess_data["xml"])
422 # The service must only see BCC from its own domain.
423 addr_data.bcc = domain_bcc
424 message_elt.addChild(addr_data.to_element())
425 message_elt["to"] = domain
426 await client.a_send(message_elt)
427 for address in domains[domain] + domain_bcc:
428 # Those addresses have now been delivered.
429 address.delivered = True
430
431 if multicast_only:
432 # Only addresses from multicast domains must be marked as delivered.
433 for address in addr_data.addresses:
434 if (
435 address.jid is not None
436 and address.jid.host not in multicast_domains
437 and address not in already_delivered
438 ):
439 address.delivered = None
440
441 # We have delivered to all multicast services, we stop here.
442 # But first we need to restore BCC, without the delivered ones.
443 addr_data.bcc = [a for a in bcc if not a.delivered]
444 return
445
446 # Then BCC
447 for address in bcc:
448 if address in already_delivered:
449 continue
450 if address.jid is None:
451 raise NotImplementedError(
452 f"Sending to non JID address is not supported yet"
453 )
454 if address.jid.host in multicast_domains:
455 # Address has already be handled by a multicast domain
456 continue
457 message_elt = element_copy(mess_data["xml"])
458 # The recipient must only get its own BCC
459 addr_data.bcc = [address]
460 message_elt.addChild(addr_data.to_element())
461 message_elt["to"] = address.jid.full()
462 await client.a_send(message_elt)
463
464 # BCC address must be removed.
465 addr_data.bcc = None
466
467 # and finally, other ones.
468 message_elt = mess_data["xml"]
469 message_elt.addChild(addr_data.to_element())
470 non_bcc_addresses = (addr_data.to or []) + (addr_data.cc or [])
471 for address in non_bcc_addresses:
472 if address in already_delivered:
473 continue
474 if address.jid is None:
475 raise NotImplementedError(
476 f"Sending to non JID address is not supported yet"
477 )
478 if address.jid.host in multicast_domains:
479 # Multicast domains have already been delivered.
480 continue
481 message_elt["to"] = address.jid.full()
482 await client.a_send(message_elt)
483
84 def send_message_trigger( 484 def send_message_trigger(
85 self, client, mess_data, pre_xml_treatments, post_xml_treatments 485 self, client, mess_data, pre_xml_treatments, post_xml_treatments
86 ): 486 ) -> Literal[True]:
87 """Process the XEP-0033 related data to be sent""" 487 """Process the XEP-0033 related data to be sent"""
88 profile = client.profile 488 post_xml_treatments.addCallback(
89 489 lambda mess_data: defer.ensureDeferred(
90 def treatment(mess_data): 490 self._handle_addresses(client, mess_data)
91 if not "address" in mess_data["extra"]: 491 )
492 )
493 return True
494
495 def message_received_trigger(
496 self,
497 client: SatXMPPEntity,
498 message_elt: domish.Element,
499 post_treat: defer.Deferred,
500 ) -> Literal[True]:
501 """Parse addresses information and add them to message data."""
502
503 try:
504 addresses = AddressesData.from_element(message_elt)
505 except exceptions.NotFound:
506 pass
507 else:
508
509 def post_treat_addr(mess_data: MessageData):
510 mess_data["extra"]["addresses"] = addresses.model_dump(
511 mode="json", exclude_none=True
512 )
92 return mess_data 513 return mess_data
93 514
94 def disco_callback(entities): 515 post_treat.addCallback(post_treat_addr)
95 if not entities:
96 log.warning(
97 _("XEP-0033 is being used but the server doesn't support it!")
98 )
99 raise failure.Failure(exceptions.CancelError("Cancelled by XEP-0033"))
100 if mess_data["to"] not in entities:
101 expected = _(" or ").join([entity.userhost() for entity in entities])
102 log.warning(
103 _(
104 "Stanzas using XEP-0033 should be addressed to %(expected)s, not %(current)s!"
105 )
106 % {"expected": expected, "current": mess_data["to"]}
107 )
108 log.warning(
109 _(
110 "TODO: addressing has been fixed by the backend... fix it in the frontend!"
111 )
112 )
113 mess_data["to"] = list(entities)[0].userhostJID()
114 element = mess_data["xml"].addElement("addresses", NS_ADDRESS)
115 entries = [
116 entry.split(":")
117 for entry in mess_data["extra"]["address"].split("\n")
118 if entry != ""
119 ]
120 for type_, jid_ in entries:
121 element.addChild(
122 domish.Element(
123 (None, "address"), None, {"type": type_, "jid": jid_}
124 )
125 )
126 # when the prosody plugin is completed, we can immediately return mess_data from here
127 self.send_and_store_message(mess_data, entries, profile)
128 log.debug("XEP-0033 took over")
129 raise failure.Failure(exceptions.CancelError("Cancelled by XEP-0033"))
130
131 d = self.host.find_features_set(client, [NS_ADDRESS])
132 d.addCallbacks(disco_callback, lambda __: disco_callback(None))
133 return d
134
135 post_xml_treatments.addCallback(treatment)
136 return True
137
138 def send_and_store_message(self, mess_data, entries, profile):
139 """Check if target servers support XEP-0033, send and store the messages
140 @return: a friendly failure to let the core know that we sent the message already
141
142 Later we should be able to remove this method because:
143 # XXX: sending the messages should be done by the local server
144 # FIXME: for now we duplicate the messages in the history for each recipient, this should change
145 # FIXME: for now we duplicate the echoes to the sender, this should also change
146 Ideas:
147 - fix Prosody plugin to check if target server support the feature
148 - redesign the database to save only one entry to the database
149 - change the message_new signal to eventually pass more than one recipient
150 """
151 client = self.host.get_client(profile)
152
153 def send(mess_data, skip_send=False):
154 d = defer.Deferred()
155 if not skip_send:
156 d.addCallback(
157 lambda ret: defer.ensureDeferred(client.send_message_data(ret))
158 )
159 d.addCallback(
160 lambda ret: defer.ensureDeferred(client.message_add_to_history(ret))
161 )
162 d.addCallback(client.message_send_to_bridge)
163 d.addErrback(lambda failure: failure.trap(exceptions.CancelError))
164 return d.callback(mess_data)
165
166 def disco_callback(entities, to_jid_s):
167 history_data = copy.deepcopy(mess_data)
168 history_data["to"] = JID(to_jid_s)
169 history_data["xml"]["to"] = to_jid_s
170 if entities:
171 if entities not in self.internal_data[timestamp]:
172 sent_data = copy.deepcopy(mess_data)
173 sent_data["to"] = JID(JID(to_jid_s).host)
174 sent_data["xml"]["to"] = JID(to_jid_s).host
175 send(sent_data)
176 self.internal_data[timestamp].append(entities)
177 # we still need to fill the history and signal the echo...
178 send(history_data, skip_send=True)
179 else:
180 # target server misses the addressing feature
181 send(history_data)
182
183 def errback(failure, to_jid):
184 disco_callback(None, to_jid)
185
186 timestamp = time()
187 self.internal_data[timestamp] = []
188 defer_list = []
189 for type_, jid_ in entries:
190 d = defer.Deferred()
191 d.addCallback(
192 self.host.find_features_set, client=client, jid_=JID(JID(jid_).host)
193 )
194 d.addCallbacks(
195 disco_callback, errback, callbackArgs=[jid_], errbackArgs=[jid_]
196 )
197 d.callback([NS_ADDRESS])
198 defer_list.append(d)
199 d = defer.Deferred().addCallback(lambda __: self.internal_data.pop(timestamp))
200 defer.DeferredList(defer_list).chainDeferred(d)
201
202 def message_received_trigger(self, client, message, post_treat):
203 """In order to save the addressing information in the history"""
204
205 def post_treat_addr(data, addresses):
206 data["extra"]["addresses"] = ""
207 for address in addresses:
208 # Depending how message has been constructed, we could get here
209 # some noise like "\n " instead of an address element.
210 if isinstance(address, domish.Element):
211 data["extra"]["addresses"] += "%s:%s\n" % (
212 address["type"],
213 address["jid"],
214 )
215 return data
216
217 try:
218 addresses = next(message.elements(NS_ADDRESS, "addresses"))
219 except StopIteration:
220 pass # no addresses
221 else:
222 post_treat.addCallback(post_treat_addr, addresses.children)
223 return True 516 return True
224 517
225 def get_handler(self, client): 518 def get_handler(self, client):
226 return XEP_0033_handler(self, client.profile) 519 return XEP_0033_handler(self, client.profile)
227 520
232 def __init__(self, plugin_parent, profile): 525 def __init__(self, plugin_parent, profile):
233 self.plugin_parent = plugin_parent 526 self.plugin_parent = plugin_parent
234 self.host = plugin_parent.host 527 self.host = plugin_parent.host
235 self.profile = profile 528 self.profile = profile
236 529
237 def getDiscoInfo(self, requestor, target, nodeIdentifier=""): 530 def getDiscoInfo(
531 self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
532 ) -> list[disco.DiscoFeature]:
238 return [disco.DiscoFeature(NS_ADDRESS)] 533 return [disco.DiscoFeature(NS_ADDRESS)]
239 534
240 def getDiscoItems(self, requestor, target, nodeIdentifier=""): 535 def getDiscoItems(
536 self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = ""
537 ) -> list[disco.DiscoItem]:
241 return [] 538 return []