Mercurial > libervia-backend
changeset 4056:1c4f4aa36d98
plugin XEP-0167: Jingle RTP Sessions implementation:
rel 420
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 29 May 2023 13:38:10 +0200 (19 months ago) |
parents | 38819c69aa39 |
children | e807a5434f82 |
files | sat/core/constants.py sat/plugins/plugin_xep_0167/__init__.py sat/plugins/plugin_xep_0167/constants.py sat/plugins/plugin_xep_0167/mapping.py |
diffstat | 4 files changed, 1077 insertions(+), 1 deletions(-) [+] |
line wrap: on
line diff
--- a/sat/core/constants.py Mon May 29 13:32:40 2023 +0200 +++ b/sat/core/constants.py Mon May 29 13:38:10 2023 +0200 @@ -22,7 +22,7 @@ except ImportError: BaseDirectory = None from os.path import dirname -from typing_extensions import Final +from typing import Final import sat @@ -327,8 +327,11 @@ ## action constants ## META_TYPE_FILE = "file" + META_TYPE_CALL = "call" META_TYPE_OVERWRITE = "overwrite" META_TYPE_NOT_IN_ROSTER_LEAK = "not_in_roster_leak" + META_SUBTYPE_CALL_AUDIO = "audio" + META_SUBTYPE_CALL_VIDEO = "video" ## HARD-CODED ACTIONS IDS (generated with uuid.uuid4) ## AUTHENTICATE_PROFILE_ID = "b03bbfa8-a4ae-4734-a248-06ce6c7cf562"
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0167/__init__.py Mon May 29 13:38:10 2023 +0200 @@ -0,0 +1,401 @@ +#!/usr/bin/env python3 + +# Libervia plugin for managing pipes (experimental) +# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import Optional + +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer + +from sat.core import exceptions +from sat.core.constants import Const as C +from sat.core.core_types import SatXMPPEntity +from sat.core.i18n import D_, _ +from sat.core.log import getLogger +from sat.tools import xml_tools +from sat.tools.common import data_format + +from . import mapping +from ..plugin_xep_0166 import BaseApplicationHandler +from .constants import ( + NS_JINGLE_RTP, + NS_JINGLE_RTP_INFO, + NS_JINGLE_RTP_AUDIO, + NS_JINGLE_RTP_VIDEO, +) + + +log = getLogger(__name__) + + +PLUGIN_INFO = { + C.PI_NAME: "Jingle RTP Sessions", + C.PI_IMPORT_NAME: "XEP-0167", + C.PI_TYPE: "XEP", + C.PI_PROTOCOLS: ["XEP-0167"], + C.PI_DEPENDENCIES: ["XEP-0166"], + C.PI_MAIN: "XEP_0167", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Real-time Transport Protocol (RTP) is used for A/V calls"""), +} + +CONFIRM = D_("{peer} wants to start a call ({call_type}) with you, do you accept?") +CONFIRM_TITLE = D_("Incoming Call") +SECURITY_LIMIT = 0 + +ALLOWED_ACTIONS = ( + "active", + "hold", + "unhold", + "mute", + "unmute", + "ringing", +) + + +class XEP_0167(BaseApplicationHandler): + def __init__(self, host): + log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization') + self.host = host + # FIXME: to be removed once host is accessible from global var + mapping.host = host + self._j = host.plugins["XEP-0166"] + self._j.register_application(NS_JINGLE_RTP, self) + host.bridge.add_method( + "call_start", + ".plugin", + in_sign="sss", + out_sign="s", + method=self._call_start, + async_=True, + ) + host.bridge.add_method( + "call_info", + ".plugin", + in_sign="ssss", + out_sign="", + method=self._call_start, + ) + host.bridge.add_signal( + "call_accepted", ".plugin", signature="sss" + ) # args: session_id, answer_sdp, profile + host.bridge.add_signal( + "call_info", ".plugin", signature="ssss" + ) # args: session_id, info_type, extra, profile + + def get_handler(self, client): + return XEP_0167_handler() + + # bridge methods + + def _call_start( + self, + entity_s: str, + call_data_s: str, + profile_key: str, + ): + client = self.host.get_client(profile_key) + return defer.ensureDeferred( + self.call_start( + client, jid.JID(entity_s), data_format.deserialise(call_data_s) + ) + ) + + async def call_start( + self, + client: SatXMPPEntity, + peer_jid: jid.JID, + call_data: dict, + media: str = "video", + ) -> None: + """Temporary method to test RTP session""" + contents = [] + metadata = call_data.get("metadata") or {} + + if "sdp" in call_data: + sdp_data = mapping.parse_sdp(call_data["sdp"]) + for media_type in ("audio", "video"): + try: + media_data = sdp_data.pop(media_type) + except KeyError: + continue + call_data[media_type] = media_data["application_data"] + transport_data = media_data["transport_data"] + try: + call_data[media_type]["fingerprint"] = transport_data["fingerprint"] + except KeyError: + log.warning("fingerprint is missing") + pass + try: + call_data[media_type]["id"] = media_data["id"] + except KeyError: + log.warning(f"no media ID found for {media_type}: {media_data}") + try: + call_data[media_type]["ice-candidates"] = transport_data["candidates"] + metadata["ice-ufrag"] = transport_data["ufrag"] + metadata["ice-pwd"] = transport_data["pwd"] + except KeyError: + log.warning("ICE data are missing from SDP") + continue + metadata.update(sdp_data.get("metadata", {})) + + call_type = ( + C.META_SUBTYPE_CALL_VIDEO + if "video" in call_data + else C.META_SUBTYPE_CALL_AUDIO + ) + seen_names = set() + + for media in ("audio", "video"): + media_data = call_data.get(media) + if media_data is not None: + content = { + "app_ns": NS_JINGLE_RTP, + "senders": "both", + "transport_type": self._j.TRANSPORT_DATAGRAM, + "app_kwargs": {"media": media, "media_data": media_data}, + "transport_data": { + "local_ice_data": { + "ufrag": metadata["ice-ufrag"], + "pwd": metadata["ice-pwd"], + "candidates": media_data.pop("ice-candidates"), + "fingerprint": media_data.pop("fingerprint", {}), + } + }, + } + if "id" in media_data: + name = media_data.pop("id") + if name in seen_names: + raise exceptions.DataError( + f"Content name (mid) seen multiple times: {name}" + ) + content["name"] = name + contents.append(content) + if not contents: + raise exceptions.DataError("no valid media data found: {call_data}") + return await self._j.initiate( + client, + peer_jid, + contents, + call_type=call_type, + metadata=metadata, + peer_metadata={}, + ) + + # jingle callbacks + + def jingle_session_init( + self, + client: SatXMPPEntity, + session: dict, + content_name: str, + media: str, + media_data: dict, + ) -> domish.Element: + if media not in ("audio", "video"): + raise ValueError('only "audio" and "video" media types are supported') + content_data = session["contents"][content_name] + application_data = content_data["application_data"] + application_data["media"] = media + application_data["local_data"] = media_data + desc_elt = mapping.build_description(media, media_data, session) + self.host.trigger.point( + "XEP-0167_jingle_session_init", + client, + session, + content_name, + media, + media_data, + desc_elt, + triggers_no_cancel=True, + ) + return desc_elt + + async def jingle_request_confirmation( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + desc_elt: domish.Element, + ) -> bool: + if content_name != next(iter(session["contents"])): + # we request confirmation only for the first content, all others are + # automatically accepted. In practice, that means that the call confirmation + # is requested only once for audio and video contents. + return True + peer_jid = session["peer_jid"] + + if any( + c["desc_elt"].getAttribute("media") == "video" + for c in session["contents"].values() + ): + call_type = session["call_type"] = C.META_SUBTYPE_CALL_VIDEO + else: + call_type = session["call_type"] = C.META_SUBTYPE_CALL_AUDIO + + sdp = mapping.generate_sdp_from_session(session) + + resp_data = await xml_tools.defer_dialog( + self.host, + _(CONFIRM).format(peer=peer_jid.userhost(), call_type=call_type), + _(CONFIRM_TITLE), + action_extra={ + "session_id": session["id"], + "from_jid": peer_jid.full(), + "type": C.META_TYPE_CALL, + "sub_type": call_type, + "sdp": sdp, + }, + security_limit=SECURITY_LIMIT, + profile=client.profile, + ) + + if resp_data.get("cancelled", False): + return False + + answer_sdp = resp_data["sdp"] + parsed_answer = mapping.parse_sdp(answer_sdp) + session["peer_metadata"].update(parsed_answer["metadata"]) + for media in ("audio", "video"): + for content in session["contents"].values(): + if content["desc_elt"].getAttribute("media") == media: + media_data = parsed_answer[media] + application_data = content["application_data"] + application_data["local_data"] = media_data["application_data"] + transport_data = content["transport_data"] + local_ice_data = media_data["transport_data"] + transport_data["local_ice_data"] = local_ice_data + + return True + + async def jingle_handler(self, client, action, session, content_name, desc_elt): + content_data = session["contents"][content_name] + application_data = content_data["application_data"] + if action == self._j.A_PREPARE_CONFIRMATION: + session["metadata"] = {} + session["peer_metadata"] = {} + try: + media = application_data["media"] = desc_elt["media"] + except KeyError: + raise exceptions.DataError('"media" key is missing in {desc_elt.toXml()}') + if media not in ("audio", "video"): + raise exceptions.DataError(f"invalid media: {media!r}") + application_data["peer_data"] = mapping.parse_description(desc_elt) + elif action == self._j.A_SESSION_INITIATE: + application_data["peer_data"] = mapping.parse_description(desc_elt) + desc_elt = mapping.build_description( + application_data["media"], application_data["local_data"], session + ) + elif action == self._j.A_ACCEPTED_ACK: + pass + elif action == self._j.A_PREPARE_INITIATOR: + application_data["peer_data"] = mapping.parse_description(desc_elt) + elif action == self._j.A_SESSION_ACCEPT: + if content_name == next(iter(session["contents"])): + # we only send the signal for first content, as it means that the whole + # session is accepted + answer_sdp = mapping.generate_sdp_from_session(session) + self.host.bridge.call_accepted(session["id"], answer_sdp, client.profile) + else: + log.warning(f"FIXME: unmanaged action {action}") + + self.host.trigger.point( + "XEP-0167_jingle_handler", + client, + action, + session, + content_name, + desc_elt, + triggers_no_cancel=True, + ) + return desc_elt + + def jingle_session_info( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + jingle_elt: domish.Element, + ) -> None: + """Informational messages""" + for elt in jingle_elt.elements(): + if elt.uri == NS_JINGLE_RTP_INFO: + info_type = elt.name + if info_type not in ALLOWED_ACTIONS: + log.warning("ignoring unknow info type: {info_type!r}") + continue + extra = {} + if info_type in ("mute", "unmute"): + name = elt.getAttribute("name") + if name: + extra["name"] = name + log.debug(f"{info_type} call info received (extra: {extra})") + self.host.bridge.call_info( + session["id"], info_type, data_format.serialise(extra), client.profile + ) + + def _call_info(self, session_id, info_type, extra_s, profile_key): + client = self.host.get_client(profile_key) + extra = data_format.deserialise(extra_s) + return self.send_info(client, session_id, info_type, extra) + + + def send_info( + self, + client: SatXMPPEntity, + session_id: str, + info_type: str, + extra: Optional[dict], + ) -> None: + """Send information on the call""" + if info_type not in ALLOWED_ACTIONS: + raise ValueError(f"Unkown info type {info_type!r}") + session = self._j.get_session(client, session_id) + iq_elt, jingle_elt = self._j.build_session_info(client, session) + info_elt = jingle_elt.addElement((NS_JINGLE_RTP_INFO, info_type)) + if extra and info_type in ("mute", "unmute") and "name" in extra: + info_elt["name"] = extra["name"] + iq_elt.send() + + def jingle_terminate( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + reason_elt: domish.Element, + ) -> None: + pass + + +@implementer(iwokkel.IDisco) +class XEP_0167_handler(XMPPHandler): + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [ + disco.DiscoFeature(NS_JINGLE_RTP), + disco.DiscoFeature(NS_JINGLE_RTP_AUDIO), + disco.DiscoFeature(NS_JINGLE_RTP_VIDEO), + ] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0167/constants.py Mon May 29 13:38:10 2023 +0200 @@ -0,0 +1,27 @@ +#!/usr/bin/env python3 + +# Libervia plugin for managing pipes (experimental) +# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import Final + + +NS_JINGLE_RTP_BASE: Final = "urn:xmpp:jingle:apps:rtp" +NS_JINGLE_RTP: Final = f"{NS_JINGLE_RTP_BASE}:1" +NS_JINGLE_RTP_AUDIO: Final = f"{NS_JINGLE_RTP_BASE}:audio" +NS_JINGLE_RTP_VIDEO: Final = f"{NS_JINGLE_RTP_BASE}:video" +NS_JINGLE_RTP_ERRORS: Final = f"{NS_JINGLE_RTP_BASE}:errors:1" +NS_JINGLE_RTP_INFO: Final = f"{NS_JINGLE_RTP_BASE}:info:1"
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sat/plugins/plugin_xep_0167/mapping.py Mon May 29 13:38:10 2023 +0200 @@ -0,0 +1,645 @@ +#!/usr/bin/env python3 + +# Libervia plugin for managing pipes (experimental) +# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import base64 +from typing import Any, Dict, Optional + +from twisted.words.xish import domish + +from sat.core.constants import Const as C +from sat.core.log import getLogger + +from .constants import NS_JINGLE_RTP + +log = getLogger(__name__) + +host = None + + +def senders_to_sdp(senders: str, session: dict) -> str: + """Returns appropriate SDP attribute corresponding to Jingle senders attribute""" + if senders == "both": + return "a=sendrecv" + elif senders == "none": + return "a=inactive" + elif session["role"] == senders: + return "a=sendonly" + else: + return "a=recvonly" + + +def generate_sdp_from_session( + session: dict, local: bool = False, port: int = 9999 +) -> str: + """Generate an SDP string from session data. + + @param session: A dictionary containing the session data. It should have the + following structure: + + { + "contents": { + "<content_id>": { + "application_data": { + "media": <str: "audio" or "video">, + "local_data": <media_data dict>, + "peer_data": <media_data dict>, + ... + }, + "transport_data": { + "local_ice_data": <ice_data dict>, + "peer_ice_data": <ice_data dict>, + ... + }, + ... + }, + ... + } + } + @param local: A boolean value indicating whether to generate SDP for the local or + peer entity. If True, the method will generate SDP for the local entity, + otherwise for the peer entity. Generally the local SDP is received from frontends + and not needed in backend, except for debugging purpose. + @param port: The preferred port for communications. + + @return: The generated SDP string. + """ + sdp_lines = ["v=0"] + + # Add originator (o=) line after the version (v=) line + username = base64.b64encode(session["local_jid"].full().encode()).decode() + session_id = "1" # Increment this for each session + session_version = "1" # Increment this when the session is updated + network_type = "IN" + address_type = "IP4" + connection_address = "0.0.0.0" + o_line = ( + f"o={username} {session_id} {session_version} {network_type} {address_type} " + f"{connection_address}" + ) + sdp_lines.append(o_line) + + # Add the mandatory "s=" and t=" lines + sdp_lines.append("s=-") + sdp_lines.append("t=0 0") + + # stream direction + all_senders = {c["senders"] for c in session["contents"].values()} + # if we don't have a common senders for all contents, we set them at media level + senders = all_senders.pop() if len(all_senders) == 1 else None + if senders is not None: + sdp_lines.append(senders_to_sdp(senders, session)) + + sdp_lines.append("a=msid-semantic:WMS *") + + host.trigger.point( + "XEP-0167_generate_sdp_session", + session, + local, + sdp_lines, + triggers_no_cancel=True + ) + + contents = session["contents"] + for content_name, content_data in contents.items(): + app_data_key = "local_data" if local else "peer_data" + application_data = content_data["application_data"] + media_data = application_data[app_data_key] + media = application_data["media"] + payload_types = media_data.get("payload_types", {}) + + # Generate m= line + transport = "UDP/TLS/RTP/SAVPF" + payload_type_ids = [str(pt_id) for pt_id in payload_types] + m_line = f"m={media} {port} {transport} {' '.join(payload_type_ids)}" + sdp_lines.append(m_line) + + sdp_lines.append(f"c={network_type} {address_type} {connection_address}") + + sdp_lines.append(f"a=mid:{content_name}") + + # stream direction + if senders is None: + sdp_lines.append(senders_to_sdp(content_data["senders"], session)) + + # Generate a= lines for rtpmap and fmtp + for pt_id, pt in payload_types.items(): + name = pt["name"] + clockrate = pt.get("clockrate", "") + sdp_lines.append(f"a=rtpmap:{pt_id} {name}/{clockrate}") + + if "ptime" in pt: + sdp_lines.append(f"a=ptime:{pt['ptime']}") + + if "parameters" in pt: + fmtp_params = ";".join([f"{k}={v}" for k, v in pt["parameters"].items()]) + sdp_lines.append(f"a=fmtp:{pt_id} {fmtp_params}") + + if "bandwidth" in media_data: + sdp_lines.append(f"a=b:{media_data['bandwidth']}") + + if media_data.get("rtcp-mux"): + sdp_lines.append("a=rtcp-mux") + + # Generate a= lines for fingerprint, ICE ufrag, pwd and candidates + ice_data_key = "local_ice_data" if local else "peer_ice_data" + ice_data = content_data["transport_data"][ice_data_key] + + if "fingerprint" in ice_data: + fingerprint_data = ice_data["fingerprint"] + sdp_lines.append( + f"a=fingerprint:{fingerprint_data['hash']} " + f"{fingerprint_data['fingerprint']}" + ) + sdp_lines.append(f"a=setup:{fingerprint_data['setup']}") + + sdp_lines.append(f"a=ice-ufrag:{ice_data['ufrag']}") + sdp_lines.append(f"a=ice-pwd:{ice_data['pwd']}") + + for candidate in ice_data["candidates"]: + foundation = candidate["foundation"] + component_id = candidate["component_id"] + transport = candidate["transport"] + priority = candidate["priority"] + address = candidate["address"] + candidate_port = candidate["port"] + candidate_type = candidate["type"] + + candidate_line = ( + f"a=candidate:{foundation} {component_id} {transport} {priority} " + f"{address} {candidate_port} typ {candidate_type}" + ) + + if "rel_addr" in candidate and "rel_port" in candidate: + candidate_line += ( + f" raddr {candidate['rel_addr']} rport {candidate['rel_port']}" + ) + + if "generation" in candidate: + candidate_line += f" generation {candidate['generation']}" + + if "network" in candidate: + candidate_line += f" network {candidate['network']}" + + sdp_lines.append(candidate_line) + + # Generate a= lines for encryption + if "encryption" in media_data: + for enc_data in media_data["encryption"]: + crypto_suite = enc_data["crypto-suite"] + key_params = enc_data["key-params"] + session_params = enc_data.get("session-params", "") + tag = enc_data["tag"] + + crypto_line = f"a=crypto:{tag} {crypto_suite} {key_params}" + if session_params: + crypto_line += f" {session_params}" + sdp_lines.append(crypto_line) + + + host.trigger.point( + "XEP-0167_generate_sdp_content", + session, + local, + content_name, + content_data, + sdp_lines, + application_data, + app_data_key, + media_data, + media, + triggers_no_cancel=True + ) + + # Combine SDP lines and return the result + return "\r\n".join(sdp_lines) + "\r\n" + + +def parse_sdp(sdp: str) -> dict: + """Parse SDP string. + + @param sdp: The SDP string to parse. + + @return: A dictionary containing parsed session data. + """ + # FIXME: to be removed once host is accessible from global var + assert host is not None + lines = sdp.strip().split("\r\n") + # session metadata + metadata: Dict[str, Any] = {} + call_data = {"metadata": metadata} + + media_type = None + media_data: Optional[Dict[str, Any]] = None + application_data: Optional[Dict[str, Any]] = None + transport_data: Optional[Dict[str, Any]] = None + fingerprint_data: Optional[Dict[str, str]] = None + ice_pwd: Optional[str] = None + ice_ufrag: Optional[str] = None + payload_types: Optional[Dict[int, dict]] = None + + for line in lines: + try: + parts = line.split() + prefix = parts[0][:2] # Extract the 'a=', 'm=', etc., prefix + parts[0] = parts[0][2:] # Remove the prefix from the first element + + if prefix == "m=": + media_type = parts[0] + port = int(parts[1]) + payload_types = {} + for payload_type_id in [int(pt_id) for pt_id in parts[3:]]: + payload_type = {"id": payload_type_id} + payload_types[payload_type_id] = payload_type + + application_data = {"media": media_type, "payload_types": payload_types} + transport_data = {"port": port} + if fingerprint_data is not None: + transport_data["fingerprint"] = fingerprint_data + if ice_pwd is not None: + transport_data["pwd"] = ice_pwd + if ice_ufrag is not None: + transport_data["ufrag"] = ice_ufrag + media_data = call_data[media_type] = { + "application_data": application_data, + "transport_data": transport_data, + } + + elif prefix == "a=": + if ":" in parts[0]: + attribute, parts[0] = parts[0].split(":", 1) + else: + attribute = parts[0] + + if ( + media_type is None + or application_data is None + or transport_data is None + ) and not ( + attribute + in ( + "sendrecv", + "sendonly", + "recvonly", + "inactive", + "fingerprint", + "group", + "ice-options", + "msid-semantic", + "ice-pwd", + "ice-ufrag", + ) + ): + log.warning( + "Received attribute before media description, this is " + f"invalid: {line}" + ) + continue + + if attribute == "mid": + assert media_data is not None + try: + media_data["id"] = parts[0] + except IndexError: + log.warning(f"invalid media ID: {line}") + + elif attribute == "rtpmap": + assert application_data is not None + assert payload_types is not None + pt_id = int(parts[0]) + codec_info = parts[1].split("/") + codec = codec_info[0] + clockrate = int(codec_info[1]) + payload_type = { + "id": pt_id, + "name": codec, + "clockrate": clockrate, + } + # Handle optional channel count + if len(codec_info) > 2: + channels = int(codec_info[2]) + payload_type["channels"] = channels + + payload_types.setdefault(pt_id, {}).update(payload_type) + + elif attribute == "fmtp": + assert payload_types is not None + pt_id = int(parts[0]) + params = parts[1].split(";") + try: + payload_type = payload_types[pt_id] + except KeyError: + raise ValueError( + f"Can find content type {pt_id}, ignoring: {line}" + ) + + try: + payload_type["parameters"] = { + name: value + for name, value in (param.split("=") for param in params) + } + except ValueError: + payload_type.setdefault("exra-parameters", []).extend(params) + + elif attribute == "candidate": + assert transport_data is not None + candidate = { + "foundation": parts[0], + "component_id": int(parts[1]), + "transport": parts[2], + "priority": int(parts[3]), + "address": parts[4], + "port": int(parts[5]), + "type": parts[7], + } + + for part in parts[8:]: + if part == "raddr": + candidate["rel_addr"] = parts[parts.index(part) + 1] + elif part == "rport": + candidate["rel_port"] = int(parts[parts.index(part) + 1]) + elif part == "generation": + candidate["generation"] = parts[parts.index(part) + 1] + elif part == "network": + candidate["network"] = parts[parts.index(part) + 1] + + transport_data.setdefault("candidates", []).append(candidate) + + elif attribute == "fingerprint": + algorithm, fingerprint = parts[0], parts[1] + fingerprint_data = {"hash": algorithm, "fingerprint": fingerprint} + if transport_data is not None: + transport_data["fingerprint"] = fingerprint_data + elif attribute == "setup": + assert transport_data is not None + setup = parts[0] + transport_data.setdefault("fingerprint", {})["setup"] = setup + + elif attribute == "b": + assert application_data is not None + bandwidth = int(parts[0]) + application_data["bandwidth"] = bandwidth + + elif attribute == "rtcp-mux": + assert application_data is not None + application_data["rtcp-mux"] = True + + elif attribute == "ice-ufrag": + if transport_data is not None: + transport_data["ufrag"] = parts[0] + + elif attribute == "ice-pwd": + if transport_data is not None: + transport_data["pwd"] = parts[0] + + host.trigger.point( + "XEP-0167_parse_sdp_a", + attribute, + parts, + call_data, + metadata, + media_type, + application_data, + transport_data, + triggers_no_cancel=True + ) + + except ValueError as e: + raise ValueError(f"Could not parse line. Invalid format ({e}): {line}") from e + except IndexError as e: + raise IndexError(f"Incomplete line. Missing data: {line}") from e + + # we remove private data (data starting with _, used by some plugins (e.g. XEP-0294) + # to handle session data at media level)) + for key in [k for k in call_data if k.startswith("_")]: + log.debug(f"cleaning remaining private data {key!r}") + del call_data[key] + + # ICE candidates may only be specified for the first media, this + # duplicate the candidate for the other in this case + all_media = {k:v for k,v in call_data.items() if k in ("audio", "video")} + if len(all_media) > 1 and not all( + "candidates" in c["transport_data"] for c in all_media.values() + ): + first_content = next(iter(all_media.values())) + try: + ice_candidates = first_content["transport_data"]["candidates"] + except KeyError: + log.warning("missing candidates in SDP") + else: + for idx, content in enumerate(all_media.values()): + if idx == 0: + continue + content["transport_data"].setdefault("candidates", ice_candidates) + + return call_data + + +def build_description(media: str, media_data: dict, session: dict) -> domish.Element: + """Generate <description> element from media data + + @param media: media type ("audio" or "video") + + @param media_data: A dictionary containing the media description data. + The keys and values are described below: + + - ssrc (str, optional): The synchronization source identifier. + - payload_types (list): A list of dictionaries, each representing a payload + type. + Each dictionary may contain the following keys: + - channels (str, optional): Number of audio channels. + - clockrate (str, optional): Clock rate of the media. + - id (str): The unique identifier of the payload type. + - maxptime (str, optional): Maximum packet time. + - name (str, optional): Name of the codec. + - ptime (str, optional): Preferred packet time. + - parameters (dict, optional): A dictionary of codec-specific parameters. + Key-value pairs represent the parameter name and value, respectively. + - bandwidth (str, optional): The bandwidth type. + - rtcp-mux (bool, optional): Indicates whether RTCP multiplexing is enabled or + not. + - encryption (list, optional): A list of dictionaries, each representing an + encryption method. + Each dictionary may contain the following keys: + - tag (str): The unique identifier of the encryption method. + - crypto-suite (str): The encryption suite in use. + - key-params (str): Key parameters for the encryption suite. + - session-params (str, optional): Session parameters for the encryption + suite. + + @return: A <description> element. + """ + # FIXME: to be removed once host is accessible from global var + assert host is not None + + desc_elt = domish.Element((NS_JINGLE_RTP, "description"), attribs={"media": media}) + + for pt_id, pt_data in media_data.get("payload_types", {}).items(): + payload_type_elt = desc_elt.addElement("payload-type") + payload_type_elt["id"] = str(pt_id) + for attr in ["channels", "clockrate", "maxptime", "name", "ptime"]: + if attr in pt_data: + payload_type_elt[attr] = str(pt_data[attr]) + + if "parameters" in pt_data: + for param_name, param_value in pt_data["parameters"].items(): + param_elt = payload_type_elt.addElement("parameter") + param_elt["name"] = param_name + param_elt["value"] = param_value + host.trigger.point( + "XEP-0167_build_description_payload_type", + desc_elt, + media_data, + pt_data, + payload_type_elt, + triggers_no_cancel=True + ) + + if "bandwidth" in media_data: + bandwidth_elt = desc_elt.addElement("bandwidth") + bandwidth_elt["type"] = media_data["bandwidth"] + + if media_data.get("rtcp-mux"): + desc_elt.addElement("rtcp-mux") + + # Add encryption element + if "encryption" in media_data: + encryption_elt = desc_elt.addElement("encryption") + # we always want require encryption if the `encryption` data is present + encryption_elt["required"] = "1" + for enc_data in media_data["encryption"]: + crypto_elt = encryption_elt.addElement("crypto") + for attr in ["tag", "crypto-suite", "key-params", "session-params"]: + if attr in enc_data: + crypto_elt[attr] = enc_data[attr] + + host.trigger.point( + "XEP-0167_build_description", + desc_elt, + media_data, + session, + triggers_no_cancel=True + ) + + return desc_elt + + +def parse_description(desc_elt: domish.Element) -> dict: + """Parse <desciption> to a dict + + @param desc_elt: <description> element + @return: media data as in [build_description] + """ + # FIXME: to be removed once host is accessible from global var + assert host is not None + + media_data = {} + if desc_elt.hasAttribute("ssrc"): + media_data.setdefault("ssrc", {})[desc_elt["ssrc"]] = {} + + payload_types = {} + for payload_type_elt in desc_elt.elements(NS_JINGLE_RTP, "payload-type"): + payload_type_data = { + attr: payload_type_elt[attr] + for attr in [ + "channels", + "clockrate", + "maxptime", + "name", + "ptime", + ] + if payload_type_elt.hasAttribute(attr) + } + try: + pt_id = int(payload_type_elt["id"]) + except KeyError: + log.warning( + f"missing ID in payload type, ignoring: {payload_type_elt.toXml()}" + ) + continue + + parameters = {} + for param_elt in payload_type_elt.elements(NS_JINGLE_RTP, "parameter"): + param_name = param_elt.getAttribute("name") + param_value = param_elt.getAttribute("value") + if not param_name or param_value is None: + log.warning(f"invalid parameter: {param_elt.toXml()}") + continue + parameters[param_name] = param_value + + if parameters: + payload_type_data["parameters"] = parameters + + host.trigger.point( + "XEP-0167_parse_description_payload_type", + desc_elt, + media_data, + payload_type_elt, + payload_type_data, + triggers_no_cancel=True + ) + payload_types[pt_id] = payload_type_data + + # bandwidth + media_data["payload_types"] = payload_types + try: + bandwidth_elt = next(desc_elt.elements(NS_JINGLE_RTP, "bandwidth")) + except StopIteration: + pass + else: + bandwidth = bandwidth_elt.getAttribute("type") + if not bandwidth: + log.warning(f"invalid bandwidth: {bandwidth_elt.toXml}") + else: + media_data["bandwidth"] = bandwidth + + # rtcp-mux + rtcp_mux_elt = next(desc_elt.elements(NS_JINGLE_RTP, "rtcp-mux"), None) + media_data["rtcp-mux"] = rtcp_mux_elt is not None + + # Encryption + encryption_data = [] + encryption_elt = next(desc_elt.elements(NS_JINGLE_RTP, "encryption"), None) + if encryption_elt: + media_data["encryption_required"] = C.bool( + encryption_elt.getAttribute("required", C.BOOL_FALSE) + ) + + for crypto_elt in encryption_elt.elements(NS_JINGLE_RTP, "crypto"): + crypto_data = { + attr: crypto_elt[attr] + for attr in [ + "crypto-suite", + "key-params", + "session-params", + "tag", + ] + if crypto_elt.hasAttribute(attr) + } + encryption_data.append(crypto_data) + + if encryption_data: + media_data["encryption"] = encryption_data + + host.trigger.point( + "XEP-0167_parse_description", + desc_elt, + media_data, + triggers_no_cancel=True + ) + + return media_data