view libervia/backend/plugins/plugin_adhoc_registration.py @ 4309:b56b1eae7994

component email gateway: add multicasting: XEP-0033 multicasting is now supported both for incoming and outgoing messages. XEP-0033 metadata are converted to suitable Email headers and vice versa. Email address and JID are both supported, and delivery is done by the gateway when suitable on incoming messages. rel 450
author Goffi <goffi@goffi.org>
date Thu, 26 Sep 2024 16:12:01 +0200
parents c6d85c31a59f
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin to handle web frontend registration links with Ad-Hoc Commands
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import time
from uuid import uuid4

from twisted.internet import defer
from twisted.words.xish import domish
from wokkel import data_form

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.memory import persistent
from libervia.backend.tools.common import data_format, date_utils

# Module-level logger, named after this module.
log = getLogger(__name__)


# Plugin metadata, keyed by the ``PI_*`` constants of ``Const``.
# NOTE(review): presumably consumed by the plugin loader to register this
# plugin — confirm against the loader.
PLUGIN_INFO = {
    C.PI_NAME: "Ad-Hoc Commands - Registration",
    C.PI_IMPORT_NAME: "AD_HOC_REGISTRATION",
    C.PI_TYPE: "Misc",
    C.PI_PROTOCOLS: [],
    # ``AdHocRegistration.__init__`` reads ``host.plugins["XEP-0050"]``, hence this
    # dependency.
    C.PI_DEPENDENCIES: ["XEP-0050"],
    # Name of the plugin class defined in this module.
    C.PI_MAIN: "AdHocRegistration",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Add registration link handling Ad-Hoc commands"""),
}


class AdHocRegistration:
    def __init__(self, host):
        """Set up link storage and expose link retrieval on the bridge.

        @param host: backend host; its ``plugins`` mapping must contain "XEP-0050"
            and its ``bridge`` receives the ``registration_link_get`` method.
        """
        plugin_name = PLUGIN_INFO[C.PI_NAME]
        log.info(f"plugin {plugin_name!r} initialization")
        self.host = host
        # Ad-Hoc Commands plugin, used later to register the commands.
        self._c = host.plugins["XEP-0050"]
        # Persistent storage mapping a link ID to the link data.
        storage_name = "registration_links"
        self.ad_hoc_registration_data = persistent.LazyPersistentBinaryDict(
            storage_name
        )
        bridge_options = {
            "in_sign": "s",
            "out_sign": "s",
            "method": self._get,
            "async_": True,
        }
        host.bridge.add_method("registration_link_get", ".plugin", **bridge_options)

    def _get(self, registration_link_id: str) -> defer.Deferred[str]:
        """Bridge entry point: fetch link data with ``get`` and serialise it.

        @param registration_link_id: ID of the registration link
        @return: Deferred whose result is passed through ``data_format.serialise``
        """
        return defer.ensureDeferred(self.get(registration_link_id)).addCallback(
            data_format.serialise
        )

    async def get(self, registration_link_id: str) -> dict:
        """Retrieve registration link from its ID

        @param registration_link_id: registration link data
        @return: registration link data
        @raise exceptions.NotFound: not registration link found with this ID.
        """
        link_data = await self.ad_hoc_registration_data.get(registration_link_id)
        if not link_data:
            raise exceptions.NotFound
        expiration_timestamp = link_data.get("expiration_timestamp")
        if expiration_timestamp is not None and expiration_timestamp < time.time():
            log.info(f"Deleting expiration link {registration_link_id}.")
            await self.ad_hoc_registration_data.adel(registration_link_id)
            raise exceptions.NotFound
        return link_data

    async def profile_connected(self, client):
        if client.is_admin:
            self._c.add_ad_hoc_command(
                client,
                self.create_registration_link,
                D_("Create Registration Link"),
                node="https://libervia.org/registration/create",
            )
            self._c.add_ad_hoc_command(
                client,
                self.list_registration_links,
                D_("List Registration Links"),
                node="https://libervia.org/registration/list",
            )
            self._c.add_ad_hoc_command(
                client,
                self.delete_registration_link,
                D_("Delete Registration Link"),
                node="https://libervia.org/registration/delete",
            )

    async def create_registration_link(
        self,
        client: SatXMPPEntity,
        command_elt: domish.Element,
        session_data: dict,
        action: str,
        node: str,
    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
        """Ad-hoc command used to create a registration link.

        This method presents a form to the user for creating a registration link,
        and processes the submitted form to generate and store the link with its
        associated data.

        @param client: The XMPP client instance.
        @param command_elt: The command element.
        @param session_data: Data associated with the current session.
        @param action: The action being performed.
        @param node: The node identifier.

        @return: A tuple containing the payload if any,
        """
        actions = session_data.setdefault("actions", [])
        actions.append(action)

        if len(actions) == 1:
            # First request, presenting the form to the user
            status = self._c.STATUS.EXECUTING
            form = data_form.Form("form", title=D_("Create Registration Link"))

            form.addField(
                data_form.Field("text-single", "recipient", label=D_("Recipient Name"))
            )
            form.addField(
                data_form.Field(
                    "text-single",
                    "expiration_time_pattern",
                    label=D_("Expiration Date"),
                    desc=D_(
                        "Set the expiry duration for this link. Use the Libervia Time "
                        "Pattern (e.g., '1 week'). The link will expire after this "
                        "period."
                    ),
                    value="1 week",
                    required=True,
                )
            )
            form.addField(
                data_form.Field(
                    "text-single",
                    "registration_limit",
                    label=D_("Maximum registrations limit"),
                    desc=D_(
                        "How many accounts can be registered using this link. Set to 0 "
                        "for unlimited registrations."
                    ),
                    value="1",
                    required=True,
                )
            )
            form.addField(
                data_form.Field(
                    "text-multi", "welcome_message", label=D_("Welcome Message")
                )
            )
            form.addField(
                data_form.Field(
                    "text-multi",
                    "extra_details",
                    label=D_("Additional Details"),
                )
            )

            payload = form.toElement()
            note = None

        elif len(actions) == 2:
            try:
                x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
                answer_form = data_form.Form.fromElement(x_elt)

                recipient = answer_form.get("recipient", None)
                expiration_time_pattern = answer_form["expiration_time_pattern"]
                if expiration_time_pattern == "never":
                    expiration_timestamp = None
                else:
                    expiration_timestamp = date_utils.date_parse_ext(
                        expiration_time_pattern, default_tz=date_utils.TZ_LOCAL
                    )
                registration_limit = int(answer_form["registration_limit"])
                welcome_message = answer_form.get("welcome_message", "")
                extra_details = answer_form.get("extra_details", "")

                link_id = str(uuid4())

                link_data = {
                    "recipient": recipient,
                    "registration_limit": registration_limit,
                    "welcome_message": welcome_message,
                    "extra_details": extra_details,
                    "creator_jid": session_data["requestor"].full(),
                    "use_count": 0,
                    "creation_timestamp": int(time.time()),
                }
                if expiration_timestamp is not None:
                    link_data["expiration_timestamp"] = expiration_timestamp

                await self.ad_hoc_registration_data.aset(link_id, link_data)

                status = self._c.STATUS.COMPLETED
                payload = None
                note = (
                    self._c.NOTE.INFO,
                    D_("Registration link created successfully: {link_id}").format(
                        link_id=link_id
                    ),
                )
            except (KeyError, StopIteration, ValueError):
                raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)
        else:
            raise self._c.AdHocError(self._c.ERROR.INTERNAL)

        return (payload, status, None, note)

    async def list_registration_links(
        self,
        client: SatXMPPEntity,
        command_elt: domish.Element,
        session_data: dict,
        action: str,
        node: str,
    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
        """Ad-hoc command used to list all registration links.

        This method retrieves all the registration links and presents them to the user.

        @param client: The XMPP client instance.
        @param command_elt: The command element.
        @param session_data: Data associated with the current session.
        @param action: The action being performed.
        @param node: The node identifier.

        @return: A tuple containing the payload if any,
        """
        actions = session_data.setdefault("actions", [])
        actions.append(action)

        if len(actions) == 1:
            all_links = await self.ad_hoc_registration_data.all()
            status = self._c.STATUS.EXECUTING

            form = data_form.Form("form", title=D_("Registered Links"))
            for link_id, link_data in all_links.items():
                form.addField(
                    data_form.Field(
                        "text-multi",
                        var=f"link_{link_id}",
                        label=D_("Link ID: {link_id}").format(link_id=link_id),
                        value=str(link_data),
                    )
                )

            payload = form.toElement()
            note = None

            status = self._c.STATUS.COMPLETED

        else:
            raise self._c.AdHocError(self._c.ERROR.INTERNAL)

        return (payload, status, None, note)

    async def delete_registration_link(
        self,
        client: SatXMPPEntity,
        command_elt: domish.Element,
        session_data: dict,
        action: str,
        node: str,
    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
        """Ad-hoc command used to delete a registration link.

        This method presents a form to the user for selecting a registration link to
        delete, and processes the submitted form to delete the selected link.

        @param client: The XMPP client instance.
        @param command_elt: The command element.
        @param session_data: Data associated with the current session.
        @param action: The action being performed.
        @param node: The node identifier.

        @return: A tuple containing the payload if any,
        """
        actions = session_data.setdefault("actions", [])
        actions.append(action)

        if len(actions) == 1:
            all_links = await self.ad_hoc_registration_data.all()
            status = self._c.STATUS.EXECUTING
            form = data_form.Form("form", title=D_("Delete Registration Link"))

            link_options = [data_form.Option(link_id, link_id) for link_id in all_links]
            form.addField(
                data_form.Field(
                    "list-single",
                    "link_to_delete",
                    label=D_("Select Link to Delete"),
                    options=link_options,
                    required=True,
                )
            )

            payload = form.toElement()
            note = None

        elif len(actions) == 2:
            try:
                x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
                answer_form = data_form.Form.fromElement(x_elt)
                link_to_delete = answer_form["link_to_delete"]

                await self.ad_hoc_registration_data.adel(link_to_delete)

                status = self._c.STATUS.COMPLETED
                payload = None
                note = (
                    self._c.NOTE.INFO,
                    D_("Registration link {link_id} deleted successfully.").format(
                        link_id=link_to_delete
                    ),
                )
            except (KeyError, StopIteration, ValueError):
                raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)
        else:
            raise self._c.AdHocError(self._c.ERROR.INTERNAL)

        return (payload, status, None, note)