diff libervia/web/pages/calls/_browser/webrtc.py @ 1600:0a4433a343a3

browser (calls): implement WebRTC file sharing: - Send file through WebRTC when the new `file` button is used during a call. - Show a confirmation dialog and download file sent by WebRTC. rel 442
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 13:06:17 +0200
parents 9ba532041a8e
children 6feac4a25e60
line wrap: on
line diff
--- a/libervia/web/pages/calls/_browser/webrtc.py	Tue Mar 05 16:40:25 2024 +0100
+++ b/libervia/web/pages/calls/_browser/webrtc.py	Sat Apr 06 13:06:17 2024 +0200
@@ -2,9 +2,8 @@
 import re
 
 from bridge import AsyncBridge as Bridge
-from browser import aio, console as log, document, timer, window
+from browser import aio, console as log, document, window
 import dialog
-import errors
 from javascript import JSObject
 import jid
 
@@ -13,7 +12,85 @@
 bridge = Bridge()
 
 
+class FileSender:
+    CHUNK_SIZE = 64 * 1024
+
+    def __init__(self, session_id, file, data_channel):
+        self.session_id = session_id
+        self.file = file
+        self.data_channel = data_channel
+        data_channel.bind("open", self._on_open)
+        self.offset = 0
+
+    def _on_open(self, __):
+        log.info(f"Data channel open, starting to send {self.file.name}.")
+        self.send_file()
+
+    def _on_reader_load(self, event):
+        self.data_channel.send(event.target.result)
+        self.offset += self.CHUNK_SIZE
+        self.send_file()
+
+    def send_file(self):
+        if self.offset < self.file.size:
+            chunk = self.file.slice(self.offset, self.offset + self.CHUNK_SIZE)
+            reader = window.FileReader.new()
+            reader.onload = self._on_reader_load
+            reader.readAsArrayBuffer(chunk)
+        else:
+            log.info(f"file {self.file.name} sent.")
+            self.data_channel.close()
+            if self.session_id is not None:
+                aio.run(bridge.call_end(self.session_id, ""))
+
+
+class FileReceiver:
+
+    def __init__(self, session_id: str, data_channel, extra_data: dict) -> None:
+        """Initializes the file receiver with a data channel.
+
+        @param data_channel: The RTCDataChannel through which file data is received.
+        """
+        self.session_id = session_id
+        self.data_channel = data_channel
+        self.received_chunks = []
+        self.file_data = extra_data.get("file_data", {})
+        data_channel.bind("message", self._on_message)
+        data_channel.bind("close", self._on_close)
+        log.debug("File receiver created.")
+
+    def _on_message(self, event) -> None:
+        """Handles incoming message events from the data channel.
+
+        @param event: The event containing the data chunk.
+        """
+        self.received_chunks.append(event.data)
+
+    def _on_close(self, __) -> None:
+        """Handles the data channel's close event.
+
+        Assembles the received chunks into a Blob and triggers a file download.
+        """
+        # The file is complete, we assemble the chunks in a blob
+        blob = window.Blob.new(self.received_chunks)
+        url = window.URL.createObjectURL(blob)
+
+        # and create the <a> element to download the file.
+        a = document.createElement("a")
+        a.href = url
+        a.download = self.file_data.get("name", "received_file")
+        document.body.appendChild(a)
+        a.click()
+
+        # We now clean up.
+        document.body.removeChild(a)
+        window.URL.revokeObjectURL(url)
+        log.info("File received.")
+        aio.run(bridge.call_end(self.session_id, ""))
+
+
 class WebRTC:
+
     def __init__(
         self,
         screen_sharing_cb=None,
@@ -22,7 +99,21 @@
         on_connection_lost_cb=None,
         on_video_devices=None,
         on_reset_cb=None,
+        file_only: bool = False,
+        extra_data: dict|None = None
     ):
+        """Initialise WebRTC instance.
+
+        @param screen_sharing_cb: callable function for screen sharing event
+        @param on_connection_established_cb: callable function for connection established event
+        @param on_reconnect_cb: called when a reconnection is triggered.
+        @param on_connection_lost_cb: called when the connection is lost.
+        @param on_video_devices: called when new video devices are set.
+        @param on_reset_cb: called on instance reset.
+        @param file_only: indicates a file transfer only session.
+        @param extra_data: optional dictionary containing additional data.
+            Notably used for file transfer, where ``file_data`` key is used.
+        """
         # reset
         self.on_reset_cb = on_reset_cb
         self.reset_instance()
@@ -42,8 +133,16 @@
         self.has_multiple_cameras = False
         self.current_camera = None
 
-        # Initially populate the video devices list
-        aio.run(self._populate_video_devices())
+        self.file_only = file_only
+        if not file_only:
+            # Initially populate the video devices list
+            aio.run(self._populate_video_devices())
+
+            # video elements
+            self.local_video_elt = document["local_video"]
+            self.remote_video_elt = document["remote_video"]
+        else:
+            self.file_sender = None
 
         # muting
         self.is_audio_muted = None
@@ -53,9 +152,10 @@
         self._is_sharing_screen = False
         self.screen_sharing_cb = screen_sharing_cb
 
-        # video elements
-        self.local_video_elt = document["local_video"]
-        self.remote_video_elt = document["remote_video"]
+        # extra
+        if extra_data is None:
+            extra_data = {}
+        self.extra_data = extra_data
 
     @property
     def is_sharing_screen(self) -> bool:
@@ -73,7 +173,6 @@
         self._peer_connection = None
         self._media_types = None
         self._media_types_inv = None
-        self._callee = None
         self.ufrag = None
         self.pwd = None
         self.sid = None
@@ -82,6 +181,7 @@
         self.remote_candidates_buffer = {
             "audio": {"candidates": []},
             "video": {"candidates": []},
+            "application": {"candidates": []},
         }
         self.local_candidates_buffer = {}
         self.media_candidates = {}
@@ -318,7 +418,7 @@
 
     async def _create_peer_connection(
         self,
-    ):
+    ) -> JSObject:
         """Creates peer connection"""
         if self._peer_connection is not None:
             raise Exception("create_peer_connection can't be called twice!")
@@ -354,6 +454,7 @@
 
         self._peer_connection = peer_connection
         window.pc = self._peer_connection
+        return peer_connection
 
     async def _get_user_media(self, audio: bool = True, video: bool = True) -> None:
         """
@@ -585,20 +686,21 @@
         @param candidates: Dictionary containing new ICE candidates
         """
         log.debug(f"new peer candidates received: {candidates}")
-        # FIXME: workaround for https://github.com/brython-dev/brython/issues/2227, the
-        #  following test raise a JS exception
+
         try:
+            # FIXME: javascript.NULL must be used here, once we move to Brython 3.12.3+
             remoteDescription_is_none = self._peer_connection.remoteDescription is None
         except Exception as e:
+            # FIXME: should be fine in Brython 3.12.3+
             log.debug("Workaround for Brython bug activated.")
             remoteDescription_is_none = True
 
         if (
             self._peer_connection is None
-            # or self._peer_connection.remoteDescription is None
+            # or self._peer_connection.remoteDescription is NULL
             or remoteDescription_is_none
         ):
-            for media_type in ("audio", "video"):
+            for media_type in ("audio", "video", "application"):
                 media_candidates = candidates.get(media_type)
                 if media_candidates:
                     buffer = self.remote_candidates_buffer[media_type]
@@ -610,12 +712,13 @@
                 try:
                     sdp_mline_index = self.get_sdp_mline_index(media_type)
                 except Exception as e:
-                    log.warning(e)
+                    log.warning(f"Can't get sdp_mline_index: {e}")
                     continue
