Mercurial > libervia-backend
view libervia/frontends/tools/webrtc_file.py @ 4306:94e0968987cd
plugin XEP-0033: code modernisation, improve delivery, data validation:
- Code has been rewritten using Pydantic models and `async` coroutines for data validation
and cleaner element parsing/generation.
- Delivery has been completely rewritten. It now works even if server doesn't support
multicast, and send to local multicast service first. Delivering to local multicast
service first is due to bad support of XEP-0033 in server (notably Prosody which has an
incomplete implementation), and the current impossibility to detect if a sub-domain
service handles fully multicast or only for local domains. This is a workaround to have
a good balance between backward compatilibity and use of bandwith, and to make it work
with the incoming email gateway implementation (the gateway will only deliver to
entities of its own domain).
- disco feature checking now uses `async` corountines. `host` implementation still use
Deferred return values for compatibility with legacy code.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3 # Libervia WebRTC implementation # Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. import asyncio import atexit from functools import partial import logging from pathlib import Path from typing import Any, Callable, IO import gi gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"}) from gi.repository import GLib, GstWebRTC from libervia.backend.core import exceptions from libervia.backend.core.i18n import _ from libervia.backend.tools.common import data_format, utils from libervia.frontends.tools import aio, jid, webrtc from libervia.frontends.tools.webrtc_models import CallData log = logging.getLogger(__name__) WEBRTC_CHUNK_SIZE = 64 * 1024 class WebRTCFileSender: def __init__( self, bridge, profile: str, on_call_start_cb: Callable[[dict], Any] | None = None, end_call_cb: Callable[[], Any] | None = None, ) -> None: """Initializes the File Sender. @param bridge: An async Bridge instance. @param profile: The profile name to be used. @param on_call_start_cb: A blocking or async callable that accepts a dict as its only argument. @param end_call_cb: A callable to be invoked at the end of a call. """ self.bridge = bridge self.profile = profile self.on_call_start_cb = on_call_start_cb self.end_call_cb = end_call_cb self.loop = asyncio.get_event_loop() async def _on_webrtc_call_start( self, file_path: Path, file_name: str | None, callee: str, call_data: dict, profile: str, ) -> str: file_data_s = await self.bridge.file_jingle_send( str(callee), "", file_name or file_path.name, "", data_format.serialise( { "webrtc": True, "call_data": call_data, "size": file_path.stat().st_size, } ), self.profile, ) file_data = data_format.deserialise(file_data_s) if self.on_call_start_cb is not None: await aio.maybe_async(self.on_call_start_cb(file_data)) return file_data["session_id"] async def _send_file( self, file_path: Path, data_channel: GstWebRTC.WebRTCDataChannel ) -> None: """Send file to Data Channel by chunks""" try: with file_path.open("rb") as file: while True: data = file.read(WEBRTC_CHUNK_SIZE) if not data: break data_channel.send_data(GLib.Bytes(data)) # We give control back to the loop to avoid freezing everything. await asyncio.sleep(0) finally: webrtc_call = self.webrtc_call # we connect to the "on-close" signal to wait for the data channel to be # actually closed before closing the call and quitting the app. data_channel.connect("on-close", partial(self._on_dc_close, webrtc_call)) data_channel.close() def _on_dc_close(self, webrtc_call, data_channel: GstWebRTC.WebRTCDataChannel): if webrtc_call is not None: aio.run_from_thread(self._end_call_and_quit, webrtc_call, loop=self.loop) async def _end_call_and_quit(self, webrtc_call): await webrtc_call.webrtc.end_call() if self.end_call_cb is not None: await aio.maybe_async(self.end_call_cb()) def _on_dc_open( self, file_path: Path, data_channel: GstWebRTC.WebRTCDataChannel ) -> None: """Called when datachannel is open""" aio.run_from_thread(self._send_file, file_path, data_channel, loop=self.loop) async def send_file_webrtc( self, file_path: Path | str, callee: jid.JID, file_name: str | None = None, ) -> None: """Send a file using WebRTC to the given callee JID. @param file_path: The local path to the file to send. @param callee: The JID of the recipient to send the file to. @param file_name: Name of the file as sent to the peer. If None or empty string, name will be retrieved from file path. """ file_path = Path(file_path) call_data = CallData(callee=callee) self.webrtc_call = await webrtc.WebRTCCall.make_webrtc_call( self.bridge, self.profile, call_data, sources_data=webrtc.SourcesDataChannel( dc_open_cb=partial(self._on_dc_open, file_path) ), call_start_cb=partial( self._on_webrtc_call_start, file_path, file_name, ), ) class WebRTCFileReceiver: def __init__( self, bridge, profile: str, on_close_cb: Callable[[], Any] | None = None ) -> None: """Initializes the File Receiver. @param bridge: An async Bridge instance. @param profile: The profile name to be used. @param on_close_cb: Called when the Data Channel is closed. """ self.bridge = bridge self.profile = profile self.on_close_cb = on_close_cb self.loop = asyncio.get_event_loop() self.file_data: dict | None = None self.fd: IO[bytes] | None = None @staticmethod def format_confirm_msg( action_data: dict, peer_jid: jid.JID, peer_name: str | None = None ) -> str: """Format a user-friendly confirmation message. File data will be retrieve from ``action_data`` and used to format a user-friendly file confirmation message. @param action_data: Data as returned by the "FILE" ``action_new`` signal. @return: User-friendly confirmation message. """ file_data = action_data.get("file_data", {}) file_name = file_data.get("name") file_size = file_data.get("size") if file_name: file_name_msg = 'wants to send you the file "{file_name}"'.format( file_name=file_name ) else: file_name_msg = "wants to send you an unnamed file" if file_size is not None: file_size_msg = "which has a size of {file_size_human}".format( file_size_human=utils.get_human_size(file_size) ) else: file_size_msg = "which has an unknown size" file_description = file_data.get("desc") if file_description: description_msg = " Description: {}.".format(file_description) else: description_msg = "" file_data = action_data.get("file_data", {}) if not peer_name: peer_name = str(peer_jid) else: peer_name = f"{peer_name} ({peer_jid})" return _( "{peer_name} {file_name_msg} {file_size_msg}.{description_msg} " "Do you accept?" ).format( peer_name=peer_name, file_name_msg=file_name_msg, file_size_msg=file_size_msg, description_msg=description_msg, ) def _on_dc_message_data(self, fd, data_channel, glib_data) -> None: """A data chunk of the file has been received.""" fd.write(glib_data.get_data()) def _on_dc_close(self, data_channel) -> None: """Data channel is closed The file download should be complete, we close it. """ aio.run_from_thread(self._on_close, loop=self.loop) async def _on_close(self) -> None: assert self.fd is not None self.fd.close() if self.on_close_cb is not None: await aio.maybe_async(self.on_close_cb()) def _on_data_channel(self, webrtcbin, data_channel) -> None: """The data channel has been opened.""" data_channel.connect( "on-message-data", partial(self._on_dc_message_data, self.fd) ) data_channel.connect("on-close", self._on_dc_close) def _on_fd_clean(self, fd) -> None: """Closed opened file object if not already. If the file object was not closed, an error message is returned. """ if fd is None: return if not fd.closed: log.error( f"The file {fd.name!r} was not closed properly, which might " "indicate an incomplete download." ) fd.close() async def receive_file_webrtc( self, from_jid: jid.JID, session_id: str, file_path: Path, file_data: dict, ) -> None: """Receives a file via WebRTC and saves it to the specified path. @param from_jid: The JID of the entity sending the file. @param session_id: The Jingle FT Session ID. @param file_path: The local path where the received file will be saved. If a file already exists at this path, it will be overwritten. @param file_data: Additional data about the file being transferred. """ if file_path.exists() and not file_path.is_file(): raise exceptions.InternalError( f"{file_path} is not a valid destination path." ) self.fd = file_path.open("wb") atexit.register(self._on_fd_clean, self.fd) self.file_data = file_data call_data = CallData(callee=from_jid, sid=session_id) await webrtc.WebRTCCall.make_webrtc_call( self.bridge, self.profile, call_data, sinks_data=webrtc.SinksDataChannel( dc_on_data_channel=self._on_data_channel, ), )