Mercurial > libervia-backend
comparison sat/plugins/plugin_pubsub_attachments.py @ 3864:ac255a0fbd4c
plugin pubsub attachments: partial implementation of pubsub-attachments protoXEP:
This is an implementation of the "Basic Usage" of
https://xmpp.org/extensions/inbox/pubsub-attachments.html.
rel 370
author | Goffi <goffi@goffi.org> |
---|---|
date | Wed, 20 Jul 2022 17:49:51 +0200 |
parents | |
children | c0bcbcf5b4b7 |
comparison
equal
deleted
inserted
replaced
3863:c04f5e8a3568 | 3864:ac255a0fbd4c |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # Libervia plugin for Pubsub Attachments | |
4 # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 from typing import List, Tuple, Dict, Any, Callable, Optional | |
20 | |
21 from twisted.words.protocols.jabber import jid, xmlstream, error | |
22 from twisted.words.xish import domish | |
23 from twisted.internet import defer | |
24 from zope.interface import implementer | |
25 from wokkel import pubsub, disco, iwokkel | |
26 | |
27 from sat.core.constants import Const as C | |
28 from sat.core.i18n import _ | |
29 from sat.core.log import getLogger | |
30 from sat.core.core_types import SatXMPPEntity | |
31 from sat.core import exceptions | |
32 from sat.tools.common import uri, data_format, date_utils | |
33 from sat.tools.utils import xmpp_date | |
34 | |
35 | |
# Module-level logger, named after this module.
log = getLogger(__name__)

# Name under which this plugin is imported/registered.
IMPORT_NAME = "PUBSUB_ATTACHMENTS"

