# HG changeset patch # User Goffi # Date 1712401043 -7200 # Node ID e11b13418ba6f069a6bdb41afeb6263ac214ade5 # Parent 314d3c02bb67b7133cc57dce0f3b9af7a4978f1b plugin XEP-0353, XEP-0234, jingle: WebRTC data channel signaling implementation: Implement XEP-0343: Signaling WebRTC Data Channels in Jingle. The current version of the XEP (0.3.1) has no implementation and contains some flaws. After discussing this on xsf@, Daniel (from Conversations) mentioned that they had a sprint with Larma (from Dino) to work on another version and provided me with this link: https://gist.github.com/iNPUTmice/6c56f3e948cca517c5fb129016d99e74 . I have used it for my implementation. This implementation reuses work done on Jingle A/V call (notably XEP-0176 and XEP-0167 plugins), with adaptations. When used, XEP-0234 will not handle the file itself as it normally does. This is because WebRTC has several implementations (browser for web interface, GStreamer for others), and file/data must be handled directly by the frontend. This is particularly important for web frontends, as the file is not sent from the backend but from the end-user's browser device. Among the changes, there are: - XEP-0343 implementation. - `file_send` bridge method now use serialised dict as output. - New `BaseTransportHandler.is_usable` method which get content data and returns a boolean (default to `True`) to tell if this transport can actually be used in this context (when we are initiator). Used in webRTC case to see if call data are available. - Support of `application` media type, and everything necessary to handle data channels. - Better confirmation message, with file name, size and description when available. - When file is accepted in preflight, it is specified in following `action_new` signal for actual file transfer. This way, frontend can avoid the display or 2 confirmation messages. - XEP-0166: when not specified, default `content` name is now its index number instead of a UUID. This follows the behaviour of browsers. - XEP-0353: better handling of events such as call taken by another device. - various other updates. rel 441 diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/core/main.py --- a/libervia/backend/core/main.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/core/main.py Sat Apr 06 12:57:23 2024 +0200 @@ -727,7 +727,9 @@ def register_namespace(self, short_name, namespace): """associate a namespace to a short name""" if short_name in self.ns_map: - raise exceptions.ConflictError("this short name is already used") + raise exceptions.ConflictError( + f"This short name {short_name!r} is already used." + ) log.debug(f"registering namespace {short_name} => {namespace}") self.ns_map[short_name] = namespace diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/memory/cache.py --- a/libervia/backend/memory/cache.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/memory/cache.py Sat Apr 06 12:57:23 2024 +0200 @@ -97,8 +97,8 @@ to_delete.add(cache_data_file) elif cache_data.eol < now: log.debug( - "Purging expired cache file {cache_data_file!r} (expired for " - "{time}s)".format(time=int(time.time() - cache_data.eol)) + f"Purging expired cache file {cache_data_file} (expired for " + f"{int(time.time() - cache_data.eol)}s)" ) to_delete.add(cache_data_file) seen.add(cached_file) diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_misc_file.py --- a/libervia/backend/plugins/plugin_misc_file.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_misc_file.py Sat Apr 06 12:57:23 2024 +0200 @@ -17,15 +17,19 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +from functools import partial import os import os.path -from functools import partial +from pathlib import Path + from twisted.internet import defer from twisted.words.protocols.jabber import jid -from libervia.backend.core.i18n import _, D_ + +from libervia.backend.core import exceptions from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import D_, _ from libervia.backend.core.log import getLogger -from libervia.backend.core import exceptions from libervia.backend.tools import xml_tools from libervia.backend.tools import stream from libervia.backend.tools import utils @@ -73,7 +77,7 @@ "file_send", ".plugin", in_sign="ssssss", - out_sign="a{ss}", + out_sign="s", method=self._file_send, async_=True, ) @@ -96,31 +100,40 @@ profile: str = C.PROF_KEY_NONE ) -> defer.Deferred: client = self.host.get_client(profile) - return defer.ensureDeferred(self.file_send( + d = defer.ensureDeferred(self.file_send( client, jid.JID(peer_jid_s), filepath, name or None, file_desc or None, data_format.deserialise(extra_s) )) + d.addCallback(data_format.serialise) + return d async def file_send( - self, client, peer_jid, filepath, filename=None, file_desc=None, extra=None - ): + self, + client: SatXMPPEntity, + peer_jid: jid.JID, + filepath: str|Path, + filename: str|None=None, + file_desc: str|None=None, + extra: dict|None=None + ) -> dict: """Send a file using best available method - @param peer_jid(jid.JID): jid of the destinee - @param filepath(str): absolute path to the file - @param filename(unicode, None): name to use, or None to find it from filepath - @param file_desc(unicode, None): description of the file + @param peer_jid: jid of the destinee + @param filepath: absolute path to the file + @param filename: name to use, or None to find it from filepath + @param file_desc: description of the file @param profile: %(doc_profile)s - @return (dict): action dictionary, with progress id in case of success, else + @return: action dictionary, with progress id in case of success, else xmlui message """ - if not os.path.isfile(filepath): + filepath = Path(filepath) + if not filepath.is_file(): raise exceptions.DataError("The given path doesn't link to a file") if not filename: - filename = os.path.basename(filepath) or "_" - for manager, priority in self._file_managers: + filename = filepath.name + for manager, __ in self._file_managers: if await utils.as_deferred(manager.can_handle_file_send, - client, peer_jid, filepath): + client, peer_jid, str(filepath)): try: method_name = manager.name except AttributeError: @@ -131,8 +144,8 @@ ) ) try: - progress_id = await utils.as_deferred( - manager.file_send, client, peer_jid, filepath, filename, file_desc, + file_data= await utils.as_deferred( + manager.file_send, client, peer_jid, str(filepath), filename, file_desc, extra ) except Exception as e: @@ -146,7 +159,11 @@ ) ) continue - return {"progress": progress_id} + if "progress" not in file_data: + raise exceptions.InternalError( + '"progress" should be set in "file_send" returned file data dict.' + ) + return file_data msg = "Can't find any method to send file to {jid}".format(jid=peer_jid.full()) log.warning(msg) return { diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0096.py --- a/libervia/backend/plugins/plugin_xep_0096.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0096.py Sat Apr 06 12:57:23 2024 +0200 @@ -26,6 +26,7 @@ from libervia.backend.core.constants import Const as C from libervia.backend.core.log import getLogger from libervia.backend.core import exceptions +from libervia.backend.core.xmpp import SatXMPPEntity from libervia.backend.tools import xml_tools from libervia.backend.tools import stream @@ -291,16 +292,25 @@ client, jid.JID(peer_jid_s), filepath, name or None, desc or None ) - def file_send(self, client, peer_jid, filepath, name=None, desc=None, extra=None): - """Send a file using XEP-0096 + def file_send( + self, + client: SatXMPPEntity, + peer_jid: jid.JID, + filepath: str, + name: str|None = None, + desc: str|None = None, + extra: dict|None = None + ) -> dict: + """Send a file using XEP-0096. - @param peer_jid(jid.JID): recipient - @param filepath(str): absolute path to the file to send - @param name(unicode): name of the file to send + @param peer_jid: recipient + @param filepath: absolute path to the file to send + @param name: name of the file to send name must not contain "/" characters @param desc: description of the file @param extra: not used here - @return: an unique id to identify the transfer + @return: a dict with the key "progress" containing an unique id to identify the + transfer """ feature_elt = self.host.plugins["XEP-0020"].propose_features( {"stream-method": self.managed_stream_m}, namespace=None @@ -325,7 +335,7 @@ ) args = [filepath, sid, size, client] offer_d.addCallbacks(self._file_cb, self._file_eb, args, None, args) - return sid + return {"progress": sid} def _file_cb(self, result_tuple, filepath, sid, size, client): iq_elt, si_elt = result_tuple diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0166/__init__.py --- a/libervia/backend/plugins/plugin_xep_0166/__init__.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0166/__init__.py Sat Apr 06 12:57:23 2024 +0200 @@ -534,7 +534,7 @@ f"No application registered for {namespace}" ) - def get_content_data(self, content: dict) -> ContentData: + def get_content_data(self, content: dict, content_idx: int) -> ContentData: """"Retrieve application and its argument from content""" app_ns = content["app_ns"] try: @@ -547,7 +547,7 @@ try: content_name = content["name"] except KeyError: - content_name = content["name"] = str(uuid.uuid4()) + content_name = content["name"] = str(content_idx) return ContentData( application, app_args, @@ -611,15 +611,16 @@ session_contents = session["contents"] - for content in contents: + for content_idx, content in enumerate(contents): # we get the application plugin - content_data = self.get_content_data(content) + content_data = self.get_content_data(content, content_idx) # and the transport plugin transport_type = content.get("transport_type", XEP_0166.TRANSPORT_STREAMING) - try: - transport = self._type_transports[transport_type][0] - except IndexError: + for transport in self._type_transports[transport_type]: + if transport.handler.is_usable(client, content_data): + break + else: raise exceptions.InternalError( "No transport registered for {}".format(transport_type) ) diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0166/models.py --- a/libervia/backend/plugins/plugin_xep_0166/models.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0166/models.py Sat Apr 06 12:57:23 2024 +0200 @@ -156,6 +156,18 @@ class BaseTransportHandler(abc.ABC): + def is_usable(self, client, content_data: "ContentData") -> bool: + """Return True if this transport is usable with given content + + This method provides a default implementation that always returns True. It can + be overridden in subclasses to provide custom usability logic. + + @param client: SatXMPPEntity instance + @param content_data: Jingle content data. + @return: True if the transport is usable with this content, False otherwise + """ + return True + @abc.abstractmethod def jingle_session_init( self, diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0167/__init__.py --- a/libervia/backend/plugins/plugin_xep_0167/__init__.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0167/__init__.py Sat Apr 06 12:57:23 2024 +0200 @@ -81,6 +81,7 @@ mapping.host = host self._j = host.plugins["XEP-0166"] self._j.register_application(NS_JINGLE_RTP, self) + host.register_namespace("jingle-rtp", NS_JINGLE_RTP) host.bridge.add_method( "call_start", ".plugin", @@ -141,6 +142,50 @@ ) ) + def parse_call_data(self, call_data: dict) -> dict: + """Parse ``call_data`` and return corresponding contents end metadata""" + metadata = call_data.get("metadata") or {} + + if "sdp" in call_data: + sdp_data = mapping.parse_sdp(call_data["sdp"]) + to_delete = set() + for media, data in sdp_data.items(): + if media not in ("audio", "video", "application"): + continue + to_delete.add(media) + media_type, media_data = media, data + call_data[media_type] = media_data["application_data"] + transport_data = media_data["transport_data"] + try: + call_data[media_type]["fingerprint"] = transport_data["fingerprint"] + except KeyError: + log.warning("fingerprint is missing") + pass + try: + call_data[media_type]["id"] = media_data["id"] + except KeyError: + log.warning(f"no media ID found for {media_type}: {media_data}") + # FIXME: the 2 values below are linked to XEP-0343, they should be added + # there instead, maybe with some new trigger? + for key in ("sctp-port","max-message-size"): + value = transport_data.get(key) + if value is not None: + metadata[key] = value + try: + call_data[media_type]["ice-candidates"] = transport_data.get( + "candidates", [] + ) + metadata["ice-ufrag"] = transport_data["ufrag"] + metadata["ice-pwd"] = transport_data["pwd"] + except KeyError: + log.warning("ICE data are missing from SDP") + continue + for media in to_delete: + del sdp_data[media] + metadata.update(sdp_data.get("metadata", {})) + + return metadata + async def call_start( self, client: SatXMPPEntity, @@ -166,46 +211,9 @@ @raises exceptions.DataError: If media data is invalid or duplicate content name (mid) is found. """ + sid = str(uuid.uuid4()) + metadata = self.parse_call_data(call_data) contents = [] - metadata = call_data.get("metadata") or {} - - if "sdp" in call_data: - sdp_data = mapping.parse_sdp(call_data["sdp"]) - to_delete = set() - for media, data in sdp_data.items(): - if media not in ("audio", "video"): - continue - to_delete.add(media) - media_type, media_data = media, data - call_data[media_type] = media_data["application_data"] - transport_data = media_data["transport_data"] - try: - call_data[media_type]["fingerprint"] = transport_data["fingerprint"] - except KeyError: - log.warning("fingerprint is missing") - pass - try: - call_data[media_type]["id"] = media_data["id"] - except KeyError: - log.warning(f"no media ID found for {media_type}: {media_data}") - try: - call_data[media_type]["ice-candidates"] = transport_data.get( - "candidates", [] - ) - metadata["ice-ufrag"] = transport_data["ufrag"] - metadata["ice-pwd"] = transport_data["pwd"] - except KeyError: - log.warning("ICE data are missing from SDP") - continue - for media in to_delete: - del sdp_data[media] - metadata.update(sdp_data.get("metadata", {})) - - call_type = ( - C.META_SUBTYPE_CALL_VIDEO - if "video" in call_data - else C.META_SUBTYPE_CALL_AUDIO - ) seen_names = set() for media, media_data in call_data.items(): @@ -235,7 +243,12 @@ contents.append(content) if not contents: raise exceptions.DataError("no valid media data found: {call_data}") - sid = str(uuid.uuid4()) + + call_type = ( + C.META_SUBTYPE_CALL_VIDEO if "video" in call_data + else C.META_SUBTYPE_CALL_AUDIO + ) + defer.ensureDeferred( self._j.initiate( client, @@ -295,7 +308,7 @@ @param client: The client entity. @param session: The Jingle session. - @param media_type: Type of media (audio or video). + @param call_type: Type of media (audio or video). @return: True if the call has been accepted """ @@ -393,15 +406,13 @@ self, client: SatXMPPEntity, session: dict, cancel_error: exceptions.CancelError ) -> None: """The call has been rejected""" - # call_ended is use to send the signal only once even if there are audio and video - # contents + # call_ended is used to send the signal only once even if there are audio and + # video contents call_ended = session.get("call_ended", False) if call_ended: return - data = {"reason": getattr(cancel_error, "reason", "cancelled")} - text = getattr(cancel_error, "text", None) - if text: - data["text"] = text + data = {"reason": getattr(cancel_error, "reason", None) or "cancelled"} + data["text"] = str(cancel_error) self.host.bridge.call_ended( session["id"], data_format.serialise(data), client.profile ) diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0167/mapping.py --- a/libervia/backend/plugins/plugin_xep_0167/mapping.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0167/mapping.py Sat Apr 06 12:57:23 2024 +0200 @@ -122,9 +122,13 @@ payload_types = media_data.get("payload_types", {}) # Generate m= line - transport = "UDP/TLS/RTP/SAVPF" - payload_type_ids = [str(pt_id) for pt_id in payload_types] - m_line = f"m={media} {port} {transport} {' '.join(payload_type_ids)}" + if media == "application": + transport = "UDP/DTLS/SCTP" + m_line = f"m={media} {port} {transport} webrtc-datachannel" + else: + transport = "UDP/TLS/RTP/SAVPF" + payload_type_ids = [str(pt_id) for pt_id in payload_types] + m_line = f"m={media} {port} {transport} {' '.join(payload_type_ids)}" sdp_lines.append(m_line) sdp_lines.append(f"c={network_type} {address_type} {connection_address}") @@ -269,12 +273,18 @@ if prefix == "m=": media_type = parts[0] port = int(parts[1]) - payload_types = {} - for payload_type_id in [int(pt_id) for pt_id in parts[3:]]: - payload_type = {"id": payload_type_id} - payload_types[payload_type_id] = payload_type + application_data = {"media": media_type} + if media_type in ("video", "audio"): + payload_types = application_data["payload_types"] = {} + for payload_type_id in [int(pt_id) for pt_id in parts[3:]]: + payload_type = {"id": payload_type_id} + payload_types[payload_type_id] = payload_type + elif media_type == "application": + if parts[3] != "webrtc-datachannel": + raise NotImplementedError( + f"{media_type!r} application is not supported" + ) - application_data = {"media": media_type, "payload_types": payload_types} transport_data = {"port": port} if fingerprint_data is not None: transport_data["fingerprint"] = fingerprint_data diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0176.py --- a/libervia/backend/plugins/plugin_xep_0176.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0176.py Sat Apr 06 12:57:23 2024 +0200 @@ -53,6 +53,8 @@ class XEP_0176(BaseTransportHandler): + namespace = NS_JINGLE_ICE_UDP + def __init__(self, host): log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") self.host = host @@ -282,6 +284,15 @@ content_name, context_elt=transport_elt, ) + self.host.trigger.point( + "XEP-0176_jingle_handler_send_buffer", + client, + session, + content_name, + content_data, + transport_elt, + iq_elt + ) await iq_elt.send() elif action == self._j.A_START: @@ -370,6 +381,8 @@ """ session = self._j.get_session(client, session_id) iq_elt: Optional[domish.Element] = None + content_name: str|None = None + content_data: dict|None = None for media_type, new_ice_data in media_ice_data.items(): if session["state"] == self._j.STATE_PENDING: @@ -420,7 +433,12 @@ ) if iq_elt is not None: + assert content_name is not None and content_data is not None try: + self.host.trigger.point( + "XEP-0176_ice_candidate_send", client, session, media_ice_data, + content_name, content_data, iq_elt + ) await iq_elt.send() except Exception as e: log.warning(f"Could not send new ICE candidates: {e}") diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0234.py --- a/libervia/backend/plugins/plugin_xep_0234.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0234.py Sat Apr 06 12:57:23 2024 +0200 @@ -38,8 +38,9 @@ from libervia.backend.tools import xml_tools from libervia.backend.tools import utils from libervia.backend.tools import stream -from libervia.backend.tools.common import date_utils +from libervia.backend.tools.common import data_format, date_utils from libervia.backend.tools.common import regex +from libervia.backend.tools.common.utils import get_human_size from .plugin_xep_0166 import BaseApplicationHandler @@ -60,7 +61,11 @@ C.PI_DESCRIPTION: _("""Implementation of Jingle File Transfer"""), } -EXTRA_ALLOWED = {"path", "namespace", "file_desc", "file_hash", "hash_algo"} +# TODO: use a Pydantic model for extra +EXTRA_ALLOWED = { + "path", "namespace", "file_desc", "file_hash", "hash_algo", "webrtc", "call_data", + "size", "media_type" +} Range = namedtuple("Range", ("offset", "length")) @@ -84,8 +89,8 @@ host.bridge.add_method( "file_jingle_send", ".plugin", - in_sign="ssssa{ss}s", - out_sign="", + in_sign="ssssss", + out_sign="s", method=self._file_send, async_=True, ) @@ -202,26 +207,68 @@ return self.build_file_element(client, **file_data) async def parse_file_element( - self, client, file_elt, file_data=None, given=False, parent_elt=None, - keep_empty_range=False): - """Parse a element and file dictionary accordingly + self, + client: SatXMPPEntity, + file_elt: domish.Element|None, + file_data: dict | None = None, + given: bool = False, + parent_elt: domish.Element | None = None, + keep_empty_range: bool = False + ) -> dict: + """Parse a element and updates file dictionary accordingly. + + @param client: The SatXMPPEntity instance. + @param file_elt: The file element to parse. + @param file_data: Dict where the data will be set. Data are overwritten if they + exists (see @return for details). + If None, a new dict is created. + @param given: If True, prefix hash key with "given_". + @param parent_elt: Parent of the file element. If set, file_elt must not be set. + @param keep_empty_range: If True, keep empty range (i.e. range when offset + and length are None). Useful to know if a peer_jid can handle range. + + @return: file_data which is an updated or newly created dictionary with the following keys: + **name** (str) + Name of the file. Defaults to "unnamed" if not provided, "--" if the name is + "..", or a sanitized version if it contains path separators. + + **file_hash** (str, optional) + Hash of the file. Prefixed with "given_" if `given` is True. + + **hash_algo** (str, optional) + Algorithm used for the file hash. - @param file_data(dict, None): dict where the data will be set - following keys will be set (and overwritten if they already exist): - name, file_hash, hash_algo, size, mime_type, desc, path, namespace, range - if None, a new dict is created - @param given(bool): if True, prefix hash key with "given_" - @param parent_elt(domish.Element, None): parent of the file element - if set, file_elt must not be set - @param keep_empty_range(bool): if True, keep empty range (i.e. range when offset - and length are None). - Empty range is useful to know if a peer_jid can handle range - @return (dict): file_data - @trigger XEP-0234_parseFileElement(file_elt, file_data): can be used to parse new - elements - @raise exceptions.NotFound: there is not element in parent_elt - @raise exceptions.DataError: if file_elt uri is not NS_JINGLE_FT + **size** (int, optional) + Size of the file in bytes. + + **mime_type** (str, optional) + Media type of the file. + + **desc** (str, optional) + Description of the file. + + **path** (str, optional) + Path of the file. + + **namespace** (str, optional) + Namespace associated with the file. + + **transfer_range** (Range, optional) + Range of the file transfer. Present only if offset or length are specified, + or if `keep_empty_range` is True. + + **modified** (datetime, optional) + Last modified date of the file. + + @trigger XEP-0234_parseFileElement(file_elt, file_data): Can be used to parse new + elements. + + @raise exceptions.NotFound: Raised if there is no element in `parent_elt` + or if required elements are missing. + @raise exceptions.DataError: Raised if `file_elt` URI is not NS_JINGLE_FT or if + the file element is invalid. """ + if parent_elt is not None: if file_elt is not None: raise exceptions.InternalError( @@ -234,7 +281,7 @@ else: if not file_elt or file_elt.uri != NS_JINGLE_FT: raise exceptions.DataError( - "invalid element: {stanza}".format(stanza=file_elt.toXml()) + f"invalid element: {file_elt.toXml() if file_elt else 'None'}" ) if file_data is None: @@ -247,11 +294,12 @@ pass name = file_data.get("name") - if name == "..": + if name is None: + file_data["name"] = "unnamed" + elif name == "..": # we don't want to go to parent dir when joining to a path - name = "--" - file_data["name"] = name - elif name is not None and ("/" in name or "\\" in name): + file_data["name"] = "--" + elif "/" in name or "\\" in name: file_data["name"] = regex.path_escape(name) try: @@ -302,33 +350,42 @@ def _file_send( self, - peer_jid, - filepath, - name="", - file_desc="", - extra=None, - profile=C.PROF_KEY_NONE, - ): + peer_jid_s: str, + filepath: str, + name: str, + file_desc: str, + extra_s: str, + profile: str, + ) -> defer.Deferred[str]: client = self.host.get_client(profile) - return defer.ensureDeferred(self.file_send( + extra = data_format.deserialise(extra_s) + d = defer.ensureDeferred(self.file_send( client, - jid.JID(peer_jid), + jid.JID(peer_jid_s), filepath, name or None, file_desc or None, - extra or None, + extra, )) + d.addCallback(data_format.serialise) + return d async def file_send( - self, client, peer_jid, filepath, name, file_desc=None, extra=None - ): + self, + client: SatXMPPEntity, + peer_jid: jid.JID, + filepath: str, + name: str|None, + file_desc: str|None = None, + extra: dict|None = None + ) -> dict: """Send a file using jingle file transfer - @param peer_jid(jid.JID): destinee jid - @param filepath(str): absolute path of the file - @param name(unicode, None): name of the file - @param file_desc(unicode, None): description of the file - @return (D(unicode)): progress id + @param peer_jid: destinee jid + @param filepath: absolute path of the file + @param name: name of the file + @param file_desc: description of the file + @return: progress id """ progress_id_d = defer.Deferred() if extra is None: @@ -336,24 +393,34 @@ if file_desc is not None: extra["file_desc"] = file_desc encrypted = extra.pop("encrypted", False) - await self._j.initiate( + + content = { + "app_ns": NS_JINGLE_FT, + "senders": self._j.ROLE_INITIATOR, + "app_kwargs": { + "filepath": filepath, + "name": name, + "extra": extra, + "progress_id_d": progress_id_d, + }, + } + + await self.host.trigger.async_point( + "XEP-0234_file_jingle_send", + client, peer_jid, content + ) + + session_id = await self._j.initiate( client, peer_jid, - [ - { - "app_ns": NS_JINGLE_FT, - "senders": self._j.ROLE_INITIATOR, - "app_kwargs": { - "filepath": filepath, - "name": name, - "extra": extra, - "progress_id_d": progress_id_d, - }, - } - ], + [content], encrypted = encrypted ) - return await progress_id_d + progress_id = await progress_id_d + return { + "progress": progress_id, + "session_id": session_id + } def _file_jingle_request( self, peer_jid, filepath, name="", file_hash="", hash_algo="", extra=None, @@ -391,26 +458,76 @@ else: if hash_algo is not None: raise ValueError(_("file_hash must be set if hash_algo is set")) + + content = { + "app_ns": NS_JINGLE_FT, + "senders": self._j.ROLE_RESPONDER, + "app_kwargs": { + "filepath": filepath, + "name": name, + "extra": extra, + "progress_id_d": progress_id_d, + }, + } + await self._j.initiate( client, peer_jid, - [ - { - "app_ns": NS_JINGLE_FT, - "senders": self._j.ROLE_RESPONDER, - "app_kwargs": { - "filepath": filepath, - "name": name, - "extra": extra, - "progress_id_d": progress_id_d, - }, - } - ], + [content], ) return await progress_id_d # jingle callbacks + def _get_confirm_msg( + self, + client: SatXMPPEntity, + peer_jid: jid.JID, + file_data: dict + ) -> tuple[bool, str, str]: + """Get confirmation message to display to user. + + This is the message to show when a file sending request is received.""" + file_name = file_data.get('name') + file_size = file_data.get('size') + + if file_name: + file_name_msg = D_('wants to send you the file "{file_name}"').format( + file_name=file_name + ) + else: + file_name_msg = D_('wants to send you an unnamed file') + + if file_size is not None: + file_size_msg = D_("which has a size of {file_size_human}").format( + file_size_human=get_human_size(file_size) + ) + else: + file_size_msg = D_("which has an unknown size") + + file_description = file_data.get('desc') + if file_description: + description_msg = " Description: {}.".format(file_description) + else: + description_msg = "" + + if client.roster and peer_jid.userhostJID() not in client.roster: + is_in_roster = False + confirm_msg = D_( + "Somebody not in your contact list ({peer_jid}) {file_name_msg} {file_size_msg}.{description_msg} " + "Accepting this could leak your presence and possibly your IP address. Do you accept?" + ).format(peer_jid=peer_jid, file_name_msg=file_name_msg, file_size_msg=file_size_msg, description_msg=description_msg) + confirm_title = D_("File sent from an unknown contact") + else: + is_in_roster = True + confirm_msg = D_( + "{peer_jid} {file_name_msg} {file_size_msg}.{description_msg} Do you " + "accept?" + ).format(peer_jid=peer_jid, file_name_msg=file_name_msg, file_size_msg=file_size_msg, description_msg=description_msg) + confirm_title = D_("File Proposed") + + return (is_in_roster, confirm_msg, confirm_title) + async def jingle_preflight( self, client: SatXMPPEntity, @@ -435,26 +552,28 @@ # transfer (metadata are not used). We must check with other clients what is # actually send, and if XEP-0353 is used, and do a better integration. - if client.roster and peer_jid.userhostJID() not in client.roster: - confirm_msg = D_( - "Somebody not in your contact list ({peer_jid}) wants to do a " - '"{human_name}" session with you, this would leak your presence and ' - "possibly you IP (internet localisation), do you accept?" - ).format(peer_jid=peer_jid, human_name=self.human_name) - confirm_title = D_("File sent from an unknown contact") + try: + file_elt = next(description_elt.elements(NS_JINGLE_FT, "file")) + except StopIteration: + file_data = {} + else: + file_data = await self.parse_file_element(client, file_elt) + + is_in_roster, confirm_msg, confirm_title = self._get_confirm_msg( + client, peer_jid, file_data + ) + if is_in_roster: + action_type = C.META_TYPE_CONFIRM + action_subtype = C.META_TYPE_FILE + else: action_type = C.META_TYPE_NOT_IN_ROSTER_LEAK action_subtype = None - else: - confirm_msg = D_( - "{peer_jid} wants to send a file to you, do you accept?" - ).format(peer_jid=peer_jid) - confirm_title = D_("File Proposed") - action_type = C.META_TYPE_CONFIRM - action_subtype = C.META_TYPE_FILE + action_extra = { "type": action_type, "session_id": session_id, "from_jid": peer_jid.full(), + "file_data": file_data } if action_subtype is not None: action_extra["subtype"] = action_subtype @@ -506,6 +625,9 @@ progress_id_d.callback(self.get_progress_id(session, content_name)) content_data = session["contents"][content_name] application_data = content_data["application_data"] + if extra.get("webrtc"): + transport_data = content_data["transport_data"] + transport_data["webrtc"] = True assert "file_path" not in application_data application_data["file_path"] = filepath file_data = application_data["file_data"] = {} @@ -520,10 +642,15 @@ file_data["date"] = utils.xmpp_date() file_data["desc"] = extra.pop("file_desc", "") file_data["name"] = name - mime_type = mimetypes.guess_type(name, strict=False)[0] + mime_type = ( + file_data.get("media_type") or mimetypes.guess_type(name, strict=False)[0] + ) if mime_type is not None: file_data["mime_type"] = mime_type - file_data["size"] = os.path.getsize(filepath) + size = extra.pop("size", None) + if size is None: + size = os.path.getsize(filepath) + file_data["size"] = size if "namespace" in extra: file_data["namespace"] = extra["namespace"] if "path" in extra: @@ -557,7 +684,7 @@ session: dict, content_name: str, desc_elt: domish.Element, - ): + ) -> bool: """This method request confirmation for a jingle session""" content_data = session["contents"][content_name] senders = content_data["senders"] @@ -613,10 +740,21 @@ return False async def _file_receiving_request_conf( - self, client, session, content_data, content_name, file_data, file_elt - ): + self, + client: SatXMPPEntity, + session: dict, + content_data: dict, + content_name: str, + file_data: dict, + file_elt: domish.Element + ) -> bool: """parse file_elt, and handle user permission/file opening""" - await self.parse_file_element(client, file_elt, file_data, given=True) + transport_data = content_data["transport_data"] + webrtc = transport_data.get("webrtc", False) + # file may have been already accepted in preflight + file_accepted = session.get("file_accepted", False) + file_data = await self.parse_file_element(client, file_elt, file_data, given=True) + # FIXME: looks redundant with code done in self.parse_file_element try: hash_algo, file_data["given_file_hash"] = self._hash.parse_hash_elt(file_elt) except exceptions.NotFound: @@ -625,32 +763,52 @@ except exceptions.NotFound: raise failure.Failure(exceptions.DataError) - if hash_algo is not None: - file_data["hash_algo"] = hash_algo - file_data["hash_hasher"] = hasher = self._hash.get_hasher(hash_algo) - file_data["data_cb"] = lambda data: hasher.update(data) - - try: - file_data["size"] = int(file_data["size"]) - except ValueError: - raise failure.Failure(exceptions.DataError) - - name = file_data["name"] - if "/" in name or "\\" in name: - log.warning( - "File name contain path characters, we replace them: {}".format(name) - ) - file_data["name"] = name.replace("/", "_").replace("\\", "_") - - content_data["application_data"]["file_data"] = file_data - - # now we actualy request permission to user - # deferred to track end of transfer finished_d = content_data["finished_d"] = defer.Deferred() - confirmed = await self._f.get_dest_dir( - client, session["peer_jid"], content_data, file_data, stream_object=True - ) + + if webrtc: + peer_jid = session["peer_jid"] + __, confirm_msg, confirm_title = self._get_confirm_msg( + client, peer_jid, file_data + ) + action_extra = { + "webrtc": webrtc, + "file_accepted": file_accepted, + "type": C.META_TYPE_FILE, + "session_id": session["id"], + "from_jid": peer_jid.full(), + "file_data": file_data, + "progress_id": file_data["progress_id"], + } + # we need a confirm dialog here and not a file dialog, as the file handling is + # managed by the frontends with webRTC. + confirmed = await xml_tools.defer_confirm( + self.host, + confirm_msg, + confirm_title, + profile=client.profile, + action_extra=action_extra + ) + else: + + if hash_algo is not None: + file_data["hash_algo"] = hash_algo + file_data["hash_hasher"] = hasher = self._hash.get_hasher(hash_algo) + file_data["data_cb"] = lambda data: hasher.update(data) + + try: + file_data["size"] = int(file_data["size"]) + except ValueError: + raise failure.Failure(exceptions.DataError) + + content_data["application_data"]["file_data"] = file_data + + # now we actualy request permission to user + + confirmed = await self._f.get_dest_dir( + client, session["peer_jid"], content_data, file_data, stream_object=True + ) + if confirmed: await self.host.trigger.async_point( "XEP-0234_file_receiving_request_conf", @@ -660,6 +818,7 @@ finished_d.addCallbacks( self._finished_cb, self._finished_eb, args, None, args ) + return confirmed async def jingle_handler(self, client, action, session, content_name, desc_elt): @@ -678,53 +837,62 @@ file_elt.addElement("range") elif action == self._j.A_SESSION_ACCEPT: assert not "stream_object" in content_data - file_data = application_data["file_data"] - file_path = application_data["file_path"] - senders = content_data["senders"] - if senders != session["role"]: - # we are receiving the file - try: - # did the responder specified the size of the file? - file_elt = next(desc_elt.elements(NS_JINGLE_FT, "file")) - size_elt = next(file_elt.elements(NS_JINGLE_FT, "size")) - size = int(str(size_elt)) - except (StopIteration, ValueError): - size = None - # XXX: hash security is not critical here, so we just take the higher - # mandatory one - hasher = file_data["hash_hasher"] = self._hash.get_hasher() - progress_id = self.get_progress_id(session, content_name) - try: + transport_data = content_data["transport_data"] + if transport_data.get("webrtc"): + # WebRTC case is special, the file is transfered by the frontend + # implementation directly, there is nothing done in backend. + log.debug( + "We're using WebRTC datachannel, the frontend handles it from now on." + ) + else: + file_data = application_data["file_data"] + file_path = application_data["file_path"] + senders = content_data["senders"] + if senders != session["role"]: + # we are receiving the file + try: + # did the responder specified the size of the file? + file_elt = next(desc_elt.elements(NS_JINGLE_FT, "file")) + size_elt = next(file_elt.elements(NS_JINGLE_FT, "size")) + size = int(str(size_elt)) + except (StopIteration, ValueError): + size = None + # XXX: hash security is not critical here, so we just take the higher + # mandatory one + hasher = file_data["hash_hasher"] = self._hash.get_hasher() + progress_id = self.get_progress_id(session, content_name) + try: + content_data["stream_object"] = stream.FileStreamObject( + self.host, + client, + file_path, + mode="wb", + uid=progress_id, + size=size, + data_cb=lambda data: hasher.update(data), + ) + except Exception as e: + self.host.bridge.progress_error( + progress_id, C.PROGRESS_ERROR_FAILED, client.profile + ) + await self._j.terminate( + client, self._j.REASON_FAILED_APPLICATION, session) + raise e + else: + # we are sending the file + size = file_data["size"] + # XXX: hash security is not critical here, so we just take the higher + # mandatory one + hasher = file_data["hash_hasher"] = self._hash.get_hasher() content_data["stream_object"] = stream.FileStreamObject( self.host, client, file_path, - mode="wb", - uid=progress_id, + uid=self.get_progress_id(session, content_name), size=size, data_cb=lambda data: hasher.update(data), ) - except Exception as e: - self.host.bridge.progress_error( - progress_id, C.PROGRESS_ERROR_FAILED, client.profile - ) - await self._j.terminate( - client, self._j.REASON_FAILED_APPLICATION, session) - raise e - else: - # we are sending the file - size = file_data["size"] - # XXX: hash security is not critical here, so we just take the higher - # mandatory one - hasher = file_data["hash_hasher"] = self._hash.get_hasher() - content_data["stream_object"] = stream.FileStreamObject( - self.host, - client, - file_path, - uid=self.get_progress_id(session, content_name), - size=size, - data_cb=lambda data: hasher.update(data), - ) + finished_d = content_data["finished_d"] = defer.Deferred() args = [client, session, content_name, content_data] finished_d.addCallbacks(self._finished_cb, self._finished_eb, args, None, args) diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0260.py --- a/libervia/backend/plugins/plugin_xep_0260.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0260.py Sat Apr 06 12:57:23 2024 +0200 @@ -20,6 +20,7 @@ from libervia.backend.core.i18n import _ from libervia.backend.core.constants import Const as C from libervia.backend.core.log import getLogger +from libervia.backend.plugins.plugin_xep_0166.models import BaseTransportHandler log = getLogger(__name__) from libervia.backend.core import exceptions @@ -57,7 +58,7 @@ return "an error happened while trying to use the proxy" -class XEP_0260(object): +class XEP_0260(BaseTransportHandler): # TODO: udp handling def __init__(self, host): diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0261.py --- a/libervia/backend/plugins/plugin_xep_0261.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0261.py Sat Apr 06 12:57:23 2024 +0200 @@ -20,6 +20,7 @@ from libervia.backend.core.i18n import _ from libervia.backend.core.constants import Const as C from libervia.backend.core.log import getLogger +from libervia.backend.plugins.plugin_xep_0166.models import BaseTransportHandler log = getLogger(__name__) from wokkel import disco, iwokkel @@ -48,7 +49,7 @@ } -class XEP_0261(object): +class XEP_0261(BaseTransportHandler): NAMESPACE = NS_JINGLE_IBB # used by XEP-0260 plugin for transport-replace def __init__(self, host): diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0280.py --- a/libervia/backend/plugins/plugin_xep_0280.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0280.py Sat Apr 06 12:57:23 2024 +0200 @@ -139,9 +139,18 @@ # we replace the wrapping message with the CCed one # and continue the normal behaviour + message_elt["type"] = cc_message_elt.getAttribute("type", C.MESS_TYPE_NORMAL) + mess_id = cc_message_elt.getAttribute("id") + if mess_id: + message_elt[mess_id] = mess_id if carbons_elt.name == "received": message_elt["from"] = cc_message_elt["from"] + lang = cc_message_elt.getAttribute("xml:lang") + if lang: + message_elt["xml:lang"] = lang elif carbons_elt.name == "sent": + # the cc_message_elt is the full JID, so we copy it + message_elt["from"] = cc_message_elt["from"] try: message_elt["to"] = cc_message_elt["to"] except KeyError: diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0343.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0343.py Sat Apr 06 12:57:23 2024 +0200 @@ -0,0 +1,373 @@ +#!/usr/bin/env python3 + +# Libervia plugin +# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from typing import Final + +from twisted.internet import defer, reactor +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.core.xmpp import SatXMPPEntity +from libervia.backend.plugins.plugin_xep_0166.models import BaseTransportHandler, ContentData +from libervia.backend.tools.common import data_format + +from .plugin_xep_0167 import mapping + +log = getLogger(__name__) + + +PLUGIN_INFO = { + C.PI_NAME: "WebRTC datachannels in Jingle", + C.PI_IMPORT_NAME: "XEP-0343", + C.PI_TYPE: "XEP", + C.PI_MODES: C.PLUG_MODE_BOTH, + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0167", "XEP-0176", "XEP-0234", "XEP-0320"], + C.PI_RECOMMENDATIONS: [], + C.PI_MAIN: "XEP_0343", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Use WebRTC to create a generic data transport."""), +} +NS_JINGLE_WEBRTC_DATACHANNELS: Final[ + str +] = "urn:xmpp:jingle:transports:webrtc-datachannel:1" + + +class XEP_0343(BaseTransportHandler): + namespace = NS_JINGLE_WEBRTC_DATACHANNELS + + def __init__(self, host): + log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") + self.host = host + self._ice_udp = host.plugins["XEP-0176"] + self._j = host.plugins["XEP-0166"] + self._j.register_transport( + NS_JINGLE_WEBRTC_DATACHANNELS, self._j.TRANSPORT_STREAMING, self, 10000 + ) + self._rtp = host.plugins["XEP-0167"] + host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger) + host.trigger.add( + "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger + ) + host.trigger.add("XEP-0176_jingle_handler_send_buffer", self._on_send_ice_buffer) + host.trigger.add("XEP-0176_ice_candidate_send", self._on_ice_candidate_send) + host.trigger.add( + "XEP-0234_file_jingle_send", self._file_jingle_send + ) + + def get_handler(self, client: SatXMPPEntity): + return XEP0343Handler() + + def is_usable(self, client, content_data: ContentData) -> bool: + try: + return content_data.app_kwargs["extra"]["webrtc"] + except KeyError: + return False + + def _parse_sdp_a_trigger( + self, + attribute: str, + parts: list[str], + call_data: dict, + metadata: dict, + media_type: str, + application_data: dict, + transport_data: dict, + ) -> None: + """Parse "sctp-port" and "max-message-size" attributes""" + try: + if attribute == "sctp-port": + transport_data["sctp-port"] = int(parts[0]) + elif attribute == "max-message-size": + transport_data["max-message-size"] = int(parts[0]) + except ValueError: + log.warning(f"Can't parse value of {attribute}, ignoring: {parts}") + + def _generate_sdp_content_trigger( + self, + session: dict, + local: bool, + idx: int, + content_data: dict, + sdp_lines: list[str], + application_data: dict, + app_data_key: str, + media_data: dict, + media: str + ) -> None: + """Generate "sctp-port" and "max-message-size" attributes""" + transport_data = content_data["transport_data"] + sctp_port = transport_data.get("sctp-port") + if sctp_port is not None: + sdp_lines.append(f"a=sctp-port:{sctp_port}") + + max_message_size = transport_data.get("max-message-size") + if max_message_size is not None: + sdp_lines.append(f"a=max-message-size:{max_message_size}") + + def _wrap_transport_element( + self, + transport_elt: domish.Element + ) -> None: + """Wrap the XEP-0176 transport in a transport with this XEP namespace + + @param transport_elt: ICE UDP . Must be already a child of a + element. + """ + content_elt = transport_elt.parent + if content_elt is None or not content_elt.name == "content": + raise exceptions.InternalError("Was expecting element.") + content_elt.children.remove(transport_elt) + wrapping_transport_elt = content_elt.addElement( + (NS_JINGLE_WEBRTC_DATACHANNELS, "transport") + ) + wrapping_transport_elt.addChild(transport_elt) + + def _on_send_ice_buffer( + self, + client: SatXMPPEntity, + session: dict, + content_name: str, + content_data: dict, + transport_elt: domish.Element, + iq_elt: domish.Element + ) -> bool: + if content_data["transport"].handler == self: + self._wrap_transport_element(transport_elt) + return True + + def _on_ice_candidate_send( + self, + client: SatXMPPEntity, + session: dict, + media_ice_data: dict[str, dict], + content_name: str, + content_data: dict, + iq_elt: domish.Element + ) -> bool: + if content_data["transport"].handler == self: + transport_elt = iq_elt.jingle.content.transport + if transport_elt.uri != self._ice_udp.namespace: + raise exceptions.InternalError("Was expecting an ICE UDP transport") + self._wrap_transport_element(transport_elt) + return True + + async def _file_jingle_send( + self, + client: SatXMPPEntity, + peer_jid: jid.JID, + content: dict + ) -> None: + call_data = content["app_kwargs"]["extra"].pop("call_data", None) + if call_data: + metadata = self._rtp.parse_call_data(call_data) + try: + application_data = call_data["application"] + except KeyError: + raise exceptions.DataError( + '"call_data" must have an application media.' + ) + try: + content["transport_data"] = { + "sctp-port": metadata["sctp-port"], + "max-message-size": metadata.get("max-message-size", 65536), + "local_ice_data": { + "ufrag": metadata["ice-ufrag"], + "pwd": metadata["ice-pwd"], + "candidates": application_data.pop("ice-candidates"), + "fingerprint": application_data.pop("fingerprint", {}), + } + } + except KeyError as e: + raise exceptions.DataError(f"Mandatory key is missing: {e}") + + async def jingle_session_init( + self, + client: SatXMPPEntity, + session: dict, + content_name: str, + ) -> domish.Element: + content_data = session["contents"][content_name] + transport_data = content_data["transport_data"] + ice_transport_elt = await self._ice_udp.jingle_session_init( + client, + session, + content_name + ) + transport_elt = domish.Element( + (NS_JINGLE_WEBRTC_DATACHANNELS, "transport"), + attribs={ + "sctp-port": str(transport_data["sctp-port"]), + "max-message-size": str(transport_data["max-message-size"]) + } + ) + transport_elt.addChild(ice_transport_elt) + return transport_elt + + async def _call_ice_udp_handler( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + transport_elt: domish.Element, + ): + """Unwrap XEP-0176 element, and call its Jingle handler with it""" + try: + ice_transport_elt = next( + transport_elt.elements(self._ice_udp.namespace, "transport") + ) + except StopIteration: + raise exceptions.DataError("Missing ICE UDP element.") + else: + await self._ice_udp.jingle_handler( + client, action, session, content_name, ice_transport_elt + ) + + async def jingle_handler( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + transport_elt: domish.Element, + ) -> domish.Element: + """Handle Jingle requests + + @param client: The SatXMPPEntity instance. + @param action: The action to be performed with the session. + @param session: A dictionary containing the session information. + @param content_name: The name of the content. + @param transport_elt: The domish.Element instance representing the transport + element. + + @return: element + """ + content_data = session["contents"][content_name] + transport_data = content_data["transport_data"] + if action in (self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR): + session.setdefault("metadata", {}) + session.setdefault("peer_metadata", {}) + # we have to set application data despite being a transport handler, + # because the SDP generation needs application data + application_data = content_data["application_data"] + application_data.setdefault("peer_data", {}) + application_data.setdefault("media", "application") + + if action == self._j.A_PREPARE_CONFIRMATION: + await self._call_ice_udp_handler( + client, action, session, content_name, transport_elt + ) + try: + transport_data["sctp-port"] = int(transport_elt["sctp-port"]) + transport_data["max-message-size"] = int( + transport_elt.getAttribute("max-message-size", 65536) + ) + except (KeyError, ValueError): + raise exceptions.DataError( + f"Invalid datachannel signaling element: {transport_elt.toXml()}" + ) + transport_data["webrtc"] = True + elif action in ( + self._j.A_PREPARE_CONFIRMATION, self._j.A_PREPARE_INITIATOR, + self._j.A_TRANSPORT_INFO + ): + await self._call_ice_udp_handler( + client, action, session, content_name, transport_elt + ) + elif action == self._j.A_SESSION_ACCEPT: + await self._call_ice_udp_handler( + client, action, session, content_name, transport_elt + ) + answer_sdp = mapping.generate_sdp_from_session(session) + self.host.bridge.call_setup( + session["id"], + data_format.serialise( + { + "role": session["role"], + "sdp": answer_sdp, + } + ), + client.profile, + ) + elif action in (self._j.A_ACCEPTED_ACK, self._j.A_PREPARE_RESPONDER): + pass + elif action == self._j.A_START: + pass + elif action == self._j.A_SESSION_INITIATE: + # responder side + + sdp = mapping.generate_sdp_from_session(session) + session["answer_sdp_d"] = answer_sdp_d = defer.Deferred() + # we should have the answer long before 2 min + answer_sdp_d.addTimeout(2 * 60, reactor) + + self.host.bridge.call_setup( + session["id"], + data_format.serialise( + { + "role": session["role"], + "sdp": sdp, + } + ), + client.profile, + ) + + answer_sdp = await answer_sdp_d + parsed_answer = mapping.parse_sdp(answer_sdp) + session["metadata"].update(parsed_answer["metadata"]) + contents = session["contents"] + if len(contents) != 1: + raise NotImplementedError( + "Only a singlecontent is supported at the moment." + ) + content = next(iter(contents.values())) + media_data = parsed_answer["application"] + application_data = content["application_data"] + application_data["local_data"] = media_data["application_data"] + transport_data = content["transport_data"] + local_ice_data = media_data["transport_data"] + transport_data["local_ice_data"] = local_ice_data + transport_elt.children.clear() + ice_transport_elt = await self._ice_udp.jingle_handler( + client, action, session, content_name, transport_elt + ) + transport_elt.addChild(ice_transport_elt) + elif action == self._j.A_DESTROY: + # the transport is replaced (fallback ?) + pass + else: + log.warning(f"FIXME: unmanaged action {action}") + + return transport_elt + + +@implementer(iwokkel.IDisco) +class XEP0343Handler(XMPPHandler): + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [disco.DiscoFeature(NS_JINGLE_WEBRTC_DATACHANNELS)] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return [] diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0353.py --- a/libervia/backend/plugins/plugin_xep_0353.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0353.py Sat Apr 06 12:57:23 2024 +0200 @@ -29,6 +29,12 @@ from libervia.backend.core.core_types import SatXMPPEntity from libervia.backend.core.i18n import D_, _ from libervia.backend.core.log import getLogger +from libervia.backend.tools.xml_tools import element_copy + +try: + from .plugin_xep_0167 import NS_JINGLE_RTP +except ImportError: + NS_JINGLE_RTP = None log = getLogger(__name__) @@ -55,6 +61,14 @@ self.reason = reason +class TakenByOtherDeviceException(exceptions.CancelError): + reason: str = "taken_by_other_device" + + def __init__(self, device_jid: jid.JID): + super().__init__(device_jid.full()) + self.device_jid = device_jid + + class RetractException(exceptions.CancelError): pass @@ -70,7 +84,7 @@ "XEP-0166_initiate_elt_built", self, self._on_initiate_trigger, - # this plugin set the resource, we want it to happen first to other trigger + # this plugin set the resource, we want it to happen first so other triggers # can get the full peer JID priority=host.trigger.MAX_PRIORITY, ) @@ -140,13 +154,20 @@ for content_data in session["contents"].values(): # we get the full element build by the application plugin jingle_description_elt = content_data["application_data"]["desc_elt"] - # and copy it to only keep the root element, no children - description_elt = domish.Element( - (jingle_description_elt.uri, jingle_description_elt.name), - defaultUri=jingle_description_elt.defaultUri, - attribs=jingle_description_elt.attributes, - localPrefixes=jingle_description_elt.localPrefixes, - ) + + # we need to copy the element + if jingle_description_elt.uri == NS_JINGLE_RTP: + # for RTP, we only keep the root element, no children + description_elt = domish.Element( + (jingle_description_elt.uri, jingle_description_elt.name), + defaultUri=jingle_description_elt.defaultUri, + attribs=jingle_description_elt.attributes, + localPrefixes=jingle_description_elt.localPrefixes, + ) + else: + # Otherwise we keep the children to have application useful data + description_elt = element_copy(jingle_description_elt, with_parent=False) + message_elt.propose.addChild(description_elt) response_d = defer.Deferred() # we wait for 2 min before cancelling the session init @@ -206,47 +227,82 @@ async def _on_message_received(self, client, message_elt, post_treat): for elt in message_elt.elements(): if elt.uri == NS_JINGLE_MESSAGE: - if elt.name == "propose": - return await self._handle_propose(client, message_elt, elt) - elif elt.name == "retract": - return self._handle_retract(client, message_elt, elt) - elif elt.name == "proceed": - return self._handle_proceed(client, message_elt, elt) - elif elt.name == "accept": - return self._handle_accept(client, message_elt, elt) - elif elt.name == "reject": - return self._handle_reject(client, message_elt, elt) - elif elt.name == "ringing": - return await self._handle_ringing(client, message_elt, elt) - else: - log.warning(f"invalid element: {elt.toXml}") - return True + # We use ensureDeferred to process the message initiation workflow in + # parallel and to avoid blocking the message queue. + defer.ensureDeferred(self._handle_mess_init(client, message_elt, elt)) + return False return True + async def _handle_mess_init( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + mess_init_elt: domish.Element + ) -> None: + if mess_init_elt.name == "propose": + await self._handle_propose(client, message_elt, mess_init_elt) + elif mess_init_elt.name == "retract": + self._handle_retract(client, message_elt, mess_init_elt) + elif mess_init_elt.name == "proceed": + self._handle_proceed(client, message_elt, mess_init_elt) + elif mess_init_elt.name == "accept": + self._handle_accept(client, message_elt, mess_init_elt) + elif mess_init_elt.name == "reject": + self._handle_reject(client, message_elt, mess_init_elt) + elif mess_init_elt.name == "ringing": + await self._handle_ringing(client, message_elt, mess_init_elt) + else: + log.warning(f"invalid element: {mess_init_elt.toXml}") + + def _get_sid_and_session_d( + self, + client: SatXMPPEntity, + elt: domish.Element + ) -> tuple[str, defer.Deferred|list[defer.Deferred]]: + """Retrieve session ID and deferred or list of deferred from response element""" + try: + session_id = elt["id"] + except KeyError as e: + assert elt.parent is not None + log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}") + raise e + try: + session_d = client._xep_0353_pending_sessions[session_id] + except KeyError as e: + log.warning( + _( + "no pending session found with id {session_id}, did it timed out?" + ).format(session_id=session_id) + ) + raise e + return session_id, session_d + def _get_sid_and_response_d( self, client: SatXMPPEntity, elt: domish.Element ) -> tuple[str, defer.Deferred]: """Retrieve session ID and response_d from response element""" - try: - session_id = elt["id"] - except KeyError as e: - assert elt.parent is not None - log.warning(f"invalid proceed element in message_elt: {elt.parent.toXml()}") - raise e - try: - response_d = client._xep_0353_pending_sessions[session_id] - except KeyError as e: - log.warning( - _( - "no pending session found with id {session_id}, did it timed out?" - ).format(session_id=session_id) - ) - raise e + session_id, response_d = self._get_sid_and_session_d(client, elt) + assert isinstance(response_d, defer.Deferred) return session_id, response_d - async def _handle_propose(self, client, message_elt, elt): + def _get_sid_and_preflight_d_list( + self, + client: SatXMPPEntity, + elt: domish.Element + ) -> tuple[str, list[defer.Deferred]]: + """Retrieve session ID and list of preflight_d from response element""" + session_id, preflight_d_list = self._get_sid_and_session_d(client, elt) + assert isinstance(preflight_d_list, list) + return session_id, preflight_d_list + + async def _handle_propose( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + elt: domish.Element + ) -> None: peer_jid = jid.JID(message_elt["from"]) local_jid = jid.JID(message_elt["to"]) session_id = elt["id"] @@ -260,17 +316,17 @@ raise AttributeError except AttributeError: log.warning(f"Invalid propose element: {message_elt.toXml()}") - return False + return except exceptions.NotFound: log.warning( f"There is not registered application to handle this " f"proposal: {elt.toXml()}" ) - return False + return if not desc_and_apps: log.warning("No application specified: {message_elt.toXml()}") - return False + return session = self._j.create_session( client, session_id, self._j.ROLE_RESPONDER, peer_jid, local_jid @@ -284,35 +340,51 @@ mess_data = self.build_message_data(client, peer_jid, "ringing", session_id) await client.send_message_data(mess_data) - for description_elt, application in desc_and_apps: - try: - await application.handler.jingle_preflight( - client, session, description_elt - ) - except exceptions.CancelError as e: - log.info(f"{client.profile} refused the session: {e}") + try: + for description_elt, application in desc_and_apps: + try: + preflight_d = defer.ensureDeferred( + application.handler.jingle_preflight( + client, session, description_elt + ) + ) + client._xep_0353_pending_sessions.setdefault(session_id, []).append( + preflight_d + ) + await preflight_d + except TakenByOtherDeviceException as e: + log.info(f"The call has been takend by {e.device_jid}") + await application.handler.jingle_preflight_cancel(client, session, e) + self._j.delete_session(client, session_id) + return + except exceptions.CancelError as e: + log.info(f"{client.profile} refused the session: {e}") - if is_in_roster: - # peer is in our roster, we send reject to them, ou other devices will - # get carbon copies - reject_dest_jid = peer_jid - else: - # peer is not in our roster, we send the "reject" only to our own - # devices to make them stop ringing/doing notification, and we don't - # send anything to peer to avoid presence leak. - reject_dest_jid = client.jid.userhostJID() + if is_in_roster: + # peer is in our roster, we send reject to them, ou other devices + # will get carbon copies + reject_dest_jid = peer_jid + else: + # peer is not in our roster, we send the "reject" only to our own + # devices to make them stop ringing/doing notification, and we + # don't send anything to peer to avoid presence leak. + reject_dest_jid = client.jid.userhostJID() - mess_data = self.build_message_data( - client, reject_dest_jid, "reject", session_id - ) - await client.send_message_data(mess_data) - self._j.delete_session(client, session_id) - - return False - except defer.CancelledError: - # raised when call is retracted before user can reply - self._j.delete_session(client, session_id) - return False + mess_data = self.build_message_data( + client, reject_dest_jid, "reject", session_id + ) + await client.send_message_data(mess_data) + self._j.delete_session(client, session_id) + return + except defer.CancelledError: + # raised when call is retracted before user can reply + self._j.delete_session(client, session_id) + return + finally: + try: + del client._xep_0353_pending_sessions[session_id] + except KeyError: + pass if peer_jid.userhostJID() not in client.roster: await client.presence.available(peer_jid) @@ -320,8 +392,6 @@ mess_data = self.build_message_data(client, peer_jid, "proceed", session_id) await client.send_message_data(mess_data) - return False - def _handle_retract(self, client, message_elt, retract_elt): try: session = self._j.get_session(client, retract_elt["id"]) @@ -345,14 +415,44 @@ d.cancel() return False - def _handle_proceed(self, client, message_elt, proceed_elt): - try: - __, response_d = self._get_sid_and_response_d(client, proceed_elt) - except KeyError: - return True + def _handle_proceed( + self, + client: SatXMPPEntity, + message_elt: domish.Element, + proceed_elt: domish.Element + ) -> None: + from_jid = jid.JID(message_elt["from"]) + # session_d is the deferred of the session, it can be preflight_d or response_d + if from_jid.userhostJID() == client.jid.userhostJID(): + # an other device took the session + try: + sid, preflight_d_list = self._get_sid_and_preflight_d_list( + client, proceed_elt + ) + except KeyError: + return + for preflight_d in preflight_d_list: + if not preflight_d.called: + preflight_d.errback(TakenByOtherDeviceException(from_jid)) - response_d.callback(jid.JID(message_elt["from"])) - return False + try: + session = self._j.get_session(client, sid) + except exceptions.NotFound: + log.warning("No session found with sid {sid!r}.") + else: + # jingle_preflight_cancel? + pass + + # FIXME: Is preflight cancel handler correctly? Check if preflight_d is always + # cleaned correctly (use a timeout?) + + else: + try: + __, response_d = self._get_sid_and_response_d(client, proceed_elt) + except KeyError: + return + # we have a response deferred + response_d.callback(jid.JID(message_elt["from"])) def _handle_accept(self, client, message_elt, accept_elt): pass diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0384.py --- a/libervia/backend/plugins/plugin_xep_0384.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0384.py Sat Apr 06 12:57:23 2024 +0200 @@ -2628,7 +2628,7 @@ if plaintext is None: log.warning( "No body found in intercepted message to be encrypted with" - " oldmemo." + f" oldmemo. [{client.profile}]" ) return plaintext diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/plugins/plugin_xep_0391.py --- a/libervia/backend/plugins/plugin_xep_0391.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/plugins/plugin_xep_0391.py Sat Apr 06 12:57:23 2024 +0200 @@ -110,6 +110,11 @@ return True for content_elt in jingle_elt.elements(self._j.namespace, "content"): content_data = session["contents"][content_elt["name"]] + transport_data = content_data["transport_data"] + if transport_data.get("webrtc"): + # webRTC is already e2e encrypted, we skip this content to avoid double + # encryption + continue security_elt = content_elt.addElement((NS_JET, "security")) security_elt["name"] = content_elt["name"] # XXX: for now only OLDMEMO is supported, thus we do it directly here. If some @@ -244,7 +249,14 @@ content_data: Dict[str, Any], elt: domish.Element ) -> bool: - file_obj = content_data["stream_object"].file_obj + try: + file_obj = content_data["stream_object"].file_obj + except KeyError: + transport_data = content_data["transport_data"] + if transport_data.get("webrtc"): + # we skip JET to avoid double encryption + log.debug("JET skipped due to webrtc transport.") + return True try: encryption_data=content_data["encryption"] except KeyError: diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/tools/async_trigger.py --- a/libervia/backend/tools/async_trigger.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/tools/async_trigger.py Sat Apr 06 12:57:23 2024 +0200 @@ -20,6 +20,8 @@ """Misc usefull classes""" from typing import Tuple, Any + +from libervia.backend.core.xmpp import SatXMPPEntity from . import trigger as sync_trigger from . import utils from twisted.internet import defer @@ -27,8 +29,11 @@ class TriggerManager(sync_trigger.TriggerManager): """This is a TriggerManager with an new async_point method""" - @defer.inlineCallbacks - def async_point(self, point_name, *args, **kwargs): + async def async_point( + self, + point_name: str, + *args, **kwargs + ) -> bool: """This put a trigger point with potentially async Deferred All the triggers for that point will be run @@ -41,18 +46,18 @@ @return D(bool): True if the action must be continued, False else """ if point_name not in self.__triggers: - defer.returnValue(True) + return True can_cancel = not kwargs.pop('triggers_no_cancel', False) - for priority, trigger in self.__triggers[point_name]: + for __, trigger in self.__triggers[point_name]: try: - cont = yield utils.as_deferred(trigger, *args, **kwargs) + cont = await utils.as_deferred(trigger, *args, **kwargs) if can_cancel and not cont: - defer.returnValue(False) + return False except sync_trigger.SkipOtherTriggers: break - defer.returnValue(True) + return True async def async_return_point( self, diff -r 314d3c02bb67 -r e11b13418ba6 libervia/backend/tools/common/utils.py --- a/libervia/backend/tools/common/utils.py Sat Apr 06 12:21:04 2024 +0200 +++ b/libervia/backend/tools/common/utils.py Sat Apr 06 12:57:23 2024 +0200 @@ -143,7 +143,7 @@ raise ValueError(f"invalid size: {e}") -def get_size_multiplier(size, suffix="o"): +def get_size_multiplier(size: int|str, suffix="o"): """Get multiplier of a file size""" size = int(size) #  cf. https://stackoverflow.com/a/1094933 (thanks) @@ -154,6 +154,7 @@ return size, f"Yi{suffix}" -def get_human_size(size, suffix="o", sep=" "): +def get_human_size(size: int|str, suffix: str="o", sep: str=" ") -> str: + """Return data size in a human readable format.""" size, symbol = get_size_multiplier(size, suffix) return f"{size:.2f}{sep}{symbol}"