view libervia/backend/plugins/plugin_xep_0077.py @ 4306:94e0968987cd

plugin XEP-0033: code modernisation, improve delivery, data validation: - Code has been rewritten using Pydantic models and `async` coroutines for data validation and cleaner element parsing/generation. - Delivery has been completely rewritten. It now works even if the server doesn't support multicast, and sends to the local multicast service first. Delivering to the local multicast service first is due to bad support of XEP-0033 in servers (notably Prosody, which has an incomplete implementation), and the current impossibility to detect whether a sub-domain service fully handles multicast or only handles it for local domains. This is a workaround to have a good balance between backward compatibility and use of bandwidth, and to make it work with the incoming email gateway implementation (the gateway will only deliver to entities of its own domain). - disco feature checking now uses `async` coroutines. `host` implementation still uses Deferred return values for compatibility with legacy code. rel 450
author Goffi <goffi@goffi.org>
date Thu, 26 Sep 2024 16:12:01 +0200
parents 7ded09452875
children
line wrap: on
line source

#!/usr/bin/env python3


# SAT plugin for managing xep-0077
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Awaitable, Callable, NamedTuple, cast

from twisted.internet import defer, reactor, ssl
from twisted.internet.interfaces import IReactorCore
from twisted.python.failure import Failure
from twisted.words.protocols.jabber import (
    client,
    error as jabber_error,
    jid,
    xmlstream as xmlstream_mod,
)
from twisted.words.xish import domish
from wokkel import data_form, disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.xmpp import SatXMPPEntity
from libervia.backend.tools import xml_tools
from libervia.backend.tools.utils import as_deferred, ensure_deferred

# Module-level logger, named after this module.
log = getLogger(__name__)

# XML namespace of the In-Band Registration protocol.
NS_IQ_REGISTER = "jabber:iq:register"
# Observer patterns built from C.IQ_GET / C.IQ_SET, matching IQ stanzas which carry
# a <query/> child in the registration namespace. They are passed to
# ``xmlstream.addObserver`` by the handler class of this module.
IQ_REGISTER_REQUEST = f'{C.IQ_GET}/query[@xmlns="{NS_IQ_REGISTER}"]'
IQ_SUBMIT_REQUEST = f'{C.IQ_SET}/query[@xmlns="{NS_IQ_REGISTER}"]'

# Plugin metadata.
# NOTE(review): presumably consumed by the plugin loader (C.PI_MAIN names the main
# class, C.PI_HANDLER tells that ``get_handler`` is provided) — confirm.
PLUGIN_INFO = {
    C.PI_NAME: "In-Band Registration",
    C.PI_IMPORT_NAME: "XEP-0077",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0077"],
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "XEP_0077",
    C.PI_DESCRIPTION: _("""Implementation of in-band registration."""),
    C.PI_HANDLER: C.BOOL_TRUE,
}

# FIXME: this implementation is incomplete


class RegistrationHandlers(NamedTuple):
    """Pair of callbacks stored by ``XEP_0077.register_handler``."""

    # Called when a registration form is requested ("get" IQ).
    form_handler: Callable
    # Called when a registration is submitted or removed ("set" IQ).
    submit_handler: Callable


class RegisteringAuthenticator(xmlstream_mod.ConnectAuthenticator):
    """Authenticator which registers a new account once the stream is ready.

    The ``registered`` attribute is a Deferred chained to the result of the
    registration IQ: it fires with the value returned by ``registration_cb`` on
    success, or with the failure on error.
    """

    # FIXME: request IQ is not sent to check available fields,
    #        while the XEP recommends using it
    # FIXME: doesn't handle data form or oob
    namespace = "jabber:client"

    def __init__(self, jid_, password, email=None, check_certificate=True):
        """
        @param jid_: JID of the account to register; its host is used as the
            stream destination.
        @param password: password of the new account.
        @param email: optional email, only sent if it is not None.
        @param check_certificate: if True, TLS is required and the default TLS
            configuration is used. If False, TLS is not required and certificate
            options without trust root are used.
        """
        log.debug(_("Registration asked for {jid}").format(jid=jid_))
        xmlstream_mod.ConnectAuthenticator.__init__(self, jid_.host)
        self.jid = jid_
        self.password = password
        self.email = email
        self.check_certificate = check_certificate
        self.registered = defer.Deferred()

    def associateWithStream(self, xmlstream):
        """Set stream initializers, and start registration when stream is ready."""
        xmlstream_mod.ConnectAuthenticator.associateWithStream(self, xmlstream)
        xmlstream.addObserver(xmlstream_mod.STREAM_AUTHD_EVENT, self.register)

        xmlstream.initializers = [client.CheckVersionInitializer(xmlstream)]
        if self.check_certificate:
            tls_required, configurationForTLS = True, None
        else:
            tls_required = False
            configurationForTLS = ssl.CertificateOptions(trustRoot=None)
        tls_init = xmlstream_mod.TLSInitiatingInitializer(
            xmlstream, required=tls_required, configurationForTLS=configurationForTLS
        )

        xmlstream.initializers.append(tls_init)

    def register(self, xmlstream):
        """Send the registration IQ and chain its result to ``self.registered``."""
        # The message must be translated first and formatted afterwards, otherwise
        # the already formatted string is used as translation key.
        log.debug(
            _("Stream started with {server}, now registering").format(
                server=self.jid.host
            )
        )
        iq = XEP_0077.build_register_iq(
            self.xmlstream, self.jid, self.password, self.email
        )
        d = iq.send(self.jid.host).addCallbacks(
            self.registration_cb, self.registration_eb
        )
        d.chainDeferred(self.registered)

    def registration_cb(self, answer):
        """Log the successful answer and close the stream."""
        log.debug(_("Registration answer: {}").format(answer.toXml()))
        assert self.xmlstream is not None
        self.xmlstream.sendFooter()

    def registration_eb(self, failure_):
        """Log the failure, close the stream, and propagate the failure."""
        log.info(_("Registration failure: {}").format(str(failure_.value)))
        assert self.xmlstream is not None
        self.xmlstream.sendFooter()
        # Returning the failure from an errback keeps the error going down the
        # chain, up to ``self.registered``.
        return failure_


