view libervia/backend/plugins/plugin_xep_0077.py @ 4336:6e0918e638ee

plugin XEP-0498: "Pubsub File Sharing" implementation: Partial implementation of XEP-0498, necessary to implement the service part in email gateway. rel 453
author Goffi <goffi@goffi.org>
date Tue, 03 Dec 2024 00:13:23 +0100
parents 7ded09452875
children
line wrap: on
line source

#!/usr/bin/env python3


# SAT plugin for managing xep-0077
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Awaitable, Callable, NamedTuple, cast

from twisted.internet import defer, reactor, ssl
from twisted.internet.interfaces import IReactorCore
from twisted.python.failure import Failure
from twisted.words.protocols.jabber import (
    client,
    error as jabber_error,
    jid,
    xmlstream as xmlstream_mod,
)
from twisted.words.xish import domish
from wokkel import data_form, disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.xmpp import SatXMPPEntity
from libervia.backend.tools import xml_tools
from libervia.backend.tools.utils import as_deferred, ensure_deferred

log = getLogger(__name__)

NS_IQ_REGISTER = "jabber:iq:register"
IQ_REGISTER_REQUEST = f'{C.IQ_GET}/query[@xmlns="{NS_IQ_REGISTER}"]'
IQ_SUBMIT_REQUEST = f'{C.IQ_SET}/query[@xmlns="{NS_IQ_REGISTER}"]'

PLUGIN_INFO = {
    C.PI_NAME: "In-Band Registration",
    C.PI_IMPORT_NAME: "XEP-0077",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0077"],
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "XEP_0077",
    C.PI_DESCRIPTION: _("""Implementation of in-band registration."""),
    C.PI_HANDLER: C.BOOL_TRUE,
}

# FIXME: this implementation is incomplete


class RegistrationHandlers(NamedTuple):
    form_handler: Callable
    submit_handler: Callable


class RegisteringAuthenticator(xmlstream_mod.ConnectAuthenticator):
    # FIXME: request IQ is not send to check available fields,
    #        while XEP recommand to use it
    # FIXME: doesn't handle data form or oob
    namespace = "jabber:client"

    def __init__(self, jid_, password, email=None, check_certificate=True):
        log.debug(_("Registration asked for {jid}").format(jid=jid_))
        xmlstream_mod.ConnectAuthenticator.__init__(self, jid_.host)
        self.jid = jid_
        self.password = password
        self.email = email
        self.check_certificate = check_certificate
        self.registered = defer.Deferred()

    def associateWithStream(self, xmlstream):
        xmlstream_mod.ConnectAuthenticator.associateWithStream(self, xmlstream)
        xmlstream.addObserver(xmlstream_mod.STREAM_AUTHD_EVENT, self.register)

        xmlstream.initializers = [client.CheckVersionInitializer(xmlstream)]
        if self.check_certificate:
            tls_required, configurationForTLS = True, None
        else:
            tls_required = False
            configurationForTLS = ssl.CertificateOptions(trustRoot=None)
        tls_init = xmlstream_mod.TLSInitiatingInitializer(
            xmlstream, required=tls_required, configurationForTLS=configurationForTLS
        )

        xmlstream.initializers.append(tls_init)

    def register(self, xmlstream):
        log.debug(
            _(
                "Stream started with {server}, now registering".format(
                    server=self.jid.host
                )
            )
        )
        iq = XEP_0077.build_register_iq(
            self.xmlstream, self.jid, self.password, self.email
        )
        d = iq.send(self.jid.host).addCallbacks(
            self.registration_cb, self.registration_eb
        )
        d.chainDeferred(self.registered)

    def registration_cb(self, answer):
        log.debug(_("Registration answer: {}").format(answer.toXml()))
        assert self.xmlstream is not None
        self.xmlstream.sendFooter()

    def registration_eb(self, failure_):
        log.info(_("Registration failure: {}").format(str(failure_.value)))
        assert self.xmlstream is not None
        self.xmlstream.sendFooter()
        raise failure_


class ServerRegister(xmlstream_mod.XmlStreamFactory):

    def __init__(self, *args, **kwargs):
        xmlstream_mod.XmlStreamFactory.__init__(self, *args, **kwargs)
        self.addBootstrap(xmlstream_mod.STREAM_END_EVENT, self._disconnected)

    def clientConnectionLost(self, connector, unused_reason):
        connector.disconnect()

    def _disconnected(self, reason):
        if not self.authenticator.registered.called:
            err = jabber_error.StreamError("Server unexpectedly closed the connection")
            try:
                if reason.value.args[0][0][2] == "certificate verify failed":
                    err = exceptions.InvalidCertificate()
            except (IndexError, TypeError):
                pass
            self.authenticator.registered.errback(err)


