# HG changeset patch # User Goffi # Date 1702229524 -3600 # Node ID c6d85c31a59f9252661d2ced717dd6cb2aa38711 # Parent 50c919dfe61b13e5f275c9a109edce9f55b8a61c plugin ad-hoc registration: Implement plugin to handle registration links: registration links are used in web frontend to allow registration with secret links when it's normally closed. diff -r 50c919dfe61b -r c6d85c31a59f libervia/backend/plugins/plugin_adhoc_registration.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_adhoc_registration.py Sun Dec 10 18:32:04 2023 +0100 @@ -0,0 +1,346 @@ +#!/usr/bin/env python3 + +# Libervia plugin to handle web frontend registration links with Ad-Hoc Commands +# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + + +import time +from uuid import uuid4 + +from twisted.internet import defer +from twisted.words.xish import domish +from wokkel import data_form + +from libervia.backend.core import exceptions +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.i18n import D_, _ +from libervia.backend.core.log import getLogger +from libervia.backend.memory import persistent +from libervia.backend.tools.common import data_format, date_utils + +log = getLogger(__name__) + + +PLUGIN_INFO = { + C.PI_NAME: "Ad-Hoc Commands - Registration", + C.PI_IMPORT_NAME: "AD_HOC_REGISTRATION", + C.PI_TYPE: "Misc", + C.PI_PROTOCOLS: [], + C.PI_DEPENDENCIES: ["XEP-0050"], + C.PI_MAIN: "AdHocRegistration", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: _("""Add registration link handling Ad-Hoc commands"""), +} + + +class AdHocRegistration: + def __init__(self, host): + log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") + self.host = host + self._c = host.plugins["XEP-0050"] + self.ad_hoc_registration_data = persistent.LazyPersistentBinaryDict( + "registration_links" + ) + host.bridge.add_method( + "registration_link_get", + ".plugin", + in_sign="s", + out_sign="s", + method=self._get, + async_=True, + ) + + def _get(self, registration_link_id: str) -> defer.Deferred[str]: + d = defer.ensureDeferred(self.get(registration_link_id)) + d.addCallback(data_format.serialise) + return d + + async def get(self, registration_link_id: str) -> dict: + """Retrieve registration link from its ID + + @param registration_link_id: registration link data + @return: registration link data + @raise exceptions.NotFound: not registration link found with this ID. + """ + link_data = await self.ad_hoc_registration_data.get(registration_link_id) + if not link_data: + raise exceptions.NotFound + expiration_timestamp = link_data.get("expiration_timestamp") + if expiration_timestamp is not None and expiration_timestamp < time.time(): + log.info(f"Deleting expiration link {registration_link_id}.") + await self.ad_hoc_registration_data.adel(registration_link_id) + raise exceptions.NotFound + return link_data + + async def profile_connected(self, client): + if client.is_admin: + self._c.add_ad_hoc_command( + client, + self.create_registration_link, + D_("Create Registration Link"), + node="https://libervia.org/registration/create", + ) + self._c.add_ad_hoc_command( + client, + self.list_registration_links, + D_("List Registration Links"), + node="https://libervia.org/registration/list", + ) + self._c.add_ad_hoc_command( + client, + self.delete_registration_link, + D_("Delete Registration Link"), + node="https://libervia.org/registration/delete", + ) + + async def create_registration_link( + self, + client: SatXMPPEntity, + command_elt: domish.Element, + session_data: dict, + action: str, + node: str, + ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]: + """Ad-hoc command used to create a registration link. + + This method presents a form to the user for creating a registration link, + and processes the submitted form to generate and store the link with its + associated data. + + @param client: The XMPP client instance. + @param command_elt: The command element. + @param session_data: Data associated with the current session. + @param action: The action being performed. + @param node: The node identifier. + + @return: A tuple containing the payload if any, + """ + actions = session_data.setdefault("actions", []) + actions.append(action) + + if len(actions) == 1: + # First request, presenting the form to the user + status = self._c.STATUS.EXECUTING + form = data_form.Form("form", title=D_("Create Registration Link")) + + form.addField( + data_form.Field("text-single", "recipient", label=D_("Recipient Name")) + ) + form.addField( + data_form.Field( + "text-single", + "expiration_time_pattern", + label=D_("Expiration Date"), + desc=D_( + "Set the expiry duration for this link. Use the Libervia Time " + "Pattern (e.g., '1 week'). The link will expire after this " + "period." + ), + value="1 week", + required=True, + ) + ) + form.addField( + data_form.Field( + "text-single", + "registration_limit", + label=D_("Maximum registrations limit"), + desc=D_( + "How many accounts can be registered using this link. Set to 0 " + "for unlimited registrations." + ), + value="1", + required=True, + ) + ) + form.addField( + data_form.Field( + "text-multi", "welcome_message", label=D_("Welcome Message") + ) + ) + form.addField( + data_form.Field( + "text-multi", + "extra_details", + label=D_("Additional Details"), + ) + ) + + payload = form.toElement() + note = None + + elif len(actions) == 2: + try: + x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x")) + answer_form = data_form.Form.fromElement(x_elt) + + recipient = answer_form.get("recipient", None) + expiration_time_pattern = answer_form["expiration_time_pattern"] + if expiration_time_pattern == "never": + expiration_timestamp = None + else: + expiration_timestamp = date_utils.date_parse_ext( + expiration_time_pattern, default_tz=date_utils.TZ_LOCAL + ) + registration_limit = int(answer_form["registration_limit"]) + welcome_message = answer_form.get("welcome_message", "") + extra_details = answer_form.get("extra_details", "") + + link_id = str(uuid4()) + + link_data = { + "recipient": recipient, + "registration_limit": registration_limit, + "welcome_message": welcome_message, + "extra_details": extra_details, + "creator_jid": session_data["requestor"].full(), + "use_count": 0, + "creation_timestamp": int(time.time()), + } + if expiration_timestamp is not None: + link_data["expiration_timestamp"] = expiration_timestamp + + await self.ad_hoc_registration_data.aset(link_id, link_data) + + status = self._c.STATUS.COMPLETED + payload = None + note = ( + self._c.NOTE.INFO, + D_("Registration link created successfully: {link_id}").format( + link_id=link_id + ), + ) + except (KeyError, StopIteration, ValueError): + raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD) + else: + raise self._c.AdHocError(self._c.ERROR.INTERNAL) + + return (payload, status, None, note) + + async def list_registration_links( + self, + client: SatXMPPEntity, + command_elt: domish.Element, + session_data: dict, + action: str, + node: str, + ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]: + """Ad-hoc command used to list all registration links. + + This method retrieves all the registration links and presents them to the user. + + @param client: The XMPP client instance. + @param command_elt: The command element. + @param session_data: Data associated with the current session. + @param action: The action being performed. + @param node: The node identifier. + + @return: A tuple containing the payload if any, + """ + actions = session_data.setdefault("actions", []) + actions.append(action) + + if len(actions) == 1: + all_links = await self.ad_hoc_registration_data.all() + status = self._c.STATUS.EXECUTING + + form = data_form.Form("form", title=D_("Registered Links")) + for link_id, link_data in all_links.items(): + form.addField( + data_form.Field( + "text-multi", + var=f"link_{link_id}", + label=D_("Link ID: {link_id}").format(link_id=link_id), + value=str(link_data), + ) + ) + + payload = form.toElement() + note = None + + status = self._c.STATUS.COMPLETED + + else: + raise self._c.AdHocError(self._c.ERROR.INTERNAL) + + return (payload, status, None, note) + + async def delete_registration_link( + self, + client: SatXMPPEntity, + command_elt: domish.Element, + session_data: dict, + action: str, + node: str, + ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]: + """Ad-hoc command used to delete a registration link. + + This method presents a form to the user for selecting a registration link to + delete, and processes the submitted form to delete the selected link. + + @param client: The XMPP client instance. + @param command_elt: The command element. + @param session_data: Data associated with the current session. + @param action: The action being performed. + @param node: The node identifier. + + @return: A tuple containing the payload if any, + """ + actions = session_data.setdefault("actions", []) + actions.append(action) + + if len(actions) == 1: + all_links = await self.ad_hoc_registration_data.all() + status = self._c.STATUS.EXECUTING + form = data_form.Form("form", title=D_("Delete Registration Link")) + + link_options = [data_form.Option(link_id, link_id) for link_id in all_links] + form.addField( + data_form.Field( + "list-single", + "link_to_delete", + label=D_("Select Link to Delete"), + options=link_options, + required=True, + ) + ) + + payload = form.toElement() + note = None + + elif len(actions) == 2: + try: + x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x")) + answer_form = data_form.Form.fromElement(x_elt) + link_to_delete = answer_form["link_to_delete"] + + await self.ad_hoc_registration_data.adel(link_to_delete) + + status = self._c.STATUS.COMPLETED + payload = None + note = ( + self._c.NOTE.INFO, + D_("Registration link {link_id} deleted successfully.").format( + link_id=link_to_delete + ), + ) + except (KeyError, StopIteration, ValueError): + raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD) + else: + raise self._c.AdHocError(self._c.ERROR.INTERNAL) + + return (payload, status, None, note) diff -r 50c919dfe61b -r c6d85c31a59f libervia/backend/tools/common/date_utils.py --- a/libervia/backend/tools/common/date_utils.py Sun Dec 10 15:08:08 2023 +0100 +++ b/libervia/backend/tools/common/date_utils.py Sun Dec 10 18:32:04 2023 +0100 @@ -86,17 +86,17 @@ raise e return calendar.timegm(dt.utctimetuple()) -def date_parse_ext(value, default_tz=TZ_UTC): +def date_parse_ext(value: str, default_tz: datetime.tzinfo=TZ_UTC) -> float: """Extended date parse which accept relative date - @param value(unicode): date to parse, in any format supported by parser + @param value: date to parse, in any format supported by parser and with the hability to specify X days/weeks/months/years in the past or future. Relative date are specified either with something like `[main_date] +1 week` or with something like `3 days ago`, and it is case insensitive. [main_date] is a date parsable by parser, or empty to specify current date/time. "now" can also be used to specify current date/time. - @param default_tz(datetime.tzinfo): same as for date_parse - @return (int): timestamp + @param default_tz: same as for date_parse + @return: timestamp """ m = RELATIVE_RE.match(value) if m is None: