Mercurial > libervia-backend
view sat/plugins/plugin_xep_0167/__init__.py @ 4057:e807a5434f82
tests (units): tests for plugin XEP-0167:
fix 420
author | Goffi <goffi@goffi.org> |
---|---|
date | Mon, 29 May 2023 13:38:11 +0200 |
parents | 1c4f4aa36d98 |
children | adb9dc9c8114 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin for Jingle RTP Sessions (XEP-0167)
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Optional

from twisted.internet import defer
from twisted.words.protocols.jabber import jid
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from wokkel import disco, iwokkel
from zope.interface import implementer

from sat.core import exceptions
from sat.core.constants import Const as C
from sat.core.core_types import SatXMPPEntity
from sat.core.i18n import D_, _
from sat.core.log import getLogger
from sat.tools import xml_tools
from sat.tools.common import data_format

from . import mapping
from ..plugin_xep_0166 import BaseApplicationHandler
from .constants import (
    NS_JINGLE_RTP,
    NS_JINGLE_RTP_INFO,
    NS_JINGLE_RTP_AUDIO,
    NS_JINGLE_RTP_VIDEO,
)


log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Jingle RTP Sessions",
    C.PI_IMPORT_NAME: "XEP-0167",
    C.PI_TYPE: "XEP",
    C.PI_PROTOCOLS: ["XEP-0167"],
    C.PI_DEPENDENCIES: ["XEP-0166"],
    C.PI_MAIN: "XEP_0167",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Real-time Transport Protocol (RTP) is used for A/V calls"""),
}

# Text and title of the dialog shown when a peer starts a call
CONFIRM = D_("{peer} wants to start a call ({call_type}) with you, do you accept?")
CONFIRM_TITLE = D_("Incoming Call")
SECURITY_LIMIT = 0

# session-info payload names accepted both for incoming and outgoing call info
ALLOWED_ACTIONS = (
    "active",
    "hold",
    "unhold",
    "mute",
    "unmute",
    "ringing",
)


class XEP_0167(BaseApplicationHandler):
    """Jingle application handler for RTP sessions (audio/video calls)"""

    def __init__(self, host):
        """Register the RTP application with XEP-0166 and expose bridge API

        @param host: backend instance; its ``plugins``, ``bridge`` and ``trigger``
            attributes are used by this plugin
        """
        log.info(f'Plugin "{PLUGIN_INFO[C.PI_NAME]}" initialization')
        self.host = host
        # FIXME: to be removed once host is accessible from global var
        mapping.host = host
        self._j = host.plugins["XEP-0166"]
        self._j.register_application(NS_JINGLE_RTP, self)
        host.bridge.add_method(
            "call_start",
            ".plugin",
            in_sign="sss",
            out_sign="s",
            method=self._call_start,
            async_=True,
        )
        # "call_info" takes 4 arguments (session_id, info_type, extra, profile): it
        # must be bound to _call_info, not to the 3 arguments _call_start
        host.bridge.add_method(
            "call_info",
            ".plugin",
            in_sign="ssss",
            out_sign="",
            method=self._call_info,
        )
        host.bridge.add_signal(
            "call_accepted", ".plugin", signature="sss"
        )  # args: session_id, answer_sdp, profile
        host.bridge.add_signal(
            "call_info", ".plugin", signature="ssss"
        )  # args: session_id, info_type, extra, profile

    def get_handler(self, client):
        """Return the XMPP handler advertising RTP disco features"""
        return XEP_0167_handler()

    # bridge methods

    def _call_start(
        self,
        entity_s: str,
        call_data_s: str,
        profile_key: str,
    ):
        """Bridge wrapper around [call_start]

        @param entity_s: JID of the peer to call, as a string
        @param call_data_s: serialised call data
        @param profile_key: key of the profile starting the call
        @return: Deferred wrapping the [call_start] coroutine
        """
        client = self.host.get_client(profile_key)
        return defer.ensureDeferred(
            self.call_start(
                client, jid.JID(entity_s), data_format.deserialise(call_data_s)
            )
        )

    async def call_start(
        self,
        client: SatXMPPEntity,
        peer_jid: jid.JID,
        call_data: dict,
        media: str = "video",
    ) -> None:
        """Temporary method to test RTP session

        Build one Jingle content per media found in ``call_data`` ("audio" and/or
        "video") and initiate the session.

        @param client: profile session of the caller
        @param peer_jid: entity to call
        @param call_data: call description. If it has a "sdp" key, the SDP is parsed
            and the resulting per-media data are stored under the "audio"/"video"
            keys; an optional "metadata" key is used as initial session metadata.
        @param media: currently unused, kept for backward compatibility
        @return: result of the XEP-0166 ``initiate`` call
        @raise exceptions.DataError: the same content name (mid) is used by several
            media, or no usable media has been found
        """
        # NOTE(review): the annotation says None but the value of ``initiate`` is
        #   returned, and the bridge declares out_sign="s"; confirm and update.
        contents = []
        metadata = call_data.get("metadata") or {}

        if "sdp" in call_data:
            sdp_data = mapping.parse_sdp(call_data["sdp"])
            for media_type in ("audio", "video"):
                try:
                    media_data = sdp_data.pop(media_type)
                except KeyError:
                    continue
                call_data[media_type] = media_data["application_data"]
                transport_data = media_data["transport_data"]
                try:
                    call_data[media_type]["fingerprint"] = transport_data["fingerprint"]
                except KeyError:
                    log.warning("fingerprint is missing")
                try:
                    call_data[media_type]["id"] = media_data["id"]
                except KeyError:
                    log.warning(f"no media ID found for {media_type}: {media_data}")
                try:
                    call_data[media_type]["ice-candidates"] = transport_data["candidates"]
                    metadata["ice-ufrag"] = transport_data["ufrag"]
                    metadata["ice-pwd"] = transport_data["pwd"]
                except KeyError:
                    log.warning("ICE data are missing from SDP")
                    continue
            # media entries have been popped above, only session level data remain
            metadata.update(sdp_data.get("metadata", {}))

        call_type = (
            C.META_SUBTYPE_CALL_VIDEO
            if "video" in call_data
            else C.META_SUBTYPE_CALL_AUDIO
        )
        seen_names = set()

        for media_type in ("audio", "video"):
            media_data = call_data.get(media_type)
            if media_data is None:
                continue
            content = {
                "app_ns": NS_JINGLE_RTP,
                "senders": "both",
                "transport_type": self._j.TRANSPORT_DATAGRAM,
                "app_kwargs": {"media": media_type, "media_data": media_data},
                "transport_data": {
                    "local_ice_data": {
                        "ufrag": metadata["ice-ufrag"],
                        "pwd": metadata["ice-pwd"],
                        "candidates": media_data.pop("ice-candidates"),
                        "fingerprint": media_data.pop("fingerprint", {}),
                    }
                },
            }
            if "id" in media_data:
                name = media_data.pop("id")
                if name in seen_names:
                    raise exceptions.DataError(
                        f"Content name (mid) seen multiple times: {name}"
                    )
                # remember the name, otherwise duplicates would never be detected
                seen_names.add(name)
                content["name"] = name
            contents.append(content)

        if not contents:
            raise exceptions.DataError(f"no valid media data found: {call_data}")

        return await self._j.initiate(
            client,
            peer_jid,
            contents,
            call_type=call_type,
            metadata=metadata,
            peer_metadata={},
        )

    # jingle callbacks

    def jingle_session_init(
        self,
        client: SatXMPPEntity,
        session: dict,
        content_name: str,
        media: str,
        media_data: dict,
    ) -> domish.Element:
        """Store local media data in the content and build its <description/>

        @param client: profile session
        @param session: Jingle session data
        @param content_name: name of the content being initialised
        @param media: "audio" or "video"
        @param media_data: local application data for this media
        @return: description element built by ``mapping.build_description``
        @raise ValueError: ``media`` is neither "audio" nor "video"
        """
        if media not in ("audio", "video"):
            raise ValueError('only "audio" and "video" media types are supported')
        content_data = session["contents"][content_name]
        application_data = content_data["application_data"]
        application_data["media"] = media
        application_data["local_data"] = media_data
        desc_elt = mapping.build_description(media, media_data, session)
        self.host.trigger.point(
            "XEP-0167_jingle_session_init",
            client,
            session,
            content_name,
            media,
            media_data,
            desc_elt,
            triggers_no_cancel=True,
        )
        return desc_elt

    async def jingle_request_confirmation(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        desc_elt: domish.Element,
    ) -> bool:
        """Ask the user to accept an incoming call

        @param client: profile session
        @param action: Jingle action (not used here)
        @param session: Jingle session data
        @param content_name: name of the content to confirm
        @param desc_elt: description element of the content (not used here)
        @return: True if the content is accepted, False if the dialog is cancelled
        """
        if content_name != next(iter(session["contents"])):
            # we request confirmation only for the first content, all others are
            # automatically accepted. In practice, that means that the call confirmation
            # is requested only once for audio and video contents.
            return True
        peer_jid = session["peer_jid"]

        if any(
            c["desc_elt"].getAttribute("media") == "video"
            for c in session["contents"].values()
        ):
            call_type = session["call_type"] = C.META_SUBTYPE_CALL_VIDEO
        else:
            call_type = session["call_type"] = C.META_SUBTYPE_CALL_AUDIO

        sdp = mapping.generate_sdp_from_session(session)

        resp_data = await xml_tools.defer_dialog(
            self.host,
            _(CONFIRM).format(peer=peer_jid.userhost(), call_type=call_type),
            _(CONFIRM_TITLE),
            action_extra={
                "session_id": session["id"],
                "from_jid": peer_jid.full(),
                "type": C.META_TYPE_CALL,
                "sub_type": call_type,
                "sdp": sdp,
            },
            security_limit=SECURITY_LIMIT,
            profile=client.profile,
        )

        if resp_data.get("cancelled", False):
            return False

        # the dialog answer carries the SDP answer, used to fill our local data
        answer_sdp = resp_data["sdp"]
        parsed_answer = mapping.parse_sdp(answer_sdp)
        session["peer_metadata"].update(parsed_answer["metadata"])
        for media in ("audio", "video"):
            for content in session["contents"].values():
                if content["desc_elt"].getAttribute("media") == media:
                    media_data = parsed_answer[media]
                    application_data = content["application_data"]
                    application_data["local_data"] = media_data["application_data"]
                    transport_data = content["transport_data"]
                    local_ice_data = media_data["transport_data"]
                    transport_data["local_ice_data"] = local_ice_data

        return True

    async def jingle_handler(self, client, action, session, content_name, desc_elt):
        """Handle a Jingle action for one RTP content

        @param client: profile session
        @param action: one of the ``A_*`` actions of the XEP-0166 plugin
        @param session: Jingle session data
        @param content_name: name of the content concerned by the action
        @param desc_elt: description element of the content
        @return: description element; it is rebuilt from local data on
            ``A_SESSION_INITIATE``, otherwise the received one is returned
        @raise exceptions.DataError: on ``A_PREPARE_CONFIRMATION``, the "media"
            attribute is missing or is neither "audio" nor "video"
        """
        content_data = session["contents"][content_name]
        application_data = content_data["application_data"]
        if action == self._j.A_PREPARE_CONFIRMATION:
            session["metadata"] = {}
            session["peer_metadata"] = {}
            try:
                media = application_data["media"] = desc_elt["media"]
            except KeyError:
                raise exceptions.DataError(
                    f'"media" key is missing in {desc_elt.toXml()}'
                )
            if media not in ("audio", "video"):
                raise exceptions.DataError(f"invalid media: {media!r}")
            application_data["peer_data"] = mapping.parse_description(desc_elt)
        elif action == self._j.A_SESSION_INITIATE:
            application_data["peer_data"] = mapping.parse_description(desc_elt)
            desc_elt = mapping.build_description(
                application_data["media"], application_data["local_data"], session
            )
        elif action == self._j.A_ACCEPTED_ACK:
            pass
        elif action == self._j.A_PREPARE_INITIATOR:
            application_data["peer_data"] = mapping.parse_description(desc_elt)
        elif action == self._j.A_SESSION_ACCEPT:
            if content_name == next(iter(session["contents"])):
                # we only send the signal for first content, as it means that the whole
                # session is accepted
                answer_sdp = mapping.generate_sdp_from_session(session)
                self.host.bridge.call_accepted(session["id"], answer_sdp, client.profile)
        else:
            log.warning(f"FIXME: unmanaged action {action}")

        self.host.trigger.point(
            "XEP-0167_jingle_handler",
            client,
            action,
            session,
            content_name,
            desc_elt,
            triggers_no_cancel=True,
        )
        return desc_elt

    def jingle_session_info(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        jingle_elt: domish.Element,
    ) -> None:
        """Informational messages

        Each child of ``jingle_elt`` in the RTP info namespace whose name is in
        ALLOWED_ACTIONS is forwarded through the "call_info" bridge signal; other
        names are logged and ignored.

        @param client: profile session
        @param action: Jingle action (not used here)
        @param session: Jingle session data
        @param content_name: name of the content (not used here)
        @param jingle_elt: received <jingle/> element
        """
        for elt in jingle_elt.elements():
            if elt.uri == NS_JINGLE_RTP_INFO:
                info_type = elt.name
                if info_type not in ALLOWED_ACTIONS:
                    log.warning(f"ignoring unknow info type: {info_type!r}")
                    continue
                extra = {}
                if info_type in ("mute", "unmute"):
                    # "name" is forwarded only when present and not empty
                    name = elt.getAttribute("name")
                    if name:
                        extra["name"] = name
                log.debug(f"{info_type} call info received (extra: {extra})")
                self.host.bridge.call_info(
                    session["id"], info_type, data_format.serialise(extra), client.profile
                )

    def _call_info(self, session_id, info_type, extra_s, profile_key):
        """Bridge wrapper around [send_info]

        @param session_id: ID of the Jingle session
        @param info_type: one of ALLOWED_ACTIONS
        @param extra_s: serialised extra data
        @param profile_key: key of the profile sending the info
        """
        client = self.host.get_client(profile_key)
        extra = data_format.deserialise(extra_s)
        return self.send_info(client, session_id, info_type, extra)

    def send_info(
        self,
        client: SatXMPPEntity,
        session_id: str,
        info_type: str,
        extra: Optional[dict],
    ) -> None:
        """Send information on the call

        @param client: profile session
        @param session_id: ID of the Jingle session
        @param info_type: one of ALLOWED_ACTIONS
        @param extra: optional extra data; its "name" key is set as attribute of the
            info element for "mute" and "unmute"
        @raise ValueError: ``info_type`` is not in ALLOWED_ACTIONS
        """
        if info_type not in ALLOWED_ACTIONS:
            raise ValueError(f"Unkown info type {info_type!r}")
        session = self._j.get_session(client, session_id)
        iq_elt, jingle_elt = self._j.build_session_info(client, session)
        info_elt = jingle_elt.addElement((NS_JINGLE_RTP_INFO, info_type))
        if extra and info_type in ("mute", "unmute") and "name" in extra:
            info_elt["name"] = extra["name"]
        iq_elt.send()

    def jingle_terminate(
        self,
        client: SatXMPPEntity,
        action: str,
        session: dict,
        content_name: str,
        reason_elt: domish.Element,
    ) -> None:
        """Session termination callback: nothing is done here for RTP contents"""
        pass


@implementer(iwokkel.IDisco)
class XEP_0167_handler(XMPPHandler):
    """Disco handler advertising the Jingle RTP features"""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Return the RTP, RTP audio and RTP video disco features"""
        return [
            disco.DiscoFeature(NS_JINGLE_RTP),
            disco.DiscoFeature(NS_JINGLE_RTP_AUDIO),
            disco.DiscoFeature(NS_JINGLE_RTP_VIDEO),
        ]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """Return no disco item"""
        return []