Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0077.py @ 4351:6a0a081485b8
plugin autocrypt: Autocrypt protocol implementation:
Implementation of autocrypt: `autocrypt` header is checked, and if present and no public
key is known for the peer, the key is imported.
`autocrypt` header is also added to outgoing message (only if an email gateway is
detected).
For the moment, the JID is used as identifier, but the real email used by the gateway
should be used in the future.
rel 456
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 28 Feb 2025 09:23:35 +0100 |
parents | 7ded09452875 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# SAT plugin for managing xep-0077
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Awaitable, Callable, NamedTuple, cast

from twisted.internet import defer, reactor, ssl
from twisted.internet.interfaces import IReactorCore
from twisted.python.failure import Failure
from twisted.words.protocols.jabber import (
    client,
    error as jabber_error,
    jid,
    xmlstream as xmlstream_mod,
)
from twisted.words.xish import domish
from wokkel import data_form, disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.xmpp import SatXMPPEntity
from libervia.backend.tools import xml_tools
from libervia.backend.tools.utils import as_deferred, ensure_deferred

log = getLogger(__name__)

NS_IQ_REGISTER = "jabber:iq:register"
# XPath-like observers matching incoming "get" (form request) and "set" (submit or
# remove) IQ stanzas carrying a jabber:iq:register query.
IQ_REGISTER_REQUEST = f'{C.IQ_GET}/query[@xmlns="{NS_IQ_REGISTER}"]'
IQ_SUBMIT_REQUEST = f'{C.IQ_SET}/query[@xmlns="{NS_IQ_REGISTER}"]'

PLUGIN_INFO = {
    C.PI_NAME: "In-Band Registration",
    C.PI_IMPORT_NAME: "XEP-0077",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0077"],
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "XEP_0077",
    C.PI_DESCRIPTION: _("""Implementation of in-band registration."""),
    C.PI_HANDLER: C.BOOL_TRUE,
}

# FIXME: this implementation is incomplete


class RegistrationHandlers(NamedTuple):
    """Pair of callbacks registered through ``XEP_0077.register_handler``."""

    # Called on a registration "get" request, to obtain the registration form.
    form_handler: Callable
    # Called on a registration "set" request (form submission or removal).
    submit_handler: Callable


class RegisteringAuthenticator(xmlstream_mod.ConnectAuthenticator):
    """Authenticator which registers a new account once the stream is started.

    The outcome is reported through the ``registered`` Deferred.
    """

    # FIXME: request IQ is not sent to check available fields,
    #        while XEP recommends to use it
    # FIXME: doesn't handle data form or oob
    namespace = "jabber:client"

    def __init__(self, jid_, password, email=None, check_certificate=True):
        """
        @param jid_: JID of the account to register
        @param password: password of the account
        @param email: optional email to send with the registration
        @param check_certificate: if False, TLS is not required and the certificate
            is not verified
        """
        log.debug(_("Registration asked for {jid}").format(jid=jid_))
        xmlstream_mod.ConnectAuthenticator.__init__(self, jid_.host)
        self.jid = jid_
        self.password = password
        self.email = email
        self.check_certificate = check_certificate
        # Fired with the result (or failure) of the registration IQ.
        self.registered = defer.Deferred()

    def associateWithStream(self, xmlstream):
        """Set up stream initializers and trigger registration once ready."""
        xmlstream_mod.ConnectAuthenticator.associateWithStream(self, xmlstream)
        xmlstream.addObserver(xmlstream_mod.STREAM_AUTHD_EVENT, self.register)

        xmlstream.initializers = [client.CheckVersionInitializer(xmlstream)]
        if self.check_certificate:
            tls_required, configurationForTLS = True, None
        else:
            tls_required = False
            # No trust root: the server certificate is not verified.
            configurationForTLS = ssl.CertificateOptions(trustRoot=None)
        tls_init = xmlstream_mod.TLSInitiatingInitializer(
            xmlstream, required=tls_required, configurationForTLS=configurationForTLS
        )

        xmlstream.initializers.append(tls_init)

    def register(self, xmlstream):
        """Send the registration IQ and chain its outcome to ``self.registered``."""
        # The message is translated first and formatted afterwards, otherwise the
        # lookup would be done on the already formatted string.
        log.debug(
            _("Stream started with {server}, now registering").format(
                server=self.jid.host
            )
        )
        iq = XEP_0077.build_register_iq(
            self.xmlstream, self.jid, self.password, self.email
        )
        d = iq.send(self.jid.host).addCallbacks(
            self.registration_cb, self.registration_eb
        )
        d.chainDeferred(self.registered)

    def registration_cb(self, answer):
        """Log the successful answer and close the stream."""
        log.debug(_("Registration answer: {}").format(answer.toXml()))
        assert self.xmlstream is not None
        self.xmlstream.sendFooter()

    def registration_eb(self, failure_):
        """Log the failure, close the stream and propagate the failure."""
        log.info(_("Registration failure: {}").format(str(failure_.value)))
        assert self.xmlstream is not None
        self.xmlstream.sendFooter()
        raise failure_


class ServerRegister(xmlstream_mod.XmlStreamFactory):
    """Stream factory used to connect to a server to register a new account."""

    def __init__(self, *args, **kwargs):
        xmlstream_mod.XmlStreamFactory.__init__(self, *args, **kwargs)
        self.addBootstrap(xmlstream_mod.STREAM_END_EVENT, self._disconnected)

    def clientConnectionLost(self, connector, unused_reason):
        connector.disconnect()

    def _disconnected(self, reason):
        """Fail the registration if the stream ended before any answer was received.

        @param reason: reason of the end of the stream
        """
        if not self.authenticator.registered.called:
            err = jabber_error.StreamError("Server unexpectedly closed the connection")
            try:
                if reason.value.args[0][0][2] == "certificate verify failed":
                    err = exceptions.InvalidCertificate()
            except (AttributeError, IndexError, KeyError, TypeError):
                # Best-effort inspection: the reason doesn't have the expected
                # shape, so the generic stream error is kept.
                pass
            self.authenticator.registered.errback(err)


class XEP_0077:
    """In-Band Registration (XEP-0077) plugin."""

    def __init__(self, host):
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        host.register_namespace("iq-register", NS_IQ_REGISTER)
        host.bridge.add_method(
            "in_band_register",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._in_band_register,
            async_=True,
        )
        host.bridge.add_method(
            "in_band_account_new",
            ".plugin",
            in_sign="ssssi",
            out_sign="",
            method=self._register_new_account,
            async_=True,
        )
        host.bridge.add_method(
            "in_band_unregister",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._unregister,
            async_=True,
        )
        host.bridge.add_method(
            "in_band_password_change",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._change_password,
            async_=True,
        )
        # Set of RegistrationHandlers added through register_handler.
        self.handlers = set()

    def get_handler(self, client: SatXMPPEntity) -> xmlstream_mod.XMPPHandler:
        return XEP_0077_handler(self)

    def register_handler(
        self,
        form_handler: Callable[
            [SatXMPPEntity, domish.Element],
            Awaitable[tuple[bool, data_form.Form] | None]
            | defer.Deferred[tuple[bool, data_form.Form] | None]
            | tuple[bool, data_form.Form]
            | None,
        ],
        submit_handler: Callable[
            [SatXMPPEntity, domish.Element, data_form.Form | None],
            Awaitable[bool | None] | defer.Deferred[bool | None] | bool | None,
        ],
    ) -> None:
        """
        Register a new handler.

        Mostly useful for component which handle In-Band Registration.

        @param form_handler: method to call on registration request to get a data form.
            May be async.
            The handler must return a tuple (registered, data_form.Form) if it can
            handle the request, otherwise (None returned) other handlers will be
            tried until one returns a data form.
        @param submit_handler: method to call on registration request to submit the
            form. Must return True on success, or None to let other handlers be tried.
            In case of "unregister" request, None will be used instead of a
            data_form.Form.
        """
        self.handlers.add(RegistrationHandlers(form_handler, submit_handler))

    def _on_register_request(self, iq_elt: domish.Element, client: SatXMPPEntity) -> None:
        defer.ensureDeferred(self._on_request(iq_elt, client, True))

    def _on_submit_request(self, iq_elt: domish.Element, client: SatXMPPEntity) -> None:
        defer.ensureDeferred(self._on_request(iq_elt, client, False))

    async def _on_request(
        self, iq_elt: domish.Element, client: SatXMPPEntity, is_register: bool
    ) -> None:
        """Handle a register or submit request.

        Registered handlers are tried until one of them returns a value other than
        None. If none does, a "service-unavailable" error is sent back.

        @param iq_elt: The IQ element of the request.
        @param client: Client session.
        @param is_register: Whether this is a register request (True) or a submit
            request (False).
        """
        iq_elt.handled = True
        # Submit request must have a form with submitted values.
        if is_register:
            handler_type = "register"
            submit_form = None
        else:
            handler_type = "submit"
            remove_elt = next(iq_elt.query.elements(NS_IQ_REGISTER, "remove"), None)
            if remove_elt is not None:
                # This is a unregister request.
                submit_form = None
            else:
                submit_form = data_form.findForm(iq_elt.query, NS_IQ_REGISTER)
                if submit_form is None:
                    log.warning(f"Data form not found, invalid request: {iq_elt.toXml()}")
                    client.send(
                        jabber_error.StanzaError(
                            "bad-request", text="No data form found."
                        ).toResponse(iq_elt)
                    )
                    return

        # We look and run relevant handler.
        for handlers in self.handlers:
            if is_register:
                handler = handlers.form_handler
                handler_call = as_deferred(handler, client, iq_elt)
            else:
                handler = handlers.submit_handler
                handler_call = as_deferred(handler, client, iq_elt, submit_form)
            try:
                callback_ret = await handler_call
            except jabber_error.StanzaError as e:
                # The handler explicitly asked for this error to be returned.
                iq_error_elt = e.toResponse(iq_elt)
                client.send(iq_error_elt)
                return
            except exceptions.PasswordError as e:
                log.warning("Invalid login or password while registering to service.")
                iq_error_elt = jabber_error.StanzaError(
                    "forbidden", text=str(e)
                ).toResponse(iq_elt)
                client.send(iq_error_elt)
                return
            except Exception:
                log.exception(
                    f"Error while handling {handler_type} request with {handler}, "
                    "ignoring it."
                )
                continue

            if callback_ret is None:
                # This handler doesn't manage the request, we try next one.
                continue

            if is_register:
                try:
                    registered, registration_form = callback_ret
                    # Explicit checks instead of "assert", which is stripped when
                    # Python runs with optimisations.
                    if not isinstance(registered, bool):
                        raise TypeError(
                            f"first item must be a bool, got {type(registered)!r}"
                        )
                    if not isinstance(registration_form, data_form.Form):
                        raise TypeError(
                            "second item must be a data_form.Form, got "
                            f"{type(registration_form)!r}"
                        )
                except (TypeError, ValueError) as e:
                    log.warning(f"Invalid return value from {handler}, ignoring it: {e}")
                    continue
                # We need to be sure to have the right namespace for the form.
                registration_form.formNamespace = NS_IQ_REGISTER
                iq_result_elt = xmlstream_mod.toResponse(iq_elt, "result")
                query_elt = iq_result_elt.addElement((NS_IQ_REGISTER, "query"))
                if registered:
                    # The requestor is already registered, we indicate it.
                    query_elt.addElement("registered")
                query_elt.addChild(registration_form.toElement())
                client.send(iq_result_elt)
            else:
                if callback_ret is True:
                    if submit_form is None:
                        log.info(f"User {iq_elt['from']} successfully unregistered.")
                    else:
                        log.info(f"User {iq_elt['from']} successfully registered.")
                else:
                    log.error(
                        f"Unexpected return value from {handler}, was expecting "
                        '"True".'
                    )
                    client.send(
                        jabber_error.StanzaError(
                            "internal-server-error", text="Error in request handler."
                        ).toResponse(iq_elt)
                    )
                    return
                iq_result_elt = xmlstream_mod.toResponse(iq_elt, "result")
                client.send(iq_result_elt)
            break
        else:
            log.warning(
                f"No handler found for in-band registration {handler_type} request: "
                f"{iq_elt.toXml()}."
            )
            iq_error_elt = jabber_error.StanzaError("service-unavailable").toResponse(
                iq_elt
            )
            client.send(iq_error_elt)
            return

    @staticmethod
    def build_register_iq(xmlstream, jid_, password, email=None):
        """Build the "set" IQ used to register an account or change its password.

        @param xmlstream: stream used to send the IQ
        @param jid_: JID of the account; its host is the destination and its user
            part is the username
        @param password: password to set
        @param email: optional email, added only if not None
        @return: the IQ, ready to be sent
        """
        iq_elt = xmlstream_mod.IQ(xmlstream, "set")
        iq_elt["to"] = jid_.host
        query_elt = iq_elt.addElement(("jabber:iq:register", "query"))
        username_elt = query_elt.addElement("username")
        username_elt.addContent(jid_.user)
        password_elt = query_elt.addElement("password")
        password_elt.addContent(password)
        if email is not None:
            email_elt = query_elt.addElement("email")
            email_elt.addContent(email)
        return iq_elt

    def _in_band_register(self, to_jid_s, profile_key=C.PROF_KEY_NONE):
        client = self.host.get_client(profile_key)
        return defer.ensureDeferred(self.in_band_register(client, jid.JID(to_jid_s)))

    async def in_band_register(
        self,
        client: SatXMPPEntity,
        to_jid: jid.JID,
        post_treat_cb: Callable | None = None,
    ) -> xml_tools.XMLUI:
        """Register to a service.

        Send an IQ request to register with the given service and return the
        registration form as XML UI.

        @param client: client session.
        @param to_jid: The JID of the service to register to.
        @param post_treat_cb: A callback function to handle the registration
            result, if provided.
        @return: The registration form as XML UI.
        @raise exceptions.DataError: the answer has no query element or no data form.
        """
        # FIXME: this post_treat_cb arguments seems wrong, check it
        log.debug(_("Asking registration for {}").format(to_jid.full()))
        reg_request = client.IQ("get")
        reg_request["from"] = client.jid.full()
        reg_request["to"] = to_jid.full()
        reg_request.addElement("query", NS_IQ_REGISTER)
        try:
            iq_result_elt = await reg_request.send(to_jid.full())
        except Exception as e:
            log.warning(_("Registration failure: {}").format(e))
            raise

        try:
            query_elt = next(iq_result_elt.elements(NS_IQ_REGISTER, "query"))
        except StopIteration:
            raise exceptions.DataError(
                f"Can't find expected query element: {iq_result_elt.toXml()}"
            ) from None

        try:
            x_elem = next(query_elt.elements(data_form.NS_X_DATA, "x"))
        except StopIteration:
            # XXX: it seems we have an old service which doesn't manage data forms
            log.warning(f"Can't find data form: {iq_result_elt.toXml()}")
            raise exceptions.DataError(
                _(
                    "This gateway is outdated and can't be managed by Libervia, "
                    "sorry :("
                )
            ) from None

        form = data_form.Form.fromElement(x_elem)
        xml_ui = xml_tools.data_form_2_xmlui(form, "")
        # The form filled by the frontend is sent back to the service.
        d = xml_tools.deferred_ui(self.host, xml_ui)
        d.addCallback(
            self._on_xml_ui_cb,
            client,
            iq_result_elt["id"],
            iq_result_elt["from"],
            post_treat_cb,
        )
        d.addErrback(self._on_xml_ui_eb)
        d.addTimeout(600, cast(IReactorCore, reactor))
        return xml_ui

    @ensure_deferred
    async def _on_xml_ui_cb(
        self,
        data: dict,
        client: SatXMPPEntity,
        stanza_id: str,
        to_: str,
        post_treat_cb: Callable | None,
    ) -> None:
        """Handle the XML UI result of a registration form.

        Process the filled registration form (from frontend) and send it back to
        complete the registration process.

        @param data: The filled registration form as a XMLUI Form result.
        @param client: Client session.
        @param stanza_id: The ID of the IQ stanza of the registration process.
        @param to_: The JID of the service that sent the registration form.
        @param post_treat_cb: A callback function to handle the registration
            result, if provided.
        @raise exceptions.ConflictError: the service answered with a "conflict" error.
        """
        form_elt = xml_tools.xmlui_result_to_elt(data, NS_IQ_REGISTER)

        iq_elt = client.IQ()
        iq_elt["id"] = stanza_id
        iq_elt["to"] = to_
        query_elt = iq_elt.addElement("query", NS_IQ_REGISTER)
        query_elt.addChild(form_elt)
        try:
            answer = await iq_elt.send()
            log.debug(_("registration answer: %s") % answer.toXml())
            if post_treat_cb is not None:
                post_treat_cb(jid.JID(answer["from"]), client.profile)
        except jabber_error.StanzaError as e:
            log.info(_("Registration failure: {}").format(e))
            if e.condition == "conflict":
                raise exceptions.ConflictError(
                    _("Username already exists, please choose another one.")
                ) from e
            raise

    def _on_xml_ui_eb(self, failure_: Failure) -> None:
        """Handle error during handling of registration form by frontend."""
        log.warning(f"Error while handling registration form to frontend: {failure_}")

    def _register_new_account(self, jid_, password, email, host, port):
        # Empty values coming from the bridge are dropped so that defaults apply.
        kwargs = {}
        if email:
            kwargs["email"] = email
        if host:
            kwargs["host"] = host
        if port:
            kwargs["port"] = port
        return self.register_new_account(jid.JID(jid_), password, **kwargs)

    def register_new_account(
        self,
        jid_: jid.JID,
        password: str,
        email: str | None = None,
        host: str | None = None,
        port: int = C.XMPP_C2S_PORT,
    ):
        """Register a new account on a XMPP server.

        @param jid_: request jid to register
        @param password: password of the account
        @param email: email of the account
        @param host: host of the server to register to
            None to use "xmpp_domain" from configuration ("127.0.0.1" if not set)
        @param port: port of the server to register to
        @return: Deferred fired once the registration is done
        """
        if host is None:
            host = self.host.memory.config_get("", "xmpp_domain", "127.0.0.1")
        # Certificate is not checked when registering on the local host.
        check_certificate = host != "127.0.0.1"
        authenticator = RegisteringAuthenticator(
            jid_, password, email, check_certificate=check_certificate
        )
        registered_d = authenticator.registered
        server_register = ServerRegister(authenticator)
        reactor.connectTCP(host, port, server_register)
        return registered_d

    def _change_password(self, new_password, profile_key):
        client = self.host.get_client(profile_key)
        return self.change_password(client, new_password)

    def change_password(self, client, new_password):
        """Change the password of the client's account, then store the new one.

        @param client: client session
        @param new_password: password to set
        @return: Deferred fired once the "Password" parameter has been updated
        """
        iq_elt = self.build_register_iq(client.xmlstream, client.jid, new_password)
        d = iq_elt.send(client.jid.host)
        d.addCallback(
            lambda __: self.host.memory.param_set(
                "Password", new_password, "Connection", profile_key=client.profile
            )
        )
        return d

    def _unregister(self, to_jid_s, profile_key):
        client = self.host.get_client(profile_key)
        return self.unregister(client, jid.JID(to_jid_s))

    def unregister(
        self, client: SatXMPPEntity, to_jid: jid.JID | None
    ) -> defer.Deferred:
        """Remove registration from a server/service.

        BEWARE! if you remove registration from profile own server, this will
        DELETE THE XMPP ACCOUNT WITHOUT WARNING

        @param client: client session
        @param to_jid: jid of the service or server
            None to delete client's account (DANGEROUS!)
        @return: Deferred fired with the answer of the removal request
        """
        iq_elt = client.IQ()
        if to_jid is not None:
            iq_elt["to"] = to_jid.full()
        query_elt = iq_elt.addElement((NS_IQ_REGISTER, "query"))
        query_elt.addElement("remove")
        d = iq_elt.send()
        if not to_jid or to_jid == jid.JID(client.jid.host):
            # The account itself has been removed, the session can't be used anymore.
            d.addCallback(lambda __: client.entity_disconnect())
        return d


@implementer(iwokkel.IDisco)
class XEP_0077_handler(xmlstream_mod.XMPPHandler):
    """Protocol handler: observes registration requests and advertises the feature."""

    def __init__(self, plugin_parent: XEP_0077) -> None:
        # The base initialiser sets the "parent" and "xmlstream" attributes.
        super().__init__()
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        client = cast(SatXMPPEntity, self.parent)
        # Incoming registration requests are only handled for components.
        if client.is_component:
            self.xmlstream.addObserver(
                IQ_REGISTER_REQUEST,
                self.plugin_parent._on_register_request,
                client=client,
            )
            self.xmlstream.addObserver(
                IQ_SUBMIT_REQUEST,
                self.plugin_parent._on_submit_request,
                client=client,
            )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_IQ_REGISTER)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []