view libervia/backend/plugins/plugin_xep_0293.py @ 4151:18026ce0819c

core (xmpp): message reception workflow refactoring: - Call methods from a root async one instead of using Deferred callbacks chain. - Use a queue to be sure to process messages in order.
author Goffi <goffi@goffi.org>
date Wed, 22 Nov 2023 14:50:35 +0100
parents 4b842c1fb686
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import List

from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger

log = getLogger(__name__)

NS_JINGLE_RTP_RTCP_FB = "urn:xmpp:jingle:apps:rtp:rtcp-fb:0"

PLUGIN_INFO = {
    C.PI_NAME: "Jingle RTP Feedback Negotiation",
    C.PI_IMPORT_NAME: "XEP-0293",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0293"],
    C.PI_DEPENDENCIES: ["XEP-0092", "XEP-0166", "XEP-0167"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0293",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Jingle RTP Feedback Negotiation"""),
}

RTCP_FB_KEY = "rtcp-fb"


class XEP_0293:
    def __init__(self, host):
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger)
        host.trigger.add(
            "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger
        )
        host.trigger.add("XEP-0167_parse_description", self._parse_description_trigger)
        host.trigger.add(
            "XEP-0167_parse_description_payload_type",
            self._parse_description_payload_type_trigger,
        )
        host.trigger.add("XEP-0167_build_description", self._build_description_trigger)
        host.trigger.add(
            "XEP-0167_build_description_payload_type",
            self._build_description_payload_type_trigger,
        )

    def get_handler(self, client):
        return XEP_0293_handler()

    ## SDP

    def _parse_sdp_a_trigger(
        self,
        attribute: str,
        parts: List[str],
        call_data: dict,
        metadata: dict,
        media_type: str,
        application_data: dict,
        transport_data: dict,
    ) -> None:
        """Parse "rtcp-fb" and "rtcp-fb-trr-int" attributes

        @param attribute: The attribute being parsed.
        @param parts: The list of parts in the attribute.
        @param call_data: The call data dict.
        @param metadata: The metadata dict.
        @param media_type: The media type (e.g., audio, video).
        @param application_data: The application data dict.
        @param transport_data: The transport data dict.
        @param payload_map: The payload map dict.
        """
        if attribute == "rtcp-fb":
            pt_id = parts[0]
            feedback_type = parts[1]

            feedback_subtype = None
            parameters = {}

            # Check if there are extra parameters
            if len(parts) > 2:
                feedback_subtype = parts[2]

            if len(parts) > 3:
                for parameter in parts[3:]:
                    name, _, value = parameter.partition("=")
                    parameters[name] = value or None

            # Check if this feedback is linked to a payload type
            if pt_id == "*":
                # Not linked to a payload type, add to application data
                application_data.setdefault(RTCP_FB_KEY, []).append(
                    (feedback_type, feedback_subtype, parameters)
                )
            else:
                payload_types = application_data.get("payload_types", {})
                try:
                    payload_type = payload_types[int(pt_id)]
                except KeyError:
                    log.warning(
                        f"Got reference to unknown payload type {pt_id}: "
                        f"{' '.join(parts)}"
                    )
                else:
                    # Linked to a payload type, add to payload data
                    payload_type.setdefault(RTCP_FB_KEY, []).append(
                        (feedback_type, feedback_subtype, parameters)
                    )

        elif attribute == "rtcp-fb-trr-int":
            pt_id = parts[0]  # Payload type ID
            interval = int(parts[1])

            # Check if this interval is linked to a payload type
            if pt_id == "*":
                # Not linked to a payload type, add to application data
                application_data["rtcp-fb-trr-int"] = interval
            else:
                payload_types = application_data.get("payload_types", {})
                try:
                    payload_type = payload_types[int(pt_id)]
                except KeyError:
                    log.warning(
                        f"Got reference to unknown payload type {pt_id}: "
                        f"{' '.join(parts)}"
                    )
                else:
                    # Linked to a payload type, add to payload data
                    payload_type["rtcp-fb-trr-int"] = interval

    def _generate_rtcp_fb_lines(
        self, data: dict, pt_id: str, sdp_lines: List[str]
    ) -> None:
        for type_, subtype, parameters in data.get(RTCP_FB_KEY, []):
            parameters_strs = [
                f"{k}={v}" if v is not None else k for k, v in parameters.items()
            ]
            parameters_str = " ".join(parameters_strs)

            sdp_line = f"a=rtcp-fb:{pt_id} {type_}"
            if subtype:
                sdp_line += f" {subtype}"
            if parameters_str:
                sdp_line += f" {parameters_str}"
            sdp_lines.append(sdp_line)

    def _generate_rtcp_fb_trr_int_lines(
        self, data: dict, pt_id: str, sdp_lines: List[str]
    ) -> None:
        if "rtcp-fb-trr-int" not in data:
            return
        sdp_lines.append(f"a=rtcp-fb:{pt_id} trr-int {data['rtcp-fb-trr-int']}")

    def _generate_sdp_content_trigger(
        self,
        session: dict,
        local: bool,
        content_name: str,
        content_data: dict,
        sdp_lines: List[str],
        application_data: dict,
        app_data_key: str,
        media_data: dict,
        media: str,
    ) -> None:
        """Generate SDP attributes "rtcp-fb" and "rtcp-fb-trr-int" from application data.

        @param session: The session data.
        @param local: Whether this is local or remote content.
        @param content_name: The name of the content.
        @param content_data: The data of the content.
        @param sdp_lines: The list of SDP lines to append to.
        @param application_data: The application data dict.
        @param app_data_key: The key for the application data.
        @param media_data: The media data dict.
        @param media: The media type (e.g., audio, video).
        """
        # Generate lines for application data
        self._generate_rtcp_fb_lines(application_data, "*", sdp_lines)
        self._generate_rtcp_fb_trr_int_lines(application_data, "*", sdp_lines)

        # Generate lines for each payload type
        for pt_id, payload_data in media_data.get("payload_types", {}).items():
            self._generate_rtcp_fb_lines(payload_data, pt_id, sdp_lines)
            self._generate_rtcp_fb_trr_int_lines(payload_data, pt_id, sdp_lines)

    ## XML

    def _parse_rtcp_fb_elements(self, parent_elt: domish.Element, data: dict) -> None:
        """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements.

        @param parent_elt: The parent domish.Element.
        @param data: The data dict to populate.
        """
        for rtcp_fb_elt in parent_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb"):
            try:
                type_ = rtcp_fb_elt["type"]
                subtype = rtcp_fb_elt.getAttribute("subtype")

                parameters = {}
                for parameter_elt in rtcp_fb_elt.elements(
                    NS_JINGLE_RTP_RTCP_FB, "parameter"
                ):
                    parameters[parameter_elt["name"]] = parameter_elt.getAttribute(
                        "value"
                    )

                data.setdefault(RTCP_FB_KEY, []).append((type_, subtype, parameters))
            except (KeyError, ValueError) as e:
                log.warning(f"Error while parsing <rtcp-fb>: {e}\n{rtcp_fb_elt.toXml()}")

        for rtcp_fb_trr_int_elt in parent_elt.elements(
            NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int"
        ):
            try:
                interval_value = int(rtcp_fb_trr_int_elt["value"])
                data.setdefault("rtcp_fb_trr_int", []).append(interval_value)
            except (KeyError, ValueError) as e:
                log.warning(
                    f"Error while parsing <rtcp-fb-trr-int>: {e}\n"
                    f"{rtcp_fb_trr_int_elt.toXml()}"
                )

    def _parse_description_trigger(
        self, desc_elt: domish.Element, media_data: dict
    ) -> None:
        """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements from a description.

        @param desc_elt: The <description> domish.Element.
        @param media_data: The media data dict to populate.
        """
        self._parse_rtcp_fb_elements(desc_elt, media_data)

    def _parse_description_payload_type_trigger(
        self,
        desc_elt: domish.Element,
        media_data: dict,
        payload_type_elt: domish.Element,
        payload_type_data: dict,
    ) -> None:
        """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements from a payload type.

        @param desc_elt: The <description> domish.Element.
        @param media_data: The media data dict.
        @param payload_type_elt: The <payload-type> domish.Element.
        @param payload_type_data: The payload type data dict to populate.
        """
        self._parse_rtcp_fb_elements(payload_type_elt, payload_type_data)

    def build_rtcp_fb_elements(self, parent_elt: domish.Element, data: dict) -> None:
        """Helper method to build the <rtcp-fb> and <rtcp-fb-trr-int> elements"""
        for type_, subtype, parameters in data.get(RTCP_FB_KEY, []):
            rtcp_fb_elt = parent_elt.addElement((NS_JINGLE_RTP_RTCP_FB, "rtcp-fb"))
            rtcp_fb_elt["type"] = type_
            if subtype:
                rtcp_fb_elt["subtype"] = subtype
            for name, value in parameters.items():
                param_elt = rtcp_fb_elt.addElement(name)
                if value is not None:
                    param_elt.addContent(str(value))

        if "rtcp-fb-trr-int" in data:
            rtcp_fb_trr_int_elt = parent_elt.addElement(
                (NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int")
            )
            rtcp_fb_trr_int_elt["value"] = str(data["rtcp-fb-trr-int"])

    def _build_description_payload_type_trigger(
        self,
        desc_elt: domish.Element,
        media_data: dict,
        payload_type: dict,
        payload_type_elt: domish.Element,
    ) -> None:
        """Build the <rtcp-fb> and <rtcp-fb-trr-int> elements for a payload type"""
        self.build_rtcp_fb_elements(payload_type_elt, payload_type)

    def _build_description_trigger(
        self, desc_elt: domish.Element, media_data: dict, session: dict
    ) -> None:
        """Build the <rtcp-fb> and <rtcp-fb-trr-int> elements for a media description"""
        self.build_rtcp_fb_elements(desc_elt, media_data)


@implementer(iwokkel.IDisco)
class XEP_0293_handler(XMPPHandler):
    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_JINGLE_RTP_RTCP_FB)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []