view tests/browser/unit/test_calls.py @ 1598:86c7a3a625d5

server: always start a new session on connection: The session was kept when a user connected from the service profile (but not from other profiles), which led to a session fixation vulnerability (an attacker on the same machine could obtain the service profile session cookie and use it once a victim logged in). This patch fixes it by always starting a new session on connection. fix 443
author Goffi <goffi@goffi.org>
date Fri, 23 Feb 2024 13:35:24 +0100
parents 00d04f51787e
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia Web
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import sys
import pytest
from unittest.mock import MagicMock, AsyncMock

# Mock all non-stdlib modules


class CancelError(Exception):
    """Stand-in exception exposed as ``dialog.CancelError`` on the mocked module."""


# Register mocks in ``sys.modules`` under the names of the non-stdlib modules,
# before the module under test is imported.
sys.modules["bridge"] = MagicMock(AsyncBridge=AsyncMock)

# ``browser`` stays reachable as a module-level name: tests patch attributes of
# ``browser.document``.
browser = MagicMock(
    aio=MagicMock(), console=MagicMock(), document=MagicMock(), window=MagicMock()
)
sys.modules["browser"] = browser

sys.modules["cache"] = MagicMock(cache=MagicMock())

# ``dialog.Confirm(...)`` always returns ``confirm_mock``; its ``ashow`` is an
# AsyncMock that tests configure through ``confirm_ashow`` (return value or
# side effect).
dialog = MagicMock()
sys.modules["dialog"] = dialog
dialog.CancelError = CancelError
confirm_mock = MagicMock()
dialog.Confirm.return_value = confirm_mock
confirm_ashow = AsyncMock()
confirm_mock.ashow = confirm_ashow

sys.modules.update(
    {
        "jid": MagicMock(JID=MagicMock),
        "jid_search": MagicMock(JidSearch=MagicMock()),
        "loading": MagicMock(),
        "template": MagicMock(Template=MagicMock()),
        "webrtc": MagicMock(WebRTC=MagicMock()),
    }
)

# After mocking the modules, import the actual module
from libervia.web.pages.calls import _browser as calls

# Replace ``fill_identities`` with an AsyncMock so that tests can assert on its
# calls (see ``test_action_new_fill_identities``).
# NOTE(review): presumably the code under test awaits it, hence AsyncMock
# rather than MagicMock — confirm against ``calls.CallUI.on_action_new``.
calls.cache.fill_identities = AsyncMock()


class TestCallUI:
    """Unit tests for ``calls.CallUI``, run against the mocked browser modules."""

    @staticmethod
    def _incoming_call(sub_type="audio"):
        """Build the action data passed to ``on_action_new`` for an incoming call.

        @param sub_type: value of the ``sub_type`` key ("audio" or "video" in
            these tests)
        @return: a new dict on every call, so tests never share the payload
        """
        return {
            "from_jid": "test@example.org",
            "session_id": "session123",
            "sub_type": sub_type,
        }

    @pytest.fixture
    def call_ui(self):
        """Return a ``CallUI`` whose UI collaborators are replaced by mocks.

        Module-level mocks that tests configure or assert on are reset first, so
        that each test starts from a clean state whatever the execution order.
        """
        # These mocks are created once at import time: without a reset, a
        # ``side_effect``/``return_value`` set by one test, or calls recorded
        # during it, would leak into the following tests.
        confirm_ashow.reset_mock(return_value=True, side_effect=True)
        calls.cache.fill_identities.reset_mock()

        ui = calls.CallUI()
        # NOTE(review): ``WebRTC`` is a module-level mock, so ``ui.webrtc`` is
        # presumably shared between ``CallUI`` instances; start without a session.
        ui.webrtc.sid = None
        ui.call_status_tpl = MagicMock()
        ui.call_status_wrapper_elt = MagicMock()
        ui.call_box_elt = MagicMock()
        ui._update_call_button = MagicMock()
        ui.audio_player_elt = MagicMock()
        ui.switch_mode = MagicMock()
        ui.incoming_call_dialog_elt = MagicMock()

        ui.call_status_wrapper_elt.appended_elements = []

        # Mock the `<=` operator for call_status_wrapper_elt
        def mock_append_child(parent, child):
            # Store the child for later assertion
            parent.appended_elements.append(child)
            return child

        ui.call_status_wrapper_elt.__le__ = mock_append_child

        return ui

    def test_status(self, call_ui):
        """Allowed values are set"""
        # Set initial status
        call_ui._status = None
        assert call_ui.status is None

        # Change status
        call_ui.status = "dialing"
        assert call_ui.status == "dialing"

    def test_disallowed_status(self, call_ui):
        """Not allowed status raises an Exception"""
        with pytest.raises(Exception, match="this status is not allowed"):
            call_ui.status = "disallowed_status"

    def test_status_element_update(self, call_ui):
        """A status change updates the status element"""
        call_ui._callee = "test_callee"
        calls.cache.identities = {
            "test_callee": {"nicknames": ["Test Callee Nickname"]}
        }

        status_elt_mock = MagicMock()
        call_ui.call_status_tpl.get_elt.return_value = status_elt_mock

        call_ui.status = "dialing"
        call_ui.call_status_tpl.get_elt.assert_called_with(
            {
                "entity": "test_callee",
                "status": "dialing",
                "name": "Test Callee Nickname",
            }
        )
        # The rendered element must be the last one appended with `<=`.
        assert call_ui.call_status_wrapper_elt.appended_elements[-1] == status_elt_mock

    def test_status_element_updates_with_unknown_callee(self, call_ui):
        """A status change updates the element even if callee is not known"""
        call_ui._callee = "test_callee"
        calls.cache.identities = {}

        call_ui.status = "dialing"
        # Without a cached identity, the entity itself is used as name.
        call_ui.call_status_tpl.get_elt.assert_called_with(
            {"entity": "test_callee", "status": "dialing", "name": "test_callee"}
        )

    def test_call_mode(self, call_ui):
        """Setting a valid call mode stores it and refreshes the call UI"""
        call_ui.call_mode = calls.AUDIO
        assert call_ui.call_mode == calls.AUDIO
        call_ui._update_call_button.assert_called_once()
        call_ui.call_box_elt.select.assert_called_with(".is-video-only")

        call_ui.call_mode = calls.VIDEO
        assert call_ui.call_mode == calls.VIDEO
        call_ui.call_box_elt.select.assert_called_with(".is-video-only")

    def test_call_mode_invalid_mode(self, call_ui):
        """An unknown call mode raises a ValueError"""
        with pytest.raises(ValueError, match="Invalid call mode"):
            call_ui.call_mode = "invalid_mode"

    def test_switch_call_mode(self, call_ui):
        """Switching call mode toggles between audio and video"""
        call_ui._call_mode = calls.AUDIO
        call_ui.switch_call_mode(None)
        assert call_ui.call_mode == calls.VIDEO

        call_ui._call_mode = calls.VIDEO
        call_ui.switch_call_mode(None)
        assert call_ui.call_mode == calls.AUDIO

    @pytest.mark.asyncio
    async def test_ignore_new_call_when_call_in_progress(self, call_ui):
        """Ignoring new incoming call when a call is already in progress."""
        call_ui.sid = "some_sid"
        await call_ui.on_action_new(self._incoming_call("audio"), "action_id")
        call_ui.audio_player_elt.play.assert_not_called()

    @pytest.mark.asyncio
    async def test_action_new_fill_identities(self, call_ui):
        """Filling identities from cache with the correct JID."""
        calls.cache.identities = {"test@example.org": {"nicknames": ["Test User"]}}

        await call_ui.on_action_new(self._incoming_call("audio"), "action_id")
        calls.cache.fill_identities.assert_called_once_with(["test@example.org"])

    @pytest.mark.asyncio
    async def test_incoming_call_triggers_audio(self, call_ui):
        """Incoming call triggering the play action on the audio element."""
        await call_ui.on_action_new(self._incoming_call("audio"), "action_id")
        call_ui.audio_player_elt.play.assert_called_once()

    @pytest.mark.asyncio
    async def test_user_accepts_call(self, call_ui):
        """User accepting the incoming call."""
        confirm_ashow.return_value = True
        await call_ui.on_action_new(self._incoming_call("video"), "action_id")
        call_ui.switch_mode.assert_called_once_with("call")

    @pytest.mark.asyncio
    async def test_user_rejects_call(self, call_ui):
        """User rejecting the incoming call."""
        confirm_ashow.return_value = False
        await call_ui.on_action_new(self._incoming_call("video"), "action_id")
        call_ui.switch_mode.assert_not_called()

    @pytest.mark.asyncio
    async def test_call_gets_cancelled(self, call_ui):
        """Incoming call gets cancelled."""
        confirm_ashow.side_effect = dialog.CancelError
        await call_ui.on_action_new(self._incoming_call("audio"), "action_id")
        assert call_ui.sid is None
        call_ui.switch_mode.assert_not_called()

    def test_toggle_fullscreen_enter(self, call_ui, monkeypatch):
        """Entering fullscreen mode for video elements."""
        # No fullscreen element: we're not in fullscreen yet
        monkeypatch.setattr(browser.document, "fullscreenElement", None)
        monkeypatch.setattr(call_ui.call_box_elt, "requestFullscreen", MagicMock())

        btn_mock_show = MagicMock()
        monkeypatch.setattr("dialog.notification.show", btn_mock_show)
        btn_mock_add = MagicMock()
        btn_mock_remove = MagicMock()

        full_screen_mock = MagicMock(classList=MagicMock(add=btn_mock_add))
        exit_full_screen_mock = MagicMock(classList=MagicMock(remove=btn_mock_remove))

        mock_document = {
            "full_screen_btn": full_screen_mock,
            "exit_full_screen_btn": exit_full_screen_mock,
        }

        # Make ``document["<id>"]`` return the button mocks defined above
        getitem_mock = MagicMock()
        monkeypatch.setattr(browser.document, "__getitem__", getitem_mock)
        getitem_mock.side_effect = mock_document.__getitem__

        call_ui.toggle_fullscreen(True)

        call_ui.call_box_elt.requestFullscreen.assert_called_once()
        btn_mock_add.assert_called_with("is-hidden")
        btn_mock_remove.assert_called_with("is-hidden")
        btn_mock_show.assert_not_called()

    def test_toggle_fullscreen_exit(self, call_ui, monkeypatch):
        """Exiting fullscreen mode for video elements."""
        # As it's not None, we're in fullscreen
        monkeypatch.setattr(browser.document, "fullscreenElement", "mocked_value")
        monkeypatch.setattr(browser.document, "exitFullscreen", MagicMock())

        btn_mock_show = MagicMock()
        monkeypatch.setattr("dialog.notification.show", btn_mock_show)
        btn_mock_add = MagicMock()
        btn_mock_remove = MagicMock()

        full_screen_mock = MagicMock(classList=MagicMock(remove=btn_mock_remove))
        exit_full_screen_mock = MagicMock(classList=MagicMock(add=btn_mock_add))

        mock_document = {
            "full_screen_btn": full_screen_mock,
            "exit_full_screen_btn": exit_full_screen_mock,
        }

        # Make ``document["<id>"]`` return the button mocks defined above
        getitem_mock = MagicMock()
        monkeypatch.setattr(browser.document, "__getitem__", getitem_mock)
        getitem_mock.side_effect = mock_document.__getitem__

        call_ui.toggle_fullscreen(False)

        browser.document.exitFullscreen.assert_called_once()
        btn_mock_add.assert_called_with("is-hidden")
        btn_mock_remove.assert_called_with("is-hidden")
        btn_mock_show.assert_not_called()

    def test_toggle_audio_mute(self, call_ui, monkeypatch):
        """Toggle audio mute."""
        # ``True`` is the value returned when audio ends up muted, as the
        # assertions below expect the "muted" state.
        mock_toggle_audio_mute = MagicMock(return_value=True)
        monkeypatch.setattr(call_ui.webrtc, "toggle_audio_mute", mock_toggle_audio_mute)

        btn_mock_show = MagicMock()
        monkeypatch.setattr("dialog.notification.show", btn_mock_show)

        evt_mock = MagicMock(currentTarget=MagicMock(classList=MagicMock()))

        call_ui.toggle_audio_mute(evt_mock)

        mock_toggle_audio_mute.assert_called_once()
        evt_mock.currentTarget.classList.remove.assert_called_with("is-success")
        evt_mock.currentTarget.classList.add.assert_called_with("muted", "is-warning")
        btn_mock_show.assert_called_once_with("audio is now muted", level="info", delay=2)

    def test_toggle_video_mute(self, call_ui, monkeypatch):
        """Toggle video mute."""
        # ``True`` is the value returned when video ends up muted, as the
        # assertions below expect the "muted" state.
        mock_toggle_video_mute = MagicMock(return_value=True)
        monkeypatch.setattr(call_ui.webrtc, "toggle_video_mute", mock_toggle_video_mute)

        btn_mock_show = MagicMock()
        monkeypatch.setattr("dialog.notification.show", btn_mock_show)

        evt_mock = MagicMock(currentTarget=MagicMock(classList=MagicMock()))

        call_ui.toggle_video_mute(evt_mock)

        mock_toggle_video_mute.assert_called_once()
        evt_mock.currentTarget.classList.remove.assert_called_with("is-success")
        evt_mock.currentTarget.classList.add.assert_called_with("muted", "is-warning")
        btn_mock_show.assert_called_once_with("video is now muted", level="info", delay=2)