view libervia/frontends/tools/webrtc_file.py @ 4240:79c8a70e1813

backend, frontend: prepare remote control: This is a series of changes necessary to prepare the implementation of remote control feature: - XEP-0166: add a `priority` attribute to `ApplicationData`: this is needed when several applications are working in a same session, to know which one must be handled first. Will be used to make Remote Control have precedence over Call content. - XEP-0166: `_call_plugins` is now async and is not used with `DeferredList` anymore: the benefit to have methods called in parallels is very low, and it cause a lot of trouble as we can't predict order. Methods are now called sequentially so workflow can be predicted. - XEP-0167: fix `senders` XMPP attribute <=> SDP mapping - XEP-0234: preflight acceptance key is now `pre-accepted` instead of `file-accepted`, so the same key can be used with other jingle applications. - XEP-0167, XEP-0343: move some method to XEP-0167 - XEP-0353: use new `priority` feature to call preflight methods of applications according to it. - frontend (webrtc): refactor the sources/sink handling with a more flexible mechanism based on Pydantic models. It is now possible to have has many Data Channel as necessary, to have them in addition to A/V streams, to specify manually GStreamer sources and sinks, etc. - frontend (webrtc): rework of the pipeline to reduce latency. - frontend: new `portal_desktop` method. Screenshare portal handling has been moved there, and RemoteDesktop portal has been added. - frontend (webrtc): fix `extract_ufrag_pwd` method. rel 436
author Goffi <goffi@goffi.org>
date Sat, 11 May 2024 13:52:41 +0200
parents d01b8d002619
children 0d7bb4df2343
line wrap: on
line source

#!/usr/bin/env python3

# Libervia WebRTC implementation
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import asyncio
import atexit
from functools import partial
import logging
from pathlib import Path
from typing import Any, Callable, IO

import gi

gi.require_versions({"Gst": "1.0", "GstWebRTC": "1.0"})
from gi.repository import GLib, GstWebRTC

from libervia.backend.core import exceptions
from libervia.backend.core.i18n import _
from libervia.backend.tools.common import data_format, utils
from libervia.frontends.tools import aio, jid, webrtc
from libervia.frontends.tools.webrtc_models import CallData


log = logging.getLogger(__name__)

WEBRTC_CHUNK_SIZE = 64 * 1024


class WebRTCFileSender:

    def __init__(
        self,
        bridge,
        profile: str,
        on_call_start_cb: Callable[[dict], Any] | None = None,
        end_call_cb: Callable[[], Any] | None = None,
    ) -> None:
        """Initializes the File Sender.

        @param bridge: An async Bridge instance.
        @param profile: The profile name to be used.
        @param on_call_start_cb: A blocking or async callable that accepts a dict as its
            only argument.
        @param end_call_cb: A callable to be invoked at the end of a call.
        """

        self.bridge = bridge
        self.profile = profile
        self.on_call_start_cb = on_call_start_cb
        self.end_call_cb = end_call_cb
        self.loop = asyncio.get_event_loop()

    async def _on_webrtc_call_start(
        self,
        file_path: Path,
        file_name: str | None,
        callee: str,
        call_data: dict,
        profile: str,
    ) -> str:
        file_data_s = await self.bridge.file_jingle_send(
            str(callee),
            "",
            file_name or file_path.name,
            "",
            data_format.serialise(
                {
                    "webrtc": True,
                    "call_data": call_data,
                    "size": file_path.stat().st_size,
                }
            ),
            self.profile,
        )
        file_data = data_format.deserialise(file_data_s)

        if self.on_call_start_cb is not None:
            await aio.maybe_async(self.on_call_start_cb(file_data))
        return file_data["session_id"]

    async def _send_file(
        self, file_path: Path, data_channel: GstWebRTC.WebRTCDataChannel
    ) -> None:
        """Send file to Data Channel by chunks"""
        try:
            with file_path.open("rb") as file:
                while True:
                    data = file.read(WEBRTC_CHUNK_SIZE)
                    if not data:
                        break
                    data_channel.send_data(GLib.Bytes(data))
                    # We give control back to the loop to avoid freezing everything.
                    await asyncio.sleep(0)
        finally:
            webrtc_call = self.webrtc_call
            # we connect to the "on-close" signal to wait for the data channel to be
            # actually closed before closing the call and quitting the app.
            data_channel.connect("on-close", partial(self._on_dc_close, webrtc_call))
            data_channel.close()

    def _on_dc_close(self, webrtc_call, data_channel: GstWebRTC.WebRTCDataChannel):
        if webrtc_call is not None:
            aio.run_from_thread(self._end_call_and_quit, webrtc_call, loop=self.loop)

    async def _end_call_and_quit(self, webrtc_call):
        await webrtc_call.webrtc.end_call()
        if self.end_call_cb is not None:
            await aio.maybe_async(self.end_call_cb())

    def _on_dc_open(
        self, file_path: Path, data_channel: GstWebRTC.WebRTCDataChannel
    ) -> None:
        """Called when datachannel is open"""
        aio.run_from_thread(self._send_file, file_path, data_channel, loop=self.loop)

    async def send_file_webrtc(
        self,
        file_path: Path|str,
        callee: jid.JID,
        file_name: str | None = None,
    ) -> None:
        """Send a file using WebRTC to the given callee JID.

        @param file_path: The local path to the file to send.
        @param callee: The JID of the recipient to send the file to.
        @param file_name: Name of the file as sent to the peer.
            If None or empty string, name will be retrieved from file path.
        """
        file_path = Path(file_path)
        call_data = CallData(callee=callee)
        self.webrtc_call = await webrtc.WebRTCCall.make_webrtc_call(
            self.bridge,
            self.profile,
            call_data,
            sources_data=webrtc.SourcesDataChannel(
                dc_open_cb=partial(self._on_dc_open, file_path)
            ),
            call_start_cb=partial(
                self._on_webrtc_call_start,
                file_path,
                file_name,
            ),
        )