-                ice_candidate = window.RTCIceCandidate.new(
-                    {"candidate": candidate_sdp, "sdpMLineIndex": sdp_mline_index}
-                )
-                await self._peer_connection.addIceCandidate(ice_candidate)
+                else:
+                    ice_candidate = window.RTCIceCandidate.new(
+                        {"candidate": candidate_sdp, "sdpMLineIndex": sdp_mline_index}
+                    )
+                    await self._peer_connection.addIceCandidate(ice_candidate)
 
     def on_track(self, event):
         """New track has been received from peer
@@ -635,6 +738,14 @@
         log.debug(f"on_negotiation_needed {event=}")
         # TODO
 
+    def _on_data_channel(self, event) -> None:
+        """Handles the data channel event from the peer connection.
+
+        @param event: The event associated with the opening of a data channel.
+        """
+        data_channel = event.channel
+        self.file_receiver = FileReceiver(self.sid, data_channel, self.extra_data)
+
     async def answer_call(self, sid: str, offer_sdp: str, profile: str):
         """We respond to the call"""
         log.debug("answering call")
@@ -647,7 +758,10 @@
         )
         await self.on_ice_candidates_new(self.remote_candidates_buffer)
         self.remote_candidates_buffer.clear()
-        await self._get_user_media()
+        if self.file_only:
+            self._peer_connection.bind("datachannel", self._on_data_channel)
+        else:
+            await self._get_user_media()
 
         # Gather local ICE candidates
         local_ice_data = await self._gather_ice_candidates(False)
@@ -655,23 +769,13 @@
 
         await bridge.call_answer_sdp(sid, self._peer_connection.localDescription.sdp)
 
-    async def make_call(
-        self, callee_jid: jid.JID, audio: bool = True, video: bool = True
-    ) -> None:
-        """Start a WebRTC call
-
-        @param audio: True if an audio flux is required
-        @param video: True if a video flux is required
-        """
-        await self._create_peer_connection()
-        await self._get_user_media(audio, video)
+    async def _get_call_data(self) -> dict:
+        """Start a WebRTC call"""
         await self._gather_ice_candidates(True)
 
-        call_data = {"sdp": self._peer_connection.localDescription.sdp}
-        log.info(f"calling {callee_jid!r}")
-        self.sid = await bridge.call_start(str(callee_jid), json.dumps(call_data))
-        log.debug(f"Call SID: {self.sid}")
+        return {"sdp": self._peer_connection.localDescription.sdp}
 
+    async def _send_buffered_local_candidates(self) -> None:
         if self.local_candidates_buffer:
             log.debug(
                 f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
@@ -684,16 +788,68 @@
                     "pwd": self.pwd,
                     "candidates": candidates
                 }
-            aio.run(
-                bridge.ice_candidates_add(
-                    self.sid,
-                    json.dumps(
-                        ice_data
-                    ),
-                )
+            await bridge.ice_candidates_add(
+                self.sid,
+                json.dumps(
+                    ice_data
+                ),
             )
             self.local_candidates_buffer.clear()
 
+    async def make_call(
+        self, callee_jid: jid.JID, audio: bool = True, video: bool = True
+    ) -> None:
+        """
+        @param audio: True if an audio flux is required
+        @param video: True if a video flux is required
+        """
+        await self._create_peer_connection()
+        await self._get_user_media(audio, video)
+        call_data = await self._get_call_data()
+        log.info(f"calling {callee_jid!r}")
+        self.sid = await bridge.call_start(str(callee_jid), json.dumps(call_data))
+        log.debug(f"Call SID: {self.sid}")
+        await self._send_buffered_local_candidates()
+
+    def _on_opened_data_channel(self, event):
+        log.info("Datachannel has been opened.")
+
+    async def send_file(self, callee_jid: jid.JID, file: JSObject) -> None:
+        assert self.file_only
+        peer_connection = await self._create_peer_connection()
+        data_channel = peer_connection.createDataChannel("file")
+        call_data = await self._get_call_data()
+        log.info(f"sending file to {callee_jid!r}")
+        file_meta = {
+            "size": file.size
+        }
+        if file.type:
+            file_meta["media_type"] = file.type
+
+        try:
+            file_data = json.loads(await bridge.file_jingle_send(
+                    str(callee_jid),
+                    "",
+                    file.name,
+                    "",
+                    json.dumps({
+                        "webrtc": True,
+                        "call_data": call_data,
+                        **file_meta
+                    })
+            ))
+        except Exception as e:
+            dialog.notification.show(
+                f"Can't send file: {e}", level="error"
+            )
+            return
+
+        self.sid = file_data["session_id"]
+
+        log.debug(f"File Transfer SID: {self.sid}")
+        await self._send_buffered_local_candidates()
+        self.file_sender = FileSender(self.sid, file, data_channel)
+
     async def end_call(self) -> None:
         """Stop streaming and clean instance"""
         if self._peer_connection is None: