view libervia/web/pages/merge-requests/view/page_meta.py @ 1598:86c7a3a625d5

server: always start a new session on connection: The session was kept when a user was connecting from service profile (but not from other profiles), this was leading to session fixation vulnerability (an attacker on the same machine could get service profile session cookie, and use it when a victim would log-in). This patch fixes it by always starting a new session on connection. fix 443
author Goffi <goffi@goffi.org>
date Fri, 23 Feb 2024 13:35:24 +0100
parents eb00d593801d
children
line wrap: on
line source

#!/usr/bin/env python3


from libervia.web.server.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.web.server.utils import SubPage
from libervia.web.server import session_iface
from twisted.words.protocols.jabber import jid
from libervia.backend.tools.common import template_xmlui
from libervia.backend.tools.common import uri
from libervia.backend.tools.common import data_format
from libervia.backend.core.log import getLogger

name = "merge-requests_view"
access = C.PAGES_ACCESS_PUBLIC
template = "merge-request/item.html"
log = getLogger(__name__)


def parse_url(self, request):
    try:
        item_id = self.next_path(request)
    except IndexError:
        log.warning(_("no list item id specified"))
        self.page_error(request, C.HTTP_BAD_REQUEST)

    data = self.get_r_data(request)
    data["list_item_id"] = item_id


async def prepare_render(self, request):
    data = self.get_r_data(request)
    template_data = request.template_data
    session = self.host.get_session_data(request, session_iface.IWebSession)
    service, node, list_item_id = (
        data.get("service", ""),
        data.get("node", ""),
        data["list_item_id"],
    )
    profile = self.get_profile(request)

    if profile is None:
        profile = C.SERVICE_PROFILE

    merge_requests = data_format.deserialise(
        await self.host.bridge_call(
            "merge_requests_get",
            service.full() if service else "",
            node,
            C.NO_LIMIT,
            [list_item_id],
            "",
            data_format.serialise({"parse": C.BOOL_TRUE, "labels_as_list": C.BOOL_TRUE}),
            profile,
        )
    )
    list_item = template_xmlui.create(
        self.host, merge_requests['items'][0], ignore=["request_data", "type"]
    )
    template_data["item"] = list_item
    template_data["patches"] = merge_requests['items_patches'][0]
    comments_uri = list_item.widgets["comments_uri"].value
    if comments_uri:
        uri_data = uri.parse_xmpp_uri(comments_uri)
        template_data["comments_node"] = comments_node = uri_data["node"]
        template_data["comments_service"] = comments_service = uri_data["path"]
        template_data["comments"] = data_format.deserialise(await self.host.bridge_call(
            "mb_get", comments_service, comments_node, C.NO_LIMIT, [],
            data_format.serialise({}), profile
        ))

        template_data["login_url"] = self.get_page_redirect_url(request)

    if session.connected:
        # we set edition URL only if user is the publisher or the node owner
        publisher = jid.JID(list_item.widgets["publisher"].value)
        is_publisher = publisher.userhostJID() == session.jid.userhostJID()
        affiliation = None
        if not is_publisher:
            node = node or self.host.ns_map["merge_requests"]
            affiliation = await self.host.get_affiliation(request, service, node)
        if is_publisher or affiliation == "owner":
            template_data["url_list_item_edit"] = self.get_url_by_path(
                SubPage("merge-requests"),
                service.full(),
                node or "@",
                SubPage("merge-requests_edit"),
                list_item_id,
            )


async def on_data_post(self, request):
    type_ = self.get_posted_data(request, "type")
    if type_ == "comment":
        blog_page = self.get_page_by_name("blog_view")
        await blog_page.on_data_post(self, request)
    else:
        log.warning(_("Unhandled data type: {}").format(type_))