class WebRTCFileReceiver:

    def __init__(
        self, bridge, profile: str, on_close_cb: Callable[[], Any] | None = None
    ) -> None:
        """Initializes the File Receiver.

        @param bridge: An async Bridge instance.
        @param profile: The profile name to be used.
        @param on_close_cb: Called when the Data Channel is closed.
        """
        self.bridge = bridge
        self.profile = profile
        self.on_close_cb = on_close_cb
        self.loop = asyncio.get_event_loop()
        self.file_data: dict | None = None
        self.fd: IO[bytes] | None = None

    @staticmethod
    def format_confirm_msg(
        action_data: dict,
        peer_jid: jid.JID,
        peer_name: str|None = None
    ) -> str:
        """Format a user-friendly confirmation message.

        File data will be retrieve from ``action_data`` and used to format a user-friendly
        file confirmation message.
        @param action_data: Data as returned by the "FILE" ``action_new`` signal.
        @return: User-friendly confirmation message.
        """
        file_data = action_data.get("file_data", {})

        file_name = file_data.get('name')
        file_size = file_data.get('size')

        if file_name:
            file_name_msg = 'wants to send you the file "{file_name}"'.format(
                file_name=file_name
            )
        else:
            file_name_msg = 'wants to send you an unnamed file'

        if file_size is not None:
            file_size_msg = "which has a size of {file_size_human}".format(
                file_size_human=utils.get_human_size(file_size)
            )
        else:
            file_size_msg = "which has an unknown size"

        file_description = file_data.get('desc')
        if file_description:
            description_msg = " Description: {}.".format(file_description)
        else:
            description_msg = ""

        file_data = action_data.get("file_data", {})

        if not peer_name:
            peer_name = str(peer_jid)
        else:
            peer_name = f"{peer_name} ({peer_jid})"

        return (
            _("{peer_name} {file_name_msg} {file_size_msg}.{description_msg} "
            "Do you accept?").format(
                peer_name=peer_name,
                file_name_msg=file_name_msg,
                file_size_msg=file_size_msg,
                description_msg=description_msg
            )
        )

    def _on_dc_message_data(self, fd, data_channel, glib_data) -> None:
        """A data chunk of the file has been received."""
        fd.write(glib_data.get_data())

    def _on_dc_close(self, data_channel) -> None:
        """Data channel is closed

        The file download should be complete, we close it.
        """
        aio.run_from_thread(self._on_close, loop=self.loop)

    async def _on_close(self) -> None:
        assert self.fd is not None
        self.fd.close()
        if self.on_close_cb is not None:
            await aio.maybe_async(self.on_close_cb())

    def _on_data_channel(self, webrtcbin, data_channel) -> None:
        """The data channel has been opened."""
        data_channel.connect(
            "on-message-data", partial(self._on_dc_message_data, self.fd)
        )
        data_channel.connect("on-close", self._on_dc_close)

    def _on_fd_clean(self, fd) -> None:
        """Closed opened file object if not already.

        If the file object was not closed, an error message is returned.
        """
        if fd is None:
            return
        if not fd.closed:
            log.error(
                f"The file {fd.name!r} was not closed properly, which might "
                "indicate an incomplete download."
            )
            fd.close()

    async def receive_file_webrtc(
        self,
        from_jid: jid.JID,
        session_id: str,
        file_path: Path,
        file_data: dict,
    ) -> None:
        """Receives a file via WebRTC and saves it to the specified path.

        @param from_jid: The JID of the entity sending the file.
        @param session_id: The Jingle FT Session ID.
        @param file_path: The local path where the received file will be saved.
            If a file already exists at this path, it will be overwritten.
        @param file_data: Additional data about the file being transferred.
        """
        if file_path.exists() and not file_path.is_file():
            raise exceptions.InternalError(
                f"{file_path} is not a valid destination path."
            )
        self.fd = file_path.open("wb")
        atexit.register(self._on_fd_clean, self.fd)
        self.file_data = file_data
        call_data = CallData(callee=from_jid, sid=session_id)
        await webrtc.WebRTCCall.make_webrtc_call(
            self.bridge,
            self.profile,
            call_data,
            sinks_data=webrtc.SinksDataChannel(
                dc_on_data_channel=self._on_data_channel,
            ),
        )