diff libervia/backend/plugins/plugin_adhoc_registration.py @ 4185:c6d85c31a59f

plugin ad-hoc registration: Implement plugin to handle registration links: registration links are used in web frontend to allow registration with secret links when it's normally closed.
author Goffi <goffi@goffi.org>
date Sun, 10 Dec 2023 18:32:04 +0100
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_adhoc_registration.py	Sun Dec 10 18:32:04 2023 +0100
@@ -0,0 +1,346 @@
+#!/usr/bin/env python3
+
+# Libervia plugin to handle web frontend registration links with Ad-Hoc Commands
+# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+import time
+from uuid import uuid4
+
+from twisted.internet import defer
+from twisted.words.xish import domish
+from wokkel import data_form
+
+from libervia.backend.core import exceptions
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.i18n import D_, _
+from libervia.backend.core.log import getLogger
+from libervia.backend.memory import persistent
+from libervia.backend.tools.common import data_format, date_utils
+
+log = getLogger(__name__)
+
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Ad-Hoc Commands - Registration",
+    C.PI_IMPORT_NAME: "AD_HOC_REGISTRATION",
+    C.PI_TYPE: "Misc",
+    C.PI_PROTOCOLS: [],
+    C.PI_DEPENDENCIES: ["XEP-0050"],
+    C.PI_MAIN: "AdHocRegistration",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: _("""Add registration link handling Ad-Hoc commands"""),
+}
+
+
+class AdHocRegistration:
+    def __init__(self, host):
+        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
+        self.host = host
+        self._c = host.plugins["XEP-0050"]
+        self.ad_hoc_registration_data = persistent.LazyPersistentBinaryDict(
+            "registration_links"
+        )
+        host.bridge.add_method(
+            "registration_link_get",
+            ".plugin",
+            in_sign="s",
+            out_sign="s",
+            method=self._get,
+            async_=True,
+        )
+
+    def _get(self, registration_link_id: str) -> defer.Deferred[str]:
+        d = defer.ensureDeferred(self.get(registration_link_id))
+        d.addCallback(data_format.serialise)
+        return d
+
+    async def get(self, registration_link_id: str) -> dict:
+        """Retrieve registration link from its ID
+
+        @param registration_link_id: registration link data
+        @return: registration link data
+        @raise exceptions.NotFound: not registration link found with this ID.
+        """
+        link_data = await self.ad_hoc_registration_data.get(registration_link_id)
+        if not link_data:
+            raise exceptions.NotFound
+        expiration_timestamp = link_data.get("expiration_timestamp")
+        if expiration_timestamp is not None and expiration_timestamp < time.time():
+            log.info(f"Deleting expiration link {registration_link_id}.")
+            await self.ad_hoc_registration_data.adel(registration_link_id)
+            raise exceptions.NotFound
+        return link_data
+
+    async def profile_connected(self, client):
+        if client.is_admin:
+            self._c.add_ad_hoc_command(
+                client,
+                self.create_registration_link,
+                D_("Create Registration Link"),
+                node="https://libervia.org/registration/create",
+            )
+            self._c.add_ad_hoc_command(
+                client,
+                self.list_registration_links,
+                D_("List Registration Links"),
+                node="https://libervia.org/registration/list",
+            )
+            self._c.add_ad_hoc_command(
+                client,
+                self.delete_registration_link,
+                D_("Delete Registration Link"),
+                node="https://libervia.org/registration/delete",
+            )
+
+    async def create_registration_link(
+        self,
+        client: SatXMPPEntity,
+        command_elt: domish.Element,
+        session_data: dict,
+        action: str,
+        node: str,
+    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
+        """Ad-hoc command used to create a registration link.
+
+        This method presents a form to the user for creating a registration link,
+        and processes the submitted form to generate and store the link with its
+        associated data.
+
+        @param client: The XMPP client instance.
+        @param command_elt: The command element.
+        @param session_data: Data associated with the current session.
+        @param action: The action being performed.
+        @param node: The node identifier.
+
+        @return: A tuple containing the payload if any,
+        """
+        actions = session_data.setdefault("actions", [])
+        actions.append(action)
+
+        if len(actions) == 1:
+            # First request, presenting the form to the user
+            status = self._c.STATUS.EXECUTING
+            form = data_form.Form("form", title=D_("Create Registration Link"))
+
+            form.addField(
+                data_form.Field("text-single", "recipient", label=D_("Recipient Name"))
+            )
+            form.addField(
+                data_form.Field(
+                    "text-single",
+                    "expiration_time_pattern",
+                    label=D_("Expiration Date"),
+                    desc=D_(
+                        "Set the expiry duration for this link. Use the Libervia Time "
+                        "Pattern (e.g., '1 week'). The link will expire after this "
+                        "period."
+                    ),
+                    value="1 week",
+                    required=True,
+                )
+            )
+            form.addField(
+                data_form.Field(
+                    "text-single",
+                    "registration_limit",
+                    label=D_("Maximum registrations limit"),
+                    desc=D_(
+                        "How many accounts can be registered using this link. Set to 0 "
+                        "for unlimited registrations."
+                    ),
+                    value="1",
+                    required=True,
+                )
+            )
+            form.addField(
+                data_form.Field(
+                    "text-multi", "welcome_message", label=D_("Welcome Message")
+                )
+            )
+            form.addField(
+                data_form.Field(
+                    "text-multi",
+                    "extra_details",
+                    label=D_("Additional Details"),
+                )
+            )
+
+            payload = form.toElement()
+            note = None
+
+        elif len(actions) == 2:
+            try:
+                x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
+                answer_form = data_form.Form.fromElement(x_elt)
+
+                recipient = answer_form.get("recipient", None)
+                expiration_time_pattern = answer_form["expiration_time_pattern"]
+                if expiration_time_pattern == "never":
+                    expiration_timestamp = None
+                else:
+                    expiration_timestamp = date_utils.date_parse_ext(
+                        expiration_time_pattern, default_tz=date_utils.TZ_LOCAL
+                    )
+                registration_limit = int(answer_form["registration_limit"])
+                welcome_message = answer_form.get("welcome_message", "")
+                extra_details = answer_form.get("extra_details", "")
+
+                link_id = str(uuid4())
+
+                link_data = {
+                    "recipient": recipient,
+                    "registration_limit": registration_limit,
+                    "welcome_message": welcome_message,
+                    "extra_details": extra_details,
+                    "creator_jid": session_data["requestor"].full(),
+                    "use_count": 0,
+                    "creation_timestamp": int(time.time()),
+                }
+                if expiration_timestamp is not None:
+                    link_data["expiration_timestamp"] = expiration_timestamp
+
+                await self.ad_hoc_registration_data.aset(link_id, link_data)
+
+                status = self._c.STATUS.COMPLETED
+                payload = None
+                note = (
+                    self._c.NOTE.INFO,
+                    D_("Registration link created successfully: {link_id}").format(
+                        link_id=link_id
+                    ),
+                )
+            except (KeyError, StopIteration, ValueError):
+                raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)
+        else:
+            raise self._c.AdHocError(self._c.ERROR.INTERNAL)
+
+        return (payload, status, None, note)
+
+    async def list_registration_links(
+        self,
+        client: SatXMPPEntity,
+        command_elt: domish.Element,
+        session_data: dict,
+        action: str,
+        node: str,
+    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
+        """Ad-hoc command used to list all registration links.
+
+        This method retrieves all the registration links and presents them to the user.
+
+        @param client: The XMPP client instance.
+        @param command_elt: The command element.
+        @param session_data: Data associated with the current session.
+        @param action: The action being performed.
+        @param node: The node identifier.
+
+        @return: A tuple containing the payload if any,
+        """
+        actions = session_data.setdefault("actions", [])
+        actions.append(action)
+
+        if len(actions) == 1:
+            all_links = await self.ad_hoc_registration_data.all()
+            status = self._c.STATUS.EXECUTING
+
+            form = data_form.Form("form", title=D_("Registered Links"))
+            for link_id, link_data in all_links.items():
+                form.addField(
+                    data_form.Field(
+                        "text-multi",
+                        var=f"link_{link_id}",
+                        label=D_("Link ID: {link_id}").format(link_id=link_id),
+                        value=str(link_data),
+                    )
+                )
+
+            payload = form.toElement()
+            note = None
+
+            status = self._c.STATUS.COMPLETED
+
+        else:
+            raise self._c.AdHocError(self._c.ERROR.INTERNAL)
+
+        return (payload, status, None, note)
+
+    async def delete_registration_link(
+        self,
+        client: SatXMPPEntity,
+        command_elt: domish.Element,
+        session_data: dict,
+        action: str,
+        node: str,
+    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
+        """Ad-hoc command used to delete a registration link.
+
+        This method presents a form to the user for selecting a registration link to
+        delete, and processes the submitted form to delete the selected link.
+
+        @param client: The XMPP client instance.
+        @param command_elt: The command element.
+        @param session_data: Data associated with the current session.
+        @param action: The action being performed.
+        @param node: The node identifier.
+
+        @return: A tuple containing the payload if any,
+        """
+        actions = session_data.setdefault("actions", [])
+        actions.append(action)
+
+        if len(actions) == 1:
+            all_links = await self.ad_hoc_registration_data.all()
+            status = self._c.STATUS.EXECUTING
+            form = data_form.Form("form", title=D_("Delete Registration Link"))
+
+            link_options = [data_form.Option(link_id, link_id) for link_id in all_links]
+            form.addField(
+                data_form.Field(
+                    "list-single",
+                    "link_to_delete",
+                    label=D_("Select Link to Delete"),
+                    options=link_options,
+                    required=True,
+                )
+            )
+
+            payload = form.toElement()
+            note = None
+
+        elif len(actions) == 2:
+            try:
+                x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
+                answer_form = data_form.Form.fromElement(x_elt)
+                link_to_delete = answer_form["link_to_delete"]
+
+                await self.ad_hoc_registration_data.adel(link_to_delete)
+
+                status = self._c.STATUS.COMPLETED
+                payload = None
+                note = (
+                    self._c.NOTE.INFO,
+                    D_("Registration link {link_id} deleted successfully.").format(
+                        link_id=link_to_delete
+                    ),
+                )
+            except (KeyError, StopIteration, ValueError):
+                raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)
+        else:
+            raise self._c.AdHocError(self._c.ERROR.INTERNAL)
+
+        return (payload, status, None, note)