# HG changeset patch # User Goffi # Date 1727359921 -7200 # Node ID 94e0968987cde2f1b3350e9cc62a545a9ba7c144 # Parent 4cd4922de87673e8ec5d38b25977aa4420b52f4a plugin XEP-0033: code modernisation, improve delivery, data validation: - Code has been rewritten using Pydantic models and `async` coroutines for data validation and cleaner element parsing/generation. - Delivery has been completely rewritten. It now works even if server doesn't support multicast, and send to local multicast service first. Delivering to local multicast service first is due to bad support of XEP-0033 in server (notably Prosody which has an incomplete implementation), and the current impossibility to detect if a sub-domain service handles fully multicast or only for local domains. This is a workaround to have a good balance between backward compatilibity and use of bandwith, and to make it work with the incoming email gateway implementation (the gateway will only deliver to entities of its own domain). - disco feature checking now uses `async` corountines. `host` implementation still use Deferred return values for compatibility with legacy code. rel 450 diff -r 4cd4922de876 -r 94e0968987cd libervia/backend/core/main.py --- a/libervia/backend/core/main.py Thu Sep 26 16:11:56 2024 +0200 +++ b/libervia/backend/core/main.py Thu Sep 26 16:12:01 2024 +0200 @@ -1167,13 +1167,13 @@ # the main difference with client.disco is that self.memory.disco manage cache def hasFeature(self, *args, **kwargs): - return self.memory.disco.hasFeature(*args, **kwargs) + return defer.ensureDeferred(self.memory.disco.has_feature(*args, **kwargs)) def check_feature(self, *args, **kwargs): - return self.memory.disco.check_feature(*args, **kwargs) + return defer.ensureDeferred(self.memory.disco.check_feature(*args, **kwargs)) def check_features(self, *args, **kwargs): - return self.memory.disco.check_features(*args, **kwargs) + return defer.ensureDeferred(self.memory.disco.check_features(*args, **kwargs)) def has_identity(self, *args, **kwargs): return self.memory.disco.has_identity(*args, **kwargs) @@ -1190,8 +1190,8 @@ def find_service_entities(self, *args, **kwargs): return self.memory.disco.find_service_entities(*args, **kwargs) - def find_features_set(self, *args, **kwargs): - return self.memory.disco.find_features_set(*args, **kwargs) + async def find_features_set(self, *args, **kwargs): + return await self.memory.disco.find_features_set(*args, **kwargs) def _find_by_features( self, @@ -1266,7 +1266,7 @@ found_own = {} found_roster = {} if service: - services_jids = await self.find_features_set(client, namespaces) + services_jids = await self.memory.disco.find_features_set(client, namespaces) services_jids = list(services_jids) # we need a list to map results below services_infos = await defer.DeferredList( [ diff -r 4cd4922de876 -r 94e0968987cd libervia/backend/core/xmpp.py --- a/libervia/backend/core/xmpp.py Thu Sep 26 16:11:56 2024 +0200 +++ b/libervia/backend/core/xmpp.py Thu Sep 26 16:12:01 2024 +0200 @@ -717,7 +717,7 @@ @param mess_data(dict): message data as constructed by onMessage workflow @return (dict): mess_data (so it can be used in a deferred chain) """ - # XXX: This is the last trigger before u"send" (last but one globally) + # XXX: This is the last trigger before "send" (last but one globally) # for sending message. # This is intented for e2e encryption which doesn't do full stanza # encryption (e.g. OTR) @@ -894,7 +894,6 @@ """Send message to bridge, so frontends can display it @param data: message data dictionnary - @param client: profile's client """ if data["type"] != C.MESS_TYPE_GROUPCHAT: # we don't send groupchat message to bridge, as we get them back diff -r 4cd4922de876 -r 94e0968987cd libervia/backend/memory/disco.py --- a/libervia/backend/memory/disco.py Thu Sep 26 16:11:56 2024 +0200 +++ b/libervia/backend/memory/disco.py Thu Sep 26 16:12:01 2024 +0200 @@ -1,8 +1,8 @@ #!/usr/bin/env python3 -# SAT: a jabber client -# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) +# Libervia XMPP client +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -17,7 +17,9 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from typing import Optional +from typing import Iterable, Optional, cast + +from twisted.internet.interfaces import IReactorCore from libervia.backend.core.i18n import _ from libervia.backend.core import exceptions from libervia.backend.core.log import getLogger @@ -122,8 +124,13 @@ self.hashes = HashManager(persistent.PersistentDict("disco")) return self.hashes.load() - @defer.inlineCallbacks - def hasFeature(self, client, feature, jid_=None, node=""): + async def has_feature( + self, + client: SatXMPPEntity, + feature: str, + jid_: jid.JID | None = None, + node: str = "", + ) -> bool: """Tell if an entity has the required feature @param feature: feature namespace @@ -131,40 +138,52 @@ @param node(unicode): optional node to use for disco request @return: a Deferred which fire a boolean (True if feature is available) """ - disco_infos = yield self.get_infos(client, jid_, node) - defer.returnValue(feature in disco_infos.features) + disco_infos = await self.get_infos(client, jid_, node) + return feature in disco_infos.features - @defer.inlineCallbacks - def check_feature(self, client, feature, jid_=None, node=""): - """Like hasFeature, but raise an exception is feature is not Found + async def check_feature( + self, + client: SatXMPPEntity, + feature: str, + jid_: jid.JID | None = None, + node: str = "", + ) -> None: + """Like has_feature, but raise an exception is feature is not Found @param feature: feature namespace @param jid_: jid of the target, or None for profile's server - @param node(unicode): optional node to use for disco request + @param node: optional node to use for disco request @raise: exceptions.FeatureNotFound """ - disco_infos = yield self.get_infos(client, jid_, node) + disco_infos = await self.get_infos(client, jid_, node) if not feature in disco_infos.features: - raise failure.Failure(exceptions.FeatureNotFound()) + raise exceptions.FeatureNotFound() - @defer.inlineCallbacks - def check_features(self, client, features, jid_=None, identity=None, node=""): + async def check_features( + self, + client: SatXMPPEntity, + features: Iterable[str], + jid_: jid.JID | None = None, + identity: tuple[str, str] | None = None, + node: str = "", + ) -> None: """Like check_feature, but check several features at once, and check also identity - @param features(iterable[unicode]): features to check - @param jid_(jid.JID): jid of the target, or None for profile's server - @param node(unicode): optional node to use for disco request - @param identity(None, tuple(unicode, unicode): if not None, the entity must have an identity with this (category, type) tuple + @param features: features to check + @param jid_: jid of the target, or None for profile's server + @param node: optional node to use for disco request + @param identity: if not None, the entity must have an identity with this + (category, type) tuple @raise: exceptions.FeatureNotFound """ - disco_infos = yield self.get_infos(client, jid_, node) + disco_infos = await self.get_infos(client, jid_, node) if not set(features).issubset(disco_infos.features): - raise failure.Failure(exceptions.FeatureNotFound()) + raise exceptions.FeatureNotFound() if identity is not None and identity not in disco_infos.identities: - raise failure.Failure(exceptions.FeatureNotFound()) + raise exceptions.FeatureNotFound() async def has_identity( self, @@ -338,14 +357,19 @@ ) # FIXME: one bad service make a general timeout return d - def find_features_set(self, client, features, identity=None, jid_=None): + async def find_features_set( + self, + client: SatXMPPEntity, + features: Iterable[str], + identity: tuple[str, str] | None = None, + jid_: jid.JID | None = None, + ) -> set[jid.JID]: """Return entities (including jid_ and its items) offering features + @param client: Client session. @param features: iterable of features which must be present - @param identity(None, tuple(unicode, unicode)): if not None, accept only this - (category/type) identity + @param identity: if not None, accept only this (category/type) identity @param jid_: the jid of the target server (None for profile's server) - @param profile: %(doc_profile)s @return: a set of found entities """ if jid_ is None: @@ -353,30 +377,24 @@ features = set(features) found_entities = set() - def infos_cb(infos, entity): + def infos_cb(infos: disco.DiscoInfo, entity: jid.JID) -> None: if entity is None: - log.warning(_("received an item without jid")) + log.warning(_("Received an item without JID")) return if identity is not None and identity not in infos.identities: return if features.issubset(infos.features): found_entities.add(entity) - def got_items(items): - defer_list = [] - for entity in [jid_] + [item.entity for item in items]: - infos_d = self.get_infos(client, entity) - infos_d.addCallbacks(infos_cb, self._infos_eb, [entity], None, [entity]) - defer_list.append(infos_d) - return defer.DeferredList(defer_list) - - d = self.get_items(client, jid_) - d.addCallback(got_items) - d.addCallback(lambda __: found_entities) - reactor.callLater( - TIMEOUT, d.cancel - ) # FIXME: one bad service make a general timeout - return d + items = await self.get_items(client, jid_) + defer_list = [] + for entity in [jid_] + [item.entity for item in items]: + infos_d = self.get_infos(client, entity) + infos_d.addCallbacks(infos_cb, self._infos_eb, [entity], None, [entity]) + infos_d.addTimeout(TIMEOUT, cast(IReactorCore, reactor)) + defer_list.append(infos_d) + await defer.DeferredList(defer_list) + return found_entities def generate_hash(self, services): """Generate a unique hash for given service diff -r 4cd4922de876 -r 94e0968987cd libervia/backend/plugins/plugin_xep_0033.py --- a/libervia/backend/plugins/plugin_xep_0033.py Thu Sep 26 16:11:56 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0033.py Thu Sep 26 16:12:01 2024 +0200 @@ -1,7 +1,7 @@ #!/usr/bin/env python3 - -# SAT plugin for Extended Stanza Addressing (xep-0033) +# Libervia plugin for Extended Stanza Addressing (XEP-0033) +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org) # This program is free software: you can redistribute it and/or modify @@ -17,209 +17,502 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -from libervia.backend.core.i18n import _ -from libervia.backend.core.constants import Const as C -from libervia.backend.core.log import getLogger +from typing import Iterator, Literal, Self -log = getLogger(__name__) -from libervia.backend.core import exceptions +from pydantic import BaseModel, model_validator +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish from wokkel import disco, iwokkel from zope.interface import implementer -from twisted.words.protocols.jabber.jid import JID -from twisted.python import failure -import copy -try: - from twisted.words.protocols.xmlstream import XMPPHandler -except ImportError: - from wokkel.subprotocols import XMPPHandler -from twisted.words.xish import domish -from twisted.internet import defer +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.models.core import MessageData +from libervia.backend.models.types import JIDType +from libervia.backend.tools import trigger +from libervia.backend.tools.xml_tools import element_copy -from libervia.backend.tools import trigger -from time import time +log = getLogger(__name__) + # TODO: fix Prosody "addressing" plugin to leave the concerned bcc according to the spec: +# http://xmpp.org/extensions/xep-0033.html#addr-type-bcc "This means that the server +# MUST remove these addresses before the stanza is delivered to anyone other than the +# given bcc addressee or the multicast service of the bcc addressee." # -# http://xmpp.org/extensions/xep-0033.html#addr-type-bcc -# "This means that the server MUST remove these addresses before the stanza is delivered to anyone other than the given bcc addressee or the multicast service of the bcc addressee." -# -# http://xmpp.org/extensions/xep-0033.html#multicast -# "Each 'bcc' recipient MUST receive only the
associated with that addressee." +# http://xmpp.org/extensions/xep-0033.html#multicast "Each 'bcc' recipient MUST receive +# only the
associated with that addressee." -# TODO: fix Prosody "addressing" plugin to determine itself if remote servers supports this XEP - +# TODO: fix Prosody "addressing" plugin to determine itself if remote servers supports +# this XEP -NS_XMPP_CLIENT = "jabber:client" -NS_ADDRESS = "http://jabber.org/protocol/address" -ATTRIBUTES = ["jid", "uri", "node", "desc", "delivered", "type"] -ADDRESS_TYPES = ["to", "cc", "bcc", "replyto", "replyroom", "noreply"] PLUGIN_INFO = { C.PI_NAME: "Extended Stanza Addressing Protocol Plugin", C.PI_IMPORT_NAME: "XEP-0033", C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, C.PI_PROTOCOLS: ["XEP-0033"], C.PI_DEPENDENCIES: [], C.PI_MAIN: "XEP_0033", C.PI_HANDLER: "yes", - C.PI_DESCRIPTION: _("""Implementation of Extended Stanza Addressing"""), + C.PI_DESCRIPTION: _( + "Efficiently send messages to several recipients, using metadata to transmit " + "them with main recipients (to), carbon copies (cc), and blind carbon copies " + "(bcc) fields in a similar manner as for email." + ), } +NS_ADDRESS = "http://jabber.org/protocol/address" +RECIPIENT_FIELDS = ("to", "cc", "bcc") + + +class AddressType(BaseModel): + jid: JIDType | None = None + desc: str | None = None + delivered: bool | None = None + + def set_attribute(self, address_elt: domish.Element) -> None: + """Set
element attribute from this instance's data.""" + if self.jid: + address_elt["jid"] = str(self.jid) + if self.desc: + address_elt["desc"] = self.desc + if self.delivered is not None: + address_elt["delivered"] = "true" if self.delivered else "false" + + @classmethod + def from_element(cls, address_elt: domish.Element) -> Self: + """Create an AddressType instance from an
element. + + @param address_elt: The
element. + @return: AddressType instance. + """ + if address_elt.uri != NS_ADDRESS or address_elt.name != "address": + raise ValueError("Element is not an
element") + + kwargs = {} + if address_elt.hasAttribute("jid"): + kwargs["jid"] = jid.JID(address_elt["jid"]) + if address_elt.hasAttribute("desc"): + kwargs["desc"] = address_elt["desc"] + if address_elt.hasAttribute("delivered"): + kwargs["delivered"] = address_elt["delivered"] == "true" + return cls(**kwargs) + + def to_element(self) -> domish.Element: + """Build the
element from this instance's data. + + @return:
element. + """ + address_elt = domish.Element((NS_ADDRESS, "address")) + self.set_attribute(address_elt) + return address_elt + -class XEP_0033(object): +class AddressesData(BaseModel): + to: list[AddressType] | None = None + cc: list[AddressType] | None = None + bcc: list[AddressType] | None = None + replyto: list[AddressType] | None = None + replyroom: list[AddressType] | None = None + noreply: bool | None = None + ofrom: JIDType | None = None + + @model_validator(mode="after") + def check_minimal_data(self) -> Self: + assert self.to or self.cc or self.bcc, "At least one recipent must be set" + if self.noreply and (self.replyto is not None or self.replyroom is not None): + log.warning( + '"noreply" can\'t be used with "replyto" or "replyroom". Ignoring reply ' + f'fields ({self.replyto=}, {self.replyroom=}).' + ) + # We reset instead of raising a ValueError, because this can happen in + # incoming messages and we should not discard them. + self.replyto = self.replyroom = None + return self + + @property + def addresses(self) -> Iterator[AddressType]: + """Iterator over all recipient addresses.""" + for field in RECIPIENT_FIELDS: + addresses = getattr(self, field) + if not addresses: + continue + yield from addresses + + @staticmethod + def add_address_element( + addresses_elt: domish.Element, type_: str, address: AddressType | None + ) -> None: + """Add
element to parent element. + + @param addresses_elt: Parent element. + @param type_: Value of "type" attribute. + @param address: Address data. + """ + + address_elt = addresses_elt.addElement("address") + address_elt["type"] = type_ + if address is not None: + address.set_attribute(address_elt) + + @classmethod + def from_element(cls, addresses_elt: domish.Element) -> Self: + """Create an AddressesData instance from an element. + + @param addresses_elt: The element or its direct parent. + @return: AddressesData instance. + @raise NotFound: No element found. + """ + if addresses_elt.uri != NS_ADDRESS or addresses_elt.name != "addresses": + child_addresses_elt = next( + addresses_elt.elements(NS_ADDRESS, "addresses"), None + ) + if child_addresses_elt is None: + raise exceptions.NotFound(" element not found") + else: + addresses_elt = child_addresses_elt + + kwargs = {} + for address_elt in addresses_elt.elements(NS_ADDRESS, "address"): + address_type = address_elt.getAttribute("type") + if address_type in ("to", "cc", "bcc", "replyto", "replyroom"): + try: + address = AddressType.from_element(address_elt) + except Exception as e: + log.warning(f"Invalid
element: {e}\n{address_elt.toXml()}") + else: + kwargs.setdefault(address_type, []).append(address) + elif address_type == "noreply": + kwargs["noreply"] = True + elif address_type == "ofrom": + kwargs["ofrom"] = jid.JID(address_elt["jid"]) + else: + log.warning( + f"Invalid
element: unknonwn type {address_type!r}\n" + f"{address_elt.toXml()}" + ) + return cls(**kwargs) + + def to_element(self) -> domish.Element: + """Build the element from this instance's data. + + @return: element. + """ + addresses_elt = domish.Element((NS_ADDRESS, "addresses")) + + if self.to: + for address in self.to: + self.add_address_element(addresses_elt, "to", address) + if self.cc: + for address in self.cc: + self.add_address_element(addresses_elt, "cc", address) + if self.bcc: + for address in self.bcc: + self.add_address_element(addresses_elt, "bcc", address) + if self.replyto: + for address in self.replyto: + self.add_address_element(addresses_elt, "replyto", address) + if self.replyroom: + for address in self.replyroom: + self.add_address_element(addresses_elt, "replyroom", address) + if self.noreply: + self.add_address_element(addresses_elt, "noreply", None) + if self.ofrom is not None: + address_elt = addresses_elt.addElement("address") + address_elt["type"] = "ofrom" + address_elt["jid"] = self.ofrom.full() + + return addresses_elt + + +class XEP_0033: """ - Implementation for XEP 0033 + Implementation for XEP-0033 """ def __init__(self, host): log.info(_("Extended Stanza Addressing plugin initialization")) self.host = host + host.register_namespace("address", NS_ADDRESS) self.internal_data = {} host.trigger.add( - "sendMessage", self.send_message_trigger, trigger.TriggerManager.MIN_PRIORITY + "sendMessage", + self.send_message_trigger, + # We want this trigger to be the last one, as it may send messages. + trigger.TriggerManager.MIN_PRIORITY, + ) + host.trigger.add( + "sendMessageComponent", + self.send_message_trigger, + # We want this trigger to be the last one, as it may send messages. + trigger.TriggerManager.MIN_PRIORITY, ) host.trigger.add("message_received", self.message_received_trigger) + async def _stop_if_all_delivered( + self, client: SatXMPPEntity, mess_data: MessageData, addr_data: AddressesData + ) -> None: + """Check if all message have been delivered, and stop workflow in this case. + + If workflow is stopped, message will be added to history and a signal will be sent + to bridge. + @param client: Client session. + @param mess_data: Message data. + @param addr_data: Addresses data. + + @raise exceptions.CancelError: All message have been delivered and workflow is + terminated. + """ + if all(a.delivered for a in addr_data.addresses): + await client.message_add_to_history(mess_data) + await client.message_send_to_bridge(mess_data) + raise exceptions.CancelError( + f"Message has been delivered by {PLUGIN_INFO['C.PI_NAME']}." + ) + + async def _handle_addresses(self, client, mess_data: MessageData) -> MessageData: + """Handle Extended Stanza Addressing metadata for outgoing messages.""" + if not "addresses" in mess_data["extra"]: + return mess_data + + if mess_data["extra"].get(C.MESS_KEY_ENCRYPTED, False): + # TODO: Message must be encrypted for all recipients, and "to" correspond to + # multicast service in this case. + raise NotImplementedError( + "End-to-end encryption is not supported yet with multicast addressing." + ) + + data = AddressesData(**mess_data["extra"]["addresses"]) + recipients = set() + domains: dict[str, list[AddressType]] = {} + for address in data.addresses: + if address.jid is None: + raise NotImplementedError("Non JID addresses are not supported yet.") + recipients.add(address.jid) + try: + domains[address.jid.host].append(address) + except KeyError: + domains[address.jid.host] = [address] + + to_recipient_jid = mess_data["to"] + + if to_recipient_jid.user and to_recipient_jid not in recipients: + # If the main recipient is not a service (i.e. it has a "user" part), we want + # to move it to the XEP-0033's "to" addresses, so we can use the multicast + # service for "to" attribute. + to_recipient_addr = AddressType(jid=to_recipient_jid) + if data.to is None: + data.to = [to_recipient_addr] + else: + data.to.insert(0, to_recipient_addr) + recipients.add(to_recipient_jid) + domains.setdefault(to_recipient_jid.host, []).append(to_recipient_addr) + + # XXX: If our server doesn't handle multicast, we don't check sub-services as + # requested in §2.2, because except if there is a special arrangement with the + # server, a service at a sub-domain can't send message in the name of the main + # domain (e.g. "multicast.example.org" can't send message from + # "juliet@example.org"). So the specification is a bit dubious here, and we only + # use the main server multicast feature if it's present. + if not await self.host.memory.disco.has_feature( + client, NS_ADDRESS, client.server_jid + ): + # No multicast service + log.warning( + _( + f"Server of {client.profile} does not support XEP-0033 " + f"({PLUGIN_INFO[C.PI_IMPORT_NAME]}). We will send all messages ourselves." + ) + ) + await self.deliver_messages(client, mess_data, data, domains) + await self._stop_if_all_delivered(client, mess_data, data) + else: + # XXX: We delived ourself to multicast services because it's not correctly + # handled by some multicast services, notably by Prosody mod_addresses. + # FIXME: Only do this workaround for known incomplete implementations. + # TODO: remove this workaround when known implementations have been completed. + if mess_data["to"] != client.server_jid: + # We send the message to our server which will distribute it to the right + # locations. The initial ``to`` jid has been moved to ``data.to`` above. + # FIXME: When sub-services issue is properly handler, a sub-service JID + # supporting multicast should be allowed here. + mess_data["to"] = client.server_jid + await self.deliver_messages( + client, mess_data, data, domains, multicast_only=True + ) + await self._stop_if_all_delivered(client, mess_data, data) + + message_elt = mess_data["xml"] + message_elt["to"] = str(mess_data["to"]) + message_elt.addChild(data.to_element()) + return mess_data + + async def deliver_messages( + self, + client, + mess_data: MessageData, + addr_data: AddressesData, + domains: dict[str, list[AddressType]], + multicast_only: bool = False, + ) -> None: + """Send messages to requested recipients. + + If a domain handles multicast, a single message will be send there. + @param client: Client session. + @param mess_data: Messsa data. + @param addr_data: XEP-0033 addresses data. + @param domains: Domain to addresses map. + Note that that the addresses instances in this argument must be the same as in + ``addr_data`` (There ``delivered`` status will be manipulated). + @param multicast_only: if True, only multicast domain will be delivered. + """ + # We'll modify delivered status, so we keep track here of addresses which have + # already be delivered. + already_delivered = [a for a in addr_data.addresses if a.delivered] + multicast_domains = set() + for domain, domain_addresses in domains.items(): + if domain == client.server_jid.host: + + # ``client.server_jid`` is discarded to avoid sending twice the same + # message. ``multicast_only`` flag is set when the server supports + # multicast, so the message will be sent to it at the end of the workflow. + continue + if len(domain_addresses) > 1: + # For domains with multiple recipients, we check if we they support + # multicast and so if we can deliver to them directly. + if await self.host.memory.disco.has_feature( + client, NS_ADDRESS, jid.JID(domain) + ): + multicast_domains.add(domain) + + # We remove bcc, they have a special handling. + bcc = addr_data.bcc or [] + addr_data.bcc = None + + # Mark all addresses as "delivered" upfront, even if some won't actually be sent + # by us (when multicast_only is set). This flag signals to multicast services that + # they shouldn't handle these addresses. We'll remove the "delivered" status from + # undelivered addresses post-delivery. + for address in addr_data.addresses: + address.delivered = True + + # First, we send multicast messages. + for domain in multicast_domains: + something_to_deliver = False + for address in domains[domain]: + if address in already_delivered: + continue + # We need to mark as non delivered, so the multicast service will deliver + # itself. + address.delivered = False + something_to_deliver = True + + if not something_to_deliver: + continue + + domain_bcc = [a for a in bcc if a.jid and a.jid.host == domain] + message_elt = element_copy(mess_data["xml"]) + # The service must only see BCC from its own domain. + addr_data.bcc = domain_bcc + message_elt.addChild(addr_data.to_element()) + message_elt["to"] = domain + await client.a_send(message_elt) + for address in domains[domain] + domain_bcc: + # Those addresses have now been delivered. + address.delivered = True + + if multicast_only: + # Only addresses from multicast domains must be marked as delivered. + for address in addr_data.addresses: + if ( + address.jid is not None + and address.jid.host not in multicast_domains + and address not in already_delivered + ): + address.delivered = None + + # We have delivered to all multicast services, we stop here. + # But first we need to restore BCC, without the delivered ones. + addr_data.bcc = [a for a in bcc if not a.delivered] + return + + # Then BCC + for address in bcc: + if address in already_delivered: + continue + if address.jid is None: + raise NotImplementedError( + f"Sending to non JID address is not supported yet" + ) + if address.jid.host in multicast_domains: + # Address has already be handled by a multicast domain + continue + message_elt = element_copy(mess_data["xml"]) + # The recipient must only get its own BCC + addr_data.bcc = [address] + message_elt.addChild(addr_data.to_element()) + message_elt["to"] = address.jid.full() + await client.a_send(message_elt) + + # BCC address must be removed. + addr_data.bcc = None + + # and finally, other ones. + message_elt = mess_data["xml"] + message_elt.addChild(addr_data.to_element()) + non_bcc_addresses = (addr_data.to or []) + (addr_data.cc or []) + for address in non_bcc_addresses: + if address in already_delivered: + continue + if address.jid is None: + raise NotImplementedError( + f"Sending to non JID address is not supported yet" + ) + if address.jid.host in multicast_domains: + # Multicast domains have already been delivered. + continue + message_elt["to"] = address.jid.full() + await client.a_send(message_elt) + def send_message_trigger( self, client, mess_data, pre_xml_treatments, post_xml_treatments - ): + ) -> Literal[True]: """Process the XEP-0033 related data to be sent""" - profile = client.profile - - def treatment(mess_data): - if not "address" in mess_data["extra"]: - return mess_data - - def disco_callback(entities): - if not entities: - log.warning( - _("XEP-0033 is being used but the server doesn't support it!") - ) - raise failure.Failure(exceptions.CancelError("Cancelled by XEP-0033")) - if mess_data["to"] not in entities: - expected = _(" or ").join([entity.userhost() for entity in entities]) - log.warning( - _( - "Stanzas using XEP-0033 should be addressed to %(expected)s, not %(current)s!" - ) - % {"expected": expected, "current": mess_data["to"]} - ) - log.warning( - _( - "TODO: addressing has been fixed by the backend... fix it in the frontend!" - ) - ) - mess_data["to"] = list(entities)[0].userhostJID() - element = mess_data["xml"].addElement("addresses", NS_ADDRESS) - entries = [ - entry.split(":") - for entry in mess_data["extra"]["address"].split("\n") - if entry != "" - ] - for type_, jid_ in entries: - element.addChild( - domish.Element( - (None, "address"), None, {"type": type_, "jid": jid_} - ) - ) - # when the prosody plugin is completed, we can immediately return mess_data from here - self.send_and_store_message(mess_data, entries, profile) - log.debug("XEP-0033 took over") - raise failure.Failure(exceptions.CancelError("Cancelled by XEP-0033")) - - d = self.host.find_features_set(client, [NS_ADDRESS]) - d.addCallbacks(disco_callback, lambda __: disco_callback(None)) - return d - - post_xml_treatments.addCallback(treatment) + post_xml_treatments.addCallback( + lambda mess_data: defer.ensureDeferred( + self._handle_addresses(client, mess_data) + ) + ) return True - def send_and_store_message(self, mess_data, entries, profile): - """Check if target servers support XEP-0033, send and store the messages - @return: a friendly failure to let the core know that we sent the message already - - Later we should be able to remove this method because: - # XXX: sending the messages should be done by the local server - # FIXME: for now we duplicate the messages in the history for each recipient, this should change - # FIXME: for now we duplicate the echoes to the sender, this should also change - Ideas: - - fix Prosody plugin to check if target server support the feature - - redesign the database to save only one entry to the database - - change the message_new signal to eventually pass more than one recipient - """ - client = self.host.get_client(profile) - - def send(mess_data, skip_send=False): - d = defer.Deferred() - if not skip_send: - d.addCallback( - lambda ret: defer.ensureDeferred(client.send_message_data(ret)) - ) - d.addCallback( - lambda ret: defer.ensureDeferred(client.message_add_to_history(ret)) - ) - d.addCallback(client.message_send_to_bridge) - d.addErrback(lambda failure: failure.trap(exceptions.CancelError)) - return d.callback(mess_data) - - def disco_callback(entities, to_jid_s): - history_data = copy.deepcopy(mess_data) - history_data["to"] = JID(to_jid_s) - history_data["xml"]["to"] = to_jid_s - if entities: - if entities not in self.internal_data[timestamp]: - sent_data = copy.deepcopy(mess_data) - sent_data["to"] = JID(JID(to_jid_s).host) - sent_data["xml"]["to"] = JID(to_jid_s).host - send(sent_data) - self.internal_data[timestamp].append(entities) - # we still need to fill the history and signal the echo... - send(history_data, skip_send=True) - else: - # target server misses the addressing feature - send(history_data) - - def errback(failure, to_jid): - disco_callback(None, to_jid) - - timestamp = time() - self.internal_data[timestamp] = [] - defer_list = [] - for type_, jid_ in entries: - d = defer.Deferred() - d.addCallback( - self.host.find_features_set, client=client, jid_=JID(JID(jid_).host) - ) - d.addCallbacks( - disco_callback, errback, callbackArgs=[jid_], errbackArgs=[jid_] - ) - d.callback([NS_ADDRESS]) - defer_list.append(d) - d = defer.Deferred().addCallback(lambda __: self.internal_data.pop(timestamp)) - defer.DeferredList(defer_list).chainDeferred(d) - - def message_received_trigger(self, client, message, post_treat): - """In order to save the addressing information in the history""" - - def post_treat_addr(data, addresses): - data["extra"]["addresses"] = "" - for address in addresses: - # Depending how message has been constructed, we could get here - # some noise like "\n " instead of an address element. - if isinstance(address, domish.Element): - data["extra"]["addresses"] += "%s:%s\n" % ( - address["type"], - address["jid"], - ) - return data + def message_received_trigger( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + post_treat: defer.Deferred, + ) -> Literal[True]: + """Parse addresses information and add them to message data.""" try: - addresses = next(message.elements(NS_ADDRESS, "addresses")) - except StopIteration: - pass # no addresses + addresses = AddressesData.from_element(message_elt) + except exceptions.NotFound: + pass else: - post_treat.addCallback(post_treat_addr, addresses.children) + + def post_treat_addr(mess_data: MessageData): + mess_data["extra"]["addresses"] = addresses.model_dump( + mode="json", exclude_none=True + ) + return mess_data + + post_treat.addCallback(post_treat_addr) return True def get_handler(self, client): @@ -234,8 +527,12 @@ self.host = plugin_parent.host self.profile = profile - def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + def getDiscoInfo( + self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" + ) -> list[disco.DiscoFeature]: return [disco.DiscoFeature(NS_ADDRESS)] - def getDiscoItems(self, requestor, target, nodeIdentifier=""): + def getDiscoItems( + self, requestor: jid.JID, target: jid.JID, nodeIdentifier: str = "" + ) -> list[disco.DiscoItem]: return [] diff -r 4cd4922de876 -r 94e0968987cd libervia/backend/plugins/plugin_xep_0055.py --- a/libervia/backend/plugins/plugin_xep_0055.py Thu Sep 26 16:11:56 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0055.py Thu Sep 26 16:12:01 2024 +0200 @@ -110,7 +110,7 @@ @return: list[jid.JID] """ client = self.host.get_client(profile) - d = self.host.find_features_set(client, [NS_SEARCH]) + d = defer.ensureDeferred(self.host.find_features_set(client, [NS_SEARCH])) return d.addCallback(lambda set_: list(set_)) ## Main search UI (menu item callback) ##