view libervia/backend/plugins/plugin_xep_0167/mapping.py @ 4141:ba8ddfdd334f

cli (loops): run GLib loop in same thread as asyncio: use the new `install_glib_asyncio_iteration` to run GLib in the same thread as asyncio. rel 426
author Goffi <goffi@goffi.org>
date Wed, 01 Nov 2023 14:05:53 +0100
parents b2709504586a
children e11b13418ba6
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import base64
from typing import Any, Dict, Optional

from twisted.words.xish import domish

from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger

from .constants import NS_JINGLE_RTP

log = getLogger(__name__)

host = None


def senders_to_sdp(senders: str, session: dict) -> str:
    """Returns appropriate SDP attribute corresponding to Jingle senders attribute"""
    if senders == "both":
        return "a=sendrecv"
    elif senders == "none":
        return "a=inactive"
    elif session["role"] == senders:
        return "a=sendonly"
    else:
        return "a=recvonly"


def generate_sdp_from_session(
    session: dict, local: bool = False, port: int = 9
) -> str:
    """Generate an SDP string from session data.

    @param session: A dictionary containing the session data. It should have the
        following structure:

        {
            "contents": {
                "<content_id>": {
                    "application_data": {
                        "media": <str: "audio" or "video">,
                        "local_data": <media_data dict>,
                        "peer_data": <media_data dict>,
                        ...
                    },
                    "transport_data": {
                        "local_ice_data": <ice_data dict>,
                        "peer_ice_data": <ice_data dict>,
                        ...
                    },
                    ...
                },
                ...
            }
        }
    @param local: A boolean value indicating whether to generate SDP for the local or
        peer entity. If True, the method will generate SDP for the local entity,
        otherwise for the peer entity. Generally the local SDP is received from frontends
        and not needed in backend, except for debugging purpose.
    @param port: The preferred port for communications.

    @return: The generated SDP string.
    """
    contents = session["contents"]
    sdp_lines = ["v=0"]

    # Add originator (o=) line after the version (v=) line
    username = base64.b64encode(session["local_jid"].full().encode()).decode()
    session_id = "1"  # Increment this for each session
    session_version = "1"  # Increment this when the session is updated
    network_type = "IN"
    address_type = "IP4"
    connection_address = "0.0.0.0"
    o_line = (
        f"o={username} {session_id} {session_version} {network_type} {address_type} "
        f"{connection_address}"
    )
    sdp_lines.append(o_line)

    # Add the mandatory "s=" and t=" lines
    sdp_lines.append("s=-")
    sdp_lines.append("t=0 0")

    # stream direction
    all_senders = {c["senders"] for c in session["contents"].values()}
    # if we don't have a common senders for all contents, we set them at media level
    senders = all_senders.pop() if len(all_senders) == 1 else None

    sdp_lines.append("a=msid-semantic:WMS *")
    sdp_lines.append("a=ice-options:trickle")

    host.trigger.point(
        "XEP-0167_generate_sdp_session",
        session,
        local,
        sdp_lines,
        triggers_no_cancel=True
    )

    for content_name, content_data in contents.items():
        app_data_key = "local_data" if local else "peer_data"
        application_data = content_data["application_data"]
        media_data = application_data[app_data_key]
        media = application_data["media"]
        payload_types = media_data.get("payload_types", {})

        # Generate m= line
        transport = "UDP/TLS/RTP/SAVPF"
        payload_type_ids = [str(pt_id) for pt_id in payload_types]
        m_line = f"m={media} {port} {transport} {' '.join(payload_type_ids)}"
        sdp_lines.append(m_line)

        sdp_lines.append(f"c={network_type} {address_type} {connection_address}")

        sdp_lines.append(f"a=mid:{content_name}")
        if senders is not None:
            sdp_lines.append(senders_to_sdp(senders, session))

        # stream direction
        if senders is None:
            sdp_lines.append(senders_to_sdp(content_data["senders"], session))


        # Generate a= lines for rtpmap and fmtp
        for pt_id, pt in payload_types.items():
            name = pt["name"]
            clockrate = pt.get("clockrate", "")

            # Check if "channels" is in pt and append it to the line
            channels = pt.get("channels")
            if channels:
                sdp_lines.append(f"a=rtpmap:{pt_id} {name}/{clockrate}/{channels}")
            else:
                sdp_lines.append(f"a=rtpmap:{pt_id} {name}/{clockrate}")

            if "ptime" in pt:
                sdp_lines.append(f"a=ptime:{pt['ptime']}")

            if "parameters" in pt:
                fmtp_params = ";".join([f"{k}={v}" for k, v in pt["parameters"].items()])
                sdp_lines.append(f"a=fmtp:{pt_id} {fmtp_params}")

        if "bandwidth" in media_data:
            sdp_lines.append(f"a=b:{media_data['bandwidth']}")

        if media_data.get("rtcp-mux"):
            sdp_lines.append("a=rtcp-mux")

        # Generate a= lines for fingerprint, ICE ufrag, pwd and candidates
        ice_data_key = "local_ice_data" if local else "peer_ice_data"
        ice_data = content_data["transport_data"][ice_data_key]

        if "fingerprint" in ice_data:
            fingerprint_data = ice_data["fingerprint"]
            sdp_lines.append(
                f"a=fingerprint:{fingerprint_data['hash']} "
                f"{fingerprint_data['fingerprint']}"
            )
            sdp_lines.append(f"a=setup:{fingerprint_data['setup']}")

        sdp_lines.append(f"a=ice-ufrag:{ice_data['ufrag']}")
        sdp_lines.append(f"a=ice-pwd:{ice_data['pwd']}")

        for candidate in ice_data["candidates"]:
            foundation = candidate["foundation"]
            component_id = candidate["component_id"]
            transport = candidate["transport"]
            priority = candidate["priority"]
            address = candidate["address"]
            candidate_port = candidate["port"]
            candidate_type = candidate["type"]

            candidate_line = (
                f"a=candidate:{foundation} {component_id} {transport} {priority} "
                f"{address} {candidate_port} typ {candidate_type}"
            )

            if "rel_addr" in candidate and "rel_port" in candidate:
                candidate_line += (
                    f" raddr {candidate['rel_addr']} rport {candidate['rel_port']}"
                )

            if "generation" in candidate:
                candidate_line += f" generation {candidate['generation']}"

            if "network" in candidate:
                candidate_line += f" network {candidate['network']}"

            sdp_lines.append(candidate_line)

        # Generate a= lines for encryption
        if "encryption" in media_data:
            for enc_data in media_data["encryption"]:
                crypto_suite = enc_data["crypto-suite"]
                key_params = enc_data["key-params"]
                session_params = enc_data.get("session-params", "")
                tag = enc_data["tag"]

                crypto_line = f"a=crypto:{tag} {crypto_suite} {key_params}"
                if session_params:
                    crypto_line += f" {session_params}"
                sdp_lines.append(crypto_line)


        host.trigger.point(
            "XEP-0167_generate_sdp_content",
            session,
            local,
            content_name,
            content_data,
            sdp_lines,
            application_data,
            app_data_key,
            media_data,
            media,
            triggers_no_cancel=True
        )

    # Combine SDP lines and return the result
    return "\r\n".join(sdp_lines) + "\r\n"


def parse_sdp(sdp: str) -> dict:
    """Parse SDP string.

    @param sdp: The SDP string to parse.

    @return: A dictionary containing parsed session data.
    """
    # FIXME: to be removed once host is accessible from global var
    assert host is not None
    lines = sdp.strip().split("\r\n")
    # session metadata
    metadata: Dict[str, Any] = {}
    call_data = {"metadata": metadata}

    media_type = None
    media_data: Optional[Dict[str, Any]] = None
    application_data: Optional[Dict[str, Any]] = None
    transport_data: Optional[Dict[str, Any]] = None
    fingerprint_data: Optional[Dict[str, str]] = None
    ice_pwd: Optional[str] = None
    ice_ufrag: Optional[str] = None
    payload_types: Optional[Dict[int, dict]] = None

    for line in lines:
        try:
            parts = line.split()
            prefix = parts[0][:2]  # Extract the 'a=', 'm=', etc., prefix
            parts[0] = parts[0][2:]  # Remove the prefix from the first element

            if prefix == "m=":
                media_type = parts[0]
                port = int(parts[1])
                payload_types = {}
                for payload_type_id in [int(pt_id) for pt_id in parts[3:]]:
                    payload_type = {"id": payload_type_id}
                    payload_types[payload_type_id] = payload_type

                application_data = {"media": media_type, "payload_types": payload_types}
                transport_data = {"port": port}
                if fingerprint_data is not None:
                    transport_data["fingerprint"] = fingerprint_data
                if ice_pwd is not None:
                    transport_data["pwd"] = ice_pwd
                if ice_ufrag is not None:
                    transport_data["ufrag"] = ice_ufrag
                media_data = call_data[media_type] = {
                    "application_data": application_data,
                    "transport_data": transport_data,
                }

            elif prefix == "a=":
                if ":" in parts[0]:
                    attribute, parts[0] = parts[0].split(":", 1)
                else:
                    attribute = parts[0]

                if (
                    media_type is None
                    or application_data is None
                    or transport_data is None
                ) and not (
                    attribute
                    in (
                        "sendrecv",
                        "sendonly",
                        "recvonly",
                        "inactive",
                        "fingerprint",
                        "group",
                        "ice-options",
                        "msid-semantic",
                        "ice-pwd",
                        "ice-ufrag",
                    )
                ):
                    log.warning(
                        "Received attribute before media description, this is "
                        f"invalid: {line}"
                    )
                    continue

                if attribute == "mid":
                    assert media_data is not None
                    try:
                        media_data["id"] = parts[0]
                    except IndexError:
                        log.warning(f"invalid media ID: {line}")

                elif attribute == "rtpmap":
                    assert application_data is not None
                    assert payload_types is not None
                    pt_id = int(parts[0])
                    codec_info = parts[1].split("/")
                    codec = codec_info[0]
                    clockrate = int(codec_info[1])
                    payload_type = {
                        "id": pt_id,
                        "name": codec,
                        "clockrate": clockrate,
                    }
                    # Handle optional channel count
                    if len(codec_info) > 2:
                        channels = int(codec_info[2])
                        payload_type["channels"] = channels

                    payload_types.setdefault(pt_id, {}).update(payload_type)

                elif attribute == "fmtp":
                    assert payload_types is not None
                    pt_id = int(parts[0])
                    params = parts[1].split(";")
                    try:
                        payload_type = payload_types[pt_id]
                    except KeyError:
                        raise ValueError(
                            f"Can find content type {pt_id}, ignoring: {line}"
                        )

                    try:
                        payload_type["parameters"] = {
                            name: value
                            for name, value in (param.split("=") for param in params)
                        }
                    except ValueError:
                        payload_type.setdefault("exra-parameters", []).extend(params)

                elif attribute == "candidate":
                    assert transport_data is not None
                    candidate = {
                        "foundation": parts[0],
                        "component_id": int(parts[1]),
                        "transport": parts[2],
                        "priority": int(parts[3]),
                        "address": parts[4],
                        "port": int(parts[5]),
                        "type": parts[7],
                    }

                    for part in parts[8:]:
                        if part == "raddr":
                            candidate["rel_addr"] = parts[parts.index(part) + 1]
                        elif part == "rport":
                            candidate["rel_port"] = int(parts[parts.index(part) + 1])
                        elif part == "generation":
                            candidate["generation"] = parts[parts.index(part) + 1]
                        elif part == "network":
                            candidate["network"] = parts[parts.index(part) + 1]

                    transport_data.setdefault("candidates", []).append(candidate)

                elif attribute == "fingerprint":
                    algorithm, fingerprint = parts[0], parts[1]
                    fingerprint_data = {"hash": algorithm, "fingerprint": fingerprint}
                    if transport_data is not None:
                        transport_data.setdefault("fingerprint", {}).update(
                            fingerprint_data
                        )
                elif attribute == "setup":
                    assert transport_data is not None
                    setup = parts[0]
                    transport_data.setdefault("fingerprint", {})["setup"] = setup

                elif attribute == "b":
                    assert application_data is not None
                    bandwidth = int(parts[0])
                    application_data["bandwidth"] = bandwidth

                elif attribute == "rtcp-mux":
                    assert application_data is not None
                    application_data["rtcp-mux"] = True

                elif attribute == "ice-ufrag":
                    if transport_data is not None:
                        transport_data["ufrag"] = parts[0]

                elif attribute == "ice-pwd":
                    if transport_data is not None:
                        transport_data["pwd"] = parts[0]

                host.trigger.point(
                    "XEP-0167_parse_sdp_a",
                    attribute,
                    parts,
                    call_data,
                    metadata,
                    media_type,
                    application_data,
                    transport_data,
                    triggers_no_cancel=True
                )

        except ValueError as e:
            raise ValueError(f"Could not parse line. Invalid format ({e}): {line}") from e
        except IndexError as e:
            raise IndexError(f"Incomplete line. Missing data: {line}") from e

    # we remove private data (data starting with _, used by some plugins (e.g. XEP-0294)
    # to handle session data at media level))
    for key in [k for k in call_data if k.startswith("_")]:
        log.debug(f"cleaning remaining private data {key!r}")
        del call_data[key]

    # FIXME: is this really useful?
    # ICE candidates may only be specified for the first media, this
    # duplicate the candidate for the other in this case
    all_media = {k:v for k,v in call_data.items() if k in ("audio", "video")}
    if len(all_media) > 1 and not all(
        "candidates" in c["transport_data"] for c in all_media.values()
    ):
        first_content = next(iter(all_media.values()))
        try:
            ice_candidates = first_content["transport_data"]["candidates"]
        except KeyError:
            ice_candidates = []
        for idx, content in enumerate(all_media.values()):
            if idx == 0:
                continue
            content["transport_data"].setdefault("candidates", ice_candidates)

    return call_data


def build_description(media: str, media_data: dict, session: dict) -> domish.Element:
    """Generate <description> element from media data

    @param media: media type ("audio" or "video")

    @param media_data: A dictionary containing the media description data.
        The keys and values are described below:

        - ssrc (str, optional): The synchronization source identifier.
        - payload_types (list): A list of dictionaries, each representing a payload
          type.
          Each dictionary may contain the following keys:
            - channels (str, optional): Number of audio channels.
            - clockrate (str, optional): Clock rate of the media.
            - id (str): The unique identifier of the payload type.
            - maxptime (str, optional): Maximum packet time.
            - name (str, optional): Name of the codec.
            - ptime (str, optional): Preferred packet time.
            - parameters (dict, optional): A dictionary of codec-specific parameters.
              Key-value pairs represent the parameter name and value, respectively.
        - bandwidth (str, optional): The bandwidth type.
        - rtcp-mux (bool, optional): Indicates whether RTCP multiplexing is enabled or
          not.
        - encryption (list, optional): A list of dictionaries, each representing an
          encryption method.
          Each dictionary may contain the following keys:
            - tag (str): The unique identifier of the encryption method.
            - crypto-suite (str): The encryption suite in use.
            - key-params (str): Key parameters for the encryption suite.
            - session-params (str, optional): Session parameters for the encryption
              suite.

    @return: A <description> element.
    """
    # FIXME: to be removed once host is accessible from global var
    assert host is not None

    desc_elt = domish.Element((NS_JINGLE_RTP, "description"), attribs={"media": media})

    for pt_id, pt_data in media_data.get("payload_types", {}).items():
        payload_type_elt = desc_elt.addElement("payload-type")
        payload_type_elt["id"] = str(pt_id)
        for attr in ["channels", "clockrate", "maxptime", "name", "ptime"]:
            if attr in pt_data:
                payload_type_elt[attr] = str(pt_data[attr])

        if "parameters" in pt_data:
            for param_name, param_value in pt_data["parameters"].items():
                param_elt = payload_type_elt.addElement("parameter")
                param_elt["name"] = param_name
                param_elt["value"] = param_value
        host.trigger.point(
            "XEP-0167_build_description_payload_type",
            desc_elt,
            media_data,
            pt_data,
            payload_type_elt,
            triggers_no_cancel=True
        )

    if "bandwidth" in media_data:
        bandwidth_elt = desc_elt.addElement("bandwidth")
        bandwidth_elt["type"] = media_data["bandwidth"]

    if media_data.get("rtcp-mux"):
        desc_elt.addElement("rtcp-mux")

    # Add encryption element
    if "encryption" in media_data:
        encryption_elt = desc_elt.addElement("encryption")
        # we always want require encryption if the `encryption` data is present
        encryption_elt["required"] = "1"
        for enc_data in media_data["encryption"]:
            crypto_elt = encryption_elt.addElement("crypto")
            for attr in ["tag", "crypto-suite", "key-params", "session-params"]:
                if attr in enc_data:
                    crypto_elt[attr] = enc_data[attr]

    host.trigger.point(
        "XEP-0167_build_description",
        desc_elt,
        media_data,
        session,
        triggers_no_cancel=True
    )

    return desc_elt


