diff sat/plugins/plugin_xep_0167/mapping.py @ 4056:1c4f4aa36d98

plugin XEP-0167: Jingle RTP Sessions implementation: rel 420
author Goffi <goffi@goffi.org>
date Mon, 29 May 2023 13:38:10 +0200
parents
children adb9dc9c8114
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0167/mapping.py	Mon May 29 13:38:10 2023 +0200
@@ -0,0 +1,645 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for managing pipes (experimental)
+# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import base64
+from typing import Any, Dict, Optional
+
+from twisted.words.xish import domish
+
+from sat.core.constants import Const as C
+from sat.core.log import getLogger
+
+from .constants import NS_JINGLE_RTP
+
+log = getLogger(__name__)
+
+host = None
+
+
+def senders_to_sdp(senders: str, session: dict) -> str:
+    """Returns appropriate SDP attribute corresponding to Jingle senders attribute"""
+    if senders == "both":
+        return "a=sendrecv"
+    elif senders == "none":
+        return "a=inactive"
+    elif session["role"] == senders:
+        return "a=sendonly"
+    else:
+        return "a=recvonly"
+
+
+def generate_sdp_from_session(
+    session: dict, local: bool = False, port: int = 9999
+) -> str:
+    """Generate an SDP string from session data.
+
+    @param session: A dictionary containing the session data. It should have the
+        following structure:
+
+        {
+            "contents": {
+                "<content_id>": {
+                    "application_data": {
+                        "media": <str: "audio" or "video">,
+                        "local_data": <media_data dict>,
+                        "peer_data": <media_data dict>,
+                        ...
+                    },
+                    "transport_data": {
+                        "local_ice_data": <ice_data dict>,
+                        "peer_ice_data": <ice_data dict>,
+                        ...
+                    },
+                    ...
+                },
+                ...
+            }
+        }
+    @param local: A boolean value indicating whether to generate SDP for the local or
+        peer entity. If True, the method will generate SDP for the local entity,
+        otherwise for the peer entity. Generally the local SDP is received from frontends
+        and not needed in backend, except for debugging purpose.
+    @param port: The preferred port for communications.
+
+    @return: The generated SDP string.
+    """
+    sdp_lines = ["v=0"]
+
+    # Add originator (o=) line after the version (v=) line
+    username = base64.b64encode(session["local_jid"].full().encode()).decode()
+    session_id = "1"  # Increment this for each session
+    session_version = "1"  # Increment this when the session is updated
+    network_type = "IN"
+    address_type = "IP4"
+    connection_address = "0.0.0.0"
+    o_line = (
+        f"o={username} {session_id} {session_version} {network_type} {address_type} "
+        f"{connection_address}"
+    )
+    sdp_lines.append(o_line)
+
+    # Add the mandatory "s=" and t=" lines
+    sdp_lines.append("s=-")
+    sdp_lines.append("t=0 0")
+
+    # stream direction
+    all_senders = {c["senders"] for c in session["contents"].values()}
+    # if we don't have a common senders for all contents, we set them at media level
+    senders = all_senders.pop() if len(all_senders) == 1 else None
+    if senders is not None:
+        sdp_lines.append(senders_to_sdp(senders, session))
+
+    sdp_lines.append("a=msid-semantic:WMS *")
+
+    host.trigger.point(
+        "XEP-0167_generate_sdp_session",
+        session,
+        local,
+        sdp_lines,
+        triggers_no_cancel=True
+    )
+
+    contents = session["contents"]
+    for content_name, content_data in contents.items():
+        app_data_key = "local_data" if local else "peer_data"
+        application_data = content_data["application_data"]
+        media_data = application_data[app_data_key]
+        media = application_data["media"]
+        payload_types = media_data.get("payload_types", {})
+
+        # Generate m= line
+        transport = "UDP/TLS/RTP/SAVPF"
+        payload_type_ids = [str(pt_id) for pt_id in payload_types]
+        m_line = f"m={media} {port} {transport} {' '.join(payload_type_ids)}"
+        sdp_lines.append(m_line)
+
+        sdp_lines.append(f"c={network_type} {address_type} {connection_address}")
+
+        sdp_lines.append(f"a=mid:{content_name}")
+
+        # stream direction
+        if senders is None:
+            sdp_lines.append(senders_to_sdp(content_data["senders"], session))
+
+        # Generate a= lines for rtpmap and fmtp
+        for pt_id, pt in payload_types.items():
+            name = pt["name"]
+            clockrate = pt.get("clockrate", "")
+            sdp_lines.append(f"a=rtpmap:{pt_id} {name}/{clockrate}")
+
+            if "ptime" in pt:
+                sdp_lines.append(f"a=ptime:{pt['ptime']}")
+
+            if "parameters" in pt:
+                fmtp_params = ";".join([f"{k}={v}" for k, v in pt["parameters"].items()])
+                sdp_lines.append(f"a=fmtp:{pt_id} {fmtp_params}")
+
+        if "bandwidth" in media_data:
+            sdp_lines.append(f"a=b:{media_data['bandwidth']}")
+
+        if media_data.get("rtcp-mux"):
+            sdp_lines.append("a=rtcp-mux")
+
+        # Generate a= lines for fingerprint, ICE ufrag, pwd and candidates
+        ice_data_key = "local_ice_data" if local else "peer_ice_data"
+        ice_data = content_data["transport_data"][ice_data_key]
+
+        if "fingerprint" in ice_data:
+            fingerprint_data = ice_data["fingerprint"]
+            sdp_lines.append(
+                f"a=fingerprint:{fingerprint_data['hash']} "
+                f"{fingerprint_data['fingerprint']}"
+            )
+            sdp_lines.append(f"a=setup:{fingerprint_data['setup']}")
+
+        sdp_lines.append(f"a=ice-ufrag:{ice_data['ufrag']}")
+        sdp_lines.append(f"a=ice-pwd:{ice_data['pwd']}")
+
+        for candidate in ice_data["candidates"]:
+            foundation = candidate["foundation"]
+            component_id = candidate["component_id"]
+            transport = candidate["transport"]
+            priority = candidate["priority"]
+            address = candidate["address"]
+            candidate_port = candidate["port"]
+            candidate_type = candidate["type"]
+
+            candidate_line = (
+                f"a=candidate:{foundation} {component_id} {transport} {priority} "
+                f"{address} {candidate_port} typ {candidate_type}"
+            )
+
+            if "rel_addr" in candidate and "rel_port" in candidate:
+                candidate_line += (
+                    f" raddr {candidate['rel_addr']} rport {candidate['rel_port']}"
+                )
+
+            if "generation" in candidate:
+                candidate_line += f" generation {candidate['generation']}"
+
+            if "network" in candidate:
+                candidate_line += f" network {candidate['network']}"
+
+            sdp_lines.append(candidate_line)
+
+        # Generate a= lines for encryption
+        if "encryption" in media_data:
+            for enc_data in media_data["encryption"]:
+                crypto_suite = enc_data["crypto-suite"]
+                key_params = enc_data["key-params"]
+                session_params = enc_data.get("session-params", "")
+                tag = enc_data["tag"]
+
+                crypto_line = f"a=crypto:{tag} {crypto_suite} {key_params}"
+                if session_params:
+                    crypto_line += f" {session_params}"
+                sdp_lines.append(crypto_line)
+
+
+        host.trigger.point(
+            "XEP-0167_generate_sdp_content",
+            session,
+            local,
+            content_name,
+            content_data,
+            sdp_lines,
+            application_data,
+            app_data_key,
+            media_data,
+            media,
+            triggers_no_cancel=True
+        )
+
+    # Combine SDP lines and return the result
+    return "\r\n".join(sdp_lines) + "\r\n"
+
+
+def parse_sdp(sdp: str) -> dict:
+    """Parse SDP string.
+
+    @param sdp: The SDP string to parse.
+
+    @return: A dictionary containing parsed session data.
+    """
+    # FIXME: to be removed once host is accessible from global var
+    assert host is not None
+    lines = sdp.strip().split("\r\n")
+    # session metadata
+    metadata: Dict[str, Any] = {}
+    call_data = {"metadata": metadata}
+
+    media_type = None
+    media_data: Optional[Dict[str, Any]] = None
+    application_data: Optional[Dict[str, Any]] = None
+    transport_data: Optional[Dict[str, Any]] = None
+    fingerprint_data: Optional[Dict[str, str]] = None
+    ice_pwd: Optional[str] = None
+    ice_ufrag: Optional[str] = None
+    payload_types: Optional[Dict[int, dict]] = None
+
+    for line in lines:
+        try:
+            parts = line.split()
+            prefix = parts[0][:2]  # Extract the 'a=', 'm=', etc., prefix
+            parts[0] = parts[0][2:]  # Remove the prefix from the first element
+
+            if prefix == "m=":
+                media_type = parts[0]
+                port = int(parts[1])
+                payload_types = {}
+                for payload_type_id in [int(pt_id) for pt_id in parts[3:]]:
+                    payload_type = {"id": payload_type_id}
+                    payload_types[payload_type_id] = payload_type
+
+                application_data = {"media": media_type, "payload_types": payload_types}
+                transport_data = {"port": port}
+                if fingerprint_data is not None:
+                    transport_data["fingerprint"] = fingerprint_data
+                if ice_pwd is not None:
+                    transport_data["pwd"] = ice_pwd
+                if ice_ufrag is not None:
+                    transport_data["ufrag"] = ice_ufrag
+                media_data = call_data[media_type] = {
+                    "application_data": application_data,
+                    "transport_data": transport_data,
+                }
+
+            elif prefix == "a=":
+                if ":" in parts[0]:
+                    attribute, parts[0] = parts[0].split(":", 1)
+                else:
+                    attribute = parts[0]
+
+                if (
+                    media_type is None
+                    or application_data is None
+                    or transport_data is None
+                ) and not (
+                    attribute
+                    in (
+                        "sendrecv",
+                        "sendonly",
+                        "recvonly",
+                        "inactive",
+                        "fingerprint",
+                        "group",
+                        "ice-options",
+                        "msid-semantic",
+                        "ice-pwd",
+                        "ice-ufrag",
+                    )
+                ):
+                    log.warning(
+                        "Received attribute before media description, this is "
+                        f"invalid: {line}"
+                    )
+                    continue
+
+                if attribute == "mid":
+                    assert media_data is not None
+                    try:
+                        media_data["id"] = parts[0]
+                    except IndexError:
+                        log.warning(f"invalid media ID: {line}")
+
+                elif attribute == "rtpmap":
+                    assert application_data is not None
+                    assert payload_types is not None
+                    pt_id = int(parts[0])
+                    codec_info = parts[1].split("/")
+                    codec = codec_info[0]
+                    clockrate = int(codec_info[1])
+                    payload_type = {
+                        "id": pt_id,
+                        "name": codec,
+                        "clockrate": clockrate,
+                    }
+                    # Handle optional channel count
+                    if len(codec_info) > 2:
+                        channels = int(codec_info[2])
+                        payload_type["channels"] = channels
+
+                    payload_types.setdefault(pt_id, {}).update(payload_type)
+
+                elif attribute == "fmtp":
+                    assert payload_types is not None
+                    pt_id = int(parts[0])
+                    params = parts[1].split(";")
+                    try:
+                        payload_type = payload_types[pt_id]
+                    except KeyError:
+                        raise ValueError(
+                            f"Can find content type {pt_id}, ignoring: {line}"
+                        )
+
+                    try:
+                        payload_type["parameters"] = {
+                            name: value
+                            for name, value in (param.split("=") for param in params)
+                        }
+                    except ValueError:
+                        payload_type.setdefault("exra-parameters", []).extend(params)
+
+                elif attribute == "candidate":
+                    assert transport_data is not None
+                    candidate = {
+                        "foundation": parts[0],
+                        "component_id": int(parts[1]),
+                        "transport": parts[2],
+                        "priority": int(parts[3]),
+                        "address": parts[4],
+                        "port": int(parts[5]),
+                        "type": parts[7],
+                    }
+
+                    for part in parts[8:]:
+                        if part == "raddr":
+                            candidate["rel_addr"] = parts[parts.index(part) + 1]
+                        elif part == "rport":
+                            candidate["rel_port"] = int(parts[parts.index(part) + 1])
+                        elif part == "generation":
+                            candidate["generation"] = parts[parts.index(part) + 1]
+                        elif part == "network":
+                            candidate["network"] = parts[parts.index(part) + 1]
+
+                    transport_data.setdefault("candidates", []).append(candidate)
+
+                elif attribute == "fingerprint":
+                    algorithm, fingerprint = parts[0], parts[1]
+                    fingerprint_data = {"hash": algorithm, "fingerprint": fingerprint}
+                    if transport_data is not None:
+                        transport_data["fingerprint"] = fingerprint_data
+                elif attribute == "setup":
+                    assert transport_data is not None
+                    setup = parts[0]
+                    transport_data.setdefault("fingerprint", {})["setup"] = setup
+
+                elif attribute == "b":
+                    assert application_data is not None
+                    bandwidth = int(parts[0])
+                    application_data["bandwidth"] = bandwidth
+
+                elif attribute == "rtcp-mux":
+                    assert application_data is not None
+                    application_data["rtcp-mux"] = True
+
+                elif attribute == "ice-ufrag":
+                    if transport_data is not None:
+                        transport_data["ufrag"] = parts[0]
+
+                elif attribute == "ice-pwd":
+                    if transport_data is not None:
+                        transport_data["pwd"] = parts[0]
+
+                host.trigger.point(
+                    "XEP-0167_parse_sdp_a",
+                    attribute,
+                    parts,
+                    call_data,
+                    metadata,
+                    media_type,
+                    application_data,
+                    transport_data,
+                    triggers_no_cancel=True
+                )
+
+        except ValueError as e:
+            raise ValueError(f"Could not parse line. Invalid format ({e}): {line}") from e
+        except IndexError as e:
+            raise IndexError(f"Incomplete line. Missing data: {line}") from e
+
+    # we remove private data (data starting with _, used by some plugins (e.g. XEP-0294)
+    # to handle session data at media level))
+    for key in [k for k in call_data if k.startswith("_")]:
+        log.debug(f"cleaning remaining private data {key!r}")
+        del call_data[key]
+
+    # ICE candidates may only be specified for the first media, this
+    # duplicate the candidate for the other in this case
+    all_media = {k:v for k,v in call_data.items() if k in ("audio", "video")}
+    if len(all_media) > 1 and not all(
+        "candidates" in c["transport_data"] for c in all_media.values()
+    ):
+        first_content = next(iter(all_media.values()))
+        try:
+            ice_candidates = first_content["transport_data"]["candidates"]
+        except KeyError:
+            log.warning("missing candidates in SDP")
+        else:
+            for idx, content in enumerate(all_media.values()):
+                if idx == 0:
+                    continue
+                content["transport_data"].setdefault("candidates", ice_candidates)
+
+    return call_data
+
+
+def build_description(media: str, media_data: dict, session: dict) -> domish.Element:
+    """Generate <description> element from media data
+
+    @param media: media type ("audio" or "video")
+
+    @param media_data: A dictionary containing the media description data.
+        The keys and values are described below:
+
+        - ssrc (str, optional): The synchronization source identifier.
+        - payload_types (list): A list of dictionaries, each representing a payload
+          type.
+          Each dictionary may contain the following keys:
+            - channels (str, optional): Number of audio channels.
+            - clockrate (str, optional): Clock rate of the media.
+            - id (str): The unique identifier of the payload type.
+            - maxptime (str, optional): Maximum packet time.
+            - name (str, optional): Name of the codec.
+            - ptime (str, optional): Preferred packet time.
+            - parameters (dict, optional): A dictionary of codec-specific parameters.
+              Key-value pairs represent the parameter name and value, respectively.
+        - bandwidth (str, optional): The bandwidth type.
+        - rtcp-mux (bool, optional): Indicates whether RTCP multiplexing is enabled or
+          not.
+        - encryption (list, optional): A list of dictionaries, each representing an
+          encryption method.
+          Each dictionary may contain the following keys:
+            - tag (str): The unique identifier of the encryption method.
+            - crypto-suite (str): The encryption suite in use.
+            - key-params (str): Key parameters for the encryption suite.
+            - session-params (str, optional): Session parameters for the encryption
+              suite.
+
+    @return: A <description> element.
+    """
+    # FIXME: to be removed once host is accessible from global var
+    assert host is not None
+
+    desc_elt = domish.Element((NS_JINGLE_RTP, "description"), attribs={"media": media})
+
+    for pt_id, pt_data in media_data.get("payload_types", {}).items():
+        payload_type_elt = desc_elt.addElement("payload-type")
+        payload_type_elt["id"] = str(pt_id)
+        for attr in ["channels", "clockrate", "maxptime", "name", "ptime"]:
+            if attr in pt_data:
+                payload_type_elt[attr] = str(pt_data[attr])
+
+        if "parameters" in pt_data:
+            for param_name, param_value in pt_data["parameters"].items():
+                param_elt = payload_type_elt.addElement("parameter")
+                param_elt["name"] = param_name
+                param_elt["value"] = param_value
+        host.trigger.point(
+            "XEP-0167_build_description_payload_type",
+            desc_elt,
+            media_data,
+            pt_data,
+            payload_type_elt,
+            triggers_no_cancel=True
+        )
+
+    if "bandwidth" in media_data:
+        bandwidth_elt = desc_elt.addElement("bandwidth")
+        bandwidth_elt["type"] = media_data["bandwidth"]
+
+    if media_data.get("rtcp-mux"):
+        desc_elt.addElement("rtcp-mux")
+
+    # Add encryption element
+    if "encryption" in media_data:
+        encryption_elt = desc_elt.addElement("encryption")
+        # we always want require encryption if the `encryption` data is present
+        encryption_elt["required"] = "1"
+        for enc_data in media_data["encryption"]:
+            crypto_elt = encryption_elt.addElement("crypto")
+            for attr in ["tag", "crypto-suite", "key-params", "session-params"]:
+                if attr in enc_data:
+                    crypto_elt[attr] = enc_data[attr]
+
+    host.trigger.point(
+        "XEP-0167_build_description",
+        desc_elt,
+        media_data,
+        session,
+        triggers_no_cancel=True
+    )
+
+    return desc_elt
+
+
+def parse_description(desc_elt: domish.Element) -> dict:
+    """Parse <desciption> to a dict
+
+    @param desc_elt: <description> element
+    @return: media data as in [build_description]
+    """
+    # FIXME: to be removed once host is accessible from global var
+    assert host is not None
+
+    media_data = {}
+    if desc_elt.hasAttribute("ssrc"):
+        media_data.setdefault("ssrc", {})[desc_elt["ssrc"]] = {}
+
+    payload_types = {}
+    for payload_type_elt in desc_elt.elements(NS_JINGLE_RTP, "payload-type"):
+        payload_type_data = {
+            attr: payload_type_elt[attr]
+            for attr in [
+                "channels",
+                "clockrate",
+                "maxptime",
+                "name",
+                "ptime",
+            ]
+            if payload_type_elt.hasAttribute(attr)
+        }
+        try:
+            pt_id = int(payload_type_elt["id"])
+        except KeyError:
+            log.warning(
+                f"missing ID in payload type, ignoring: {payload_type_elt.toXml()}"
+            )
+            continue
+
+        parameters = {}
+        for param_elt in payload_type_elt.elements(NS_JINGLE_RTP, "parameter"):
+            param_name = param_elt.getAttribute("name")
+            param_value = param_elt.getAttribute("value")
+            if not param_name or param_value is None:
+                log.warning(f"invalid parameter: {param_elt.toXml()}")
+                continue
+            parameters[param_name] = param_value
+
+        if parameters:
+            payload_type_data["parameters"] = parameters
+
+        host.trigger.point(
+            "XEP-0167_parse_description_payload_type",
+            desc_elt,
+            media_data,
+            payload_type_elt,
+            payload_type_data,
+            triggers_no_cancel=True
+        )
+        payload_types[pt_id] = payload_type_data
+
+    # bandwidth
+    media_data["payload_types"] = payload_types
+    try:
+        bandwidth_elt = next(desc_elt.elements(NS_JINGLE_RTP, "bandwidth"))
+    except StopIteration:
+        pass
+    else:
+        bandwidth = bandwidth_elt.getAttribute("type")
+        if not bandwidth:
+            log.warning(f"invalid bandwidth: {bandwidth_elt.toXml}")
+        else:
+            media_data["bandwidth"] = bandwidth
+
+    # rtcp-mux
+    rtcp_mux_elt = next(desc_elt.elements(NS_JINGLE_RTP, "rtcp-mux"), None)
+    media_data["rtcp-mux"] = rtcp_mux_elt is not None
+
+    # Encryption
+    encryption_data = []
+    encryption_elt = next(desc_elt.elements(NS_JINGLE_RTP, "encryption"), None)
+    if encryption_elt:
+        media_data["encryption_required"] = C.bool(
+            encryption_elt.getAttribute("required", C.BOOL_FALSE)
+        )
+
+        for crypto_elt in encryption_elt.elements(NS_JINGLE_RTP, "crypto"):
+            crypto_data = {
+                attr: crypto_elt[attr]
+                for attr in [
+                    "crypto-suite",
+                    "key-params",
+                    "session-params",
+                    "tag",
+                ]
+                if crypto_elt.hasAttribute(attr)
+            }
+            encryption_data.append(crypto_data)
+
+    if encryption_data:
+        media_data["encryption"] = encryption_data
+
+    host.trigger.point(
+        "XEP-0167_parse_description",
+        desc_elt,
+        media_data,
+        triggers_no_cancel=True
+    )
+
+    return media_data