view libervia/frontends/tools/webrtc_file.py @ 4242:8acf46ed7f36

frontends: remote control implementation: This is the frontends common part of remote control implementation. It handle the creation of WebRTC session, and management of inputs. For now the reception use freedesktop.org Desktop portal, and works mostly with Wayland based Desktop Environments. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:43 +0200
parents 79c8a70e1813
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3

# Libervia WebRTC implementation
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import asyncio
import atexit
from functools import partial
import logging
from pathlib import Path
from typing import Any, Callable, IO

import gi

gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})
from gi.repository import GLib, GstWebRTC

from libervia.backend.core import exceptions
from libervia.backend.core.i18n import _
from libervia.backend.tools.common import data_format, utils
from libervia.frontends.tools import aio, jid, webrtc
from libervia.frontends.tools.webrtc_models import CallData


log = logging.getLogger(__name__)

WEBRTC_CHUNK_SIZE = 64 * 1024


class WebRTCFileSender:

    def __init__(
        self,
        bridge,
        profile: str,
        on_call_start_cb: Callable[[dict], Any] | None = None,
        end_call_cb: Callable[[], Any] | None = None,
    ) -> None:
        """Initializes the File Sender.

        @param bridge: An async Bridge instance.
        @param profile: The profile name to be used.
        @param on_call_start_cb: A blocking or async callable that accepts a dict as its
            only argument.
        @param end_call_cb: A callable to be invoked at the end of a call.
        """

        self.bridge = bridge
        self.profile = profile
        self.on_call_start_cb = on_call_start_cb
        self.end_call_cb = end_call_cb
        self.loop = asyncio.get_event_loop()

    async def _on_webrtc_call_start(
        self,
        file_path: Path,
        file_name: str | None,
        callee: str,
        call_data: dict,
        profile: str,
    ) -> str:
        file_data_s = await self.bridge.file_jingle_send(
            str(callee),
            "",
            file_name or file_path.name,
            "",
            data_format.serialise(
                {
                    "webrtc": True,
                    "call_data": call_data,
                    "size": file_path.stat().st_size,
                }
            ),
            self.profile,
        )
        file_data = data_format.deserialise(file_data_s)

        if self.on_call_start_cb is not None:
            await aio.maybe_async(self.on_call_start_cb(file_data))
        return file_data["session_id"]

    async def _send_file(
        self, file_path: Path, data_channel: GstWebRTC.WebRTCDataChannel
    ) -> None:
        """Send file to Data Channel by chunks"""
        try:
            with file_path.open("rb") as file:
                while True:
                    data = file.read(WEBRTC_CHUNK_SIZE)
                    if not data:
                        break
                    data_channel.send_data(GLib.Bytes(data))
                    # We give control back to the loop to avoid freezing everything.
                    await asyncio.sleep(0)
        finally:
            webrtc_call = self.webrtc_call
            # we connect to the "on-close" signal to wait for the data channel to be
            # actually closed before closing the call and quitting the app.
            data_channel.connect("on-close", partial(self._on_dc_close, webrtc_call))
            data_channel.close()

    def _on_dc_close(self, webrtc_call, data_channel: GstWebRTC.WebRTCDataChannel):
        if webrtc_call is not None:
            aio.run_from_thread(self._end_call_and_quit, webrtc_call, loop=self.loop)

    async def _end_call_and_quit(self, webrtc_call):
        await webrtc_call.webrtc.end_call()
        if self.end_call_cb is not None:
            await aio.maybe_async(self.end_call_cb())

    def _on_dc_open(
        self, file_path: Path, data_channel: GstWebRTC.WebRTCDataChannel
    ) -> None:
        """Called when datachannel is open"""
        aio.run_from_thread(self._send_file, file_path, data_channel, loop=self.loop)

    async def send_file_webrtc(
        self,
        file_path: Path|str,
        callee: jid.JID,
        file_name: str | None = None,
    ) -> None:
        """Send a file using WebRTC to the given callee JID.

        @param file_path: The local path to the file to send.
        @param callee: The JID of the recipient to send the file to.
        @param file_name: Name of the file as sent to the peer.
            If None or empty string, name will be retrieved from file path.
        """
        file_path = Path(file_path)
        call_data = CallData(callee=callee)
        self.webrtc_call = await webrtc.WebRTCCall.make_webrtc_call(
            self.bridge,
            self.profile,
            call_data,
            sources_data=webrtc.SourcesDataChannel(
                dc_open_cb=partial(self._on_dc_open, file_path)
            ),
            call_start_cb=partial(
                self._on_webrtc_call_start,
                file_path,
                file_name,
            ),
        )


