comparison sat/plugins/plugin_pubsub_attachments.py @ 3864:ac255a0fbd4c

plugin pubsub attachments: partial implementation of pubsub-attachments protoXEP: This is an implementation of the "Basic Usage" of https://xmpp.org/extensions/inbox/pubsub-attachments.html. rel 370
author Goffi <goffi@goffi.org>
date Wed, 20 Jul 2022 17:49:51 +0200
parents
children c0bcbcf5b4b7
comparison
equal deleted inserted replaced
3863:c04f5e8a3568 3864:ac255a0fbd4c
1 #!/usr/bin/env python3
2
3 # Libervia plugin for Pubsub Attachments
4 # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
19 from typing import List, Tuple, Dict, Any, Callable, Optional
20
21 from twisted.words.protocols.jabber import jid, xmlstream, error
22 from twisted.words.xish import domish
23 from twisted.internet import defer
24 from zope.interface import implementer
25 from wokkel import pubsub, disco, iwokkel
26
27 from sat.core.constants import Const as C
28 from sat.core.i18n import _
29 from sat.core.log import getLogger
30 from sat.core.core_types import SatXMPPEntity
31 from sat.core import exceptions
32 from sat.tools.common import uri, data_format, date_utils
33 from sat.tools.utils import xmpp_date
34
35
# module-level logger, named after this module
log = getLogger(__name__)

# value used below for C.PI_IMPORT_NAME
IMPORT_NAME = "PUBSUB_ATTACHMENTS"

