comparison libervia/backend/plugins/plugin_xep_0470.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0470.py@524856bd7b19
children 2109d864a3e7
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # Libervia plugin for Pubsub Attachments
4 # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from typing import List, Tuple, Dict, Any, Callable, Optional
20
21 from twisted.words.protocols.jabber import jid, xmlstream, error
22 from twisted.words.xish import domish
23 from twisted.internet import defer
24 from zope.interface import implementer
25 from wokkel import pubsub, disco, iwokkel
26
27 from libervia.backend.core.constants import Const as C
28 from libervia.backend.core.i18n import _
29 from libervia.backend.core.log import getLogger
30 from libervia.backend.core.core_types import SatXMPPEntity
31 from libervia.backend.core import exceptions
32 from libervia.backend.tools.common import uri, data_format, date_utils
33 from libervia.backend.tools.utils import as_deferred, xmpp_date
34
35
36 log = getLogger(__name__)
37
38 IMPORT_NAME = "XEP-0470"
39
40 PLUGIN_INFO = {
41 C.PI_NAME: "Pubsub Attachments",
42 C.PI_IMPORT_NAME: IMPORT_NAME,
43 C.PI_TYPE: C.PLUG_TYPE_XEP,
44 C.PI_MODES: C.PLUG_MODE_BOTH,
45 C.PI_PROTOCOLS: [],
46 C.PI_DEPENDENCIES: ["XEP-0060"],
47 C.PI_MAIN: "PubsubAttachments",
48 C.PI_HANDLER: "yes",
49 C.PI_DESCRIPTION: _("""Pubsub Attachments implementation"""),
50 }
51 NS_PREFIX = "urn:xmpp:pubsub-attachments:"
52 NS_PUBSUB_ATTACHMENTS = f"{NS_PREFIX}1"
53 NS_PUBSUB_ATTACHMENTS_SUM = f"{NS_PREFIX}summary:1"
54
55
56 class PubsubAttachments:
57 namespace = NS_PUBSUB_ATTACHMENTS
58
59 def __init__(self, host):
60 log.info(_("XEP-0470 (Pubsub Attachments) plugin initialization"))
61 host.register_namespace("pubsub-attachments", NS_PUBSUB_ATTACHMENTS)
62 self.host = host
63 self._p = host.plugins["XEP-0060"]
64 self.handlers: Dict[Tuple[str, str], dict[str, Any]] = {}
65 host.trigger.add("XEP-0277_send", self.on_mb_send)
66 self.register_attachment_handler(
67 "noticed", NS_PUBSUB_ATTACHMENTS, self.noticed_get, self.noticed_set
68 )
69 self.register_attachment_handler(
70 "reactions", NS_PUBSUB_ATTACHMENTS, self.reactions_get, self.reactions_set
71 )
72 host.bridge.add_method(
73 "ps_attachments_get",
74 ".plugin",
75 in_sign="sssasss",
76 out_sign="(ss)",
77 method=self._get,
78 async_=True,
79 )
80 host.bridge.add_method(
81 "ps_attachments_set",
82 ".plugin",
83 in_sign="ss",
84 out_sign="",
85 method=self._set,
86 async_=True,
87 )
88
89 def get_handler(self, client):
90 return PubsubAttachments_Handler()
91
92 def register_attachment_handler(
93 self,
94 name: str,
95 namespace: str,
96 get_cb: Callable[
97 [SatXMPPEntity, domish.Element, Dict[str, Any]],
98 None],
99 set_cb: Callable[
100 [SatXMPPEntity, Dict[str, Any], Optional[domish.Element]],
101 Optional[domish.Element]],
102 ) -> None:
103 """Register callbacks to handle an attachment
104
105 @param name: name of the element
106 @param namespace: namespace of the element
107 (name, namespace) couple must be unique
108 @param get: method to call when attachments are retrieved
109 it will be called with (client, element, data) where element is the
110 <attachments> element to parse, and data must be updated in place with
111 parsed data
112 @param set: method to call when the attachment need to be set or udpated
113 it will be called with (client, data, former_elt of None if there was no
114 former element). When suitable, ``operation`` should be used to check if we
115 request an ``update`` or a ``replace``.
116 The callback can be either a blocking method, a Deferred or a coroutine
117 """
118 key = (name, namespace)
119 if key in self.handlers:
120 raise exceptions.ConflictError(
121 f"({name}, {namespace}) attachment handlers are already registered"
122 )
123 self.handlers[(name, namespace)] = {
124 "get": get_cb,
125 "set": set_cb
126 }
127
128 def get_attachment_node_name(self, service: jid.JID, node: str, item: str) -> str:
129 """Generate name to use for attachment node"""
130 target_item_uri = uri.build_xmpp_uri(
131 "pubsub",
132 path=service.userhost(),
133 node=node,
134 item=item
135 )
136 return f"{NS_PUBSUB_ATTACHMENTS}/{target_item_uri}"
137
138 def is_attachment_node(self, node: str) -> bool:
139 """Return True if node name is an attachment node"""
140 return node.startswith(f"{NS_PUBSUB_ATTACHMENTS}/")
141
142 def attachment_node_2_item(self, node: str) -> Tuple[jid.JID, str, str]:
143 """Retrieve service, node and item from attachement node's name"""
144 if not self.is_attachment_node(node):
145 raise ValueError("this is not an attachment node!")
146 prefix_len = len(f"{NS_PUBSUB_ATTACHMENTS}/")
147 item_uri = node[prefix_len:]
148 parsed_uri = uri.parse_xmpp_uri(item_uri)
149 if parsed_uri["type"] != "pubsub":
150 raise ValueError(f"unexpected URI type, it must be a pubsub URI: {item_uri}")
151 try:
152 service = jid.JID(parsed_uri["path"])
153 except RuntimeError:
154 raise ValueError(f"invalid service in pubsub URI: {item_uri}")
155 node = parsed_uri["node"]
156 item = parsed_uri["item"]
157 return (service, node, item)
158
159 async def on_mb_send(
160 self,
161 client: SatXMPPEntity,
162 service: jid.JID,
163 node: str,
164 item: domish.Element,
165 data: dict
166 ) -> bool:
167 """trigger to create attachment node on each publication"""
168 await self.create_attachments_node(
169 client, service, node, item["id"], autocreate=True
170 )
171 return True
172
173 async def create_attachments_node(
174 self,
175 client: SatXMPPEntity,
176 service: jid.JID,
177 node: str,
178 item_id: str,
179 autocreate: bool = False
180 ):
181 """Create node for attachements if necessary
182
183 @param service: service of target node
184 @param node: node where target item is published
185 @param item_id: ID of target item
186 @param autocrate: if True, target node is create if it doesn't exist
187 """
188 try:
189 node_config = await self._p.getConfiguration(client, service, node)
190 except error.StanzaError as e:
191 if e.condition == "item-not-found" and autocreate:
192 # we auto-create the missing node
193 await self._p.createNode(
194 client, service, node
195 )
196 node_config = await self._p.getConfiguration(client, service, node)
197 elif e.condition == "forbidden":
198 node_config = self._p.make_configuration_form({})
199 else:
200 raise e
201 try:
202 # FIXME: check if this is the best publish_model option
203 node_config.fields["pubsub#publish_model"].value = "open"
204 except KeyError:
205 log.warning("pubsub#publish_model field is missing")
206 attachment_node = self.get_attachment_node_name(service, node, item_id)
207 # we use the same options as target node
208 try:
209 await self._p.create_if_new_node(
210 client, service, attachment_node, options=dict(node_config)
211 )
212 except Exception as e:
213 log.warning(f"Can't create attachment node {attachment_node}: {e}")
214
215 def items_2_attachment_data(
216 self,
217 client: SatXMPPEntity,
218 items: List[domish.Element]
219 ) -> List[Dict[str, Any]]:
220 """Convert items from attachment node to attachment data"""
221 list_data = []
222 for item in items:
223 try:
224 attachments_elt = next(
225 item.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
226 )
227 except StopIteration:
228 log.warning(
229 "item is missing <attachments> elements, ignoring it: {item.toXml()}"
230 )
231 continue
232 item_id = item["id"]
233 publisher_s = item.getAttribute("publisher")
234 # publisher is not filled by all pubsub service, so we can't count on it
235 if publisher_s:
236 publisher = jid.JID(publisher_s)
237 if publisher.userhost() != item_id:
238 log.warning(
239 f"publisher {publisher.userhost()!r} doesn't correspond to item "
240 f"id {item['id']!r}, ignoring. This may be a hack attempt.\n"
241 f"{item.toXml()}"
242 )
243 continue
244 try:
245 jid.JID(item_id)
246 except RuntimeError:
247 log.warning(
248 "item ID is not a JID, this is not compliant and is ignored: "
249 f"{item.toXml}"
250 )
251 continue
252 data = {
253 "from": item_id
254 }
255 for handler in self.handlers.values():
256 handler["get"](client, attachments_elt, data)
257 if len(data) > 1:
258 list_data.append(data)
259 return list_data
260
261 def _get(
262 self,
263 service_s: str,
264 node: str,
265 item: str,
266 senders_s: List[str],
267 extra_s: str,
268 profile_key: str
269 ) -> defer.Deferred:
270 client = self.host.get_client(profile_key)
271 extra = data_format.deserialise(extra_s)
272 senders = [jid.JID(s) for s in senders_s]
273 d = defer.ensureDeferred(
274 self.get_attachments(client, jid.JID(service_s), node, item, senders)
275 )
276 d.addCallback(
277 lambda ret:
278 (data_format.serialise(ret[0]),
279 data_format.serialise(ret[1]))
280 )
281 return d
282
283 async def get_attachments(
284 self,
285 client: SatXMPPEntity,
286 service: jid.JID,
287 node: str,
288 item: str,
289 senders: Optional[List[jid.JID]],
290 extra: Optional[dict] = None
291 ) -> Tuple[List[Dict[str, Any]], dict]:
292 """Retrieve data attached to a pubsub item
293
294 @param service: pubsub service where the node is
295 @param node: pubsub node containing the item
296 @param item: ID of the item for which attachments will be retrieved
297 @param senders: bare JIDs of entities that are checked. Attachments from those
298 entities will be retrieved.
299 If None, attachments from all entities will be retrieved
300 @param extra: extra data, will be used as ``extra`` argument when doing
301 ``get_items`` call.
302 @return: A tuple with:
303 - the list of attachments data, one item per found sender. The attachments
304 data are dict containing attachment, no ``extra`` field is used here
305 (contrarily to attachments data used with ``set_attachements``).
306 - metadata returned by the call to ``get_items``
307 """
308 if extra is None:
309 extra = {}
310 attachment_node = self.get_attachment_node_name(service, node, item)
311 item_ids = [e.userhost() for e in senders] if senders else None
312 items, metadata = await self._p.get_items(
313 client, service, attachment_node, item_ids=item_ids, extra=extra
314 )
315 list_data = self.items_2_attachment_data(client, items)
316
317 return list_data, metadata
318
319 def _set(
320 self,
321 attachments_s: str,
322 profile_key: str
323 ) -> None:
324 client = self.host.get_client(profile_key)
325 attachments = data_format.deserialise(attachments_s) or {}
326 return defer.ensureDeferred(self.set_attachements(client, attachments))
327
328 async def apply_set_handler(
329 self,
330 client: SatXMPPEntity,
331 attachments_data: dict,
332 item_elt: Optional[domish.Element],
333 handlers: Optional[List[Tuple[str, str]]] = None,
334 from_jid: Optional[jid.JID] = None,
335 ) -> domish.Element:
336 """Apply all ``set`` callbacks to an attachments item
337
338 @param attachments_data: data describing the attachments
339 ``extra`` key will be used, and created if not found
340 @param from_jid: jid of the author of the attachments
341 ``client.jid.userhostJID()`` will be used if not specified
342 @param item_elt: item containing an <attachments> element
343 will be modified in place
344 if None, a new element will be created
345 @param handlers: list of (name, namespace) of handlers to use.
346 if None, all registered handlers will be used.
347 @return: updated item_elt if given, otherwise a new item_elt
348 """
349 attachments_data.setdefault("extra", {})
350 if item_elt is None:
351 item_id = client.jid.userhost() if from_jid is None else from_jid.userhost()
352 item_elt = pubsub.Item(item_id)
353 item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))
354
355 try:
356 attachments_elt = next(
357 item_elt.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
358 )
359 except StopIteration:
360 log.warning(
361 f"no <attachments> element found, creating a new one: {item_elt.toXml()}"
362 )
363 attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))
364
365 if handlers is None:
366 handlers = list(self.handlers.keys())
367
368 for name, namespace in handlers:
369 try:
370 handler = self.handlers[(name, namespace)]
371 except KeyError:
372 log.error(
373 f"unregistered handler ({name!r}, {namespace!r}) is requested, "
374 "ignoring"
375 )
376 continue
377 try:
378 former_elt = next(attachments_elt.elements(namespace, name))
379 except StopIteration:
380 former_elt = None
381 new_elt = await as_deferred(
382 handler["set"], client, attachments_data, former_elt
383 )
384 if new_elt != former_elt:
385 if former_elt is not None:
386 attachments_elt.children.remove(former_elt)
387 if new_elt is not None:
388 attachments_elt.addChild(new_elt)
389 return item_elt
390
391 async def set_attachements(
392 self,
393 client: SatXMPPEntity,
394 attachments_data: Dict[str, Any]
395 ) -> None:
396 """Set or update attachments
397
398 Former <attachments> element will be retrieved and updated. Individual
399 attachments replace or update their elements individually, according to the
400 "operation" key.
401
402 "operation" key may be "update" or "replace", and defaults to update, it is only
403 used in attachments where "update" makes sense (e.g. it's used for "reactions"
404 but not for "noticed").
405
406 @param attachments_data: data describing attachments. Various keys (usually stored
407 in attachments_data["extra"]) may be used depending on the attachments
408 handlers registered. The keys "service", "node" and "id" MUST be set.
409 ``attachments_data`` is thought to be compatible with microblog data.
410
411 """
412 try:
413 service = jid.JID(attachments_data["service"])
414 node = attachments_data["node"]
415 item = attachments_data["id"]
416 except (KeyError, RuntimeError):
417 raise ValueError(
418 'data must have "service", "node" and "id" set'
419 )
420 attachment_node = self.get_attachment_node_name(service, node, item)
421 try:
422 items, __ = await self._p.get_items(
423 client, service, attachment_node, item_ids=[client.jid.userhost()]
424 )
425 except exceptions.NotFound:
426 item_elt = None
427 else:
428 if not items:
429 item_elt = None
430 else:
431 item_elt = items[0]
432
433 item_elt = await self.apply_set_handler(
434 client,
435 attachments_data,
436 item_elt=item_elt,
437 )
438
439 try:
440 await self._p.send_items(client, service, attachment_node, [item_elt])
441 except error.StanzaError as e:
442 if e.condition == "item-not-found":
443 # the node doesn't exist, we can't publish attachments
444 log.warning(
445 f"no attachment node found at {service} on {node!r} for item "
446 f"{item!r}, we can't update attachments."
447 )
448 raise exceptions.NotFound("No attachment node available")
449 else:
450 raise e
451
452 async def subscribe(
453 self,
454 client: SatXMPPEntity,
455 service: jid.JID,
456 node: str,
457 item: str,
458 ) -> None:
459 """Subscribe to attachment node targeting the item
460
461 @param service: service of target item (will also be used for attachment node)
462 @param node: node of target item (used to get attachment node's name)
463 @param item: name of target item (used to get attachment node's name)
464 """
465 attachment_node = self.get_attachment_node_name(service, node, item)
466 await self._p.subscribe(client, service, attachment_node)
467
468
469 def set_timestamp(self, attachment_elt: domish.Element, data: dict) -> None:
470 """Check if a ``timestamp`` attribute is set, parse it, and fill data
471
472 @param attachments_elt: element where the ``timestamp`` attribute may be set
473 @param data: data specific to the attachment (i.e. not the whole microblog data)
474 ``timestamp`` field will be set there if timestamp exists and is parsable
475 """
476 timestamp_raw = attachment_elt.getAttribute("timestamp")
477 if timestamp_raw:
478 try:
479 timestamp = date_utils.date_parse(timestamp_raw)
480 except date_utils.ParserError:
481 log.warning(f"can't parse timestamp: {timestamp_raw}")
482 else:
483 data["timestamp"] = timestamp
484
485 def noticed_get(
486 self,
487 client: SatXMPPEntity,
488 attachments_elt: domish.Element,
489 data: Dict[str, Any],
490 ) -> None:
491 try:
492 noticed_elt = next(
493 attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "noticed")
494 )
495 except StopIteration:
496 pass
497 else:
498 noticed_data = {
499 "noticed": True
500 }
501 self.set_timestamp(noticed_elt, noticed_data)
502 data["noticed"] = noticed_data
503
504 def noticed_set(
505 self,
506 client: SatXMPPEntity,
507 data: Dict[str, Any],
508 former_elt: Optional[domish.Element]
509 ) -> Optional[domish.Element]:
510 """add or remove a <noticed> attachment
511
512 if data["noticed"] is True, element is added, if it's False, it's removed, and
513 it's not present or None, the former element is kept.
514 """
515 noticed = data["extra"].get("noticed")
516 if noticed is None:
517 return former_elt
518 elif noticed:
519 return domish.Element(
520 (NS_PUBSUB_ATTACHMENTS, "noticed"),
521 attribs = {
522 "timestamp": xmpp_date()
523 }
524 )
525 else:
526 return None
527
528 def reactions_get(
529 self,
530 client: SatXMPPEntity,
531 attachments_elt: domish.Element,
532 data: Dict[str, Any],
533 ) -> None:
534 try:
535 reactions_elt = next(
536 attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "reactions")
537 )
538 except StopIteration:
539 pass
540 else:
541 reactions_data = {"reactions": []}
542 reactions = reactions_data["reactions"]
543 for reaction_elt in reactions_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction"):
544 reactions.append(str(reaction_elt))
545 self.set_timestamp(reactions_elt, reactions_data)
546 data["reactions"] = reactions_data
547
548 def reactions_set(
549 self,
550 client: SatXMPPEntity,
551 data: Dict[str, Any],
552 former_elt: Optional[domish.Element]
553 ) -> Optional[domish.Element]:
554 """update the <reaction> attachment"""
555 reactions_data = data["extra"].get("reactions")
556 if reactions_data is None:
557 return former_elt
558 operation_type = reactions_data.get("operation", "update")
559 if operation_type == "update":
560 former_reactions = {
561 str(r) for r in former_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction")
562 } if former_elt is not None else set()
563 added_reactions = set(reactions_data.get("add") or [])
564 removed_reactions = set(reactions_data.get("remove") or [])
565 reactions = list((former_reactions | added_reactions) - removed_reactions)
566 elif operation_type == "replace":
567 reactions = reactions_data.get("reactions") or []
568 else:
569 raise exceptions.DataError(f"invalid reaction operation: {operation_type!r}")
570 if reactions:
571 reactions_elt = domish.Element(
572 (NS_PUBSUB_ATTACHMENTS, "reactions"),
573 attribs = {
574 "timestamp": xmpp_date()
575 }
576 )
577 for reactions_data in reactions:
578 reactions_elt.addElement("reaction", content=reactions_data)
579 return reactions_elt
580 else:
581 return None
582
583
584 @implementer(iwokkel.IDisco)
585 class PubsubAttachments_Handler(xmlstream.XMPPHandler):
586
587 def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
588 return [disco.DiscoFeature(NS_PUBSUB_ATTACHMENTS)]
589
590 def getDiscoItems(self, requestor, service, nodeIdentifier=""):
591 return []