Mercurial > libervia-backend
view libervia/backend/plugins/plugin_adhoc_registration.py @ 4236:f59e9421a650
test (unit/cli): Add a file send/receive test for WebRTC:
fix 442
author | Goffi <goffi@goffi.org> |
---|---|
date | Sat, 06 Apr 2024 15:21:00 +0200 |
parents | c6d85c31a59f |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin to handle web frontend registration links with Ad-Hoc Commands
# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import time
from uuid import uuid4

from twisted.internet import defer
from twisted.words.xish import domish
from wokkel import data_form

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.memory import persistent
from libervia.backend.tools.common import data_format, date_utils

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "Ad-Hoc Commands - Registration",
    C.PI_IMPORT_NAME: "AD_HOC_REGISTRATION",
    C.PI_TYPE: "Misc",
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0050"],
    C.PI_MAIN: "AdHocRegistration",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""Add registration link handling Ad-Hoc commands"""),
}


class AdHocRegistration:
    """Manage registration links through Ad-Hoc commands.

    Links are stored in the "registration_links" persistent dictionary, keyed by
    link ID. Create/list/delete commands are registered for admin profiles only,
    and a ``registration_link_get`` bridge method exposes a single link.
    """

    def __init__(self, host):
        """Set up storage and register the bridge method.

        @param host: Libervia backend instance; its "XEP-0050" plugin and its bridge
            are used.
        """
        log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization")
        self.host = host
        self._c = host.plugins["XEP-0050"]
        self.ad_hoc_registration_data = persistent.LazyPersistentBinaryDict(
            "registration_links"
        )
        host.bridge.add_method(
            "registration_link_get",
            ".plugin",
            in_sign="s",
            out_sign="s",
            method=self._get,
            async_=True,
        )

    def _get(self, registration_link_id: str) -> defer.Deferred[str]:
        """Bridge wrapper around [get], returning the link data serialised.

        @param registration_link_id: ID of the registration link to retrieve.
        @return: Deferred firing with the serialised link data.
        """
        d = defer.ensureDeferred(self.get(registration_link_id))
        d.addCallback(data_format.serialise)
        return d

    async def get(self, registration_link_id: str) -> dict:
        """Retrieve registration link from its ID

        If the link has an expiration timestamp which is in the past, the link is
        deleted from storage before the exception is raised.

        @param registration_link_id: ID of the registration link to retrieve.
        @return: registration link data
        @raise exceptions.NotFound: no registration link found with this ID, or the
            link has expired.
        """
        link_data = await self.ad_hoc_registration_data.get(registration_link_id)
        if not link_data:
            raise exceptions.NotFound
        expiration_timestamp = link_data.get("expiration_timestamp")
        if expiration_timestamp is not None and expiration_timestamp < time.time():
            log.info(f"Deleting expiration link {registration_link_id}.")
            await self.ad_hoc_registration_data.adel(registration_link_id)
            raise exceptions.NotFound
        return link_data

    async def profile_connected(self, client):
        """Register the registration link commands when an admin profile connects.

        Nothing is registered for non-admin profiles.

        @param client: client of the profile which has just been connected.
        """
        if client.is_admin:
            self._c.add_ad_hoc_command(
                client,
                self.create_registration_link,
                D_("Create Registration Link"),
                node="https://libervia.org/registration/create",
            )
            self._c.add_ad_hoc_command(
                client,
                self.list_registration_links,
                D_("List Registration Links"),
                node="https://libervia.org/registration/list",
            )
            self._c.add_ad_hoc_command(
                client,
                self.delete_registration_link,
                D_("Delete Registration Link"),
                node="https://libervia.org/registration/delete",
            )

    async def create_registration_link(
        self,
        client: SatXMPPEntity,
        command_elt: domish.Element,
        session_data: dict,
        action: str,
        node: str,
    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
        """Ad-hoc command used to create a registration link.

        This method presents a form to the user for creating a registration link,
        and processes the submitted form to generate and store the link with its
        associated data.

        @param client: The XMPP client instance.
        @param command_elt: The command element.
        @param session_data: Data associated with the current session.
        @param action: The action being performed.
        @param node: The node identifier.
        @return: A tuple (payload, status, actions, note):
            - payload is the form element on first request, None once completed
            - status is the command status (EXECUTING then COMPLETED)
            - the third item is always None in this command (presumably the allowed
              next actions of the XEP-0050 callback contract, to be confirmed
              against the XEP-0050 plugin)
            - note is None on first request, an INFO note with the new link ID once
              the link has been created
        @raise AdHocError: BAD_PAYLOAD if the submitted form is missing, lacks a
            required field or holds an invalid value; INTERNAL if the command is
            called more than twice in the same session.
        """
        # The number of actions seen in this session tells which step we are at.
        actions = session_data.setdefault("actions", [])
        actions.append(action)

        if len(actions) == 1:
            # First request, presenting the form to the user
            status = self._c.STATUS.EXECUTING
            form = data_form.Form("form", title=D_("Create Registration Link"))

            form.addField(
                data_form.Field("text-single", "recipient", label=D_("Recipient Name"))
            )
            form.addField(
                data_form.Field(
                    "text-single",
                    "expiration_time_pattern",
                    label=D_("Expiration Date"),
                    desc=D_(
                        "Set the expiry duration for this link. Use the Libervia Time "
                        "Pattern (e.g., '1 week'). The link will expire after this "
                        "period."
                    ),
                    value="1 week",
                    required=True,
                )
            )
            form.addField(
                data_form.Field(
                    "text-single",
                    "registration_limit",
                    label=D_("Maximum registrations limit"),
                    desc=D_(
                        "How many accounts can be registered using this link. Set to 0 "
                        "for unlimited registrations."
                    ),
                    value="1",
                    required=True,
                )
            )
            form.addField(
                data_form.Field(
                    "text-multi", "welcome_message", label=D_("Welcome Message")
                )
            )
            form.addField(
                data_form.Field(
                    "text-multi",
                    "extra_details",
                    label=D_("Additional Details"),
                )
            )

            payload = form.toElement()
            note = None

        elif len(actions) == 2:
            # Second request, the user has submitted the form
            try:
                x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
                answer_form = data_form.Form.fromElement(x_elt)

                recipient = answer_form.get("recipient", None)
                expiration_time_pattern = answer_form["expiration_time_pattern"]
                # "never" is a special value meaning that the link doesn't expire.
                if expiration_time_pattern == "never":
                    expiration_timestamp = None
                else:
                    expiration_timestamp = date_utils.date_parse_ext(
                        expiration_time_pattern, default_tz=date_utils.TZ_LOCAL
                    )
                registration_limit = int(answer_form["registration_limit"])
                if registration_limit < 0:
                    # Only 0 (unlimited) and positive limits are meaningful; the
                    # ValueError is reported as a bad payload below.
                    raise ValueError("registration limit can't be negative")
                welcome_message = answer_form.get("welcome_message", "")
                extra_details = answer_form.get("extra_details", "")

                link_id = str(uuid4())

                link_data = {
                    "recipient": recipient,
                    "registration_limit": registration_limit,
                    "welcome_message": welcome_message,
                    "extra_details": extra_details,
                    "creator_jid": session_data["requestor"].full(),
                    "use_count": 0,
                    "creation_timestamp": int(time.time()),
                }
                # The key is only set when the link does expire, [get] relies on
                # its absence to detect links without expiration.
                if expiration_timestamp is not None:
                    link_data["expiration_timestamp"] = expiration_timestamp

                await self.ad_hoc_registration_data.aset(link_id, link_data)

                status = self._c.STATUS.COMPLETED
                payload = None
                note = (
                    self._c.NOTE.INFO,
                    D_("Registration link created successfully: {link_id}").format(
                        link_id=link_id
                    ),
                )
            except (KeyError, StopIteration, ValueError) as e:
                # Keep the original exception as explicit cause to ease debugging.
                raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD) from e
        else:
            raise self._c.AdHocError(self._c.ERROR.INTERNAL)

        return (payload, status, None, note)

    async def list_registration_links(
        self,
        client: SatXMPPEntity,
        command_elt: domish.Element,
        session_data: dict,
        action: str,
        node: str,
    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
        """Ad-hoc command used to list all registration links.

        This method retrieves all the registration links and presents them to the
        user, one "text-multi" field per link holding the string representation of
        the link data.

        @param client: The XMPP client instance.
        @param command_elt: The command element.
        @param session_data: Data associated with the current session.
        @param action: The action being performed.
        @param node: The node identifier.
        @return: A tuple (payload, status, actions, note): payload is the form
            listing the links, status is COMPLETED, the third item and the note are
            always None.
        @raise AdHocError: INTERNAL if the command is called more than once in the
            same session.
        """
        actions = session_data.setdefault("actions", [])
        actions.append(action)

        if len(actions) == 1:
            all_links = await self.ad_hoc_registration_data.all()
            form = data_form.Form("form", title=D_("Registered Links"))

            for link_id, link_data in all_links.items():
                form.addField(
                    data_form.Field(
                        "text-multi",
                        var=f"link_{link_id}",
                        label=D_("Link ID: {link_id}").format(link_id=link_id),
                        value=str(link_data),
                    )
                )

            payload = form.toElement()
            note = None
            # This is a single step command, it is completed right away.
            status = self._c.STATUS.COMPLETED

        else:
            raise self._c.AdHocError(self._c.ERROR.INTERNAL)

        return (payload, status, None, note)

    async def delete_registration_link(
        self,
        client: SatXMPPEntity,
        command_elt: domish.Element,
        session_data: dict,
        action: str,
        node: str,
    ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]:
        """Ad-hoc command used to delete a registration link.

        This method presents a form to the user for selecting a registration link
        to delete, and processes the submitted form to delete the selected link.

        @param client: The XMPP client instance.
        @param command_elt: The command element.
        @param session_data: Data associated with the current session.
        @param action: The action being performed.
        @param node: The node identifier.
        @return: A tuple (payload, status, actions, note):
            - payload is the selection form on first request, None once completed
            - status is the command status (EXECUTING then COMPLETED)
            - the third item is always None in this command
            - note is None on first request, an INFO note with the deleted link ID
              once the deletion is done
        @raise AdHocError: BAD_PAYLOAD if the submitted form is missing or invalid;
            INTERNAL if the command is called more than twice in the same session.
        """
        actions = session_data.setdefault("actions", [])
        actions.append(action)

        if len(actions) == 1:
            # First request, let the user choose among the existing links
            all_links = await self.ad_hoc_registration_data.all()
            status = self._c.STATUS.EXECUTING
            form = data_form.Form("form", title=D_("Delete Registration Link"))

            link_options = [data_form.Option(link_id, link_id) for link_id in all_links]
            form.addField(
                data_form.Field(
                    "list-single",
                    "link_to_delete",
                    label=D_("Select Link to Delete"),
                    options=link_options,
                    required=True,
                )
            )

            payload = form.toElement()
            note = None

        elif len(actions) == 2:
            # Second request, the user has selected the link to delete
            try:
                x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x"))
                answer_form = data_form.Form.fromElement(x_elt)
                link_to_delete = answer_form["link_to_delete"]

                await self.ad_hoc_registration_data.adel(link_to_delete)

                status = self._c.STATUS.COMPLETED
                payload = None
                note = (
                    self._c.NOTE.INFO,
                    D_("Registration link {link_id} deleted successfully.").format(
                        link_id=link_to_delete
                    ),
                )
            except (KeyError, StopIteration, ValueError) as e:
                # Keep the original exception as explicit cause to ease debugging.
                raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD) from e
        else:
            raise self._c.AdHocError(self._c.ERROR.INTERNAL)

        return (payload, status, None, note)