view libervia/backend/plugins/plugin_adhoc_registration.py @ 4303:a7ec325246fb

component email-gateway: first draft: Initial implementation of the Email Gateway. This component uses XEP-0100 for registration. Upon registration and subsequent startups, a connection is made to registered IMAP services, and incoming emails (in `INBOX` mailboxes) are immediately forwarded as XMPP messages. In the opposite direction, an SMTP connection is established to send emails on incoming XMPP messages. rel 449
author Goffi <goffi@goffi.org>
date Fri, 06 Sep 2024 18:07:17 +0200
parents c6d85c31a59f
children
line wrap: on
line source

#!/usr/bin/env python3

# Libervia plugin to handle web frontend registration links with Ad-Hoc Commands
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.


import time
from uuid import uuid4

from twisted.internet import defer
from twisted.words.xish import domish
from wokkel import data_form

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.memory import persistent
from libervia.backend.tools.common import data_format, date_utils

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Ad-Hoc Commands - Registration",
    C.PI_IMPORT_NAME: "AD_HOC_REGISTRATION",
    C.PI_TYPE: "Misc",
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0050"],
    C.PI_MAIN: "AdHocRegistration",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Add registration link handling Ad-Hoc commands"""),
}


class AdHocRegistration:
    def __init__(self, host):
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._c = host.plugins["XEP-0050"]
        self.ad_hoc_registration_data = persistent.LazyPersistentBinaryDict(
            "registration_links"
        )
        host.bridge.add_method(
            "registration_link_get",
            ".plugin",
            in_sign="s",
            out_sign="s",
            method=self._get,
            async_=True,
        )

    def _get(self, registration_link_id: str) -> defer.Deferred[str]:
        d = defer.ensureDeferred(self.get(registration_link_id))
        d.addCallback(data_format.serialise)
        return d

    async def get(self, registration_link_id: str) -> dict:
        """Retrieve registration link from its ID

        @param registration_link_id: registration link data
        @return: registration link data
        @raise exceptions.NotFound: not registration link found with this ID.
        """
        link_data = await self.ad_hoc_registration_data.get(registration_link_id)
        if not link_data:
            raise exceptions.NotFound
        expiration_timestamp = link_data.get("expiration_timestamp")
        if expiration_timestamp is not None and expiration_timestamp < time.time():
            log.info(f"Deleting expiration link {registration_link_id}.")
            await self.ad_hoc_registration_data.adel(registration_link_id)
            raise exceptions.NotFound
        return link_data

    async def profile_connected(self, client):
        if client.is_admin:
            self._c.add_ad_hoc_command(
                client,
                self.create_registration_link,
                D_("Create Registration Link"),
                node="https://libervia.org/registration/create",
            )
            self._c.add_ad_hoc_command(
                client,
                self.list_registration_links,
                D_("List Registration Links"),
                node="https://libervia.org/registration/list",
            )
            self._c.add_ad_hoc_command(
                client,
                self.delete_registration_link,
                D_("Delete Registration Link"),
                node="https://libervia.org/registration/delete",
            )

    async def create_registration_link(
        self,
        client: SatXMPPEntity,
        command_elt: domish.Element,
        session_data: dict,
        action: str,
        node: str,
    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
        """Ad-hoc command used to create a registration link.

        This method presents a form to the user for creating a registration link,
        and processes the submitted form to generate and store the link with its
        associated data.

        @param client: The XMPP client instance.
        @param command_elt: The command element.
        @param session_data: Data associated with the current session.
        @param action: The action being performed.
        @param node: The node identifier.

        @return: A tuple containing the payload if any,
        """
        actions = session_data.setdefault("actions", [])
        actions.append(action)

        if len(actions) == 1:
            # First request, presenting the form to the user
            status = self._c.STATUS.EXECUTING
            form = data_form.Form("form", title=D_("Create Registration Link"))

            form.addField(
                data_form.Field("text-single", "recipient", label=D_("Recipient Name"))
            )
            form.addField(
                data_form.Field(
                    "text-single",
                    "expiration_time_pattern",
                    label=D_("Expiration Date"),
                    desc=D_(
                        "Set the expiry duration for this link. Use the Libervia Time "
                        "Pattern (e.g., '1 week'). The link will expire after this "
                        "period."
                    ),
                    value="1 week",
                    required=True,
                )
            )
            form.addField(
                data_form.Field(
                    "text-single",
                    "registration_limit",
                    label=D_("Maximum registrations limit"),
                    desc=D_(
                        "How many accounts can be registered using this link. Set to 0 "
                        "for unlimited registrations."
                    ),
                    value="1",
                    required=True,
                )
            )
            form.addField(
                data_form.Field(
                    "text-multi", "welcome_message", label=D_("Welcome Message")
                )
            )
            form.addField(
                data_form.Field(
                    "text-multi",
                    "extra_details",
                    label=D_("Additional Details"),
                )
            )

            payload = form.toElement()
            note = None

        elif len(actions) == 2:
            try:
                x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
                answer_form = data_form.Form.fromElement(x_elt)

                recipient = answer_form.get("recipient", None)
                expiration_time_pattern = answer_form["expiration_time_pattern"]
                if expiration_time_pattern == "never":
                    expiration_timestamp = None
                else:
                    expiration_timestamp = date_utils.date_parse_ext(
                        expiration_time_pattern, default_tz=date_utils.TZ_LOCAL
                    )
                registration_limit = int(answer_form["registration_limit"])
                welcome_message = answer_form.get("welcome_message", "")
                extra_details = answer_form.get("extra_details", "")

                link_id = str(uuid4())

                link_data = {
                    "recipient": recipient,
                    "registration_limit": registration_limit,
                    "welcome_message": welcome_message,
                    "extra_details": extra_details,
                    "creator_jid": session_data["requestor"].full(),
                    "use_count": 0,
                    "creation_timestamp": int(time.time()),
                }
                if expiration_timestamp is not None:
                    link_data["expiration_timestamp"] = expiration_timestamp

                await self.ad_hoc_registration_data.aset(link_id, link_data)

                status = self._c.STATUS.COMPLETED
                payload = None
                note = (
                    self._c.NOTE.INFO,
                    D_("Registration link created successfully: {link_id}").format(
                        link_id=link_id
                    ),
                )
            except (KeyError, StopIteration, ValueError):
                raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)
        else:
            raise self._c.AdHocError(self._c.ERROR.INTERNAL)

        return (payload, status, None, note)

    async def list_registration_links(
        self,
        client: SatXMPPEntity,
        command_elt: domish.Element,
        session_data: dict,
        action: str,
        node: str,
    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
        """Ad-hoc command used to list all registration links.

        This method retrieves all the registration links and presents them to the user.

        @param client: The XMPP client instance.
        @param command_elt: The command element.
        @param session_data: Data associated with the current session.
        @param action: The action being performed.
        @param node: The node identifier.

        @return: A tuple containing the payload if any,
        """
        actions = session_data.setdefault("actions", [])
        actions.append(action)

        if len(actions) == 1:
            all_links = await self.ad_hoc_registration_data.all()
            status = self._c.STATUS.EXECUTING

            form = data_form.Form("form", title=D_("Registered Links"))
            for link_id, link_data in all_links.items():
                form.addField(
                    data_form.Field(
                        "text-multi",
                        var=f"link_{link_id}",
                        label=D_("Link ID: {link_id}").format(link_id=link_id),
                        value=str(link_data),
                    )
                )

            payload = form.toElement()
            note = None

            status = self._c.STATUS.COMPLETED

        else:
            raise self._c.AdHocError(self._c.ERROR.INTERNAL)

        return (payload, status, None, note)

    async def delete_registration_link(
        self,
        client: SatXMPPEntity,
        command_elt: domish.Element,
        session_data: dict,
        action: str,
        node: str,
    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
        """Ad-hoc command used to delete a registration link.

        This method presents a form to the user for selecting a registration link to
        delete, and processes the submitted form to delete the selected link.

        @param client: The XMPP client instance.
        @param command_elt: The command element.
        @param session_data: Data associated with the current session.
        @param action: The action being performed.
        @param node: The node identifier.

        @return: A tuple containing the payload if any,
        """
        actions = session_data.setdefault("actions", [])
        actions.append(action)

        if len(actions) == 1:
            all_links = await self.ad_hoc_registration_data.all()
            status = self._c.STATUS.EXECUTING
            form = data_form.Form("form", title=D_("Delete Registration Link"))

            link_options = [data_form.Option(link_id, link_id) for link_id in all_links]
            form.addField(
                data_form.Field(
                    "list-single",
                    "link_to_delete",
                    label=D_("Select Link to Delete"),
                    options=link_options,
                    required=True,
                )
            )

            payload = form.toElement()
            note = None

        elif len(actions) == 2:
            try:
                x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
                answer_form = data_form.Form.fromElement(x_elt)
                link_to_delete = answer_form["link_to_delete"]

                await self.ad_hoc_registration_data.adel(link_to_delete)

                status = self._c.STATUS.COMPLETED
                payload = None
                note = (
                    self._c.NOTE.INFO,
                    D_("Registration link {link_id} deleted successfully.").format(
                        link_id=link_to_delete
                    ),
                )
            except (KeyError, StopIteration, ValueError):
                raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD)
        else:
            raise self._c.AdHocError(self._c.ERROR.INTERNAL)

        return (payload, status, None, note)