class XEP_0077:

    def __init__(self, host):
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        host.register_namespace("iq-register", NS_IQ_REGISTER)
        host.bridge.add_method(
            "in_band_register",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._in_band_register,
            async_=True,
        )
        host.bridge.add_method(
            "in_band_account_new",
            ".plugin",
            in_sign="ssssi",
            out_sign="",
            method=self._register_new_account,
            async_=True,
        )
        host.bridge.add_method(
            "in_band_unregister",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._unregister,
            async_=True,
        )
        host.bridge.add_method(
            "in_band_password_change",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._change_password,
            async_=True,
        )
        self.handlers = set()

    def get_handler(self, client: SatXMPPEntity) -> xmlstream_mod.XMPPHandler:
        return XEP_0077_handler(self)

    def register_handler(
        self,
        form_handler: Callable[
            [SatXMPPEntity, domish.Element],
            Awaitable[tuple[bool, data_form.Form] | None]
            | defer.Deferred[tuple[bool, data_form.Form] | None]
            | tuple[bool, data_form.Form]
            | None,
        ],
        submit_handler: Callable[
            [SatXMPPEntity, domish.Element, data_form.Form | None],
            Awaitable[bool | None] | defer.Deferred[bool | None] | bool | None,
        ],
    ) -> None:
        """
        Register a new handler.

        Mostly useful for component which handle In-Band Registration.
        @param form_handler: method to call on registration request to get a data form.
            May be async.
            The handler must return a data_form.Form instance if it can handle the
            request, otherwise other handlers will be tried until one returns a data form.
        @param submit_handler: method to call on registration request to submit the
            handler.
            In case of "unregister" request, None will be used instead of a
            data_form.Form.
        """
        self.handlers.add(RegistrationHandlers(form_handler, submit_handler))

    def _on_register_request(self, iq_elt: domish.Element, client: SatXMPPEntity) -> None:
        defer.ensureDeferred(self._on_request(iq_elt, client, True))

    def _on_submit_request(self, iq_elt: domish.Element, client: SatXMPPEntity) -> None:
        defer.ensureDeferred(self._on_request(iq_elt, client, False))

    async def _on_request(
        self, iq_elt: domish.Element, client: SatXMPPEntity, is_register: bool
    ) -> None:
        """Handle a register or submit request.

        @param iq_elt: The IQ element of the request.
        @param client: Client session.
        @param is_register: Whether this is a register request (True) or a submit request
            (False).
        """
        iq_elt.handled = True

        # Submit request must have a form with submitted values.
        if is_register:
            handler_type = "register"
            submit_form = None
        else:
            handler_type = "submit"
            remove_elt = next(iq_elt.query.elements(NS_IQ_REGISTER, "remove"), None)
            if remove_elt is not None:
                # This is a unregister request.
                submit_form = None
            else:
                submit_form = data_form.findForm(iq_elt.query, NS_IQ_REGISTER)
                if submit_form is None:
                    log.warning(f"Data form not found, invalid request: {iq_elt.toXml()}")
                    client.send(
                        jabber_error.StanzaError(
                            "bad-request", text="No data form found."
                        ).toResponse(iq_elt)
                    )
                    return

        # We look and run relevant handler.
        for handlers in self.handlers:
            if is_register:
                handler = handlers.form_handler
                handler_call = as_deferred(handler, client, iq_elt)
            else:
                handler = handlers.submit_handler
                handler_call = as_deferred(handler, client, iq_elt, submit_form)
            try:
                callback_ret = await handler_call
            except jabber_error.StanzaError as e:
                iq_error_elt = e.toResponse(iq_elt)
                client.send(iq_error_elt)
                return
            except exceptions.PasswordError as e:
                log.warning("Invalid login or password while registering to service.")
                iq_error_elt = jabber_error.StanzaError(
                    "forbidden", text=str(e)
                ).toResponse(iq_elt)
                client.send(iq_error_elt)
                return
            except Exception:
                log.exception(
                    f"Error while handling {handler_type} request with {handler}, "
                    "ignoring it."
                )
                continue
            if callback_ret is not None:
                if is_register:
                    try:
                        registered, registration_form = callback_ret
                        assert isinstance(registered, bool)
                        assert isinstance(registration_form, data_form.Form)
                    except (TypeError, ValueError, AssertionError) as e:
                        log.warning(
                            f"Invalid return value from {handler}, ignoring it: {e}"
                        )
                        continue
                    # We need to be sure to have the right namespace for the form.
                    registration_form.formNamespace = NS_IQ_REGISTER
                    iq_result_elt = xmlstream_mod.toResponse(iq_elt, "result")
                    query_elt = iq_result_elt.addElement((NS_IQ_REGISTER, "query"))

                    if registered:
                        # The requestor is already registered, we indicate it.
                        query_elt.addElement("registered")

                    query_elt.addChild(registration_form.toElement())
                    client.send(iq_result_elt)
                else:
                    if callback_ret is True:
                        if submit_form is None:
                            log.info(f"User {iq_elt['from']} successfully unregistered.")
                        else:
                            log.info(f"User {iq_elt['from']} successfully registered.")
                    else:
                        log.error(
                            f"Unexpected return value from {handler}, was expecting "
                            '"True".'
                        )
                        client.send(
                            jabber_error.StanzaError(
                                "internal-server-error", text="Error in request handler."
                            ).toResponse(iq_elt)
                        )
                        return
                    iq_result_elt = xmlstream_mod.toResponse(iq_elt, "result")
                    client.send(iq_result_elt)
                break
        else:
            log.warning(
                f"No handler found for in-band registration {handler_type} request: "
                f"{iq_elt.toXml()}."
            )
            iq_error_elt = jabber_error.StanzaError("service-unavailable").toResponse(
                iq_elt
            )
            client.send(iq_error_elt)
            return

    @staticmethod
    def build_register_iq(xmlstream, jid_, password, email=None):
        iq_elt = xmlstream_mod.IQ(xmlstream, "set")
        iq_elt["to"] = jid_.host
        query_elt = iq_elt.addElement(("jabber:iq:register", "query"))
        username_elt = query_elt.addElement("username")
        username_elt.addContent(jid_.user)
        password_elt = query_elt.addElement("password")
        password_elt.addContent(password)
        if email is not None:
            email_elt = query_elt.addElement("email")
            email_elt.addContent(email)
        return iq_elt

    def _in_band_register(self, to_jid_s, profile_key=C.PROF_KEY_NONE):
        client = self.host.get_client(profile_key)
        return defer.ensureDeferred(self.in_band_register(client, jid.JID(to_jid_s)))

    async def in_band_register(
        self,
        client: SatXMPPEntity,
        to_jid: jid.JID,
        post_treat_cb: Callable | None = None,
    ) -> xml_tools.XMLUI:
        """Register to a service.

        Send an IQ request to register with the given service and return the registration
        form as XML UI.

        @param client: client session.
        @param to_jid: The JID of the service to register to.
        @param post_treat_cb: A callback function to handle the registration result, if
            provided.
        @return: The registration form as XML UI.
        """
        # FIXME: this post_treat_cb arguments seems wrong, check it
        log.debug(_("Asking registration for {}").format(to_jid.full()))
        reg_request = client.IQ("get")
        reg_request["from"] = client.jid.full()
        reg_request["to"] = to_jid.full()
        reg_request.addElement("query", NS_IQ_REGISTER)
        try:
            iq_result_elt = await reg_request.send(to_jid.full())
        except Exception as e:
            log.warning(_("Registration failure: {}").format(e))
            raise e
        else:
            try:
                query_elt = next(iq_result_elt.elements(NS_IQ_REGISTER, "query"))
            except StopIteration:
                raise exceptions.DataError(
                    f"Can't find expected query element: {iq_result_elt.toXml()}"
                )

            try:
                x_elem = next(query_elt.elements(data_form.NS_X_DATA, "x"))
            except StopIteration:
                # XXX: it seems we have an old service which doesn't manage data forms
                log.warning("Can't find data form: {iq_result_elt.toXml()}")
                raise exceptions.DataError(
                    _(
                        "This gateway is outdated and can't be managed by Libervia, "
                        "sorry :("
                    )
                )

            form = data_form.Form.fromElement(x_elem)
            xml_ui = xml_tools.data_form_2_xmlui(form, "")
            d = xml_tools.deferred_ui(self.host, xml_ui)
            d.addCallback(
                self._on_xml_ui_cb,
                client,
                iq_result_elt["id"],
                iq_result_elt["from"],
                post_treat_cb,
            )
            d.addErrback(self._on_xml_ui_eb)
            d.addTimeout(600, cast(IReactorCore, reactor))
            return xml_ui

    @ensure_deferred
    async def _on_xml_ui_cb(
        self,
        data: dict,
        client: SatXMPPEntity,
        stanza_id: str,
        to_: str,
        post_treat_cb: Callable | None,
    ) -> None:
        """Handle the XML UI result of a registration form.

        Process the filled registration form (from frontend) and send it back to complete
        the registration process.

        @param data: The filled registration form as a XMLUI Form result.
        @param client: Client session.
        @param stanza_id: The ID of the IQ stanza of the registration process.
        @param to_: The JID of the service that sent the registration form.
        @param post_treat_cb: A callback function to handle the registration result, if
            provided.
        """
        form_elt = xml_tools.xmlui_result_to_elt(data, NS_IQ_REGISTER)

        iq_elt = client.IQ()
        iq_elt["id"] = stanza_id
        iq_elt["to"] = to_
        query_elt = iq_elt.addElement("query", NS_IQ_REGISTER)
        query_elt.addChild(form_elt)
        try:
            answer = await iq_elt.send()
            log.debug(_("registration answer: %s") % answer.toXml())
            if post_treat_cb is not None:
                post_treat_cb(jid.JID(answer["from"]), client.profile)
        except jabber_error.StanzaError as e:
            log.info(_("Registration failure: {}").format(e))
            if e.condition == "conflict":
                raise exceptions.ConflictError(
                    _("Username already exists, please choose another one.")
                )
            raise e

    def _on_xml_ui_eb(self, failure_: Failure) -> None:
        """Handle error during handling of registration form by frontend."""
        log.warning(f"Error while handling registration form to frontend: {failure_}")

    def _register_new_account(self, jid_, password, email, host, port):
        kwargs = {}
        if email:
            kwargs["email"] = email
        if host:
            kwargs["host"] = host
        if port:
            kwargs["port"] = port
        return self.register_new_account(jid.JID(jid_), password, **kwargs)

    def register_new_account(
        self,
        jid_: jid.JID,
        password: str,
        email: str | None = None,
        host: str | None = None,
        port: int = C.XMPP_C2S_PORT,
    ):
        """Register a new account on a XMPP server.

        @param jid_: request jid to register
        @param password: password of the account
        @param email: email of the account
        @param host: host of the server to register to
        @param port: port of the server to register to
        """
        if host is None:
            host = self.host.memory.config_get("", "xmpp_domain", "127.0.0.1")
        check_certificate = host != "127.0.0.1"
        authenticator = RegisteringAuthenticator(
            jid_, password, email, check_certificate=check_certificate
        )
        registered_d = authenticator.registered
        server_register = ServerRegister(authenticator)
        reactor.connectTCP(host, port, server_register)
        return registered_d

    def _change_password(self, new_password, profile_key):
        client = self.host.get_client(profile_key)
        return self.change_password(client, new_password)

    def change_password(self, client, new_password):
        iq_elt = self.build_register_iq(client.xmlstream, client.jid, new_password)
        d = iq_elt.send(client.jid.host)
        d.addCallback(
            lambda __: self.host.memory.param_set(
                "Password", new_password, "Connection", profile_key=client.profile
            )
        )
        return d

    def _unregister(self, to_jid_s, profile_key):
        client = self.host.get_client(profile_key)
        return self.unregister(client, jid.JID(to_jid_s))

    def unregister(self, client: SatXMPPEntity, to_jid: jid.JID) -> defer.Deferred:
        """Remove registration from a server/service.

        BEWARE! if you remove registration from profile own server, this will
        DELETE THE XMPP ACCOUNT WITHOUT WARNING
        @param to_jid: jid of the service or server
            None to delete client's account (DANGEROUS!)
        """
        iq_elt = client.IQ()
        if to_jid is not None:
            iq_elt["to"] = to_jid.full()
        query_elt = iq_elt.addElement((NS_IQ_REGISTER, "query"))
        query_elt.addElement("remove")
        d = iq_elt.send()
        if not to_jid or to_jid == jid.JID(client.jid.host):
            d.addCallback(lambda __: client.entity_disconnect())
        return d


@implementer(iwokkel.IDisco)
class XEP_0077_handler(xmlstream_mod.XMPPHandler):

    def __init__(self, plugin_parent: XEP_0077) -> None:
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        client = cast(SatXMPPEntity, self.parent)
        if client.is_component:
            self.xmlstream.addObserver(
                IQ_REGISTER_REQUEST,
                self.plugin_parent._on_register_request,
                client=client,
            )
            self.xmlstream.addObserver(
                IQ_SUBMIT_REQUEST,
                self.plugin_parent._on_submit_request,
                client=client,
            )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_IQ_REGISTER)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []