Mercurial > libervia-backend
view libervia/backend/plugins/plugin_xep_0077.py @ 4336:6e0918e638ee
plugin XEP-0498: "Pubsub File Sharing" implementation:
Partial implementation of XEP-0498, necessary to implement the service part in email
gateway.
rel 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:13:23 +0100 |
parents | 7ded09452875 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# SAT plugin for managing xep-0077
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from typing import Awaitable, Callable, NamedTuple, cast

from twisted.internet import defer, reactor, ssl
from twisted.internet.interfaces import IReactorCore
from twisted.python.failure import Failure
from twisted.words.protocols.jabber import (
    client,
    error as jabber_error,
    jid,
    xmlstream as xmlstream_mod,
)
from twisted.words.xish import domish
from wokkel import data_form, disco, iwokkel
from zope.interface import implementer

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.core.xmpp import SatXMPPEntity
from libervia.backend.tools import xml_tools
from libervia.backend.tools.utils import as_deferred, ensure_deferred


log = getLogger(__name__)

NS_IQ_REGISTER = "jabber:iq:register"
# Observer patterns matching respectively "get" and "set" IQ stanzas carrying a
# <query/> element in the in-band registration namespace.
IQ_REGISTER_REQUEST = f'{C.IQ_GET}/query[@xmlns="{NS_IQ_REGISTER}"]'
IQ_SUBMIT_REQUEST = f'{C.IQ_SET}/query[@xmlns="{NS_IQ_REGISTER}"]'

PLUGIN_INFO = {
    C.PI_NAME: "In-Band Registration",
    C.PI_IMPORT_NAME: "XEP-0077",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0077"],
    C.PI_DEPENDENCIES: [],
    C.PI_MAIN: "XEP_0077",
    C.PI_DESCRIPTION: _("""Implementation of in-band registration."""),
    C.PI_HANDLER: C.BOOL_TRUE,
}

# FIXME: this implementation is incomplete


class RegistrationHandlers(NamedTuple):
    """Pair of callables registered through ``XEP_0077.register_handler``."""

    # Called on a registration "get" request, expected to provide the form.
    form_handler: Callable
    # Called on a registration "set" request (form submission or removal).
    submit_handler: Callable


class RegisteringAuthenticator(xmlstream_mod.ConnectAuthenticator):
    """Authenticator sending a registration IQ as soon as the stream is ready.

    The outcome of the registration is reported through the ``registered``
    Deferred.
    """

    # FIXME: request IQ is not sent to check available fields,
    #        while XEP recommends to use it
    # FIXME: doesn't handle data form or oob
    namespace = "jabber:client"

    def __init__(self, jid_, password, email=None, check_certificate=True):
        """
        @param jid_: JID of the account to register; its host is used as the
            other side of the stream.
        @param password: password of the account.
        @param email: optional email sent along with the registration.
        @param check_certificate: when False, TLS is not required and no trust
            root is used for certificate validation.
        """
        log.debug(_("Registration asked for {jid}").format(jid=jid_))
        xmlstream_mod.ConnectAuthenticator.__init__(self, jid_.host)
        self.jid = jid_
        self.password = password
        self.email = email
        self.check_certificate = check_certificate
        # Fired with the result (or failure) of the registration IQ.
        self.registered = defer.Deferred()

    def associateWithStream(self, xmlstream):
        """Install the stream initializers and the registration observer."""
        xmlstream_mod.ConnectAuthenticator.associateWithStream(self, xmlstream)
        xmlstream.addObserver(xmlstream_mod.STREAM_AUTHD_EVENT, self.register)

        xmlstream.initializers = [client.CheckVersionInitializer(xmlstream)]
        if self.check_certificate:
            tls_required, configurationForTLS = True, None
        else:
            tls_required = False
            configurationForTLS = ssl.CertificateOptions(trustRoot=None)
        tls_init = xmlstream_mod.TLSInitiatingInitializer(
            xmlstream, required=tls_required, configurationForTLS=configurationForTLS
        )

        xmlstream.initializers.append(tls_init)

    def register(self, xmlstream):
        """Send the registration IQ and chain its outcome to ``registered``."""
        # The message must be translated *before* being formatted, otherwise the
        # lookup is done on the already formatted string and can never match.
        log.debug(
            _("Stream started with {server}, now registering").format(
                server=self.jid.host
            )
        )
        iq = XEP_0077.build_register_iq(
            self.xmlstream, self.jid, self.password, self.email
        )
        d = iq.send(self.jid.host).addCallbacks(
            self.registration_cb, self.registration_eb
        )
        d.chainDeferred(self.registered)

    def registration_cb(self, answer):
        """Log the successful answer and close the stream."""
        log.debug(_("Registration answer: {}").format(answer.toXml()))
        assert self.xmlstream is not None
        self.xmlstream.sendFooter()

    def registration_eb(self, failure_):
        """Log the failure, close the stream and propagate the failure."""
        log.info(_("Registration failure: {}").format(str(failure_.value)))
        assert self.xmlstream is not None
        self.xmlstream.sendFooter()
        raise failure_


class ServerRegister(xmlstream_mod.XmlStreamFactory):
    """Stream factory used for the short-lived registration connection."""

    def __init__(self, *args, **kwargs):
        xmlstream_mod.XmlStreamFactory.__init__(self, *args, **kwargs)
        self.addBootstrap(xmlstream_mod.STREAM_END_EVENT, self._disconnected)

    def clientConnectionLost(self, connector, unused_reason):
        connector.disconnect()

    def _disconnected(self, reason):
        """Fail the registration if the stream ended before any answer.

        @param reason: reason given with the stream end event.
        """
        if not self.authenticator.registered.called:
            err = jabber_error.StreamError("Server unexpectedly closed the connection")
            try:
                if reason.value.args[0][0][2] == "certificate verify failed":
                    err = exceptions.InvalidCertificate()
            except (IndexError, TypeError):
                # The reason doesn't have the expected structure, we keep the
                # generic stream error.
                pass
            self.authenticator.registered.errback(err)


class XEP_0077:
    """In-Band Registration plugin."""

    def __init__(self, host):
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        host.register_namespace("iq-register", NS_IQ_REGISTER)
        host.bridge.add_method(
            "in_band_register",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._in_band_register,
            async_=True,
        )
        host.bridge.add_method(
            "in_band_account_new",
            ".plugin",
            in_sign="ssssi",
            out_sign="",
            method=self._register_new_account,
            async_=True,
        )
        host.bridge.add_method(
            "in_band_unregister",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._unregister,
            async_=True,
        )
        host.bridge.add_method(
            "in_band_password_change",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._change_password,
            async_=True,
        )
        # Set of RegistrationHandlers added with ``register_handler``.
        self.handlers = set()

    def get_handler(self, client: SatXMPPEntity) -> xmlstream_mod.XMPPHandler:
        return XEP_0077_handler(self)

    def register_handler(
        self,
        form_handler: Callable[
            [SatXMPPEntity, domish.Element],
            Awaitable[tuple[bool, data_form.Form] | None]
            | defer.Deferred[tuple[bool, data_form.Form] | None]
            | tuple[bool, data_form.Form]
            | None,
        ],
        submit_handler: Callable[
            [SatXMPPEntity, domish.Element, data_form.Form | None],
            Awaitable[bool | None] | defer.Deferred[bool | None] | bool | None,
        ],
    ) -> None:
        """
        Register a new handler.

        Mostly useful for component which handle In-Band Registration.

        @param form_handler: method to call on registration request to get a data form.
            May be async.
            The handler must return a ``(registered, form)`` tuple, where
            ``registered`` is a bool telling if the requestor is already registered
            and ``form`` a data_form.Form instance, if it can handle the request.
            If it returns None, other handlers will be tried until one returns a
            data form.
        @param submit_handler: method to call when a registration form is submitted.
            May be async.
            In case of "unregister" request, None will be used instead of a
            data_form.Form.
            The handler must return True if it has handled the request, or None to
            let other handlers be tried.
        """
        self.handlers.add(RegistrationHandlers(form_handler, submit_handler))

    def _on_register_request(self, iq_elt: domish.Element, client: SatXMPPEntity) -> None:
        defer.ensureDeferred(self._on_request(iq_elt, client, True))

    def _on_submit_request(self, iq_elt: domish.Element, client: SatXMPPEntity) -> None:
        defer.ensureDeferred(self._on_request(iq_elt, client, False))

    async def _on_request(
        self, iq_elt: domish.Element, client: SatXMPPEntity, is_register: bool
    ) -> None:
        """Handle a register or submit request.

        Registered handlers are tried until one of them handles the request. If
        none does, a "service-unavailable" error is sent back.

        @param iq_elt: The IQ element of the request.
        @param client: Client session.
        @param is_register: Whether this is a register request (True) or a submit
            request (False).
        """
        iq_elt.handled = True
        # Submit request must have a form with submitted values.
        if is_register:
            handler_type = "register"
            submit_form = None
        else:
            handler_type = "submit"
            remove_elt = next(iq_elt.query.elements(NS_IQ_REGISTER, "remove"), None)
            if remove_elt is not None:
                # This is a unregister request.
                submit_form = None
            else:
                submit_form = data_form.findForm(iq_elt.query, NS_IQ_REGISTER)
                if submit_form is None:
                    log.warning(f"Data form not found, invalid request: {iq_elt.toXml()}")
                    client.send(
                        jabber_error.StanzaError(
                            "bad-request", text="No data form found."
                        ).toResponse(iq_elt)
                    )
                    return

        # We look and run relevant handler.
        for handlers in self.handlers:
            if is_register:
                handler = handlers.form_handler
                handler_call = as_deferred(handler, client, iq_elt)
            else:
                handler = handlers.submit_handler
                handler_call = as_deferred(handler, client, iq_elt, submit_form)
            try:
                callback_ret = await handler_call
            except jabber_error.StanzaError as e:
                iq_error_elt = e.toResponse(iq_elt)
                client.send(iq_error_elt)
                return
            except exceptions.PasswordError as e:
                log.warning("Invalid login or password while registering to service.")
                iq_error_elt = jabber_error.StanzaError(
                    "forbidden", text=str(e)
                ).toResponse(iq_elt)
                client.send(iq_error_elt)
                return
            except Exception:
                log.exception(
                    f"Error while handling {handler_type} request with {handler}, "
                    "ignoring it."
                )
                continue

            if callback_ret is None:
                # This handler doesn't manage the request, we try the next one.
                continue

            if is_register:
                try:
                    registered, registration_form = callback_ret
                    # Explicit check rather than ``assert``, so the validation is
                    # still done when Python runs with optimisations (-O).
                    if not isinstance(registered, bool) or not isinstance(
                        registration_form, data_form.Form
                    ):
                        raise TypeError(
                            "a (bool, data_form.Form) tuple was expected, got "
                            f"{callback_ret!r}"
                        )
                except (TypeError, ValueError) as e:
                    log.warning(f"Invalid return value from {handler}, ignoring it: {e}")
                    continue
                # We need to be sure to have the right namespace for the form.
                registration_form.formNamespace = NS_IQ_REGISTER
                iq_result_elt = xmlstream_mod.toResponse(iq_elt, "result")
                query_elt = iq_result_elt.addElement((NS_IQ_REGISTER, "query"))
                if registered:
                    # The requestor is already registered, we indicate it.
                    query_elt.addElement("registered")
                query_elt.addChild(registration_form.toElement())
                client.send(iq_result_elt)
            else:
                if callback_ret is True:
                    if submit_form is None:
                        log.info(f"User {iq_elt['from']} successfully unregistered.")
                    else:
                        log.info(f"User {iq_elt['from']} successfully registered.")
                else:
                    log.error(
                        f"Unexpected return value from {handler}, was expecting "
                        '"True".'
                    )
                    client.send(
                        jabber_error.StanzaError(
                            "internal-server-error", text="Error in request handler."
                        ).toResponse(iq_elt)
                    )
                    return
                iq_result_elt = xmlstream_mod.toResponse(iq_elt, "result")
                client.send(iq_result_elt)
            # The request has been handled, no need to try other handlers.
            break
        else:
            log.warning(
                f"No handler found for in-band registration {handler_type} request: "
                f"{iq_elt.toXml()}."
            )
            iq_error_elt = jabber_error.StanzaError("service-unavailable").toResponse(
                iq_elt
            )
            client.send(iq_error_elt)

    @staticmethod
    def build_register_iq(xmlstream, jid_, password, email=None):
        """Build the "set" IQ used to register an account or change its password.

        @param xmlstream: stream the IQ is bound to.
        @param jid_: JID of the account; the user part is used as username and
            the host as destination.
        @param password: password to set.
        @param email: optional email, added only if not None.
        @return: the IQ, ready to be sent.
        """
        iq_elt = xmlstream_mod.IQ(xmlstream, "set")
        iq_elt["to"] = jid_.host
        query_elt = iq_elt.addElement((NS_IQ_REGISTER, "query"))
        username_elt = query_elt.addElement("username")
        username_elt.addContent(jid_.user)
        password_elt = query_elt.addElement("password")
        password_elt.addContent(password)
        if email is not None:
            email_elt = query_elt.addElement("email")
            email_elt.addContent(email)
        return iq_elt

    def _in_band_register(self, to_jid_s, profile_key=C.PROF_KEY_NONE):
        client = self.host.get_client(profile_key)
        return defer.ensureDeferred(self.in_band_register(client, jid.JID(to_jid_s)))

    async def in_band_register(
        self,
        client: SatXMPPEntity,
        to_jid: jid.JID,
        post_treat_cb: Callable | None = None,
    ) -> xml_tools.XMLUI:
        """Register to a service.

        Send an IQ request to register with the given service and return the
        registration form as XML UI.

        @param client: client session.
        @param to_jid: The JID of the service to register to.
        @param post_treat_cb: A callback function to handle the registration
            result, if provided.
        @return: The registration form as XML UI.
        @raise exceptions.DataError: the answer has no query element or no data
            form.
        """
        # FIXME: this post_treat_cb arguments seems wrong, check it
        log.debug(_("Asking registration for {}").format(to_jid.full()))
        reg_request = client.IQ("get")
        reg_request["from"] = client.jid.full()
        reg_request["to"] = to_jid.full()
        reg_request.addElement("query", NS_IQ_REGISTER)
        try:
            iq_result_elt = await reg_request.send(to_jid.full())
        except Exception as e:
            log.warning(_("Registration failure: {}").format(e))
            raise

        try:
            query_elt = next(iq_result_elt.elements(NS_IQ_REGISTER, "query"))
        except StopIteration:
            raise exceptions.DataError(
                f"Can't find expected query element: {iq_result_elt.toXml()}"
            )
        try:
            x_elem = next(query_elt.elements(data_form.NS_X_DATA, "x"))
        except StopIteration:
            # XXX: it seems we have an old service which doesn't manage data forms
            log.warning(f"Can't find data form: {iq_result_elt.toXml()}")
            raise exceptions.DataError(
                _(
                    "This gateway is outdated and can't be managed by Libervia, "
                    "sorry :("
                )
            )
        form = data_form.Form.fromElement(x_elem)
        xml_ui = xml_tools.data_form_2_xmlui(form, "")
        # The submission of the form is handled in the background, the XML UI is
        # returned immediately.
        d = xml_tools.deferred_ui(self.host, xml_ui)
        d.addCallback(
            self._on_xml_ui_cb,
            client,
            iq_result_elt["id"],
            iq_result_elt["from"],
            post_treat_cb,
        )
        d.addErrback(self._on_xml_ui_eb)
        d.addTimeout(600, cast(IReactorCore, reactor))
        return xml_ui

    @ensure_deferred
    async def _on_xml_ui_cb(
        self,
        data: dict,
        client: SatXMPPEntity,
        stanza_id: str,
        to_: str,
        post_treat_cb: Callable | None,
    ) -> None:
        """Handle the XML UI result of a registration form.

        Process the filled registration form (from frontend) and send it back to
        complete the registration process.

        @param data: The filled registration form as a XMLUI Form result.
        @param client: Client session.
        @param stanza_id: The ID of the IQ stanza of the registration process.
        @param to_: The JID of the service that sent the registration form.
        @param post_treat_cb: A callback function to handle the registration
            result, if provided.
        @raise exceptions.ConflictError: the service answered with a "conflict"
            error.
        """
        form_elt = xml_tools.xmlui_result_to_elt(data, NS_IQ_REGISTER)
        iq_elt = client.IQ()
        iq_elt["id"] = stanza_id
        iq_elt["to"] = to_
        query_elt = iq_elt.addElement("query", NS_IQ_REGISTER)
        query_elt.addChild(form_elt)
        try:
            answer = await iq_elt.send()
            log.debug(_("registration answer: %s") % answer.toXml())
            if post_treat_cb is not None:
                post_treat_cb(jid.JID(answer["from"]), client.profile)
        except jabber_error.StanzaError as e:
            log.info(_("Registration failure: {}").format(e))
            if e.condition == "conflict":
                raise exceptions.ConflictError(
                    _("Username already exists, please choose another one.")
                )
            raise

    def _on_xml_ui_eb(self, failure_: Failure) -> None:
        """Handle error during handling of registration form by frontend."""
        log.warning(f"Error while handling registration form to frontend: {failure_}")

    def _register_new_account(self, jid_, password, email, host, port):
        # Empty values coming from the bridge mean "use the default".
        kwargs = {}
        if email:
            kwargs["email"] = email
        if host:
            kwargs["host"] = host
        if port:
            kwargs["port"] = port
        return self.register_new_account(jid.JID(jid_), password, **kwargs)

    def register_new_account(
        self,
        jid_: jid.JID,
        password: str,
        email: str | None = None,
        host: str | None = None,
        port: int = C.XMPP_C2S_PORT,
    ):
        """Register a new account on a XMPP server.

        @param jid_: request jid to register
        @param password: password of the account
        @param email: email of the account
        @param host: host of the server to register to.
            If None, the "xmpp_domain" configuration option is used, with
            "127.0.0.1" as default.
        @param port: port of the server to register to
        @return: Deferred fired with the result of the registration.
        """
        if host is None:
            host = self.host.memory.config_get("", "xmpp_domain", "127.0.0.1")
        # The certificate is not checked when connecting to "127.0.0.1".
        check_certificate = host != "127.0.0.1"
        authenticator = RegisteringAuthenticator(
            jid_, password, email, check_certificate=check_certificate
        )
        registered_d = authenticator.registered
        server_register = ServerRegister(authenticator)
        reactor.connectTCP(host, port, server_register)
        return registered_d

    def _change_password(self, new_password, profile_key):
        client = self.host.get_client(profile_key)
        return self.change_password(client, new_password)

    def change_password(self, client, new_password):
        """Change the password of client's account.

        Once the server has accepted the change, the new password is stored in
        the "Password" parameter of the profile.

        @param client: client session.
        @param new_password: password to set.
        @return: Deferred of the whole operation.
        """
        iq_elt = self.build_register_iq(client.xmlstream, client.jid, new_password)
        d = iq_elt.send(client.jid.host)
        d.addCallback(
            lambda __: self.host.memory.param_set(
                "Password", new_password, "Connection", profile_key=client.profile
            )
        )
        return d

    def _unregister(self, to_jid_s, profile_key):
        client = self.host.get_client(profile_key)
        return self.unregister(client, jid.JID(to_jid_s))

    def unregister(
        self, client: SatXMPPEntity, to_jid: jid.JID | None
    ) -> defer.Deferred:
        """Remove registration from a server/service.

        BEWARE! if you remove registration from profile own server, this will
        DELETE THE XMPP ACCOUNT WITHOUT WARNING

        @param client: client session.
        @param to_jid: jid of the service or server
            None to delete client's account (DANGEROUS!)
        @return: Deferred of the removal request.
        """
        iq_elt = client.IQ()
        if to_jid is not None:
            iq_elt["to"] = to_jid.full()
        query_elt = iq_elt.addElement((NS_IQ_REGISTER, "query"))
        query_elt.addElement("remove")
        d = iq_elt.send()
        if not to_jid or to_jid == jid.JID(client.jid.host):
            # The registration is removed from client's own server, the session
            # is disconnected once the request has succeeded.
            d.addCallback(lambda __: client.entity_disconnect())
        return d


@implementer(iwokkel.IDisco)
class XEP_0077_handler(xmlstream_mod.XMPPHandler):
    """Handler advertising the feature and observing requests for components."""

    def __init__(self, plugin_parent: XEP_0077) -> None:
        super().__init__()
        self.plugin_parent = plugin_parent

    def connectionInitialized(self):
        client = cast(SatXMPPEntity, self.parent)
        # Only components have to answer to in-band registration requests.
        if client.is_component:
            self.xmlstream.addObserver(
                IQ_REGISTER_REQUEST,
                self.plugin_parent._on_register_request,
                client=client,
            )
            self.xmlstream.addObserver(
                IQ_SUBMIT_REQUEST,
                self.plugin_parent._on_submit_request,
                client=client,
            )

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        return [disco.DiscoFeature(NS_IQ_REGISTER)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        return []