view libervia/backend/plugins/plugin_xep_0338.py @ 4122:52a89ddf3087

plugin XEP-0338: be sure to add the `<group>` element only once: rel 424
author Goffi <goffi@goffi.org>
date Tue, 03 Oct 2023 15:27:07 +0200
parents 4b842c1fb686
children c8b19a32f5c0
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import List

from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.core_types import SatXMPPEntity

log = getLogger(__name__)

NS_JINGLE_GROUPING = "urn:xmpp:jingle:apps:grouping:0"
NS_RFC_5888 = "urn:ietf:rfc:5888"

PLUGIN_INFO = {
    C.PI_NAME: "Jingle Grouping Framework",
    C.PI_IMPORT_NAME: "XEP-0338",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0338"],
    C.PI_DEPENDENCIES: ["XEP-0166", "XEP-0167"],
    C.PI_RECOMMENDATIONS: [],
    C.PI_MAIN: "XEP_0338",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Jingle mapping of RFC 5888 SDP Grouping Framework"""),
}


class XEP_0338:
    def __init__(self, host):
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self._j = host.plugins["XEP-0166"]
        host.trigger.add("XEP-0167_parse_sdp_a", self._parse_sdp_a_trigger)
        host.trigger.add(
            "XEP-0167_generate_sdp_session", self._generate_sdp_session_trigger
        )
        host.trigger.add("XEP-0167_jingle_session_init", self._jingle_session_init_trigger)
        host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger)

    def get_handler(self, client):
        return XEP_0338_handler()

    def _parse_sdp_a_trigger(
        self,
        attribute: str,
        parts: List[str],
        call_data: dict,
        metadata: dict,
        media_type: str,
        application_data: dict,
        transport_data: dict,
    ) -> None:
        """Parse "group" attributes"""
        if attribute == "group":
            semantics = parts[0]
            content_names = parts[1:]
            metadata.setdefault("group", {})[semantics] = content_names

    def _generate_sdp_session_trigger(
        self,
        session: dict,
        local: bool,
        sdp_lines: List[str],
    ) -> None:
        """Generate "group" attributes"""
        key = "metadata" if local else "peer_metadata"
        group_data = session[key].get("group", {})

        for semantics, content_names in group_data.items():
            sdp_lines.append(f"a=group:{semantics} {' '.join(content_names)}")

    def parse_group_element(
        self, jingle_elt: domish.Element, session: dict
    ) -> None:
        """Parse the <group> and <content> elements"""
        for group_elt in jingle_elt.elements(NS_JINGLE_GROUPING, "group"):
            try:
                metadata = session["peer_metadata"]
                semantics = group_elt["semantics"]
                group_content = metadata.setdefault("group", {})[semantics] = []
                for content_elt in group_elt.elements(NS_JINGLE_GROUPING, "content"):
                    group_content.append(content_elt["name"])
            except KeyError as e:
                log.warning(f"Error while parsing <group>: {e}\n{group_elt.toXml()}")

    def add_group_element(
        self, jingle_elt: domish.Element, session: dict
    ) -> None:
        """Build the <group> and <content> elements if possible"""
        for semantics, content_names in session["metadata"].get("group", {}).items():
            group_elt = jingle_elt.addElement((NS_JINGLE_GROUPING, "group"))
            group_elt["semantics"] = semantics
            for content_name in content_names:
                content_elt = group_elt.addElement((NS_JINGLE_GROUPING, "content"))
                content_elt["name"] = content_name

    def _jingle_session_init_trigger(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
        media: str,
        media_data: dict,
        desc_elt: domish.Element,
    ) -> None:
        if content_name == next(iter(session["contents"])):
            # the <group> element must be added only once, so we do it only for the first
            # content
            jingle_elt = session["jingle_elt"]
            self.add_group_element(jingle_elt, session)

    def _jingle_handler_trigger(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        desc_elt: domish.Element,
    ) -> None:
        # this is a session metadata, so we only generate it on the first content
        if content_name == next(iter(session["contents"])) and action in (
            self._j.A_PREPARE_CONFIRMATION,
            self._j.A_SESSION_INITIATE,
            self._j.A_PREPARE_INITIATOR,
        ):
            jingle_elt = session["jingle_elt"]
            self.parse_group_element(jingle_elt, session)
            if action == self._j.A_SESSION_INITIATE:
                self.add_group_element(jingle_elt, session)


@implementer(iwokkel.IDisco)
class XEP_0338_handler(XMPPHandler):
    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_RFC_5888)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []