view tests/browser/unit/test_calls.py @ 1594:93abef9a3548

server: catch `DBusException` and retry to connect in this case.
author Goffi <goffi@goffi.org>
date Wed, 13 Dec 2023 22:03:49 +0100
parents 00d04f51787e
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia Web
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import sys
import pytest
from unittest.mock import MagicMock, AsyncMock

# Mock all non-stdlib modules


class CancelError(Exception):
    """Stand-in exception installed on the mocked ``dialog`` module.

    Tests use it as the ``side_effect`` of the confirmation dialog mock to
    simulate an incoming call being cancelled.
    """


# Register a mock under the name of every non-stdlib module, so that the
# import of the module under test (done right after this section) succeeds.
sys.modules["bridge"] = MagicMock(AsyncBridge=AsyncMock)

# ``browser`` is kept as a module-level name: tests patch ``browser.document``.
browser = MagicMock(
    aio=MagicMock(),
    console=MagicMock(),
    document=MagicMock(),
    window=MagicMock(),
)
sys.modules["browser"] = browser

sys.modules["cache"] = MagicMock(cache=MagicMock())

# ``dialog.Confirm(...)`` always returns ``confirm_mock``; its ``ashow``
# coroutine is exposed as ``confirm_ashow`` so that tests can choose the
# simulated user answer (or make it raise ``CancelError``).
confirm_ashow = AsyncMock()
confirm_mock = MagicMock()
confirm_mock.ashow = confirm_ashow
dialog = MagicMock()
dialog.CancelError = CancelError
dialog.Confirm.return_value = confirm_mock
sys.modules["dialog"] = dialog

sys.modules.update(
    {
        "jid": MagicMock(JID=MagicMock),
        "jid_search": MagicMock(JidSearch=MagicMock()),
        "loading": MagicMock(),
        "template": MagicMock(Template=MagicMock()),
        "webrtc": MagicMock(WebRTC=MagicMock()),
    }
)

# After mocking the modules, import the actual module
from libervia.web.pages.calls import _browser as calls

# Replace ``fill_identities`` with an AsyncMock so that tests can assert on
# its calls (see ``test_action_new_fill_identities``).
# NOTE(review): presumably the code under test awaits it, which the default
# MagicMock attribute would not support -- confirm against ``_browser.py``.
calls.cache.fill_identities = AsyncMock()