# Plugin metadata (presumably consumed by the plugin loader -- confirm).
# The XEP-0060 dependency is required: the main class reads
# host.plugins["XEP-0060"] at initialisation.
PLUGIN_INFO = {
    C.PI_NAME: "Pubsub Attachments",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_EXP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    C.PI_DEPENDENCIES: ["XEP-0060"],
    C.PI_MAIN: "PubsubAttachments",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Pubsub Attachments implementation"""),
}

# XML namespaces; all share the same prefix.
NS_PREFIX = "urn:xmpp:pubsub-attachments:"
NS_PUBSUB_ATTACHMENTS = NS_PREFIX + "0"
NS_PUBSUB_ATTACHMENTS_SUM = NS_PREFIX + "summary:0"
54 | |
55 | |
56 class PubsubAttachments: | |
57 namespace = NS_PUBSUB_ATTACHMENTS | |
58 | |
def __init__(self, host):
    """Set up the plugin: namespace, trigger, default handlers and bridge methods

    @param host: backend instance; its ``plugins``, ``trigger`` and ``bridge``
        attributes are used here
    """
    log.info(_("Pubsub Attachments plugin initialization"))
    host.registerNamespace("pubsub-attachments", NS_PUBSUB_ATTACHMENTS)
    self.host = host
    self._p = host.plugins["XEP-0060"]
    # maps (element name, namespace) to its {"get": ..., "set": ...} callbacks
    self.handlers = {}
    host.trigger.add("XEP-0277_send", self.onMBSend)

    # attachments handled by this plugin itself
    builtin_handlers = (
        ("noticed", self.noticedGet, self.noticedSet),
        ("reaction", self.reactionGet, self.reactionSet),
    )
    for elt_name, getter, setter in builtin_handlers:
        self.registerAttachmentHandler(
            elt_name, NS_PUBSUB_ATTACHMENTS, getter, setter
        )

    # methods exposed through the bridge: (name, in_sign, out_sign, method)
    bridge_methods = (
        ("psAttachmentsGet", "sssasss", "(ss)", self._get),
        ("psAttachmentsSet", "ss", "", self._set),
    )
    for method_name, in_signature, out_signature, callback in bridge_methods:
        host.bridge.addMethod(
            method_name,
            ".plugin",
            in_sign=in_signature,
            out_sign=out_signature,
            method=callback,
            async_=True,
        )
88 | |
def getHandler(self, client):
    """Build the XMPP handler associated with this plugin

    @param client: client the handler is requested for (not used here)
    @return: a new ``PubsubAttachments_Handler`` instance
    """
    handler = PubsubAttachments_Handler()
    return handler
91 | |
def registerAttachmentHandler(
    self,
    name: str,
    namespace: str,
    get_cb: Callable[
        [SatXMPPEntity, domish.Element, Dict[str, Any]],
        None],
    set_cb: Callable[
        [SatXMPPEntity, Dict[str, Any], Optional[domish.Element]],
        Optional[domish.Element]],
) -> None:
    """Associate parsing and building callbacks with an attachment element

    @param name: name of the attachment element
    @param namespace: namespace of the attachment element
        the (name, namespace) pair must not be registered yet
    @param get_cb: called when attachments are retrieved, with
        (client, element, data): ``element`` is the <attachments> element to
        parse, and ``data`` must be updated in place with the parsed values
    @param set_cb: called when the attachment must be set or updated, with
        (client, data, former_elt), ``former_elt`` being None when there was
        no former element. When suitable, ``operation`` should be used to
        check whether an ``update`` or a ``replace`` is requested.
    @raise exceptions.ConflictError: handlers already exist for this
        (name, namespace) pair
    """
    handler_id = (name, namespace)
    if handler_id not in self.handlers:
        self.handlers[handler_id] = {"get": get_cb, "set": set_cb}
        return
    raise exceptions.ConflictError(
        f"({name}, {namespace}) attachment handlers are already registered"
    )
126 | |
def getAttachmentNodeName(self, service: jid.JID, node: str, item: str) -> str:
    """Compute the name of the attachment node of a pubsub item

    The name is the attachments namespace, a slash, then the XMPP pubsub URI
    of the target item.

    @param service: pubsub service of the target item
    @param node: node of the target item
    @param item: ID of the target item
    @return: name of the attachment node
    """
    uri_kwargs = {
        "path": service.userhost(),
        "node": node,
        "item": item,
    }
    item_uri = uri.buildXMPPUri("pubsub", **uri_kwargs)
    return f"{NS_PUBSUB_ATTACHMENTS}/{item_uri}"
136 | |
def isAttachmentNode(self, node: str) -> bool:
    """Tell whether a node name starts with the attachment node prefix

    @param node: node name to check
    @return: True if the name begins with the attachments namespace followed
        by a slash
    """
    attachment_prefix = f"{NS_PUBSUB_ATTACHMENTS}/"
    return node.startswith(attachment_prefix)
140 | |
def attachmentNode2Item(self, node: str) -> Tuple[jid.JID, str, str]:
    """Extract the target (service, node, item) from an attachment node name

    @param node: name of the attachment node
    @return: service, node and item ID of the item the attachment node targets
    @raise ValueError: the name is not an attachment node name, the embedded
        URI is not a pubsub URI, or its service can't be parsed
    """
    if not self.isAttachmentNode(node):
        raise ValueError("this is not an attachment node!")
    # whatever follows the "<namespace>/" prefix is the URI of the target item
    item_uri = node[len(f"{NS_PUBSUB_ATTACHMENTS}/"):]
    uri_data = uri.parseXMPPUri(item_uri)
    if uri_data["type"] != "pubsub":
        raise ValueError(f"unexpected URI type, it must be a pubsub URI: {item_uri}")
    try:
        target_service = jid.JID(uri_data["path"])
    except RuntimeError:
        raise ValueError(f"invalid service in pubsub URI: {item_uri}")
    return (target_service, uri_data["node"], uri_data["item"])
157 | |
async def onMBSend(
    self,
    client: SatXMPPEntity,
    service: jid.JID,
    node: str,
    item: domish.Element,
    data: dict
) -> bool:
    """Trigger creating the attachment node of an item on each publication

    The configuration of the target node is retrieved and reused as options of
    the attachment node. A failure to create the attachment node is only
    logged.

    @param service: pubsub service where the item is published
    @param node: node where the item is published
    @param item: published item, its ``id`` attribute is used to name the
        attachment node
    @param data: data of the published item (not used here)
    @return: always True
    """
    node_config = await self._p.getConfiguration(client, service, node)
    attachment_node = self.getAttachmentNodeName(service, node, item["id"])
    # we use the same options as target node
    try:
        await self._p.createIfNewNode(
            client, service, attachment_node, options=dict(node_config)
        )
    except Exception as e:
        # best effort: the absence of an attachment node is not fatal here
        log.warning(f"Can't create attachment node {attachment_node}: {e}")
    return True
177 | |
def items2attachmentData(
    self,
    client: SatXMPPEntity,
    items: List[domish.Element]
) -> List[Dict[str, Any]]:
    """Convert items from attachment node to attachment data

    Items are ignored (with a warning) when they have no <attachments>
    element, when their ``publisher`` attribute doesn't match their ID, or when
    their ID is not a JID. Items for which no handler found any data are
    silently skipped.

    @param items: items retrieved from an attachment node
    @return: one dict per kept item, with the item ID in ``from`` and the keys
        filled by the registered "get" handlers
    """
    list_data = []
    for item in items:
        try:
            attachments_elt = next(
                item.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
            )
        except StopIteration:
            log.warning(
                "item is missing <attachments> elements, ignoring it: "
                f"{item.toXml()}"
            )
            continue
        item_id = item["id"]
        publisher_s = item.getAttribute("publisher")
        # publisher is not filled by all pubsub service, so we can't count on it
        if publisher_s:
            publisher = jid.JID(publisher_s)
            if publisher.userhost() != item_id:
                log.warning(
                    f"publisher {publisher.userhost()!r} doesn't correspond to item "
                    f"id {item_id!r}, ignoring. This may be a hack attempt.\n"
                    f"{item.toXml()}"
                )
                continue
        try:
            jid.JID(item_id)
        except RuntimeError:
            log.warning(
                "item ID is not a JID, this is not compliant and is ignored: "
                f"{item.toXml()}"
            )
            continue
        data = {
            "from": item_id
        }
        for handler in self.handlers.values():
            # each handler fills ``data`` in place with what it can parse
            handler["get"](client, attachments_elt, data)
        # "from" is always set, so a single key means no handler found anything
        if len(data) > 1:
            list_data.append(data)
    return list_data
223 | |
def _get(
    self,
    service_s: str,
    node: str,
    item: str,
    senders_s: List[str],
    extra_s: str,
    profile_key: str
) -> defer.Deferred:
    """Bridge wrapper around getAttachments

    Arguments are converted from their serialised form, and both parts of the
    result are serialised back.

    @param service_s: JID of the pubsub service of the target item
    @param node: node of the target item
    @param item: ID of the target item
    @param senders_s: JIDs of the entities whose attachments are requested
    @param extra_s: serialised extra data, forwarded to getAttachments
    @param profile_key: key of the profile used to get the client
    @return: Deferred firing with (serialised attachments, serialised metadata)
    """
    client = self.host.getClient(profile_key)
    extra = data_format.deserialise(extra_s)
    senders = [jid.JID(s) for s in senders_s]
    d = defer.ensureDeferred(
        # extra must be forwarded, otherwise what the caller requested is lost
        self.getAttachments(
            client, jid.JID(service_s), node, item, senders, extra
        )
    )
    d.addCallback(
        lambda ret:
        (data_format.serialise(ret[0]),
         data_format.serialise(ret[1]))
    )
    return d
245 | |
async def getAttachments(
    self,
    client: SatXMPPEntity,
    service: jid.JID,
    node: str,
    item: str,
    senders: Optional[List[jid.JID]],
    extra: Optional[dict] = None
) -> Tuple[List[Dict[str, Any]], dict]:
    """Retrieve the data attached to a pubsub item

    @param service: pubsub service of the target item
    @param node: node of the target item
    @param item: ID of the target item
    @param senders: entities whose attachments are requested, their bare JIDs
        are used as item IDs; all items are requested if empty or None
    @param extra: extra data forwarded to the items request
    @return: list of attachment data, and metadata of the items request
    """
    request_extra = {} if extra is None else extra
    attachment_node = self.getAttachmentNodeName(service, node, item)
    if senders:
        wanted_ids = [sender.userhost() for sender in senders]
    else:
        wanted_ids = None
    items, metadata = await self._p.getItems(
        client, service, attachment_node, item_ids=wanted_ids, extra=request_extra
    )
    return self.items2attachmentData(client, items), metadata
265 | |
def _set(
    self,
    attachments_s: str,
    profile_key: str
) -> defer.Deferred:
    """Bridge wrapper around setAttachments

    @param attachments_s: serialised attachments data; an empty dict is used
        when it deserialises to a false value
    @param profile_key: key of the profile used to get the client
    @return: Deferred firing once setAttachments is done
    """
    client = self.host.getClient(profile_key)
    attachments = data_format.deserialise(attachments_s) or {}
    return defer.ensureDeferred(self.setAttachments(client, attachments))
274 | |
async def setAttachments(
    self,
    client: SatXMPPEntity,
    data: Dict[str, Any]
) -> None:
    """Set or update attachments

    Former <attachments> element will be retrieved and updated. Individual
    attachments replace or update their elements individually, according to the
    "operation" key.

    "operation" key may be "update" or "replace", and defaults to update, it is only
    used in attachments where "update" makes sense (e.g. it's used for "reactions"
    but not for "noticed").

    @param data: microblog data. Various keys (usually stored in
        data["extra"]) may be used depending on the attachments handlers
        registered. The keys "service", "node" and "id" MUST be set.
        ``data["extra"]`` is created in place if missing.
    @raise ValueError: "service", "node" or "id" is missing, or "service" can't
        be parsed
    @raise exceptions.NotFound: the attachment node doesn't exist
    """
    data.setdefault("extra", {})
    try:
        service = jid.JID(data["service"])
        node = data["node"]
        item = data["id"]
    except (KeyError, RuntimeError) as e:
        raise ValueError(
            'data must have "service", "node" and "id" set'
        ) from e
    attachment_node = self.getAttachmentNodeName(service, node, item)
    # our own attachments are stored in the item named after our bare JID
    items, __ = await self._p.getItems(
        client, service, attachment_node, item_ids=[client.jid.userhost()]
    )
    if not items:
        # the item doesn't exist, we create a new one
        item_elt = pubsub.Item(client.jid.userhost())
        item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))
    else:
        item_elt = items[0]

    try:
        attachments_elt = next(
            item_elt.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
        )
    except StopIteration:
        log.warning(
            f"no <attachments> element found, creating a new one: {item_elt.toXml()}"
        )
        attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))

    for (name, namespace), handler in self.handlers.items():
        try:
            former_elt = next(attachments_elt.elements(namespace, name))
        except StopIteration:
            former_elt = None
        new_elt = handler["set"](client, data, former_elt)
        # a handler returning the former element means "keep it untouched"
        if new_elt != former_elt:
            if former_elt is not None:
                attachments_elt.children.remove(former_elt)
            if new_elt is not None:
                attachments_elt.addChild(new_elt)
    try:
        await self._p.sendItems(client, service, attachment_node, [item_elt])
    except error.StanzaError as e:
        if e.condition == "item-not-found":
            # the node doesn't exist, we can't publish attachments
            log.warning(
                f"no attachment node found at {service} on {node!r} for item "
                f"{item!r}, we can't update attachments."
            )
            raise exceptions.NotFound("No attachment node available") from e
        # any other stanza error is propagated with its original traceback
        raise
347 | |
async def subscribe(
    self,
    client: SatXMPPEntity,
    service: jid.JID,
    node: str,
    item: str,
) -> None:
    """Subscribe to the attachment node of an item

    @param service: service of the target item (the attachment node is looked
        up on the same service)
    @param node: node of the target item (used to build the attachment node name)
    @param item: name of the target item (used to build the attachment node name)
    """
    target_node = self.getAttachmentNodeName(service, node, item)
    await self._p.subscribe(client, service, target_node)
363 | |
364 | |
def setTimestamp(self, attachment_elt: domish.Element, data: dict) -> None:
    """Parse the ``timestamp`` attribute of an element, if any, and store it

    @param attachment_elt: element where the ``timestamp`` attribute may be set
    @param data: data specific to the attachment (i.e. not the whole microblog
        data); its ``timestamp`` key is set when the attribute exists and can
        be parsed, otherwise ``data`` is left untouched
    """
    raw_value = attachment_elt.getAttribute("timestamp")
    if not raw_value:
        # no timestamp on this element, nothing to do
        return
    try:
        parsed = date_utils.date_parse(raw_value)
    except date_utils.ParserError:
        log.warning(f"can't parse timestamp: {raw_value}")
    else:
        data["timestamp"] = parsed
380 | |
def noticedGet(
    self,
    client: SatXMPPEntity,
    attachments_elt: domish.Element,
    data: Dict[str, Any],
) -> None:
    """Parse the <noticed> attachment, if any

    @param attachments_elt: <attachments> element to parse
    @param data: attachment data updated in place: when a <noticed> element
        is found, ``data["noticed"]`` is set to a dict with ``noticed`` set to
        True, plus ``timestamp`` when one can be parsed
    """
    noticed_elt = next(
        attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "noticed"), None
    )
    if noticed_elt is None:
        return
    parsed = {"noticed": True}
    self.setTimestamp(noticed_elt, parsed)
    data["noticed"] = parsed
399 | |
def noticedSet(
    self,
    client: SatXMPPEntity,
    data: Dict[str, Any],
    former_elt: Optional[domish.Element]
) -> Optional[domish.Element]:
    """Add, remove or keep the <noticed> attachment

    The decision is taken from ``data["extra"]["noticed"]``: a true value
    builds a new element with the current timestamp, a false value removes the
    attachment, and a missing or None value keeps the former element.

    @param data: microblog data, ``data["extra"]`` must exist
    @param former_elt: existing <noticed> element, or None
    @return: element to use, or None if there should be none
    """
    wanted = data["extra"].get("noticed")
    if wanted is None:
        return former_elt
    if not wanted:
        return None
    timestamp_attribs = {"timestamp": xmpp_date()}
    return domish.Element(
        (NS_PUBSUB_ATTACHMENTS, "noticed"),
        attribs=timestamp_attribs,
    )
423 | |
def reactionGet(
    self,
    client: SatXMPPEntity,
    attachments_elt: domish.Element,
    data: Dict[str, Any],
) -> None:
    """Parse the <reaction> attachment, if any

    @param attachments_elt: <attachments> element to parse
    @param data: attachment data updated in place: when a <reaction> element
        is found, ``data["reaction"]`` is set to a dict with the element's
        string value in ``reactions``, plus ``timestamp`` when one can be
        parsed
    """
    reaction_elt = next(
        attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction"), None
    )
    if reaction_elt is None:
        return
    parsed = {"reactions": str(reaction_elt)}
    self.setTimestamp(reaction_elt, parsed)
    data["reaction"] = parsed
442 | |
def reactionSet(
    self,
    client: SatXMPPEntity,
    data: Dict[str, Any],
    former_elt: Optional[domish.Element]
) -> Optional[domish.Element]:
    """Update the <reaction> attachment

    ``data["extra"]["reaction"]`` drives the change; when it is missing or
    None, the former element is kept. Its ``operation`` key may be:

    - ``update`` (default): the new ``reactions`` are merged with the former
      ones, without duplicates; former reactions come first and the order is
      preserved
    - ``replace``: ``reactions`` replace the former ones

    @param data: microblog data, ``data["extra"]`` must exist
    @param former_elt: existing <reaction> element, or None
    @return: new element with the current timestamp, the former element if
        nothing is requested, or None if there is no reaction left
    @raise exceptions.DataError: ``operation`` has an unknown value
    """
    reaction = data["extra"].get("reaction")
    if reaction is None:
        return former_elt
    operation_type = reaction.get("operation", "update")
    if operation_type == "update":
        former_reactions = str(former_elt or "")
        added_reactions = reaction.get("reactions") or ""
        # dict.fromkeys removes duplicates while keeping first-seen order: a
        # set union would make the published string depend on hash ordering
        reactions = "".join(
            dict.fromkeys([*former_reactions, *added_reactions])
        )
    elif operation_type == "replace":
        reactions = reaction.get("reactions", "")
    else:
        raise exceptions.DataError(f"invalid reaction operation: {operation_type!r}")
    if reactions:
        reaction_elt = domish.Element(
            (NS_PUBSUB_ATTACHMENTS, "reaction"),
            attribs={
                "timestamp": xmpp_date()
            }
        )
        reaction_elt.addContent(reactions)
        return reaction_elt
    else:
        # nothing left: the attachment is removed
        return None
474 | |
475 | |
@implementer(iwokkel.IDisco)
class PubsubAttachments_Handler(xmlstream.XMPPHandler):
    """XMPP handler advertising the pubsub attachments feature in disco"""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Return the disco features of this plugin

        @return: list with the single pubsub attachments namespace feature
        """
        attachments_feature = disco.DiscoFeature(NS_PUBSUB_ATTACHMENTS)
        return [attachments_feature]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """Return the disco items of this plugin

        @return: always an empty list
        """
        return list()