comparison libervia/web/pages/calls/_browser/webrtc.py @ 1600:0a4433a343a3

browser (calls): implement WebRTC file sharing: - Send file through WebRTC when the new `file` button is used during a call. - Show a confirmation dialog and download file sent by WebRTC. rel 442
author Goffi <goffi@goffi.org>
date Sat, 06 Apr 2024 13:06:17 +0200
parents 9ba532041a8e
children 6feac4a25e60
comparison
equal deleted inserted replaced
1599:197350e8bf3b 1600:0a4433a343a3
1 import json 1 import json
2 import re 2 import re
3 3
4 from bridge import AsyncBridge as Bridge 4 from bridge import AsyncBridge as Bridge
5 from browser import aio, console as log, document, timer, window 5 from browser import aio, console as log, document, window
6 import dialog 6 import dialog
7 import errors
8 from javascript import JSObject 7 from javascript import JSObject
9 import jid 8 import jid
10 9
11 log.warning = log.warn 10 log.warning = log.warn
12 profile = window.profile or "" 11 profile = window.profile or ""
13 bridge = Bridge() 12 bridge = Bridge()
14 13
15 14
15 class FileSender:
16 CHUNK_SIZE = 64 * 1024
17
18 def __init__(self, session_id, file, data_channel):
19 self.session_id = session_id
20 self.file = file
21 self.data_channel = data_channel
22 data_channel.bind("open", self._on_open)
23 self.offset = 0
24
25 def _on_open(self, __):
26 log.info(f"Data channel open, starting to send {self.file.name}.")
27 self.send_file()
28
29 def _on_reader_load(self, event):
30 self.data_channel.send(event.target.result)
31 self.offset += self.CHUNK_SIZE
32 self.send_file()
33
34 def send_file(self):
35 if self.offset < self.file.size:
36 chunk = self.file.slice(self.offset, self.offset + self.CHUNK_SIZE)
37 reader = window.FileReader.new()
38 reader.onload = self._on_reader_load
39 reader.readAsArrayBuffer(chunk)
40 else:
41 log.info(f"file {self.file.name} sent.")
42 self.data_channel.close()
43 if self.session_id is not None:
44 aio.run(bridge.call_end(self.session_id, ""))
45
46
47 class FileReceiver:
48
49 def __init__(self, session_id: str, data_channel, extra_data: dict) -> None:
50 """Initializes the file receiver with a data channel.
51
52 @param data_channel: The RTCDataChannel through which file data is received.
53 """
54 self.session_id = session_id
55 self.data_channel = data_channel
56 self.received_chunks = []
57 self.file_data = extra_data.get("file_data", {})
58 data_channel.bind("message", self._on_message)
59 data_channel.bind("close", self._on_close)
60 log.debug("File receiver created.")
61
62 def _on_message(self, event) -> None:
63 """Handles incoming message events from the data channel.
64
65 @param event: The event containing the data chunk.
66 """
67 self.received_chunks.append(event.data)
68
69 def _on_close(self, __) -> None:
70 """Handles the data channel's close event.
71
72 Assembles the received chunks into a Blob and triggers a file download.
73 """
74 # The file is complete, we assemble the chunks in a blob
75 blob = window.Blob.new(self.received_chunks)
76 url = window.URL.createObjectURL(blob)
77
78 # and create the <a> element to download the file.
79 a = document.createElement("a")
80 a.href = url
81 a.download = self.file_data.get("name", "received_file")
82 document.body.appendChild(a)
83 a.click()
84
85 # We now clean up.
86 document.body.removeChild(a)
87 window.URL.revokeObjectURL(url)
88 log.info("File received.")
89 aio.run(bridge.call_end(self.session_id, ""))
90
91
16 class WebRTC: 92 class WebRTC:
93
17 def __init__( 94 def __init__(
18 self, 95 self,
19 screen_sharing_cb=None, 96 screen_sharing_cb=None,
20 on_connection_established_cb=None, 97 on_connection_established_cb=None,
21 on_reconnect_cb=None, 98 on_reconnect_cb=None,
22 on_connection_lost_cb=None, 99 on_connection_lost_cb=None,
23 on_video_devices=None, 100 on_video_devices=None,
24 on_reset_cb=None, 101 on_reset_cb=None,
102 file_only: bool = False,
103 extra_data: dict|None = None
25 ): 104 ):
105 """Initialise WebRTC instance.
106
107 @param screen_sharing_cb: callable function for screen sharing event
108 @param on_connection_established_cb: callable function for connection established event
109 @param on_reconnect_cb: called when a reconnection is triggered.
110 @param on_connection_lost_cb: called when the connection is lost.
111 @param on_video_devices: called when new video devices are set.
112 @param on_reset_cb: called on instance reset.
113 @param file_only: indicates a file transfer only session.
114 @param extra_data: optional dictionary containing additional data.
115 Notably used for file transfer, where ``file_data`` key is used.
116 """
26 # reset 117 # reset
27 self.on_reset_cb = on_reset_cb 118 self.on_reset_cb = on_reset_cb
28 self.reset_instance() 119 self.reset_instance()
29 120
30 # ICE events 121 # ICE events
40 self.on_video_devices = on_video_devices 131 self.on_video_devices = on_video_devices
41 self.video_devices = [] 132 self.video_devices = []
42 self.has_multiple_cameras = False 133 self.has_multiple_cameras = False
43 self.current_camera = None 134 self.current_camera = None
44 135
45 # Initially populate the video devices list 136 self.file_only = file_only
46 aio.run(self._populate_video_devices()) 137 if not file_only:
138 # Initially populate the video devices list
139 aio.run(self._populate_video_devices())
140
141 # video elements
142 self.local_video_elt = document["local_video"]
143 self.remote_video_elt = document["remote_video"]
144 else:
145 self.file_sender = None
47 146
48 # muting 147 # muting
49 self.is_audio_muted = None 148 self.is_audio_muted = None
50 self.is_video_muted = None 149 self.is_video_muted = None
51 150
52 # screen sharing 151 # screen sharing
53 self._is_sharing_screen = False 152 self._is_sharing_screen = False
54 self.screen_sharing_cb = screen_sharing_cb 153 self.screen_sharing_cb = screen_sharing_cb
55 154
56 # video elements 155 # extra
57 self.local_video_elt = document["local_video"] 156 if extra_data is None:
58 self.remote_video_elt = document["remote_video"] 157 extra_data = {}
158 self.extra_data = extra_data
59 159
60 @property 160 @property
61 def is_sharing_screen(self) -> bool: 161 def is_sharing_screen(self) -> bool:
62 return self._is_sharing_screen 162 return self._is_sharing_screen
63 163
71 def reset_instance(self): 171 def reset_instance(self):
72 """Inits or resets the instance variables to their default state.""" 172 """Inits or resets the instance variables to their default state."""
73 self._peer_connection = None 173 self._peer_connection = None
74 self._media_types = None 174 self._media_types = None
75 self._media_types_inv = None 175 self._media_types_inv = None
76 self._callee = None
77 self.ufrag = None 176 self.ufrag = None
78 self.pwd = None 177 self.pwd = None
79 self.sid = None 178 self.sid = None
80 self.local_candidates = None 179 self.local_candidates = None
81 self.remote_stream = None 180 self.remote_stream = None
82 self.remote_candidates_buffer = { 181 self.remote_candidates_buffer = {
83 "audio": {"candidates": []}, 182 "audio": {"candidates": []},
84 "video": {"candidates": []}, 183 "video": {"candidates": []},
184 "application": {"candidates": []},
85 } 185 }
86 self.local_candidates_buffer = {} 186 self.local_candidates_buffer = {}
87 self.media_candidates = {} 187 self.media_candidates = {}
88 if self.on_reset_cb is not None: 188 if self.on_reset_cb is not None:
89 self.on_reset_cb() 189 self.on_reset_cb()
316 if connection.iceGatheringState == "complete": 416 if connection.iceGatheringState == "complete":
317 log.info("ICE candidates gathering done") 417 log.info("ICE candidates gathering done")
318 418
319 async def _create_peer_connection( 419 async def _create_peer_connection(
320 self, 420 self,
321 ): 421 ) -> JSObject:
322 """Creates peer connection""" 422 """Creates peer connection"""
323 if self._peer_connection is not None: 423 if self._peer_connection is not None:
324 raise Exception("create_peer_connection can't be called twice!") 424 raise Exception("create_peer_connection can't be called twice!")
325 425
326 external_disco = json.loads(await bridge.external_disco_get("")) 426 external_disco = json.loads(await bridge.external_disco_get(""))
352 "icegatheringstatechange", self.on_ice_gathering_state_change 452 "icegatheringstatechange", self.on_ice_gathering_state_change
353 ) 453 )
354 454
355 self._peer_connection = peer_connection 455 self._peer_connection = peer_connection
356 window.pc = self._peer_connection 456 window.pc = self._peer_connection
457 return peer_connection
357 458
358 async def _get_user_media(self, audio: bool = True, video: bool = True) -> None: 459 async def _get_user_media(self, audio: bool = True, video: bool = True) -> None:
359 """ 460 """
360 Gets user media (camera and microphone). 461 Gets user media (camera and microphone).
361 462
583 """Called when new ICE canidates are received from peer 684 """Called when new ICE canidates are received from peer
584 685
585 @param candidates: Dictionary containing new ICE candidates 686 @param candidates: Dictionary containing new ICE candidates
586 """ 687 """
587 log.debug(f"new peer candidates received: {candidates}") 688 log.debug(f"new peer candidates received: {candidates}")
588 # FIXME: workaround for https://github.com/brython-dev/brython/issues/2227, the 689
589 # following test raise a JS exception
590 try: 690 try:
691 # FIXME: javascript.NULL must be used here, once we move to Brython 3.12.3+
591 remoteDescription_is_none = self._peer_connection.remoteDescription is None 692 remoteDescription_is_none = self._peer_connection.remoteDescription is None
592 except Exception as e: 693 except Exception as e:
694 # FIXME: should be fine in Brython 3.12.3+
593 log.debug("Workaround for Brython bug activated.") 695 log.debug("Workaround for Brython bug activated.")
594 remoteDescription_is_none = True 696 remoteDescription_is_none = True
595 697
596 if ( 698 if (
597 self._peer_connection is None 699 self._peer_connection is None
598 # or self._peer_connection.remoteDescription is None 700 # or self._peer_connection.remoteDescription is NULL
599 or remoteDescription_is_none 701 or remoteDescription_is_none
600 ): 702 ):
601 for media_type in ("audio", "video"): 703 for media_type in ("audio", "video", "application"):
602 media_candidates = candidates.get(media_type) 704 media_candidates = candidates.get(media_type)
603 if media_candidates: 705 if media_candidates:
604 buffer = self.remote_candidates_buffer[media_type] 706 buffer = self.remote_candidates_buffer[media_type]
605 buffer["candidates"].extend(media_candidates["candidates"]) 707 buffer["candidates"].extend(media_candidates["candidates"])
606 return 708 return
608 for candidate in ice_data["candidates"]: 710 for candidate in ice_data["candidates"]:
609 candidate_sdp = self.build_ice_candidate(candidate) 711 candidate_sdp = self.build_ice_candidate(candidate)
610 try: 712 try:
611 sdp_mline_index = self.get_sdp_mline_index(media_type) 713 sdp_mline_index = self.get_sdp_mline_index(media_type)
612 except Exception as e: 714 except Exception as e:
613 log.warning(e) 715 log.warning(f"Can't get sdp_mline_index: {e}")
614 continue 716 continue
615 ice_candidate = window.RTCIceCandidate.new( 717 else:
616 {"candidate": candidate_sdp, "sdpMLineIndex": sdp_mline_index} 718 ice_candidate = window.RTCIceCandidate.new(
617 ) 719 {"candidate": candidate_sdp, "sdpMLineIndex": sdp_mline_index}
618 await self._peer_connection.addIceCandidate(ice_candidate) 720 )
721 await self._peer_connection.addIceCandidate(ice_candidate)
619 722
620 def on_track(self, event): 723 def on_track(self, event):
621 """New track has been received from peer 724 """New track has been received from peer
622 725
623 @param event: Event associated with the new track 726 @param event: Event associated with the new track
632 self.remote_stream.addTrack(event.track) 735 self.remote_stream.addTrack(event.track)
633 736
634 def on_negotiation_needed(self, event) -> None: 737 def on_negotiation_needed(self, event) -> None:
635 log.debug(f"on_negotiation_needed {event=}") 738 log.debug(f"on_negotiation_needed {event=}")
636 # TODO 739 # TODO
740
741 def _on_data_channel(self, event) -> None:
742 """Handles the data channel event from the peer connection.
743
744 @param event: The event associated with the opening of a data channel.
745 """
746 data_channel = event.channel
747 self.file_receiver = FileReceiver(self.sid, data_channel, self.extra_data)
637 748
638 async def answer_call(self, sid: str, offer_sdp: str, profile: str): 749 async def answer_call(self, sid: str, offer_sdp: str, profile: str):
639 """We respond to the call""" 750 """We respond to the call"""
640 log.debug("answering call") 751 log.debug("answering call")
641 if sid != self.sid: 752 if sid != self.sid:
645 await self._peer_connection.setRemoteDescription( 756 await self._peer_connection.setRemoteDescription(
646 {"type": "offer", "sdp": offer_sdp} 757 {"type": "offer", "sdp": offer_sdp}
647 ) 758 )
648 await self.on_ice_candidates_new(self.remote_candidates_buffer) 759 await self.on_ice_candidates_new(self.remote_candidates_buffer)
649 self.remote_candidates_buffer.clear() 760 self.remote_candidates_buffer.clear()
650 await self._get_user_media() 761 if self.file_only:
762 self._peer_connection.bind("datachannel", self._on_data_channel)
763 else:
764 await self._get_user_media()
651 765
652 # Gather local ICE candidates 766 # Gather local ICE candidates
653 local_ice_data = await self._gather_ice_candidates(False) 767 local_ice_data = await self._gather_ice_candidates(False)
654 self.local_candidates = local_ice_data["candidates"] 768 self.local_candidates = local_ice_data["candidates"]
655 769
656 await bridge.call_answer_sdp(sid, self._peer_connection.localDescription.sdp) 770 await bridge.call_answer_sdp(sid, self._peer_connection.localDescription.sdp)
657 771
658 async def make_call( 772 async def _get_call_data(self) -> dict:
659 self, callee_jid: jid.JID, audio: bool = True, video: bool = True 773 """Start a WebRTC call"""
660 ) -> None:
661 """Start a WebRTC call
662
663 @param audio: True if an audio flux is required
664 @param video: True if a video flux is required
665 """
666 await self._create_peer_connection()
667 await self._get_user_media(audio, video)
668 await self._gather_ice_candidates(True) 774 await self._gather_ice_candidates(True)
669 775
670 call_data = {"sdp": self._peer_connection.localDescription.sdp} 776 return {"sdp": self._peer_connection.localDescription.sdp}
671 log.info(f"calling {callee_jid!r}") 777
672 self.sid = await bridge.call_start(str(callee_jid), json.dumps(call_data)) 778 async def _send_buffered_local_candidates(self) -> None:
673 log.debug(f"Call SID: {self.sid}")
674
675 if self.local_candidates_buffer: 779 if self.local_candidates_buffer:
676 log.debug( 780 log.debug(
677 f"sending buffered local ICE candidates: {self.local_candidates_buffer}" 781 f"sending buffered local ICE candidates: {self.local_candidates_buffer}"
678 ) 782 )
679 assert self.pwd is not None 783 assert self.pwd is not None
682 ice_data[media_type] = { 786 ice_data[media_type] = {
683 "ufrag": self.ufrag, 787 "ufrag": self.ufrag,
684 "pwd": self.pwd, 788 "pwd": self.pwd,
685 "candidates": candidates 789 "candidates": candidates
686 } 790 }
687 aio.run( 791 await bridge.ice_candidates_add(
688 bridge.ice_candidates_add( 792 self.sid,
689 self.sid, 793 json.dumps(
690 json.dumps( 794 ice_data
691 ice_data 795 ),
692 ),
693 )
694 ) 796 )
695 self.local_candidates_buffer.clear() 797 self.local_candidates_buffer.clear()
798
799 async def make_call(
800 self, callee_jid: jid.JID, audio: bool = True, video: bool = True
801 ) -> None:
802 """
803 @param audio: True if an audio flux is required
804 @param video: True if a video flux is required
805 """
806 await self._create_peer_connection()
807 await self._get_user_media(audio, video)
808 call_data = await self._get_call_data()
809 log.info(f"calling {callee_jid!r}")
810 self.sid = await bridge.call_start(str(callee_jid), json.dumps(call_data))
811 log.debug(f"Call SID: {self.sid}")
812 await self._send_buffered_local_candidates()
813
814 def _on_opened_data_channel(self, event):
815 log.info("Datachannel has been opened.")
816
817 async def send_file(self, callee_jid: jid.JID, file: JSObject) -> None:
818 assert self.file_only
819 peer_connection = await self._create_peer_connection()
820 data_channel = peer_connection.createDataChannel("file")
821 call_data = await self._get_call_data()
822 log.info(f"sending file to {callee_jid!r}")
823 file_meta = {
824 "size": file.size
825 }
826 if file.type:
827 file_meta["media_type"] = file.type
828
829 try:
830 file_data = json.loads(await bridge.file_jingle_send(
831 str(callee_jid),
832 "",
833 file.name,
834 "",
835 json.dumps({
836 "webrtc": True,
837 "call_data": call_data,
838 **file_meta
839 })
840 ))
841 except Exception as e:
842 dialog.notification.show(
843 f"Can't send file: {e}", level="error"
844 )
845 return
846
847 self.sid = file_data["session_id"]
848
849 log.debug(f"File Transfer SID: {self.sid}")
850 await self._send_buffered_local_candidates()
851 self.file_sender = FileSender(self.sid, file, data_channel)
696 852
697 async def end_call(self) -> None: 853 async def end_call(self) -> None:
698 """Stop streaming and clean instance""" 854 """Stop streaming and clean instance"""
699 if self._peer_connection is None: 855 if self._peer_connection is None:
700 log.debug("There is currently no call to end.") 856 log.debug("There is currently no call to end.")