view libervia/web/pages/calls/_browser/__init__.py @ 1618:5d9889f14012 default tip @

server: start major redesign - Add icons to menu items - Switch menu items representation from tuple to dictionary for future extensibility: - Include icon information - Prepare for additional data - Remove "login" from main menu, add login page URL to template data, as it is now a separate right-aligned item
author Goffi <goffi@goffi.org>
date Sat, 26 Oct 2024 23:07:01 +0200
parents 6bfeb9f0fb84
children
line wrap: on
line source

import json

from bridge import AsyncBridge as Bridge
from browser import aio, console as log, document, window
from cache import cache
import dialog
from javascript import JSObject, NULL
from jid import JID
from jid_search import JidSearch
import loading
from template import Template
from webrtc import WebRTC

# ``log`` is the browser console: expose its ``warn`` method under the ``warning``
# name, which is the one used by the logging calls of this module.
log.warning = log.warn
# Values read from the global ``window`` object; profile falls back to "".
profile = window.profile or ""
own_jid = JID(window.own_jid)
bridge = Bridge()
# NOTE(review): presumably a timeout in milliseconds; it is not used in the visible
# part of this file — confirm unit and usage.
GATHER_TIMEOUT = 10000
# Values accepted by the ``CallUI.status`` setter (``None`` is the initial status).
ALLOWED_STATUSES = (
    None,
    "dialing",
    "ringing",
    "in-call",
    "on-hold",
    "connecting",
    "connection-lost",
    "reconnecting",
)
# Call modes accepted by the ``CallUI.call_mode`` setter.
AUDIO = "audio"
VIDEO = "video"
REMOTE = "remote-control"
ALLOWED_CALL_MODES = {AUDIO, VIDEO, REMOTE}
# CSS classes toggled on buttons.
INACTIVE_CLASS = "inactive"
MUTED_CLASS = "muted"
SCREEN_OFF_CLASS = "screen-off"
# Marker looked for in room/sender JIDs to detect group calls (see the FIXME in
# ``CallUI._on_action_new``).
MUJI_PREFIX = "_muji_"


class CallUI:
    def __init__(self):
        """Create the main WebRTC instance, then wire bridge signals and page buttons.

        The page elements are looked up by ID in ``document``, so they must exist
        when this constructor runs.
        """
        # WebRTC instance used for one-to-one calls; its callbacks update this UI.
        self.webrtc = WebRTC(
            screen_sharing_cb=self.on_sharing_screen,
            on_connection_established_cb=self.on_connection_established,
            on_reconnect_cb=self.on_reconnect,
            on_connection_lost_cb=self.on_connection_lost,
            on_video_devices=self.on_video_devices,
            on_reset_cb=self.on_reset_cb,
            local_video_elt=document["local_video"],
            remote_video_elt=document["remote_video"]
        )
        # WebRTC instances used for file transfers: one ``{"webrtc": ...}`` dict is
        # appended for each accepted file proposal.
        self.files_webrtc: list[dict] = []
        self.mode = "search"
        self._status = None
        self._callee: JID|None = None
        # Room of the group call in progress, and data of its peers (keyed by JID).
        self._group_call_room: JID|None = None
        self._group_call_peers: dict = {}
        self.is_conference = False
        self.contacts_elt = document["contacts"]
        self.search_container_elt = document["search_container"]
        self.call_container_elt = document["call_container"]
        self.group_call_container_elt = document["group_call_container"]
        self.call_box_elt = document["call_box"]
        self.call_avatar_wrapper_elt = document["call_avatar_wrapper"]
        self.call_status_wrapper_elt = document["call_status_wrapper"]
        self.call_avatar_tpl = Template("call/call_avatar.html")
        self.call_status_tpl = Template("call/call_status.html")
        self.group_peer_tpl = Template("call/group_peer.html")
        # Element played when a call is received, and paused when it is answered,
        # rejected or ended.
        self.audio_player_elt = document["audio_player"]
        bridge.register_signal("action_new", self._on_action_new)
        bridge.register_signal("call_info", self._on_call_info)
        bridge.register_signal("call_setup", self._on_call_setup)
        bridge.register_signal("call_ended", self._on_call_ended)
        bridge.register_signal("call_group_setup", self._on_call_group_setup)

        # call/hang up buttons
        self._call_mode = VIDEO
        # Handlers are coroutines, hence the ``aio.run`` wrappers.
        document["video_call_btn"].bind("click", lambda __: aio.run(self.make_call()))
        document["audio_call_btn"].bind(
            "click", lambda __: aio.run(self.make_call(video=False))
        )
        document["group_call_btn"].bind(
            "click",
            lambda __: aio.run(self.make_group_call())
        )
        document["hangup_btn"].bind("click", lambda __: aio.run(self.hang_up()))
        document["group_hangup_btn"].bind("click", lambda __: aio.run(self.group_hang_up()))

        # other buttons
        document["full_screen_btn"].bind("click", lambda __: self.toggle_fullscreen())
        document["exit_full_screen_btn"].bind(
            "click", lambda __: self.toggle_fullscreen()
        )
        document["group_full_screen_btn"].bind(
            "click",
            lambda __: self.toggle_fullscreen(group=True)
        )
        document["group_exit_full_screen_btn"].bind(
            "click",
            lambda __: self.toggle_fullscreen(group=True)
        )
        document["mute_audio_btn"].bind("click", self.toggle_audio_mute)
        document["mute_video_btn"].bind("click", self.toggle_video_mute)
        self.share_desktop_col_elt = document["share_desktop_column"]
        # The screen sharing button is only activated when ``getDisplayMedia`` is
        # available, otherwise its column is hidden.
        if hasattr(window.navigator.mediaDevices, "getDisplayMedia"):
            self.share_desktop_col_elt.classList.remove("is-hidden-touch")
            # screen sharing is supported
            document["share_desktop_btn"].bind("click", self.toggle_screen_sharing)
        else:
            self.share_desktop_col_elt.classList.add("is-hidden")
        document["switch_camera_btn"].bind("click", self.on_switch_camera)
        document["send_file_btn"].bind("click", self.on_send_file)
        document["send_file_input"].bind("change", self._on_send_input_change)

        # search
        self.search_elt = document["search"]
        # ``extra_cb`` maps CSS selectors to callbacks, each one launching
        # ``on_entity_action`` with the matching action name.
        self.jid_search = JidSearch(
            self.search_elt,
            document["contacts"],
            click_cb=self._on_entity_click,
            allow_multiple_selection=True,
            template="call/search_item.html",
            selection_state_callback=self._on_search_selection,
            options={
                "no_group": True,
                "extra_cb": {
                    ".dropdown-trigger": lambda evt, item: aio.run(
                        self.on_entity_action(evt, "menu", item)
                    ),
                    ".click-to-video": lambda evt, item: aio.run(
                        self.on_entity_action(evt, VIDEO, item)
                    ),
                    ".click-to-audio": lambda evt, item: aio.run(
                        self.on_entity_action(evt, AUDIO, item)
                    ),
                    ".click-to-remote-control": lambda evt, item: aio.run(
                        self.on_entity_action(evt, REMOTE, item)
                    ),
                },
            },
        )
        document["clear_search_btn"].bind("click", self.on_clear_search)

        # incoming call dialog
        # Set while an incoming call confirmation is displayed, so that ``end_call``
        # can cancel it.
        self.incoming_call_dialog_elt = None

    @property
    def sid(self) -> str | None:
        """Session ID of the call, as stored on the WebRTC instance of this UI."""
        return self.webrtc.sid

    @sid.setter
    def sid(self, new_sid) -> None:
        """Store the session ID on the WebRTC instance of this UI."""
        self.webrtc.sid = new_sid

    @property
    def status(self):
        """Current call status (``None`` until a status is set)."""
        return self._status

    @status.setter
    def status(self, new_status):
        if new_status != self._status:
            if new_status not in ALLOWED_STATUSES:
                raise Exception(
                    f"INTERNAL ERROR: this status is not allowed: {new_status!r}"
                )
            tpl_data = {"entity": self._callee, "status": new_status}
            if self._callee is not None:
                try:
                    tpl_data["name"] = cache.identities[self._callee]["nicknames"][0]
                except (KeyError, IndexError):
                    tpl_data["name"] = str(self._callee)
            status_elt = self.call_status_tpl.get_elt(tpl_data)
            self.call_status_wrapper_elt.clear()
            self.call_status_wrapper_elt <= status_elt

        self._status = new_status

    @property
    def call_mode(self):
        """Current call mode (initialised to VIDEO in the constructor)."""
        return self._call_mode

    @call_mode.setter
    def call_mode(self, mode):
        """Switch the call mode and show/hide the call box elements accordingly.

        Nothing is done if the requested mode is already the current one.

        @param mode: one of ALLOWED_CALL_MODES
        @raise ValueError: mode is not an allowed call mode
        """
        if mode not in ALLOWED_CALL_MODES:
            raise ValueError("Invalid call mode")
        if self._call_mode == mode:
            return

        log.debug(f"Switching to {mode} call mode.")
        self._call_mode = mode
        selector = ".is-video-only, .is-not-remote"
        for elt in self.call_box_elt.select(selector):
            if mode == VIDEO:
                # In video, all elements are visible.
                elt.classList.remove("is-hidden")
            elif mode == AUDIO:
                # In audio, we hide video-only elements.
                if elt.classList.contains("is-video-only"):
                    elt.classList.add("is-hidden")
                else:
                    elt.classList.remove("is-hidden")
            elif mode == REMOTE:
                # In remote, we show all video elements, except if they are
                # `is-not-remote`
                if elt.classList.contains("is-not-remote"):
                    elt.classList.add("is-hidden")
                else:
                    elt.classList.remove("is-hidden")
            else:
                # Only possible if ALLOWED_CALL_MODES gets a mode not handled here.
                raise Exception("This line should never be reached.")

    def set_avatar(self, entity_jid: JID | str) -> None:
        """Render the avatar of an entity and put it in the call avatar wrapper.

        The previous content of the wrapper is removed.

        @param entity_jid: bare jid of the entity
        """
        tpl_data = {"entity": str(entity_jid), "identities": cache.identities}
        avatar_elt = self.call_avatar_tpl.get_elt(tpl_data)
        wrapper_elt = self.call_avatar_wrapper_elt
        wrapper_elt.clear()
        wrapper_elt <= avatar_elt

    def _on_action_new(
        self, action_data_s: str, action_id: str, security_limit: int, profile: str
    ) -> None:
        """Called when a call is received

        @param action_data_s: Action data serialized
        @param action_id: Unique identifier for the action
        @param security_limit: Security limit for the action
        @param profile: Profile associated with the action
        """
        action_data = json.loads(action_data_s)
        type_ = action_data.get("type")
        subtype = action_data.get("subtype")
        if type_ in ("confirm", "not_in_roster_leak") and subtype == "file":
            aio.run(self.on_file_preflight(action_data, action_id))
        elif type_ == "file":
            aio.run(self.on_file_proposal(action_data, action_id))
        elif (
            type_ == "confirm"
            and subtype == "muc-invitation"
            # FIXME: Q&D hack until there is a proper group call invitation solution.
            and MUJI_PREFIX in action_data.get("room_jid", "")
        ):
            aio.run(self.on_group_call_proposal(action_data, action_id))
        elif type_ != "call":
            return
        elif MUJI_PREFIX in action_data.get("from_jid", ""):
            aio.run(self.on_group_call_join(action_data, action_id))
        elif self.is_conference and action_data["from_jid"] == self._callee:
            aio.run(self.on_conference_call_join(action_data, action_id))
        else:
            aio.run(self.on_action_new(action_data, action_id))

    def get_human_size(self, size: int|float) -> str:
        """Return size as a human-friendly string, using binary (1024-based) units.

        The size is divided by 1024 until it is lower than 1024 or the biggest unit
        ("Gio") is reached, then it is formatted with two decimals.
        """
        units = ("o", "Kio", "Mio", "Gio")
        last_idx = len(units) - 1
        unit_idx = 0
        while unit_idx != last_idx and not size < 1024.0:
            size /= 1024.0
            unit_idx += 1
        return f"{size:.2f}{units[unit_idx]}"

    async def request_file_permission(self, action_data: dict) -> bool:
        """Ask the user whether an incoming file must be downloaded.

        A confirmation dialog showing the sender and the available file metadata
        (name, size, description) is displayed.

        @param action_data: action data; "from_jid" is required, "file_data" is
            optional
        @return: result of the confirmation dialog
        """
        peer_jid = JID(action_data["from_jid"]).bare
        await cache.fill_identities([peer_jid])
        try:
            peer_name = cache.identities[peer_jid]["nicknames"][0]
        except (KeyError, IndexError):
            # No identity or no nickname known: fall back to the local part of the JID.
            peer_name = peer_jid.local

        file_data = action_data.get("file_data", {})
        file_name = file_data.get("name")
        file_size = file_data.get("size")
        file_description = file_data.get("desc")

        if file_name:
            file_name_msg = f'wants to send you the file "{file_name}"'
        else:
            file_name_msg = "wants to send you an unnamed file"

        # A size of 0 is a valid size, hence the explicit check against None.
        if file_size is not None:
            file_size_msg = f"which has a size of {self.get_human_size(file_size)}"
        else:
            file_size_msg = "which has an unknown size"

        if file_description:
            description_msg = f" Description: {file_description}."
        else:
            description_msg = ""

        file_accept_dlg = dialog.Confirm(
            f"{peer_name} ({peer_jid}) {file_name_msg} {file_size_msg}."
            f"{description_msg} Do you accept?",
            ok_label="Download",
            cancel_label="Reject"
        )
        return await file_accept_dlg.ashow()

    async def on_file_preflight(self, action_data: dict, action_id: str) -> None:
        """Handle a file preflight (proposal made to all devices).

        The user is asked for permission, and the answer is sent back for the action.
        """
        # FIXME: temporarily done in call page, will be moved to notifications handler to
        #   make it work anywhere.
        user_accepted = await self.request_file_permission(action_data)
        answer = {"answer": str(user_accepted).lower()}
        await bridge.action_launch(action_id, json.dumps(answer))

    async def on_file_proposal(self, action_data: dict, action_id: str) -> None:
        """Handle a file proposal.

        This is a proposal made specifically to this device, as opposed to
        ``on_file_preflight``. File may already have been accepted during preflight.
        When the file is accepted, a file-only WebRTC instance is created for the
        session and stored in ``self.files_webrtc``.
        """
        # FIXME: as for on_file_preflight, this will be moved to notification handler.
        webrtc_supported = action_data.get("webrtc", False)
        if not webrtc_supported:
            peer_jid = JID(action_data["from_jid"]).bare
            # We try to do a not-too-technical warning about webrtc not being supported.
            dialog.notification.show(
                f"A file sending from {peer_jid} can't be accepted because it is not "
                "compatible with web browser direct transfer (WebRTC).",
                level="warning",
            )
            # We don't explicitly refuse the file proposal, because it may be accepted and
            # supported by other frontends.
            # TODO: Check if any other frontend is connected for this profile, and refuse
            # the file if none is.
            return

        # File proposal may already have been accepted in preflight; otherwise we ask.
        if action_data.get("pre_accepted", False):
            accepted = True
        else:
            accepted = await self.request_file_permission(action_data)

        if accepted:
            session_id = action_data["session_id"]
            file_webrtc = WebRTC(
                file_only=True,
                extra_data={"file_data": action_data.get("file_data", {})}
            )
            file_webrtc.sid = session_id
            self.files_webrtc.append({"webrtc": file_webrtc})

        answer = {"answer": str(accepted).lower()}
        await bridge.action_launch(action_id, json.dumps(answer))

    async def on_group_call_proposal(self, action_data: dict, action_id: str) -> None:
        """Handle a group call proposal.

        The user is asked for confirmation; on acceptance the UI is switched to
        group call mode. The answer is sent back for the action in both cases.

        @param action_data: action data, "from_jid" is the entity proposing the call
        @param action_id: identifier used to answer the action
        """
        peer_jid = JID(action_data["from_jid"]).bare
        await cache.fill_identities([peer_jid])
        try:
            peer_name = cache.identities[peer_jid]["nicknames"][0]
        except (KeyError, IndexError):
            # No identity or no nickname known: fall back to the local part of the
            # JID (as done for file proposals) instead of leaving the action
            # unanswered.
            peer_name = peer_jid.local

        group_call_accept_dlg = dialog.Confirm(
            f"{peer_name} ({peer_jid}) proposes a group call to you. Do you accept?",
            ok_label="Accept Group Call",
            cancel_label="Reject"
        )
        accepted = await group_call_accept_dlg.ashow()
        if accepted:
            self.switch_mode("group_call")
        await bridge.action_launch(
            action_id, json.dumps({"answer": str(accepted).lower()})
        )

    async def on_group_call_join(self, action_data: dict, action_id: str) -> None:
        """Accept a peer joining the group call we are part of.

        The join is ignored if it doesn't come from the expected group call room.
        Otherwise a peer element is added to the video grid, a dedicated WebRTC
        instance is created for the session, and the action is accepted.
        """
        peer_jid = JID(action_data["from_jid"])
        if peer_jid.bare != self._group_call_room:
            log.warning(
                f"Refusing group call join as were are not expecting any from this room.\n"
                f"{peer_jid.bare=} {self._group_call_room=}"
            )
            return
        log.info(f"{peer_jid} joined the group call.")

        grid_elt = document["group_video_grid"]
        await cache.fill_identities([peer_jid])

        tpl_data = {"entity": str(peer_jid), "identities": cache.identities}
        peer_elt = self.group_peer_tpl.get_elt(tpl_data)
        for pin_elt in peer_elt.select('.action_pin'):
            pin_elt.bind('click', self.toggle_pin)
        grid_elt <= peer_elt

        video_elt = peer_elt.select_one(".peer_video_stream")
        assert video_elt is not None
        peer_webrtc = WebRTC(remote_video_elt=video_elt)
        session_id = action_data["session_id"]
        peer_webrtc.sid = session_id
        self._group_call_peers[peer_jid] = {
            "webrtc": peer_webrtc,
            "element": peer_elt,
            "sid": session_id,
        }

        log.debug(f"Call SID: {session_id}")

        answer = json.dumps({"cancelled": False})
        await bridge.action_launch(action_id, answer)

    async def on_conference_call_join(self, action_data: dict, action_id: str) -> None:
        """Accept a new stream of the conference call in progress.

        The displayed user is taken from the "user" metadata when available,
        otherwise the sender JID is used. A peer element is added to the video
        grid with its own WebRTC instance, and the action is accepted.
        """
        log.debug(f"on_conference_call_join {action_data=}")
        peer_jid = JID(action_data["from_jid"])
        try:
            user_jid = JID(action_data["metadata"]["user"])
        except KeyError:
            user_jid = peer_jid
        log.info(f"{user_jid} joined the conference call.")

        grid_elt = document["group_video_grid"]
        user_jid_s = str(user_jid)
        await cache.fill_identities([user_jid_s])

        tpl_data = {"entity": str(user_jid), "identities": cache.identities}
        peer_elt = self.group_peer_tpl.get_elt(tpl_data)
        for pin_elt in peer_elt.select('.action_pin'):
            pin_elt.bind('click', self.toggle_pin)
        grid_elt <= peer_elt

        video_elt = peer_elt.select_one(".peer_video_stream")
        assert video_elt is not None
        log.debug(f"starting webrtc for {peer_jid} on {video_elt}")
        peer_webrtc = WebRTC(remote_video_elt=video_elt)
        session_id = action_data["session_id"]
        peer_webrtc.sid = session_id
        # Peers are indexed by the sender JID, not by the displayed user JID.
        self._group_call_peers[peer_jid] = {
            "webrtc": peer_webrtc,
            "element": peer_elt,
            "sid": session_id,
        }

        log.debug(f"Somebody joined conference call. SID: {session_id}")

        answer = json.dumps({"cancelled": False})
        await bridge.action_launch(action_id, answer)

    async def on_action_new(self, action_data: dict, action_id: str) -> None:
        """Handle an incoming call proposal.

        The ring is started and the user is asked whether the call must be answered,
        then the answer is sent with ``bridge.action_launch``. If a call session is
        already set, the proposal is ignored and the action is left unanswered.

        @param action_data: deserialised action data; the "from_jid", "sub_type" and
            "session_id" keys are read here
        @param action_id: identifier used to answer the action
        """
        peer_jid = JID(action_data["from_jid"]).bare
        call_type = action_data["sub_type"]
        call_emoji = "📹" if call_type == VIDEO else "📞"
        log.info(f"{peer_jid} wants to start a call ({call_type}).")
        if self.sid is not None:
            log.warning(
                f"already in a call ({self.sid}), can't receive a new call from "
                f"{peer_jid}"
            )
            return
        # The session is stored right away, so that signals received for it while
        # the dialog is displayed are not ignored (see ``_on_call_ended``).
        sid = self.sid = action_data["session_id"]
        await cache.fill_identities([peer_jid])
        try:
            identity = cache.identities[peer_jid]
        except KeyError:
            # No identity known, we use the local part of the JID as name.
            peer_name = peer_jid.local
        else:
            peer_name = identity["nicknames"][0]
        self._callee = peer_jid

        # we start the ring
        self.audio_player_elt.play()

        # and ask user if we take the call
        try:
            self.incoming_call_dialog_elt = dialog.Confirm(
                f"{peer_name} is calling you ({call_emoji}{call_type}).", ok_label="Answer", cancel_label="Reject"
            )
            accepted = await self.incoming_call_dialog_elt.ashow()
        except dialog.CancelError as e:
            # The dialog has been cancelled before the user answered: ``end_call``
            # does it (after stopping the ring) with the reason and text of the call
            # end. The action is not answered in this case.
            log.info("Call has been cancelled")
            self.incoming_call_dialog_elt = None
            self.sid = None
            match e.reason:
                case "busy":
                    dialog.notification.show(
                        f"{peer_name} can't answer your call",
                        level="info",
                    )
                case "taken_by_other_device":
                    device = e.text
                    dialog.notification.show(
                        f"The call has been taken on another device ({device}).",
                        level="info",
                    )
                case _:
                    dialog.notification.show(
                        f"{peer_name} has cancelled the call",
                        level="info"
                    )
            return

        self.incoming_call_dialog_elt = None

        # we stop the ring
        self.audio_player_elt.pause()
        self.audio_player_elt.currentTime = 0

        if accepted:
            log.debug(f"Call SID: {sid}")

            # Answer the call
            self.call_mode = call_type
            self.set_avatar(peer_jid)
            self.status = "connecting"
            self.switch_mode("call")
        else:
            log.info(f"your are declining the call from {peer_jid}")
            # The session is released, so a new call can be received.
            self.sid = None
        await bridge.action_launch(action_id, json.dumps({"cancelled": not accepted}))

    def _on_call_ended(self, session_id: str, data_s: str, profile: str) -> None:
        """Terminate the current call when its session has ended.

        The signal is ignored if there is no call in progress, or if it is about
        another session.

        @param session_id: Session identifier
        @param data_s: Serialised additional data on why the call has ended
        @param profile: Profile associated
        """
        current_sid = self.sid
        if current_sid is None:
            log.debug("there are no calls in progress")
        elif session_id != current_sid:
            log.debug(
                f"ignoring call_ended not linked to our call ({current_sid}): {session_id}"
            )
        else:
            end_data = json.loads(data_s)
            aio.run(self.end_call(end_data))

    def _on_call_info(self, session_id: str, info_type, info_data_s: str, profile: str):
        if self.sid != session_id:
            return
        if info_type == "ringing":
            self.status = "ringing"

    def _on_call_setup(self, session_id: str, setup_data_s: str, profile: str) -> None:
        """Deserialise call setup data and run ``on_call_setup`` with it.

        @param session_id: Session identifier
        @param setup_data_s: Serialised setup data
        @param profile: Profile associated with the action
        """
        setup_data = json.loads(setup_data_s)
        aio.run(self.on_call_setup(session_id, setup_data, profile))

    async def on_call_setup(
        self, session_id: str, setup_data: dict, profile: str
    ) -> None:
        """Call has been accepted, connection can be established

        @param session_id: Session identifier
        @param setup_data: Data with following keys:
            role: initiator or responser
            sdp: Session Description Protocol data
        @param profile: Profile associated
        """
        if self.sid == session_id:
            webrtc = self.webrtc
        else:
            for file_webrtc in self.files_webrtc:
                webrtc = file_webrtc["webrtc"]
                if webrtc.sid == session_id:
                    break
            else:
                for peer_data in self._group_call_peers.values():
                    webrtc = peer_data["webrtc"]
                    if webrtc.sid == session_id:
                        break
                    else:
                        log.debug(
                            f"Call ignored due to different session ID ({self.sid=} "
                            f"{session_id=})"
                        )
                        return
        try:
            role = setup_data["role"]
            sdp = setup_data["sdp"]
        except KeyError:
            dialog.notification.show(
                f"Invalid setup data received: {setup_data}", level="error"
            )
            return
        if role == "initiator":
            await webrtc.accept_call(session_id, sdp, profile)
        elif role == "responder":
            await webrtc.answer_call(
                session_id, sdp, profile, conference=self.is_conference
            )
        else:
            dialog.notification.show(
                f"Invalid role received during setup: {setup_data}", level="error"
            )
            return

    def _on_call_group_setup(
        self,
        room_jid_s: str,
        setup_data_s: str,
        profile: str
    ) -> None:
        """Deserialise group call setup data and run ``on_call_group_setup``.

        @param room_jid_s: JID of the room used for group call coordination.
        @param setup_data_s: serialised data of group call options, such as codec
            restrictions. An empty value is handled as an empty dict.
        @param profile: Profile associated with the action
        """
        setup_data = json.loads(setup_data_s) if setup_data_s else {}
        room_jid = JID(room_jid_s)
        aio.run(self.on_call_group_setup(room_jid, setup_data, profile))

    async def on_call_group_setup(
        self, room_jid: JID, setup_data: dict, profile: str
    ) -> None:
        """Finish group call preparation and call the entities to call.

        Our call data is generated and set for the room with
        ``bridge.call_group_data_set``, then each entity listed in "to_call" gets a
        peer element in the video grid and is called with its own WebRTC instance.

        @param room_jid: JID of the room used for group call coordination
        @param setup_data: group call options; the "to_call" key is required and
            holds the JIDs of the entities to call
        @param profile: Profile associated
        """
        log.info(f"Setting up group call at {room_jid}.")
        try:
            to_call = setup_data["to_call"]
        except KeyError:
            dialog.notification.show(
                'Internal error: missing "to_call" data.', level="error"
            )
            return

        # we need a remote_video_elt to instantiate, but it won't be used.
        webrtc = WebRTC(remote_video_elt=document["remote_video"])
        call_data = await webrtc.prepare_call()

        # we have just used this WebRTC instance to get calling data.
        await webrtc.end_call()
        del webrtc
        await bridge.call_group_data_set(
            str(room_jid),
            json.dumps(call_data),
        )
        # At this point, we can initiate the call.
        # As per specification, we call each entity which was preparing when we started
        # our own preparation.
        group_video_grid_elt = document["group_video_grid"]
        # Stays None until the first call is made, then holds the stream of the first
        # WebRTC instance.
        local_stream = None
        for entity_jid_s in to_call:
            entity_jid = JID(entity_jid_s)
            log.info(f"Calling {entity_jid_s}.")
            await cache.fill_identities([entity_jid])
            group_peer_elt = self.group_peer_tpl.get_elt({
                "entity": str(entity_jid),
                "identities": cache.identities,
            })
            for pin_item in group_peer_elt.select('.action_pin'):
                pin_item.bind('click', self.toggle_pin)
            group_video_grid_elt <= group_peer_elt
            peer_video_stream_elt = group_peer_elt.select_one(".peer_video_stream")
            assert peer_video_stream_elt is not None
            webrtc = WebRTC(
                remote_video_elt=peer_video_stream_elt,
                local_stream=local_stream
            )

            # NOTE(review): unlike in ``on_group_call_join``, no "element" nor "sid"
            # key is stored here — confirm that nothing relies on them.
            self._group_call_peers[JID(entity_jid)] = {
                "webrtc": webrtc
            }
            await webrtc.make_call(entity_jid)
            # we save the local stream to re-use it with next WebRTC instance.
            if local_stream is None:
                local_stream = webrtc.local_stream

    def on_connection_established(self):
        """WebRTC callback: the connection is established, we are now in call."""
        self.status = "in-call"

    def on_reconnect(self):
        """WebRTC callback: show the "reconnecting" status."""
        self.status = "reconnecting"

    def on_connection_lost(self):
        """WebRTC callback: show the "connection-lost" status."""
        self.status = "connection-lost"

    def on_video_devices(self, has_multiple_cameras: bool) -> None:
        """Show the camera switch column only when several cameras are available.

        @param has_multiple_cameras: True if there is more than one camera
        """
        column_classes = document["switch_camera_column"].classList
        if not has_multiple_cameras:
            column_classes.add("is-hidden")
            return
        column_classes.remove("is-hidden", "is-hidden-desktop")

    def on_reset_cb(self) -> None:
        """Called when the WebRTC connection is reset: restore default buttons state.

        Full screen buttons are shown again (and "exit full screen" ones hidden),
        and mute buttons get back their unmuted appearance.
        """
        fullscreen_btn_pairs = (
            ("full_screen_btn", "exit_full_screen_btn"),
            ("group_full_screen_btn", "group_exit_full_screen_btn"),
        )
        for enter_btn_id, exit_btn_id in fullscreen_btn_pairs:
            document[enter_btn_id].classList.remove("is-hidden")
            document[exit_btn_id].classList.add("is-hidden")

        for btn_id in ("mute_audio_btn", "mute_video_btn"):
            btn_classes = document[btn_id].classList
            btn_classes.remove(INACTIVE_CLASS, MUTED_CLASS, "is-warning")
            btn_classes.add("is-success")

    async def make_call(
        self,
        audio: bool = True,
        video: bool = True,
        remote: bool = False
    ) -> None:
        """Start a WebRTC call

        The callee is the entity whose identifier is in the search field. If this
        identifier is invalid, an error notification is shown and no call is made.
        If the callee is an A/V conference room, the group call UI is used and our
        media is only sent.

        @param audio: True if an audio flux is required
        @param video: True if a video flux is required
        @param remote: True if this is a Remote Control session.
        """
        # The call mode is derived from the arguments: remote control has priority,
        # then video, audio being the fallback.
        if remote:
            self.call_mode = REMOTE
        elif video:
            self.call_mode = VIDEO
        else:
            self.call_mode = AUDIO
        try:
            callee_jid = JID(self.search_elt.value.strip())
            if not callee_jid.is_valid:
                raise ValueError
        except ValueError:
            dialog.notification.show(
                "Invalid identifier, please use a valid callee identifier", level="error"
            )
            return

        self._callee = callee_jid
        await cache.fill_identities([callee_jid])

        # The callee is a conference if one of its disco identities has the
        # "conference" category with the "audio-video" type.
        self.is_conference = False
        try:
            disco_identities = cache.identities[callee_jid]["identities"]
        except KeyError:
            # No disco identities known: we handle the callee as a normal entity.
            pass
        else:
            for disco_identity in disco_identities:
                if (
                    disco_identity.get("category") == "conference"
                    and disco_identity.get("type") == "audio-video"
                ):
                    self.is_conference = True
                    log.info(f"{callee_jid} is an A/V Conference room.")


        self.status = "dialing"
        self.set_avatar(callee_jid)

        self.switch_mode("group_call" if self.is_conference else "call" )
        if remote:
            await self.webrtc.start_remote_control(
                callee_jid, audio, video
            )
        else:
            if self.is_conference:
                direction = "sendonly"
                # Our own video is displayed in a peer element of the video grid.
                group_video_grid_elt = document["group_video_grid"]
                await cache.fill_identities([str(own_jid.bare)])
                group_peer_elt = self.group_peer_tpl.get_elt({
                    "entity": str(own_jid.bare),
                    "identities": cache.identities,
                })
                for pin_item in group_peer_elt.select('.action_pin'):
                    pin_item.bind('click', self.toggle_pin)
                group_video_grid_elt <= group_peer_elt
                peer_video_stream_elt = group_peer_elt.select_one(".peer_video_stream")
                assert peer_video_stream_elt is not None
                self.webrtc.local_video_elt = peer_video_stream_elt
            else:
                direction = "sendrecv"
                # The local video element is restored, as a previous conference call
                # may have replaced it.
                self.webrtc.local_video_elt = document["local_video"]

            await self.webrtc.make_call(
                callee_jid,
                audio,
                video,
                direction
            )

    async def make_group_call(
        self,
    ) -> None:
        """Start a group call with the entities selected in the search.

        This will run a call for small group, using MUJI (XEP-0272). The video grid
        and the known peers are reset, then the room returned when starting the
        group call is stored.
        """
        grid_elt = document["group_video_grid"]
        grid_elt.clear()
        self._group_call_peers.clear()

        await cache.fill_identities(self.jid_search.selected_jids)

        self.switch_mode("group_call")
        raw_group_call_data = await bridge.call_group_start(
            self.jid_search.selected_jids, ""
        )
        group_call_data = json.loads(raw_group_call_data)
        self._group_call_room = JID(group_call_data["room_jid"])

    async def end_call(self, data: dict) -> None:
        """Stop streaming and clean instance

        @param data: call termination data. Optional "reason" and "text" keys are
            forwarded to the incoming call dialog, if one is currently set.
        """
        # silence and rewind the audio player, in case it is ringing
        player_elt = self.audio_player_elt
        player_elt.pause()
        player_elt.currentTime = 0

        reason, text = data.get("reason", ""), data.get("text", "")

        pending_dialog_elt = self.incoming_call_dialog_elt
        if pending_dialog_elt is not None:
            pending_dialog_elt.cancel(reason, text)
            self.incoming_call_dialog_elt = None

        self.switch_mode("search")

        await self.webrtc.end_call()

    async def hang_up(self) -> None:
        """Terminate the call

        The call is first ended locally (see ``end_call``), then ``bridge.call_end`` is
        called with the session ID.
        Nothing is done, except logging a warning, if there is no session ID.
        """
        # the ID is read before ``end_call`` runs — presumably because ending the call
        # resets it; TODO confirm
        session_id = self.sid
        if not session_id:
            log.warning("Can't hang_up, no call in progress")
            return
        await self.end_call({"reason": "terminated"})
        await bridge.call_end(session_id, "")

    async def group_hang_up(self) -> None:
        """Terminate the group_call

        The UI goes back to "search" mode, the video grid is emptied, the WebRTC
        instance of each known peer is asked to end its call, then the group call state
        (peers and room) is reset.
        """
        self.switch_mode("search")
        group_video_grid_elt = document["group_video_grid"]
        group_video_grid_elt.clear()
        # Iterate over a snapshot: each ``await`` yields control, and a handler adding
        # or removing a peer in the meantime would otherwise break the iteration
        # ("dictionary changed size during iteration").
        for peer_data in list(self._group_call_peers.values()):
            webrtc = peer_data["webrtc"]
            await webrtc.end_call()
        self._group_call_peers.clear()
        self._group_call_room = None

    def _handle_animation_end(
        self,
        element,
        remove=None,
        add=None,
    ):
        """Return a handler that removes specified classes and the event handler.

        @param element: The element to operate on.
        @param remove: List of class names to remove from the element.
        @param add: List of class names to add to the element.
        """

        def handler(__, remove=remove, add=add):
            log.info(f"animation end OK {element=}")
            if add:
                if isinstance(add, str):
                    add = [add]
                element.classList.add(*add)
            if remove:
                if isinstance(remove, str):
                    remove = [remove]
                element.classList.remove(*remove)
            element.unbind("animationend", handler)

        return handler

    def switch_mode(self, mode: str) -> None:
        """Handles the user interface changes

        Every currently visible container is animated out, then the container matching
        the new mode is shown and animated in.

        @param mode: mode to switch to, one of "search", "call" or "group_call".
            Nothing is done if this is already the current mode. An unknown mode is
            logged as an error and otherwise ignored.
        """
        if mode == self.mode:
            return

        # Validate before touching the DOM: otherwise an unknown mode would start the
        # exit animations, and leave the UI with nothing to show once they are over.
        if mode not in ("search", "call", "group_call"):
            log.error(f"Internal Error: Unknown mode: {mode}")
            return

        # Exiting from any other modes
        exit_animate_list = [
            (self.search_container_elt, "fade-out-y", "is-hidden"),
            (self.call_container_elt, "slide-in", "is-hidden"),
            (self.group_call_container_elt, "slide-in", "is-hidden"),
        ]

        for elt, anim, hide in exit_animate_list:
            if not elt.classList.contains(hide):  # Only animate if visible
                elt.classList.add(anim)
                elt.bind(
                    "animationend",
                    self._handle_animation_end(elt, remove=anim, add=hide),
                )

        # Entering into the new mode
        if mode == "search":
            self.toggle_fullscreen(False)
            self.search_container_elt.classList.remove("is-hidden")
            # the exit animation is played backward to bring the search panel back
            self.search_container_elt.classList.add("fade-out-y", "animation-reverse")
            self.search_container_elt.bind(
                "animationend",
                self._handle_animation_end(
                    self.search_container_elt,
                    remove=["fade-out-y", "animation-reverse"],
                ),
            )
        else:
            # "call" and "group_call" only differ by the container to show
            if mode == "call":
                enter_elt = self.call_container_elt
            else:
                enter_elt = self.group_call_container_elt
            enter_elt.classList.remove("is-hidden")
            enter_elt.classList.add("slide-in")
            enter_elt.bind(
                "animationend",
                self._handle_animation_end(enter_elt, remove="slide-in"),
            )

        self.mode = mode

    def on_clear_search(self, ev) -> None:
        """Empty the search input, and notify its listeners with an "input" event.

        Nothing is done when the search input is already empty.

        @param ev: the event object from the button click.
        """
        search_input_elt = self.search_elt
        if search_input_elt.value:
            search_input_elt.value = ""
            # the "input" event is dispatched so items are updated
            search_input_elt.dispatchEvent(window.Event.new("input"))

    def _on_search_selection(self, has_selection: bool) -> None:
        """Show the search bar buttons matching the selection state.

        With at least one selected search item, only the "group call" button is shown;
        without selection, the "video call" and "audio call" buttons are shown instead.
        The "is-hidden" class is set on the parent of each button.

        @param has_selection: True if at least one search item is selected.
        """
        single_call_ids = ["video_call_btn", "audio_call_btn"]
        group_call_ids = ["group_call_btn"]
        if has_selection:
            hidden_ids, shown_ids = single_call_ids, group_call_ids
        else:
            hidden_ids, shown_ids = group_call_ids, single_call_ids

        for btn_id in hidden_ids:
            document[btn_id].parent.classList.add("is-hidden")
        for btn_id in shown_ids:
            document[btn_id].parent.classList.remove("is-hidden")

    def toggle_fullscreen(self, fullscreen: bool | None = None, group=False):
        """Enter or leave fullscreen mode for the call area.

        The "full screen" and "exit full screen" buttons are shown/hidden accordingly.
        Any exception raised while switching is reported with an error notification.

        @param fullscreen: if set, determine the fullscreen state; otherwise,
            the fullscreen mode will be toggled.
        @param group: if True, the group call container and the "group_" prefixed
            buttons are used, instead of the call box and its buttons.
        """
        if fullscreen is None:
            # toggle: go fullscreen only if nothing is fullscreen at the moment
            fullscreen = document.fullscreenElement is NULL
        log.debug(f"toggle_fullscreen {fullscreen=} {group=}")

        id_prefix = "group_" if group else ""
        enter_btn_id = f"{id_prefix}full_screen_btn"
        exit_btn_id = f"{id_prefix}exit_full_screen_btn"
        container_elt = self.group_call_container_elt if group else self.call_box_elt

        try:
            if fullscreen and document.fullscreenElement is NULL:
                container_elt.requestFullscreen()
                document[enter_btn_id].classList.add("is-hidden")
                document[exit_btn_id].classList.remove("is-hidden")
            elif not fullscreen and document.fullscreenElement is not NULL:
                document.exitFullscreen()
                document[enter_btn_id].classList.remove("is-hidden")
                document[exit_btn_id].classList.add("is-hidden")
        except Exception as e:
            dialog.notification.show(
                f"An error occurred while toggling fullscreen: {e}", level="error"
            )

    def toggle_pin(self, event):
        """Pin or unpin the peer video container from which ``event`` comes.

        All peers are first unpinned. If the targeted peer was not pinned before, it
        gets pinned (full width). The grid is then rebuilt with pinned peers first.

        @param event: event whose target is inside a ".peer_video_container" element.
        """
        pinned_classes = ('is-12',)
        regular_classes = ('is-3-widescreen', 'is-4-desktop', 'is-6-tablet')

        target_container = event.target.closest('.peer_video_container')
        was_pinned = target_container.dataset.pinned == 'true'

        # Reset every peer to the regular, unpinned layout
        for peer_elt in self.group_call_container_elt.select('.peer_video_container'):
            peer_elt.dataset.pinned = 'false'
            peer_elt.classList.remove(*pinned_classes)
            peer_elt.classList.add(*regular_classes)

        # A peer which was already pinned is left unpinned: this is a toggle
        if not was_pinned:
            target_container.dataset.pinned = 'true'
            target_container.classList.remove(*regular_classes)
            target_container.classList.add(*pinned_classes)

        # Put the grid items back with pinned peers first, keeping relative order
        grid_elt = document['group_video_grid']
        peer_elts = list(grid_elt.children)
        reordered = (
            [peer_elt for peer_elt in peer_elts if peer_elt.dataset.pinned == 'true']
            + [peer_elt for peer_elt in peer_elts if peer_elt.dataset.pinned != 'true']
        )

        grid_elt.clear()
        for peer_elt in reordered:
            grid_elt <= peer_elt

    def toggle_audio_mute(self, evt):
        """Toggle audio mute, and update the clicked button accordingly.

        A short notification is shown when audio gets muted.

        @param evt: event whose ``currentTarget`` is the mute button.
        """
        muted = self.webrtc.toggle_audio_mute()
        button_elt = evt.currentTarget
        if not muted:
            button_elt.classList.remove(INACTIVE_CLASS, MUTED_CLASS, "is-warning")
            button_elt.classList.add("is-success")
            return

        button_elt.classList.remove("is-success")
        button_elt.classList.add(INACTIVE_CLASS, MUTED_CLASS, "is-warning")
        dialog.notification.show("audio is now muted", level="info", delay=2)

    def toggle_video_mute(self, evt):
        """Toggle video mute, and update the clicked button accordingly.

        A short notification is shown when video gets muted.

        @param evt: event whose ``currentTarget`` is the mute button.
        """
        muted = self.webrtc.toggle_video_mute()
        button_elt = evt.currentTarget
        if not muted:
            button_elt.classList.remove(INACTIVE_CLASS, MUTED_CLASS, "is-warning")
            button_elt.classList.add("is-success")
            return

        button_elt.classList.remove("is-success")
        button_elt.classList.add(INACTIVE_CLASS, MUTED_CLASS, "is-warning")
        dialog.notification.show("video is now muted", level="info", delay=2)

    def toggle_screen_sharing(self, evt):
        """Schedule the toggling of screen sharing (event handler).

        @param evt: triggering event, unused.
        """
        toggle_coro = self.webrtc.toggle_screen_sharing()
        aio.run(toggle_coro)

    def on_sharing_screen(self, sharing: bool) -> None:
        """Called when screen sharing state changes

        Updates the classes of the "share_desktop_btn" button to reflect the new state.

        @param sharing: True if the screen is now shared.
        """
        button_elt = document["share_desktop_btn"]
        if not sharing:
            button_elt.classList.remove("is-danger")
            button_elt.classList.add(INACTIVE_CLASS, SCREEN_OFF_CLASS)
            return

        button_elt.classList.add("is-danger")
        button_elt.classList.remove(INACTIVE_CLASS, SCREEN_OFF_CLASS)

    def on_switch_camera(self, __) -> None:
        """Schedule a camera switch (event handler, the event is unused)."""
        switch_coro = self.webrtc.switch_camera()
        aio.run(switch_coro)

    def on_send_file(self, __) -> None:
        """Trigger a click on the "send_file_input" element (the event is unused)."""
        file_input_elt = document["send_file_input"]
        file_input_elt.click()

    def _on_send_input_change(self, evt) -> None:
        """Synchronous wrapper scheduling ``on_send_input_change``.

        @param evt: event forwarded to ``on_send_input_change``.
        """
        send_coro = self.on_send_input_change(evt)
        aio.run(send_coro)

    async def on_send_input_change(self, evt) -> None:
        """Send each file of the input which triggered ``evt`` to the current callee.

        A dedicated file-only WebRTC instance is created for each file, and recorded
        in ``self.files_webrtc`` together with the file. Files are sent one after the
        other.

        @param evt: event whose ``currentTarget`` exposes the selected ``files``.
        """
        assert self._callee is not None
        for selected_file in evt.currentTarget.files:
            file_webrtc = WebRTC(file_only=True)
            transfer = {"file": selected_file, "webrtc": file_webrtc}
            self.files_webrtc.append(transfer)
            await file_webrtc.send_file(self._callee, selected_file)

    def _on_entity_click(self, evt: JSObject, item: dict) -> None:
        """Synchronous wrapper scheduling ``on_entity_click``.

        @param evt: click event, forwarded as is.
        @param item: data of the clicked search item, forwarded as is.
        """
        click_coro = self.on_entity_click(evt, item)
        aio.run(click_coro)

    async def on_entity_click(self, evt: JSObject, item: dict) -> None:
        """Set entity JID to search bar, and start the call

        If some search items are already selected, no call is started: the checkbox of
        the clicked item (if any) is toggled instead.

        @param evt: click event on the search item.
        @param item: data of the clicked item; its "entity" value is put in the search
            bar.
        """
        if not self.jid_search.has_selection:
            self.search_elt.value = item["entity"]
            await self.make_call()
            return

        # With an existing selection, a group call is expected and a click may just be
        # accidental, so we don't want to start a call: the item selection is toggled.
        checkbox_elt = evt.currentTarget.select_one("input[type='checkbox']")
        if checkbox_elt is None:
            return
        checkbox_elt.checked = not checkbox_elt.checked
        # "change" is dispatched by hand after setting ``checked`` from code
        checkbox_elt.dispatchEvent(window.Event.new("change"))

    async def on_entity_action(self, evt, action: str, item: dict) -> None:
        """Handle extra actions on search items

        @param evt: triggering event; its propagation is always stopped.
        @param action: "menu" to toggle the dropdown menu, or a call mode (video, audio
            or remote control) to call the entity. Any other value is ignored.
        @param item: data of the search item; its "entity" value is put in the search
            bar before calling.
        """
        evt.stopPropagation()

        if action == "menu":
            evt.currentTarget.parent.classList.toggle("is-active")
            return

        if action not in (VIDEO, AUDIO, REMOTE):
            return

        self.search_elt.value = item["entity"]
        # we want the dropdown to be inactive
        dropdown_elt = evt.currentTarget.closest(".dropdown")
        dropdown_elt.classList.remove("is-active")

        if action == REMOTE:
            await self.make_call(audio=False, video=True, remote=True)
        else:
            with_video = action == VIDEO
            await self.make_call(video=with_video)


# Module start-up: instantiate the call UI. The instance is not bound to any
# module-level name here.
CallUI()
# presumably hides the page loading indicator now that the UI is built — verify in
# the ``loading`` module
loading.remove_loading_screen()