# plugin metadata, keyed by the C.PI_* constants
PLUGIN_INFO = {
    C.PI_NAME: "Pubsub Attachments",
    C.PI_IMPORT_NAME: IMPORT_NAME,
    C.PI_TYPE: C.PLUG_TYPE_EXP,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: [],
    # the XEP-0060 plugin is looked up in ``host.plugins`` by PubsubAttachments
    C.PI_DEPENDENCIES: ["XEP-0060"],
    C.PI_MAIN: "PubsubAttachments",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Pubsub Attachments implementation"""),
}

# XML namespaces: common prefix, attachments namespace and summary namespace
NS_PREFIX = "urn:xmpp:pubsub-attachments:"
NS_PUBSUB_ATTACHMENTS = NS_PREFIX + "0"
NS_PUBSUB_ATTACHMENTS_SUM = NS_PREFIX + "summary:0"
54
55
class PubsubAttachments:
    """Partial implementation of the pubsub-attachments protoXEP

    Attachments of a pubsub item are stored in a dedicated node whose name is derived
    from the target item (see L{getAttachmentNodeName}). Each item of this node uses
    the bare JID of its author as ID and holds an <attachments> element.

    Parsing and generation of individual attachments are delegated to handlers
    registered with L{registerAttachmentHandler}; handlers for "noticed" and
    "reaction" are registered at initialisation.
    """
    namespace = NS_PUBSUB_ATTACHMENTS

    def __init__(self, host):
        log.info(_("Pubsub Attachments plugin initialization"))
        host.registerNamespace("pubsub-attachments", NS_PUBSUB_ATTACHMENTS)
        self.host = host
        self._p = host.plugins["XEP-0060"]
        # (name, namespace) => {"get": get_cb, "set": set_cb}
        self.handlers = {}
        host.trigger.add("XEP-0277_send", self.onMBSend)
        self.registerAttachmentHandler(
            "noticed", NS_PUBSUB_ATTACHMENTS, self.noticedGet, self.noticedSet
        )
        self.registerAttachmentHandler(
            "reaction", NS_PUBSUB_ATTACHMENTS, self.reactionGet, self.reactionSet
        )
        host.bridge.addMethod(
            "psAttachmentsGet",
            ".plugin",
            in_sign="sssasss",
            out_sign="(ss)",
            method=self._get,
            async_=True,
        )
        host.bridge.addMethod(
            "psAttachmentsSet",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self._set,
            async_=True,
        )

    def getHandler(self, client):
        """Return a new XMPP handler (used to advertise the feature via disco)"""
        return PubsubAttachments_Handler()

    def registerAttachmentHandler(
        self,
        name: str,
        namespace: str,
        get_cb: Callable[
            [SatXMPPEntity, domish.Element, Dict[str, Any]],
            None],
        set_cb: Callable[
            [SatXMPPEntity, Dict[str, Any], Optional[domish.Element]],
            Optional[domish.Element]],
    ) -> None:
        """Register callbacks to handle an attachment

        @param name: name of the element
        @param namespace: namespace of the element
            (name, namespace) couple must be unique
        @param get_cb: method to call when attachments are retrieved
            it will be called with (client, element, data) where element is the
            <attachments> element to parse, and data must be updated in place with
            parsed data
        @param set_cb: method to call when the attachment need to be set or updated
            it will be called with (client, data, former_elt or None if there was no
            former element). When suitable, ``operation`` should be used to check if we
            request an ``update`` or a ``replace``.
            The returned value is the element to store, or None to remove the
            attachment; returning former_elt leaves the attachment untouched.
        @raise exceptions.ConflictError: handlers are already registered for this
            (name, namespace) couple
        """
        key = (name, namespace)
        if key in self.handlers:
            raise exceptions.ConflictError(
                f"({name}, {namespace}) attachment handlers are already registered"
            )
        self.handlers[key] = {
            "get": get_cb,
            "set": set_cb
        }

    def getAttachmentNodeName(self, service: jid.JID, node: str, item: str) -> str:
        """Generate name to use for attachment node

        The name is the attachments namespace, a slash, then the XMPP pubsub URI of
        the target item.

        @param service: pubsub service of the target item
        @param node: node of the target item
        @param item: ID of the target item
        @return: name of the attachment node
        """
        target_item_uri = uri.buildXMPPUri(
            "pubsub",
            path=service.userhost(),
            node=node,
            item=item
        )
        return f"{NS_PUBSUB_ATTACHMENTS}/{target_item_uri}"

    def isAttachmentNode(self, node: str) -> bool:
        """Return True if node name is an attachment node"""
        return node.startswith(f"{NS_PUBSUB_ATTACHMENTS}/")

    def attachmentNode2Item(self, node: str) -> Tuple[jid.JID, str, str]:
        """Retrieve service, node and item from attachment node's name

        @param node: name of an attachment node
        @return: (service, node, item) of the item targeted by the attachment node
        @raise ValueError: the name is not an attachment node name, or the URI it
            contains is not a pubsub URI with a valid service
        """
        if not self.isAttachmentNode(node):
            raise ValueError("this is not an attachment node!")
        prefix_len = len(f"{NS_PUBSUB_ATTACHMENTS}/")
        item_uri = node[prefix_len:]
        parsed_uri = uri.parseXMPPUri(item_uri)
        if parsed_uri["type"] != "pubsub":
            raise ValueError(f"unexpected URI type, it must be a pubsub URI: {item_uri}")
        try:
            service = jid.JID(parsed_uri["path"])
        except (RuntimeError, jid.InvalidFormat) as e:
            raise ValueError(f"invalid service in pubsub URI: {item_uri}") from e
        return (service, parsed_uri["node"], parsed_uri["item"])

    async def onMBSend(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item: domish.Element,
        data: dict
    ) -> bool:
        """trigger to create attachment node on each publication

        Node creation is best effort: a failure is logged and otherwise ignored.

        @param service: pubsub service where the item is published
        @param node: node where the item is published
        @param item: published item, its "id" attribute is used to name the
            attachment node
        @param data: unused here
        @return: always True
            NOTE(review): presumably tells the trigger manager to continue normal
            workflow, to be confirmed against the trigger implementation
        """
        node_config = await self._p.getConfiguration(client, service, node)
        attachment_node = self.getAttachmentNodeName(service, node, item["id"])
        # we use the same options as target node
        try:
            await self._p.createIfNewNode(
                client, service, attachment_node, options=dict(node_config)
            )
        except Exception as e:
            log.warning(f"Can't create attachment node {attachment_node}: {e}")
        return True

    def items2attachmentData(
        self,
        client: SatXMPPEntity,
        items: List[domish.Element]
    ) -> List[Dict[str, Any]]:
        """Convert items from attachment node to attachment data

        Items without <attachments> element, with an ID which is not a JID, or with
        a "publisher" attribute which doesn't match the item ID are skipped with a
        warning. Items for which no handler has parsed anything are skipped too.

        @param items: items retrieved from an attachment node
        @return: one dict per valid item, with the item ID in "from" and the keys
            filled by registered "get" handlers
        """
        list_data = []
        for item in items:
            try:
                attachments_elt = next(
                    item.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
                )
            except StopIteration:
                log.warning(
                    "item is missing <attachments> elements, ignoring it: "
                    f"{item.toXml()}"
                )
                continue
            item_id = item["id"]
            publisher_s = item.getAttribute("publisher")
            # publisher is not filled by all pubsub service, so we can't count on it
            if publisher_s:
                try:
                    publisher = jid.JID(publisher_s)
                except (RuntimeError, jid.InvalidFormat):
                    # a single malformed item must not prevent parsing of the others
                    log.warning(
                        f"invalid publisher {publisher_s!r}, ignoring item: "
                        f"{item.toXml()}"
                    )
                    continue
                if publisher.userhost() != item_id:
                    log.warning(
                        f"publisher {publisher.userhost()!r} doesn't correspond to item "
                        f"id {item['id']!r}, ignoring. This may be a hack attempt.\n"
                        f"{item.toXml()}"
                    )
                    continue
            try:
                jid.JID(item_id)
            except (RuntimeError, jid.InvalidFormat):
                log.warning(
                    "item ID is not a JID, this is not compliant and is ignored: "
                    f"{item.toXml()}"
                )
                continue
            data = {
                "from": item_id
            }
            for handler in self.handlers.values():
                handler["get"](client, attachments_elt, data)
            # with only the "from" key, there is no attachment worth returning
            if len(data) > 1:
                list_data.append(data)
        return list_data

    def _get(
        self,
        service_s: str,
        node: str,
        item: str,
        senders_s: List[str],
        extra_s: str,
        profile_key: str
    ) -> defer.Deferred:
        """Bridge frontend of L{getAttachments}

        @param service_s: JID of the pubsub service, as a string
        @param senders_s: JIDs of the entities whose attachments are requested
        @param extra_s: serialised extra data
        @return: Deferred firing with (serialised attachments, serialised metadata)
        """
        client = self.host.getClient(profile_key)
        extra = data_format.deserialise(extra_s)
        senders = [jid.JID(s) for s in senders_s]
        d = defer.ensureDeferred(
            self.getAttachments(client, jid.JID(service_s), node, item, senders, extra)
        )
        d.addCallback(
            lambda ret:
            (data_format.serialise(ret[0]),
             data_format.serialise(ret[1]))
        )
        return d

    async def getAttachments(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item: str,
        senders: Optional[List[jid.JID]],
        extra: Optional[dict] = None
    ) -> Tuple[List[Dict[str, Any]], dict]:
        """Retrieve data attached to a pubsub item

        @param service: pubsub service where the target item is
        @param node: pubsub node of the target item
        @param item: ID of the target item
        @param senders: entities whose attachments are requested
            if None or empty, no item ID is requested explicitly
        @param extra: extra data, transmitted as is to the pubsub ``getItems``
        @return: a tuple with:
            - the list of attachments data, see L{items2attachmentData}
            - the metadata returned by the pubsub ``getItems``
        """
        if extra is None:
            extra = {}
        attachment_node = self.getAttachmentNodeName(service, node, item)
        # items of an attachment node use the bare JID of their author as ID
        item_ids = [e.userhost() for e in senders] if senders else None
        items, metadata = await self._p.getItems(
            client, service, attachment_node, item_ids=item_ids, extra=extra
        )
        list_data = self.items2attachmentData(client, items)

        return list_data, metadata

    def _set(
        self,
        attachments_s: str,
        profile_key: str
    ) -> defer.Deferred:
        """Bridge frontend of L{setAttachments}

        @param attachments_s: serialised attachments data
        @return: Deferred firing once L{setAttachments} is done
        """
        client = self.host.getClient(profile_key)
        attachments = data_format.deserialise(attachments_s) or {}
        return defer.ensureDeferred(self.setAttachments(client, attachments))

    async def setAttachments(
        self,
        client: SatXMPPEntity,
        data: Dict[str, Any]
    ) -> None:
        """Set or update attachments

        Former <attachments> element will be retrieved and updated. Individual
        attachments replace or update their elements individually, according to the
        "operation" key.

        "operation" key may be "update" or "replace", and defaults to update, it is only
        used in attachments where "update" makes sense (e.g. it's used for "reactions"
        but not for "noticed").

        @param data: microblog data. Various keys (usually stored in
            data["extra"]) may be used depending on the attachments handlers
            registered. The keys "service", "node" and "id" MUST be set.
            An "extra" key with an empty dict is added in place if it's missing.
        @raise ValueError: "service", "node" or "id" is missing, or "service" is not
            a valid JID
        @raise exceptions.NotFound: the attachment node doesn't exist
        """
        data.setdefault("extra", {})
        try:
            service = jid.JID(data["service"])
            node = data["node"]
            item = data["id"]
        except (KeyError, RuntimeError, jid.InvalidFormat):
            raise ValueError(
                'data must have "service", "node" and "id" set'
            )
        attachment_node = self.getAttachmentNodeName(service, node, item)
        # our own attachments are in the item using our bare JID as ID
        items, __ = await self._p.getItems(
            client, service, attachment_node, item_ids=[client.jid.userhost()]
        )
        if not items:
            # the item doesn't exist, we create a new one
            item_elt = pubsub.Item(client.jid.userhost())
            item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))
        else:
            item_elt = items[0]

        try:
            attachments_elt = next(
                item_elt.elements(NS_PUBSUB_ATTACHMENTS, "attachments")
            )
        except StopIteration:
            log.warning(
                f"no <attachments> element found, creating a new one: {item_elt.toXml()}"
            )
            attachments_elt = item_elt.addElement((NS_PUBSUB_ATTACHMENTS, "attachments"))

        for (name, namespace), handler in self.handlers.items():
            try:
                former_elt = next(attachments_elt.elements(namespace, name))
            except StopIteration:
                former_elt = None
            new_elt = handler["set"](client, data, former_elt)
            # a handler returning the former element means "no change"
            if new_elt != former_elt:
                if former_elt is not None:
                    attachments_elt.children.remove(former_elt)
                if new_elt is not None:
                    attachments_elt.addChild(new_elt)
        try:
            await self._p.sendItems(client, service, attachment_node, [item_elt])
        except error.StanzaError as e:
            if e.condition == "item-not-found":
                # the node doesn't exist, we can't publish attachments
                log.warning(
                    f"no attachment node found at {service} on {node!r} for item "
                    f"{item!r}, we can't update attachments."
                )
                raise exceptions.NotFound("No attachment node available") from e
            else:
                raise

    async def subscribe(
        self,
        client: SatXMPPEntity,
        service: jid.JID,
        node: str,
        item: str,
    ) -> None:
        """Subscribe to attachment node targeting the item

        @param service: service of target item (will also be used for attachment node)
        @param node: node of target item (used to get attachment node's name)
        @param item: name of target item (used to get attachment node's name)
        """
        attachment_node = self.getAttachmentNodeName(service, node, item)
        await self._p.subscribe(client, service, attachment_node)

    def setTimestamp(self, attachment_elt: domish.Element, data: dict) -> None:
        """Check if a ``timestamp`` attribute is set, parse it, and fill data

        @param attachment_elt: element where the ``timestamp`` attribute may be set
        @param data: data specific to the attachment (i.e. not the whole microblog data)
            ``timestamp`` field will be set there if timestamp exists and is parsable
        """
        timestamp_raw = attachment_elt.getAttribute("timestamp")
        if timestamp_raw:
            try:
                timestamp = date_utils.date_parse(timestamp_raw)
            except date_utils.ParserError:
                log.warning(f"can't parse timestamp: {timestamp_raw}")
            else:
                data["timestamp"] = timestamp

    def noticedGet(
        self,
        client: SatXMPPEntity,
        attachments_elt: domish.Element,
        data: Dict[str, Any],
    ) -> None:
        """Parse the <noticed> attachment, if any

        @param attachments_elt: <attachments> element to parse
        @param data: attachments data, updated in place: if a <noticed> element is
            found, data["noticed"] is set to a dict with "noticed" set to True, and
            "timestamp" when available
        """
        try:
            noticed_elt = next(
                attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "noticed")
            )
        except StopIteration:
            pass
        else:
            noticed_data = {
                "noticed": True
            }
            self.setTimestamp(noticed_elt, noticed_data)
            data["noticed"] = noticed_data

    def noticedSet(
        self,
        client: SatXMPPEntity,
        data: Dict[str, Any],
        former_elt: Optional[domish.Element]
    ) -> Optional[domish.Element]:
        """add or remove a <noticed> attachment

        if data["extra"]["noticed"] is true, a new element is added, if it's false,
        the element is removed, and if it's not present or None, the former element
        is kept.

        @param data: microblog data, data["extra"] must be set
        @param former_elt: current <noticed> element, if any
        @return: element to store, or None if there must be no <noticed> element
        """
        noticed = data["extra"].get("noticed")
        if noticed is None:
            return former_elt
        elif noticed:
            return domish.Element(
                (NS_PUBSUB_ATTACHMENTS, "noticed"),
                attribs={
                    "timestamp": xmpp_date()
                }
            )
        else:
            return None

    def reactionGet(
        self,
        client: SatXMPPEntity,
        attachments_elt: domish.Element,
        data: Dict[str, Any],
    ) -> None:
        """Parse the <reaction> attachment, if any

        @param attachments_elt: <attachments> element to parse
        @param data: attachments data, updated in place: if a <reaction> element is
            found, data["reaction"] is set to a dict with the text content of the
            element in "reactions", and "timestamp" when available
        """
        try:
            reaction_elt = next(
                attachments_elt.elements(NS_PUBSUB_ATTACHMENTS, "reaction")
            )
        except StopIteration:
            pass
        else:
            reaction_data = {
                "reactions": str(reaction_elt)
            }
            self.setTimestamp(reaction_elt, reaction_data)
            data["reaction"] = reaction_data

    def reactionSet(
        self,
        client: SatXMPPEntity,
        data: Dict[str, Any],
        former_elt: Optional[domish.Element]
    ) -> Optional[domish.Element]:
        """update the <reaction> attachment

        data["extra"]["reaction"] is a dict where "reactions" is a string of
        reactions and "operation" is "update" (default) to merge them with the former
        ones, or "replace" to only keep the new ones. If data["extra"]["reaction"] is
        not present or None, the former element is kept.

        @param data: microblog data, data["extra"] must be set
        @param former_elt: current <reaction> element, if any
        @return: element to store, or None if there is no reaction left
        @raise exceptions.DataError: "operation" is neither "update" nor "replace"
        """
        reaction = data["extra"].get("reaction")
        if reaction is None:
            return former_elt
        operation_type = reaction.get("operation", "update")
        if operation_type == "update":
            former_reactions = "" if former_elt is None else str(former_elt)
            new_reactions = reaction.get("reactions") or ""
            # dict.fromkeys removes duplicates while keeping first seen order, so
            # the result is deterministic (former reactions first, then new ones)
            reactions = "".join(dict.fromkeys(former_reactions + new_reactions))
        elif operation_type == "replace":
            reactions = reaction.get("reactions", "")
        else:
            raise exceptions.DataError(f"invalid reaction operation: {operation_type!r}")
        if reactions:
            reaction_elt = domish.Element(
                (NS_PUBSUB_ATTACHMENTS, "reaction"),
                attribs={
                    "timestamp": xmpp_date()
                }
            )
            reaction_elt.addContent(reactions)
            return reaction_elt
        else:
            return None
474
475
@implementer(iwokkel.IDisco)
class PubsubAttachments_Handler(xmlstream.XMPPHandler):
    """XMPP handler advertising the pubsub attachments feature in disco info"""

    def getDiscoInfo(self, requestor, service, nodeIdentifier=""):
        """Return a list with a single feature, for the attachments namespace"""
        attachments_feature = disco.DiscoFeature(NS_PUBSUB_ATTACHMENTS)
        return [attachments_feature]

    def getDiscoItems(self, requestor, service, nodeIdentifier=""):
        """Return an empty list: no disco item is exposed by this handler"""
        return list()