diff sat/plugins/plugin_xep_0167/__init__.py @ 4056:1c4f4aa36d98

plugin XEP-0167: Jingle RTP Sessions implementation: rel 420
author Goffi <goffi@goffi.org>
date Mon, 29 May 2023 13:38:10 +0200
parents
children adb9dc9c8114
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/sat/plugins/plugin_xep_0167/__init__.py	Mon May 29 13:38:10 2023 +0200
@@ -0,0 +1,401 @@
+#!/usr/bin/env python3
+
+# Libervia plugin for managing pipes (experimental)
+# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from typing import Optional
+
+from twisted.internet import defer
+from twisted.words.protocols.jabber import jid
+from twisted.words.protocols.jabber.xmlstream import XMPPHandler
+from twisted.words.xish import domish
+from wokkel import disco, iwokkel
+from zope.interface import implementer
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
+from sat.core.core_types import SatXMPPEntity
+from sat.core.i18n import D_, _
+from sat.core.log import getLogger
+from sat.tools import xml_tools
+from sat.tools.common import data_format
+
+from . import mapping
+from ..plugin_xep_0166 import BaseApplicationHandler
+from .constants import (
+    NS_JINGLE_RTP,
+    NS_JINGLE_RTP_INFO,
+    NS_JINGLE_RTP_AUDIO,
+    NS_JINGLE_RTP_VIDEO,
+)
+
+
+log = getLogger(__name__)
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Jingle RTP Sessions",
+    C.PI_IMPORT_NAME: "XEP-0167",
+    C.PI_TYPE: "XEP",
+    C.PI_PROTOCOLS: ["XEP-0167"],
+    C.PI_DEPENDENCIES: ["XEP-0166"],
+    C.PI_MAIN: "XEP_0167",
+    C.PI_HANDLER: "yes",
+    C.PI_DESCRIPTION: _("""Real-time Transport Protocol (RTP) is used for A/V calls"""),
+}
+
+CONFIRM = D_("{peer} wants to start a call ({call_type}) with you, do you accept?")
+CONFIRM_TITLE = D_("Incoming Call")
+SECURITY_LIMIT = 0
+
+ALLOWED_ACTIONS = (
+    "active",
+    "hold",
+    "unhold",
+    "mute",
+    "unmute",
+    "ringing",
+)
+
+
+class XEP_0167(BaseApplicationHandler):
+    def __init__(self, host):
+        log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization')
+        self.host = host
+        # FIXME: to be removed once host is accessible from global var
+        mapping.host = host
+        self._j = host.plugins["XEP-0166"]
+        self._j.register_application(NS_JINGLE_RTP, self)
+        host.bridge.add_method(
+            "call_start",
+            ".plugin",
+            in_sign="sss",
+            out_sign="s",
+            method=self._call_start,
+            async_=True,
+        )
+        host.bridge.add_method(
+            "call_info",
+            ".plugin",
+            in_sign="ssss",
+            out_sign="",
+            method=self._call_start,
+        )
+        host.bridge.add_signal(
+            "call_accepted", ".plugin", signature="sss"
+        )  # args: session_id, answer_sdp, profile
+        host.bridge.add_signal(
+            "call_info", ".plugin", signature="ssss"
+        )  # args: session_id, info_type, extra, profile
+
+    def get_handler(self, client):
+        return XEP_0167_handler()
+
+    # bridge methods
+
+    def _call_start(
+        self,
+        entity_s: str,
+        call_data_s: str,
+        profile_key: str,
+    ):
+        client = self.host.get_client(profile_key)
+        return defer.ensureDeferred(
+            self.call_start(
+                client, jid.JID(entity_s), data_format.deserialise(call_data_s)
+            )
+        )
+
+    async def call_start(
+        self,
+        client: SatXMPPEntity,
+        peer_jid: jid.JID,
+        call_data: dict,
+        media: str = "video",
+    ) -> None:
+        """Temporary method to test RTP session"""
+        contents = []
+        metadata = call_data.get("metadata") or {}
+
+        if "sdp" in call_data:
+            sdp_data = mapping.parse_sdp(call_data["sdp"])
+            for media_type in ("audio", "video"):
+                try:
+                    media_data = sdp_data.pop(media_type)
+                except KeyError:
+                    continue
+                call_data[media_type] = media_data["application_data"]
+                transport_data = media_data["transport_data"]
+                try:
+                    call_data[media_type]["fingerprint"] = transport_data["fingerprint"]
+                except KeyError:
+                    log.warning("fingerprint is missing")
+                    pass
+                try:
+                    call_data[media_type]["id"] = media_data["id"]
+                except KeyError:
+                    log.warning(f"no media ID found for {media_type}: {media_data}")
+                try:
+                    call_data[media_type]["ice-candidates"] = transport_data["candidates"]
+                    metadata["ice-ufrag"] = transport_data["ufrag"]
+                    metadata["ice-pwd"] = transport_data["pwd"]
+                except KeyError:
+                    log.warning("ICE data are missing from SDP")
+                    continue
+            metadata.update(sdp_data.get("metadata", {}))
+
+        call_type = (
+            C.META_SUBTYPE_CALL_VIDEO
+            if "video" in call_data
+            else C.META_SUBTYPE_CALL_AUDIO
+        )
+        seen_names = set()
+
+        for media in ("audio", "video"):
+            media_data = call_data.get(media)
+            if media_data is not None:
+                content = {
+                    "app_ns": NS_JINGLE_RTP,
+                    "senders": "both",
+                    "transport_type": self._j.TRANSPORT_DATAGRAM,
+                    "app_kwargs": {"media": media, "media_data": media_data},
+                    "transport_data": {
+                        "local_ice_data": {
+                            "ufrag": metadata["ice-ufrag"],
+                            "pwd": metadata["ice-pwd"],
+                            "candidates": media_data.pop("ice-candidates"),
+                            "fingerprint": media_data.pop("fingerprint", {}),
+                        }
+                    },
+                }
+                if "id" in media_data:
+                    name = media_data.pop("id")
+                    if name in seen_names:
+                        raise exceptions.DataError(
+                            f"Content name (mid) seen multiple times: {name}"
+                        )
+                    content["name"] = name
+                contents.append(content)
+        if not contents:
+            raise exceptions.DataError("no valid media data found: {call_data}")
+        return await self._j.initiate(
+            client,
+            peer_jid,
+            contents,
+            call_type=call_type,
+            metadata=metadata,
+            peer_metadata={},
+        )
+
+    # jingle callbacks
+
+    def jingle_session_init(
+        self,
+        client: SatXMPPEntity,
+        session: dict,
+        content_name: str,
+        media: str,
+        media_data: dict,
+    ) -> domish.Element:
+        if media not in ("audio", "video"):
+            raise ValueError('only "audio" and "video" media types are supported')
+        content_data = session["contents"][content_name]
+        application_data = content_data["application_data"]
+        application_data["media"] = media
+        application_data["local_data"] = media_data
+        desc_elt = mapping.build_description(media, media_data, session)
+        self.host.trigger.point(
+            "XEP-0167_jingle_session_init",
+            client,
+            session,
+            content_name,
+            media,
+            media_data,
+            desc_elt,
+            triggers_no_cancel=True,
+        )
+        return desc_elt
+
+    async def jingle_request_confirmation(
+        self,
+        client: SatXMPPEntity,
+        action: str,
+        session: dict,
+        content_name: str,
+        desc_elt: domish.Element,
+    ) -> bool:
+        if content_name != next(iter(session["contents"])):
+            # we request confirmation only for the first content, all others are
+            # automatically accepted. In practice, that means that the call confirmation
+            # is requested only once for audio and video contents.
+            return True
+        peer_jid = session["peer_jid"]
+
+        if any(
+            c["desc_elt"].getAttribute("media") == "video"
+            for c in session["contents"].values()
+        ):
+            call_type = session["call_type"] = C.META_SUBTYPE_CALL_VIDEO
+        else:
+            call_type = session["call_type"] = C.META_SUBTYPE_CALL_AUDIO
+
+        sdp = mapping.generate_sdp_from_session(session)
+
+        resp_data = await xml_tools.defer_dialog(
+            self.host,
+            _(CONFIRM).format(peer=peer_jid.userhost(), call_type=call_type),
+            _(CONFIRM_TITLE),
+            action_extra={
+                "session_id": session["id"],
+                "from_jid": peer_jid.full(),
+                "type": C.META_TYPE_CALL,
+                "sub_type": call_type,
+                "sdp": sdp,
+            },
+            security_limit=SECURITY_LIMIT,
+            profile=client.profile,
+        )
+
+        if resp_data.get("cancelled", False):
+            return False
+
+        answer_sdp = resp_data["sdp"]
+        parsed_answer = mapping.parse_sdp(answer_sdp)
+        session["peer_metadata"].update(parsed_answer["metadata"])
+        for media in ("audio", "video"):
+            for content in session["contents"].values():
+                if content["desc_elt"].getAttribute("media") == media:
+                    media_data = parsed_answer[media]
+                    application_data = content["application_data"]
+                    application_data["local_data"] = media_data["application_data"]
+                    transport_data = content["transport_data"]
+                    local_ice_data = media_data["transport_data"]
+                    transport_data["local_ice_data"] = local_ice_data
+
+        return True
+
+    async def jingle_handler(self, client, action, session, content_name, desc_elt):
+        content_data = session["contents"][content_name]
+        application_data = content_data["application_data"]
+        if action == self._j.A_PREPARE_CONFIRMATION:
+            session["metadata"] = {}
+            session["peer_metadata"] = {}
+            try:
+                media = application_data["media"] = desc_elt["media"]
+            except KeyError:
+                raise exceptions.DataError('"media" key is missing in {desc_elt.toXml()}')
+            if media not in ("audio", "video"):
+                raise exceptions.DataError(f"invalid media: {media!r}")
+            application_data["peer_data"] = mapping.parse_description(desc_elt)
+        elif action == self._j.A_SESSION_INITIATE:
+            application_data["peer_data"] = mapping.parse_description(desc_elt)
+            desc_elt = mapping.build_description(
+                application_data["media"], application_data["local_data"], session
+            )
+        elif action == self._j.A_ACCEPTED_ACK:
+            pass
+        elif action == self._j.A_PREPARE_INITIATOR:
+            application_data["peer_data"] = mapping.parse_description(desc_elt)
+        elif action == self._j.A_SESSION_ACCEPT:
+            if content_name == next(iter(session["contents"])):
+                # we only send the signal for first content, as it means that the whole
+                # session is accepted
+                answer_sdp = mapping.generate_sdp_from_session(session)
+                self.host.bridge.call_accepted(session["id"], answer_sdp, client.profile)
+        else:
+            log.warning(f"FIXME: unmanaged action {action}")
+
+        self.host.trigger.point(
+            "XEP-0167_jingle_handler",
+            client,
+            action,
+            session,
+            content_name,
+            desc_elt,
+            triggers_no_cancel=True,
+        )
+        return desc_elt
+
+    def jingle_session_info(
+        self,
+        client: SatXMPPEntity,
+        action: str,
+        session: dict,
+        content_name: str,
+        jingle_elt: domish.Element,
+    ) -> None:
+        """Informational messages"""
+        for elt in jingle_elt.elements():
+            if elt.uri == NS_JINGLE_RTP_INFO:
+                info_type = elt.name
+                if info_type not in ALLOWED_ACTIONS:
+                    log.warning("ignoring unknow info type: {info_type!r}")
+                    continue
+                extra = {}
+                if info_type in ("mute", "unmute"):
+                    name = elt.getAttribute("name")
+                    if name:
+                        extra["name"] = name
+                log.debug(f"{info_type} call info received (extra: {extra})")
+                self.host.bridge.call_info(
+                    session["id"], info_type, data_format.serialise(extra), client.profile
+                )
+
+    def _call_info(self, session_id, info_type, extra_s, profile_key):
+        client = self.host.get_client(profile_key)
+        extra = data_format.deserialise(extra_s)
+        return self.send_info(client, session_id, info_type, extra)
+
+
+    def send_info(
+        self,
+        client: SatXMPPEntity,
+        session_id: str,
+        info_type: str,
+        extra: Optional[dict],
+    ) -> None:
+        """Send information on the call"""
+        if info_type not in ALLOWED_ACTIONS:
+            raise ValueError(f"Unkown info type {info_type!r}")
+        session = self._j.get_session(client, session_id)
+        iq_elt, jingle_elt = self._j.build_session_info(client, session)
+        info_elt = jingle_elt.addElement((NS_JINGLE_RTP_INFO, info_type))
+        if extra and info_type in ("mute", "unmute") and "name" in extra:
+            info_elt["name"] = extra["name"]
+        iq_elt.send()
+
+    def jingle_terminate(
+        self,
+        client: SatXMPPEntity,
+        action: str,
+        session: dict,
+        content_name: str,
+        reason_elt: domish.Element,
+    ) -> None:
+        pass
+
+
+@implementer(iwokkel.IDisco)
+class XEP_0167_handler(XMPPHandler):
+    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
+        return [
+            disco.DiscoFeature(NS_JINGLE_RTP),
+            disco.DiscoFeature(NS_JINGLE_RTP_AUDIO),
+            disco.DiscoFeature(NS_JINGLE_RTP_VIDEO),
+        ]
+
+    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
+        return []