view libervia/web/pages/calls/_browser/__init__.py @ 1599:197350e8bf3b

pages(g): fix guest session data: Following https://repos.goffi.org/libervia-web/rev/86c7a3a625d5, new sessions are now started on connection; as a result, guest data were lost on connection. This patch fixes it by storing those data after the connection.
author Goffi <goffi@goffi.org>
date Tue, 05 Mar 2024 16:40:25 +0100
parents d282dbdd5ffd
children 0a4433a343a3
line wrap: on
line source

import json

from bridge import AsyncBridge as Bridge
from browser import aio, console as log, document, window
from cache import cache
import dialog
from jid import JID
from jid_search import JidSearch
import loading
from template import Template
from webrtc import WebRTC

# alias so that ``log.warning`` can be called on the browser console object
log.warning = log.warn
# profile exposed on ``window``, or an empty string when it is not set
profile = window.profile or ""
# bridge instance shared by the whole module (signals registration and method calls)
bridge = Bridge()
# NOTE(review): not used in the code visible here; presumably a timeout in
# milliseconds for candidates gathering — confirm
GATHER_TIMEOUT = 10000
# values accepted by the ``CallUI.status`` setter (``None`` is the initial status)
ALLOWED_STATUSES = (
    None,
    "dialing",
    "ringing",
    "in-call",
    "on-hold",
    "connecting",
    "connection-lost",
    "reconnecting",
)
# call modes accepted by the ``CallUI.call_mode`` setter
AUDIO = "audio"
VIDEO = "video"
ALLOWED_CALL_MODES = {AUDIO, VIDEO}
# CSS classes toggled on the mute and screen sharing buttons
INACTIVE_CLASS = "inactive"
MUTED_CLASS = "muted"
SCREEN_OFF_CLASS = "screen-off"


class CallUI:
    """User interface controller of the calls page

    The instance binds the DOM elements of the page to a ``WebRTC`` handler, reacts to
    the ``action_new``, ``call_info``, ``call_setup`` and ``call_ended`` bridge
    signals, and switches the page between the "search" and the "call" views.
    """

    def __init__(self):
        """Create the WebRTC handler, retrieve DOM elements and bind all events"""
        self.webrtc = WebRTC(
            screen_sharing_cb=self.on_sharing_screen,
            on_connection_established_cb=self.on_connection_established,
            on_reconnect_cb=self.on_reconnect,
            on_connection_lost_cb=self.on_connection_lost,
            on_video_devices=self.on_video_devices,
            on_reset_cb=self.on_reset_cb,
        )
        # current view, either "search" or "call" (see switch_mode)
        self.mode = "search"
        self._status = None
        # entity of the peer, set for outgoing and incoming calls
        self._callee = None
        self.contacts_elt = document["contacts"]
        self.search_container_elt = document["search_container"]
        self.call_container_elt = document["call_container"]
        self.call_box_elt = document["call_box"]
        self.call_avatar_wrapper_elt = document["call_avatar_wrapper"]
        self.call_status_wrapper_elt = document["call_status_wrapper"]
        self.call_avatar_tpl = Template("call/call_avatar.html")
        self.call_status_tpl = Template("call/call_status.html")
        self.audio_player_elt = document["audio_player"]
        bridge.register_signal("action_new", self._on_action_new)
        bridge.register_signal("call_info", self._on_call_info)
        bridge.register_signal("call_setup", self._on_call_setup)
        bridge.register_signal("call_ended", self._on_call_ended)

        # call/hang up buttons
        self._call_mode = VIDEO
        document["video_call_btn"].bind("click", lambda __: aio.run(self.make_call()))
        document["audio_call_btn"].bind(
            "click", lambda __: aio.run(self.make_call(video=False))
        )
        document["hangup_btn"].bind("click", lambda __: aio.run(self.hang_up()))

        # other buttons
        document["full_screen_btn"].bind("click", lambda __: self.toggle_fullscreen())
        document["exit_full_screen_btn"].bind(
            "click", lambda __: self.toggle_fullscreen()
        )
        document["mute_audio_btn"].bind("click", self.toggle_audio_mute)
        document["mute_video_btn"].bind("click", self.toggle_video_mute)
        self.share_desktop_col_elt = document["share_desktop_column"]
        if hasattr(window.navigator.mediaDevices, "getDisplayMedia"):
            self.share_desktop_col_elt.classList.remove("is-hidden-touch")
            # screen sharing is supported
            document["share_desktop_btn"].bind("click", self.toggle_screen_sharing)
        else:
            self.share_desktop_col_elt.classList.add("is-hidden")
        document["switch_camera_btn"].bind("click", self.on_switch_camera)

        # search
        self.search_elt = document["search"]
        self.jid_search = JidSearch(
            self.search_elt,
            document["contacts"],
            click_cb=self._on_entity_click,
            template="call/search_item.html",
            options={
                "no_group": True,
                "extra_cb": {
                    ".dropdown-trigger": lambda evt, item: aio.run(
                        self.on_entity_action(evt, "menu", item)
                    ),
                    ".click-to-video": lambda evt, item: aio.run(
                        self.on_entity_action(evt, VIDEO, item)
                    ),
                    ".click-to-audio": lambda evt, item: aio.run(
                        self.on_entity_action(evt, AUDIO, item)
                    ),
                },
            },
        )
        document["clear_search_btn"].bind("click", self.on_clear_search)

        # incoming call dialog
        self.incoming_call_dialog_elt = None

    @staticmethod
    def _get_display_name(identities, entity) -> str:
        """Get the name to show to the user for an entity

        @param identities: mapping from entity to identity data, the first item of
            the "nicknames" list of the identity is used
        @param entity: entity to look for in ``identities``
        @return: first nickname of the entity, or ``str(entity)`` if the entity has no
            identity or no nickname
        """
        try:
            return identities[entity]["nicknames"][0]
        except (KeyError, IndexError):
            return str(entity)

    @staticmethod
    def _set_mute_btn_state(btn_elt, muted: bool) -> None:
        """Update CSS classes of a mute button

        @param btn_elt: button element to update
        @param muted: True if the corresponding media is muted
        """
        if muted:
            btn_elt.classList.remove("is-success")
            btn_elt.classList.add(INACTIVE_CLASS, MUTED_CLASS, "is-warning")
        else:
            btn_elt.classList.remove(INACTIVE_CLASS, MUTED_CLASS, "is-warning")
            btn_elt.classList.add("is-success")

    def _on_mute_toggled(self, btn_elt, is_muted: bool, media: str) -> None:
        """Update a mute button and notify the user when the media gets muted

        @param btn_elt: mute button which has been clicked
        @param is_muted: new mute state
        @param media: name of the media shown in the notification
        """
        self._set_mute_btn_state(btn_elt, is_muted)
        if is_muted:
            dialog.notification.show(
                f"{media} is now muted",
                level="info",
                delay=2,
            )

    def _stop_ring(self) -> None:
        """Pause the ring sound and rewind it to its beginning"""
        self.audio_player_elt.pause()
        self.audio_player_elt.currentTime = 0

    @property
    def sid(self) -> str | None:
        """Session ID of the current call, stored in the WebRTC handler"""
        return self.webrtc.sid

    @sid.setter
    def sid(self, new_sid) -> None:
        self.webrtc.sid = new_sid

    @property
    def status(self):
        """Current call status, one of ALLOWED_STATUSES"""
        return self._status

    @status.setter
    def status(self, new_status):
        """Set the call status and render it in the status wrapper element

        Nothing is rendered if the status is unchanged.

        @param new_status: one of ALLOWED_STATUSES
        @raise Exception: the status is not in ALLOWED_STATUSES
        """
        if new_status != self._status:
            if new_status not in ALLOWED_STATUSES:
                raise Exception(
                    f"INTERNAL ERROR: this status is not allowed: {new_status!r}"
                )
            tpl_data = {"entity": self._callee, "status": new_status}
            if self._callee is not None:
                tpl_data["name"] = self._get_display_name(
                    cache.identities, self._callee
                )
            status_elt = self.call_status_tpl.get_elt(tpl_data)
            self.call_status_wrapper_elt.clear()
            self.call_status_wrapper_elt <= status_elt

        self._status = new_status

    @property
    def call_mode(self):
        """Current call mode, one of ALLOWED_CALL_MODES"""
        return self._call_mode

    @call_mode.setter
    def call_mode(self, mode):
        """Set the call mode and show or hide the video only elements

        @param mode: one of ALLOWED_CALL_MODES
        @raise ValueError: the mode is not in ALLOWED_CALL_MODES
        """
        if mode in ALLOWED_CALL_MODES:
            if self._call_mode == mode:
                return
            self._call_mode = mode
            with_video = mode == VIDEO
            for elt in self.call_box_elt.select(".is-video-only"):
                if with_video:
                    elt.classList.remove("is-hidden")
                else:
                    elt.classList.add("is-hidden")
        else:
            raise ValueError("Invalid call mode")

    def set_avatar(self, entity_jid: JID | str) -> None:
        """Set the avatar element from entity_jid

        @param entity_jid: bare jid of the entity
        """
        call_avatar_elt = self.call_avatar_tpl.get_elt(
            {
                "entity": str(entity_jid),
                "identities": cache.identities,
            }
        )
        self.call_avatar_wrapper_elt.clear()
        self.call_avatar_wrapper_elt <= call_avatar_elt

    def _on_action_new(
        self, action_data_s: str, action_id: str, security_limit: int, profile: str
    ) -> None:
        """Called when a call is received

        Actions whose type is not "call" are ignored.

        @param action_data_s: Action data serialized
        @param action_id: Unique identifier for the action
        @param security_limit: Security limit for the action
        @param profile: Profile associated with the action
        """
        action_data = json.loads(action_data_s)
        if action_data.get("type") != "call":
            return
        aio.run(self.on_action_new(action_data, action_id))

    async def on_action_new(self, action_data: dict, action_id: str) -> None:
        """Ring, ask the user whether to answer an incoming call, and send the answer

        The call is ignored if a call session is already set.

        @param action_data: deserialised action data, the "from_jid", "sub_type" and
            "session_id" keys are used
        @param action_id: Unique identifier for the action, used to send the answer
        """
        peer_jid = JID(action_data["from_jid"]).bare
        log.info(f"{peer_jid} wants to start a call ({action_data['sub_type']})")
        if self.sid is not None:
            log.warning(
                f"already in a call ({self.sid}), can't receive a new call from "
                f"{peer_jid}"
            )
            return
        sid = self.sid = action_data["session_id"]
        await cache.fill_identities([peer_jid])
        self._callee = peer_jid
        # the session ID is already set at this point: a failing name lookup must not
        # raise, otherwise the session would stay set and block every following call
        peer_name = self._get_display_name(cache.identities, peer_jid)

        # we start the ring
        self.audio_player_elt.play()

        # and ask user if we take the call
        try:
            self.incoming_call_dialog_elt = dialog.Confirm(
                f"{peer_name} is calling you.", ok_label="Answer", cancel_label="Reject"
            )
            accepted = await self.incoming_call_dialog_elt.ashow()
        except dialog.CancelError:
            log.info("Call has been cancelled")
            self.incoming_call_dialog_elt = None
            self.sid = None
            # end_call stops the ring before cancelling the dialog, we still make sure
            # that it is stopped whatever has cancelled the dialog
            self._stop_ring()
            dialog.notification.show(f"{peer_name} has cancelled the call", level="info")
            return

        self.incoming_call_dialog_elt = None

        # we stop the ring
        self._stop_ring()

        if accepted:
            log.debug(f"Call SID: {sid}")

            # Answer the call
            self.set_avatar(peer_jid)
            self.status = "connecting"
            self.switch_mode("call")
        else:
            log.info(f"your are declining the call from {peer_jid}")
            self.sid = None
        await bridge.action_launch(action_id, json.dumps({"cancelled": not accepted}))

    def _on_call_ended(self, session_id: str, data_s: str, profile: str) -> None:
        """Call has been terminated

        @param session_id: Session identifier
        @param data_s: Serialised additional data on why the call has ended
        @param profile: Profile associated
        """
        if self.sid is None:
            log.debug("there are no calls in progress")
            return
        if session_id != self.sid:
            log.debug(
                f"ignoring call_ended not linked to our call ({self.sid}): {session_id}"
            )
            return
        # an empty payload can't be parsed as JSON, we use empty data in this case
        aio.run(self.end_call(json.loads(data_s) if data_s else {}))

    def _on_call_info(self, session_id: str, info_type, info_data_s: str, profile: str):
        """Called when information on a call is received

        Only the "ringing" info of the current session is handled.

        @param session_id: Session identifier
        @param info_type: type of the information
        @param info_data_s: Serialised data of the information (not used here)
        @param profile: Profile associated
        """
        if self.sid != session_id:
            return
        if info_type == "ringing":
            self.status = "ringing"

    def _on_call_setup(self, session_id: str, setup_data_s: str, profile: str) -> None:
        """Called when we have received answer SDP from responder

        @param session_id: Session identifier
        @param setup_data_s: Serialised setup data, see on_call_setup
        @param profile: Profile associated with the action
        """
        aio.run(self.on_call_setup(session_id, json.loads(setup_data_s), profile))

    async def on_call_setup(
        self, session_id: str, setup_data: dict, profile: str
    ) -> None:
        """Call has been accepted, connection can be established

        @param session_id: Session identifier
        @param setup_data: Data with following keys:
            role: initiator or responser
            sdp: Session Description Protocol data
        @param profile: Profile associated
        """
        if self.sid != session_id:
            log.debug(
                f"Call ignored due to different session ID ({self.sid=} {session_id=})"
            )
            return
        try:
            role = setup_data["role"]
            sdp = setup_data["sdp"]
        except KeyError:
            dialog.notification.show(
                f"Invalid setup data received: {setup_data}", level="error"
            )
            return
        if role == "initiator":
            await self.webrtc.accept_call(session_id, sdp, profile)
        elif role == "responder":
            await self.webrtc.answer_call(session_id, sdp, profile)
        else:
            dialog.notification.show(
                f"Invalid role received during setup: {setup_data}", level="error"
            )
            return

    def on_connection_established(self):
        """WebRTC callback: set the status to "in-call\""""
        self.status = "in-call"

    def on_reconnect(self):
        """WebRTC callback: set the status to "reconnecting\""""
        self.status = "reconnecting"

    def on_connection_lost(self):
        """WebRTC callback: set the status to "connection-lost\""""
        self.status = "connection-lost"

    def on_video_devices(self, has_multiple_cameras: bool) -> None:
        """WebRTC callback: show the switch camera column only with several cameras

        @param has_multiple_cameras: True if there is more than one camera
        """
        switch_camera_col_elt = document["switch_camera_column"]
        if has_multiple_cameras:
            switch_camera_col_elt.classList.remove("is-hidden", "is-hidden-desktop")
        else:
            switch_camera_col_elt.classList.add("is-hidden")

    def on_reset_cb(self) -> None:
        """Call when webRTC connection is reset, we reset buttons statuses"""
        document["full_screen_btn"].classList.remove("is-hidden")
        document["exit_full_screen_btn"].classList.add("is-hidden")
        for btn_elt in document["mute_audio_btn"], document["mute_video_btn"]:
            self._set_mute_btn_state(btn_elt, False)

    async def make_call(self, audio: bool = True, video: bool = True) -> None:
        """Start a WebRTC call

        The callee is the entity currently set in the search field. An error
        notification is shown and nothing is done if it is not a valid identifier.

        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        """
        self.call_mode = VIDEO if video else AUDIO
        try:
            callee_jid = JID(self.search_elt.value.strip())
            if not callee_jid.is_valid:
                raise ValueError
        except ValueError:
            dialog.notification.show(
                "Invalid identifier, please use a valid callee identifier", level="error"
            )
            return

        self._callee = callee_jid
        await cache.fill_identities([callee_jid])
        self.status = "dialing"
        self.set_avatar(callee_jid)

        self.switch_mode("call")
        await self.webrtc.make_call(callee_jid, audio, video)

    async def end_call(self, data: dict) -> None:
        """Stop streaming and clean instance

        @param data: data on why the call has ended, a notification is shown when the
            "reason" key is "busy"
        """
        # if there is any ringing, we stop it
        self._stop_ring()

        if self.incoming_call_dialog_elt is not None:
            self.incoming_call_dialog_elt.cancel()
            self.incoming_call_dialog_elt = None

        self.switch_mode("search")

        if data.get("reason") == "busy":
            # the notification must never prevent the WebRTC cleaning below, thus we
            # don't raise if the callee or its nickname is missing
            if self._callee is None:
                log.warning("call ended with 'busy' reason but callee is unknown")
            else:
                peer_name = self._get_display_name(cache.identities, self._callee)
                dialog.notification.show(
                    f"{peer_name} can't answer your call",
                    level="info",
                )

        await self.webrtc.end_call()

    async def hang_up(self) -> None:
        """Terminate the call"""
        # the session ID is read first, as it is needed after the call has been ended
        # locally
        session_id = self.sid
        if not session_id:
            log.warning("Can't hand_up, not call in progress")
            return
        await self.end_call({"reason": "terminated"})
        await bridge.call_end(session_id, "")

    def _handle_animation_end(
        self,
        element,
        remove=None,
        add=None,
    ):
        """Return a handler that removes specified classes and the event handler.

        @param element: The element to operate on.
        @param remove: List of class names to remove from the element.
            A single class name can be given as a string.
        @param add: List of class names to add to the element.
            A single class name can be given as a string.
        @return: handler to bind to the "animationend" event of the element
        """

        # ``remove`` and ``add`` are bound as default values because they are
        # reassigned in the handler
        def handler(__, remove=remove, add=add):
            log.info(f"animation end OK {element=}")
            if add:
                if isinstance(add, str):
                    add = [add]
                element.classList.add(*add)
            if remove:
                if isinstance(remove, str):
                    remove = [remove]
                element.classList.remove(*remove)
            # the handler must be run only once
            element.unbind("animationend", handler)

        return handler

    def switch_mode(self, mode: str) -> None:
        """Handles the user interface changes

        Nothing is done if the mode is already the current one, and an error is logged
        if the mode is unknown.

        @param mode: "call" to show the call box, "search" to show the contacts search
        """
        if mode == self.mode:
            return
        if mode == "call":
            # Hide contacts with fade-out animation and bring up the call box
            self.search_container_elt.classList.add("fade-out-y")
            self.search_container_elt.bind(
                "animationend",
                self._handle_animation_end(
                    self.search_container_elt, remove="fade-out-y", add="is-hidden"
                ),
            )
            self.call_container_elt.classList.remove("is-hidden")
            self.call_container_elt.classList.add("slide-in")
            self.call_container_elt.bind(
                "animationend",
                self._handle_animation_end(self.call_container_elt, remove="slide-in"),
            )
            self.mode = mode
        elif mode == "search":
            # the same animations are used with the "animation-reverse" class
            self.toggle_fullscreen(False)
            self.search_container_elt.classList.add("fade-out-y", "animation-reverse")
            self.search_container_elt.classList.remove("is-hidden")
            self.search_container_elt.bind(
                "animationend",
                self._handle_animation_end(
                    self.search_container_elt,
                    remove=["fade-out-y", "animation-reverse"],
                ),
            )
            self.call_container_elt.classList.add("slide-in", "animation-reverse")
            self.call_container_elt.bind(
                "animationend",
                self._handle_animation_end(
                    self.call_container_elt,
                    remove=["slide-in", "animation-reverse"],
                    add="is-hidden",
                ),
            )
            self.mode = mode
        else:
            log.error(f"Internal Error: Unknown call mode: {mode}")

    def on_clear_search(self, ev) -> None:
        """Clear the search input and trigger its 'input' event.

        @param ev: the event object from the button click.
        """
        if not self.search_elt.value:
            return
        # clear the search field
        self.search_elt.value = ""
        # and dispatch the input event so items are updated
        self.search_elt.dispatchEvent(window.Event.new("input"))

    def toggle_fullscreen(self, fullscreen: bool | None = None):
        """Toggle fullscreen mode for video elements.

        An error notification is shown if the fullscreen state can't be changed.

        @param fullscreen: if set, determine the fullscreen state; otherwise,
            the fullscreen mode will be toggled.
        """
        do_fullscreen = (
            document.fullscreenElement is None if fullscreen is None else fullscreen
        )

        try:
            if do_fullscreen:
                if document.fullscreenElement is None:
                    self.call_box_elt.requestFullscreen()
                    document["full_screen_btn"].classList.add("is-hidden")
                    document["exit_full_screen_btn"].classList.remove("is-hidden")
            else:
                if document.fullscreenElement is not None:
                    document.exitFullscreen()
                    document["full_screen_btn"].classList.remove("is-hidden")
                    document["exit_full_screen_btn"].classList.add("is-hidden")

        except Exception as e:
            dialog.notification.show(
                f"An error occurred while toggling fullscreen: {e}", level="error"
            )

    def toggle_audio_mute(self, evt):
        """Toggle audio mute and update the clicked button

        @param evt: click event of the mute audio button
        """
        is_muted = self.webrtc.toggle_audio_mute()
        self._on_mute_toggled(evt.currentTarget, is_muted, AUDIO)

    def toggle_video_mute(self, evt):
        """Toggle video mute and update the clicked button

        @param evt: click event of the mute video button
        """
        is_muted = self.webrtc.toggle_video_mute()
        self._on_mute_toggled(evt.currentTarget, is_muted, VIDEO)

    def toggle_screen_sharing(self, evt):
        """Toggle screen sharing through the WebRTC handler

        @param evt: click event of the share desktop button (not used)
        """
        aio.run(self.webrtc.toggle_screen_sharing())

    def on_sharing_screen(self, sharing: bool) -> None:
        """Called when screen sharing state changes

        @param sharing: True if the screen is currently shared
        """
        share_desktop_btn_elt = document["share_desktop_btn"]
        if sharing:
            share_desktop_btn_elt.classList.add("is-danger")
            share_desktop_btn_elt.classList.remove(INACTIVE_CLASS, SCREEN_OFF_CLASS)
        else:
            share_desktop_btn_elt.classList.remove("is-danger")
            share_desktop_btn_elt.classList.add(INACTIVE_CLASS, SCREEN_OFF_CLASS)

    def on_switch_camera(self, __) -> None:
        """Switch camera through the WebRTC handler"""
        aio.run(self.webrtc.switch_camera())

    def _on_entity_click(self, item: dict) -> None:
        """Run on_entity_click when a search item is clicked

        @param item: data of the clicked item
        """
        aio.run(self.on_entity_click(item))

    async def on_entity_click(self, item: dict) -> None:
        """Set entity JID to search bar, and start the call

        @param item: data of the clicked item, the "entity" key is used
        """
        self.search_elt.value = item["entity"]

        await self.make_call()

    async def on_entity_action(self, evt, action: str, item: dict) -> None:
        """Handle extra actions on search items

        @param evt: event which has triggered the action
        @param action: "menu" to toggle the dropdown menu, or a call mode (VIDEO or
            AUDIO) to call the entity
        @param item: data of the search item, the "entity" key is used
        """
        # the click must not be handled by the parent elements
        evt.stopPropagation()
        if action == "menu":
            evt.currentTarget.parent.classList.toggle("is-active")
        elif action in (VIDEO, AUDIO):
            self.search_elt.value = item["entity"]
            # we want the dropdown to be inactive
            evt.currentTarget.closest(".dropdown").classList.remove("is-active")
            await self.make_call(video=action == VIDEO)


# instantiate the UI: the constructor binds the DOM events and registers the bridge
# signals; the instance is not kept in a variable
CallUI()
# NOTE(review): presumably removes the page loading indicator now that the UI is
# set up — confirm against the ``loading`` module
loading.remove_loading_screen()