class ServerRegister(xmlstream_mod.XmlStreamFactory):
    """Stream factory used for the connection made to register an account.

    If the stream ends while the authenticator's ``registered`` Deferred has not
    fired yet, this Deferred is errbacked.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.addBootstrap(xmlstream_mod.STREAM_END_EVENT, self._disconnected)

    def clientConnectionLost(self, connector, unused_reason):
        """Disconnect the connector when the connection is lost."""
        connector.disconnect()

    def _disconnected(self, reason):
        """Errback ``registered`` if the stream ended before registration result.

        @param reason: failure received with the stream end event.
        """
        registered_d = self.authenticator.registered
        if registered_d.called:
            # Registration result has already been delivered, nothing to report.
            return

        try:
            certificate_failed = (
                reason.value.args[0][0][2] == "certificate verify failed"
            )
        except (IndexError, TypeError):
            # The failure arguments don't have the expected nested structure.
            certificate_failed = False

        if certificate_failed:
            err = exceptions.InvalidCertificate()
        else:
            err = jabber_error.StreamError("Server unexpectedly closed the connection")
        registered_d.errback(err)


class XEP_0077:

    def __init__(self, host):
        """Register the namespace and the bridge methods of the plugin.

        @param host: main backend object, used to register the namespace and the
            bridge methods.
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        host.register_namespace("iq-register", NS_IQ_REGISTER)

        # (bridge method name, input signature, implementation), registered in
        # this order.
        bridge_methods = (
            ("in_band_register", "ss", self._in_band_register),
            ("in_band_account_new", "ssssi", self._register_new_account),
            ("in_band_unregister", "ss", self._unregister),
            ("in_band_password_change", "ss", self._change_password),
        )
        for method_name, in_sign, method in bridge_methods:
            host.bridge.add_method(
                method_name,
                ".plugin",
                in_sign=in_sign,
                out_sign="",
                method=method,
                async_=True,
            )

        # Set of RegistrationHandlers added with ``register_handler``.
        self.handlers = set()

    def get_handler(self, client: SatXMPPEntity) -> xmlstream_mod.XMPPHandler:
        """Create the XMPP handler of this plugin.

        @param client: client session (not used here).
        @return: a new ``XEP_0077_handler`` linked to this plugin instance.
        """
        handler = XEP_0077_handler(self)
        return handler

    def register_handler(
        self,
        form_handler: Callable[
            [SatXMPPEntity, domish.Element],
            Awaitable[tuple[bool, data_form.Form] | None]
            | defer.Deferred[tuple[bool, data_form.Form] | None]
            | tuple[bool, data_form.Form]
            | None,
        ],
        submit_handler: Callable[
            [SatXMPPEntity, domish.Element, data_form.Form | None],
            Awaitable[bool | None] | defer.Deferred[bool | None] | bool | None,
        ],
    ) -> None:
        """
        Add a pair of handlers for incoming In-Band Registration requests.

        Mostly useful for components which handle In-Band Registration.
        @param form_handler: method called on a registration form request, with the
            client session and the IQ element.
            May be async.
            It must return a ``(registered, form)`` tuple if it can handle the
            request, where ``registered`` is a bool telling if the requestor is
            already registered, and ``form`` is a data_form.Form instance.
            If None is returned, the other handlers are tried.
        @param submit_handler: method called on a registration submission, with the
            client session, the IQ element and the submitted data_form.Form.
            May be async.
            In case of "unregister" request, None is used instead of a
            data_form.Form.
            It must return True if the request has been handled, or None to let the
            other handlers be tried.
        """
        handlers = RegistrationHandlers(
            form_handler=form_handler, submit_handler=submit_handler
        )
        self.handlers.add(handlers)

    def _on_register_request(self, iq_elt: domish.Element, client: SatXMPPEntity) -> None:
        """Observer for registration form requests: run ``_on_request``.

        @param iq_elt: The IQ element of the request.
        @param client: Client session.
        """
        request_coro = self._on_request(iq_elt, client, is_register=True)
        defer.ensureDeferred(request_coro)

    def _on_submit_request(self, iq_elt: domish.Element, client: SatXMPPEntity) -> None:
        """Observer for registration submit requests: run ``_on_request``.

        @param iq_elt: The IQ element of the request.
        @param client: Client session.
        """
        request_coro = self._on_request(iq_elt, client, is_register=False)
        defer.ensureDeferred(request_coro)

    async def _on_request(
        self, iq_elt: domish.Element, client: SatXMPPEntity, is_register: bool
    ) -> None:
        """Handle a register or submit request.

        Registered handlers are tried one after the other until one of them returns
        something else than None. The corresponding IQ result or error is then sent
        with ``client.send``. If no handler answers, a "service-unavailable" error is
        sent.

        @param iq_elt: The IQ element of the request.
        @param client: Client session.
        @param is_register: Whether this is a register request (True) or a submit request
            (False).
        """
        iq_elt.handled = True

        # Submit request must have a form with submitted values.
        if is_register:
            handler_type = "register"
            submit_form = None
        else:
            handler_type = "submit"
            remove_elt = next(iq_elt.query.elements(NS_IQ_REGISTER, "remove"), None)
            if remove_elt is not None:
                # This is a unregister request.
                submit_form = None
            else:
                submit_form = data_form.findForm(iq_elt.query, NS_IQ_REGISTER)
                if submit_form is None:
                    log.warning(f"Data form not found, invalid request: {iq_elt.toXml()}")
                    client.send(
                        jabber_error.StanzaError(
                            "bad-request", text="No data form found."
                        ).toResponse(iq_elt)
                    )
                    return

        # We look and run relevant handler.
        for handlers in self.handlers:
            if is_register:
                handler = handlers.form_handler
                handler_call = as_deferred(handler, client, iq_elt)
            else:
                handler = handlers.submit_handler
                handler_call = as_deferred(handler, client, iq_elt, submit_form)
            try:
                callback_ret = await handler_call
            except jabber_error.StanzaError as e:
                # The handler explicitly asked to answer with this stanza error.
                iq_error_elt = e.toResponse(iq_elt)
                client.send(iq_error_elt)
                return
            except exceptions.PasswordError as e:
                log.warning("Invalid login or password while registering to service.")
                iq_error_elt = jabber_error.StanzaError(
                    "forbidden", text=str(e)
                ).toResponse(iq_elt)
                client.send(iq_error_elt)
                return
            except Exception:
                # A broken handler must not prevent the other ones from being tried.
                log.exception(
                    f"Error while handling {handler_type} request with {handler}, "
                    "ignoring it."
                )
                continue
            if callback_ret is not None:
                if is_register:
                    try:
                        registered, registration_form = callback_ret
                        # Explicit checks are used instead of ``assert``, because
                        # assertions are removed when Python is run with ``-O``.
                        if not isinstance(registered, bool):
                            raise TypeError(
                                f"first item must be a bool, got {type(registered)}"
                            )
                        if not isinstance(registration_form, data_form.Form):
                            raise TypeError(
                                "second item must be a data form, got "
                                f"{type(registration_form)}"
                            )
                    except (TypeError, ValueError) as e:
                        log.warning(
                            f"Invalid return value from {handler}, ignoring it: {e}"
                        )
                        continue
                    # We need to be sure to have the right namespace for the form.
                    registration_form.formNamespace = NS_IQ_REGISTER
                    iq_result_elt = xmlstream_mod.toResponse(iq_elt, "result")
                    query_elt = iq_result_elt.addElement((NS_IQ_REGISTER, "query"))

                    if registered:
                        # The requestor is already registered, we indicate it.
                        query_elt.addElement("registered")

                    query_elt.addChild(registration_form.toElement())
                    client.send(iq_result_elt)
                else:
                    if callback_ret is True:
                        if submit_form is None:
                            log.info(f"User {iq_elt['from']} successfully unregistered.")
                        else:
                            log.info(f"User {iq_elt['from']} successfully registered.")
                    else:
                        log.error(
                            f"Unexpected return value from {handler}, was expecting "
                            '"True".'
                        )
                        client.send(
                            jabber_error.StanzaError(
                                "internal-server-error", text="Error in request handler."
                            ).toResponse(iq_elt)
                        )
                        return
                    iq_result_elt = xmlstream_mod.toResponse(iq_elt, "result")
                    client.send(iq_result_elt)
                break
        else:
            # The loop has ended without ``break``: no handler has taken the request.
            log.warning(
                f"No handler found for in-band registration {handler_type} request: "
                f"{iq_elt.toXml()}."
            )
            iq_error_elt = jabber_error.StanzaError("service-unavailable").toResponse(
                iq_elt
            )
            client.send(iq_error_elt)

    @staticmethod
    def build_register_iq(xmlstream, jid_, password, email=None):
        """Build the IQ used to register an account or to change its password.

        @param xmlstream: stream used to create the IQ.
        @param jid_: JID of the account: the host is used as destination, and the
            user part as username.
        @param password: password to set.
        @param email: email to set, the field is only added if it is not None.
        @return: the "set" IQ, not sent yet.
        """
        iq_elt = xmlstream_mod.IQ(xmlstream, "set")
        iq_elt["to"] = jid_.host
        query_elt = iq_elt.addElement(("jabber:iq:register", "query"))

        # Fields are added in this order as children of <query/>.
        fields = [("username", jid_.user), ("password", password)]
        if email is not None:
            fields.append(("email", email))
        for field_name, field_value in fields:
            query_elt.addElement(field_name).addContent(field_value)

        return iq_elt

    def _in_band_register(self, to_jid_s, profile_key=C.PROF_KEY_NONE):
        """Bridge method: wrap ``in_band_register`` in a Deferred.

        @param to_jid_s: JID of the service, as a string.
        @param profile_key: key used to retrieve the client session.
        """
        client = self.host.get_client(profile_key)
        to_jid = jid.JID(to_jid_s)
        return defer.ensureDeferred(self.in_band_register(client, to_jid))

    async def in_band_register(
        self,
        client: SatXMPPEntity,
        to_jid: jid.JID,
        post_treat_cb: Callable | None = None,
    ) -> xml_tools.XMLUI:
        """Register to a service.

        Send an IQ request to register with the given service and return the registration
        form as XML UI. The form filled by the frontend is then handled by
        ``_on_xml_ui_cb``.

        @param client: client session.
        @param to_jid: The JID of the service to register to.
        @param post_treat_cb: A callback function to handle the registration result, if
            provided.
        @return: The registration form as XML UI.
        @raise exceptions.DataError: the answer doesn't contain the expected <query/>
            element, or doesn't contain any data form.
        """
        # FIXME: this post_treat_cb arguments seems wrong, check it
        log.debug(_("Asking registration for {}").format(to_jid.full()))
        reg_request = client.IQ("get")
        reg_request["from"] = client.jid.full()
        reg_request["to"] = to_jid.full()
        reg_request.addElement("query", NS_IQ_REGISTER)
        try:
            iq_result_elt = await reg_request.send(to_jid.full())
        except Exception as e:
            log.warning(_("Registration failure: {}").format(e))
            raise

        try:
            query_elt = next(iq_result_elt.elements(NS_IQ_REGISTER, "query"))
        except StopIteration:
            raise exceptions.DataError(
                f"Can't find expected query element: {iq_result_elt.toXml()}"
            )

        try:
            x_elem = next(query_elt.elements(data_form.NS_X_DATA, "x"))
        except StopIteration:
            # XXX: it seems we have an old service which doesn't manage data forms
            log.warning(f"Can't find data form: {iq_result_elt.toXml()}")
            raise exceptions.DataError(
                _(
                    "This gateway is outdated and can't be managed by Libervia, "
                    "sorry :("
                )
            )

        form = data_form.Form.fromElement(x_elem)
        xml_ui = xml_tools.data_form_2_xmlui(form, "")
        # The XML UI is returned right away, the Deferred below handles the answer
        # of the frontend in the background.
        d = xml_tools.deferred_ui(self.host, xml_ui)
        d.addCallback(
            self._on_xml_ui_cb,
            client,
            iq_result_elt["id"],
            iq_result_elt["from"],
            post_treat_cb,
        )
        d.addErrback(self._on_xml_ui_eb)
        # We don't wait more than 10 min for the frontend to answer.
        d.addTimeout(600, cast(IReactorCore, reactor))
        return xml_ui

    @ensure_deferred
    async def _on_xml_ui_cb(
        self,
        data: dict,
        client: SatXMPPEntity,
        stanza_id: str,
        to_: str,
        post_treat_cb: Callable | None,
    ) -> None:
        """Send the registration form filled by the frontend to the service.

        @param data: The filled registration form as a XMLUI Form result.
        @param client: Client session.
        @param stanza_id: The ID of the IQ stanza of the registration process.
        @param to_: The JID of the service that sent the registration form.
        @param post_treat_cb: A callback function to handle the registration result, if
            provided. It is called with the JID of the answering entity and the
            profile of the client.
        @raise exceptions.ConflictError: the service answered with a "conflict" error.
        @raise jabber_error.StanzaError: the service answered with another error.
        """
        form_elt = xml_tools.xmlui_result_to_elt(data, NS_IQ_REGISTER)

        submit_iq = client.IQ()
        submit_iq["id"] = stanza_id
        submit_iq["to"] = to_
        submit_iq.addElement("query", NS_IQ_REGISTER).addChild(form_elt)
        try:
            result_elt = await submit_iq.send()
            log.debug(_("registration answer: %s") % result_elt.toXml())
            if post_treat_cb is not None:
                post_treat_cb(jid.JID(result_elt["from"]), client.profile)
        except jabber_error.StanzaError as stanza_err:
            log.info(_("Registration failure: {}").format(stanza_err))
            if stanza_err.condition != "conflict":
                raise stanza_err
            # "conflict" is converted to a dedicated exception.
            raise exceptions.ConflictError(
                _("Username already exists, please choose another one.")
            )

    def _on_xml_ui_eb(self, failure_: Failure) -> None:
        """Log a failure which happened while the registration form was handled.

        Nothing is returned, so the failure is not propagated further.
        """
        message = f"Error while handling registration form to frontend: {failure_}"
        log.warning(message)

    def _register_new_account(self, jid_, password, email, host, port):
        """Bridge method: call ``register_new_account`` with the set arguments only.

        Empty values (empty string, 0) of ``email``, ``host`` and ``port`` are not
        transmitted, so the defaults of ``register_new_account`` are used for them.
        """
        optional_args = {
            name: value
            for name, value in (("email", email), ("host", host), ("port", port))
            if value
        }
        return self.register_new_account(jid.JID(jid_), password, **optional_args)

    def register_new_account(
        self,
        jid_: jid.JID,
        password: str,
        email: str | None = None,
        host: str | None = None,
        port: int = C.XMPP_C2S_PORT,
    ):
        """Register a new account on a XMPP server.

        @param jid_: request jid to register
        @param password: password of the account
        @param email: email of the account
        @param host: host of the server to register to
            If None, the "xmpp_domain" configuration value is used, with
            "127.0.0.1" as fallback.
        @param port: port of the server to register to
        @return: the ``registered`` Deferred of the authenticator
        """
        target_host = (
            self.host.memory.config_get("", "xmpp_domain", "127.0.0.1")
            if host is None
            else host
        )
        # The certificate is checked for any host but "127.0.0.1".
        authenticator = RegisteringAuthenticator(
            jid_, password, email, check_certificate=target_host != "127.0.0.1"
        )
        factory = ServerRegister(authenticator)
        reactor.connectTCP(target_host, port, factory)
        return authenticator.registered

    def _change_password(self, new_password, profile_key):
        """Bridge method: retrieve the client session and call ``change_password``."""
        session = self.host.get_client(profile_key)
        return self.change_password(session, new_password)

    def change_password(self, client, new_password):
        """Change the password of the client's account.

        @param client: client session.
        @param new_password: password to set.
        @return: Deferred of the IQ request; on success, the "Password" parameter of
            the profile is updated.
        """

        def save_password(__):
            # Only called if the server has accepted the new password.
            return self.host.memory.param_set(
                "Password", new_password, "Connection", profile_key=client.profile
            )

        password_iq = self.build_register_iq(
            client.xmlstream, client.jid, new_password
        )
        return password_iq.send(client.jid.host).addCallback(save_password)

    def _unregister(self, to_jid_s, profile_key):
        """Bridge method: retrieve the client session and call ``unregister``."""
        session = self.host.get_client(profile_key)
        service_jid = jid.JID(to_jid_s)
        return self.unregister(session, service_jid)

    def unregister(self, client: SatXMPPEntity, to_jid: jid.JID) -> defer.Deferred:
        """Remove registration from a server/service.

        BEWARE! if you remove registration from profile own server, this will
        DELETE THE XMPP ACCOUNT WITHOUT WARNING
        @param client: client session.
        @param to_jid: jid of the service or server
            None to delete client's account (DANGEROUS!)
        @return: Deferred of the IQ request. If the request targets the client's own
            server, the client is disconnected once the request has succeeded.
        """
        remove_iq = client.IQ()
        if to_jid is not None:
            remove_iq["to"] = to_jid.full()
        remove_iq.addElement((NS_IQ_REGISTER, "query")).addElement("remove")
        d = remove_iq.send()

        targets_own_server = not to_jid or to_jid == jid.JID(client.jid.host)
        if targets_own_server:
            d.addCallback(lambda __: client.entity_disconnect())
        return d


@implementer(iwokkel.IDisco)
class XEP_0077_handler(xmlstream_mod.XMPPHandler):
    """XMPP handler of the In-Band Registration plugin.

    It advertises the registration namespace in disco, and for components it
    forwards incoming registration IQ requests to the plugin.
    """

    def __init__(self, plugin_parent: XEP_0077) -> None:
        """
        @param plugin_parent: plugin instance whose request callbacks are used.
        """
        # Base initializer sets ``parent`` and ``xmlstream`` to None, so these
        # attributes exist even before the handler is attached to a client.
        super().__init__()
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        """Observe registration requests when the client is a component."""
        client = cast(SatXMPPEntity, self.parent)
        if client.is_component:
            self.xmlstream.addObserver(
                IQ_REGISTER_REQUEST,
                self.plugin_parent._on_register_request,
                client=client,
            )
            self.xmlstream.addObserver(
                IQ_SUBMIT_REQUEST,
                self.plugin_parent._on_submit_request,
                client=client,
            )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        """Advertise the In-Band Registration feature."""
        return [disco.DiscoFeature(NS_IQ_REGISTER)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        """No disco item is exposed by this handler."""
        return []