Mercurial > libervia-web
view libervia/web/pages/calls/_browser/__init__.py @ 1600:0a4433a343a3
browser (calls): implement WebRTC file sharing:
- Send file through WebRTC when the new `file` button is used during a call.
- Show a confirmation dialog and download file sent by WebRTC.
rel 442
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 13:06:17 +0200 |
parents | d282dbdd5ffd |
children | 6feac4a25e60 |
line wrap: on
line source
import json from bridge import AsyncBridge as Bridge from browser import aio, console as log, document, window from cache import cache import dialog from javascript import JSObject from jid import JID from jid_search import JidSearch import loading from template import Template from webrtc import WebRTC log.warning = log.warn profile = window.profile or "" bridge = Bridge() GATHER_TIMEOUT = 10000 ALLOWED_STATUSES = ( None, "dialing", "ringing", "in-call", "on-hold", "connecting", "connection-lost", "reconnecting", ) AUDIO = "audio" VIDEO = "video" ALLOWED_CALL_MODES = {AUDIO, VIDEO} INACTIVE_CLASS = "inactive" MUTED_CLASS = "muted" SCREEN_OFF_CLASS = "screen-off" class CallUI: def __init__(self): self.webrtc = WebRTC( screen_sharing_cb=self.on_sharing_screen, on_connection_established_cb=self.on_connection_established, on_reconnect_cb=self.on_reconnect, on_connection_lost_cb=self.on_connection_lost, on_video_devices=self.on_video_devices, on_reset_cb=self.on_reset_cb, ) # mapping of file sending self.files_webrtc: list[dict] = [] self.mode = "search" self._status = None self._callee: JID|None = None self.contacts_elt = document["contacts"] self.search_container_elt = document["search_container"] self.call_container_elt = document["call_container"] self.call_box_elt = document["call_box"] self.call_avatar_wrapper_elt = document["call_avatar_wrapper"] self.call_status_wrapper_elt = document["call_status_wrapper"] self.call_avatar_tpl = Template("call/call_avatar.html") self.call_status_tpl = Template("call/call_status.html") self.audio_player_elt = document["audio_player"] bridge.register_signal("action_new", self._on_action_new) bridge.register_signal("call_info", self._on_call_info) bridge.register_signal("call_setup", self._on_call_setup) bridge.register_signal("call_ended", self._on_call_ended) # call/hang up buttons self._call_mode = VIDEO document["video_call_btn"].bind("click", lambda __: aio.run(self.make_call())) document["audio_call_btn"].bind( "click", lambda __: aio.run(self.make_call(video=False)) ) document["hangup_btn"].bind("click", lambda __: aio.run(self.hang_up())) # other buttons document["full_screen_btn"].bind("click", lambda __: self.toggle_fullscreen()) document["exit_full_screen_btn"].bind( "click", lambda __: self.toggle_fullscreen() ) document["mute_audio_btn"].bind("click", self.toggle_audio_mute) document["mute_video_btn"].bind("click", self.toggle_video_mute) self.share_desktop_col_elt = document["share_desktop_column"] if hasattr(window.navigator.mediaDevices, "getDisplayMedia"): self.share_desktop_col_elt.classList.remove("is-hidden-touch") # screen sharing is supported document["share_desktop_btn"].bind("click", self.toggle_screen_sharing) else: self.share_desktop_col_elt.classList.add("is-hidden") document["switch_camera_btn"].bind("click", self.on_switch_camera) document["send_file_btn"].bind("click", self.on_send_file) document["send_file_input"].bind("change", self._on_send_input_change) # search self.search_elt = document["search"] self.jid_search = JidSearch( self.search_elt, document["contacts"], click_cb=self._on_entity_click, template="call/search_item.html", options={ "no_group": True, "extra_cb": { ".dropdown-trigger": lambda evt, item: aio.run( self.on_entity_action(evt, "menu", item) ), ".click-to-video": lambda evt, item: aio.run( self.on_entity_action(evt, VIDEO, item) ), ".click-to-audio": lambda evt, item: aio.run( self.on_entity_action(evt, AUDIO, item) ), }, }, ) document["clear_search_btn"].bind("click", self.on_clear_search) # incoming call dialog self.incoming_call_dialog_elt = None @property def sid(self) -> str | None: return self.webrtc.sid @sid.setter def sid(self, new_sid) -> None: self.webrtc.sid = new_sid @property def status(self): return self._status @status.setter def status(self, new_status): if new_status != self._status: if new_status not in ALLOWED_STATUSES: raise Exception( f"INTERNAL ERROR: this status is not allowed: {new_status!r}" ) tpl_data = {"entity": self._callee, "status": new_status} if self._callee is not None: try: tpl_data["name"] = cache.identities[self._callee]["nicknames"][0] except (KeyError, IndexError): tpl_data["name"] = str(self._callee) status_elt = self.call_status_tpl.get_elt(tpl_data) self.call_status_wrapper_elt.clear() self.call_status_wrapper_elt <= status_elt self._status = new_status @property def call_mode(self): return self._call_mode @call_mode.setter def call_mode(self, mode): if mode in ALLOWED_CALL_MODES: if self._call_mode == mode: return self._call_mode = mode with_video = mode == VIDEO for elt in self.call_box_elt.select(".is-video-only"): if with_video: elt.classList.remove("is-hidden") else: elt.classList.add("is-hidden") else: raise ValueError("Invalid call mode") def set_avatar(self, entity_jid: JID | str) -> None: """Set the avatar element from entity_jid @param entity_jid: bare jid of the entity """ call_avatar_elt = self.call_avatar_tpl.get_elt( { "entity": str(entity_jid), "identities": cache.identities, } ) self.call_avatar_wrapper_elt.clear() self.call_avatar_wrapper_elt <= call_avatar_elt def _on_action_new( self, action_data_s: str, action_id: str, security_limit: int, profile: str ) -> None: """Called when a call is received @param action_data_s: Action data serialized @param action_id: Unique identifier for the action @param security_limit: Security limit for the action @param profile: Profile associated with the action """ action_data = json.loads(action_data_s) if action_data.get("type") == "confirm" and action_data.get("subtype") == "file": aio.run(self.on_file_preflight(action_data, action_id)) elif action_data.get("type") == "file": aio.run(self.on_file_proposal(action_data, action_id)) elif action_data.get("type") != "call": return else: aio.run(self.on_action_new(action_data, action_id)) def get_human_size(self, size: int|float) -> str: """Return size in human-friendly size using SI units""" units = ["o","Kio","Mio","Gio"] for idx, unit in enumerate(units): if size < 1024.0 or idx == len(units)-1: return f"{size:.2f}{unit}" size /= 1024.0 raise Exception("Internal Error: this line should never be reached.") async def request_file_permission(self, action_data: dict) -> bool: """Request permission to download a file.""" peer_jid = JID(action_data["from_jid"]).bare await cache.fill_identities([peer_jid]) identity = cache.identities[peer_jid] peer_name = identity["nicknames"][0] file_data = action_data.get("file_data", {}) file_name = file_data.get('name') file_size = file_data.get('size') if file_name: file_name_msg = 'wants to send you the file "{file_name}"'.format( file_name=file_name ) else: file_name_msg = 'wants to send you an unnamed file' if file_size is not None: file_size_msg = "which has a size of {file_size_human}".format( file_size_human=self.get_human_size(file_size) ) else: file_size_msg = "which has an unknown size" file_description = file_data.get('desc') if file_description: description_msg = " Description: {}.".format(file_description) else: description_msg = "" file_data = action_data.get("file_data", {}) file_accept_dlg = dialog.Confirm( "{peer_name} ({peer_jid}) {file_name_msg} {file_size_msg}.{description_msg} Do you " "accept?".format( peer_name=peer_name, peer_jid=peer_jid, file_name_msg=file_name_msg, file_size_msg=file_size_msg, description_msg=description_msg ), ok_label="Download", cancel_label="Reject" ) return await file_accept_dlg.ashow() async def on_file_preflight(self, action_data: dict, action_id: str) -> None: """Handle a file preflight (proposal made to all devices).""" # FIXME: temporarily done in call page, will be moved to notifications handler to # make it work anywhere. accepted = await self.request_file_permission(action_data) await bridge.action_launch( action_id, json.dumps({"answer": str(accepted).lower()}) ) async def on_file_proposal(self, action_data: dict, action_id: str) -> None: """Handle a file proposal. This is a proposal made specifically to this device, a opposed to ``on_file_preflight``. File may already have been accepted during preflight. """ # FIXME: as for on_file_preflight, this will be moved to notification handler. if not action_data.get("webrtc", False): peer_jid = JID(action_data["from_jid"]).bare # We try to do a not-too-technical warning about webrtc not being supported. dialog.notification.show( f"A file sending from {peer_jid} can't be accepted because it is not " "compatible with web browser direct transfer (WebRTC).", level="warning", ) # We don't explicitly refuse the file proposal, because it may be accepted and # supported by other frontends. # TODO: Check if any other frontend is connected for this profile, and refuse # the file if none is. return if action_data.get("file_accepted", False): # File proposal has already been accepted in preflight. accepted = True else: accepted = await self.request_file_permission(action_data) if accepted: sid = action_data["session_id"] webrtc = WebRTC( file_only=True, extra_data={"file_data": action_data.get("file_data", {})} ) webrtc.sid = sid self.files_webrtc.append({ "webrtc": webrtc, }) await bridge.action_launch( action_id, json.dumps({"answer": str(accepted).lower()}) ) async def on_action_new(self, action_data: dict, action_id: str) -> None: peer_jid = JID(action_data["from_jid"]).bare call_type = action_data["sub_type"] call_emoji = "📹" if call_type == VIDEO else "📞" log.info(f"{peer_jid} wants to start a call ({call_type}).") if self.sid is not None: log.warning( f"already in a call ({self.sid}), can't receive a new call from " f"{peer_jid}" ) return sid = self.sid = action_data["session_id"] await cache.fill_identities([peer_jid]) identity = cache.identities[peer_jid] self._callee = peer_jid peer_name = identity["nicknames"][0] # we start the ring self.audio_player_elt.play() # and ask user if we take the call try: self.incoming_call_dialog_elt = dialog.Confirm( f"{peer_name} is calling you ({call_emoji}{call_type}).", ok_label="Answer", cancel_label="Reject" ) accepted = await self.incoming_call_dialog_elt.ashow() except dialog.CancelError as e: log.info("Call has been cancelled") self.incoming_call_dialog_elt = None self.sid = None match e.reason: case "busy": dialog.notification.show( f"{peer_name} can't answer your call", level="info", ) case "taken_by_other_device": device = e.text dialog.notification.show( f"The call has been taken on another device ({device}).", level="info", ) case _: dialog.notification.show( f"{peer_name} has cancelled the call", level="info" ) return self.incoming_call_dialog_elt = None # we stop the ring self.audio_player_elt.pause() self.audio_player_elt.currentTime = 0 if accepted: log.debug(f"Call SID: {sid}") # Answer the call self.call_mode = call_type self.set_avatar(peer_jid) self.status = "connecting" self.switch_mode("call") else: log.info(f"your are declining the call from {peer_jid}") self.sid = None await bridge.action_launch(action_id, json.dumps({"cancelled": not accepted})) def _on_call_ended(self, session_id: str, data_s: str, profile: str) -> None: """Call has been terminated @param session_id: Session identifier @param data_s: Serialised additional data on why the call has ended @param profile: Profile associated """ if self.sid is None: log.debug("there are no calls in progress") return if session_id != self.sid: log.debug( f"ignoring call_ended not linked to our call ({self.sid}): {session_id}" ) return aio.run(self.end_call(json.loads(data_s))) def _on_call_info(self, session_id: str, info_type, info_data_s: str, profile: str): if self.sid != session_id: return if info_type == "ringing": self.status = "ringing" def _on_call_setup(self, session_id: str, setup_data_s: str, profile: str) -> None: """Called when we have received answer SDP from responder @param session_id: Session identifier @param sdp: Session Description Protocol data @param profile: Profile associated with the action """ aio.run(self.on_call_setup(session_id, json.loads(setup_data_s), profile)) async def on_call_setup( self, session_id: str, setup_data: dict, profile: str ) -> None: """Call has been accepted, connection can be established @param session_id: Session identifier @param setup_data: Data with following keys: role: initiator or responser sdp: Session Description Protocol data @param profile: Profile associated """ if self.sid == session_id: webrtc = self.webrtc else: for file_webrtc in self.files_webrtc: webrtc = file_webrtc["webrtc"] if webrtc.sid == session_id: break else: log.debug( f"Call ignored due to different session ID ({self.sid=} {session_id=})" ) return try: role = setup_data["role"] sdp = setup_data["sdp"] except KeyError: dialog.notification.show( f"Invalid setup data received: {setup_data}", level="error" ) return if role == "initiator": await webrtc.accept_call(session_id, sdp, profile) elif role == "responder": await webrtc.answer_call(session_id, sdp, profile) else: dialog.notification.show( f"Invalid role received during setup: {setup_data}", level="error" ) return def on_connection_established(self): self.status = "in-call" def on_reconnect(self): self.status = "reconnecting" def on_connection_lost(self): self.status = "connection-lost" def on_video_devices(self, has_multiple_cameras: bool) -> None: switch_camera_col_elt = document["switch_camera_column"] if has_multiple_cameras: switch_camera_col_elt.classList.remove("is-hidden", "is-hidden-desktop") else: switch_camera_col_elt.classList.add("is-hidden") def on_reset_cb(self) -> None: """Call when webRTC connection is reset, we reset buttons statuses""" document["full_screen_btn"].classList.remove("is-hidden") document["exit_full_screen_btn"].classList.add("is-hidden") for btn_elt in document["mute_audio_btn"], document["mute_video_btn"]: btn_elt.classList.remove(INACTIVE_CLASS, MUTED_CLASS, "is-warning") btn_elt.classList.add("is-success") async def make_call(self, audio: bool = True, video: bool = True) -> None: """Start a WebRTC call @param audio: True if an audio flux is required @param video: True if a video flux is required """ self.call_mode = VIDEO if video else AUDIO try: callee_jid = JID(self.search_elt.value.strip()) if not callee_jid.is_valid: raise ValueError except ValueError: dialog.notification.show( "Invalid identifier, please use a valid callee identifier", level="error" ) return self._callee = callee_jid await cache.fill_identities([callee_jid]) self.status = "dialing" self.set_avatar(callee_jid) self.switch_mode("call") await self.webrtc.make_call(callee_jid, audio, video) async def end_call(self, data: dict) -> None: """Stop streaming and clean instance""" # if there is any ringing, we stop it self.audio_player_elt.pause() self.audio_player_elt.currentTime = 0 reason = data.get("reason", "") text = data.get("text", "") if self.incoming_call_dialog_elt is not None: self.incoming_call_dialog_elt.cancel(reason, text) self.incoming_call_dialog_elt = None self.switch_mode("search") await self.webrtc.end_call() async def hang_up(self) -> None: """Terminate the call""" session_id = self.sid if not session_id: log.warning("Can't hand_up, not call in progress") return await self.end_call({"reason": "terminated"}) await bridge.call_end(session_id, "") def _handle_animation_end( self, element, remove=None, add=None, ): """Return a handler that removes specified classes and the event handler. @param element: The element to operate on. @param remove: List of class names to remove from the element. @param add: List of class names to add to the element. """ def handler(__, remove=remove, add=add): log.info(f"animation end OK {element=}") if add: if isinstance(add, str): add = [add] element.classList.add(*add) if remove: if isinstance(remove, str): remove = [remove] element.classList.remove(*remove) element.unbind("animationend", handler) return handler def switch_mode(self, mode: str) -> None: """Handles the user interface changes""" if mode == self.mode: return if mode == "call": # Hide contacts with fade-out animation and bring up the call box self.search_container_elt.classList.add("fade-out-y") self.search_container_elt.bind( "animationend", self._handle_animation_end( self.search_container_elt, remove="fade-out-y", add="is-hidden" ), ) self.call_container_elt.classList.remove("is-hidden") self.call_container_elt.classList.add("slide-in") self.call_container_elt.bind( "animationend", self._handle_animation_end(self.call_container_elt, remove="slide-in"), ) self.mode = mode elif mode == "search": self.toggle_fullscreen(False) self.search_container_elt.classList.add("fade-out-y", "animation-reverse") self.search_container_elt.classList.remove("is-hidden") self.search_container_elt.bind( "animationend", self._handle_animation_end( self.search_container_elt, remove=["fade-out-y", "animation-reverse"], ), ) self.call_container_elt.classList.add("slide-in", "animation-reverse") self.call_container_elt.bind( "animationend", self._handle_animation_end( self.call_container_elt, remove=["slide-in", "animation-reverse"], add="is-hidden", ), ) self.mode = mode else: log.error(f"Internal Error: Unknown call mode: {mode}") def on_clear_search(self, ev) -> None: """Clear the search input and trigger its 'input' event. @param ev: the event object from the button click. """ if not self.search_elt.value: return # clear the search field self.search_elt.value = "" # and dispatch the input event so items are updated self.search_elt.dispatchEvent(window.Event.new("input")) def toggle_fullscreen(self, fullscreen: bool | None = None): """Toggle fullscreen mode for video elements. @param fullscreen: if set, determine the fullscreen state; otherwise, the fullscreen mode will be toggled. """ do_fullscreen = ( document.fullscreenElement is None if fullscreen is None else fullscreen ) try: if do_fullscreen: if document.fullscreenElement is None: self.call_box_elt.requestFullscreen() document["full_screen_btn"].classList.add("is-hidden") document["exit_full_screen_btn"].classList.remove("is-hidden") else: if document.fullscreenElement is not None: document.exitFullscreen() document["full_screen_btn"].classList.remove("is-hidden") document["exit_full_screen_btn"].classList.add("is-hidden") except Exception as e: dialog.notification.show( f"An error occurred while toggling fullscreen: {e}", level="error" ) def toggle_audio_mute(self, evt): is_muted = self.webrtc.toggle_audio_mute() btn_elt = evt.currentTarget if is_muted: btn_elt.classList.remove("is-success") btn_elt.classList.add(INACTIVE_CLASS, MUTED_CLASS, "is-warning") dialog.notification.show( f"audio is now muted", level="info", delay=2, ) else: btn_elt.classList.remove(INACTIVE_CLASS, MUTED_CLASS, "is-warning") btn_elt.classList.add("is-success") def toggle_video_mute(self, evt): is_muted = self.webrtc.toggle_video_mute() btn_elt = evt.currentTarget if is_muted: btn_elt.classList.remove("is-success") btn_elt.classList.add(INACTIVE_CLASS, MUTED_CLASS, "is-warning") dialog.notification.show( f"video is now muted", level="info", delay=2, ) else: btn_elt.classList.remove(INACTIVE_CLASS, MUTED_CLASS, "is-warning") btn_elt.classList.add("is-success") def toggle_screen_sharing(self, evt): aio.run(self.webrtc.toggle_screen_sharing()) def on_sharing_screen(self, sharing: bool) -> None: """Called when screen sharing state changes""" share_desktop_btn_elt = document["share_desktop_btn"] if sharing: share_desktop_btn_elt.classList.add("is-danger") share_desktop_btn_elt.classList.remove(INACTIVE_CLASS, SCREEN_OFF_CLASS) else: share_desktop_btn_elt.classList.remove("is-danger") share_desktop_btn_elt.classList.add(INACTIVE_CLASS, SCREEN_OFF_CLASS) def on_switch_camera(self, __) -> None: aio.run(self.webrtc.switch_camera()) def on_send_file(self, __) -> None: document["send_file_input"].click() def _on_send_input_change(self, evt) -> None: aio.run(self.on_send_input_change(evt)) async def on_send_input_change(self, evt) -> None: assert self._callee is not None files = evt.currentTarget.files for file in files: webrtc = WebRTC(file_only=True) self.files_webrtc.append({ "file": file, "webrtc": webrtc }) await webrtc.send_file(self._callee, file) def _on_entity_click(self, item: dict) -> None: aio.run(self.on_entity_click(item)) async def on_entity_click(self, item: dict) -> None: """Set entity JID to search bar, and start the call""" self.search_elt.value = item["entity"] await self.make_call() async def on_entity_action(self, evt, action: str, item: dict) -> None: """Handle extra actions on search items""" evt.stopPropagation() if action == "menu": evt.currentTarget.parent.classList.toggle("is-active") elif action in (VIDEO, AUDIO): self.search_elt.value = item["entity"] # we want the dropdown to be inactive evt.currentTarget.closest(".dropdown").classList.remove("is-active") await self.make_call(video=action == VIDEO) CallUI() loading.remove_loading_screen()