view libervia/backend/plugins/plugin_xep_0294.py @ 4348:35d41de5b2aa default tip @

doc (component): document use of Gateway Relayed Encryption: fix 455
author Goffi <goffi@goffi.org>
date Mon, 13 Jan 2025 01:23:22 +0100
parents 4b842c1fb686
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Dict, List, Optional, Union

from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger

# Module-level logger, named after this module.
log = getLogger(__name__)

# XML namespace of the <rtp-hdrext/>, <parameter/> and <extmap-allow-mixed/>
# elements handled below; it is also the feature advertised through service
# discovery by XEP_0294_handler.
NS_JINGLE_RTP_HDREXT = "urn:xmpp:jingle:apps:rtp:rtp-hdrext:0"

# Plugin metadata. Keys are constants of ``Const`` (imported as ``C``).
PLUGIN_INFO = {
    C.PI_NAME: "Jingle RTP Header Extensions Negotiation",
    C.PI_IMPORT_NAME: "XEP-0294",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0294"],
    # XEP-0167 provides the trigger points this plugin hooks into
    C.PI_DEPENDENCIES: ["XEP-0167"],
    C.PI_RECOMMENDATIONS: [],
    # name of the main plugin class defined in this module
    C.PI_MAIN: "XEP_0294",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Jingle RTP Header Extensions Negotiation"""),
}


class XEP_0294:
    def __init__(self, host):
        """Hook this plugin's callbacks on the XEP-0167 trigger points.

        @param host: object exposing ``trigger.add(name, callback)``
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        # (trigger point, callback) pairs, registered in this exact order
        hooks = (
            ("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger),
            ("XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger),
            ("XEP-0167_parse_description", self._parse_description_trigger),
            ("XEP-0167_build_description", self._build_description_trigger),
        )
        for trigger_point, callback in hooks:
            host.trigger.add(trigger_point, callback)

    def get_handler(self, client):
        """Return a new XEP_0294_handler instance.

        @param client: not used by this method
        @return: freshly created XEP_0294_handler
        """
        return XEP_0294_handler()

    def _parse_extmap(self, parts: List[str], application_data: dict) -> None:
        """Parse an individual extmap line and fill application_data accordingly"""
        if "/" in parts[0]:
            id_, direction = parts[0].split("/", 1)
        else:
            id_ = parts[0]
            direction = None
        uri = parts[1]
        attributes = parts[2:]

        if direction in (None, "sendrecv"):
            senders = "both"
        elif direction == "sendonly":
            senders = "initiator"
        elif direction == "recvonly":
            senders = "responder"
        elif direction == "inactive":
            senders = "none"
        else:
            log.warning(f"invalid direction for extmap: {direction}")
            senders = "sendrecv"

        rtp_hdr_ext_data: Dict[str, Union[str, dict]] = {
            "id": id_,
            "uri": uri,
            "senders": senders,
        }

        if attributes:
            parameters = {}
            for attribute in attributes:
                name, *value = attribute.split("=", 1)
                parameters[name] = value[0] if value else None
            rtp_hdr_ext_data["parameters"] = parameters

        application_data.setdefault("rtp-hdrext", {})[id_] = rtp_hdr_ext_data

    def _parse_sdp_a_trigger(
        self,
        attribute: str,
        parts: List[str],
        call_data: dict,
        metadata: dict,
        media_type: str,
        application_data: Optional[dict],
        transport_data: dict,
    ) -> None:
        """Parse "extmap" and "extmap-allow-mixed" attributes"""
        if attribute == "extmap":
            if application_data is None:
                call_data.setdefault("_extmaps", []).append(parts)
            else:
                self._parse_extmap(parts, application_data)
        elif attribute == "extmap-allow-mixed":
            if application_data is None:
                call_data["_extmap-allow-mixed"] = True
            else:
                application_data["extmap-allow-mixed"] = True
        elif (
            application_data is not None
            and "_extmaps" in call_data
            and "rtp-hdrext" not in application_data
        ):
            extmaps = call_data.pop("_extmaps")
            for parts in extmaps:
                self._parse_extmap(parts, application_data)
        elif (
            application_data is not None
            and "_extmap-allow-mixed" in call_data
            and "extmap-allow-mixed" not in application_data
        ):
            value = call_data.pop("_extmap-allow-mixed")
            application_data["extmap-allow-mixed"] = value

    def _generate_sdp_content_trigger(
        self,
        session: dict,
        local: bool,
        idx: int,
        content_data: dict,
        sdp_lines: List[str],
        application_data: dict,
        app_data_key: str,
        media_data: dict,
        media: str,
    ) -> None:
        """Generate "extmap" and "extmap-allow-mixed" attributes"""
        rtp_hdrext_dict = media_data.get("rtp-hdrext", {})

        for id_, ext_data in rtp_hdrext_dict.items():
            senders = ext_data.get("senders")
            if senders in (None, "both"):
                direction = "sendrecv"
            elif senders == "initiator":
                direction = "sendonly"
            elif senders == "responder":
                direction = "recvonly"
            elif senders == "none":
                direction = "inactive"
            else:
                raise exceptions.InternalError(
                    f"Invalid senders value for extmap: {ext_data.get('senders')}"
                )

            parameters_str = ""
            if "parameters" in ext_data:
                parameters_str = " " + " ".join(
                    f"{k}={v}" if v is not None else f"{k}"
                    for k, v in ext_data["parameters"].items()
                )

            sdp_lines.append(
                f"a=extmap:{id_}/{direction} {ext_data['uri']}{parameters_str}"
            )

        if media_data.get("extmap-allow-mixed", False):
            sdp_lines.append("a=extmap-allow-mixed")

    def _parse_description_trigger(
        self, desc_elt: domish.Element, media_data: dict
    ) -> None:
        """Fill media_data from <rtp-hdrext> and <extmap-allow-mixed> children

        Each <rtp-hdrext/> child is stored in ``media_data["rtp-hdrext"][<id>]``
        with its "id", "uri" and "senders" ("both" when the attribute is absent),
        plus a "parameters" dict when at least one named <parameter/> is found.
        ``media_data["extmap-allow-mixed"]`` is set to True if the element is
        present.

        @param desc_elt: element whose children are inspected
        @param media_data: dict updated in place
        @raise KeyError: a <rtp-hdrext/> element lacks the "id" or "uri" attribute
        """
        # FIXME: workaround for Movim bug https://github.com/movim/movim/issues/1212
        #   SDP directions received in place of "senders" values are translated.
        movim_senders_fix = {
            "sendonly": "initiator",
            "recvonly": "responder",
            "sendrecv": "both",
            "inactive": "none",
        }

        for hdrext_elt in desc_elt.elements(NS_JINGLE_RTP_HDREXT, "rtp-hdrext"):
            id_ = hdrext_elt["id"]
            uri = hdrext_elt["uri"]
            senders = hdrext_elt.getAttribute("senders", "both")
            if senders in movim_senders_fix:
                log.warning("Movim bug workaround for wrong extmap value")
                senders = movim_senders_fix[senders]

            hdrext_data = {"id": id_, "uri": uri, "senders": senders}
            media_data.setdefault("rtp-hdrext", {})[id_] = hdrext_data

            parameters = {}
            for param_elt in hdrext_elt.elements(NS_JINGLE_RTP_HDREXT, "parameter"):
                try:
                    parameters[param_elt["name"]] = param_elt.getAttribute("value")
                except KeyError:
                    log.warning(f"invalid parameters (missing name): {param_elt.toXml()}")

            # the key is only added when there is something to store
            if parameters:
                hdrext_data["parameters"] = parameters

        allow_mixed_elts = desc_elt.elements(NS_JINGLE_RTP_HDREXT, "extmap-allow-mixed")
        if next(allow_mixed_elts, None) is not None:
            media_data["extmap-allow-mixed"] = True

    def _build_description_trigger(
        self, desc_elt: domish.Element, media_data: dict, session: dict
    ) -> None:
        """Build the <rtp-hdrext> and <extmap-allow-mixed> elements if possible"""
        for id_, hdrext_data in media_data.get("rtp-hdrext", {}).items():
            rtp_hdrext_elt = desc_elt.addElement((NS_JINGLE_RTP_HDREXT, "rtp-hdrext"))
            rtp_hdrext_elt["id"] = id_
            rtp_hdrext_elt["uri"] = hdrext_data["uri"]
            senders = hdrext_data.get("senders", "both")
            if senders != "both":
                # we must not set "both" senders otherwise calls will fail with Movim due
                # to https://github.com/movim/movim/issues/1213
                rtp_hdrext_elt["senders"] = senders

            for name, value in hdrext_data.get("parameters", {}).items():
                param_elt = rtp_hdrext_elt.addElement((NS_JINGLE_RTP_HDREXT, "parameter"))
                param_elt["name"] = name
                if value is not None:
                    param_elt["value"] = value

        if media_data.get("extmap-allow-mixed", False):
            desc_elt.addElement((NS_JINGLE_RTP_HDREXT, "extmap-allow-mixed"))


@implementer(iwokkel.IDisco)
class XEP_0294_handler(XMPPHandler):
    """Handler advertising the NS_JINGLE_RTP_HDREXT feature via service discovery"""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco features of this plugin.

        The arguments are not used: the same single feature is always returned.

        @return: list with one DiscoFeature for NS_JINGLE_RTP_HDREXT
        """
        return [disco.DiscoFeature(NS_JINGLE_RTP_HDREXT)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return an empty list, whatever the arguments are."""
        return []