view libervia/backend/plugins/plugin_xep_0167/__init__.py @ 4232:0fbe5c605eb6

tests (unit/webrtc,XEP-0176, XEP-0234): Fix tests and add webrtc file transfer tests: fix 441
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 12:59:50 +0200
parents e11b13418ba6
children 79c8a70e1813
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional
import uuid

from twisted.internet import reactor
from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.tools import xml_tools
from libervia.backend.tools.common import data_format

from . import mapping
from ..plugin_xep_0166 import BaseApplicationHandler
from .constants import (
    NS_JINGLE_RTP,
    NS_JINGLE_RTP_AUDIO,
    NS_JINGLE_RTP_INFO,
    NS_JINGLE_RTP_VIDEO,
)


# Module-level logger, named after this module.
log = getLogger(__name__)


# Plugin metadata, keyed by the ``C.PI_*`` constants. ``C.PI_MAIN`` names the plugin
# class defined below, and ``C.PI_NAME`` is also used in its initialization log line.
PLUGIN_INFO = {
    C.PI_NAME: "Jingle RTP Sessions",
    C.PI_IMPORT_NAME: "XEP-0167",
    C.PI_TYPE: "XEP",
    C.PI_PROTOCOLS: ["XEP-0167"],
    C.PI_DEPENDENCIES: ["XEP-0166"],
    C.PI_MAIN: "XEP_0167",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Real-time Transport Protocol (RTP) is used for A/V calls"""),
}

# Text of the incoming call confirmation dialog; ``{peer}`` and ``{call_type}`` are
# filled in by ``XEP_0167.confirm_incoming_call``.
CONFIRM = D_("{peer} wants to start a call ({call_type}) with you, do you accept?")
# Title of the incoming call confirmation dialog.
CONFIRM_TITLE = D_("Incoming Call")
# Value passed as ``security_limit`` when the confirmation dialog is created.
SECURITY_LIMIT = 0

# Call info types accepted by ``XEP_0167.jingle_session_info`` (received) and
# ``XEP_0167.send_info`` (sent); other types are ignored or rejected there.
ALLOWED_ACTIONS = (
    "active",
    "hold",
    "unhold",
    "mute",
    "unmute",
    "ringing",
)


class XEP_0167(BaseApplicationHandler):
    def __init__(self, host):
        """Register the RTP application with Jingle and expose the call API.

        @param host: backend instance; its ``plugins``, ``bridge`` and
            ``register_namespace`` members are used here.
        """
        log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization')
        self.host = host
        # FIXME: to be removed once host is accessible from global var
        mapping.host = host
        self._j = host.plugins["XEP-0166"]
        self._j.register_application(NS_JINGLE_RTP, self)
        host.register_namespace("jingle-rtp", NS_JINGLE_RTP)

        # Each entry is (name, in_sign, out_sign, method, extra keyword arguments).
        # "call_info" is the only method registered without ``async_``.
        bridge_methods = (
            ("call_start", "sss", "s", self._call_start, {"async_": True}),
            ("call_answer_sdp", "sss", "", self._call_answer_sdp, {"async_": True}),
            ("call_info", "ssss", "", self._call_info, {}),
            ("call_end", "sss", "", self._call_end, {"async_": True}),
        )
        for name, in_sign, out_sign, method, extra_kwargs in bridge_methods:
            host.bridge.add_method(
                name,
                ".plugin",
                in_sign=in_sign,
                out_sign=out_sign,
                method=method,
                **extra_kwargs,
            )

        bridge_signals = (
            # args: session_id, serialised setup data (dict with keys "role" and
            #   "sdp"), profile
            ("call_setup", "sss"),
            # args: session_id, data, profile
            ("call_ended", "sss"),
            # args: session_id, info_type, extra, profile
            ("call_info", "ssss"),
        )
        for name, signature in bridge_signals:
            host.bridge.add_signal(name, ".plugin", signature=signature)

    def get_handler(self, client):
        """Return a new ``XEP_0167_handler`` instance (``client`` is not used)."""
        handler = XEP_0167_handler()
        return handler

    # bridge methods

    def _call_start(
        self,
        entity_s: str,
        call_data_s: str,
        profile_key: str,
    ):
        """Bridge wrapper around ``call_start``.

        @param entity_s: JID of the peer to call, as a string.
        @param call_data_s: serialised call data.
        @param profile_key: profile key used to retrieve the client.
        @return: Deferred wrapping the ``call_start`` coroutine.
        """
        client = self.host.get_client(profile_key)
        peer_jid = jid.JID(entity_s)
        call_data = data_format.deserialise(call_data_s)
        return defer.ensureDeferred(self.call_start(client, peer_jid, call_data))

    def parse_call_data(self, call_data: dict) -> dict:
        """Extract per-media data and metadata from the SDP found in ``call_data``.

        ``call_data`` is modified in place: for each "audio", "video" or "application"
        media of the parsed SDP, a key named after the media is set to the media
        application data, completed with "fingerprint", "id" and "ice-candidates" when
        they are available.

        @param call_data: call data; its "sdp" and "metadata" keys are read if present.
        @return: metadata: the "metadata" value of ``call_data`` (or a new dict if it is
            missing or empty), updated with transport values and with the metadata of
            the parsed SDP.
        """
        metadata = call_data.get("metadata") or {}
        if "sdp" not in call_data:
            return metadata

        sdp_data = mapping.parse_sdp(call_data["sdp"])
        handled_medias = [
            media for media in sdp_data if media in ("audio", "video", "application")
        ]

        for media in handled_medias:
            parsed_media = sdp_data[media]
            app_data = call_data[media] = parsed_media["application_data"]
            transport = parsed_media["transport_data"]

            try:
                fingerprint = transport["fingerprint"]
            except KeyError:
                log.warning("fingerprint is missing")
            else:
                app_data["fingerprint"] = fingerprint

            try:
                media_id = parsed_media["id"]
            except KeyError:
                log.warning(f"no media ID found for {media}: {parsed_media}")
            else:
                app_data["id"] = media_id

            # FIXME: the 2 values below are linked to XEP-0343, they should be added
            #   there instead, maybe with some new trigger?
            for key in ("sctp-port", "max-message-size"):
                value = transport.get(key)
                if value is not None:
                    metadata[key] = value

            app_data["ice-candidates"] = transport.get("candidates", [])
            try:
                metadata["ice-ufrag"] = transport["ufrag"]
                metadata["ice-pwd"] = transport["pwd"]
            except KeyError:
                log.warning("ICE data are missing from SDP")

        # handled medias are removed from the parsed SDP once they have been processed
        for media in handled_medias:
            del sdp_data[media]
        metadata.update(sdp_data.get("metadata", {}))

        return metadata

    async def call_start(
        self,
        client: SatXMPPEntity,
        peer_jid: jid.JID,
        call_data: dict,
    ) -> str:
        """Initiate a call session with the given peer.

        ``call_data`` is first processed by ``parse_call_data`` (which modifies it in
        place), then one Jingle content is built for each "audio" and "video" entry.

        @param client: client used to initiate the Jingle session.
        @param peer_jid: JID of the peer to initiate a call session with.
        @param call_data: Dictionary containing data for the call. Must include SDP information.
            The dict can have the following keys:
                - sdp (str): SDP data for the call.
                - metadata (dict): Additional metadata for the call (optional).
            Each media type ("audio" and "video") in the SDP should have:
                - application_data (dict): Data about the media.
                - fingerprint (str): Security fingerprint data (optional).
                - id (str): Identifier for the media (optional).
                - ice-candidates: ICE candidates for media transport.
                - And other transport specific data.

        @return: Session ID (SID) for the initiated call session.

        @raises exceptions.DataError: If media data is invalid or duplicate content name
            (mid) is found.
        """
        sid = str(uuid.uuid4())
        metadata = self.parse_call_data(call_data)
        contents = []
        # content names already used, to detect duplicated media ID (mid)
        seen_names = set()

        for media, media_data in call_data.items():
            if media not in ("audio", "video"):
                continue
            content = {
                "app_ns": NS_JINGLE_RTP,
                "senders": "both",
                "transport_type": self._j.TRANSPORT_DATAGRAM,
                "app_kwargs": {"media": media, "media_data": media_data},
                "transport_data": {
                    "local_ice_data": {
                        "ufrag": metadata["ice-ufrag"],
                        "pwd": metadata["ice-pwd"],
                        # transport values are moved out of the media data
                        "candidates": media_data.pop("ice-candidates"),
                        "fingerprint": media_data.pop("fingerprint", {}),
                    }
                },
            }
            if "id" in media_data:
                name = media_data.pop("id")
                if name in seen_names:
                    raise exceptions.DataError(
                        f"Content name (mid) seen multiple times: {name}"
                    )
                # the name must be recorded, otherwise duplicates are never detected
                seen_names.add(name)
                content["name"] = name
            contents.append(content)
        if not contents:
            raise exceptions.DataError(f"no valid media data found: {call_data}")

        call_type = (
            C.META_SUBTYPE_CALL_VIDEO if "video" in call_data
            else C.META_SUBTYPE_CALL_AUDIO
        )

        # the session initiation is launched without being awaited: the session ID is
        # returned immediately
        defer.ensureDeferred(
            self._j.initiate(
                client,
                peer_jid,
                contents,
                sid=sid,
                call_type=call_type,
                metadata=metadata,
                peer_metadata={},
            )
        )
        return sid

    def _call_answer_sdp(self, session_id: str, answer_sdp: str, profile: str) -> None:
        """Deliver the answer SDP to the session waiting for it.

        The Deferred stored under the "answer_sdp_d" key of the session is removed from
        the session and fired with ``answer_sdp``.

        @param session_id: ID of the Jingle session.
        @param answer_sdp: answer SDP used to fire the waiting Deferred.
        @param profile: profile used to retrieve the client.
        @raises exceptions.NotFound: the session has no "answer_sdp_d" key.
        """
        client = self.host.get_client(profile)
        session = self._j.get_session(client, session_id)
        try:
            pending_answer_d = session.pop("answer_sdp_d")
        except KeyError:
            raise exceptions.NotFound(
                f"No answer SDP expected for session {session_id!r}"
            )
        else:
            pending_answer_d.callback(answer_sdp)

    def _call_end(
        self,
        session_id: str,
        data_s: str,
        profile_key: str,
    ):
        """Bridge wrapper around ``call_end``.

        @param session_id: Jingle session ID of the call.
        @param data_s: serialised extra data.
        @param profile_key: profile key used to retrieve the client.
        @return: Deferred wrapping the ``call_end`` coroutine.
        """
        client = self.host.get_client(profile_key)
        data = data_format.deserialise(data_s)
        return defer.ensureDeferred(self.call_end(client, session_id, data))

    async def call_end(
        self,
        client: SatXMPPEntity,
        session_id: str,
        data: dict,
    ) -> None:
        """Terminate the Jingle session of a call.

        The session is terminated with the ``REASON_SUCCESS`` reason of the Jingle
        plugin.

        @param client: client owning the session.
        @param session_id: Jingle session ID of the call
        @param data: optional extra data, may be used to indicate the reason to end the
            call (currently not read by this method)
        """
        jingle = self._j
        session = jingle.get_session(client, session_id)
        await jingle.terminate(client, jingle.REASON_SUCCESS, session)

    # jingle callbacks

    async def confirm_incoming_call(
        self, client: SatXMPPEntity, session: dict, call_type: str
    ) -> bool:
        """Ask the user whether an incoming call must be accepted.

        ``call_type`` is stored in the session, and the dialog Deferred is appended to
        the "cancellable_deferred" list of the session. When the call is accepted,
        "call_accepted" is set to True in the session.

        @param client: The client entity.
        @param session: The Jingle session.
        @param call_type: Type of media (audio or video).

        @return: True if the call has been accepted
        """
        peer_jid = session["peer_jid"]
        session["call_type"] = call_type
        pending_deferreds = session.setdefault("cancellable_deferred", [])

        message = _(CONFIRM).format(peer=peer_jid.userhost(), call_type=call_type)
        title = _(CONFIRM_TITLE)
        action_extra = {
            "session_id": session["id"],
            "from_jid": peer_jid.full(),
            "type": C.META_TYPE_CALL,
            "sub_type": call_type,
        }
        dialog_d = xml_tools.defer_dialog(
            self.host,
            message,
            title,
            action_extra=action_extra,
            security_limit=SECURITY_LIMIT,
            profile=client.profile,
        )
        pending_deferreds.append(dialog_d)

        resp_data = await dialog_d
        if resp_data.get("cancelled", False):
            return False

        session["call_accepted"] = True
        return True

    async def jingle_preflight(
        self, client: SatXMPPEntity, session: dict, description_elt: domish.Element
    ) -> None:
        """Perform preflight checks for an incoming call session.

        Check if the call is audio only or audio/video, then prompt the user for
        confirmation. Nothing is done if the call is already marked as accepted in the
        session.

        @param client: The client instance.
        @param session: Jingle session.
        @param description_elt: The description element. Its parent is used to look at
            sibling descriptions and determine if it's an audio only or audio/video call.

        @raises exceptions.CancelError: If the user doesn't accept the incoming call.
        """
        if session.get("call_accepted", False):
            # the call is already accepted, nothing to do
            return

        parent_elt = description_elt.parent
        assert parent_elt is not None

        # a single video description is enough to make it a video call
        has_video = any(
            sibling_elt.getAttribute("media") == "video"
            for sibling_elt in parent_elt.elements(NS_JINGLE_RTP, "description")
        )
        if has_video:
            call_type = C.META_SUBTYPE_CALL_VIDEO
        else:
            call_type = C.META_SUBTYPE_CALL_AUDIO

        try:
            accepted = await self.confirm_incoming_call(client, session, call_type)
        except defer.CancelledError:
            # raised when call is retracted before user has answered or rejected
            self.host.bridge.call_ended(
                session["id"],
                data_format.serialise({"reason": "retracted"}),
                client.profile,
            )
            raise

        if not accepted:
            raise exceptions.CancelError("User declined the incoming call.")

    async def jingle_preflight_info(
        self,
        client: SatXMPPEntity,
        session: dict,
        info_type: str,
        info_data: dict | None = None,
    ) -> None:
        """Handle an information received during preflight.

        Only the "ringing" type is handled: the ``call_info`` signal is sent once per
        session. Any other type is logged and ignored.

        @param client: The client instance.
        @param session: Jingle session.
        @param info_type: type of the information.
        @param info_data: extra data (not used here).
        """
        if info_type != "ringing":
            log.warning(f"Unknown preflight info type: {info_type!r}")
            return

        if session.get("ringing", False):
            # the signal has already been sent for this session
            return

        self.host.bridge.call_info(session["id"], "ringing", "", client.profile)
        # we indicate that the ringing has started, to avoid sending several times
        # the signal
        session["ringing"] = True

    async def jingle_preflight_cancel(
        self, client: SatXMPPEntity, session: dict, cancel_error: exceptions.CancelError
    ) -> None:
        """Signal that the call has been rejected.

        The ``call_ended`` signal is sent with the reason of ``cancel_error`` (or
        "cancelled" if it has none) and its text.

        @param client: The client instance.
        @param session: Jingle session.
        @param cancel_error: error describing the cancellation.
        """
        # the "call_ended" flag is used to send the signal only once even if there are
        # audio and video contents
        if session.get("call_ended", False):
            return

        reason = getattr(cancel_error, "reason", None) or "cancelled"
        payload = {"reason": reason, "text": str(cancel_error)}
        self.host.bridge.call_ended(
            session["id"], data_format.serialise(payload), client.profile
        )
        session["call_ended"] = True

    def jingle_session_init(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
        media: str,
        media_data: dict,
    ) -> domish.Element:
        """Build the description element of a content for a session we initiate.

        ``media`` and ``media_data`` are stored in the application data of the content
        (respectively under the "media" and "local_data" keys), then the
        "XEP-0167_jingle_session_init" trigger point is run.

        @param client: The client instance.
        @param session: Jingle session.
        @param content_name: name of the content to initialise.
        @param media: media type, "audio" or "video".
        @param media_data: data of the media.
        @return: the description element built for this content.
        @raises ValueError: ``media`` is neither "audio" nor "video".
        """
        if media not in ("audio", "video"):
            raise ValueError('only "audio" and "video" media types are supported')

        application_data = session["contents"][content_name]["application_data"]
        application_data["media"] = media
        application_data["local_data"] = media_data

        description_elt = mapping.build_description(media, media_data, session)
        self.host.trigger.point(
            "XEP-0167_jingle_session_init",
            client,
            session,
            content_name,
            media,
            media_data,
            description_elt,
            triggers_no_cancel=True,
        )
        return description_elt

    async def jingle_request_confirmation(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        desc_elt: domish.Element,
    ) -> bool:
        """Request user confirmation for an incoming call, then wait for the answer SDP.

        Confirmation is requested only for the first content of the session; all other
        contents are automatically accepted, so the user is prompted only once for both
        audio and video contents.

        Once the call is accepted, the ``call_setup`` signal is sent with the SDP
        generated from the session, and this method waits (with a 2 min timeout) for the
        answer SDP. The parsed answer is then used to update the session metadata and
        the local data of each audio/video content.

        @param client: The client instance.
        @param action: The action type associated with the Jingle session.
        @param session: Jingle session.
        @param content_name: Name of the content being checked.
        @param desc_elt: The description element associated with the content.

        @return: True if the call is accepted by the user, False otherwise.
        """
        first_content_name = next(iter(session["contents"]))
        if content_name != first_content_name:
            # we request confirmation only for the first content, all others are
            # automatically accepted. In practice, that means that the call confirmation
            # is requested only once for audio and video contents.
            return True

        if not session.get("call_accepted", False):
            has_video = any(
                content["desc_elt"].getAttribute("media") == "video"
                for content in session["contents"].values()
            )
            if has_video:
                call_type = C.META_SUBTYPE_CALL_VIDEO
            else:
                call_type = C.META_SUBTYPE_CALL_AUDIO
            session["call_type"] = call_type

            if not await self.confirm_incoming_call(client, session, call_type):
                return False

        offer_sdp = mapping.generate_sdp_from_session(session)
        answer_sdp_d = defer.Deferred()
        session["answer_sdp_d"] = answer_sdp_d
        # we should have the answer long before 2 min
        answer_sdp_d.addTimeout(2 * 60, reactor)

        self.host.bridge.call_setup(
            session["id"],
            data_format.serialise({"role": session["role"], "sdp": offer_sdp}),
            client.profile,
        )

        parsed_answer = mapping.parse_sdp(await answer_sdp_d)
        session["metadata"].update(parsed_answer["metadata"])

        for media in ("audio", "video"):
            for content in session["contents"].values():
                if content["desc_elt"].getAttribute("media") != media:
                    continue
                answer_media = parsed_answer[media]
                application_data = content["application_data"]
                application_data["local_data"] = answer_media["application_data"]
                transport_data = content["transport_data"]
                transport_data["local_ice_data"] = answer_media["transport_data"]

        return True

    async def jingle_handler(self, client, action, session, content_name, desc_elt):
        """Handle a Jingle action for an RTP content.

        Depending on ``action``, the peer description is parsed and stored in the
        application data of the content, a local description is built, or the
        ``call_setup`` signal is sent. The "XEP-0167_jingle_handler" trigger point is
        always run before returning.

        @param client: The client instance.
        @param action: Jingle action to handle (one of the ``A_*`` constants of the
            Jingle plugin).
        @param session: Jingle session.
        @param content_name: Name of the content the action applies to.
        @param desc_elt: The description element associated with the content.
        @return: the description element; for ``A_SESSION_INITIATE`` it is a new element
            built from the local data, otherwise it is the one received as argument.
        @raises exceptions.DataError: on ``A_PREPARE_CONFIRMATION``, the "media"
            attribute is missing or is neither "audio" nor "video".
        """
        content_data = session["contents"][content_name]
        application_data = content_data["application_data"]
        if action == self._j.A_PREPARE_CONFIRMATION:
            session["metadata"] = {}
            session.setdefault("peer_metadata", {})
            try:
                media = application_data["media"] = desc_elt["media"]
            except KeyError:
                # the "f" prefix is required for the element to appear in the message
                raise exceptions.DataError(
                    f'"media" key is missing in {desc_elt.toXml()}'
                )
            if media not in ("audio", "video"):
                raise exceptions.DataError(f"invalid media: {media!r}")
            application_data["peer_data"] = mapping.parse_description(desc_elt)
        elif action == self._j.A_SESSION_INITIATE:
            application_data["peer_data"] = mapping.parse_description(desc_elt)
            desc_elt = mapping.build_description(
                application_data["media"], application_data["local_data"], session
            )
        elif action == self._j.A_ACCEPTED_ACK:
            pass
        elif action == self._j.A_PREPARE_INITIATOR:
            application_data["peer_data"] = mapping.parse_description(desc_elt)
        elif action == self._j.A_SESSION_ACCEPT:
            if content_name == next(iter(session["contents"])):
                # we only send the signal for first content, as it means that the whole
                # session is accepted
                answer_sdp = mapping.generate_sdp_from_session(session)
                self.host.bridge.call_setup(
                    session["id"],
                    data_format.serialise(
                        {
                            "role": session["role"],
                            "sdp": answer_sdp,
                        }
                    ),
                    client.profile,
                )
        else:
            log.warning(f"FIXME: unmanaged action {action}")

        self.host.trigger.point(
            "XEP-0167_jingle_handler",
            client,
            action,
            session,
            content_name,
            desc_elt,
            triggers_no_cancel=True,
        )
        return desc_elt

    def jingle_session_info(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        jingle_elt: domish.Element,
    ) -> None:
        """Handle informational messages received for a call.

        Each child of ``jingle_elt`` in the RTP info namespace whose name is in
        ``ALLOWED_ACTIONS`` is forwarded through the ``call_info`` signal. For "mute" and
        "unmute", the "name" attribute is added to the extra data when it is set.

        @param client: The client instance.
        @param action: Jingle action (not used here).
        @param session: Jingle session.
        @param content_name: Name of the content (not used here).
        @param jingle_elt: Jingle element holding the information elements.
        """
        for elt in jingle_elt.elements():
            if elt.uri != NS_JINGLE_RTP_INFO:
                continue
            info_type = elt.name
            if info_type not in ALLOWED_ACTIONS:
                # the "f" prefix is required for the type to appear in the log
                log.warning(f"ignoring unknown info type: {info_type!r}")
                continue
            extra = {}
            if info_type in ("mute", "unmute"):
                name = elt.getAttribute("name")
                if name:
                    extra["name"] = name
            log.debug(f"{info_type} call info received (extra: {extra})")
            self.host.bridge.call_info(
                session["id"], info_type, data_format.serialise(extra), client.profile
            )

    def _call_info(self, session_id, info_type, extra_s, profile_key):
        """Bridge wrapper around ``send_info``; ``extra_s`` is the serialised extra data."""
        client = self.host.get_client(profile_key)
        return self.send_info(
            client, session_id, info_type, data_format.deserialise(extra_s)
        )

    def send_info(
        self,
        client: SatXMPPEntity,
        session_id: str,
        info_type: str,
        extra: Optional[dict],
    ) -> None:
        """Send an information about the call to the peer.

        @param client: client owning the session.
        @param session_id: ID of the Jingle session.
        @param info_type: type of the information, must be one of ``ALLOWED_ACTIONS``.
        @param extra: extra data; its "name" value is set as the "name" attribute of
            the information element for the "mute" and "unmute" types.
        @raises ValueError: ``info_type`` is not in ``ALLOWED_ACTIONS``.
        """
        if info_type not in ALLOWED_ACTIONS:
            raise ValueError(f"Unkown info type {info_type!r}")

        session = self._j.get_session(client, session_id)
        iq_elt, jingle_elt = self._j.build_session_info(client, session)
        info_elt = jingle_elt.addElement((NS_JINGLE_RTP_INFO, info_type))

        accepts_name = info_type in ("mute", "unmute")
        if extra and accepts_name and "name" in extra:
            info_elt["name"] = extra["name"]

        iq_elt.send()

    def jingle_terminate(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        reason_elt: domish.Element,
    ) -> None:
        """Send the ``call_ended`` signal when the session is terminated.

        The signal data holds the reason parsed from ``reason_elt`` and, when there is
        one, the associated text.

        @param client: The client instance.
        @param action: Jingle action (not used here).
        @param session: Jingle session.
        @param content_name: Name of the content (not used here).
        @param reason_elt: element holding the reason of the termination.
        """
        reason, text = self._j.parse_reason_elt(reason_elt)

        payload = {"reason": reason}
        if text:
            payload["text"] = text

        session_id = session["id"]
        serialised_payload = data_format.serialise(payload)
        self.host.bridge.call_ended(session_id, serialised_payload, client.profile)


@implementer(iwokkel.IDisco)
class XEP_0167_handler(XMPPHandler):
    """Service discovery handler advertising the Jingle RTP features."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the disco features for Jingle RTP, RTP audio and RTP video."""
        namespaces = (NS_JINGLE_RTP, NS_JINGLE_RTP_AUDIO, NS_JINGLE_RTP_VIDEO)
        return [disco.DiscoFeature(namespace) for namespace in namespaces]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return an empty list: no disco items are exposed."""
        return list()