view libervia/backend/stdui/ui_profile_manager.py @ 4282:8da377040ba6

doc (encryption): update pubsub encryption specifications.
author Goffi <goffi@goffi.org>
date Sat, 13 Jul 2024 17:45:47 +0200
parents 4b842c1fb686
children
line wrap: on
line source

#!/usr/bin/env python3


# SAT standard user interface for managing contacts
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
# Copyright (C) 2013-2016 Adrien Cossa (souliane@mailoo.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from libervia.backend.core.i18n import D_
from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger
from libervia.backend.core import exceptions
from libervia.backend.tools import xml_tools
from libervia.backend.memory.memory import ProfileSessions
from twisted.internet import defer
from twisted.words.protocols.jabber import jid


log = getLogger(__name__)


class ProfileManager(object):
    """Manage profiles."""

    def __init__(self, host):
        self.host = host
        self.profile_ciphers = {}
        self._sessions = ProfileSessions()
        host.register_callback(
            self._authenticate_profile, force_id=C.AUTHENTICATE_PROFILE_ID, with_data=True
        )
        host.register_callback(
            self._change_xmpp_password, force_id=C.CHANGE_XMPP_PASSWD_ID, with_data=True
        )
        self.__new_xmpp_passwd_id = host.register_callback(
            self._change_xmpp_password_cb, with_data=True
        )

    def _start_session_eb(self, fail, first, profile):
        """Errback method for start_session during profile authentication

        @param first(bool): if True, this is the first try and we have tryied empty password
            in this case we ask for a password to the user.
        @param profile(unicode, None): %(doc_profile)s
            must only be used if first is True
        """
        if first:
            # first call, we ask for the password
            form_ui = xml_tools.XMLUI(
                "form", title=D_("Profile password for {}").format(profile), submit_id=""
            )
            form_ui.addPassword("profile_password", value="")
            d = xml_tools.deferred_ui(self.host, form_ui, chained=True)
            d.addCallback(self._authenticate_profile, profile)
            return {"xmlui": form_ui.toXml()}

        assert profile is None

        if fail.check(exceptions.PasswordError):
            dialog = xml_tools.XMLUI("popup", title=D_("Connection error"))
            dialog.addText(D_("The provided profile password doesn't match."))
        else:
            log.error("Unexpected exceptions: {}".format(fail))
            dialog = xml_tools.XMLUI("popup", title=D_("Internal error"))
            dialog.addText(D_("Internal error: {}".format(fail)))
        return {"xmlui": dialog.toXml(), "validated": C.BOOL_FALSE}

    def _authenticate_profile(self, data, profile):
        if C.bool(data.get("cancelled", "false")):
            return {}
        if self.host.memory.is_session_started(profile):
            return {"validated": C.BOOL_TRUE}
        try:
            password = data[xml_tools.form_escape("profile_password")]
        except KeyError:
            # first request, we try empty password
            password = ""
            first = True
            eb_profile = profile
        else:
            first = False
            eb_profile = None
        d = self.host.memory.start_session(password, profile)
        d.addCallback(lambda __: {"validated": C.BOOL_TRUE})
        d.addErrback(self._start_session_eb, first, eb_profile)
        return d

    def _change_xmpp_password(self, data, profile):
        session_data = self._sessions.profile_get_unique(profile)
        if not session_data:
            server = self.host.memory.param_get_a(
                C.FORCE_SERVER_PARAM, "Connection", profile_key=profile
            )
            if not server:
                server = jid.parse(
                    self.host.memory.param_get_a(
                        "JabberID", "Connection", profile_key=profile
                    )
                )[1]
            session_id, session_data = self._sessions.new_session(
                {"count": 0, "server": server}, profile=profile
            )
        if (
            session_data["count"] > 2
        ):  # 3 attempts with a new password after the initial try
            self._sessions.profile_del_unique(profile)
            _dialog = xml_tools.XMLUI("popup", title=D_("Connection error"))
            _dialog.addText(
                D_("Can't connect to %s. Please check your connection details.")
                % session_data["server"]
            )
            return {"xmlui": _dialog.toXml()}
        session_data["count"] += 1
        counter = " (%d)" % session_data["count"] if session_data["count"] > 1 else ""
        title = D_("XMPP password for %(profile)s%(counter)s") % {
            "profile": profile,
            "counter": counter,
        }
        form_ui = xml_tools.XMLUI(
            "form", title=title, submit_id=self.__new_xmpp_passwd_id
        )
        form_ui.addText(
            D_(
                "Can't connect to %s. Please check your connection details or try with another password."
            )
            % session_data["server"]
        )
        form_ui.addPassword("xmpp_password", value="")
        return {"xmlui": form_ui.toXml()}

    def _change_xmpp_password_cb(self, data, profile):
        xmpp_password = data[xml_tools.form_escape("xmpp_password")]
        d = self.host.memory.param_set(
            "Password", xmpp_password, "Connection", profile_key=profile
        )
        d.addCallback(lambda __: defer.ensureDeferred(self.host.connect(profile)))
        d.addCallback(lambda __: {})
        d.addErrback(lambda __: self._change_xmpp_password({}, profile))
        return d