view libervia/web/server/restricted_bridge.py @ 1598:86c7a3a625d5

server: always start a new session on connection: The session was kept when a user connected from the service profile (but not from other profiles), which led to a session fixation vulnerability (an attacker on the same machine could obtain the service profile session cookie and reuse it once a victim logged in). This patch fixes it by always starting a new session on connection. fix 443
author Goffi <goffi@goffi.org>
date Fri, 23 Feb 2024 13:35:24 +0100
parents 7941444c1671
children 0a4433a343a3
line wrap: on
line source

#!/usr/bin/env python3

# Libervia Web Frontend
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from libervia.backend.tools.common import data_format
from libervia.backend.core import exceptions
from libervia.web.server.constants import Const as C


class RestrictedBridge:
    """Bridge with limited access, which can be used in browser.

    Only a few methods are implemented, with potentially dangerous arguments
    controlled. Every public coroutine forwards to ``host.bridge_call`` using its own
    name as the bridge method name; most of them first refuse the service profile
    (see ``no_service_profile``).

    ``security_limit`` is set from ``C.SECURITY_LIMIT`` and exposed as an attribute;
    no method of this class reads it.
    """

    def __init__(self, host):
        """Store the host used to reach the bridge.

        ``host`` must provide an awaitable ``bridge_call(method_name, *args)``.
        """
        self.host = host
        self.security_limit = C.SECURITY_LIMIT

    def no_service_profile(self, profile):
        """Raise an error if service profile is used

        Raises ``exceptions.PermissionError`` when ``profile`` equals
        ``C.SERVICE_PROFILE``; returns ``None`` otherwise.
        """
        if profile == C.SERVICE_PROFILE:
            raise exceptions.PermissionError(
                "This action is not allowed for service profile"
            )

    @staticmethod
    def _keep_thumb_url_only(extra_s):
        """Reduce serialised extra data to its ``thumb_url`` entry.

        Shared by the methods which only allow ``thumb_url`` in their extra data, so
        the filtering rule lives in a single place.

        Returns ``extra_s`` untouched when it is falsy, a new serialisation holding
        only ``thumb_url`` when that key is present, and an empty string otherwise.
        """
        if not extra_s:
            return extra_s
        extra = data_format.deserialise(extra_s)
        if "thumb_url" not in extra:
            return ""
        return data_format.serialise({"thumb_url": extra["thumb_url"]})

    async def action_launch(
        self, callback_id: str, data_s: str, profile: str
    ) -> str:
        """Forward ``action_launch``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "action_launch", callback_id, data_s, profile
        )

    async def bookmarks_list(
        self,
        type_: str,
        storage_location: str,
        profile: str
    ):
        """Forward ``bookmarks_list``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "bookmarks_list", type_, storage_location, profile
        )

    async def call_start(self, entity: str, call_data_s: str, profile: str) -> None:
        """Forward ``call_start``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "call_start", entity, call_data_s, profile
        )

    async def call_answer_sdp(
        self, session_id: str, answer_sdp: str, profile: str
    ) -> None:
        """Forward ``call_answer_sdp``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "call_answer_sdp", session_id, answer_sdp, profile
        )

    async def call_info(
        self, session_id: str, info_type: str, extra_s: str, profile: str
    ) -> None:
        """Forward ``call_info``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "call_info", session_id, info_type, extra_s, profile
        )

    async def call_end(self, session_id: str, call_data: str, profile: str) -> None:
        """Forward ``call_end``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "call_end", session_id, call_data, profile
        )

    async def contacts_get(self, profile):
        """Forward ``contacts_get``; the service profile is accepted here."""
        return await self.host.bridge_call("contacts_get", profile)

    async def external_disco_get(self, entity, profile):
        """Forward ``external_disco_get``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call("external_disco_get", entity, profile)

    async def history_get(
        self,
        from_jid: str,
        to_jid: str,
        limit: int,
        between: bool,
        filters: dict[str, str],
        profile: str
    ):
        """Forward ``history_get``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "history_get", from_jid, to_jid, limit, between, filters, profile
        )

    async def ice_candidates_add(self, session_id, media_ice_data_s, profile):
        """Forward ``ice_candidates_add``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "ice_candidates_add", session_id, media_ice_data_s, profile
        )

    async def identity_get(self, entity, metadata_filter, use_cache, profile):
        """Forward ``identity_get``; the service profile is accepted here."""
        return await self.host.bridge_call(
            "identity_get", entity, metadata_filter, use_cache, profile
        )

    async def identities_get(self, entities, metadata_filter, profile):
        """Forward ``identities_get``; the service profile is accepted here."""
        return await self.host.bridge_call(
            "identities_get", entities, metadata_filter, profile
        )

    async def identities_base_get(self, profile):
        """Forward ``identities_base_get``; the service profile is accepted here."""
        return await self.host.bridge_call("identities_base_get", profile)

    async def message_edit(
        self, message_id: str, edit_data_s: str, profile: str
    ) -> None:
        """Forward ``message_edit``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "message_edit", message_id, edit_data_s, profile
        )

    async def message_reactions_set(
        self, message_id: str, reactions: list[str], update_type: str, profile: str
    ) -> None:
        """Forward ``message_reactions_set``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "message_reactions_set", message_id, reactions, update_type, profile
        )

    async def message_retract(
        self, message_id: str, profile: str
    ) -> None:
        """Forward ``message_retract``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "message_retract", message_id, profile
        )

    async def message_send(
        self, to_jid_s, message, subject, mess_type, extra_s,
        profile
    ):
        """Forward ``message_send``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "message_send", to_jid_s, message, subject, mess_type, extra_s, profile
        )

    async def ps_node_delete(self, service_s, node, profile):
        """Forward ``ps_node_delete``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "ps_node_delete", service_s, node, profile
        )

    async def ps_node_affiliations_set(self, service_s, node, affiliations, profile):
        """Forward ``ps_node_affiliations_set``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "ps_node_affiliations_set", service_s, node, affiliations, profile
        )

    async def ps_item_retract(self, service_s, node, item_id, notify, profile):
        """Forward ``ps_item_retract``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "ps_item_retract", service_s, node, item_id, notify, profile
        )

    async def mb_preview(self, service_s, node, data, profile):
        """Forward ``mb_preview``; the service profile is accepted here."""
        return await self.host.bridge_call(
            "mb_preview", service_s, node, data, profile
        )

    async def list_set(self, service_s, node, values, schema, item_id, extra, profile):
        """Forward ``list_set``; refused for the service profile.

        ``schema`` and ``extra`` are accepted but never forwarded: an empty string is
        always sent in their place.
        """
        self.no_service_profile(profile)
        # NOTE(review): schema and extra are dropped on purpose it seems (arguments
        # controlled for browser use) -- confirm before forwarding them.
        return await self.host.bridge_call(
            "list_set", service_s, node, values, "", item_id, "", profile
        )

    async def file_http_upload_get_slot(
        self, filename, size, content_type, upload_jid, profile
    ):
        """Forward ``file_http_upload_get_slot``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "file_http_upload_get_slot", filename, size, content_type,
            upload_jid, profile
        )

    async def file_sharing_delete(
        self, service_jid, path, namespace, profile
    ):
        """Forward ``file_sharing_delete``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "file_sharing_delete", service_jid, path, namespace, profile
        )

    async def interests_file_sharing_register(
        self, service, repos_type, namespace, path, name, extra_s, profile
    ):
        """Forward ``interests_file_sharing_register``; refused for service profile.

        Only the ``thumb_url`` entry of ``extra_s`` is forwarded, any other key is
        discarded.
        """
        self.no_service_profile(profile)
        # we only allow "thumb_url" here
        extra_s = self._keep_thumb_url_only(extra_s)
        return await self.host.bridge_call(
            "interests_file_sharing_register", service, repos_type, namespace, path, name,
            extra_s, profile
        )

    async def interest_retract(
        self, service_jid, item_id, profile
    ):
        """Forward ``interest_retract``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "interest_retract", service_jid, item_id, profile
        )

    async def jingle_terminate(
        self, session_id: str, reason: str, reason_txt: str, profile: str
    ) -> None:
        """Forward ``jingle_terminate``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "jingle_terminate", session_id, reason, reason_txt, profile
        )

    async def ps_invite(
        self, invitee_jid_s, service_s, node, item_id, name, extra_s, profile
    ):
        """Forward ``ps_invite``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "ps_invite", invitee_jid_s, service_s, node, item_id, name, extra_s, profile
        )

    async def fis_invite(
        self, invitee_jid_s, service_s, repos_type, namespace, path, name, extra_s,
        profile
    ):
        """Forward ``fis_invite``; refused for the service profile.

        Only the ``thumb_url`` entry of ``extra_s`` is forwarded, any other key is
        discarded.
        """
        self.no_service_profile(profile)
        # we only allow "thumb_url" here
        extra_s = self._keep_thumb_url_only(extra_s)
        return await self.host.bridge_call(
            "fis_invite", invitee_jid_s, service_s, repos_type, namespace, path, name,
            extra_s, profile
        )

    async def fis_affiliations_set(
        self, service_s, namespace, path, affiliations, profile
    ):
        """Forward ``fis_affiliations_set``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "fis_affiliations_set", service_s, namespace, path, affiliations, profile
        )

    async def invitation_simple_create(
        self, invitee_email, invitee_name, url_template, extra_s, profile
    ):
        """Forward ``invitation_simple_create``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "invitation_simple_create", invitee_email, invitee_name, url_template, extra_s,
            profile
        )

    async def url_preview_get(
        self, url, options_s, profile
    ):
        """Forward ``url_preview_get``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "url_preview_get", url, options_s, profile
        )

    async def jid_search(
        self, search_term: str, options_s: str, profile: str
    ) -> str:
        """Forward ``jid_search``; refused for the service profile."""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "jid_search", search_term, options_s, profile
        )