view libervia/backend/plugins/plugin_xep_0167/__init__.py @ 4096:087902fbb77a

plugin XEP-0060: move setting of `pubsub_watching` to `profile_connecting` so it's available early
author Goffi <goffi@goffi.org>
date Mon, 12 Jun 2023 15:00:59 +0200
parents 4b842c1fb686
children bc60875cb3b8
line wrap: on
line source

#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional

from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.tools import xml_tools
from libervia.backend.tools.common import data_format

from . import mapping
from ..plugin_xep_0166 import BaseApplicationHandler
from .constants import (
    NS_JINGLE_RTP,
    NS_JINGLE_RTP_INFO,
    NS_JINGLE_RTP_AUDIO,
    NS_JINGLE_RTP_VIDEO,
)


log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Jingle RTP Sessions",
    C.PI_IMPORT_NAME: "XEP-0167",
    C.PI_TYPE: "XEP",
    C.PI_PROTOCOLS: ["XEP-0167"],
    C.PI_DEPENDENCIES: ["XEP-0166"],
    C.PI_MAIN: "XEP_0167",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Real-time Transport Protocol (RTP) is used for A/V calls"""),
}

CONFIRM = D_("{peer} wants to start a call ({call_type}) with you, do you accept?")
CONFIRM_TITLE = D_("Incoming Call")
SECURITY_LIMIT = 0

ALLOWED_ACTIONS = (
    "active",
    "hold",
    "unhold",
    "mute",
    "unmute",
    "ringing",
)


class XEP_0167(BaseApplicationHandler):
    def __init__(self, host):
        log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization')
        self.host = host
        # FIXME: to be removed once host is accessible from global var
        mapping.host = host
        self._j = host.plugins["XEP-0166"]
        self._j.register_application(NS_JINGLE_RTP, self)
        host.bridge.add_method(
            "call_start",
            ".plugin",
            in_sign="sss",
            out_sign="s",
            method=self._call_start,
            async_=True,
        )
        host.bridge.add_method(
            "call_end",
            ".plugin",
            in_sign="sss",
            out_sign="",
            method=self._call_end,
            async_=True,
        )
        host.bridge.add_method(
            "call_info",
            ".plugin",
            in_sign="ssss",
            out_sign="",
            method=self._call_start,
        )
        host.bridge.add_signal(
            "call_accepted", ".plugin", signature="sss"
        )  # args: session_id, answer_sdp, profile
        host.bridge.add_signal(
            "call_ended", ".plugin", signature="sss"
        )  # args: session_id, data, profile
        host.bridge.add_signal(
            "call_info", ".plugin", signature="ssss"
        )  # args: session_id, info_type, extra, profile

    def get_handler(self, client):
        return XEP_0167_handler()

    # bridge methods

    def _call_start(
        self,
        entity_s: str,
        call_data_s: str,
        profile_key: str,
    ):
        client = self.host.get_client(profile_key)
        return defer.ensureDeferred(
            self.call_start(
                client, jid.JID(entity_s), data_format.deserialise(call_data_s)
            )
        )

    async def call_start(
        self,
        client: SatXMPPEntity,
        peer_jid: jid.JID,
        call_data: dict,
    ) -> None:
        """Temporary method to test RTP session"""
        contents = []
        metadata = call_data.get("metadata") or {}

        if "sdp" in call_data:
            sdp_data = mapping.parse_sdp(call_data["sdp"])
            for media_type in ("audio", "video"):
                try:
                    media_data = sdp_data.pop(media_type)
                except KeyError:
                    continue
                call_data[media_type] = media_data["application_data"]
                transport_data = media_data["transport_data"]
                try:
                    call_data[media_type]["fingerprint"] = transport_data["fingerprint"]
                except KeyError:
                    log.warning("fingerprint is missing")
                    pass
                try:
                    call_data[media_type]["id"] = media_data["id"]
                except KeyError:
                    log.warning(f"no media ID found for {media_type}: {media_data}")
                try:
                    call_data[media_type]["ice-candidates"] = transport_data["candidates"]
                    metadata["ice-ufrag"] = transport_data["ufrag"]
                    metadata["ice-pwd"] = transport_data["pwd"]
                except KeyError:
                    log.warning("ICE data are missing from SDP")
                    continue
            metadata.update(sdp_data.get("metadata", {}))

        call_type = (
            C.META_SUBTYPE_CALL_VIDEO
            if "video" in call_data
            else C.META_SUBTYPE_CALL_AUDIO
        )
        seen_names = set()

        for media in ("audio", "video"):
            media_data = call_data.get(media)
            if media_data is not None:
                content = {
                    "app_ns": NS_JINGLE_RTP,
                    "senders": "both",
                    "transport_type": self._j.TRANSPORT_DATAGRAM,
                    "app_kwargs": {"media": media, "media_data": media_data},
                    "transport_data": {
                        "local_ice_data": {
                            "ufrag": metadata["ice-ufrag"],
                            "pwd": metadata["ice-pwd"],
                            "candidates": media_data.pop("ice-candidates"),
                            "fingerprint": media_data.pop("fingerprint", {}),
                        }
                    },
                }
                if "id" in media_data:
                    name = media_data.pop("id")
                    if name in seen_names:
                        raise exceptions.DataError(
                            f"Content name (mid) seen multiple times: {name}"
                        )
                    content["name"] = name
                contents.append(content)
        if not contents:
            raise exceptions.DataError("no valid media data found: {call_data}")
        return await self._j.initiate(
            client,
            peer_jid,
            contents,
            call_type=call_type,
            metadata=metadata,
            peer_metadata={},
        )

    def _call_end(
        self,
        session_id: str,
        data_s: str,
        profile_key: str,
    ):
        client = self.host.get_client(profile_key)
        return defer.ensureDeferred(
            self.call_end(
                client, session_id, data_format.deserialise(data_s)
            )
        )

    async def call_end(
        self,
        client: SatXMPPEntity,
        session_id: str,
        data: dict,
    ) -> None:
        """End a call

        @param session_id: Jingle session ID of the call
        @param data: optional extra data, may be used to indicate the reason to end the
            call
        """
        session = self._j.get_session(client, session_id)
        await self._j.terminate(client, self._j.REASON_SUCCESS, session)

    # jingle callbacks

    def jingle_session_init(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
        media: str,
        media_data: dict,
    ) -> domish.Element:
        if media not in ("audio", "video"):
            raise ValueError('only "audio" and "video" media types are supported')
        content_data = session["contents"][content_name]
        application_data = content_data["application_data"]
        application_data["media"] = media
        application_data["local_data"] = media_data
        desc_elt = mapping.build_description(media, media_data, session)
        self.host.trigger.point(
            "XEP-0167_jingle_session_init",
            client,
            session,
            content_name,
            media,
            media_data,
            desc_elt,
            triggers_no_cancel=True,
        )
        return desc_elt

    async def jingle_request_confirmation(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        desc_elt: domish.Element,
    ) -> bool:
        if content_name != next(iter(session["contents"])):
            # we request confirmation only for the first content, all others are
            # automatically accepted. In practice, that means that the call confirmation
            # is requested only once for audio and video contents.
            return True
        peer_jid = session["peer_jid"]

        if any(
            c["desc_elt"].getAttribute("media") == "video"
            for c in session["contents"].values()
        ):
            call_type = session["call_type"] = C.META_SUBTYPE_CALL_VIDEO
        else:
            call_type = session["call_type"] = C.META_SUBTYPE_CALL_AUDIO

        sdp = mapping.generate_sdp_from_session(session)

        resp_data = await xml_tools.defer_dialog(
            self.host,
            _(CONFIRM).format(peer=peer_jid.userhost(), call_type=call_type),
            _(CONFIRM_TITLE),
            action_extra={
                "session_id": session["id"],
                "from_jid": peer_jid.full(),
                "type": C.META_TYPE_CALL,
                "sub_type": call_type,
                "sdp": sdp,
            },
            security_limit=SECURITY_LIMIT,
            profile=client.profile,
        )

        if resp_data.get("cancelled", False):
            return False

        answer_sdp = resp_data["sdp"]
        parsed_answer = mapping.parse_sdp(answer_sdp)
        session["peer_metadata"].update(parsed_answer["metadata"])
        for media in ("audio", "video"):
            for content in session["contents"].values():
                if content["desc_elt"].getAttribute("media") == media:
                    media_data = parsed_answer[media]
                    application_data = content["application_data"]
                    application_data["local_data"] = media_data["application_data"]
                    transport_data = content["transport_data"]
                    local_ice_data = media_data["transport_data"]
                    transport_data["local_ice_data"] = local_ice_data

        return True

    async def jingle_handler(self, client, action, session, content_name, desc_elt):
        content_data = session["contents"][content_name]
        application_data = content_data["application_data"]
        if action == self._j.A_PREPARE_CONFIRMATION:
            session["metadata"] = {}
            session["peer_metadata"] = {}
            try:
                media = application_data["media"] = desc_elt["media"]
            except KeyError:
                raise exceptions.DataError('"media" key is missing in {desc_elt.toXml()}')
            if media not in ("audio", "video"):
                raise exceptions.DataError(f"invalid media: {media!r}")
            application_data["peer_data"] = mapping.parse_description(desc_elt)
        elif action == self._j.A_SESSION_INITIATE:
            application_data["peer_data"] = mapping.parse_description(desc_elt)
            desc_elt = mapping.build_description(
                application_data["media"], application_data["local_data"], session
            )
        elif action == self._j.A_ACCEPTED_ACK:
            pass
        elif action == self._j.A_PREPARE_INITIATOR:
            application_data["peer_data"] = mapping.parse_description(desc_elt)
        elif action == self._j.A_SESSION_ACCEPT:
            if content_name == next(iter(session["contents"])):
                # we only send the signal for first content, as it means that the whole
                # session is accepted
                answer_sdp = mapping.generate_sdp_from_session(session)
                self.host.bridge.call_accepted(session["id"], answer_sdp, client.profile)
        else:
            log.warning(f"FIXME: unmanaged action {action}")

        self.host.trigger.point(
            "XEP-0167_jingle_handler",
            client,
            action,
            session,
            content_name,
            desc_elt,
            triggers_no_cancel=True,
        )
        return desc_elt

    def jingle_session_info(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        jingle_elt: domish.Element,
    ) -> None:
        """Informational messages"""
        for elt in jingle_elt.elements():
            if elt.uri == NS_JINGLE_RTP_INFO:
                info_type = elt.name
                if info_type not in ALLOWED_ACTIONS:
                    log.warning("ignoring unknow info type: {info_type!r}")
                    continue
                extra = {}
                if info_type in ("mute", "unmute"):
                    name = elt.getAttribute("name")
                    if name:
                        extra["name"] = name
                log.debug(f"{info_type} call info received (extra: {extra})")
                self.host.bridge.call_info(
                    session["id"], info_type, data_format.serialise(extra), client.profile
                )

    def _call_info(self, session_id, info_type, extra_s, profile_key):
        client = self.host.get_client(profile_key)
        extra = data_format.deserialise(extra_s)
        return self.send_info(client, session_id, info_type, extra)


    def send_info(
        self,
        client: SatXMPPEntity,
        session_id: str,
        info_type: str,
        extra: Optional[dict],
    ) -> None:
        """Send information on the call"""
        if info_type not in ALLOWED_ACTIONS:
            raise ValueError(f"Unkown info type {info_type!r}")
        session = self._j.get_session(client, session_id)
        iq_elt, jingle_elt = self._j.build_session_info(client, session)
        info_elt = jingle_elt.addElement((NS_JINGLE_RTP_INFO, info_type))
        if extra and info_type in ("mute", "unmute") and "name" in extra:
            info_elt["name"] = extra["name"]
        iq_elt.send()

    def jingle_terminate(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        reason_elt: domish.Element,
    ) -> None:
        self.host.bridge.call_ended(session["id"], "", client.profile)


@implementer(iwokkel.IDisco)
class XEP_0167_handler(XMPPHandler):
    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [
            disco.DiscoFeature(NS_JINGLE_RTP),
            disco.DiscoFeature(NS_JINGLE_RTP_AUDIO),
            disco.DiscoFeature(NS_JINGLE_RTP_VIDEO),
        ]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []