view libervia/web/pages/login/page_meta.py @ 1598:86c7a3a625d5

server: always start a new session on connection: The session was kept when a user was connecting from service profile (but not from other profiles), this was leading to session fixation vulnerability (an attacker on the same machine could get service profile session cookie, and use it when a victim would log-in). This patch fixes it by always starting a new session on connection. fix 443
author Goffi <goffi@goffi.org>
date Fri, 23 Feb 2024 13:35:24 +0100
parents 7941444c1671
children
line wrap: on
line source

#!/usr/bin/env python3


from libervia.backend.core.i18n import _
from libervia.backend.core import exceptions
from libervia.web.server.constants import Const as C
from libervia.web.server import session_iface
from twisted.internet import defer
from libervia.backend.core.log import getLogger

# logger named after this module
log = getLogger(__name__)

# NOTE(review): this string comes after the imports, so Python evaluates it as a
# plain expression and does not use it as the module docstring
"""Libervia Web log-in page, with link to create an account"""

# page settings; presumably read by the Libervia page loader — TODO confirm
name = "login"
# access level of the page, taken from the server constants
access = C.PAGES_ACCESS_PUBLIC
# template path used for this page
template = "login/login.html"


def prepare_render(self, request):
    """Fill the template data used to render the log-in page

    A request for which a profile is found is redirected to "/login/logged".
    The pending login error (if any), the empty password flag, the registration
    URL (when registration is allowed) and the login used to prefill the form
    are put in ``request.template_data``.

    @param request: HTTP request being rendered
    """
    data = request.template_data

    # a profile means that a session is active: the log-in form is not needed
    if self.get_profile(request) is not None:
        self.page_redirect("/login/logged", request)

    host = self.host
    session = host.get_session_data(request, session_iface.IWebSession)

    # error constant stored in page data under "login_error", if any
    error_const = session.pop_page_data(self, "login_error")
    if error_const is not None:
        # server constants are needed in the template to handle the error
        data["S_C"] = C
        data["login_error"] = error_const

    options = host.options
    data["empty_password_allowed"] = bool(
        options["empty_password_allowed_warning_dangerous_list"]
    )

    # the register page is only linked when registration is allowed
    if options["allow_registration"]:
        data["register_url"] = self.get_page_redirect_url(request, "register")

    # login stored in page data (None-like value if unset) is used to prefill
    # the field
    data["login"] = session.pop_page_data(self, "login")


def login_error(self, request, error_const):
    """Store a login error constant in the page data of the session

    The value is saved under the "login_error" key of this page's data.

    @param error_const(unicode): one of login error constant
    @return C.POST_NO_CONFIRM: avoid confirm message
    """
    self.host.get_session_data(request, session_iface.IWebSession).set_page_data(
        self, "login_error", error_const
    )
    return C.POST_NO_CONFIRM


async def on_data_post(self, request):
    """Handle data posted to the log-in page

    The posted "type" field selects the action:
        - "disconnect": purge the session of the request
        - "login": try to connect with the posted "login" and "password"
    Any other value results in a C.HTTP_BAD_REQUEST page error.

    @param request: HTTP request holding the posted data
    @return: C.POST_NO_CONFIRM after a disconnection or a failed log-in (the error
        constant is then stored in page data by login_error), None when this
        function reaches its end without an explicit return
    @raise ValueError: connection failed with a message which is neither
        C.XMPP_AUTH_ERROR nor C.PROFILE_AUTH_ERROR
    """
    profile = self.get_profile(request)
    type_ = self.get_posted_data(request, "type")
    if type_ == "disconnect":
        if profile is None:
            log.warning(_("Disconnect called when no profile is logged"))
            self.page_error(request, C.HTTP_BAD_REQUEST)
        else:
            self.host.purge_session(request)
            return C.POST_NO_CONFIRM
    elif type_ == "login":
        login, password = self.get_posted_data(request, ("login", "password"))
        try:
            status = await self.host.connect(request, login, password)
        except exceptions.ProfileUnknownError:
            # the profile doesn't exist, we return the same error as for invalid
            # password to avoid bruteforcing valid profiles
            log.warning(f"login tentative with invalid profile: {login!r}")
            return login_error(self, request, C.PROFILE_AUTH_ERROR)
        except ValueError as e:
            message = str(e)
            if message in (C.XMPP_AUTH_ERROR, C.PROFILE_AUTH_ERROR):
                return login_error(self, request, message)
            # this error was not expected! Bare raise keeps the original traceback
            raise
        except exceptions.TimeOutError:
            return login_error(self, request, C.NO_REPLY)
        else:
            if status in (C.PROFILE_LOGGED, C.PROFILE_LOGGED_EXT_JID, C.SESSION_ACTIVE):
                # Profile has been logged correctly
                self.redirect_or_continue(request)
            else:
                # the template must be translated *before* formatting, otherwise
                # the lookup is done on the interpolated string and never matches
                log.error(_("Unhandled status: {status}").format(status=status))
    else:
        self.page_error(request, C.HTTP_BAD_REQUEST)