view libervia/backend/plugins/plugin_xep_0339.py @ 4234:67de9ed101aa

docker (e2e): add GStreamer dependencies to test WebRTC stack: rel 424
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 15:04:01 +0200
parents 4b842c1fb686
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import List

from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.tools import xml_tools

# Module-level logger, named after this module.
log = getLogger(__name__)

# XML namespace of the <source>, <parameter> and <ssrc-group> elements that this
# plugin reads from and writes to Jingle RTP <description> elements, and the feature
# advertised through service discovery.
NS_JINGLE_RTP_SSMA = "urn:xmpp:jingle:apps:rtp:ssma:0"

# Plugin metadata. The meaning of each ``C.PI_*`` key is defined by the constants
# module (not visible here); presumably this dict is read by the plugin loader.
PLUGIN_INFO = {
    C.PI_NAME: "Source-Specific Media Attributes in Jingle",
    C.PI_IMPORT_NAME: "XEP-0339",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0339"],
    # NOTE(review): only "XEP-0167_*" trigger points are used in this module; nothing
    # visible here references XEP-0092 — confirm that dependency is really needed.
    C.PI_DEPENDENCIES: ["XEP-0092", "XEP-0167"],
    C.PI_RECOMMENDATIONS: [],
    # Name of the main plugin class defined in this module.
    C.PI_MAIN: "XEP_0339",
    # presumably tells the loader to call ``get_handler`` — TODO confirm
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Source-Specific Media Attributes in Jingle"""),
}


class XEP_0339:
    """Source-Specific Media Attributes in Jingle (XEP-0339).

    The plugin hooks into four ``XEP-0167_*`` trigger points to translate between the
    SDP ``ssrc``, ``ssrc-group`` and ``msid`` attributes and the ``<source>`` /
    ``<ssrc-group>`` elements of a Jingle RTP description.

    Data handled by the triggers is stored in plain dicts using these keys:

    - ``"ssrc"``: dict mapping an SSRC ID (int) to a dict of attribute name to value,
      where the value is a string or None when the attribute has no value
    - ``"ssrc-group"``: dict mapping a semantics string to a list of SSRC IDs (int)
    - ``"msid"``: media level msid, as a string

    NOTE: ``"ssrc-group"`` is keyed by semantics, so a group replaces any group
    stored earlier with the same semantics.
    """

    def __init__(self, host):
        """Register the SDP and Jingle description triggers on ``host``.

        @param host: object exposing ``trigger.add``, kept as ``self.host``
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger)
        host.trigger.add(
            "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger
        )
        host.trigger.add("XEP-0167_parse_description", self._parse_description_trigger)
        host.trigger.add("XEP-0167_build_description", self._build_description_trigger)

    def get_handler(self, client):
        """Return a new service discovery handler advertising this plugin's feature.

        @param client: unused
        """
        return XEP_0339_handler()

    def _parse_sdp_a_trigger(
        self,
        attribute: str,
        parts: List[str],
        call_data: dict,
        metadata: dict,
        media_type: str,
        application_data: dict,
        transport_data: dict,
    ) -> None:
        """Parse "ssrc", "ssrc-group" and "msid" SDP attributes

        Any other attribute is ignored. Results are stored in ``application_data``
        under the keys described in the class docstring.

        @param attribute: name of the SDP attribute
        @param parts: tokens of the attribute value; they are joined back with a
            single space where the original value is needed
        @param call_data: unused
        @param metadata: unused
        @param media_type: unused
        @param application_data: dict updated in place
        @param transport_data: unused
        @raise ValueError: an SSRC ID is not an integer, or "ssrc-group" has no token
        @raise IndexError: "ssrc" has no token
        """
        if attribute == "ssrc":
            assert application_data is not None
            ssrc_id = int(parts[0])
            # the entry is created even when there is no attribute, so that a bare
            # SSRC ID is remembered too
            ssrc_attributes = application_data.setdefault("ssrc", {}).setdefault(
                ssrc_id, {}
            )
            if len(parts) > 1:
                # "name:value" where the value may itself contain spaces and colons;
                # a missing or empty value is stored as None
                name, __, value = " ".join(parts[1:]).partition(":")
                ssrc_attributes[name] = value or None
            else:
                log.warning(f"no attribute in ssrc: {' '.join(parts)}")
        elif attribute == "ssrc-group":
            assert application_data is not None
            semantics, *ssrc_ids = parts
            application_data.setdefault("ssrc-group", {})[semantics] = [
                int(ssrc_id) for ssrc_id in ssrc_ids
            ]
        elif attribute == "msid":
            assert application_data is not None
            application_data["msid"] = " ".join(parts)

    def _generate_sdp_content_trigger(
        self,
        session: dict,
        local: bool,
        idx: int,
        content_data: dict,
        sdp_lines: List[str],
        application_data: dict,
        app_data_key: str,
        media_data: dict,
        media: str,
    ) -> None:
        """Generate "msid", "ssrc" and "ssrc-group" attributes

        Lines are appended to ``sdp_lines`` in that order. Only ``sdp_lines`` and
        ``media_data`` are used, other arguments are ignored.

        @param sdp_lines: list of SDP lines, extended in place
        @param media_data: dict read for the "msid", "ssrc" and "ssrc-group" keys
        """
        # a msid without value can't be expressed in SDP: skip it instead of
        # emitting a literal "a=msid:None" line
        msid = media_data.get("msid")
        if msid is not None:
            sdp_lines.append(f"a=msid:{msid}")

        ssrc_data = media_data.get("ssrc", {})
        ssrc_group_data = media_data.get("ssrc-group", {})

        for ssrc_id, attributes in ssrc_data.items():
            if not attributes:
                # there are no attributes for this SSRC ID, we add a simple line with only
                # the SSRC ID
                sdp_lines.append(f"a=ssrc:{ssrc_id}")
                continue
            for attr_name, attr_value in attributes.items():
                if attr_value is not None:
                    sdp_lines.append(f"a=ssrc:{ssrc_id} {attr_name}:{attr_value}")
                else:
                    sdp_lines.append(f"a=ssrc:{ssrc_id} {attr_name}")

        for semantics, ssrc_ids in ssrc_group_data.items():
            ssrc_lines = " ".join(str(ssrc_id) for ssrc_id in ssrc_ids)
            sdp_lines.append(f"a=ssrc-group:{semantics} {ssrc_lines}")

    def _parse_description_trigger(
        self, desc_elt: domish.Element, media_data: dict
    ) -> bool:
        """Parse the <source> and <ssrc-group> elements

        A malformed element (missing attribute or non integer SSRC ID) is logged
        with a warning and parsing goes on with the next element; data stored before
        the error was met is kept.

        The first "msid" parameter which has a value is also used as media level
        msid when ``media_data`` has none yet.

        @param desc_elt: element whose <source> and <ssrc-group> children are parsed
        @param media_data: dict updated in place
        @return: always True
        """
        for source_elt in desc_elt.elements(NS_JINGLE_RTP_SSMA, "source"):
            try:
                ssrc_id = int(source_elt["ssrc"])
                source_params = media_data.setdefault("ssrc", {})[ssrc_id] = {}
                for param_elt in source_elt.elements(NS_JINGLE_RTP_SSMA, "parameter"):
                    name = param_elt["name"]
                    value = param_elt.getAttribute("value")
                    source_params[name] = value
                    # a valueless msid must not become the media level msid, as it
                    # would end up as "a=msid:None" in generated SDP
                    if (
                        name == "msid"
                        and value is not None
                        and "msid" not in media_data
                    ):
                        media_data["msid"] = value
            except (KeyError, ValueError) as e:
                log.warning(f"Error while parsing <source>: {e}\n{source_elt.toXml()}")

        for ssrc_group_elt in desc_elt.elements(NS_JINGLE_RTP_SSMA, "ssrc-group"):
            try:
                semantics = ssrc_group_elt["semantics"]
                semantic_ids = media_data.setdefault("ssrc-group", {})[semantics] = []
                for source_elt in ssrc_group_elt.elements(NS_JINGLE_RTP_SSMA, "source"):
                    semantic_ids.append(int(source_elt["ssrc"]))
            except (KeyError, ValueError) as e:
                log.warning(
                    f"Error while parsing <ssrc-group>: {e}\n{ssrc_group_elt.toXml()}"
                )

        return True

    def _build_description_trigger(
        self, desc_elt: domish.Element, media_data: dict, session: dict
    ) -> bool:
        """Build the <source> and <ssrc-group> elements if possible

        Side effect: when ``media_data`` has a media level "msid", it is copied into
        the parameters of every source which has no "msid" yet, so ``media_data`` is
        modified in place.

        @param desc_elt: element to which the new elements are added
        @param media_data: dict read for the "ssrc", "ssrc-group" and "msid" keys
        @param session: unused
        @return: always True
        """
        for ssrc_id, parameters in media_data.get("ssrc", {}).items():
            if "msid" not in parameters and "msid" in media_data:
                parameters["msid"] = media_data["msid"]
            source_elt = desc_elt.addElement((NS_JINGLE_RTP_SSMA, "source"))
            source_elt["ssrc"] = str(ssrc_id)
            for name, value in parameters.items():
                param_elt = source_elt.addElement((NS_JINGLE_RTP_SSMA, "parameter"))
                param_elt["name"] = name
                # a parameter without value is sent with its name only
                if value is not None:
                    param_elt["value"] = value

        for semantics, ssrc_ids in media_data.get("ssrc-group", {}).items():
            ssrc_group_elt = desc_elt.addElement((NS_JINGLE_RTP_SSMA, "ssrc-group"))
            ssrc_group_elt["semantics"] = semantics
            for ssrc_id in ssrc_ids:
                source_elt = ssrc_group_elt.addElement((NS_JINGLE_RTP_SSMA, "source"))
                source_elt["ssrc"] = str(ssrc_id)

        return True


@implementer(iwokkel.IDisco)
class XEP_0339_handler(XMPPHandler):
    """Service discovery handler advertising the ``NS_JINGLE_RTP_SSMA`` feature."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the single disco feature of this plugin.

        All arguments are ignored: the same feature is returned for any request.
        """
        ssma_feature = disco.DiscoFeature(NS_JINGLE_RTP_SSMA)
        return [ssma_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return an empty list: this plugin exposes no disco item."""
        no_items = []
        return no_items