Mercurial > libervia-web
view libervia/web/server/restricted_bridge.py @ 1598:86c7a3a625d5
server: always start a new session on connection:
The session was kept when a user connected from the service profile (but not from other
profiles), which led to a session fixation vulnerability (an attacker on the same
machine could obtain the service profile's session cookie and use it once a victim logged in).
This patch fixes it by always starting a new session on connection.
fix 443
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 23 Feb 2024 13:35:24 +0100 |
parents | 7941444c1671 |
children | 0a4433a343a3 |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia Web Frontend
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from libervia.backend.tools.common import data_format
from libervia.backend.core import exceptions
from libervia.web.server.constants import Const as C


class RestrictedBridge:
    """Bridge with limited access, which can be used from the browser

    Only a few methods are implemented, with potentially dangerous arguments
    controlled.
    Every method relays its call through ``host.bridge_call``; most of them first
    refuse the service profile (see [no_service_profile]).
    The security limit (``C.SECURITY_LIMIT``) is kept in ``security_limit``.
    """

    def __init__(self, host):
        """
        @param host: object exposing the ``bridge_call`` coroutine used by every
            method of this class
        """
        self.host = host
        self.security_limit = C.SECURITY_LIMIT

    def no_service_profile(self, profile):
        """Raise an error if service profile is used

        @param profile: profile requesting the action
        @raise exceptions.PermissionError: ``profile`` is ``C.SERVICE_PROFILE``
        """
        if profile == C.SERVICE_PROFILE:
            raise exceptions.PermissionError(
                "This action is not allowed for service profile"
            )

    @staticmethod
    def _keep_only_thumb_url(extra_s):
        """Strip every key except "thumb_url" from serialised extra data

        Single implementation of the allow-list shared by
        [interests_file_sharing_register] and [fis_invite], so that both always
        apply the same rule.

        @param extra_s: serialised extra data as received from the caller
        @return: ``extra_s`` unchanged when it is falsy, a serialised dict holding
            only "thumb_url" when that key is present, an empty string otherwise
        """
        if not extra_s:
            return extra_s
        extra = data_format.deserialise(extra_s)
        if "thumb_url" in extra:
            # we only allow "thumb_url" here
            return data_format.serialise({"thumb_url": extra["thumb_url"]})
        return ""

    async def action_launch(
        self, callback_id: str, data_s: str, profile: str
    ) -> str:
        """Relay ``action_launch`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "action_launch", callback_id, data_s, profile
        )

    async def bookmarks_list(
        self, type_: str, storage_location: str, profile: str
    ):
        """Relay ``bookmarks_list`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "bookmarks_list", type_, storage_location, profile
        )

    async def call_start(self, entity: str, call_data_s: str, profile: str) -> None:
        """Relay ``call_start`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "call_start", entity, call_data_s, profile
        )

    async def call_answer_sdp(
        self, session_id: str, answer_sdp: str, profile: str
    ) -> None:
        """Relay ``call_answer_sdp`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "call_answer_sdp", session_id, answer_sdp, profile
        )

    async def call_info(
        self, session_id: str, info_type: str, extra_s: str, profile: str
    ) -> None:
        """Relay ``call_info`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "call_info", session_id, info_type, extra_s, profile
        )

    async def call_end(self, session_id: str, call_data: str, profile: str) -> None:
        """Relay ``call_end`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "call_end", session_id, call_data, profile
        )

    async def contacts_get(self, profile):
        """Relay ``contacts_get`` to the bridge (no service profile check here)"""
        return await self.host.bridge_call("contacts_get", profile)

    async def external_disco_get(self, entity, profile):
        """Relay ``external_disco_get`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "external_disco_get", entity, profile)

    async def history_get(
        self,
        from_jid: str,
        to_jid: str,
        limit: int,
        between: bool,
        filters: dict[str, str],
        profile: str
    ):
        """Relay ``history_get`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "history_get", from_jid, to_jid, limit, between, filters, profile
        )

    async def ice_candidates_add(self, session_id, media_ice_data_s, profile):
        """Relay ``ice_candidates_add`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "ice_candidates_add", session_id, media_ice_data_s, profile
        )

    async def identity_get(self, entity, metadata_filter, use_cache, profile):
        """Relay ``identity_get`` to the bridge (no service profile check here)"""
        return await self.host.bridge_call(
            "identity_get", entity, metadata_filter, use_cache, profile)

    async def identities_get(self, entities, metadata_filter, profile):
        """Relay ``identities_get`` to the bridge (no service profile check here)"""
        return await self.host.bridge_call(
            "identities_get", entities, metadata_filter, profile)

    async def identities_base_get(self, profile):
        """Relay ``identities_base_get`` to the bridge (no service profile check here)"""
        return await self.host.bridge_call(
            "identities_base_get", profile)

    async def message_edit(
        self, message_id: str, edit_data_s: str, profile: str
    ) -> None:
        """Relay ``message_edit`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "message_edit", message_id, edit_data_s, profile
        )

    async def message_reactions_set(
        self,
        message_id: str,
        reactions: list[str],
        update_type: str,
        profile: str
    ) -> None:
        """Relay ``message_reactions_set`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "message_reactions_set", message_id, reactions, update_type, profile
        )

    async def message_retract(
        self, message_id: str, profile: str
    ) -> None:
        """Relay ``message_retract`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "message_retract", message_id, profile
        )

    async def message_send(
        self, to_jid_s, message, subject, mess_type, extra_s, profile
    ):
        """Relay ``message_send`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "message_send", to_jid_s, message, subject, mess_type, extra_s, profile
        )

    async def ps_node_delete(self, service_s, node, profile):
        """Relay ``ps_node_delete`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "ps_node_delete", service_s, node, profile)

    async def ps_node_affiliations_set(self, service_s, node, affiliations, profile):
        """Relay ``ps_node_affiliations_set`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "ps_node_affiliations_set", service_s, node, affiliations, profile)

    async def ps_item_retract(self, service_s, node, item_id, notify, profile):
        """Relay ``ps_item_retract`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "ps_item_retract", service_s, node, item_id, notify, profile)

    async def mb_preview(self, service_s, node, data, profile):
        """Relay ``mb_preview`` to the bridge (no service profile check here)"""
        return await self.host.bridge_call(
            "mb_preview", service_s, node, data, profile)

    async def list_set(self, service_s, node, values, schema, item_id, extra, profile):
        """Relay ``list_set`` to the bridge; refused for the service profile

        ``schema`` and ``extra`` are accepted but never forwarded: empty strings are
        sent in their place.
        """
        self.no_service_profile(profile)
        # NOTE(review): "" replaces both ``schema`` and ``extra``, presumably so that
        #   a browser client cannot supply them — confirm before changing
        return await self.host.bridge_call(
            "list_set", service_s, node, values, "", item_id, "", profile)

    async def file_http_upload_get_slot(
        self, filename, size, content_type, upload_jid, profile):
        """Relay ``file_http_upload_get_slot`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "file_http_upload_get_slot", filename, size, content_type,
            upload_jid, profile)

    async def file_sharing_delete(
        self, service_jid, path, namespace, profile):
        """Relay ``file_sharing_delete`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "file_sharing_delete", service_jid, path, namespace, profile)

    async def interests_file_sharing_register(
        self, service, repos_type, namespace, path, name, extra_s, profile
    ):
        """Relay ``interests_file_sharing_register``; refused for the service profile

        Only the "thumb_url" key of ``extra_s`` is forwarded (see
        [_keep_only_thumb_url]).
        """
        self.no_service_profile(profile)
        extra_s = self._keep_only_thumb_url(extra_s)
        return await self.host.bridge_call(
            "interests_file_sharing_register", service, repos_type, namespace, path,
            name, extra_s, profile
        )

    async def interest_retract(
        self, service_jid, item_id, profile
    ):
        """Relay ``interest_retract`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "interest_retract", service_jid, item_id, profile)

    async def jingle_terminate(
        self,
        session_id: str,
        reason: str,
        reason_txt: str,
        profile: str
    ) -> None:
        """Relay ``jingle_terminate`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "jingle_terminate", session_id, reason, reason_txt, profile
        )

    async def ps_invite(
        self, invitee_jid_s, service_s, node, item_id, name, extra_s, profile
    ):
        """Relay ``ps_invite`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "ps_invite", invitee_jid_s, service_s, node, item_id, name, extra_s,
            profile
        )

    async def fis_invite(
        self, invitee_jid_s, service_s, repos_type, namespace, path, name, extra_s,
        profile
    ):
        """Relay ``fis_invite`` to the bridge; refused for the service profile

        Only the "thumb_url" key of ``extra_s`` is forwarded (see
        [_keep_only_thumb_url]).
        """
        self.no_service_profile(profile)
        extra_s = self._keep_only_thumb_url(extra_s)
        return await self.host.bridge_call(
            "fis_invite", invitee_jid_s, service_s, repos_type, namespace, path,
            name, extra_s, profile
        )

    async def fis_affiliations_set(
        self, service_s, namespace, path, affiliations, profile
    ):
        """Relay ``fis_affiliations_set`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "fis_affiliations_set", service_s, namespace, path, affiliations, profile
        )

    async def invitation_simple_create(
        self, invitee_email, invitee_name, url_template, extra_s, profile
    ):
        """Relay ``invitation_simple_create`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "invitation_simple_create", invitee_email, invitee_name, url_template,
            extra_s, profile
        )

    async def url_preview_get(
        self, url, options_s, profile
    ):
        """Relay ``url_preview_get`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "url_preview_get", url, options_s, profile
        )

    async def jid_search(
        self, search_term: str, options_s: str, profile: str
    ) -> str:
        """Relay ``jid_search`` to the bridge; refused for the service profile"""
        self.no_service_profile(profile)
        return await self.host.bridge_call(
            "jid_search", search_term, options_s, profile
        )