def parse_description(desc_elt: domish.Element) -> dict:
    """Parse <desciption> to a dict

    @param desc_elt: <description> element
    @return: media data as in [build_description]
    """
    # FIXME: to be removed once host is accessible from global var
    assert host is not None

    media_data = {}
    if desc_elt.hasAttribute("ssrc"):
        media_data.setdefault("ssrc", {})[desc_elt["ssrc"]] = {}

    payload_types = {}
    for payload_type_elt in desc_elt.elements(NS_JINGLE_RTP, "payload-type"):
        payload_type_data = {
            attr: payload_type_elt[attr]
            for attr in [
                "channels",
                "clockrate",
                "maxptime",
                "name",
                "ptime",
            ]
            if payload_type_elt.hasAttribute(attr)
        }
        try:
            pt_id = int(payload_type_elt["id"])
        except KeyError:
            log.warning(
                f"missing ID in payload type, ignoring: {payload_type_elt.toXml()}"
            )
            continue

        parameters = {}
        for param_elt in payload_type_elt.elements(NS_JINGLE_RTP, "parameter"):
            param_name = param_elt.getAttribute("name")
            param_value = param_elt.getAttribute("value")
            if not param_name or param_value is None:
                log.warning(f"invalid parameter: {param_elt.toXml()}")
                continue
            parameters[param_name] = param_value

        if parameters:
            payload_type_data["parameters"] = parameters

        host.trigger.point(
            "XEP-0167_parse_description_payload_type",
            desc_elt,
            media_data,
            payload_type_elt,
            payload_type_data,
            triggers_no_cancel=True
        )
        payload_types[pt_id] = payload_type_data

    # bandwidth
    media_data["payload_types"] = payload_types
    try:
        bandwidth_elt = next(desc_elt.elements(NS_JINGLE_RTP, "bandwidth"))
    except StopIteration:
        pass
    else:
        bandwidth = bandwidth_elt.getAttribute("type")
        if not bandwidth:
            log.warning(f"invalid bandwidth: {bandwidth_elt.toXml}")
        else:
            media_data["bandwidth"] = bandwidth

    # rtcp-mux
    rtcp_mux_elt = next(desc_elt.elements(NS_JINGLE_RTP, "rtcp-mux"), None)
    media_data["rtcp-mux"] = rtcp_mux_elt is not None

    # Encryption
    encryption_data = []
    encryption_elt = next(desc_elt.elements(NS_JINGLE_RTP, "encryption"), None)
    if encryption_elt:
        media_data["encryption_required"] = C.bool(
            encryption_elt.getAttribute("required", C.BOOL_FALSE)
        )

        for crypto_elt in encryption_elt.elements(NS_JINGLE_RTP, "crypto"):
            crypto_data = {
                attr: crypto_elt[attr]
                for attr in [
                    "crypto-suite",
                    "key-params",
                    "session-params",
                    "tag",
                ]
                if crypto_elt.hasAttribute(attr)
            }
            encryption_data.append(crypto_data)

    if encryption_data:
        media_data["encryption"] = encryption_data

    host.trigger.point(
        "XEP-0167_parse_description",
        desc_elt,
        media_data,
        triggers_no_cancel=True
    )

    return media_data