Mercurial > libervia-backend
view libervia/backend/plugins/plugin_adhoc_registration.py @ 4306:94e0968987cd
plugin XEP-0033: code modernisation, improve delivery, data validation:
- Code has been rewritten using Pydantic models and `async` coroutines for data validation
and cleaner element parsing/generation.
- Delivery has been completely rewritten. It now works even if the server doesn't support
multicast, and sends to the local multicast service first. Delivering to the local multicast
service first is due to poor support of XEP-0033 in servers (notably Prosody, which has an
incomplete implementation), and the current impossibility of detecting whether a sub-domain
service fully handles multicast or only handles it for local domains. This is a workaround to have
a good balance between backward compatibility and use of bandwidth, and to make it work
with the incoming email gateway implementation (the gateway will only deliver to
entities of its own domain).
- disco feature checking now uses `async` coroutines. `host` implementation still uses
Deferred return values for compatibility with legacy code.
rel 450
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 26 Sep 2024 16:12:01 +0200 |
parents | c6d85c31a59f |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin to handle web frontend registration links with Ad-Hoc Commands
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import time
from uuid import uuid4

from twisted.internet import defer
from twisted.words.xish import domish
from wokkel import data_form

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.memory import persistent
from libervia.backend.tools.common import data_format, date_utils

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Ad-Hoc Commands - Registration",
    C.PI_IMPORT_NAME: "AD_HOC_REGISTRATION",
    C.PI_TYPE: "Misc",
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0050"],
    C.PI_MAIN: "AdHocRegistration",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Add registration link handling Ad-Hoc commands"""),
}


class AdHocRegistration:
    """Manage registration links through Ad-Hoc commands.

    Links are stored in a persistent dictionary keyed by link ID. Three commands
    (create, list, delete) are registered for admin profiles, and a bridge method
    lets frontends retrieve the data of a link from its ID.
    """

    def __init__(self, host):
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._c = host.plugins["XEP-0050"]
        self.ad_hoc_registration_data = persistent.LazyPersistentBinaryDict(
            "registration_links"
        )
        host.bridge.add_method(
            "registration_link_get",
            ".plugin",
            in_sign="s",
            out_sign="s",
            method=self._get,
            async_=True,
        )

    def _get(self, registration_link_id: str) -> defer.Deferred[str]:
        """Bridge wrapper around [get], returning serialised link data."""
        d = defer.ensureDeferred(self.get(registration_link_id))
        d.addCallback(data_format.serialise)
        return d

    async def get(self, registration_link_id: str) -> dict:
        """Retrieve registration link from its ID

        An expired link is removed from storage when it is requested.

        @param registration_link_id: ID of the registration link
        @return: registration link data
        @raise exceptions.NotFound: no registration link found with this ID, or the
            link has expired.
        """
        link_data = await self.ad_hoc_registration_data.get(registration_link_id)
        if not link_data:
            raise exceptions.NotFound
        # the key is only present when the link has an expiration date
        expiration_timestamp = link_data.get("expiration_timestamp")
        if expiration_timestamp is not None and expiration_timestamp < time.time():
            log.info(f"Deleting expiration link {registration_link_id}.")
            await self.ad_hoc_registration_data.adel(registration_link_id)
            raise exceptions.NotFound
        return link_data

    async def profile_connected(self, client):
        """Register the registration link commands for admin profiles only."""
        if client.is_admin:
            self._c.add_ad_hoc_command(
                client,
                self.create_registration_link,
                D_("Create Registration Link"),
                node="https://libervia.org/registration/create",
            )
            self._c.add_ad_hoc_command(
                client,
                self.list_registration_links,
                D_("List Registration Links"),
                node="https://libervia.org/registration/list",
            )
            self._c.add_ad_hoc_command(
                client,
                self.delete_registration_link,
                D_("Delete Registration Link"),
                node="https://libervia.org/registration/delete",
            )

    def _build_creation_form(self) -> data_form.Form:
        """Build the data form asking for the parameters of a new link."""
        form = data_form.Form("form", title=D_("Create Registration Link"))
        form.addField(
            data_form.Field("text-single", "recipient", label=D_("Recipient Name"))
        )
        form.addField(
            data_form.Field(
                "text-single",
                "expiration_time_pattern",
                label=D_("Expiration Date"),
                desc=D_(
                    "Set the expiry duration for this link. Use the Libervia Time "
                    "Pattern (e.g., '1 week'). The link will expire after this "
                    "period."
                ),
                value="1 week",
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                "text-single",
                "registration_limit",
                label=D_("Maximum registrations limit"),
                desc=D_(
                    "How many accounts can be registered using this link. Set to 0 "
                    "for unlimited registrations."
                ),
                value="1",
                required=True,
            )
        )
        form.addField(
            data_form.Field(
                "text-multi", "welcome_message", label=D_("Welcome Message")
            )
        )
        form.addField(
            data_form.Field(
                "text-multi",
                "extra_details",
                label=D_("Additional Details"),
            )
        )
        return form

    async def create_registration_link(
        self,
        client: SatXMPPEntity,
        command_elt: domish.Element,
        session_data: dict,
        action: str,
        node: str,
    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
        """Ad-hoc command used to create a registration link.

        This method presents a form to the user for creating a registration link,
        and processes the submitted form to generate and store the link with its
        associated data.

        @param client: The XMPP client instance.
        @param command_elt: The command element.
        @param session_data: Data associated with the current session.
        @param action: The action being performed.
        @param node: The node identifier.
        @return: A tuple ``(payload, status, None, note)``: the payload is the form
            element on first request and None once the link is created, the note
            is None on first request and an info note holding the new link ID on
            completion.
        @raise AdHocError: BAD_PAYLOAD if the submitted form is missing or invalid
            (including a negative registration limit), INTERNAL if the command is
            called more than twice in the same session.
        """
        actions = session_data.setdefault("actions", [])
        actions.append(action)

        if len(actions) == 1:
            # First request, presenting the form to the user
            payload = self._build_creation_form().toElement()
            return (payload, self._c.STATUS.EXECUTING, None, None)

        if len(actions) != 2:
            raise self._c.AdHocError(self._c.ERROR.INTERNAL)

        try:
            x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
            answer_form = data_form.Form.fromElement(x_elt)
            recipient = answer_form.get("recipient", None)
            expiration_time_pattern = answer_form["expiration_time_pattern"]
            if expiration_time_pattern == "never":
                expiration_timestamp = None
            else:
                expiration_timestamp = date_utils.date_parse_ext(
                    expiration_time_pattern, default_tz=date_utils.TZ_LOCAL
                )
            registration_limit = int(answer_form["registration_limit"])
            if registration_limit < 0:
                # 0 is the documented value for "unlimited"; a negative limit has
                # no meaning, we report it like any other invalid value.
                raise ValueError("registration limit can't be negative")
            welcome_message = answer_form.get("welcome_message", "")
            extra_details = answer_form.get("extra_details", "")

            link_id = str(uuid4())

            link_data = {
                "recipient": recipient,
                "registration_limit": registration_limit,
                "welcome_message": welcome_message,
                "extra_details": extra_details,
                "creator_jid": session_data["requestor"].full(),
                "use_count": 0,
                "creation_timestamp": int(time.time()),
            }
            # the key is left out for links which never expire
            if expiration_timestamp is not None:
                link_data["expiration_timestamp"] = expiration_timestamp

            await self.ad_hoc_registration_data.aset(link_id, link_data)
        except (KeyError, StopIteration, ValueError):
            raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)

        note = (
            self._c.NOTE.INFO,
            D_("Registration link created successfully: {link_id}").format(
                link_id=link_id
            ),
        )
        return (None, self._c.STATUS.COMPLETED, None, note)

    async def list_registration_links(
        self,
        client: SatXMPPEntity,
        command_elt: domish.Element,
        session_data: dict,
        action: str,
        node: str,
    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
        """Ad-hoc command used to list all registration links.

        This method retrieves all the registration links and presents them to the
        user.

        @param client: The XMPP client instance.
        @param command_elt: The command element.
        @param session_data: Data associated with the current session.
        @param action: The action being performed.
        @param node: The node identifier.
        @return: A tuple ``(payload, status, None, note)``: the payload is a form
            with one field per stored link, the status is COMPLETED and the note
            is None.
        @raise AdHocError: INTERNAL if the command is called more than once in the
            same session.
        """
        actions = session_data.setdefault("actions", [])
        actions.append(action)

        if len(actions) != 1:
            raise self._c.AdHocError(self._c.ERROR.INTERNAL)

        all_links = await self.ad_hoc_registration_data.all()
        form = data_form.Form("form", title=D_("Registered Links"))
        for link_id, link_data in all_links.items():
            form.addField(
                data_form.Field(
                    "text-multi",
                    var=f"link_{link_id}",
                    label=D_("Link ID: {link_id}").format(link_id=link_id),
                    value=str(link_data),
                )
            )
        # This command has a single step: it is completed as soon as the list is
        # sent.
        return (form.toElement(), self._c.STATUS.COMPLETED, None, None)

    async def delete_registration_link(
        self,
        client: SatXMPPEntity,
        command_elt: domish.Element,
        session_data: dict,
        action: str,
        node: str,
    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
        """Ad-hoc command used to delete a registration link.

        This method presents a form to the user for selecting a registration link
        to delete, and processes the submitted form to delete the selected link.

        @param client: The XMPP client instance.
        @param command_elt: The command element.
        @param session_data: Data associated with the current session.
        @param action: The action being performed.
        @param node: The node identifier.
        @return: A tuple ``(payload, status, None, note)``: the payload is the
            selection form on first request and None once the link is deleted, the
            note is None on first request and an info note on completion.
        @raise AdHocError: BAD_PAYLOAD if the submitted form is missing or invalid,
            INTERNAL if the command is called more than twice in the same session.
        """
        actions = session_data.setdefault("actions", [])
        actions.append(action)

        if len(actions) == 1:
            # First request, presenting the list of existing links to the user
            all_links = await self.ad_hoc_registration_data.all()
            form = data_form.Form("form", title=D_("Delete Registration Link"))
            link_options = [
                data_form.Option(link_id, link_id) for link_id in all_links
            ]
            form.addField(
                data_form.Field(
                    "list-single",
                    "link_to_delete",
                    label=D_("Select Link to Delete"),
                    options=link_options,
                    required=True,
                )
            )
            return (form.toElement(), self._c.STATUS.EXECUTING, None, None)

        if len(actions) != 2:
            raise self._c.AdHocError(self._c.ERROR.INTERNAL)

        try:
            x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
            answer_form = data_form.Form.fromElement(x_elt)
            link_to_delete = answer_form["link_to_delete"]

            await self.ad_hoc_registration_data.adel(link_to_delete)
        except (KeyError, StopIteration, ValueError):
            raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)

        note = (
            self._c.NOTE.INFO,
            D_("Registration link {link_id} deleted successfully.").format(
                link_id=link_to_delete
            ),
        )
        return (None, self._c.STATUS.COMPLETED, None, note)