Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0470.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0470.py@524856bd7b19 |
children | 2109d864a3e7 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin for Pubsub Attachments | |
4 # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from typing import List, Tuple, Dict, Any, Callable, Optional | |
20 | |
21 from twisted.words.protocols.jabber import jid, xmlstream, error | |
22 from twisted.words.xish import domish | |
23 from twisted.internet import defer | |
24 from zope.interface import implementer | |
25 from wokkel import pubsub, disco, iwokkel | |
26 | |
27 from libervia.backend.core.constants import Const as C | |
28 from libervia.backend.core.i18n import _ | |
29 from libervia.backend.core.log import getLogger | |
30 from libervia.backend.core.core_types import SatXMPPEntity | |
31 from libervia.backend.core import exceptions | |
32 from libervia.backend.tools.common import uri, data_format, date_utils | |
33 from libervia.backend.tools.utils import as_deferred, xmpp_date | |
34 | |
35 | |
# Module-level logger
log = getLogger(__name__)

# Namespaces used by this plugin; both derive from the same prefix
NS_PREFIX = "urn:xmpp:pubsub-attachments:"
NS_PUBSUB_ATTACHMENTS = f"{NS_PREFIX}1"
NS_PUBSUB_ATTACHMENTS_SUM = f"{NS_PREFIX}summary:1"

# Name used as C.PI_IMPORT_NAME in the plugin metadata below
IMPORT_NAME = "XEP-0470"

# Plugin metadata, keyed by the ``PI_*`` constants of ``C``
PLUGIN_INFO = {
    C.PI_NAME: "Pubsub Attachments",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_XEP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    # the XEP-0060 plugin is fetched from ``host.plugins`` at initialisation
    C.PI_DEPENDENCIES: ["XEP-0060"],
    C.PI_MAIN: "PubsubAttachments",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Pubsub Attachments implementation"""),
}
54 | |
55 | |
56 class PubsubAttachments: | |
57 namespace = NS_PUBSUB_ATTACHMENTS | |
58 | |
def __init__(self, host):
    """Initialise the plugin and register everything it exposes

    Registers the attachments namespace, the ``XEP-0277_send`` trigger, the
    built-in ``noticed`` and ``reactions`` handlers, and the two bridge methods.

    @param host: host instance on which namespace, trigger and bridge methods are
        registered; its ``plugins`` mapping must contain ``XEP-0060``
    """
    log.info(_("XEP-0470 (Pubsub Attachments) plugin initialization"))
    host.register_namespace("pubsub-attachments", NS_PUBSUB_ATTACHMENTS)
    self.host = host
    self._p = host.plugins["XEP-0060"]
    # (name, namespace) → {"get": callback, "set": callback}
    self.handlers: Dict[Tuple[str, str], dict[str, Any]] = {}
    host.trigger.add("XEP-0277_send", self.on_mb_send)

    # attachments handled directly by this plugin
    builtin_handlers = (
        ("noticed", self.noticed_get, self.noticed_set),
        ("reactions", self.reactions_get, self.reactions_set),
    )
    for name, get_cb, set_cb in builtin_handlers:
        self.register_attachment_handler(
            name, NS_PUBSUB_ATTACHMENTS, get_cb, set_cb
        )

    # (bridge method name, input signature, output signature, callback)
    bridge_methods = (
        ("ps_attachments_get", "sssasss", "(ss)", self._get),
        ("ps_attachments_set", "ss", "", self._set),
    )
    for method_name, in_sign, out_sign, method in bridge_methods:
        host.bridge.add_method(
            method_name,
            ".plugin",
            in_sign=in_sign,
            out_sign=out_sign,
            method=method,
            async_=True,
        )
88 | |
def get_handler(self, client):
    """Return a new ``PubsubAttachments_Handler`` instance

    @param client: not used by this method
    """
    handler = PubsubAttachments_Handler()
    return handler
91 | |
def register_attachment_handler(
    self,
    name: str,
    namespace: str,
    get_cb: Callable[
        [SatXMPPEntity, domish.Element, Dict[str, Any]],
        None],
    set_cb: Callable[
        [SatXMPPEntity, Dict[str, Any], Optional[domish.Element]],
        Optional[domish.Element]],
) -> None:
    """Register the ``get`` and ``set`` callbacks of an attachment

    @param name: name of the element
    @param namespace: namespace of the element
        the (name, namespace) couple must be unique
    @param get_cb: method to call when attachments are retrieved
        it is called with (client, element, data) where element is the
        <attachments> element to parse, and data must be updated in place with
        parsed data
    @param set_cb: method to call when the attachment needs to be set or updated
        it is called with (client, data, former_elt), former_elt being None if
        there was no former element. When suitable, ``operation`` should be used
        to check if an ``update`` or a ``replace`` is requested.
        The callback can be either a blocking method, a Deferred or a coroutine
    @raise exceptions.ConflictError: handlers are already registered for this
        (name, namespace) couple
    """
    key = (name, namespace)
    if key in self.handlers:
        raise exceptions.ConflictError(
            f"({name}, {namespace}) attachment handlers are already registered"
        )
    self.handlers[key] = dict(get=get_cb, set=set_cb)
127 | |
def get_attachment_node_name(self, service: jid.JID, node: str, item: str) -> str:
    """Build the name of the attachment node targeting an item

    The name is the attachments namespace, a ``/``, then the pubsub XMPP URI of the
    target item.

    @param service: pubsub service of the target item (``userhost()`` is used)
    @param node: node of the target item
    @param item: ID of the target item
    @return: name of the attachment node
    """
    uri_args = {"path": service.userhost(), "node": node, "item": item}
    target_item_uri = uri.build_xmpp_uri("pubsub", **uri_args)
    return f"{NS_PUBSUB_ATTACHMENTS}/{target_item_uri}"
137 | |
def is_attachment_node(self, node: str) -> bool:
    """Tell if a node name starts with the attachments namespace followed by ``/``"""
    attachment_prefix = f"{NS_PUBSUB_ATTACHMENTS}/"
    return node.startswith(attachment_prefix)
141 | |
def attachment_node_2_item(self, node: str) -> Tuple[jid.JID, str, str]:
    """Retrieve service, node and item from attachment node's name

    @param node: name of an attachment node
    @return: (service, node, item) of the item targeted by the attachment node
    @raise ValueError: the node is not an attachment node, the URI it contains is
        not a pubsub URI, or the service of this URI is not a valid JID
    """
    if not self.is_attachment_node(node):
        raise ValueError("this is not an attachment node!")
    prefix_len = len(f"{NS_PUBSUB_ATTACHMENTS}/")
    item_uri = node[prefix_len:]
    parsed_uri = uri.parse_xmpp_uri(item_uri)
    if parsed_uri["type"] != "pubsub":
        raise ValueError(f"unexpected URI type, it must be a pubsub URI: {item_uri}")
    try:
        service = jid.JID(parsed_uri["path"])
    except (RuntimeError, jid.InvalidFormat) as e:
        # jid.JID raises RuntimeError when no value is given, and
        # jid.InvalidFormat (which is not a RuntimeError) when the string can't be
        # parsed as a JID
        raise ValueError(f"invalid service in pubsub URI: {item_uri}") from e
    # distinct names are used to avoid rebinding the ``node`` argument
    target_node = parsed_uri["node"]
    target_item = parsed_uri["item"]
    return (service, target_node, target_item)
158 | |
async def on_mb_send(
    self,
    client: SatXMPPEntity,
    service: jid.JID,
    node: str,
    item: domish.Element,
    data: dict
) -> bool:
    """Trigger creating the attachment node on each publication

    This method is registered on the ``XEP-0277_send`` trigger.

    @param service: service where the item is published
    @param node: node where the item is published
    @param item: published item, its ``id`` attribute is used
    @param data: not used by this trigger
    @return: always True
    """
    published_id = item["id"]
    await self.create_attachments_node(
        client, service, node, published_id, autocreate=True
    )
    return True
172 | |
async def create_attachments_node(
    self,
    client: SatXMPPEntity,
    service: jid.JID,
    node: str,
    item_id: str,
    autocreate: bool = False
):
    """Create node for attachments if necessary

    The configuration of the target node is reused for the attachment node, with
    ``pubsub#publish_model`` set to ``open`` when this field exists. A failure to
    create the attachment node is only logged.

    @param service: service of target node
    @param node: node where target item is published
    @param item_id: ID of target item
    @param autocreate: if True, target node is created if it doesn't exist
    @raise error.StanzaError: the configuration of the target node can't be
        retrieved, for a reason other than the cases handled here
    """
    try:
        node_config = await self._p.getConfiguration(client, service, node)
    except error.StanzaError as e:
        condition = e.condition
        if condition == "forbidden":
            # the configuration can't be read, an empty form is used instead
            node_config = self._p.make_configuration_form({})
        elif condition == "item-not-found" and autocreate:
            # we auto-create the missing node
            await self._p.createNode(client, service, node)
            node_config = await self._p.getConfiguration(client, service, node)
        else:
            raise e

    try:
        # FIXME: check if this is the best publish_model option
        node_config.fields["pubsub#publish_model"].value = "open"
    except KeyError:
        log.warning("pubsub#publish_model field is missing")

    attachment_node = self.get_attachment_node_name(service, node, item_id)
    try:
        # we use the same options as target node
        await self._p.create_if_new_node(
            client, service, attachment_node, options=dict(node_config)
        )
    except Exception as e:
        log.warning(f"Can't create attachment node {attachment_node}: {e}")
214 | |
def items_2_attachment_data(
    self,
    client: SatXMPPEntity,
    items: List[domish.Element]
) -> List[Dict[str, Any]]:
    """Convert items from attachment node to attachment data

    Items are ignored (with a warning) when they have no <attachments> element,
    when their ``publisher`` attribute is set but is invalid or doesn't match the
    item ID, or when the item ID is not a JID.

    @param items: items retrieved from an attachment node
    @return: one dict per item for which at least one handler has set data; the
        ``from`` key holds the item ID, other keys are set by the ``get`` callbacks
        of registered handlers
    """
    list_data = []
    for item in items:
        attachments_elt = next(
            item.elements(NS_PUBSUB_ATTACHMENTS, "attachments"), None
        )
        if attachments_elt is None:
            log.warning(
                "item is missing <attachments> elements, ignoring it: "
                f"{item.toXml()}"
            )
            continue
        item_id = item["id"]
        publisher_s = item.getAttribute("publisher")
        # publisher is not filled by all pubsub service, so we can't count on it
        if publisher_s:
            try:
                publisher = jid.JID(publisher_s)
            except (RuntimeError, jid.InvalidFormat):
                # a malformed attribute in a single item must not make the whole
                # conversion fail
                log.warning(
                    f"publisher {publisher_s!r} is not a valid JID, ignoring "
                    f"item:\n{item.toXml()}"
                )
                continue
            if publisher.userhost() != item_id:
                log.warning(
                    f"publisher {publisher.userhost()!r} doesn't correspond to item "
                    f"id {item['id']!r}, ignoring. This may be a hack attempt.\n"
                    f"{item.toXml()}"
                )
                continue
        try:
            jid.JID(item_id)
        except (RuntimeError, jid.InvalidFormat):
            # jid.InvalidFormat is raised for unparsable JIDs, and is not a
            # RuntimeError
            log.warning(
                "item ID is not a JID, this is not compliant and is ignored: "
                f"{item.toXml()}"
            )
            continue
        data = {
            "from": item_id
        }
        for handler in self.handlers.values():
            handler["get"](client, attachments_elt, data)
        # if only "from" is there, no handler has found anything to report
        if len(data) > 1:
            list_data.append(data)
    return list_data
260 | |
def _get(
    self,
    service_s: str,
    node: str,
    item: str,
    senders_s: List[str],
    extra_s: str,
    profile_key: str
) -> defer.Deferred:
    """Bridge method wrapping ``get_attachments``

    @param service_s: pubsub service, as a string
    @param node: pubsub node containing the item
    @param item: ID of the item for which attachments are retrieved
    @param senders_s: JIDs (as strings) of the entities to check
    @param extra_s: serialised extra data, forwarded to ``get_attachments``
    @param profile_key: key of the profile to use
    @return: Deferred firing with a tuple of the serialised attachments data and
        the serialised metadata
    """
    client = self.host.get_client(profile_key)
    extra = data_format.deserialise(extra_s)
    senders = [jid.JID(s) for s in senders_s]
    d = defer.ensureDeferred(
        # ``extra`` must be forwarded, otherwise options given by the caller are
        # silently ignored
        self.get_attachments(
            client, jid.JID(service_s), node, item, senders, extra
        )
    )
    d.addCallback(
        lambda ret:
        (data_format.serialise(ret[0]),
         data_format.serialise(ret[1]))
    )
    return d
282 | |
async def get_attachments(
    self,
    client: SatXMPPEntity,
    service: jid.JID,
    node: str,
    item: str,
    senders: Optional[List[jid.JID]],
    extra: Optional[dict] = None
) -> Tuple[List[Dict[str, Any]], dict]:
    """Retrieve data attached to a pubsub item

    @param service: pubsub service where the node is
    @param node: pubsub node containing the item
    @param item: ID of the item for which attachments will be retrieved
    @param senders: bare JIDs of entities that are checked. Attachments from those
        entities will be retrieved.
        If None (or empty), attachments from all entities will be retrieved
    @param extra: extra data, will be used as ``extra`` argument when doing
        ``get_items`` call.
    @return: A tuple with:
        - the list of attachments data, one item per found sender. The attachments
          data are dict containing attachment, no ``extra`` field is used here
          (contrarily to attachments data used with ``set_attachements``).
        - metadata returned by the call to ``get_items``
    """
    extra = {} if extra is None else extra
    if senders:
        # items of an attachment node are requested by the bare JID of senders
        item_ids = [sender.userhost() for sender in senders]
    else:
        item_ids = None
    attachment_node = self.get_attachment_node_name(service, node, item)
    items, metadata = await self._p.get_items(
        client, service, attachment_node, item_ids=item_ids, extra=extra
    )
    return self.items_2_attachment_data(client, items), metadata
318 | |
def _set(
    self,
    attachments_s: str,
    profile_key: str
) -> defer.Deferred:
    """Bridge method wrapping ``set_attachements``

    @param attachments_s: serialised attachments data; an empty dict is used if
        deserialisation gives a falsy value
    @param profile_key: key of the profile to use
    @return: Deferred wrapping the ``set_attachements`` coroutine
    """
    client = self.host.get_client(profile_key)
    attachments = data_format.deserialise(attachments_s) or {}
    return defer.ensureDeferred(self.set_attachements(client, attachments))
327 | |
async def apply_set_handler(
    self,
    client: SatXMPPEntity,
    attachments_data: dict,
    item_elt: Optional[domish.Element],
    handlers: Optional[List[Tuple[str, str]]] = None,
    from_jid: Optional[jid.JID] = None,
) -> domish.Element:
    """Apply all ``set`` callbacks to an attachments item

    @param attachments_data: data describing the attachments
        ``extra`` key will be used, and created if not found
    @param item_elt: item containing an <attachments> element
        will be modified in place
        if None, a new element will be created
    @param handlers: list of (name, namespace) of handlers to use.
        if None, all registered handlers will be used.
    @param from_jid: jid of the author of the attachments, only used when a new
        item is created
        ``client.jid`` will be used if not specified
    @return: updated item_elt if given, otherwise a new item_elt
    """
    attachments_data.setdefault("extra", {})
    attachments_qname = (NS_PUBSUB_ATTACHMENTS, "attachments")

    if item_elt is None:
        # the bare JID of the author is used as item ID
        author = client.jid if from_jid is None else from_jid
        item_elt = pubsub.Item(author.userhost())
        item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))

    attachments_elt = next(item_elt.elements(*attachments_qname), None)
    if attachments_elt is None:
        log.warning(
            f"no <attachments> element found, creating a new one: {item_elt.toXml()}"
        )
        attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))

    if handlers is None:
        handlers = list(self.handlers)

    for name, namespace in handlers:
        handler = self.handlers.get((name, namespace))
        if handler is None:
            log.error(
                f"unregistered handler ({name!r}, {namespace!r}) is requested, "
                "ignoring"
            )
            continue
        former_elt = next(attachments_elt.elements(namespace, name), None)
        # the callback may be a blocking method, a Deferred or a coroutine
        new_elt = await as_deferred(
            handler["set"], client, attachments_data, former_elt
        )
        if new_elt != former_elt:
            # the former element is replaced by the new one (which may be None)
            if former_elt is not None:
                attachments_elt.children.remove(former_elt)
            if new_elt is not None:
                attachments_elt.addChild(new_elt)
    return item_elt
390 | |
async def set_attachements(
    self,
    client: SatXMPPEntity,
    attachments_data: Dict[str, Any]
) -> None:
    """Set or update attachments

    Former <attachments> element will be retrieved and updated. Individual
    attachments replace or update their elements individually, according to the
    "operation" key.

    "operation" key may be "update" or "replace", and defaults to update, it is only
    used in attachments where "update" makes sense (e.g. it's used for "reactions"
    but not for "noticed").

    @param attachments_data: data describing attachments. Various keys (usually stored
        in attachments_data["extra"]) may be used depending on the attachments
        handlers registered. The keys "service", "node" and "id" MUST be set.
        ``attachments_data`` is thought to be compatible with microblog data.
    @raise ValueError: "service", "node" or "id" is missing, or "service" is not a
        valid JID
    @raise exceptions.NotFound: the attachment node doesn't exist
    """
    try:
        service = jid.JID(attachments_data["service"])
        node = attachments_data["node"]
        item = attachments_data["id"]
    except (KeyError, RuntimeError, jid.InvalidFormat) as e:
        # jid.InvalidFormat (raised for an unparsable JID) is not a RuntimeError
        raise ValueError(
            'data must have "service", "node" and "id" set'
        ) from e
    attachment_node = self.get_attachment_node_name(service, node, item)
    # our own former attachments, if any, are the base of the update
    try:
        items, __ = await self._p.get_items(
            client, service, attachment_node, item_ids=[client.jid.userhost()]
        )
    except exceptions.NotFound:
        item_elt = None
    else:
        item_elt = items[0] if items else None

    item_elt = await self.apply_set_handler(
        client,
        attachments_data,
        item_elt=item_elt,
    )

    try:
        await self._p.send_items(client, service, attachment_node, [item_elt])
    except error.StanzaError as e:
        if e.condition != "item-not-found":
            raise
        # the node doesn't exist, we can't publish attachments
        log.warning(
            f"no attachment node found at {service} on {node!r} for item "
            f"{item!r}, we can't update attachments."
        )
        raise exceptions.NotFound("No attachment node available") from e
451 | |
async def subscribe(
    self,
    client: SatXMPPEntity,
    service: jid.JID,
    node: str,
    item: str,
) -> None:
    """Subscribe to the attachment node targeting an item

    @param service: service of target item (will also be used for attachment node)
    @param node: node of target item (used to get attachment node's name)
    @param item: name of target item (used to get attachment node's name)
    """
    await self._p.subscribe(
        client, service, self.get_attachment_node_name(service, node, item)
    )
467 | |
468 | |
def set_timestamp(self, attachment_elt: domish.Element, data: dict) -> None:
    """Parse the ``timestamp`` attribute of an element, and fill data with it

    Nothing is done if the attribute is missing or empty; a warning is logged if it
    can't be parsed.

    @param attachment_elt: element where the ``timestamp`` attribute may be set
    @param data: data specific to the attachment (i.e. not the whole microblog data)
        ``timestamp`` field will be set there if timestamp exists and is parsable
    """
    timestamp_raw = attachment_elt.getAttribute("timestamp")
    if not timestamp_raw:
        return
    try:
        data["timestamp"] = date_utils.date_parse(timestamp_raw)
    except date_utils.ParserError:
        log.warning(f"can't parse timestamp: {timestamp_raw}")
484 | |
def noticed_get(
    self,
    client: SatXMPPEntity,
    attachments_elt: domish.Element,
    data: Dict[str, Any],
) -> None:
    """Fill ``data["noticed"]`` if a <noticed> element is found

    @param attachments_elt: <attachments> element to parse
    @param data: attachments data, updated in place. When a <noticed> element
        exists, the ``noticed`` key is set to a dict with ``noticed`` set to True,
        and ``timestamp`` if the element has a parsable one
    """
    noticed_elt = next(
        attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "noticed"), None
    )
    if noticed_elt is None:
        return
    noticed_data = {"noticed": True}
    self.set_timestamp(noticed_elt, noticed_data)
    data["noticed"] = noticed_data
503 | |
def noticed_set(
    self,
    client: SatXMPPEntity,
    data: Dict[str, Any],
    former_elt: Optional[domish.Element]
) -> Optional[domish.Element]:
    """Add or remove a <noticed> attachment

    If ``data["extra"]["noticed"]`` is true, a new element (with the current date as
    timestamp) is returned; if it's false, None is returned so the element is
    removed; if it's not present or None, the former element is kept.

    @param data: attachments data, its ``extra`` key must be set
    @param former_elt: existing <noticed> element if any
    @return: element to use, or None if there must be no <noticed> element
    """
    noticed = data["extra"].get("noticed")
    if noticed is None:
        # nothing requested, we keep whatever was there
        return former_elt
    if not noticed:
        return None
    timestamp_attribs = {"timestamp": xmpp_date()}
    return domish.Element(
        (NS_PUBSUB_ATTACHMENTS, "noticed"),
        attribs=timestamp_attribs
    )
527 | |
def reactions_get(
    self,
    client: SatXMPPEntity,
    attachments_elt: domish.Element,
    data: Dict[str, Any],
) -> None:
    """Fill ``data["reactions"]`` if a <reactions> element is found

    @param attachments_elt: <attachments> element to parse
    @param data: attachments data, updated in place. When a <reactions> element
        exists, the ``reactions`` key is set to a dict whose ``reactions`` key is
        the list of the <reaction> children converted to strings, and with a
        ``timestamp`` key if the element has a parsable one
    """
    reactions_elt = next(
        attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "reactions"), None
    )
    if reactions_elt is None:
        return
    reactions_data = {
        "reactions": [
            str(reaction_elt)
            for reaction_elt in reactions_elt.elements(
                NS_PUBSUB_ATTACHMENTS, "reaction"
            )
        ]
    }
    self.set_timestamp(reactions_elt, reactions_data)
    data["reactions"] = reactions_data
547 | |
def reactions_set(
    self,
    client: SatXMPPEntity,
    data: Dict[str, Any],
    former_elt: Optional[domish.Element]
) -> Optional[domish.Element]:
    """Update the <reactions> attachment

    ``data["extra"]["reactions"]`` describes the request: its ``operation`` key is
    either ``update`` (default; ``add`` and ``remove`` are applied to the former
    reactions) or ``replace`` (``reactions`` is used as the new list).

    @param data: attachments data, its ``extra`` key must be set
    @param former_elt: existing <reactions> element if any
    @return: the former element if nothing is requested, a new <reactions> element
        if there is at least one reaction, None otherwise
    @raise exceptions.DataError: the operation is neither ``update`` nor ``replace``
    """
    reactions_data = data["extra"].get("reactions")
    if reactions_data is None:
        return former_elt

    operation_type = reactions_data.get("operation", "update")
    if operation_type == "replace":
        reactions = reactions_data.get("reactions") or []
    elif operation_type == "update":
        if former_elt is None:
            former_reactions = set()
        else:
            former_reactions = {
                str(r) for r in former_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction")
            }
        added_reactions = set(reactions_data.get("add") or [])
        removed_reactions = set(reactions_data.get("remove") or [])
        reactions = list((former_reactions | added_reactions) - removed_reactions)
    else:
        raise exceptions.DataError(f"invalid reaction operation: {operation_type!r}")

    if not reactions:
        # no reaction left, there must be no <reactions> element
        return None

    reactions_elt = domish.Element(
        (NS_PUBSUB_ATTACHMENTS, "reactions"),
        attribs={"timestamp": xmpp_date()}
    )
    for reaction in reactions:
        reactions_elt.addElement("reaction", content=reaction)
    return reactions_elt
582 | |
583 | |
@implementer(iwokkel.IDisco)
class PubsubAttachments_Handler(xmlstream.XMPPHandler):
    """Handler advertising the Pubsub Attachments namespace as a disco feature"""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Return the Pubsub Attachments feature, whatever the arguments are"""
        attachments_feature = disco.DiscoFeature(NS_PUBSUB_ATTACHMENTS)
        return [attachments_feature]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """Return an empty list: no disco item is exposed"""
        return list()