view libervia/backend/plugins/plugin_xep_0293.py @ 4309:b56b1eae7994

component email gateway: add multicasting: XEP-0033 multicasting is now supported both for incoming and outgoing messages. XEP-0033 metadata are converted to suitable Email headers and vice versa. Email address and JID are both supported, and delivery is done by the gateway when suitable on incoming messages. rel 450
author Goffi <goffi@goffi.org>
date Thu, 26 Sep 2024 16:12:01 +0200
parents 4b842c1fb686
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import List

from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger

log = getLogger(__name__)

# XML namespace of the <rtcp-fb/> and <rtcp-fb-trr-int/> elements handled by this
# plugin; it is also the feature advertised through service discovery by the handler.
NS_JINGLE_RTP_RTCP_FB = "urn:xmpp:jingle:apps:rtp:rtcp-fb:0"

# Plugin metadata, keyed by constants from libervia.backend.core.constants.
# Presumably consumed by the Libervia plugin loader — verify against the loader.
PLUGIN_INFO = {
    C.PI_NAME: "Jingle RTP Feedback Negotiation",
    C.PI_IMPORT_NAME: "XEP-0293",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0293"],
    # NOTE(review): nothing in the visible code of this module uses XEP-0092 —
    # confirm that this dependency is really needed.
    C.PI_DEPENDENCIES: ["XEP-0092", "XEP-0166", "XEP-0167"],
    C.PI_RECOMMENDATIONS: [],
    # Name of the main plugin class defined below.
    C.PI_MAIN: "XEP_0293",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Jingle RTP Feedback Negotiation"""),
}

# Key under which the list of (type, subtype, parameters) feedback tuples is stored in
# the application data and payload type data dicts.
RTCP_FB_KEY = "rtcp-fb"


class XEP_0293:
    """Jingle RTP Feedback Negotiation (XEP-0293).

    The plugin registers triggers on "XEP-0167_*" trigger points to convert RTCP
    feedback negotiation data between SDP attributes, data dicts and XML elements.

    Feedback data is stored in data dicts (application/media data, or the data of a
    single payload type) with following keys:

    - ``RTCP_FB_KEY``: list of ``(type, subtype, parameters)`` tuples, where ``subtype``
      may be None and ``parameters`` is a dict mapping parameter name to its value (or
      None when the parameter has no value).
    - ``"rtcp-fb-trr-int"``: the "trr-int" interval, as an int.
    """

    def __init__(self, host):
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger)
        host.trigger.add(
            "XEP-0167_generate_sdp_content", self._generate_sdp_content_trigger
        )
        host.trigger.add("XEP-0167_parse_description", self._parse_description_trigger)
        host.trigger.add(
            "XEP-0167_parse_description_payload_type",
            self._parse_description_payload_type_trigger,
        )
        host.trigger.add("XEP-0167_build_description", self._build_description_trigger)
        host.trigger.add(
            "XEP-0167_build_description_payload_type",
            self._build_description_payload_type_trigger,
        )

    def get_handler(self, client):
        """Return the handler advertising this plugin's disco feature."""
        return XEP_0293_handler()

    ## SDP

    def _get_sdp_target(self, pt_id: str, parts: List[str], application_data: dict):
        """Find the dict where the feedback data of an SDP attribute must be stored.

        @param pt_id: Payload type ID found in the attribute, or "*" when the attribute
            is not linked to a payload type.
        @param parts: The list of parts in the attribute, only used for logging.
        @param application_data: The application data dict.
        @return: ``application_data`` itself if ``pt_id`` is "*", the data dict of the
            payload type if it is found in ``application_data["payload_types"]``, None
            otherwise (a warning is logged in this last case).
        """
        if pt_id == "*":
            return application_data

        payload_types = application_data.get("payload_types", {})
        try:
            # payload types are looked up by their integer ID
            return payload_types[int(pt_id)]
        except (KeyError, ValueError):
            # ValueError: the ID is not an integer, thus it can't match any payload type
            log.warning(
                f"Got reference to unknown payload type {pt_id}: "
                f"{' '.join(parts)}"
            )
            return None

    def _parse_sdp_a_trigger(
        self,
        attribute: str,
        parts: List[str],
        call_data: dict,
        metadata: dict,
        media_type: str,
        application_data: dict,
        transport_data: dict,
    ) -> None:
        """Parse "rtcp-fb" and "rtcp-fb-trr-int" attributes

        "trr-int" is accepted both as a "rtcp-fb" attribute whose type is "trr-int"
        (the form generated by [_generate_rtcp_fb_trr_int_lines]) and as a
        "rtcp-fb-trr-int" attribute. In both cases the interval is stored as an int
        under the "rtcp-fb-trr-int" key.

        Malformed attributes are ignored with a warning.

        @param attribute: The attribute being parsed.
        @param parts: The list of parts in the attribute.
        @param call_data: The call data dict.
        @param metadata: The metadata dict.
        @param media_type: The media type (e.g., audio, video).
        @param application_data: The application data dict.
        @param transport_data: The transport data dict.
        """
        if attribute not in ("rtcp-fb", "rtcp-fb-trr-int"):
            return

        if len(parts) < 2:
            # we need at least a payload type ID and a type (or an interval)
            log.warning(
                f"Ignoring malformed {attribute!r} attribute: {' '.join(parts)}"
            )
            return

        pt_id = parts[0]

        if attribute == "rtcp-fb" and parts[1] != "trr-int":
            feedback_type = parts[1]
            feedback_subtype = parts[2] if len(parts) > 2 else None

            # Extra parts are parameters, either "name=value" or just "name"
            parameters = {}
            for parameter in parts[3:]:
                name, _sep, value = parameter.partition("=")
                parameters[name] = value or None

            target = self._get_sdp_target(pt_id, parts, application_data)
            if target is not None:
                target.setdefault(RTCP_FB_KEY, []).append(
                    (feedback_type, feedback_subtype, parameters)
                )
            return

        # We have a "trr-int": the interval comes after the "trr-int" type for
        # "rtcp-fb" attribute, and right after the payload type ID otherwise.
        interval_idx = 2 if attribute == "rtcp-fb" else 1
        try:
            interval = int(parts[interval_idx])
        except (IndexError, ValueError):
            log.warning(f"Ignoring invalid trr-int value: {' '.join(parts)}")
            return

        target = self._get_sdp_target(pt_id, parts, application_data)
        if target is not None:
            target["rtcp-fb-trr-int"] = interval

    def _generate_rtcp_fb_lines(
        self, data: dict, pt_id: str, sdp_lines: List[str]
    ) -> None:
        """Append one "a=rtcp-fb" SDP line per feedback tuple found in ``data``.

        @param data: The data dict where feedback tuples are read (``RTCP_FB_KEY``).
        @param pt_id: The payload type ID to use in the lines, or "*".
        @param sdp_lines: The list of SDP lines to append to.
        """
        for type_, subtype, parameters in data.get(RTCP_FB_KEY, []):
            # parameters without value are written with their name only
            parameters_strs = [
                f"{k}={v}" if v is not None else k for k, v in parameters.items()
            ]
            parameters_str = " ".join(parameters_strs)

            sdp_line = f"a=rtcp-fb:{pt_id} {type_}"
            if subtype:
                sdp_line += f" {subtype}"
            if parameters_str:
                sdp_line += f" {parameters_str}"
            sdp_lines.append(sdp_line)

    def _generate_rtcp_fb_trr_int_lines(
        self, data: dict, pt_id: str, sdp_lines: List[str]
    ) -> None:
        """Append the "trr-int" SDP line if an interval is set in ``data``.

        @param data: The data dict where the interval is read ("rtcp-fb-trr-int").
        @param pt_id: The payload type ID to use in the line, or "*".
        @param sdp_lines: The list of SDP lines to append to.
        """
        if "rtcp-fb-trr-int" not in data:
            return
        sdp_lines.append(f"a=rtcp-fb:{pt_id} trr-int {data['rtcp-fb-trr-int']}")

    def _generate_sdp_content_trigger(
        self,
        session: dict,
        local: bool,
        content_name: str,
        content_data: dict,
        sdp_lines: List[str],
        application_data: dict,
        app_data_key: str,
        media_data: dict,
        media: str,
    ) -> None:
        """Generate SDP attributes "rtcp-fb" and "rtcp-fb-trr-int" from application data.

        @param session: The session data.
        @param local: Whether this is local or remote content.
        @param content_name: The name of the content.
        @param content_data: The data of the content.
        @param sdp_lines: The list of SDP lines to append to.
        @param application_data: The application data dict.
        @param app_data_key: The key for the application data.
        @param media_data: The media data dict.
        @param media: The media type (e.g., audio, video).
        """
        # Generate lines for application data
        self._generate_rtcp_fb_lines(application_data, "*", sdp_lines)
        self._generate_rtcp_fb_trr_int_lines(application_data, "*", sdp_lines)

        # Generate lines for each payload type
        for pt_id, payload_data in media_data.get("payload_types", {}).items():
            self._generate_rtcp_fb_lines(payload_data, pt_id, sdp_lines)
            self._generate_rtcp_fb_trr_int_lines(payload_data, pt_id, sdp_lines)

    ## XML

    def _parse_rtcp_fb_elements(self, parent_elt: domish.Element, data: dict) -> None:
        """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements.

        Invalid elements are ignored with a warning. If several valid
        <rtcp-fb-trr-int> elements are found, the last one is used.

        @param parent_elt: The parent domish.Element.
        @param data: The data dict to populate.
        """
        for rtcp_fb_elt in parent_elt.elements(NS_JINGLE_RTP_RTCP_FB, "rtcp-fb"):
            try:
                type_ = rtcp_fb_elt["type"]
                subtype = rtcp_fb_elt.getAttribute("subtype")

                parameters = {}
                for parameter_elt in rtcp_fb_elt.elements(
                    NS_JINGLE_RTP_RTCP_FB, "parameter"
                ):
                    parameters[parameter_elt["name"]] = parameter_elt.getAttribute(
                        "value"
                    )
            except KeyError as e:
                # a mandatory attribute is missing, the whole element is skipped
                log.warning(f"Error while parsing <rtcp-fb>: {e}\n{rtcp_fb_elt.toXml()}")
            else:
                data.setdefault(RTCP_FB_KEY, []).append((type_, subtype, parameters))

        for rtcp_fb_trr_int_elt in parent_elt.elements(
            NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int"
        ):
            try:
                interval_value = int(rtcp_fb_trr_int_elt["value"])
            except (KeyError, ValueError) as e:
                log.warning(
                    f"Error while parsing <rtcp-fb-trr-int>: {e}\n"
                    f"{rtcp_fb_trr_int_elt.toXml()}"
                )
            else:
                # Same key and type (int) as the ones used by SDP parsing, SDP
                # generation and by [build_rtcp_fb_elements].
                data["rtcp-fb-trr-int"] = interval_value

    def _parse_description_trigger(
        self, desc_elt: domish.Element, media_data: dict
    ) -> None:
        """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements from a description.

        @param desc_elt: The <description> domish.Element.
        @param media_data: The media data dict to populate.
        """
        self._parse_rtcp_fb_elements(desc_elt, media_data)

    def _parse_description_payload_type_trigger(
        self,
        desc_elt: domish.Element,
        media_data: dict,
        payload_type_elt: domish.Element,
        payload_type_data: dict,
    ) -> None:
        """Parse the <rtcp-fb> and <rtcp-fb-trr-int> elements from a payload type.

        @param desc_elt: The <description> domish.Element.
        @param media_data: The media data dict.
        @param payload_type_elt: The <payload-type> domish.Element.
        @param payload_type_data: The payload type data dict to populate.
        """
        self._parse_rtcp_fb_elements(payload_type_elt, payload_type_data)

    def build_rtcp_fb_elements(self, parent_elt: domish.Element, data: dict) -> None:
        """Helper method to build the <rtcp-fb> and <rtcp-fb-trr-int> elements

        Parameters are built as <parameter name="…" value="…"/> children of <rtcp-fb>,
        which is the form read by [_parse_rtcp_fb_elements].

        @param parent_elt: The element where the new elements are added.
        @param data: The data dict where feedback data is read.
        """
        for type_, subtype, parameters in data.get(RTCP_FB_KEY, []):
            rtcp_fb_elt = parent_elt.addElement((NS_JINGLE_RTP_RTCP_FB, "rtcp-fb"))
            rtcp_fb_elt["type"] = type_
            if subtype:
                rtcp_fb_elt["subtype"] = subtype
            for name, value in parameters.items():
                param_elt = rtcp_fb_elt.addElement((NS_JINGLE_RTP_RTCP_FB, "parameter"))
                param_elt["name"] = name
                # the "value" attribute is omitted for parameters without value
                if value is not None:
                    param_elt["value"] = str(value)

        if "rtcp-fb-trr-int" in data:
            rtcp_fb_trr_int_elt = parent_elt.addElement(
                (NS_JINGLE_RTP_RTCP_FB, "rtcp-fb-trr-int")
            )
            rtcp_fb_trr_int_elt["value"] = str(data["rtcp-fb-trr-int"])

    def _build_description_payload_type_trigger(
        self,
        desc_elt: domish.Element,
        media_data: dict,
        payload_type: dict,
        payload_type_elt: domish.Element,
    ) -> None:
        """Build the <rtcp-fb> and <rtcp-fb-trr-int> elements for a payload type

        @param desc_elt: The <description> domish.Element.
        @param media_data: The media data dict.
        @param payload_type: The payload type data dict where feedback data is read.
        @param payload_type_elt: The <payload-type> domish.Element to populate.
        """
        self.build_rtcp_fb_elements(payload_type_elt, payload_type)

    def _build_description_trigger(
        self, desc_elt: domish.Element, media_data: dict, session: dict
    ) -> None:
        """Build the <rtcp-fb> and <rtcp-fb-trr-int> elements for a media description

        @param desc_elt: The <description> domish.Element to populate.
        @param media_data: The media data dict where feedback data is read.
        @param session: The session data.
        """
        self.build_rtcp_fb_elements(desc_elt, media_data)


@implementer(iwokkel.IDisco)
class XEP_0293_handler(XMPPHandler):
    """Service discovery handler advertising the XEP-0293 namespace."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco feature of Jingle RTP Feedback Negotiation.

        The arguments are not used: the same single feature is always returned.
        """
        rtcp_fb_feature = disco.DiscoFeature(NS_JINGLE_RTP_RTCP_FB)
        return [rtcp_fb_feature]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return an empty list, this handler exposes no disco item."""
        return list()