Mercurial > libervia-web
view libervia/web/pages/_browser/bridge.py @ 1616:6bfeb9f0fb84
browser (calls): conferences implementation:
- Handle creating/joining A/V conference calls by entering a conference room JID in the
search box.
- Group call box has been improved and is used both for group calls (small number of
participants) and A/V conferences (larger number of participants).
- Fullscreen button for group call is working.
- Avatar/user nickname are shown in group call on peer user, as an overlay on video
stream.
- Use `user` metadata when present to display the right user avatar/name when receiving a
stream from SFU (i.e. A/V conference).
- Peer users have a new three-dot menu with a `pin` item to (un)pin them (i.e. display them
on the full container, on top).
- Updated webrtc to handle unidirectional streams correctly and to adapt to A/V conference
specification.
rel 448
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 07 Aug 2024 00:01:57 +0200 |
parents | 5db55d01ce05 |
children |
line wrap: on
line source
"""Browser-side bridge.

Signals are received through a single WebSocket which is shared between tabs with a
BroadcastChannel, and bridge methods are called with HTTP POST requests on
``/_bridge/<method_name>``.
"""
from browser import window, aio, timer, console as log
import time
import random
import json
import dialog
import javascript


log.warning = log.warn
tab_id = random.randint(0, 2**64)
log.info(f"TAB ID is {tab_id}")


class BridgeException(Exception):
    """An exception which has been raised from the backend and arrived to the frontend."""

    def __init__(self, name, message="", condition=""):
        """
        @param name (str): full exception class name (with module)
        @param message (str): error message
        @param condition (str) : error condition
        """
        Exception.__init__(self)
        self.fullname = str(name)
        self.message = str(message)
        self.condition = str(condition) if condition else ""
        self.module, __, self.classname = str(self.fullname).rpartition(".")

    def __str__(self):
        return f"{self.classname}: {self.message or ''}"

    def __eq__(self, other):
        # allows to compare the exception directly with a class name, e.g.
        # ``exc == "NotFound"``
        return self.classname == other

    def __hash__(self):
        # defining ``__eq__`` alone makes instances unhashable; hash on the same value
        # which is used for the comparison
        return hash(self.classname)


class WebSocket:
    """Wrapper around the browser WebSocket connected to ``window.ws_url``.

    Messages of type "bridge" are forwarded to the broadcast channel so every tab can
    handle them.
    """

    # value of the numeric ``readyState`` attribute of a browser WebSocket when the
    # connection is open
    _READY_STATE_OPEN = 1

    def __init__(self, broadcast_channel):
        """
        @param broadcast_channel: BroadcastChannel instance owning this socket, used to
            get the profile and to forward received bridge messages
        """
        self.broadcast_channel = broadcast_channel
        self.token = window.ws_token
        self.socket = None
        # the whole state is set before the socket is created, so event handlers never
        # see a partially initialised instance
        self.retrying = False
        self.network_error = False
        self._ready_fut = aio.Future()
        self.create_socket()

    @property
    def profile(self):
        return self.broadcast_channel.profile

    @property
    def is_ready(self):
        """True if the socket exists and its connection is open"""
        if self.socket is None:
            return False
        # ``readyState`` is a number, comparing it to the string "OPEN" is always False
        return self.socket.readyState == self._READY_STATE_OPEN

    @property
    def connection_ready_fut(self):
        """Future resolved when connection is ready"""
        return self._ready_fut

    def retry_connect(self) -> None:
        """Show a retry notification which re-creates the socket

        Nothing is done if a retry is already in progress.
        If the notification can't be shown due to a "NetworkError", the socket creation
        is scheduled 30s later. Any other exception is re-raised.
        """
        if self.retrying:
            return
        self.retrying = True
        try:
            notif = dialog.RetryNotification(self.create_socket)
            notif.show(
                "Can't connect to server",
                delay=random.randint(0, 30)
            )
        except Exception as e:
            # for security reasons, browser don't give the reason of the error with
            # WebSockets, thus we try to detect network error here, as if we can't show
            # the retry dialog, that probably means that it's not reachable
            name = getattr(e, "name", None)
            if name == "NetworkError":
                self.network_error = True
                log.warning("network error detected, server may be down")
                log.error(f"Can't show retry dialog: {e}")
                log.info("retrying in 30s")
                timer.set_timeout(self.create_socket, 30000)
            else:
                raise
        else:
            # if we can show the retry dialog, the network is fine
            self.network_error = False

    def create_socket(self) -> None:
        """Create the browser WebSocket and bind its event handlers"""
        log.debug("creating socket")
        self.retrying = False
        self.socket = window.WebSocket.new(window.ws_url, "libervia-page")
        # creation time is used in [on_error] to detect early failures
        self.socket_start = time.time()
        self.socket.bind("open", self.on_open)
        self.socket.bind("error", self.on_error)
        self.socket.bind("close", self.on_close)
        self.socket.bind("message", self.on_message)

    def send(self, data_type: str, data: dict) -> None:
        """Send a JSON encoded message through the socket

        @param data_type: value of the "type" key of the message
        @param data: value of the "data" key of the message
        """
        self.socket.send(json.dumps({
            "type": data_type,
            "data": data
        }, ensure_ascii=False))

    def close(self) -> None:
        """Close the socket and detach it from the broadcast channel"""
        log.debug("closing socket")
        # ``ws`` is unset first: this is how [on_close] recognises a local request
        self.broadcast_channel.ws = None
        self.socket.close()

    def on_open(self, evt) -> None:
        """Resolve the ready future and send the "init" message"""
        log.info("websocket connection opened")
        self._ready_fut.set_result(None)
        self.send("init", {"profile": self.profile, "token": self.token})

    def on_error(self, evt) -> None:
        """Reload the page on early failure, try to reconnect otherwise"""
        if not self.network_error and time.time() - self.socket_start < 5:
            # disconnection is happening fast, we try to reload
            log.warning("Reloading due to suspected session error")
            window.location.reload()
        else:
            self.retry_connect()

    def on_close(self, evt) -> None:
        """Handle socket closing: reload on code 4401, else schedule a reconnection"""
        log.warning(f"websocket is closed {evt.code=} {evt.reason=}")
        if self.broadcast_channel.ws is None:
            # this is a close requested locally
            return
        elif evt.code == 4401:
            log.info(
                "no authorized, the session is probably not valid anymore, reloading"
            )
            window.location.reload()
        else:
            # close event may be due to normal tab closing, thus we try to reconnect only
            # after a delay
            timer.set_timeout(self.retry_connect, 5000)

    def on_message(self, message_evt):
        """Dispatch a message received from the server according to its "type" """
        msg_data = json.loads(message_evt.data)
        msg_type = msg_data.get("type")
        if msg_type == "bridge":
            log.debug(
                f"==> bridge message: {msg_data=}"
            )
            self.broadcast_channel.post(
                msg_type,
                msg_data["data"]
            )
        elif msg_type == "force_close":
            log.warning(f"force closing connection: {msg_data.get('reason')}")
            self.close()
        else:
            # NOTE(review): "error" is passed as second positional argument, presumably
            #   the notification level; the original code was missing the comma and
            #   concatenated it to the message.
            dialog.notification.show(
                f"Unexpected message type {msg_type}",
                "error"
            )


