# HG changeset patch # User Goffi # Date 1712403789 -7200 # Node ID d01b8d00261924b03748b9df747104120d2a76ac # Parent 0fbe5c605eb60890df861c7397c767279367cce3 cli (call, file), frontends: implement webRTC data channel transfer: - file send/receive commands now supports webRTC transfer. In `send` command, the `--webrtc` flags is currenty used to activate it. - WebRTC related code have been factorized and moved to `libervia.frontends.tools.webrtc*` modules. rel 442 diff -r 0fbe5c605eb6 -r d01b8d002619 libervia/cli/base.py --- a/libervia/cli/base.py Sat Apr 06 12:59:50 2024 +0200 +++ b/libervia/cli/base.py Sat Apr 06 13:43:09 2024 +0200 @@ -74,7 +74,7 @@ DESCRIPTION = """This software is a command line tool for XMPP. Get the latest version at """ + C.APP_URL -COPYLEFT = """Copyright (C) 2009-2021 Jérôme Poisson, Adrien Cossa +COPYLEFT = """Copyright (C) 2009-2024 Jérôme Poisson, Adrien Cossa This program comes with ABSOLUTELY NO WARRANTY; This is free software, and you are welcome to redistribute it under certain conditions. """ diff -r 0fbe5c605eb6 -r d01b8d002619 libervia/cli/call_gui.py --- a/libervia/cli/call_gui.py Sat Apr 06 12:59:50 2024 +0200 +++ b/libervia/cli/call_gui.py Sat Apr 06 13:43:09 2024 +0200 @@ -50,7 +50,6 @@ import gi from libervia.backend.core.i18n import _ -from libervia.cli.call_webrtc import WebRTCCall from libervia.frontends.tools import aio, display_servers, webrtc gi.require_versions({ "Gst": "1.0", @@ -65,6 +64,9 @@ running = False +aio.install_glib_asyncio_iteration() + + class ActivableButton(QPushButton): def __init__(self, text, parent=None): super().__init__(parent) @@ -210,8 +212,8 @@ app = QApplication([]) av_call_gui = cls(parent.host, icons_path) av_call_gui.show() - webrtc_call = await WebRTCCall.make_webrtc_call( - parent.host, + webrtc_call = await webrtc.WebRTCCall.make_webrtc_call( + parent.host.bridge, parent.profile, call_data, sinks=webrtc.SINKS_APP, @@ -219,6 +221,12 @@ local_video_cb=partial(av_call_gui.on_new_sample, video_stream="local"), remote_video_cb=partial(av_call_gui.on_new_sample, video_stream="remote"), ), + # we want to be sure that call is ended if user presses `Ctrl + c` or anything + # else stops the session. + on_call_setup_cb=lambda sid, profile: parent.host.add_on_quit_callback( + parent.host.bridge.call_end, sid, "", profile + ), + on_call_ended_cb=lambda sid, profile: parent.host.a_quit(), ) av_call_gui.webrtc_call = webrtc_call diff -r 0fbe5c605eb6 -r d01b8d002619 libervia/cli/call_simple.py --- a/libervia/cli/call_simple.py Sat Apr 06 12:59:50 2024 +0200 +++ b/libervia/cli/call_simple.py Sat Apr 06 13:43:09 2024 +0200 @@ -29,7 +29,11 @@ from rich.panel import Panel from rich.text import Text -from .call_webrtc import CallData, WebRTCCall +from libervia.frontends.tools import aio +from libervia.frontends.tools.webrtc import CallData, WebRTCCall + + +aio.install_glib_asyncio_iteration() class BaseAVTUI: @@ -157,10 +161,16 @@ merge_pip = False if "split" in parent.args.output_opts else None webrtc_call = await WebRTCCall.make_webrtc_call( - parent.host, + parent.host.bridge, parent.profile, call_data, merge_pip=merge_pip, + # we want to be sure that call is ended if user presses `Ctrl + c` or anything + # else stops the session. + on_call_setup_cb=lambda sid, profile: parent.host.add_on_quit_callback( + parent.host.bridge.call_end, sid, "", profile + ), + on_call_ended_cb=lambda sid, profile: parent.host.a_quit(), **kwargs, ) if not parent.args.no_ui: diff -r 0fbe5c605eb6 -r d01b8d002619 libervia/cli/call_tui.py --- a/libervia/cli/call_tui.py Sat Apr 06 12:59:50 2024 +0200 +++ b/libervia/cli/call_tui.py Sat Apr 06 13:43:09 2024 +0200 @@ -27,16 +27,19 @@ from term_image import image as t_image from libervia.cli.constants import Const as C -from libervia.frontends.tools import webrtc +from libervia.frontends.tools import aio, webrtc +from libervia.frontends.tools.webrtc import CallData, WebRTCCall from .call_simple import BaseAVTUI -from .call_webrtc import CallData, WebRTCCall gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"}) from gi.repository import Gst +aio.install_glib_asyncio_iteration() + + class AVCallUI(BaseAVTUI): def __init__(self, parent): super().__init__(parent.host, align="center") @@ -74,7 +77,7 @@ # we use low res by default for performance reason kwargs["target_size"] = (640, 380) webrtc_call = await WebRTCCall.make_webrtc_call( - self.parent.host, + self.parent.host.bridge, self.parent.profile, call_data, sinks=webrtc.SINKS_APP, @@ -83,6 +86,12 @@ remote_video_cb=None, ), merge_pip=True, + # we want to be sure that call is ended if user presses `Ctrl + c` or anything + # else stops the session. + on_call_setup_cb=lambda sid, profile: self.parent.host.add_on_quit_callback( + self.parent.host.bridge.call_end, sid, "", profile + ), + on_call_ended_cb=lambda sid, profile: self.parent.host.a_quit(), **kwargs, ) self.webrtc = webrtc_call.webrtc diff -r 0fbe5c605eb6 -r d01b8d002619 libervia/cli/call_webrtc.py --- a/libervia/cli/call_webrtc.py Sat Apr 06 12:59:50 2024 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,135 +0,0 @@ -#!/usr/bin/env python3 - -# Libervia CLI -# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -from dataclasses import dataclass - -from libervia.backend.tools.common import data_format -from libervia.frontends.tools import aio, jid - - -@dataclass -class CallData: - callee: jid.JID - sid: str | None = None - action_id: str | None = None - - -class WebRTCCall: - def __init__(self, host, profile: str, callee: jid.JID, **kwargs): - """Create and setup a webRTC instance - - @param profile: profile making or receiving the call - @param callee: peer jid - @param kwargs: extra kw args to use when instantiating WebRTC - """ - from libervia.frontends.tools import webrtc - - aio.install_glib_asyncio_iteration() - self.host = host - self.profile = profile - self.webrtc = webrtc.WebRTC(host.bridge, profile, **kwargs) - self.webrtc.callee = callee - host.bridge.register_signal( - "ice_candidates_new", self.on_ice_candidates_new, "plugin" - ) - host.bridge.register_signal("call_setup", self.on_call_setup, "plugin") - host.bridge.register_signal("call_ended", self.on_call_ended, "plugin") - - @classmethod - async def make_webrtc_call( - cls, - host, - profile: str, - call_data: CallData, - **kwargs - ) -> "WebRTCCall": - """Create the webrtc_call instance - - @param call_data: Call data of the command - @param kwargs: extra args used to instanciate WebRTCCall - - """ - webrtc_call = cls(host, profile, call_data.callee, **kwargs) - if call_data.sid is None: - # we are making the call - await webrtc_call.start() - else: - # we are receiving the call - webrtc_call.sid = call_data.sid - if call_data.action_id is not None: - await host.bridge.action_launch( - call_data.action_id, - data_format.serialise({"cancelled": False}), - profile - ) - return webrtc_call - - @property - def sid(self) -> str | None: - return self.webrtc.sid - - @sid.setter - def sid(self, new_sid: str | None) -> None: - self.webrtc.sid = new_sid - - async def on_ice_candidates_new( - self, sid: str, candidates_s: str, profile: str - ) -> None: - if sid != self.webrtc.sid or profile != self.profile: - return - self.webrtc.on_ice_candidates_new( - data_format.deserialise(candidates_s), - ) - - async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None: - if sid != self.webrtc.sid or profile != self.profile: - return - setup_data = data_format.deserialise(setup_data_s) - try: - role = setup_data["role"] - sdp = setup_data["sdp"] - except KeyError: - self.host.disp(f"Invalid setup data received: {setup_data}", error=True) - return - if role == "initiator": - self.webrtc.on_accepted_call(sdp, profile) - elif role == "responder": - await self.webrtc.answer_call(sdp, profile) - else: - self.host.disp( - f"Invalid role received during setup: {setup_data}", error=True - ) - # we want to be sure that call is ended if user presses `Ctrl + c` or anything - # else stops the session. - self.host.add_on_quit_callback( - lambda: self.host.bridge.call_end(sid, "", profile) - ) - - async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None: - if sid != self.webrtc.sid or profile != self.profile: - return - await self.webrtc.end_call() - await self.host.a_quit() - - async def start(self): - """Start a call. - - To be used only if we are initiator - """ - await self.webrtc.setup_call("initiator") - self.webrtc.start_pipeline() diff -r 0fbe5c605eb6 -r d01b8d002619 libervia/cli/cmd_call.py --- a/libervia/cli/cmd_call.py Sat Apr 06 12:59:50 2024 +0200 +++ b/libervia/cli/cmd_call.py Sat Apr 06 13:43:09 2024 +0200 @@ -21,15 +21,16 @@ from functools import partial import importlib import logging +from typing import Any from libervia.backend.core.i18n import _ from libervia.backend.tools.common import data_format from libervia.cli.constants import Const as C from libervia.frontends.tools import jid +from libervia.frontends.tools.webrtc_models import CallData from . import base -from .call_webrtc import CallData, WebRTCCall __commands__ = ["Call"] @@ -58,6 +59,19 @@ self.parser.add_argument( "--no-ui", action="store_true", help=_("disable user interface") ) + sources_group = self.parser.add_mutually_exclusive_group() + sources_group.add_argument( + "-s", "--sources", choices=['auto', 'test'], default='auto', + help='Well-known sources to use (default: "auto").' + ) + + def get_call_data_kw(self) -> dict[str, Any]: + """Get relevant keyword arguments for CallData""" + kwargs: dict[str, Any] = {} + if self.args.sources == "test": + kwargs["sources"] = "test" + return kwargs + async def start(self): root_logger = logging.getLogger() @@ -120,6 +134,7 @@ await super().start() await super().output(CallData( callee=jid.JID(self.args.entity), + kwargs=self.get_call_data_kw() )) @@ -178,7 +193,8 @@ await super().output(CallData( callee=peer_jid, sid=action_data["session_id"], - action_id=action_id + action_id=action_id, + kwargs=self.get_call_data_kw() )) async def start(self): diff -r 0fbe5c605eb6 -r d01b8d002619 libervia/cli/cmd_file.py --- a/libervia/cli/cmd_file.py Sat Apr 06 12:59:50 2024 +0200 +++ b/libervia/cli/cmd_file.py Sat Apr 06 13:43:09 2024 +0200 @@ -18,6 +18,11 @@ # along with this program. If not, see . +import asyncio +from functools import partial +import importlib +import logging +from typing import IO from . import base from . import xmlui_manager import sys @@ -28,7 +33,7 @@ from libervia.backend.tools.common import data_format from libervia.cli.constants import Const as C from libervia.cli import common -from libervia.frontends.tools import jid +from libervia.frontends.tools import aio, jid from libervia.backend.tools.common.ansi import ANSI as A from libervia.backend.tools.common import utils from urllib.parse import urlparse @@ -81,6 +86,11 @@ action="store_true", help=_("end-to-end encrypt the file transfer") ) + self.parser.add_argument( + "--webrtc", + action="store_true", + help=_("Use WebRTC Data Channel transport.") + ) async def on_progress_started(self, metadata): self.disp(_("File copy started"), 2) @@ -94,12 +104,8 @@ else: self.disp(_("Error while sending file: {}").format(error_msg), error=True) - async def got_id(self, data, file_): - """Called when a progress id has been received - - @param pid(unicode): progress id - @param file_(str): file path - """ + async def got_id(self, data: dict): + """Called when a progress id has been received""" # FIXME: this show progress only for last progress_id self.disp(_("File request sent to {jid}".format(jid=self.args.jid)), 1) try: @@ -109,7 +115,9 @@ self.disp(_("Can't send file to {jid}".format(jid=self.args.jid)), error=True) self.host.quit(2) + async def start(self): + file_ = None for file_ in self.args.files: if not os.path.exists(file_): self.disp( @@ -148,28 +156,41 @@ bz2.add(file_) bz2.close() self.disp(_("Done !"), 1) + self.args.files = [buf.name] + if not self.args.name: + self.args.name = archive_name + for file_ in self.args.files: + file_path = Path(file_) + if self.args.webrtc: + root_logger = logging.getLogger() + # we don't want any formatting for messages from webrtc + for handler in root_logger.handlers: + handler.setFormatter(None) + if self.verbosity == 0: + root_logger.setLevel(logging.ERROR) + if self.verbosity >= 1: + root_logger.setLevel(logging.WARNING) + if self.verbosity >= 2: + root_logger.setLevel(logging.DEBUG) + from libervia.frontends.tools.webrtc_file import WebRTCFileSender + aio.install_glib_asyncio_iteration() + file_sender = WebRTCFileSender( + self.host.bridge, + self.profile, + on_call_start_cb=self.got_id, + end_call_cb=self.host.a_quit + ) + await file_sender.send_file_webrtc( + file_path, + self.args.jid, + self.args.name + ) + else: try: - send_data = await self.host.bridge.file_send( + send_data_raw = await self.host.bridge.file_send( self.args.jid, - buf.name, - self.args.name or archive_name, - "", - data_format.serialise(extra), - self.profile, - ) - except Exception as e: - self.disp(f"can't send file: {e}", error=True) - self.host.quit(C.EXIT_BRIDGE_ERRBACK) - else: - await self.got_id(send_data, file_) - else: - for file_ in self.args.files: - path = os.path.abspath(file_) - try: - send_data = await self.host.bridge.file_send( - self.args.jid, - path, + str(file_path.absolute()), self.args.name, "", data_format.serialise(extra), @@ -179,7 +200,8 @@ self.disp(f"can't send file {file_!r}: {e}", error=True) self.host.quit(C.EXIT_BRIDGE_ERRBACK) else: - await self.got_id(send_data, file_) + send_data = data_format.deserialise(send_data_raw) + await self.got_id(send_data) class Request(base.CommandBase): @@ -301,7 +323,7 @@ use_verbose=True, help=_("wait for a file to be sent by a contact"), ) - self._overwrite_refused = False # True when one overwrite as already been refused + self._overwrite_refused = False # True when one overwrite has already been refused self.action_callbacks = { C.META_TYPE_CONFIRM: self.on_confirm_action, C.META_TYPE_FILE: self.on_file_action, @@ -326,7 +348,7 @@ "--force", action="store_true", help=_( - "force overwritting of existing files (/!\\ name is choosed by sender)" + "force overwriting of existing files (/!\\ name is choosed by sender)" ), ) self.parser.add_argument( @@ -354,6 +376,58 @@ async def on_progress_error(self, e): self.disp(_("Error while receiving file: {e}").format(e=e), error=True) + async def _on_webrtc_close(self) -> None: + if not self.args.multiple: + await self.host.a_quit() + + async def on_webrtc_file( + self, + from_jid: jid.JID, + session_id: str, + file_data: dict + ) -> None: + from libervia.frontends.tools.webrtc_file import WebRTCFileReceiver + aio.install_glib_asyncio_iteration() + root_logger = logging.getLogger() + # we don't want any formatting for messages from webrtc + for handler in root_logger.handlers: + handler.setFormatter(None) + if self.verbosity == 0: + root_logger.setLevel(logging.ERROR) + if self.verbosity >= 1: + root_logger.setLevel(logging.WARNING) + if self.verbosity >= 2: + root_logger.setLevel(logging.DEBUG) + + dest_path = Path(self.path) + + if dest_path.is_dir(): + filename = file_data.get("name", "unammed_file") + dest_path /= filename + if dest_path.exists() and not self.args.force: + self.host.disp( + "Destination file already exists", + error=True + ) + aio.run_from_thread( + self.host.a_quit, C.EXIT_ERROR, loop=self.host.loop.loop + ) + return + + file_receiver = WebRTCFileReceiver( + self.host.bridge, + self.profile, + on_close_cb=self._on_webrtc_close + ) + + await file_receiver.receive_file_webrtc( + from_jid, + session_id, + dest_path, + file_data + ) + + def get_xmlui_id(self, action_data): # FIXME: we temporarily use ElementTree, but a real XMLUI managing module # should be available in the futur @@ -376,12 +450,16 @@ if action_data.get("subtype") != C.META_TYPE_FILE: self.disp(_("Ignoring confirm dialog unrelated to file."), 1) return + try: + from_jid = jid.JID(action_data["from_jid"]) + except KeyError: + self.disp(_("Ignoring action without from_jid data"), 1) + return - # we always accept preflight confirmation dialog, as for now a second dialog is - # always sent - # FIXME: real confirmation should be done here, and second dialog should not be - # sent from backend - xmlui_data = {"answer": C.BOOL_TRUE} + # We accept if no JID is specified (meaning "accept all") or if the sender is + # explicitly specified. + answer = not self.bare_jids or from_jid.bare in self.bare_jids + xmlui_data = {"answer": C.bool_const(answer)} await self.host.bridge.action_launch( xmlui_id, data_format.serialise(xmlui_data), profile_key=profile ) @@ -401,7 +479,10 @@ self.disp(_("ignoring action without progress id"), 1) return - if not self.bare_jids or from_jid.bare in self.bare_jids: + webrtc = action_data.get("webrtc", False) + file_accepted = action_data.get("file_accepted", False) + + if file_accepted or not self.bare_jids or from_jid.bare in self.bare_jids: if self._overwrite_refused: self.disp(_("File refused because overwrite is needed"), error=True) await self.host.bridge.action_launch( @@ -410,7 +491,22 @@ ) return self.host.quit_from_signal(2) await self.set_progress_id(progress_id) - xmlui_data = {"path": self.path} + if webrtc: + xmlui_data = {"answer": C.BOOL_TRUE} + file_data = action_data.get("file_data") or {} + try: + session_id = action_data["session_id"] + except KeyError: + self.disp(_("ignoring action without session id"), 1) + return + await self.on_webrtc_file( + from_jid, + session_id, + file_data + ) + + else: + xmlui_data = {"path": self.path} await self.host.bridge.action_launch( xmlui_id, data_format.serialise(xmlui_data), profile_key=profile ) @@ -438,7 +534,9 @@ xmlui_id, data_format.serialise(xmlui_data), profile_key=profile ) - async def on_not_in_roster_action(self, action_data, action_id, security_limit, profile): + async def on_not_in_roster_action( + self, action_data, action_id, security_limit, profile + ): xmlui_id = self.get_xmlui_id(action_data) if xmlui_id is None: return self.host.quit_from_signal(1) @@ -618,12 +716,8 @@ async def on_progress_error(self, error_msg): self.disp(_("Error while uploading file: {}").format(error_msg), error=True) - async def got_id(self, data, file_): - """Called when a progress id has been received - - @param pid(unicode): progress id - @param file_(str): file path - """ + async def got_id(self, data): + """Called when a progress id has been received""" try: await self.set_progress_id(data["progress"]) except KeyError: @@ -669,7 +763,7 @@ self.disp(f"error while trying to upload a file: {e}", error=True) self.host.quit(C.EXIT_BRIDGE_ERRBACK) else: - await self.got_id(upload_data, file_) + await self.got_id(upload_data) class ShareAffiliationsSet(base.CommandBase): diff -r 0fbe5c605eb6 -r d01b8d002619 libervia/frontends/quick_frontend/quick_app.py --- a/libervia/frontends/quick_frontend/quick_app.py Sat Apr 06 12:59:50 2024 +0200 +++ b/libervia/frontends/quick_frontend/quick_app.py Sat Apr 06 13:43:09 2024 +0200 @@ -637,7 +637,7 @@ This will launch frontend specific workflow - /!\ if you override the method and don't call the parent, be sure to add the + /!\\ if you override the method and don't call the parent, be sure to add the profile to ready_profiles ! if you don't, all signals will stay in cache @param profile(unicode): %(doc_profile)s @@ -763,7 +763,7 @@ action_handler(action_data, action_id, security_limit, profile) else: self.action_manager( - action_data, user_action=False, profile=profile + action_data, user_action=False, action_id=action_id, profile=profile ) def contact_new_handler(self, jid_s, attributes, groups, profile): @@ -1304,21 +1304,31 @@ ) self._action_handlers[action_type] = handler - def action_manager(self, action_data, callback=None, ui_show_cb=None, user_action=True, - progress_cb=None, progress_eb=None, profile=C.PROF_KEY_NONE): + def action_manager( + self, + action_data: dict, + callback: Callable|None = None, + ui_show_cb: Callable|None = None, + user_action: bool = True, + action_id: str|None = None, + progress_cb: Callable|None = None, + progress_eb: Callable|None = None, + profile: str = C.PROF_KEY_NONE + ) -> None: """Handle backend action - @param action_data(dict): action dict as sent by action_launch or returned by an + @param action_data: action dict as sent by action_launch or returned by an UI action - @param callback(None, callback): if not None, callback to use on XMLUI answer - @param ui_show_cb(None, callback): if not None, method to call to show the XMLUI - @param user_action(bool): if True, the action is a result of a user interaction + @param callback: if not None, callback to use on XMLUI answer + @param ui_show_cb: if not None, method to call to show the XMLUI + @param user_action: if True, the action is a result of a user interaction else the action come from backend direclty (i.e. action_new). This is useful to know if the frontend can display a popup immediately (if True) or if it should add it to a queue that the user can activate later. - @param progress_cb(None, callable): method to call when progression is finished. + @param action_id: ID of the action. + @param progress_cb: method to call when progression is finished. Only make sense if a progress is expected in this action - @param progress_eb(None, callable): method to call when something went wrong + @param progress_eb: method to call when something went wrong during progression. Only make sense if a progress is expected in this action """ diff -r 0fbe5c605eb6 -r d01b8d002619 libervia/frontends/tools/aio.py --- a/libervia/frontends/tools/aio.py Sat Apr 06 12:59:50 2024 +0200 +++ b/libervia/frontends/tools/aio.py Sat Apr 06 13:43:09 2024 +0200 @@ -34,7 +34,7 @@ """ background_tasks.discard(task) e = task.exception() - if e is not None: + if e is not None and not isinstance(e, SystemExit): exc_info = (type(e), e, e.__traceback__) log.error("Task failed:", exc_info=exc_info) @@ -57,7 +57,7 @@ def run_with_args( - async_method: Callable[..., Coroutine[Any, Any, Any]], *args: Any, **kwargs: Any + async_method: Callable[..., Coroutine], *args: Any, **kwargs: Any ) -> None: """Schedules and tracks an asynchronous method with arguments. @@ -72,7 +72,7 @@ def run_from_thread( - async_method: Coroutine | asyncio.Future, + async_method: Callable[..., Coroutine] | Callable[..., asyncio.Future], *args, loop: asyncio.AbstractEventLoop | None = None, **kwargs, diff -r 0fbe5c605eb6 -r d01b8d002619 libervia/frontends/tools/webrtc.py --- a/libervia/frontends/tools/webrtc.py Sat Apr 06 12:59:50 2024 +0200 +++ b/libervia/frontends/tools/webrtc.py Sat Apr 06 13:43:09 2024 +0200 @@ -16,11 +16,10 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . +from collections.abc import Awaitable import gi -gi.require_versions({ - "Gst": "1.0", - "GstWebRTC": "1.0" -}) + +gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"}) from gi.repository import Gst, GstWebRTC, GstSdp from libervia.backend.core import exceptions @@ -33,22 +32,23 @@ "your system (e.g., `python3-gst-1.0` on Debian and derivatives)." ) import asyncio -from dataclasses import dataclass from datetime import datetime import logging -from random import randint import re from typing import Callable from urllib.parse import quote_plus from libervia.backend.tools.common import data_format -from libervia.frontends.tools import aio, display_servers +from libervia.frontends.tools import aio, display_servers, jid +from .webrtc_models import AppSinkData, CallData +from .webrtc_screenshare import DesktopPortal current_server = display_servers.detect() if current_server == display_servers.X11: # GSTreamer's ximagesrc documentation asks to run this function import ctypes - ctypes.CDLL('libX11.so.6').XInitThreads() + + ctypes.CDLL("libX11.so.6").XInitThreads() log = logging.getLogger(__name__) @@ -57,194 +57,18 @@ SOURCES_AUTO = "auto" SOURCES_TEST = "test" +SOURCES_DATACHANNEL = "datachannel" SINKS_APP = "app" SINKS_AUTO = "auto" SINKS_TEST = "test" - - -class ScreenshareError(Exception): - pass - - -@dataclass -class AppSinkData: - local_video_cb: Callable - remote_video_cb: Callable|None - - -class DesktopPortal: - - def __init__(self, webrtc: "WebRTC"): - import dbus - from dbus.mainloop.glib import DBusGMainLoop - # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes - self.dbus = dbus - self.webrtc = webrtc - self.sources_type = dbus.UInt32(7) - DBusGMainLoop(set_as_default=True) - self.session_bus = dbus.SessionBus() - portal_object = self.session_bus.get_object( - 'org.freedesktop.portal.Desktop', - '/org/freedesktop/portal/desktop' - ) - self.screencast_interface = dbus.Interface( - portal_object, - 'org.freedesktop.portal.ScreenCast' - ) - self.session_interface = None - self.session_signal = None - self.handle_counter = 0 - self.session_handle = None - self.stream_data: dict|None = None - - @property - def handle_token(self): - self.handle_counter += 1 - return f"libervia{self.handle_counter}" - - def on_session_closed(self, details: dict) -> None: - if self.session_interface is not None: - self.session_interface = None - self.webrtc.desktop_sharing = False - if self.session_signal is not None: - self.session_signal.remove() - self.session_signal = None - - - async def dbus_call(self, method_name: str, *args) -> dict: - """Call a screenshare portal method - - This method handle the signal response. - @param method_name: method to call - @param args: extra args - `handle_token` will be automatically added to the last arg (option dict) - @return: method result - """ - if self.session_handle is not None: - self.end_screenshare() - method = getattr(self.screencast_interface, method_name) - options = args[-1] - reply_fut = asyncio.Future() - signal_fut = asyncio.Future() - # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html - handle_token = self.handle_token - sender = self.session_bus.get_unique_name().replace(".", "_")[1:] - path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}" - signal_match = None - - def on_signal(response, results): - assert signal_match is not None - signal_match.remove() - if response == 0: - signal_fut.set_result(results) - elif response == 1: - signal_fut.set_exception( - exceptions.CancelError("Cancelled by user.") - ) - else: - signal_fut.set_exception(ScreenshareError( - f"Can't get signal result" - )) - - signal_match = self.session_bus.add_signal_receiver( - on_signal, - signal_name="Response", - dbus_interface="org.freedesktop.portal.Request", - path=path - ) - - options["handle_token"] = handle_token - - method( - *args, - reply_handler=reply_fut.set_result, - error_handler=reply_fut.set_exception - ) - try: - await reply_fut - except Exception as e: - raise ScreenshareError(f"Can't ask screenshare permission: {e}") - return await signal_fut - - async def request_screenshare(self) -> dict: - session_data = await self.dbus_call( - "CreateSession", - { - "session_handle_token": str(randint(1, 2**32)), - } - ) - try: - session_handle = session_data["session_handle"] - except KeyError: - raise ScreenshareError("Can't get session handle") - self.session_handle = session_handle - - - await self.dbus_call( - "SelectSources", - session_handle, - { - "multiple": True, - "types": self.sources_type, - "modal": True - } - ) - screenshare_data = await self.dbus_call( - "Start", - session_handle, - "", - {} - ) - - session_object = self.session_bus.get_object( - 'org.freedesktop.portal.Desktop', - session_handle - ) - self.session_interface = self.dbus.Interface( - session_object, - 'org.freedesktop.portal.Session' - ) - - self.session_signal = self.session_bus.add_signal_receiver( - self.on_session_closed, - signal_name="Closed", - dbus_interface="org.freedesktop.portal.Session", - path=session_handle - ) - - try: - node_id, stream_data = screenshare_data["streams"][0] - source_type = int(stream_data["source_type"]) - except (IndexError, KeyError): - raise ScreenshareError("Can't parse stream data") - self.stream_data = stream_data = { - "session_handle": session_handle, - "node_id": node_id, - "source_type": source_type - } - try: - height = int(stream_data["size"][0]) - weight = int(stream_data["size"][1]) - except (IndexError, KeyError): - pass - else: - stream_data["size"] = (height, weight) - - return self.stream_data - - def end_screenshare(self) -> None: - """Close a running screenshare session, if any.""" - if self.session_interface is None: - return - self.session_interface.Close() - self.on_session_closed({}) +SINKS_DATACHANNEL = "datachannel" class WebRTC: """GSTreamer based WebRTC implementation for audio and video communication. This class encapsulates the WebRTC functionalities required for initiating and - handling audio and video calls. + handling audio and video calls, and data channels. """ def __init__( @@ -255,8 +79,15 @@ sinks: str = SINKS_AUTO, appsink_data: AppSinkData | None = None, reset_cb: Callable | None = None, - merge_pip: bool|None = None, - target_size: tuple[int, int]|None = None, + merge_pip: bool | None = None, + target_size: tuple[int, int] | None = None, + call_start_cb: Callable[[str, dict, str], Awaitable[str]] | None = None, + dc_open_cb: ( + Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None + ) = None, + dc_on_data_channel: ( + Callable[[GstWebRTC.WebRTCDataChannel], Awaitable[None]] | None + ) = None, ) -> None: """Initializes a new WebRTC instance. @@ -277,6 +108,11 @@ when ``merge_pip`` is set. None to autodetect (not real autodetection implemeted yet, default to (1280,720)). + @param call_start_cb: Called when call is started. + @param dc_open_cb: Called when Data Channel is open (for SOURCES_DATACHANNEL). + This callback will be run in a GStreamer thread. + @param dc_open_cb: Called when Data Channel is created (for SINKS_DATACHANNEL). + This callback will be run in a GStreamer thread. """ self.main_loop = asyncio.get_event_loop() self.bridge = bridge @@ -289,11 +125,20 @@ self.sources = sources self.sinks = sinks if target_size is None: - target_size=(1280, 720) + target_size = (1280, 720) self.target_width, self.target_height = target_size if merge_pip is None: merge_pip = sinks == SINKS_AUTO self.merge_pip = merge_pip + if call_start_cb is None: + call_start_cb = self._call_start + self.call_start_cb = call_start_cb + if sources == SOURCES_DATACHANNEL: + assert dc_open_cb is not None + self.dc_open_cb = dc_open_cb + if sinks == SINKS_DATACHANNEL: + assert dc_on_data_channel is not None + self.dc_on_data_channel = dc_on_data_channel if sinks == SINKS_APP: if ( merge_pip @@ -387,13 +232,12 @@ ) self.bindings[key] = cb - def generate_dot_file( self, filename: str = "pipeline", details: Gst.DebugGraphDetails = Gst.DebugGraphDetails.ALL, with_timestamp: bool = True, - bin_: Gst.Bin|None = None, + bin_: Gst.Bin | None = None, ) -> None: """Generate Dot File for debugging @@ -412,7 +256,7 @@ if bin_ is None: bin_ = self.pipeline if with_timestamp: - timestamp = datetime.now().isoformat(timespec='milliseconds') + timestamp = datetime.now().isoformat(timespec="milliseconds") filename = f"{timestamp}_filename" Gst.debug_bin_to_dot_file(bin_, details, filename) @@ -556,7 +400,7 @@ self.local_candidates_buffer = {} self.ufrag: str | None = None self.pwd: str | None = None - self.callee: str | None = None + self.callee: jid.JID | None = None self._media_types = None self._media_types_inv = None self._sdp_set: bool = False @@ -576,7 +420,6 @@ if self.reset_cb is not None: self.reset_cb() - async def setup_call( self, role: str, @@ -598,76 +441,84 @@ """ assert role in ("initiator", "responder") self.role = role - if audio_pt is None or video_pt is None: - raise NotImplementedError("None value is not handled yet") - if self.sources == SOURCES_AUTO: - video_source_elt = "v4l2src" - audio_source_elt = "pulsesrc" - elif self.sources == SOURCES_TEST: - video_source_elt = "videotestsrc is-live=true pattern=ball" - audio_source_elt = "audiotestsrc" + if self.sources == SOURCES_DATACHANNEL or self.sinks == SINKS_DATACHANNEL: + # Setup pipeline for datachannel only, no media streams. + self.gst_pipe_desc = f""" + webrtcbin name=sendrecv bundle-policy=max-bundle + """ else: - raise exceptions.InternalError(f'Unknown "sources" value: {self.sources!r}') - + if audio_pt is None or video_pt is None: + raise NotImplementedError("None value is not handled yet") - if self.sinks == SINKS_APP: - local_video_sink_elt = ( - "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 " - "sync=True" - ) - elif self.sinks == SINKS_AUTO: - local_video_sink_elt = "autovideosink" - else: - raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}") + if self.sources == SOURCES_AUTO: + video_source_elt = "v4l2src" + audio_source_elt = "pulsesrc" + elif self.sources == SOURCES_TEST: + video_source_elt = "videotestsrc is-live=true pattern=ball" + audio_source_elt = "audiotestsrc" + else: + raise exceptions.InternalError( + f'Unknown "sources" value: {self.sources!r}' + ) - if self.merge_pip: - extra_elt = ( - "compositor name=compositor background=black " - f"! video/x-raw,width={self.target_width},height={self.target_height}," - "framerate=30/1 " - f"! {local_video_sink_elt}" - ) - local_video_sink_elt = "compositor.sink_1" - else: - extra_elt = "" + if self.sinks == SINKS_APP: + local_video_sink_elt = ( + "appsink name=local_video_sink emit-signals=true drop=true max-buffers=1 " + "sync=True" + ) + elif self.sinks == SINKS_AUTO: + local_video_sink_elt = "autovideosink" + else: + raise exceptions.InternalError(f"Unknown sinks value {self.sinks!r}") - self.gst_pipe_desc = f""" - webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle + if self.merge_pip: + extra_elt = ( + "compositor name=compositor background=black " + f"! video/x-raw,width={self.target_width},height={self.target_height}," + "framerate=30/1 " + f"! {local_video_sink_elt}" + ) + local_video_sink_elt = "compositor.sink_1" + else: + extra_elt = "" - input-selector name=video_selector - ! videorate - ! video/x-raw,framerate=30/1 - ! tee name=t + self.gst_pipe_desc = f""" + webrtcbin latency=100 name=sendrecv bundle-policy=max-bundle - {extra_elt} + input-selector name=video_selector + ! videorate + ! video/x-raw,framerate=30/1 + ! tee name=t - {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector. - videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector. + {extra_elt} + + {video_source_elt} name=video_src ! queue leaky=downstream ! video_selector. + videotestsrc name=muted_src is-live=true pattern=black ! queue leaky=downstream ! video_selector. - t. - ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream - ! videoconvert - ! vp8enc deadline=1 keyframe-max-dist=60 - ! rtpvp8pay picture-id-mode=15-bit - ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} - ! sendrecv. + t. + ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream + ! videoconvert + ! vp8enc deadline=1 keyframe-max-dist=60 + ! rtpvp8pay picture-id-mode=15-bit + ! application/x-rtp,media=video,encoding-name=VP8,payload={video_pt} + ! sendrecv. - t. - ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream - ! videoconvert - ! {local_video_sink_elt} + t. + ! queue max-size-buffers=5 max-size-time=0 max-size-bytes=0 leaky=downstream + ! videoconvert + ! {local_video_sink_elt} - {audio_source_elt} name=audio_src - ! valve - ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream - ! audioconvert - ! audioresample - ! opusenc audio-type=voice - ! rtpopuspay - ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} - ! sendrecv. - """ + {audio_source_elt} name=audio_src + ! valve + ! queue max-size-buffers=10 max-size-time=0 max-size-bytes=0 leaky=downstream + ! audioconvert + ! audioresample + ! opusenc audio-type=voice + ! rtpopuspay + ! application/x-rtp,media=audio,encoding-name=OPUS,payload={audio_pt} + ! sendrecv. + """ log.debug(f"Gstreamer pipeline: {self.gst_pipe_desc}") @@ -680,16 +531,29 @@ if not self.pipeline: raise exceptions.InternalError("Failed to create Gstreamer pipeline.") + if not isinstance(self.pipeline, Gst.Pipeline): + # in the case of Data Channel there is a single element, and Gst.parse_launch + # doesn't create a Pipeline in this case, so we do it manually. + pipeline = Gst.Pipeline() + pipeline.add(self.pipeline) + self.pipeline = pipeline + self.webrtcbin = self.pipeline.get_by_name("sendrecv") - self.video_src = self.pipeline.get_by_name("video_src") - self.muted_src = self.pipeline.get_by_name("muted_src") - self.video_selector = self.pipeline.get_by_name("video_selector") - self.audio_valve = self.pipeline.get_by_name("audio_valve") + if self.webrtcbin is None: + raise exceptions.InternalError("Can't get the pipeline.") - if self.video_muted: - self.on_video_mute(True) - if self.audio_muted: - self.on_audio_mute(True) + # For datachannel setups, media source, selector, and sink elements are not + # created + if self.sources != SOURCES_DATACHANNEL and self.sinks != SINKS_DATACHANNEL: + self.video_src = self.pipeline.get_by_name("video_src") + self.muted_src = self.pipeline.get_by_name("muted_src") + self.video_selector = self.pipeline.get_by_name("video_selector") + self.audio_valve = self.pipeline.get_by_name("audio_valve") + + if self.video_muted: + self.on_video_mute(True) + if self.audio_muted: + self.on_audio_mute(True) # set STUN and TURN servers external_disco = data_format.deserialise( @@ -719,7 +583,7 @@ log.warning(f"Erreur while adding TURN server {url}") # local video feedback - if self.sinks == SINKS_APP: + if self.sinks == SINKS_APP and self.sources != SOURCES_DATACHANNEL: assert self.appsink_data is not None local_video_sink = self.pipeline.get_by_name("local_video_sink") local_video_sink.set_property("emit-signals", True) @@ -746,6 +610,24 @@ "notify::ice-connection-state", self.on_ice_connection_state ) + if self.sources == SOURCES_DATACHANNEL: + # Data channel configuration for compatibility with browser defaults + data_channel_options = Gst.Structure.new_empty("data-channel-options") + data_channel_options.set_value("ordered", True) + data_channel_options.set_value("protocol", "") + + # Create the data channel + self.pipeline.set_state(Gst.State.READY) + self.data_channel = self.webrtcbin.emit( + "create-data-channel", "file", data_channel_options + ) + if self.data_channel is None: + log.error("Failed to create data channel") + return + self.data_channel.connect("on-open", self.dc_open_cb) + if self.sinks == SINKS_DATACHANNEL: + self.webrtcbin.connect("on-data-channel", self.dc_on_data_channel) + def start_pipeline(self) -> None: """Starts the GStreamer pipeline.""" log.debug("starting the pipeline") @@ -813,7 +695,7 @@ elif isinstance(dest, Gst.Element): return source.link(dest) else: - log.error(f"Unexpected type for dest: {type(sink)}") + log.error(f"Unexpected type for dest: {type(dest)}") return False return True @@ -941,7 +823,6 @@ self.pipeline.add(q, conv, videoscale, capsfilter) - self.pipeline.sync_children_states() ret = pad.link(q.get_static_pad("sink")) if ret != Gst.PadLinkReturn.OK: @@ -997,6 +878,11 @@ decodebin.sync_state_with_parent() pad.link(decodebin.get_static_pad("sink")) + async def _call_start(self, callee: jid.JID, call_data: dict, profile: str) -> str: + return await self.bridge.call_start( + str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile + ) + async def _start_call(self) -> None: """Initiate the call. @@ -1004,8 +890,9 @@ local ICE candidates, they are sent as part of the initiation. """ assert self.callee - self.sid = await self.bridge.call_start( - str(self.callee), data_format.serialise({"sdp": self.offer}), self.profile + assert self.call_start_cb is not None + self.sid = await self.call_start_cb( + self.callee, {"sdp": self.offer}, self.profile ) if self.local_candidates_buffer: log.debug( @@ -1083,6 +970,9 @@ f"Local ICE candidate. MLine Index: {mline_index}, Candidate: {candidate_sdp}" ) parsed_candidate = self.parse_ice_candidate(candidate_sdp) + if parsed_candidate is None: + log.warning(f"Can't parse candidate: {candidate_sdp}") + return try: media_type = self.media_types[mline_index] except KeyError: @@ -1129,7 +1019,7 @@ except Exception as e: raise exceptions.InternalError(f"Can't find sdp mline index: {e}") self.webrtcbin.emit("add-ice-candidate", mline_index, candidate_sdp) - log.debug( + log.warning( f"Remote ICE candidate added. MLine Index: {mline_index}, " f"{candidate_sdp}" ) @@ -1178,7 +1068,9 @@ @param muted: True if video is muted. """ if self.video_selector is not None: - current_source = None if muted else "desktop" if self.desktop_sharing else "video" + current_source = ( + None if muted else "desktop" if self.desktop_sharing else "video" + ) self.switch_video_source(current_source) state = "muted" if muted else "unmuted" log.info(f"Video is now {state}") @@ -1201,9 +1093,7 @@ except exceptions.CancelError: self.desktop_sharing = False return - self.desktop_sharing_data = { - "path": str(screenshare_data["node_id"]) - } + self.desktop_sharing_data = {"path": str(screenshare_data["node_id"])} self.do_desktop_switch(desktop_active) def do_desktop_switch(self, desktop_active: bool) -> None: @@ -1216,7 +1106,7 @@ self.switch_video_source(source) self.desktop_sharing = desktop_active - def switch_video_source(self, source: str|None) -> None: + def switch_video_source(self, source: str | None) -> None: """Activates the specified source while deactivating the others. @param source: 'desktop', 'video', 'muted' or None for muted source. @@ -1252,18 +1142,18 @@ if self.desktop_sink_pad: pad = self.desktop_sink_pad else: - log.error(f"No desktop pad available") - pad = None + log.error(f"No desktop pad available") + pad = None else: pad_name = f"sink_{['video', 'muted'].index(source)}" pad = self.video_selector.get_static_pad(pad_name) if pad is not None: - self.video_selector.props.active_pad = pad + self.video_selector.props.active_pad = pad self.pipeline.set_state(Gst.State.PLAYING) - def _setup_desktop_source(self, properties: dict[str, object]|None) -> None: + def _setup_desktop_source(self, properties: dict[str, object] | None) -> None: """Set up a new desktop source. @param properties: The properties to set on the desktop source. @@ -1287,7 +1177,9 @@ video_convert.link(queue) sink_pad_template = self.video_selector.get_pad_template("sink_%u") - self.desktop_sink_pad = self.video_selector.request_pad(sink_pad_template, None, None) + self.desktop_sink_pad = self.video_selector.request_pad( + sink_pad_template, None, None + ) queue_src_pad = queue.get_static_pad("src") queue_src_pad.link(self.desktop_sink_pad) @@ -1327,3 +1219,114 @@ async def end_call(self) -> None: """Stop streaming and clean instance""" self.reset_instance() + + +class WebRTCCall: + """Helper class to create and handle WebRTC. + + This class handles signals and communication of connection data with backend. + + """ + + def __init__( + self, + bridge, + profile: str, + callee: jid.JID, + on_call_setup_cb: Callable | None = None, + on_call_ended_cb: Callable | None = None, + **kwargs, + ): + """Create and setup a webRTC instance + + @param bridge: async Bridge. + @param profile: profile making or receiving the call + @param callee: peer jid + @param kwargs: extra kw args to use when instantiating WebRTC + """ + self.profile = profile + self.webrtc = WebRTC(bridge, profile, **kwargs) + self.webrtc.callee = callee + self.on_call_setup_cb = on_call_setup_cb + self.on_call_ended_cb = on_call_ended_cb + bridge.register_signal( + "ice_candidates_new", self.on_ice_candidates_new, "plugin" + ) + bridge.register_signal("call_setup", self.on_call_setup, "plugin") + bridge.register_signal("call_ended", self.on_call_ended, "plugin") + + @classmethod + async def make_webrtc_call( + cls, bridge, profile: str, call_data: CallData, **kwargs + ) -> "WebRTCCall": + """Create the webrtc_call instance + + @param call_data: Call data of the command + @param kwargs: extra args used to instanciate WebRTCCall + + """ + webrtc_call = cls(bridge, profile, call_data.callee, **call_data.kwargs, **kwargs) + if call_data.sid is None: + # we are making the call + await webrtc_call.start() + else: + # we are receiving the call + webrtc_call.sid = call_data.sid + if call_data.action_id is not None: + await bridge.action_launch( + call_data.action_id, + data_format.serialise({"cancelled": False}), + profile, + ) + return webrtc_call + + @property + def sid(self) -> str | None: + return self.webrtc.sid + + @sid.setter + def sid(self, new_sid: str | None) -> None: + self.webrtc.sid = new_sid + + async def on_ice_candidates_new( + self, sid: str, candidates_s: str, profile: str + ) -> None: + if sid != self.webrtc.sid or profile != self.profile: + return + self.webrtc.on_ice_candidates_new( + data_format.deserialise(candidates_s), + ) + + async def on_call_setup(self, sid: str, setup_data_s: str, profile: str) -> None: + if sid != self.webrtc.sid or profile != self.profile: + return + setup_data = data_format.deserialise(setup_data_s) + try: + role = setup_data["role"] + sdp = setup_data["sdp"] + except KeyError: + log.error(f"Invalid setup data received: {setup_data}") + return + if role == "initiator": + self.webrtc.on_accepted_call(sdp, profile) + elif role == "responder": + await self.webrtc.answer_call(sdp, profile) + else: + log.error(f"Invalid role received during setup: {setup_data}") + if self.on_call_setup_cb is not None: + await aio.maybe_async(self.on_call_setup_cb(sid, profile)) + + async def on_call_ended(self, sid: str, data_s: str, profile: str) -> None: + if sid != self.webrtc.sid or profile != self.profile: + return + await self.webrtc.end_call() + if self.on_call_ended_cb is not None: + await aio.maybe_async(self.on_call_ended_cb(sid, profile)) + + async def start(self): + """Start a call. + + To be used only if we are initiator + """ + await self.webrtc.setup_call("initiator") + self.webrtc.start_pipeline() diff -r 0fbe5c605eb6 -r d01b8d002619 libervia/frontends/tools/webrtc_file.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/frontends/tools/webrtc_file.py Sat Apr 06 13:43:09 2024 +0200 @@ -0,0 +1,300 @@ +#!/usr/bin/env python3 + +# Libervia WebRTC implementation +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +import asyncio +import atexit +from functools import partial +import logging +from pathlib import Path +from typing import Any, Callable, IO + +import gi + +gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"}) +from gi.repository import GLib, GstWebRTC + +from libervia.backend.core import exceptions +from libervia.backend.core.i18n import _ +from libervia.backend.tools.common import data_format, utils +from libervia.frontends.tools import aio, jid, webrtc +from libervia.frontends.tools.webrtc_models import CallData + + +log = logging.getLogger(__name__) + +WEBRTC_CHUNK_SIZE = 64 * 1024 + + +class WebRTCFileSender: + + def __init__( + self, + bridge, + profile: str, + on_call_start_cb: Callable[[dict], Any] | None = None, + end_call_cb: Callable[[], Any] | None = None, + ) -> None: + """Initializes the File Sender. + + @param bridge: An async Bridge instance. + @param profile: The profile name to be used. + @param on_call_start_cb: A blocking or async callable that accepts a dict as its + only argument. + @param end_call_cb: A callable to be invoked at the end of a call. + """ + + self.bridge = bridge + self.profile = profile + self.on_call_start_cb = on_call_start_cb + self.end_call_cb = end_call_cb + self.loop = asyncio.get_event_loop() + + async def _on_webrtc_call_start( + self, + file_path: Path, + file_name: str | None, + callee: str, + call_data: dict, + profile: str, + ) -> str: + file_data_s = await self.bridge.file_jingle_send( + str(callee), + "", + file_name or file_path.name, + "", + data_format.serialise( + { + "webrtc": True, + "call_data": call_data, + "size": file_path.stat().st_size, + } + ), + self.profile, + ) + file_data = data_format.deserialise(file_data_s) + + if self.on_call_start_cb is not None: + await aio.maybe_async(self.on_call_start_cb(file_data)) + return file_data["session_id"] + + async def _send_file( + self, file_path: Path, data_channel: GstWebRTC.WebRTCDataChannel + ) -> None: + """Send file to Data Channel by chunks""" + try: + with file_path.open("rb") as file: + while True: + data = file.read(WEBRTC_CHUNK_SIZE) + if not data: + break + data_channel.send_data(GLib.Bytes(data)) + # We give control back to the loop to avoid freezing everything. + await asyncio.sleep(0) + finally: + webrtc_call = self.webrtc_call + # we connect to the "on-close" signal to wait for the data channel to be + # actually closed before closing the call and quitting the app. + data_channel.connect("on-close", partial(self._on_dc_close, webrtc_call)) + data_channel.close() + + def _on_dc_close(self, webrtc_call, data_channel: GstWebRTC.WebRTCDataChannel): + if webrtc_call is not None: + aio.run_from_thread(self._end_call_and_quit, webrtc_call, loop=self.loop) + + async def _end_call_and_quit(self, webrtc_call): + await webrtc_call.webrtc.end_call() + if self.end_call_cb is not None: + await aio.maybe_async(self.end_call_cb()) + + def _on_dc_open( + self, file_path: Path, data_channel: GstWebRTC.WebRTCDataChannel + ) -> None: + """Called when datachannel is open""" + aio.run_from_thread(self._send_file, file_path, data_channel, loop=self.loop) + + async def send_file_webrtc( + self, + file_path: Path|str, + callee: jid.JID, + file_name: str | None = None, + ) -> None: + """Send a file using WebRTC to the given callee JID. + + @param file_path: The local path to the file to send. + @param callee: The JID of the recipient to send the file to. + @param file_name: Name of the file as sent to the peer. + If None or empty string, name will be retrieved from file path. + """ + file_path = Path(file_path) + call_data = CallData(callee=callee) + self.webrtc_call = await webrtc.WebRTCCall.make_webrtc_call( + self.bridge, + self.profile, + call_data, + sources=webrtc.SOURCES_DATACHANNEL, + call_start_cb=partial( + self._on_webrtc_call_start, + file_path, + file_name, + ), + dc_open_cb=partial(self._on_dc_open, file_path), + ) + + +class WebRTCFileReceiver: + + def __init__( + self, bridge, profile: str, on_close_cb: Callable[[], Any] | None = None + ) -> None: + """Initializes the File Receiver. + + @param bridge: An async Bridge instance. + @param profile: The profile name to be used. + @param on_close_cb: Called when the Data Channel is closed. + """ + self.bridge = bridge + self.profile = profile + self.on_close_cb = on_close_cb + self.loop = asyncio.get_event_loop() + self.file_data: dict | None = None + self.fd: IO[bytes] | None = None + + @staticmethod + def format_confirm_msg( + action_data: dict, + peer_jid: jid.JID, + peer_name: str|None = None + ) -> str: + """Format a user-friendly confirmation message. + + File data will be retrieve from ``action_data`` and used to format a user-friendly + file confirmation message. + @param action_data: Data as returned by the "FILE" ``action_new`` signal. + @return: User-friendly confirmation message. + """ + file_data = action_data.get("file_data", {}) + + file_name = file_data.get('name') + file_size = file_data.get('size') + + if file_name: + file_name_msg = 'wants to send you the file "{file_name}"'.format( + file_name=file_name + ) + else: + file_name_msg = 'wants to send you an unnamed file' + + if file_size is not None: + file_size_msg = "which has a size of {file_size_human}".format( + file_size_human=utils.get_human_size(file_size) + ) + else: + file_size_msg = "which has an unknown size" + + file_description = file_data.get('desc') + if file_description: + description_msg = " Description: {}.".format(file_description) + else: + description_msg = "" + + file_data = action_data.get("file_data", {}) + + if not peer_name: + peer_name = str(peer_jid) + else: + peer_name = f"{peer_name} ({peer_jid})" + + return ( + _("{peer_name} {file_name_msg} {file_size_msg}.{description_msg} " + "Do you accept?").format( + peer_name=peer_name, + file_name_msg=file_name_msg, + file_size_msg=file_size_msg, + description_msg=description_msg + ) + ) + + def _on_dc_message_data(self, fd, data_channel, glib_data) -> None: + """A data chunk of the file has been received.""" + fd.write(glib_data.get_data()) + + def _on_dc_close(self, data_channel) -> None: + """Data channel is closed + + The file download should be complete, we close it. + """ + aio.run_from_thread(self._on_close, loop=self.loop) + + async def _on_close(self) -> None: + assert self.fd is not None + self.fd.close() + if self.on_close_cb is not None: + await aio.maybe_async(self.on_close_cb()) + + def _on_data_channel(self, webrtcbin, data_channel) -> None: + """The data channel has been opened.""" + data_channel.connect( + "on-message-data", partial(self._on_dc_message_data, self.fd) + ) + data_channel.connect("on-close", self._on_dc_close) + + def _on_fd_clean(self, fd) -> None: + """Closed opened file object if not already. + + If the file object was not closed, an error message is returned. + """ + if fd is None: + return + if not fd.closed: + log.error( + f"The file {fd.name!r} was not closed properly, which might " + "indicate an incomplete download." + ) + fd.close() + + async def receive_file_webrtc( + self, + from_jid: jid.JID, + session_id: str, + file_path: Path, + file_data: dict, + ) -> None: + """Receives a file via WebRTC and saves it to the specified path. + + @param from_jid: The JID of the entity sending the file. + @param session_id: The Jingle FT Session ID. + @param file_path: The local path where the received file will be saved. + If a file already exists at this path, it will be overwritten. + @param file_data: Additional data about the file being transferred. + """ + if file_path.exists() and not file_path.is_file(): + raise exceptions.InternalError( + f"{file_path} is not a valid destination path." + ) + self.fd = file_path.open("wb") + atexit.register(self._on_fd_clean, self.fd) + self.file_data = file_data + call_data = CallData(callee=from_jid, sid=session_id) + await webrtc.WebRTCCall.make_webrtc_call( + self.bridge, + self.profile, + call_data, + sinks=webrtc.SINKS_DATACHANNEL, + dc_on_data_channel=self._on_data_channel, + ) diff -r 0fbe5c605eb6 -r d01b8d002619 libervia/frontends/tools/webrtc_models.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/frontends/tools/webrtc_models.py Sat Apr 06 13:43:09 2024 +0200 @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +# Libervia WebRTC implementation +# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from dataclasses import dataclass, field +from typing import Any, Callable + +from libervia.frontends.tools import jid + + +@dataclass +class CallData: + callee: jid.JID + sid: str | None = None + action_id: str | None = None + kwargs: dict[str, Any] = field(default_factory=dict) + + +@dataclass +class AppSinkData: + local_video_cb: Callable + remote_video_cb: Callable|None diff -r 0fbe5c605eb6 -r d01b8d002619 libervia/frontends/tools/webrtc_screenshare.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/frontends/tools/webrtc_screenshare.py Sat Apr 06 13:43:09 2024 +0200 @@ -0,0 +1,207 @@ +#!/usr/bin/env python3 + +# Libervia WebRTC implementation +# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +from libervia.backend.core import exceptions + +import asyncio +import logging +from random import randint + + +log = logging.getLogger(__name__) + + +SOURCES_AUTO = "auto" +SOURCES_TEST = "test" +SOURCES_DATACHANNEL = "datachannel" +SINKS_APP = "app" +SINKS_AUTO = "auto" +SINKS_TEST = "test" +SINKS_DATACHANNEL = "datachannel" + + +class ScreenshareError(Exception): + pass + + +class DesktopPortal: + + def __init__(self, webrtc): + import dbus + from dbus.mainloop.glib import DBusGMainLoop + # we want monitors + windows, see https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.ScreenCast.html#org-freedesktop-portal-screencast-availablesourcetypes + self.dbus = dbus + self.webrtc = webrtc + self.sources_type = dbus.UInt32(7) + DBusGMainLoop(set_as_default=True) + self.session_bus = dbus.SessionBus() + portal_object = self.session_bus.get_object( + 'org.freedesktop.portal.Desktop', + '/org/freedesktop/portal/desktop' + ) + self.screencast_interface = dbus.Interface( + portal_object, + 'org.freedesktop.portal.ScreenCast' + ) + self.session_interface = None + self.session_signal = None + self.handle_counter = 0 + self.session_handle = None + self.stream_data: dict|None = None + + @property + def handle_token(self): + self.handle_counter += 1 + return f"libervia{self.handle_counter}" + + def on_session_closed(self, details: dict) -> None: + if self.session_interface is not None: + self.session_interface = None + self.webrtc.desktop_sharing = False + if self.session_signal is not None: + self.session_signal.remove() + self.session_signal = None + + + async def dbus_call(self, method_name: str, *args) -> dict: + """Call a screenshare portal method + + This method handle the signal response. + @param method_name: method to call + @param args: extra args + `handle_token` will be automatically added to the last arg (option dict) + @return: method result + """ + if self.session_handle is not None: + self.end_screenshare() + method = getattr(self.screencast_interface, method_name) + options = args[-1] + reply_fut = asyncio.Future() + signal_fut = asyncio.Future() + # cf. https://flatpak.github.io/xdg-desktop-portal/docs/doc-org.freedesktop.portal.Request.html + handle_token = self.handle_token + sender = self.session_bus.get_unique_name().replace(".", "_")[1:] + path = f"/org/freedesktop/portal/desktop/request/{sender}/{handle_token}" + signal_match = None + + def on_signal(response, results): + assert signal_match is not None + signal_match.remove() + if response == 0: + signal_fut.set_result(results) + elif response == 1: + signal_fut.set_exception( + exceptions.CancelError("Cancelled by user.") + ) + else: + signal_fut.set_exception(ScreenshareError( + f"Can't get signal result" + )) + + signal_match = self.session_bus.add_signal_receiver( + on_signal, + signal_name="Response", + dbus_interface="org.freedesktop.portal.Request", + path=path + ) + + options["handle_token"] = handle_token + + method( + *args, + reply_handler=reply_fut.set_result, + error_handler=reply_fut.set_exception + ) + try: + await reply_fut + except Exception as e: + raise ScreenshareError(f"Can't ask screenshare permission: {e}") + return await signal_fut + + async def request_screenshare(self) -> dict: + session_data = await self.dbus_call( + "CreateSession", + { + "session_handle_token": str(randint(1, 2**32)), + } + ) + try: + session_handle = session_data["session_handle"] + except KeyError: + raise ScreenshareError("Can't get session handle") + self.session_handle = session_handle + + + await self.dbus_call( + "SelectSources", + session_handle, + { + "multiple": True, + "types": self.sources_type, + "modal": True + } + ) + screenshare_data = await self.dbus_call( + "Start", + session_handle, + "", + {} + ) + + session_object = self.session_bus.get_object( + 'org.freedesktop.portal.Desktop', + session_handle + ) + self.session_interface = self.dbus.Interface( + session_object, + 'org.freedesktop.portal.Session' + ) + + self.session_signal = self.session_bus.add_signal_receiver( + self.on_session_closed, + signal_name="Closed", + dbus_interface="org.freedesktop.portal.Session", + path=session_handle + ) + + try: + node_id, stream_data = screenshare_data["streams"][0] + source_type = int(stream_data["source_type"]) + except (IndexError, KeyError): + raise ScreenshareError("Can't parse stream data") + self.stream_data = stream_data = { + "session_handle": session_handle, + "node_id": node_id, + "source_type": source_type + } + try: + height = int(stream_data["size"][0]) + weight = int(stream_data["size"][1]) + except (IndexError, KeyError): + pass + else: + stream_data["size"] = (height, weight) + + return self.stream_data + + def end_screenshare(self) -> None: + """Close a running screenshare session, if any.""" + if self.session_interface is None: + return + self.session_interface.Close() + self.on_session_closed({})