view libervia/backend/plugins/plugin_xep_0077.py @ 4295:7d98d894933c

frontends (tools/aio): Fix excessive use of CPU: Use a short delay instead of `loop.call_soonloop.call_soon` when work as been done, as this was resulting in an excessive use of CPU.
author Goffi <goffi@goffi.org>
date Fri, 06 Sep 2024 17:38:31 +0200
parents 0d7bb4df2343
children 7ded09452875
line wrap: on
line source

#!/usr/bin/env python3


# SAT plugin for managing xep-0077
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from twisted.words.protocols.jabber import jid, xmlstream, client, error as jabber_error
from twisted.internet import defer, reactor, ssl
from wokkel import data_form
from libervia.backend.core.i18n import _
from libervia.backend.core.constants import Const as C
from libervia.backend.core import exceptions
from libervia.backend.core.log import getLogger
from libervia.backend.core.xmpp import SatXMPPEntity
from libervia.backend.tools import xml_tools

log = getLogger(__name__)

NS_REG = "jabber:iq:register"

PLUGIN_INFO = {
    C.PI_NAME: "XEP 0077 Plugin",
    C.PI_IMPORT_NAME: "XEP-0077",
    C.PI_TYPE: "XEP",
    C.PI_PROTOCOLS: ["XEP-0077"],
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "XEP_0077",
    C.PI_DESCRIPTION: _("""Implementation of in-band registration"""),
}

# FIXME: this implementation is incomplete


class RegisteringAuthenticator(xmlstream.ConnectAuthenticator):
    # FIXME: request IQ is not send to check available fields,
    #        while XEP recommand to use it
    # FIXME: doesn't handle data form or oob
    namespace = "jabber:client"

    def __init__(self, jid_, password, email=None, check_certificate=True):
        log.debug(_("Registration asked for {jid}").format(jid=jid_))
        xmlstream.ConnectAuthenticator.__init__(self, jid_.host)
        self.jid = jid_
        self.password = password
        self.email = email
        self.check_certificate = check_certificate
        self.registered = defer.Deferred()

    def associateWithStream(self, xs):
        xmlstream.ConnectAuthenticator.associateWithStream(self, xs)
        xs.addObserver(xmlstream.STREAM_AUTHD_EVENT, self.register)

        xs.initializers = [client.CheckVersionInitializer(xs)]
        if self.check_certificate:
            tls_required, configurationForTLS = True, None
        else:
            tls_required = False
            configurationForTLS = ssl.CertificateOptions(trustRoot=None)
        tls_init = xmlstream.TLSInitiatingInitializer(
            xs, required=tls_required, configurationForTLS=configurationForTLS
        )

        xs.initializers.append(tls_init)

    def register(self, xmlstream):
        log.debug(
            _(
                "Stream started with {server}, now registering".format(
                    server=self.jid.host
                )
            )
        )
        iq = XEP_0077.build_register_iq(
            self.xmlstream, self.jid, self.password, self.email
        )
        d = iq.send(self.jid.host).addCallbacks(
            self.registration_cb, self.registration_eb
        )
        d.chainDeferred(self.registered)

    def registration_cb(self, answer):
        log.debug(_("Registration answer: {}").format(answer.toXml()))
        self.xmlstream.sendFooter()

    def registration_eb(self, failure_):
        log.info(_("Registration failure: {}").format(str(failure_.value)))
        self.xmlstream.sendFooter()
        raise failure_


class ServerRegister(xmlstream.XmlStreamFactory):

    def __init__(self, *args, **kwargs):
        xmlstream.XmlStreamFactory.__init__(self, *args, **kwargs)
        self.addBootstrap(xmlstream.STREAM_END_EVENT, self._disconnected)

    def clientConnectionLost(self, connector, reason):
        connector.disconnect()

    def _disconnected(self, reason):
        if not self.authenticator.registered.called:
            err = jabber_error.StreamError("Server unexpectedly closed the connection")
            try:
                if reason.value.args[0][0][2] == "certificate verify failed":
                    err = exceptions.InvalidCertificate()
            except (IndexError, TypeError):
                pass
            self.authenticator.registered.errback(err)


class XEP_0077(object):
    def __init__(self, host):
        log.info(_("Plugin XEP_0077 initialization"))
        self.host = host
        host.bridge.add_method(
            "in_band_register",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._in_band_register,
            async_=True,
        )
        host.bridge.add_method(
            "in_band_account_new",
            ".plugin",
            in_sign="ssssi",
            out_sign="",
            method=self._register_new_account,
            async_=True,
        )
        host.bridge.add_method(
            "in_band_unregister",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._unregister,
            async_=True,
        )
        host.bridge.add_method(
            "in_band_password_change",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._change_password,
            async_=True,
        )

    @staticmethod
    def build_register_iq(xmlstream_, jid_, password, email=None):
        iq_elt = xmlstream.IQ(xmlstream_, "set")
        iq_elt["to"] = jid_.host
        query_elt = iq_elt.addElement(("jabber:iq:register", "query"))
        username_elt = query_elt.addElement("username")
        username_elt.addContent(jid_.user)
        password_elt = query_elt.addElement("password")
        password_elt.addContent(password)
        if email is not None:
            email_elt = query_elt.addElement("email")
            email_elt.addContent(email)
        return iq_elt

    def _reg_cb(self, answer, client, post_treat_cb):
        """Called after the first get IQ"""
        try:
            query_elt = next(answer.elements(NS_REG, "query"))
        except StopIteration:
            raise exceptions.DataError("Can't find expected query element")

        try:
            x_elem = next(query_elt.elements(data_form.NS_X_DATA, "x"))
        except StopIteration:
            # XXX: it seems we have an old service which doesn't manage data forms
            log.warning(_("Can't find data form"))
            raise exceptions.DataError(
                _("This gateway can't be managed by SàT, sorry :(")
            )

        def submit_form(data, profile):
            form_elt = xml_tools.xmlui_result_to_elt(data)

            iq_elt = client.IQ()
            iq_elt["id"] = answer["id"]
            iq_elt["to"] = answer["from"]
            query_elt = iq_elt.addElement("query", NS_REG)
            query_elt.addChild(form_elt)
            d = iq_elt.send()
            d.addCallback(self._reg_success, client, post_treat_cb)
            d.addErrback(self._reg_failure, client)
            return d

        form = data_form.Form.fromElement(x_elem)
        submit_reg_id = self.host.register_callback(
            submit_form, with_data=True, one_shot=True
        )
        return xml_tools.data_form_2_xmlui(form, submit_reg_id)

    def _reg_eb(self, failure, client):
        """Called when something is wrong with registration"""
        log.info(_("Registration failure: %s") % str(failure.value))
        raise failure

    def _reg_success(self, answer, client, post_treat_cb):
        log.debug(_("registration answer: %s") % answer.toXml())
        if post_treat_cb is not None:
            post_treat_cb(jid.JID(answer["from"]), client.profile)
        return {}

    def _reg_failure(self, failure, client):
        log.info(_("Registration failure: %s") % str(failure.value))
        if failure.value.condition == "conflict":
            raise exceptions.ConflictError(
                _("Username already exists, please choose an other one")
            )
        raise failure

    def _in_band_register(self, to_jid_s, profile_key=C.PROF_KEY_NONE):
        return self.in_band_register, jid.JID(to_jid_s, profile_key)

    def in_band_register(self, to_jid, post_treat_cb=None, profile_key=C.PROF_KEY_NONE):
        """register to a service

        @param to_jid(jid.JID): jid of the service to register to
        """
        # FIXME: this post_treat_cb arguments seems wrong, check it
        client = self.host.get_client(profile_key)
        log.debug(_("Asking registration for {}").format(to_jid.full()))
        reg_request = client.IQ("get")
        reg_request["from"] = client.jid.full()
        reg_request["to"] = to_jid.full()
        reg_request.addElement("query", NS_REG)
        d = reg_request.send(to_jid.full()).addCallbacks(
            self._reg_cb,
            self._reg_eb,
            callbackArgs=[client, post_treat_cb],
            errbackArgs=[client],
        )
        return d

    def _register_new_account(self, jid_, password, email, host, port):
        kwargs = {}
        if email:
            kwargs["email"] = email
        if host:
            kwargs["host"] = host
        if port:
            kwargs["port"] = port
        return self.register_new_account(jid.JID(jid_), password, **kwargs)

    def register_new_account(
        self, jid_, password, email=None, host=None, port=C.XMPP_C2S_PORT
    ):
        """register a new account on a XMPP server

        @param jid_(jid.JID): request jid to register
        @param password(unicode): password of the account
        @param email(unicode): email of the account
        @param host(None, unicode): host of the server to register to
        @param port(int): port of the server to register to
        """
        if host is None:
            host = self.host.memory.config_get("", "xmpp_domain", "127.0.0.1")
        check_certificate = host != "127.0.0.1"
        authenticator = RegisteringAuthenticator(
            jid_, password, email, check_certificate=check_certificate
        )
        registered_d = authenticator.registered
        server_register = ServerRegister(authenticator)
        reactor.connectTCP(host, port, server_register)
        return registered_d

    def _change_password(self, new_password, profile_key):
        client = self.host.get_client(profile_key)
        return self.change_password(client, new_password)

    def change_password(self, client, new_password):
        iq_elt = self.build_register_iq(client.xmlstream, client.jid, new_password)
        d = iq_elt.send(client.jid.host)
        d.addCallback(
            lambda __: self.host.memory.param_set(
                "Password", new_password, "Connection", profile_key=client.profile
            )
        )
        return d

    def _unregister(self, to_jid_s, profile_key):
        client = self.host.get_client(profile_key)
        return self.unregister(client, jid.JID(to_jid_s))

    def unregister(self, client: SatXMPPEntity, to_jid: jid.JID) -> defer.Deferred:
        """remove registration from a server/service

        BEWARE! if you remove registration from profile own server, this will
        DELETE THE XMPP ACCOUNT WITHOUT WARNING
        @param to_jid: jid of the service or server
            None to delete client's account (DANGEROUS!)
        """
        iq_elt = client.IQ()
        if to_jid is not None:
            iq_elt["to"] = to_jid.full()
        query_elt = iq_elt.addElement((NS_REG, "query"))
        query_elt.addElement("remove")
        d = iq_elt.send()
        if not to_jid or to_jid == jid.JID(client.jid.host):
            d.addCallback(lambda __: client.entity_disconnect())
        return d