class TestCallUI:
    """Unit tests for ``calls.CallUI``, run against the mocked modules above."""

    @staticmethod
    def _incoming_call(sub_type):
        """Build the action data of an incoming call.

        @param sub_type: value of the ``sub_type`` key ("audio" or "video")
        @return: a fresh dict, so a test can not be affected by a mutation
            done during another one
        """
        return {
            "from_jid": "test@example.org",
            "session_id": "session123",
            "sub_type": sub_type,
        }

    @pytest.fixture
    def call_ui(self):
        """Return a ``CallUI`` instance whose UI collaborators are mocks.

        Elements appended to ``call_status_wrapper_elt`` with the ``<=``
        operator are recorded in its ``appended_elements`` list.
        """
        # These mocks live at module level and are shared by all tests: without
        # a reset, a ``side_effect``/``return_value`` set by one test or the
        # calls recorded during it would leak into the following ones, making
        # the results depend on test execution order.
        confirm_ashow.reset_mock(return_value=True, side_effect=True)
        calls.cache.fill_identities.reset_mock()

        ui = calls.CallUI()
        ui.webrtc.sid = None
        ui.call_status_tpl = MagicMock()
        ui.call_status_wrapper_elt = MagicMock()
        ui.call_box_elt = MagicMock()
        ui._update_call_button = MagicMock()
        ui.audio_player_elt = MagicMock()
        ui.switch_mode = MagicMock()
        ui.incoming_call_dialog_elt = MagicMock()

        ui.call_status_wrapper_elt.appended_elements = []

        # Mock the `<=` operator for call_status_wrapper_elt
        # A plain function set as a magic method receives the mock itself as
        # first argument.
        def mock_append_child(parent, child):
            # Store the child for later assertion
            parent.appended_elements.append(child)
            return child

        ui.call_status_wrapper_elt.__le__ = mock_append_child

        return ui

    def test_status(self, call_ui):
        """Allowed values are set"""
        # Set initial status
        call_ui._status = None
        assert call_ui.status is None

        # Change status
        call_ui.status = "dialing"
        assert call_ui.status == "dialing"

    def test_disallowed_status(self, call_ui):
        """A status which is not allowed raises an Exception"""
        with pytest.raises(Exception, match="this status is not allowed"):
            call_ui.status = "disallowed_status"

    def test_status_element_update(self, call_ui):
        """A status change updates the status element"""
        call_ui._callee = "test_callee"
        mock_cache = calls.cache
        mock_cache.identities = {"test_callee": {"nicknames": ["Test Callee Nickname"]}}

        status_elt_mock = MagicMock()
        call_ui.call_status_tpl.get_elt.return_value = status_elt_mock

        call_ui.status = "dialing"
        call_ui.call_status_tpl.get_elt.assert_called_with(
            {
                "entity": "test_callee",
                "status": "dialing",
                "name": "Test Callee Nickname",
            }
        )
        assert call_ui.call_status_wrapper_elt.appended_elements[-1] == status_elt_mock

    def test_status_element_updates_with_unknown_callee(self, call_ui):
        """A status change updates the element even if callee is not known"""
        call_ui._callee = "test_callee"
        mock_cache = calls.cache
        mock_cache.identities = {}

        call_ui.status = "dialing"
        # Without cached identity, the entity itself is used as name.
        call_ui.call_status_tpl.get_elt.assert_called_with(
            {"entity": "test_callee", "status": "dialing", "name": "test_callee"}
        )

    def test_call_mode(self, call_ui):
        """Setting a valid call mode stores it and updates the UI"""
        call_ui.call_mode = calls.AUDIO
        assert call_ui.call_mode == calls.AUDIO
        call_ui._update_call_button.assert_called_once()
        call_ui.call_box_elt.select.assert_called_with(".is-video-only")

        call_ui.call_mode = calls.VIDEO
        assert call_ui.call_mode == calls.VIDEO
        call_ui.call_box_elt.select.assert_called_with(".is-video-only")

    def test_call_mode_invalid_mode(self, call_ui):
        """Setting an unknown call mode raises a ValueError"""
        with pytest.raises(ValueError, match="Invalid call mode"):
            call_ui.call_mode = "invalid_mode"

    def test_switch_call_mode(self, call_ui):
        """Switching call mode toggles between audio and video"""
        call_ui._call_mode = calls.AUDIO
        call_ui.switch_call_mode(None)
        assert call_ui.call_mode == calls.VIDEO

        call_ui._call_mode = calls.VIDEO
        call_ui.switch_call_mode(None)
        assert call_ui.call_mode == calls.AUDIO

    @pytest.mark.asyncio
    async def test_ignore_new_call_when_call_in_progress(self, call_ui):
        """Ignoring new incoming call when a call is already in progress."""
        call_ui.sid = "some_sid"
        await call_ui.on_action_new(self._incoming_call("audio"), "action_id")
        call_ui.audio_player_elt.play.assert_not_called()

    @pytest.mark.asyncio
    async def test_action_new_fill_identities(self, call_ui):
        """Filling identities from cache with the correct JID."""
        calls.cache.identities = {"test@example.org": {"nicknames": ["Test User"]}}

        await call_ui.on_action_new(self._incoming_call("audio"), "action_id")
        calls.cache.fill_identities.assert_called_once_with(["test@example.org"])

    @pytest.mark.asyncio
    async def test_incoming_call_triggers_audio(self, call_ui):
        """Incoming call triggering the play action on the audio element."""
        await call_ui.on_action_new(self._incoming_call("audio"), "action_id")
        call_ui.audio_player_elt.play.assert_called_once()

    @pytest.mark.asyncio
    async def test_user_accepts_call(self, call_ui):
        """User accepting the incoming call."""
        confirm_ashow.return_value = True
        await call_ui.on_action_new(self._incoming_call("video"), "action_id")
        call_ui.switch_mode.assert_called_once_with("call")

    @pytest.mark.asyncio
    async def test_user_rejects_call(self, call_ui):
        """User rejecting the incoming call."""
        confirm_ashow.return_value = False
        await call_ui.on_action_new(self._incoming_call("video"), "action_id")
        call_ui.switch_mode.assert_not_called()

    @pytest.mark.asyncio
    async def test_call_gets_cancelled(self, call_ui):
        """Incoming call gets cancelled."""
        confirm_ashow.side_effect = dialog.CancelError
        await call_ui.on_action_new(self._incoming_call("audio"), "action_id")
        assert call_ui.sid is None
        call_ui.switch_mode.assert_not_called()

    def test_toggle_fullscreen_enter(self, call_ui, monkeypatch):
        """Entering fullscreen mode for video elements."""
        # No fullscreen element: we're not in fullscreen yet
        monkeypatch.setattr(browser.document, "fullscreenElement", None)
        monkeypatch.setattr(call_ui.call_box_elt, "requestFullscreen", MagicMock())

        btn_mock_show = MagicMock()
        monkeypatch.setattr("dialog.notification.show", btn_mock_show)
        btn_mock_add = MagicMock()
        btn_mock_remove = MagicMock()

        full_screen_mock = MagicMock(classList=MagicMock(add=btn_mock_add))
        exit_full_screen_mock = MagicMock(classList=MagicMock(remove=btn_mock_remove))

        mock_document = {
            "full_screen_btn": full_screen_mock,
            "exit_full_screen_btn": exit_full_screen_mock,
        }

        # ``document[...]`` lookups are served from ``mock_document``
        getitem_mock = MagicMock()
        monkeypatch.setattr(browser.document, "__getitem__", getitem_mock)
        getitem_mock.side_effect = mock_document.__getitem__

        call_ui.toggle_fullscreen(True)

        call_ui.call_box_elt.requestFullscreen.assert_called_once()
        btn_mock_add.assert_called_with("is-hidden")
        btn_mock_remove.assert_called_with("is-hidden")
        btn_mock_show.assert_not_called()

    def test_toggle_fullscreen_exit(self, call_ui, monkeypatch):
        """Exiting fullscreen mode for video elements."""
        # As it's not None, we're in fullscreen
        monkeypatch.setattr(browser.document, "fullscreenElement", "mocked_value")
        monkeypatch.setattr(browser.document, "exitFullscreen", MagicMock())

        btn_mock_show = MagicMock()
        monkeypatch.setattr("dialog.notification.show", btn_mock_show)
        btn_mock_add = MagicMock()
        btn_mock_remove = MagicMock()

        full_screen_mock = MagicMock(classList=MagicMock(remove=btn_mock_remove))
        exit_full_screen_mock = MagicMock(classList=MagicMock(add=btn_mock_add))

        mock_document = {
            "full_screen_btn": full_screen_mock,
            "exit_full_screen_btn": exit_full_screen_mock,
        }

        # ``document[...]`` lookups are served from ``mock_document``
        getitem_mock = MagicMock()
        monkeypatch.setattr(browser.document, "__getitem__", getitem_mock)
        getitem_mock.side_effect = mock_document.__getitem__

        call_ui.toggle_fullscreen(False)

        browser.document.exitFullscreen.assert_called_once()
        btn_mock_add.assert_called_with("is-hidden")
        btn_mock_remove.assert_called_with("is-hidden")
        btn_mock_show.assert_not_called()

    def test_toggle_audio_mute(self, call_ui, monkeypatch):
        """Toggle audio mute."""
        # The mocked WebRTC method reports that audio is now muted
        mock_toggle_audio_mute = MagicMock(return_value=True)
        monkeypatch.setattr(call_ui.webrtc, "toggle_audio_mute", mock_toggle_audio_mute)

        btn_mock_show = MagicMock()
        monkeypatch.setattr("dialog.notification.show", btn_mock_show)

        evt_mock = MagicMock(currentTarget=MagicMock(classList=MagicMock()))

        call_ui.toggle_audio_mute(evt_mock)

        mock_toggle_audio_mute.assert_called_once()
        evt_mock.currentTarget.classList.remove.assert_called_with("is-success")
        evt_mock.currentTarget.classList.add.assert_called_with("muted", "is-warning")
        btn_mock_show.assert_called_once_with("audio is now muted", level="info", delay=2)

    def test_toggle_video_mute(self, call_ui, monkeypatch):
        """Toggle video mute."""
        # The mocked WebRTC method reports that video is now muted
        mock_toggle_video_mute = MagicMock(return_value=True)
        monkeypatch.setattr(call_ui.webrtc, "toggle_video_mute", mock_toggle_video_mute)

        btn_mock_show = MagicMock()
        monkeypatch.setattr("dialog.notification.show", btn_mock_show)

        evt_mock = MagicMock(currentTarget=MagicMock(classList=MagicMock()))

        call_ui.toggle_video_mute(evt_mock)

        mock_toggle_video_mute.assert_called_once()
        evt_mock.currentTarget.classList.remove.assert_called_with("is-success")
        evt_mock.currentTarget.classList.add.assert_called_with("muted", "is-warning")
        btn_mock_show.assert_called_once_with("video is now muted", level="info", delay=2)