class WebRTCFileReceiver:

    def __init__(
        self, bridge, profile: str, on_close_cb: Callable[[], Any] | None = None
    ) -> None:
        """Initializes the File Receiver.

        @param bridge: An async Bridge instance.
        @param profile: The profile name to be used.
        @param on_close_cb: Called when the Data Channel is closed.
        """
        self.bridge = bridge
        self.profile = profile
        self.on_close_cb = on_close_cb
        self.loop = asyncio.get_event_loop()
        self.file_data: dict | None = None
        self.fd: IO[bytes] | None = None

    @staticmethod
    def format_confirm_msg(
        action_data: dict,
        peer_jid: jid.JID,
        peer_name: str|None = None
    ) -> str:
        """Format a user-friendly confirmation message.

        File data will be retrieve from ``action_data`` and used to format a user-friendly
        file confirmation message.
        @param action_data: Data as returned by the "FILE" ``action_new`` signal.
        @return: User-friendly confirmation message.
        """
        file_data = action_data.get("file_data", {})

        file_name = file_data.get('name')
        file_size = file_data.get('size')

        if file_name:
            file_name_msg = 'wants to send you the file "{file_name}"'.format(
                file_name=file_name
            )
        else:
            file_name_msg = 'wants to send you an unnamed file'

        if file_size is not None:
            file_size_msg = "which has a size of {file_size_human}".format(
                file_size_human=utils.get_human_size(file_size)
            )
        else:
            file_size_msg = "which has an unknown size"

        file_description = file_data.get('desc')
        if file_description:
            description_msg = " Description: {}.".format(file_description)
        else:
            description_msg = ""

        file_data = action_data.get("file_data", {})

        if not peer_name:
            peer_name = str(peer_jid)
        else:
            peer_name = f"{peer_name} ({peer_jid})"

        return (
            _("{peer_name} {file_name_msg} {file_size_msg}.{description_msg} "
            "Do you accept?").format(
                peer_name=peer_name,
                file_name_msg=file_name_msg,
                file_size_msg=file_size_msg,
                description_msg=description_msg
            )
        )

    def _on_dc_message_data(self, fd, data_channel, glib_data) -> None:
        """A data chunk of the file has been received."""
        fd.write(glib_data.get_data())

    def _on_dc_close(self, data_channel) -> None:
        """Data channel is closed

        The file download should be complete, we close it.
        """
        aio.run_from_thread(self._on_close, loop=self.loop)

    async def _on_close(self) -> None:
        assert self.fd is not None
        self.fd.close()
        if self.on_close_cb is not None:
            await aio.maybe_async(self.on_close_cb())

    def _on_data_channel(self, webrtcbin, data_channel) -> None:
        """The data channel has been opened."""
        data_channel.connect(
            "on-message-data", partial(self._on_dc_message_data, self.fd)
        )
        data_channel.connect("on-close", self._on_dc_close)

    def _on_fd_clean(self, fd) -> None:
        """Closed opened file object if not already.

        If the file object was not closed, an error message is returned.
        """
        if fd is None:
            return
        if not fd.closed:
            log.error(
                f"The file {fd.name!r} was not closed properly, which might "
                "indicate an incomplete download."
            )
            fd.close()

    async def receive_file_webrtc(
        self,
        from_jid: jid.JID,
        session_id: str,
        file_path: Path,
        file_data: dict,
    ) -> None:
        """Receives a file via WebRTC and saves it to the specified path.

        @param from_jid: The JID of the entity sending the file.
        @param session_id: The Jingle FT Session ID.
        @param file_path: The local path where the received file will be saved.
            If a file already exists at this path, it will be overwritten.
        @param file_data: Additional data about the file being transferred.
        """
        if file_path.exists() and not file_path.is_file():
            raise exceptions.InternalError(
                f"{file_path} is not a valid destination path."
            )
        self.fd = file_path.open("wb")
        atexit.register(self._on_fd_clean, self.fd)
        self.file_data = file_data
        call_data = CallData(callee=from_jid, sid=session_id)
        await webrtc.WebRTCCall.make_webrtc_call(
            self.bridge,
            self.profile,
            call_data,
            sinks_data=webrtc.SinksDataChannel(
                dc_on_data_channel=self._on_data_channel,
            ),
        )