view libervia/backend/plugins/plugin_xep_0298.py @ 4294:a0ed5c976bf8

component conferences, plugin XEP-0167, XEP-0298: add stream user metadata: A/V conference now adds user metadata about the stream it is forwarding through XEP-0298. This is parsed and added to metadata during confirmation on client side. rel 448
author Goffi <goffi@goffi.org>
date Tue, 06 Aug 2024 23:43:11 +0200
parents dd0891d0b22b
children
line wrap: on
line source

#!/usr/bin/env python3


# Libervia plugin for XEP-0298 (Delivering Conference Information to Jingle Participants)
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from urllib.parse import quote, unquote
from twisted.words.protocols.jabber import jid
from twisted.words.xish import domish
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import _
from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger
from wokkel import disco, iwokkel
from zope.interface import implementer
from twisted.words.protocols.jabber.xmlstream import XMPPHandler

from libervia.backend.plugins.plugin_xep_0166 import XEP_0166

# Module-level logger, named after this module.
log = getLogger(__name__)


# Plugin metadata: import name, type, supported modes, dependencies, main class and
# human readable description.
PLUGIN_INFO = {
    # NOTE(review): "Events" looks like a leftover copied from another plugin; confirm
    # that nothing relies on this display name before renaming it.
    C.PI_NAME: "Events",
    C.PI_IMPORT_NAME: "XEP-0298",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0167"],
    C.PI_MAIN: "XEP_0298",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _(
        "Delivering Conference Information to Jingle Participants (Coin). This plugin "
        "is used to associate metadata about users and call states in multi-party calls."
    ),
}

# Namespace advertised as a service discovery feature by this plugin's handler.
NS_COIN = "urn:xmpp:coin:1"
# Namespace of the <conference-info>, <users> and <user> elements created and parsed
# by this plugin.
NS_CONFERENCE_INFO = "urn:ietf:params:xml:ns:conference-info"


class XEP_0298:
    """Conference information (COIN) metadata for Jingle sessions.

    The plugin hooks into XEP-0167 triggers to attach a <conference-info> element to
    outgoing Jingle sessions, and to extract user metadata from incoming ones.
    """

    namespace = NS_COIN

    def __init__(self, host):
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._j: XEP_0166 = host.plugins["XEP-0166"]
        host.trigger.add(
            "XEP-0167_jingle_session_init", self._jingle_session_init_trigger
        )
        host.trigger.add("XEP-0167_jingle_handler", self._jingle_handler_trigger)

    def get_handler(self, client):
        """Return the protocol handler advertising COIN support."""
        return CoinHandler(self)

    def _jingle_session_init_trigger(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
        media: str,
        media_data: dict,
        desc_elt: domish.Element,
    ) -> None:
        """Check for the presence of "user" metadata, and add relevant elements.

        Nothing is done unless the client profile is "conferences", ``content_name``
        is the first content of the session, and ``session["metadata"]["user"]`` is
        set. When all conditions are met, a <conference-info> element flagged as focus
        is added to ``session["jingle_elt"]`` with the user as child.
        """
        if client.profile != "conferences":
            return
        if content_name != next(iter(session["contents"])):
            # We only apply metadata for the first content, as it is global.
            return
        try:
            user = session["metadata"]["user"]
        except KeyError:
            return
        jingle_elt = session["jingle_elt"]
        conference_info_elt = self.add_conference_info(jingle_elt, True)
        self.add_user(conference_info_elt, user)

    async def _jingle_handler_trigger(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        desc_elt: domish.Element,
    ) -> None:
        """Store the first advertised user as "peer_user" in session metadata.

        Only done for the "prepare confirmation" action, and only once per session
        (i.e. when handling the first content).
        """
        # this is a session metadata, so we only generate it on the first content
        if action == self._j.A_PREPARE_CONFIRMATION and content_name == next(
            iter(session["contents"])
        ):
            jingle_elt = session["jingle_elt"]
            infos = self.parse(jingle_elt)
            try:
                user = infos["info"]["users"][0]
            except (KeyError, IndexError):
                # No conference info, or no valid user in it: nothing to store.
                pass
            else:
                session.setdefault("metadata", {})["peer_user"] = user

    def add_conference_info(
        self, jingle_elt: domish.Element, is_focus: bool = False, **kwargs
    ) -> domish.Element:
        """Create and return a <conference-info> element

        @param jingle_elt: parent element
        @param is_focus: if True, the "isfocus" attribute is set on the element.
        @param kwargs: attributes of the element.
        @return: created <conference-info> element.
        """
        conference_info_elt = jingle_elt.addElement(
            (NS_CONFERENCE_INFO, "conference-info"),
        )
        if is_focus:
            conference_info_elt["isfocus"] = C.BOOL_TRUE
        conference_info_elt.attributes.update(kwargs)
        return conference_info_elt

    def add_user(
        self, conference_info_elt: domish.Element, entity: jid.JID
    ) -> domish.Element:
        """Add a user to a conference info element.

        If the parent <users> doesn't exist, it will be created.
        @param conference_info_elt: <conference-info> element where the <user> element
            needs to be added
        @param entity: XMPP JID to use as entity.
        @return: created <user> element.
        """
        users_elt = next(
            conference_info_elt.elements(NS_CONFERENCE_INFO, "users"), None
        )
        if users_elt is None:
            users_elt = conference_info_elt.addElement("users")

        user_elt = users_elt.addElement("user")
        # The bare JID is percent-encoded and exposed as an "xmpp:" URI.
        user_elt["entity"] = f"xmpp:{quote(entity.userhost())}"
        return user_elt

    def parse(self, jingle_elt: domish.Element) -> dict:
        """Parse a Jingle element and return a dictionary with conference info if present.

        <user> elements without an "entity" attribute, with a non "xmpp:" entity, or
        with an entity which can't be parsed as a JID are logged and ignored.

        @param jingle_elt: Jingle element to parse.
        @return: Dictionary with "info" key if conference info is found, empty dict
            otherwise. ``info["users"]`` is the list of JIDs of valid users.
        """
        conference_info_elt = next(
            jingle_elt.elements(NS_CONFERENCE_INFO, "conference-info"), None
        )
        if conference_info_elt is None:
            return {}

        users = []
        users_elt = next(
            conference_info_elt.elements(NS_CONFERENCE_INFO, "users"), None
        )
        if users_elt is not None:
            for user_elt in users_elt.elements(NS_CONFERENCE_INFO, "user"):
                entity = user_elt.getAttribute("entity")
                if entity is None:
                    # The element comes from the network: a missing attribute must
                    # not make the whole parsing fail.
                    log.warning('Ignoring <user> element without "entity" attribute')
                    continue
                if not entity.startswith("xmpp:"):
                    log.warning(f"Ignoring non-XMPP entity {entity!r}")
                    continue
                try:
                    users.append(jid.JID(unquote(entity[5:])))
                except Exception as e:
                    log.warning(f"Failed to parse entity {entity!r}: {e}")

        return {"info": {"users": users}}


@implementer(iwokkel.IDisco)
class CoinHandler(XMPPHandler):
    """Protocol handler advertising the COIN namespace through service discovery."""

    def __init__(self, plugin_parent):
        """
        @param plugin_parent: plugin instance which created this handler.
        """
        # Let the base handler initialise its own state before adding ours.
        super().__init__()
        self.plugin_parent = plugin_parent

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco features of this handler (the COIN namespace)."""
        return [
            disco.DiscoFeature(NS_COIN),
        ]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return disco items: this handler doesn't expose any."""
        return []