Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_adhoc_registration.py @ 4185:c6d85c31a59f
plugin ad-hoc registration: Implement plugin to handle registration links:
registration links are used in web frontend to allow registration with secret links when
it's normally closed.
author | Goffi <goffi@goffi.org> |
---|---|
date | Sun, 10 Dec 2023 18:32:04 +0100 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
4184:50c919dfe61b | 4185:c6d85c31a59f |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin to handle web frontend registration links with Ad-Hoc Commands | |
4 # Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 | |
20 import time | |
21 from uuid import uuid4 | |
22 | |
23 from twisted.internet import defer | |
24 from twisted.words.xish import domish | |
25 from wokkel import data_form | |
26 | |
27 from libervia.backend.core import exceptions | |
28 from libervia.backend.core.constants import Const as C | |
29 from libervia.backend.core.core_types import SatXMPPEntity | |
30 from libervia.backend.core.i18n import D_, _ | |
31 from libervia.backend.core.log import getLogger | |
32 from libervia.backend.memory import persistent | |
33 from libervia.backend.tools.common import data_format, date_utils | |
34 | |
35 log = getLogger(__name__) | |
36 | |
37 | |
38 PLUGIN_INFO = { | |
39 C.PI_NAME: "Ad-Hoc Commands - Registration", | |
40 C.PI_IMPORT_NAME: "AD_HOC_REGISTRATION", | |
41 C.PI_TYPE: "Misc", | |
42 C.PI_PROTOCOLS: [], | |
43 C.PI_DEPENDENCIES: ["XEP-0050"], | |
44 C.PI_MAIN: "AdHocRegistration", | |
45 C.PI_HANDLER: "no", | |
46 C.PI_DESCRIPTION: _("""Add registration link handling Ad-Hoc commands"""), | |
47 } | |
48 | |
49 | |
50 class AdHocRegistration: | |
51 def __init__(self, host): | |
52 log.info(f"plugin {PLUGIN_INFO[C.PI_NAME]!r} initialization") | |
53 self.host = host | |
54 self._c = host.plugins["XEP-0050"] | |
55 self.ad_hoc_registration_data = persistent.LazyPersistentBinaryDict( | |
56 "registration_links" | |
57 ) | |
58 host.bridge.add_method( | |
59 "registration_link_get", | |
60 ".plugin", | |
61 in_sign="s", | |
62 out_sign="s", | |
63 method=self._get, | |
64 async_=True, | |
65 ) | |
66 | |
67 def _get(self, registration_link_id: str) -> defer.Deferred[str]: | |
68 d = defer.ensureDeferred(self.get(registration_link_id)) | |
69 d.addCallback(data_format.serialise) | |
70 return d | |
71 | |
72 async def get(self, registration_link_id: str) -> dict: | |
73 """Retrieve registration link from its ID | |
74 | |
75 @param registration_link_id: registration link data | |
76 @return: registration link data | |
77 @raise exceptions.NotFound: not registration link found with this ID. | |
78 """ | |
79 link_data = await self.ad_hoc_registration_data.get(registration_link_id) | |
80 if not link_data: | |
81 raise exceptions.NotFound | |
82 expiration_timestamp = link_data.get("expiration_timestamp") | |
83 if expiration_timestamp is not None and expiration_timestamp < time.time(): | |
84 log.info(f"Deleting expiration link {registration_link_id}.") | |
85 await self.ad_hoc_registration_data.adel(registration_link_id) | |
86 raise exceptions.NotFound | |
87 return link_data | |
88 | |
89 async def profile_connected(self, client): | |
90 if client.is_admin: | |
91 self._c.add_ad_hoc_command( | |
92 client, | |
93 self.create_registration_link, | |
94 D_("Create Registration Link"), | |
95 node="https://libervia.org/registration/create", | |
96 ) | |
97 self._c.add_ad_hoc_command( | |
98 client, | |
99 self.list_registration_links, | |
100 D_("List Registration Links"), | |
101 node="https://libervia.org/registration/list", | |
102 ) | |
103 self._c.add_ad_hoc_command( | |
104 client, | |
105 self.delete_registration_link, | |
106 D_("Delete Registration Link"), | |
107 node="https://libervia.org/registration/delete", | |
108 ) | |
109 | |
110 async def create_registration_link( | |
111 self, | |
112 client: SatXMPPEntity, | |
113 command_elt: domish.Element, | |
114 session_data: dict, | |
115 action: str, | |
116 node: str, | |
117 ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]: | |
118 """Ad-hoc command used to create a registration link. | |
119 | |
120 This method presents a form to the user for creating a registration link, | |
121 and processes the submitted form to generate and store the link with its | |
122 associated data. | |
123 | |
124 @param client: The XMPP client instance. | |
125 @param command_elt: The command element. | |
126 @param session_data: Data associated with the current session. | |
127 @param action: The action being performed. | |
128 @param node: The node identifier. | |
129 | |
130 @return: A tuple containing the payload if any, | |
131 """ | |
132 actions = session_data.setdefault("actions", []) | |
133 actions.append(action) | |
134 | |
135 if len(actions) == 1: | |
136 # First request, presenting the form to the user | |
137 status = self._c.STATUS.EXECUTING | |
138 form = data_form.Form("form", title=D_("Create Registration Link")) | |
139 | |
140 form.addField( | |
141 data_form.Field("text-single", "recipient", label=D_("Recipient Name")) | |
142 ) | |
143 form.addField( | |
144 data_form.Field( | |
145 "text-single", | |
146 "expiration_time_pattern", | |
147 label=D_("Expiration Date"), | |
148 desc=D_( | |
149 "Set the expiry duration for this link. Use the Libervia Time " | |
150 "Pattern (e.g., '1 week'). The link will expire after this " | |
151 "period." | |
152 ), | |
153 value="1 week", | |
154 required=True, | |
155 ) | |
156 ) | |
157 form.addField( | |
158 data_form.Field( | |
159 "text-single", | |
160 "registration_limit", | |
161 label=D_("Maximum registrations limit"), | |
162 desc=D_( | |
163 "How many accounts can be registered using this link. Set to 0 " | |
164 "for unlimited registrations." | |
165 ), | |
166 value="1", | |
167 required=True, | |
168 ) | |
169 ) | |
170 form.addField( | |
171 data_form.Field( | |
172 "text-multi", "welcome_message", label=D_("Welcome Message") | |
173 ) | |
174 ) | |
175 form.addField( | |
176 data_form.Field( | |
177 "text-multi", | |
178 "extra_details", | |
179 label=D_("Additional Details"), | |
180 ) | |
181 ) | |
182 | |
183 payload = form.toElement() | |
184 note = None | |
185 | |
186 elif len(actions) == 2: | |
187 try: | |
188 x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x")) | |
189 answer_form = data_form.Form.fromElement(x_elt) | |
190 | |
191 recipient = answer_form.get("recipient", None) | |
192 expiration_time_pattern = answer_form["expiration_time_pattern"] | |
193 if expiration_time_pattern == "never": | |
194 expiration_timestamp = None | |
195 else: | |
196 expiration_timestamp = date_utils.date_parse_ext( | |
197 expiration_time_pattern, default_tz=date_utils.TZ_LOCAL | |
198 ) | |
199 registration_limit = int(answer_form["registration_limit"]) | |
200 welcome_message = answer_form.get("welcome_message", "") | |
201 extra_details = answer_form.get("extra_details", "") | |
202 | |
203 link_id = str(uuid4()) | |
204 | |
205 link_data = { | |
206 "recipient": recipient, | |
207 "registration_limit": registration_limit, | |
208 "welcome_message": welcome_message, | |
209 "extra_details": extra_details, | |
210 "creator_jid": session_data["requestor"].full(), | |
211 "use_count": 0, | |
212 "creation_timestamp": int(time.time()), | |
213 } | |
214 if expiration_timestamp is not None: | |
215 link_data["expiration_timestamp"] = expiration_timestamp | |
216 | |
217 await self.ad_hoc_registration_data.aset(link_id, link_data) | |
218 | |
219 status = self._c.STATUS.COMPLETED | |
220 payload = None | |
221 note = ( | |
222 self._c.NOTE.INFO, | |
223 D_("Registration link created successfully: {link_id}").format( | |
224 link_id=link_id | |
225 ), | |
226 ) | |
227 except (KeyError, StopIteration, ValueError): | |
228 raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD) | |
229 else: | |
230 raise self._c.AdHocError(self._c.ERROR.INTERNAL) | |
231 | |
232 return (payload, status, None, note) | |
233 | |
234 async def list_registration_links( | |
235 self, | |
236 client: SatXMPPEntity, | |
237 command_elt: domish.Element, | |
238 session_data: dict, | |
239 action: str, | |
240 node: str, | |
241 ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]: | |
242 """Ad-hoc command used to list all registration links. | |
243 | |
244 This method retrieves all the registration links and presents them to the user. | |
245 | |
246 @param client: The XMPP client instance. | |
247 @param command_elt: The command element. | |
248 @param session_data: Data associated with the current session. | |
249 @param action: The action being performed. | |
250 @param node: The node identifier. | |
251 | |
252 @return: A tuple containing the payload if any, | |
253 """ | |
254 actions = session_data.setdefault("actions", []) | |
255 actions.append(action) | |
256 | |
257 if len(actions) == 1: | |
258 all_links = await self.ad_hoc_registration_data.all() | |
259 status = self._c.STATUS.EXECUTING | |
260 | |
261 form = data_form.Form("form", title=D_("Registered Links")) | |
262 for link_id, link_data in all_links.items(): | |
263 form.addField( | |
264 data_form.Field( | |
265 "text-multi", | |
266 var=f"link_{link_id}", | |
267 label=D_("Link ID: {link_id}").format(link_id=link_id), | |
268 value=str(link_data), | |
269 ) | |
270 ) | |
271 | |
272 payload = form.toElement() | |
273 note = None | |
274 | |
275 status = self._c.STATUS.COMPLETED | |
276 | |
277 else: | |
278 raise self._c.AdHocError(self._c.ERROR.INTERNAL) | |
279 | |
280 return (payload, status, None, note) | |
281 | |
282 async def delete_registration_link( | |
283 self, | |
284 client: SatXMPPEntity, | |
285 command_elt: domish.Element, | |
286 session_data: dict, | |
287 action: str, | |
288 node: str, | |
289 ) -> tuple[domish.Element | None, str, None, tuple[str, str] | None]: | |
290 """Ad-hoc command used to delete a registration link. | |
291 | |
292 This method presents a form to the user for selecting a registration link to | |
293 delete, and processes the submitted form to delete the selected link. | |
294 | |
295 @param client: The XMPP client instance. | |
296 @param command_elt: The command element. | |
297 @param session_data: Data associated with the current session. | |
298 @param action: The action being performed. | |
299 @param node: The node identifier. | |
300 | |
301 @return: A tuple containing the payload if any, | |
302 """ | |
303 actions = session_data.setdefault("actions", []) | |
304 actions.append(action) | |
305 | |
306 if len(actions) == 1: | |
307 all_links = await self.ad_hoc_registration_data.all() | |
308 status = self._c.STATUS.EXECUTING | |
309 form = data_form.Form("form", title=D_("Delete Registration Link")) | |
310 | |
311 link_options = [data_form.Option(link_id, link_id) for link_id in all_links] | |
312 form.addField( | |
313 data_form.Field( | |
314 "list-single", | |
315 "link_to_delete", | |
316 label=D_("Select Link to Delete"), | |
317 options=link_options, | |
318 required=True, | |
319 ) | |
320 ) | |
321 | |
322 payload = form.toElement() | |
323 note = None | |
324 | |
325 elif len(actions) == 2: | |
326 try: | |
327 x_elt = next(command_elt.elements(data_form.NS_X_DATA, "x")) | |
328 answer_form = data_form.Form.fromElement(x_elt) | |
329 link_to_delete = answer_form["link_to_delete"] | |
330 | |
331 await self.ad_hoc_registration_data.adel(link_to_delete) | |
332 | |
333 status = self._c.STATUS.COMPLETED | |
334 payload = None | |
335 note = ( | |
336 self._c.NOTE.INFO, | |
337 D_("Registration link {link_id} deleted successfully.").format( | |
338 link_id=link_to_delete | |
339 ), | |
340 ) | |
341 except (KeyError, StopIteration, ValueError): | |
342 raise self._c.AdHocError(self._c.ERROR.BAD_PAYLOAD) | |
343 else: | |
344 raise self._c.AdHocError(self._c.ERROR.INTERNAL) | |
345 | |
346 return (payload, status, None, note) |