class BroadcastChannel:
    """Communication between the tabs, using the browser BroadcastChannel "libervia"

    A single tab (the "connecting tab") holds the WebSocket, bridge signals are relayed
    to the other tabs through the channel.
    """
    # signal name => list of handlers, filled with ``register_signal``
    handlers = {}

    def __init__(self):
        log.debug(f"BroadcastChannel init with profile {self.profile!r}")
        self.ws = None
        self._connected = False
        # created before any handler or timer is registered, as they resolve it
        self._wait_connection_fut = aio.Future()
        self.start = time.time()
        self.bc = window.BroadcastChannel.new("libervia")
        self.bc.bind("message", self.on_message)
        # there is no way to check if there is already a connection in BroadcastChannel
        # API, thus we wait a bit to see if somebody is answering. If not, we are probably
        # the first tab.
        self.check_connection_timer = timer.set_timeout(self.establish_connection, 20)
        # set of all known tab ids
        self.tabs_ids = {tab_id}
        self.post("salut_a_vous", {
            "profile": self.profile
        })
        window.bind("unload", self.on_unload)

    @property
    def profile(self):
        return window.profile or ""

    @property
    def connecting_tab(self) -> bool:
        """True if this tab is the one establishing the websocket connection"""
        return self.ws is not None

    @connecting_tab.setter
    def connecting_tab(self, connecting: bool) -> None:
        if connecting:
            if self.ws is None:
                self.ws = WebSocket(self)
                self.post("connection", {})
                aio.run(self._wait_for_ws())
        elif self.ws is not None:
            self.ws.close()

    async def _wait_for_ws(self):
        """Mark the channel as connected once the websocket is ready"""
        assert self.ws is not None
        await self.ws.connection_ready_fut
        self._wait_connection_fut.set_result(None)
        self._connected = True

    @property
    def connected(self):
        return self._connected

    async def wait_for_connection(self):
        """Return once a connection is known (from this tab or from another one)"""
        if self._connected:
            return
        await self._wait_connection_fut

    def establish_connection(self) -> None:
        """Called when there is no existing connection"""
        timer.clear_timeout(self.check_connection_timer)
        log.debug(f"Establishing connection {tab_id=}")
        self.connecting_tab = True

    def handle_bridge_signal(self, data: dict) -> None:
        """Forward bridge signals to registered handlers"""
        signal = data["signal"]
        handlers = self.handlers.get(signal, [])
        for handler in handlers:
            handler(*data["args"])

    def on_message(self, evt) -> None:
        """Handle a message posted by another tab on the channel"""
        data = json.loads(evt.data)
        # FIXME: we convert back to int, see FIXME in [post] for details
        data["id"] = int(data["id"])
        if data["type"] == "bridge":
            self.handle_bridge_signal(data)
        elif data["type"] == "salut_a_toi":
            # this is a response from existing tabs
            other_tab_id = data["id"]
            if other_tab_id == tab_id:
                # in the unlikely case that this happens, we simply reload this tab to get
                # a new ID
                log.warning(f"duplicate tab id, we reload the page: {tab_id=}")
                window.location.reload()
                return
            self.tabs_ids.add(other_tab_id)
            if data["connecting_tab"] and self.check_connection_timer is not None:
                # this tab has the websocket connection to server
                log.info(f"there is already a connection to server at tab {other_tab_id}")
                timer.clear_timeout(self.check_connection_timer)
                self.check_connection_timer = None
                self._wait_connection_fut.set_result(None)
                self._connected = True
        elif data["type"] == "salut_a_vous":
            # a new tab has just been created
            if data["profile"] != self.profile:
                log.info(
                    f"we are now connected with the profile {data['profile']}, "
                    "reloading the page"
                )
                window.location.reload()
            else:
                self.tabs_ids.add(data["id"])
                self.post("salut_a_toi", {
                    "id": tab_id,
                    "connecting_tab": self.connecting_tab
                })
        elif data["type"] == "connection":
            log.info(f"tab {data['id']} is the new connecting tab")
        elif data["type"] == "salut_a_rantanplan":
            # a tab is being closed
            other_tab_id = data["id"]
            # it is unlikely that there is a collision, but just in case we check it
            if other_tab_id != tab_id:
                self.tabs_ids.discard(other_tab_id)
            if data["connecting_tab"]:
                log.info(f"connecting tab with id {other_tab_id} has been closed")
                # the remaining tab with the highest ID takes over the connection
                if max(self.tabs_ids) == tab_id:
                    log.info("this is the new connecting tab, establish_connection")
                    self.connecting_tab = True
            else:
                log.info(f"tab with id {other_tab_id} has been closed")
        else:
            log.warning(f"unknown message type: {data}")

    def post(self, data_type, data: dict):
        """Post a message to the other tabs

        "type" and "id" keys are set in ``data`` itself (the dict is modified in place).
        Messages of type "bridge" are also handled locally.

        @param data_type: type of the message
        @param data: payload of the message
        """
        data["type"] = str(data_type)
        # FIXME: for some reason, JSON.stringify fail when a random.randint is used with
        #   Brython 3.11 . See https://github.com/brython-dev/brython/issues/2332,
        #   workaround may be removed once fixed version is used.
        data["id"] = str(tab_id)
        # FIXME: json.dumps doesn't support "ensure_ascii=False" and fails to correctly
        #   dump emoji. See https://github.com/brython-dev/brython/issues/2331, workaround
        #   may be removed once fixed version is used.
        dumped = javascript.JSON.stringify(data)
        self.bc.postMessage(dumped)
        if data_type == "bridge":
            self.handle_bridge_signal(data)

    def on_unload(self, evt) -> None:
        """Send a message to indicate that the tab is being closed"""
        self.post("salut_a_rantanplan", {
            "id": tab_id,
            "connecting_tab": self.connecting_tab
        })


class Bridge:
    """Callback based bridge: any attribute access is a call to a bridge method"""
    bc: BroadcastChannel | None = None

    def __init__(self) -> None:
        # the broadcast channel is shared by all Bridge instances
        if Bridge.bc is None:
            Bridge.bc = BroadcastChannel()

    def __getattr__(self, attr):
        return lambda *args, **kwargs: self.call(attr, *args, **kwargs)

    def on_load(self, xhr, ev, callback, errback):
        """Handle the response of a bridge call

        @param xhr: XMLHttpRequest used for the call
        @param ev: "load" event (unused)
        @param callback: called on status 200, with the parsed result if it's not None.
            May be None.
        @param errback: called with the parsed error on status 502, or with a
            "BridgeInternalError" dict on any other status. May be None.
        """
        if xhr.status == 200:
            ret = javascript.JSON.parse(xhr.response)
            if callback is not None:
                if ret is None:
                    callback()
                else:
                    callback(ret)
        elif xhr.status == 502:
            # PROXY_ERROR is used for bridge error
            ret = javascript.JSON.parse(xhr.response)
            if errback is not None:
                errback(ret)
        else:
            log.error(
                f"bridge call failed: code: {xhr.status}, text: {xhr.statusText}"
            )
            if errback is not None:
                errback({"fullname": "BridgeInternalError", "message": xhr.statusText})

    def call(self, method_name, *args, callback, errback, **kwargs):
        """Call a bridge method with a POST request on ``/_bridge/<method_name>``

        @param method_name: name of the bridge method
        @param args: positional arguments of the bridge method
        @param callback: see [on_load]
        @param errback: see [on_load], also called with a "ConnectionError" dict if the
            request itself fails
        @param kwargs: keyword arguments of the bridge method
        """
        xhr = window.XMLHttpRequest.new()

        def on_error(ev):
            # errback may be None, as in [on_load]
            if errback is None:
                log.error(f"bridge call to {method_name} failed: connection error")
            else:
                errback({"fullname": "ConnectionError", "message": xhr.statusText})

        xhr.bind('load', lambda ev: self.on_load(xhr, ev, callback, errback))
        xhr.bind('error', on_error)
        xhr.open("POST", f"/_bridge/{method_name}", True)
        data = javascript.JSON.stringify({
            "args": args,
            "kwargs": kwargs,
        })
        xhr.setRequestHeader('X-Csrf-Token', window.csrf_token)
        xhr.send(data)

    def register_signal(self, signal: str, handler, iface=None) -> None:
        """Register a handler called with the signal arguments when it is received

        @param signal: name of the signal
        @param handler: callable to register
        @param iface: unused
        """
        BroadcastChannel.handlers.setdefault(signal, []).append(handler)
        log.debug(f"signal {signal} has been registered")


class AsyncBridge:
    """Coroutine based bridge: any attribute access is a call to a bridge method"""

    def __getattr__(self, attr):
        return lambda *args, **kwargs: self.call(attr, *args, **kwargs)

    async def call(self, method_name, *args, **kwargs):
        """Call a bridge method with a POST request on ``/_bridge/<method_name>``

        @param method_name: name of the bridge method
        @param args: positional arguments of the bridge method
        @param kwargs: keyword arguments of the bridge method
        @return: parsed JSON result of the call
        @raise BridgeException: the server answered with a status other than 200
        """
        print(f"calling {method_name}")
        data = javascript.JSON.stringify({
            "args": args,
            "kwargs": kwargs,
        })
        url = f"/_bridge/{method_name}"
        r = await aio.post(
            url,
            headers={
                'X-Csrf-Token': window.csrf_token,
            },
            data=data,
        )

        if r.status == 200:
            return javascript.JSON.parse(r.data)
        elif r.status == 502:
            # 502 is used for bridge error, the body describes the exception
            ret = javascript.JSON.parse(r.data)
            raise BridgeException(ret['fullname'], ret['message'], ret['condition'])
        else:
            print(f"bridge called failed: code: {r.status}, text: {r.statusText}")
            raise BridgeException("InternalError", r.statusText)

    def register_signal(self, signal: str, handler, iface=None) -> None:
        """Register a handler called with the signal arguments when it is received

        @param signal: name of the signal
        @param handler: callable to register
        @param iface: unused
        """
        BroadcastChannel.handlers.setdefault(signal, []).append(handler)
        log.debug(f"signal {signal} has been registered")