Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_xep_0167/__init__.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0167/__init__.py@d10748475025 |
children | bc60875cb3b8 |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_xep_0167/__init__.py Fri Jun 02 11:49:51 2023 +0200 @@ -0,0 +1,439 @@ +#!/usr/bin/env python3 + +# Libervia: an XMPP client +# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from typing import Optional + +from twisted.internet import defer +from twisted.words.protocols.jabber import jid +from twisted.words.protocols.jabber.xmlstream import XMPPHandler +from twisted.words.xish import domish +from wokkel import disco, iwokkel +from zope.interface import implementer + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import D_, _ +from libervia.backend.core.log import getLogger +from libervia.backend.tools import xml_tools +from libervia.backend.tools.common import data_format + +from . import mapping +from ..plugin_xep_0166 import BaseApplicationHandler +from .constants import ( + NS_JINGLE_RTP, + NS_JINGLE_RTP_INFO, + NS_JINGLE_RTP_AUDIO, + NS_JINGLE_RTP_VIDEO, +) + + +log = getLogger(__name__) + + +PLUGIN_INFO = { + C.PI_NAME: "Jingle RTP Sessions", + C.PI_IMPORT_NAME: "XEP-0167", + C.PI_TYPE: "XEP", + C.PI_PROTOCOLS: ["XEP-0167"], + C.PI_DEPENDENCIES: ["XEP-0166"], + C.PI_MAIN: "XEP_0167", + C.PI_HANDLER: "yes", + C.PI_DESCRIPTION: _("""Real-time Transport Protocol (RTP) is used for A/V calls"""), +} + +CONFIRM = D_("{peer} wants to start a call ({call_type}) with you, do you accept?") +CONFIRM_TITLE = D_("Incoming Call") +SECURITY_LIMIT = 0 + +ALLOWED_ACTIONS = ( + "active", + "hold", + "unhold", + "mute", + "unmute", + "ringing", +) + + +class XEP_0167(BaseApplicationHandler): + def __init__(self, host): + log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization') + self.host = host + # FIXME: to be removed once host is accessible from global var + mapping.host = host + self._j = host.plugins["XEP-0166"] + self._j.register_application(NS_JINGLE_RTP, self) + host.bridge.add_method( + "call_start", + ".plugin", + in_sign="sss", + out_sign="s", + method=self._call_start, + async_=True, + ) + host.bridge.add_method( + "call_end", + ".plugin", + in_sign="sss", + out_sign="", + method=self._call_end, + async_=True, + ) + host.bridge.add_method( + "call_info", + ".plugin", + in_sign="ssss", + out_sign="", + method=self._call_start, + ) + host.bridge.add_signal( + "call_accepted", ".plugin", signature="sss" + ) # args: session_id, answer_sdp, profile + host.bridge.add_signal( + "call_ended", ".plugin", signature="sss" + ) # args: session_id, data, profile + host.bridge.add_signal( + "call_info", ".plugin", signature="ssss" + ) # args: session_id, info_type, extra, profile + + def get_handler(self, client): + return XEP_0167_handler() + + # bridge methods + + def _call_start( + self, + entity_s: str, + call_data_s: str, + profile_key: str, + ): + client = self.host.get_client(profile_key) + return defer.ensureDeferred( + self.call_start( + client, jid.JID(entity_s), data_format.deserialise(call_data_s) + ) + ) + + async def call_start( + self, + client: SatXMPPEntity, + peer_jid: jid.JID, + call_data: dict, + ) -> None: + """Temporary method to test RTP session""" + contents = [] + metadata = call_data.get("metadata") or {} + + if "sdp" in call_data: + sdp_data = mapping.parse_sdp(call_data["sdp"]) + for media_type in ("audio", "video"): + try: + media_data = sdp_data.pop(media_type) + except KeyError: + continue + call_data[media_type] = media_data["application_data"] + transport_data = media_data["transport_data"] + try: + call_data[media_type]["fingerprint"] = transport_data["fingerprint"] + except KeyError: + log.warning("fingerprint is missing") + pass + try: + call_data[media_type]["id"] = media_data["id"] + except KeyError: + log.warning(f"no media ID found for {media_type}: {media_data}") + try: + call_data[media_type]["ice-candidates"] = transport_data["candidates"] + metadata["ice-ufrag"] = transport_data["ufrag"] + metadata["ice-pwd"] = transport_data["pwd"] + except KeyError: + log.warning("ICE data are missing from SDP") + continue + metadata.update(sdp_data.get("metadata", {})) + + call_type = ( + C.META_SUBTYPE_CALL_VIDEO + if "video" in call_data + else C.META_SUBTYPE_CALL_AUDIO + ) + seen_names = set() + + for media in ("audio", "video"): + media_data = call_data.get(media) + if media_data is not None: + content = { + "app_ns": NS_JINGLE_RTP, + "senders": "both", + "transport_type": self._j.TRANSPORT_DATAGRAM, + "app_kwargs": {"media": media, "media_data": media_data}, + "transport_data": { + "local_ice_data": { + "ufrag": metadata["ice-ufrag"], + "pwd": metadata["ice-pwd"], + "candidates": media_data.pop("ice-candidates"), + "fingerprint": media_data.pop("fingerprint", {}), + } + }, + } + if "id" in media_data: + name = media_data.pop("id") + if name in seen_names: + raise exceptions.DataError( + f"Content name (mid) seen multiple times: {name}" + ) + content["name"] = name + contents.append(content) + if not contents: + raise exceptions.DataError("no valid media data found: {call_data}") + return await self._j.initiate( + client, + peer_jid, + contents, + call_type=call_type, + metadata=metadata, + peer_metadata={}, + ) + + def _call_end( + self, + session_id: str, + data_s: str, + profile_key: str, + ): + client = self.host.get_client(profile_key) + return defer.ensureDeferred( + self.call_end( + client, session_id, data_format.deserialise(data_s) + ) + ) + + async def call_end( + self, + client: SatXMPPEntity, + session_id: str, + data: dict, + ) -> None: + """End a call + + @param session_id: Jingle session ID of the call + @param data: optional extra data, may be used to indicate the reason to end the + call + """ + session = self._j.get_session(client, session_id) + await self._j.terminate(client, self._j.REASON_SUCCESS, session) + + # jingle callbacks + + def jingle_session_init( + self, + client: SatXMPPEntity, + session: dict, + content_name: str, + media: str, + media_data: dict, + ) -> domish.Element: + if media not in ("audio", "video"): + raise ValueError('only "audio" and "video" media types are supported') + content_data = session["contents"][content_name] + application_data = content_data["application_data"] + application_data["media"] = media + application_data["local_data"] = media_data + desc_elt = mapping.build_description(media, media_data, session) + self.host.trigger.point( + "XEP-0167_jingle_session_init", + client, + session, + content_name, + media, + media_data, + desc_elt, + triggers_no_cancel=True, + ) + return desc_elt + + async def jingle_request_confirmation( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + desc_elt: domish.Element, + ) -> bool: + if content_name != next(iter(session["contents"])): + # we request confirmation only for the first content, all others are + # automatically accepted. In practice, that means that the call confirmation + # is requested only once for audio and video contents. + return True + peer_jid = session["peer_jid"] + + if any( + c["desc_elt"].getAttribute("media") == "video" + for c in session["contents"].values() + ): + call_type = session["call_type"] = C.META_SUBTYPE_CALL_VIDEO + else: + call_type = session["call_type"] = C.META_SUBTYPE_CALL_AUDIO + + sdp = mapping.generate_sdp_from_session(session) + + resp_data = await xml_tools.defer_dialog( + self.host, + _(CONFIRM).format(peer=peer_jid.userhost(), call_type=call_type), + _(CONFIRM_TITLE), + action_extra={ + "session_id": session["id"], + "from_jid": peer_jid.full(), + "type": C.META_TYPE_CALL, + "sub_type": call_type, + "sdp": sdp, + }, + security_limit=SECURITY_LIMIT, + profile=client.profile, + ) + + if resp_data.get("cancelled", False): + return False + + answer_sdp = resp_data["sdp"] + parsed_answer = mapping.parse_sdp(answer_sdp) + session["peer_metadata"].update(parsed_answer["metadata"]) + for media in ("audio", "video"): + for content in session["contents"].values(): + if content["desc_elt"].getAttribute("media") == media: + media_data = parsed_answer[media] + application_data = content["application_data"] + application_data["local_data"] = media_data["application_data"] + transport_data = content["transport_data"] + local_ice_data = media_data["transport_data"] + transport_data["local_ice_data"] = local_ice_data + + return True + + async def jingle_handler(self, client, action, session, content_name, desc_elt): + content_data = session["contents"][content_name] + application_data = content_data["application_data"] + if action == self._j.A_PREPARE_CONFIRMATION: + session["metadata"] = {} + session["peer_metadata"] = {} + try: + media = application_data["media"] = desc_elt["media"] + except KeyError: + raise exceptions.DataError('"media" key is missing in {desc_elt.toXml()}') + if media not in ("audio", "video"): + raise exceptions.DataError(f"invalid media: {media!r}") + application_data["peer_data"] = mapping.parse_description(desc_elt) + elif action == self._j.A_SESSION_INITIATE: + application_data["peer_data"] = mapping.parse_description(desc_elt) + desc_elt = mapping.build_description( + application_data["media"], application_data["local_data"], session + ) + elif action == self._j.A_ACCEPTED_ACK: + pass + elif action == self._j.A_PREPARE_INITIATOR: + application_data["peer_data"] = mapping.parse_description(desc_elt) + elif action == self._j.A_SESSION_ACCEPT: + if content_name == next(iter(session["contents"])): + # we only send the signal for first content, as it means that the whole + # session is accepted + answer_sdp = mapping.generate_sdp_from_session(session) + self.host.bridge.call_accepted(session["id"], answer_sdp, client.profile) + else: + log.warning(f"FIXME: unmanaged action {action}") + + self.host.trigger.point( + "XEP-0167_jingle_handler", + client, + action, + session, + content_name, + desc_elt, + triggers_no_cancel=True, + ) + return desc_elt + + def jingle_session_info( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + jingle_elt: domish.Element, + ) -> None: + """Informational messages""" + for elt in jingle_elt.elements(): + if elt.uri == NS_JINGLE_RTP_INFO: + info_type = elt.name + if info_type not in ALLOWED_ACTIONS: + log.warning("ignoring unknow info type: {info_type!r}") + continue + extra = {} + if info_type in ("mute", "unmute"): + name = elt.getAttribute("name") + if name: + extra["name"] = name + log.debug(f"{info_type} call info received (extra: {extra})") + self.host.bridge.call_info( + session["id"], info_type, data_format.serialise(extra), client.profile + ) + + def _call_info(self, session_id, info_type, extra_s, profile_key): + client = self.host.get_client(profile_key) + extra = data_format.deserialise(extra_s) + return self.send_info(client, session_id, info_type, extra) + + + def send_info( + self, + client: SatXMPPEntity, + session_id: str, + info_type: str, + extra: Optional[dict], + ) -> None: + """Send information on the call""" + if info_type not in ALLOWED_ACTIONS: + raise ValueError(f"Unkown info type {info_type!r}") + session = self._j.get_session(client, session_id) + iq_elt, jingle_elt = self._j.build_session_info(client, session) + info_elt = jingle_elt.addElement((NS_JINGLE_RTP_INFO, info_type)) + if extra and info_type in ("mute", "unmute") and "name" in extra: + info_elt["name"] = extra["name"] + iq_elt.send() + + def jingle_terminate( + self, + client: SatXMPPEntity, + action: str, + session: dict, + content_name: str, + reason_elt: domish.Element, + ) -> None: + self.host.bridge.call_ended(session["id"], "", client.profile) + + +@implementer(iwokkel.IDisco) +class XEP_0167_handler(XMPPHandler): + def getDiscoInfo(self, requestor, target, nodeIdentifier=""): + return [ + disco.DiscoFeature(NS_JINGLE_RTP), + disco.DiscoFeature(NS_JINGLE_RTP_AUDIO), + disco.DiscoFeature(NS_JINGLE_RTP_VIDEO), + ] + + def getDiscoItems(self, requestor, target, nodeIdentifier=""): + return []