Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_xep_0277.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_xep_0277.py@524856bd7b19 |
children | 0e48181d50ab |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # SAT plugin for microblogging over XMPP (xep-0277) | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
import calendar
import time
from functools import partial
from mimetypes import guess_type
from secrets import token_urlsafe
from typing import Any, Dict, List, Optional, Tuple

import dateutil
import dateutil.parser
import shortuuid

from twisted.words.protocols.jabber import jid, error
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from twisted.internet import defer
from twisted.python import failure

# XXX: sat_tmp.wokkel.pubsub is actually used instead of wokkel version
from wokkel import pubsub
from wokkel import disco, iwokkel, rsm
from zope.interface import implementer

from libervia.backend.core.i18n import _
from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger
from libervia.backend.core import exceptions
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.tools import xml_tools
from libervia.backend.tools import sat_defer
from libervia.backend.tools import utils
from libervia.backend.tools.common import data_format
from libervia.backend.tools.common import uri as xmpp_uri
from libervia.backend.tools.common import regex
51 | |
52 | |
log = getLogger(__name__)


NS_MICROBLOG = "urn:xmpp:microblog:0"
NS_ATOM = "http://www.w3.org/2005/Atom"
NS_PUBSUB_EVENT = f"{pubsub.NS_PUBSUB}#event"
# comment nodes are named by appending the parent item id to this prefix
NS_COMMENT_PREFIX = f"{NS_MICROBLOG}:comments/"


# metadata used by the plugin loader to import and wire this plugin
PLUGIN_INFO = {
    C.PI_NAME: "Microblogging over XMPP Plugin",
    C.PI_IMPORT_NAME: "XEP-0277",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0277"],
    C.PI_DEPENDENCIES: ["XEP-0163", "XEP-0060", "TEXT_SYNTAXES"],
    C.PI_RECOMMENDATIONS: ["XEP-0059", "EXTRA-PEP", "PUBSUB_CACHE"],
    C.PI_MAIN: "XEP_0277",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of microblogging Protocol"""),
}
74 | |
75 | |
class NodeAccessChangeException(Exception):
    """Raised when changing the access model of a (comment) node fails."""
    pass
79 | |
80 class XEP_0277(object): | |
81 namespace = NS_MICROBLOG | |
82 NS_ATOM = NS_ATOM | |
83 | |
    def __init__(self, host):
        """Initialize the plugin: namespaces, cache analyser, managed node, bridge API.

        @param host: main application instance, giving access to plugins and bridge
        """
        log.info(_("Microblogging plugin initialization"))
        self.host = host
        host.register_namespace("microblog", NS_MICROBLOG)
        self._p = self.host.plugins[
            "XEP-0060"
        ]  # this facilitate the access to pubsub plugin
        # PUBSUB_CACHE is only a recommendation: register the blog analyser if present
        ps_cache = self.host.plugins.get("PUBSUB_CACHE")
        if ps_cache is not None:
            ps_cache.register_analyser(
                {
                    "name": "XEP-0277",
                    "node": NS_MICROBLOG,
                    "namespace": NS_ATOM,
                    "type": "blog",
                    "to_sync": True,
                    "parser": self.item_2_mb_data,
                    "match_cb": self._cache_node_match_cb,
                }
            )
        self.rt_sessions = sat_defer.RTDeferredSessions()
        # get notified of publish/retract events on the microblog node
        self.host.plugins["XEP-0060"].add_managed_node(
            NS_MICROBLOG, items_cb=self._items_received
        )

        # bridge (frontend-facing) methods
        host.bridge.add_method(
            "mb_send",
            ".plugin",
            in_sign="ssss",
            out_sign="s",
            method=self._mb_send,
            async_=True,
        )
        host.bridge.add_method(
            "mb_repeat",
            ".plugin",
            in_sign="sssss",
            out_sign="s",
            method=self._mb_repeat,
            async_=True,
        )
        host.bridge.add_method(
            "mb_preview",
            ".plugin",
            in_sign="ssss",
            out_sign="s",
            method=self._mb_preview,
            async_=True,
        )
        host.bridge.add_method(
            "mb_retract",
            ".plugin",
            in_sign="ssss",
            out_sign="",
            method=self._mb_retract,
            async_=True,
        )
        host.bridge.add_method(
            "mb_get",
            ".plugin",
            in_sign="ssiasss",
            out_sign="s",
            method=self._mb_get,
            async_=True,
        )
        host.bridge.add_method(
            "mb_rename",
            ".plugin",
            in_sign="sssss",
            out_sign="",
            method=self._mb_rename,
            async_=True,
        )
        host.bridge.add_method(
            "mb_access_set",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self.mb_access_set,
            async_=True,
        )
        host.bridge.add_method(
            "mb_subscribe_to_many",
            ".plugin",
            in_sign="sass",
            out_sign="s",
            method=self._mb_subscribe_to_many,
        )
        host.bridge.add_method(
            "mb_get_from_many_rt_result",
            ".plugin",
            in_sign="ss",
            out_sign="(ua(sssasa{ss}))",
            method=self._mb_get_from_many_rt_result,
            async_=True,
        )
        host.bridge.add_method(
            "mb_get_from_many",
            ".plugin",
            in_sign="sasia{ss}s",
            out_sign="s",
            method=self._mb_get_from_many,
        )
        host.bridge.add_method(
            "mb_get_from_many_with_comments_rt_result",
            ".plugin",
            in_sign="ss",
            out_sign="(ua(sssa(sa(sssasa{ss}))a{ss}))",
            method=self._mb_get_from_many_with_comments_rt_result,
            async_=True,
        )
        host.bridge.add_method(
            "mb_get_from_many_with_comments",
            ".plugin",
            in_sign="sasiia{ss}a{ss}s",
            out_sign="s",
            method=self._mb_get_from_many_with_comments,
        )
        host.bridge.add_method(
            "mb_is_comment_node",
            ".plugin",
            in_sign="s",
            out_sign="b",
            method=self.is_comment_node,
        )
209 | |
210 def get_handler(self, client): | |
211 return XEP_0277_handler() | |
212 | |
213 def _cache_node_match_cb( | |
214 self, | |
215 client: SatXMPPEntity, | |
216 analyse: dict, | |
217 ) -> None: | |
218 """Check is analysed node is a comment and fill analyse accordingly""" | |
219 if analyse["node"].startswith(NS_COMMENT_PREFIX): | |
220 analyse["subtype"] = "comment" | |
221 | |
222 def _check_features_cb(self, available): | |
223 return {"available": C.BOOL_TRUE} | |
224 | |
225 def _check_features_eb(self, fail): | |
226 return {"available": C.BOOL_FALSE} | |
227 | |
228 def features_get(self, profile): | |
229 client = self.host.get_client(profile) | |
230 d = self.host.check_features(client, [], identity=("pubsub", "pep")) | |
231 d.addCallbacks(self._check_features_cb, self._check_features_eb) | |
232 return d | |
233 | |
234 ## plugin management methods ## | |
235 | |
    def _items_received(self, client, itemsEvent):
        """Callback which manage items notifications (publish + retract)

        Each received item is converted to microblog data and forwarded to
        frontends through the bridge "ps_event" signal.
        """

        def manage_item(data, event):
            # forward the (serialised) microblog data to frontends
            self.host.bridge.ps_event(
                C.PS_MICROBLOG,
                itemsEvent.sender.full(),
                itemsEvent.nodeIdentifier,
                event,
                data_format.serialise(data),
                client.profile,
            )

        for item in itemsEvent.items:
            if item.name == C.PS_ITEM:
                # FIXME: service and node should be used here
                # parsing errors are deliberately swallowed (errback does nothing)
                self.item_2_mb_data(client, item, None, None).addCallbacks(
                    manage_item, lambda failure: None, (C.PS_PUBLISH,)
                )
            elif item.name == C.PS_RETRACT:
                # retraction carries only the item id
                manage_item({"id": item["id"]}, C.PS_RETRACT)
            else:
                raise exceptions.InternalError("Invalid event value")
259 | |
260 ## data/item transformation ## | |
261 | |
262 @defer.inlineCallbacks | |
263 def item_2_mb_data( | |
264 self, | |
265 client: SatXMPPEntity, | |
266 item_elt: domish.Element, | |
267 service: Optional[jid.JID], | |
268 # FIXME: node is Optional until all calls to item_2_mb_data set properly service | |
269 # and node. Once done, the Optional must be removed here | |
270 node: Optional[str] | |
271 ) -> dict: | |
272 """Convert an XML Item to microblog data | |
273 | |
274 @param item_elt: microblog item element | |
275 @param service: PubSub service where the item has been retrieved | |
276 profile's PEP is used when service is None | |
277 @param node: PubSub node where the item has been retrieved | |
278 if None, "uri" won't be set | |
279 @return: microblog data | |
280 """ | |
281 if service is None: | |
282 service = client.jid.userhostJID() | |
283 | |
284 extra: Dict[str, Any] = {} | |
285 microblog_data: Dict[str, Any] = { | |
286 "service": service.full(), | |
287 "extra": extra | |
288 } | |
289 | |
290 def check_conflict(key, increment=False): | |
291 """Check if key is already in microblog data | |
292 | |
293 @param key(unicode): key to check | |
294 @param increment(bool): if suffix the key with an increment | |
295 instead of raising an exception | |
296 @raise exceptions.DataError: the key already exists | |
297 (not raised if increment is True) | |
298 """ | |
299 if key in microblog_data: | |
300 if not increment: | |
301 raise failure.Failure( | |
302 exceptions.DataError( | |
303 "key {} is already present for item {}" | |
304 ).format(key, item_elt["id"]) | |
305 ) | |
306 else: | |
307 idx = 1 # the idx 0 is the key without suffix | |
308 fmt = "{}#{}" | |
309 new_key = fmt.format(key, idx) | |
310 while new_key in microblog_data: | |
311 idx += 1 | |
312 new_key = fmt.format(key, idx) | |
313 key = new_key | |
314 return key | |
315 | |
316 @defer.inlineCallbacks | |
317 def parseElement(elem): | |
318 """Parse title/content elements and fill microblog_data accordingly""" | |
319 type_ = elem.getAttribute("type") | |
320 if type_ == "xhtml": | |
321 data_elt = elem.firstChildElement() | |
322 if data_elt is None: | |
323 raise failure.Failure( | |
324 exceptions.DataError( | |
325 "XHML content not wrapped in a <div/> element, this is not " | |
326 "standard !" | |
327 ) | |
328 ) | |
329 if data_elt.uri != C.NS_XHTML: | |
330 raise failure.Failure( | |
331 exceptions.DataError( | |
332 _("Content of type XHTML must declare its namespace!") | |
333 ) | |
334 ) | |
335 key = check_conflict("{}_xhtml".format(elem.name)) | |
336 data = data_elt.toXml() | |
337 microblog_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].clean_xhtml( | |
338 data | |
339 ) | |
340 else: | |
341 key = check_conflict(elem.name) | |
342 microblog_data[key] = str(elem) | |
343 | |
344 id_ = item_elt.getAttribute("id", "") # there can be no id for transient nodes | |
345 microblog_data["id"] = id_ | |
346 if item_elt.uri not in (pubsub.NS_PUBSUB, NS_PUBSUB_EVENT): | |
347 msg = "Unsupported namespace {ns} in pubsub item {id_}".format( | |
348 ns=item_elt.uri, id_=id_ | |
349 ) | |
350 log.warning(msg) | |
351 raise failure.Failure(exceptions.DataError(msg)) | |
352 | |
353 try: | |
354 entry_elt = next(item_elt.elements(NS_ATOM, "entry")) | |
355 except StopIteration: | |
356 msg = "No atom entry found in the pubsub item {}".format(id_) | |
357 raise failure.Failure(exceptions.DataError(msg)) | |
358 | |
359 # uri | |
360 # FIXME: node should alway be set in the future, check FIXME in method signature | |
361 if node is not None: | |
362 microblog_data["node"] = node | |
363 microblog_data['uri'] = xmpp_uri.build_xmpp_uri( | |
364 "pubsub", | |
365 path=service.full(), | |
366 node=node, | |
367 item=id_, | |
368 ) | |
369 | |
370 # language | |
371 try: | |
372 microblog_data["language"] = entry_elt[(C.NS_XML, "lang")].strip() | |
373 except KeyError: | |
374 pass | |
375 | |
376 # atom:id | |
377 try: | |
378 id_elt = next(entry_elt.elements(NS_ATOM, "id")) | |
379 except StopIteration: | |
380 msg = ("No atom id found in the pubsub item {}, this is not standard !" | |
381 .format(id_)) | |
382 log.warning(msg) | |
383 microblog_data["atom_id"] = "" | |
384 else: | |
385 microblog_data["atom_id"] = str(id_elt) | |
386 | |
387 # title/content(s) | |
388 | |
389 # FIXME: ATOM and XEP-0277 only allow 1 <title/> element | |
390 # but in the wild we have some blogs with several ones | |
391 # so we don't respect the standard for now (it doesn't break | |
392 # anything anyway), and we'll find a better option later | |
393 # try: | |
394 # title_elt = entry_elt.elements(NS_ATOM, 'title').next() | |
395 # except StopIteration: | |
396 # msg = u'No atom title found in the pubsub item {}'.format(id_) | |
397 # raise failure.Failure(exceptions.DataError(msg)) | |
398 title_elts = list(entry_elt.elements(NS_ATOM, "title")) | |
399 if not title_elts: | |
400 msg = "No atom title found in the pubsub item {}".format(id_) | |
401 raise failure.Failure(exceptions.DataError(msg)) | |
402 for title_elt in title_elts: | |
403 yield parseElement(title_elt) | |
404 | |
405 # FIXME: as for <title/>, Atom only authorise at most 1 content | |
406 # but XEP-0277 allows several ones. So for no we handle as | |
407 # if more than one can be present | |
408 for content_elt in entry_elt.elements(NS_ATOM, "content"): | |
409 yield parseElement(content_elt) | |
410 | |
411 # we check that text content is present | |
412 for key in ("title", "content"): | |
413 if key not in microblog_data and ("{}_xhtml".format(key)) in microblog_data: | |
414 log.warning( | |
415 "item {id_} provide a {key}_xhtml data but not a text one".format( | |
416 id_=id_, key=key | |
417 ) | |
418 ) | |
419 # ... and do the conversion if it's not | |
420 microblog_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].convert( | |
421 microblog_data["{}_xhtml".format(key)], | |
422 self.host.plugins["TEXT_SYNTAXES"].SYNTAX_XHTML, | |
423 self.host.plugins["TEXT_SYNTAXES"].SYNTAX_TEXT, | |
424 False, | |
425 ) | |
426 | |
427 if "content" not in microblog_data: | |
428 # use the atom title data as the microblog body content | |
429 microblog_data["content"] = microblog_data["title"] | |
430 del microblog_data["title"] | |
431 if "title_xhtml" in microblog_data: | |
432 microblog_data["content_xhtml"] = microblog_data["title_xhtml"] | |
433 del microblog_data["title_xhtml"] | |
434 | |
435 # published/updated dates | |
436 try: | |
437 updated_elt = next(entry_elt.elements(NS_ATOM, "updated")) | |
438 except StopIteration: | |
439 msg = "No atom updated element found in the pubsub item {}".format(id_) | |
440 raise failure.Failure(exceptions.DataError(msg)) | |
441 microblog_data["updated"] = calendar.timegm( | |
442 dateutil.parser.parse(str(updated_elt)).utctimetuple() | |
443 ) | |
444 try: | |
445 published_elt = next(entry_elt.elements(NS_ATOM, "published")) | |
446 except StopIteration: | |
447 microblog_data["published"] = microblog_data["updated"] | |
448 else: | |
449 microblog_data["published"] = calendar.timegm( | |
450 dateutil.parser.parse(str(published_elt)).utctimetuple() | |
451 ) | |
452 | |
453 # links | |
454 comments = microblog_data['comments'] = [] | |
455 for link_elt in entry_elt.elements(NS_ATOM, "link"): | |
456 href = link_elt.getAttribute("href") | |
457 if not href: | |
458 log.warning( | |
459 f'missing href in <link> element: {link_elt.toXml()}' | |
460 ) | |
461 continue | |
462 rel = link_elt.getAttribute("rel") | |
463 if (rel == "replies" and link_elt.getAttribute("title") == "comments"): | |
464 uri = href | |
465 comments_data = { | |
466 "uri": uri, | |
467 } | |
468 try: | |
469 comment_service, comment_node = self.parse_comment_url(uri) | |
470 except Exception as e: | |
471 log.warning(f"Can't parse comments url: {e}") | |
472 continue | |
473 else: | |
474 comments_data["service"] = comment_service.full() | |
475 comments_data["node"] = comment_node | |
476 comments.append(comments_data) | |
477 elif rel == "via": | |
478 try: | |
479 repeater_jid = jid.JID(item_elt["publisher"]) | |
480 except (KeyError, RuntimeError): | |
481 try: | |
482 # we look for stanza element which is at the root, meaning that it | |
483 # has not parent | |
484 top_elt = item_elt.parent | |
485 while top_elt.parent is not None: | |
486 top_elt = top_elt.parent | |
487 repeater_jid = jid.JID(top_elt["from"]) | |
488 except (AttributeError, RuntimeError): | |
489 # we should always have either the "publisher" attribute or the | |
490 # stanza available | |
491 log.error( | |
492 f"Can't find repeater of the post: {item_elt.toXml()}" | |
493 ) | |
494 continue | |
495 | |
496 extra["repeated"] = { | |
497 "by": repeater_jid.full(), | |
498 "uri": href | |
499 } | |
500 elif rel in ("related", "enclosure"): | |
501 attachment: Dict[str, Any] = { | |
502 "sources": [{"url": href}] | |
503 } | |
504 if rel == "related": | |
505 attachment["external"] = True | |
506 for attr, key in ( | |
507 ("type", "media_type"), | |
508 ("title", "desc"), | |
509 ): | |
510 value = link_elt.getAttribute(attr) | |
511 if value: | |
512 attachment[key] = value | |
513 try: | |
514 attachment["size"] = int(link_elt.attributes["lenght"]) | |
515 except (KeyError, ValueError): | |
516 pass | |
517 if "media_type" not in attachment: | |
518 media_type = guess_type(href, False)[0] | |
519 if media_type is not None: | |
520 attachment["media_type"] = media_type | |
521 | |
522 attachments = extra.setdefault("attachments", []) | |
523 attachments.append(attachment) | |
524 else: | |
525 log.warning( | |
526 f"Unmanaged link element: {link_elt.toXml()}" | |
527 ) | |
528 | |
529 # author | |
530 publisher = item_elt.getAttribute("publisher") | |
531 try: | |
532 author_elt = next(entry_elt.elements(NS_ATOM, "author")) | |
533 except StopIteration: | |
534 log.debug("Can't find author element in item {}".format(id_)) | |
535 else: | |
536 # name | |
537 try: | |
538 name_elt = next(author_elt.elements(NS_ATOM, "name")) | |
539 except StopIteration: | |
540 log.warning( | |
541 "No name element found in author element of item {}".format(id_) | |
542 ) | |
543 author = None | |
544 else: | |
545 author = microblog_data["author"] = str(name_elt).strip() | |
546 # uri | |
547 try: | |
548 uri_elt = next(author_elt.elements(NS_ATOM, "uri")) | |
549 except StopIteration: | |
550 log.debug( | |
551 "No uri element found in author element of item {}".format(id_) | |
552 ) | |
553 if publisher: | |
554 microblog_data["author_jid"] = publisher | |
555 else: | |
556 uri = str(uri_elt) | |
557 if uri.startswith("xmpp:"): | |
558 uri = uri[5:] | |
559 microblog_data["author_jid"] = uri | |
560 else: | |
561 microblog_data["author_jid"] = ( | |
562 item_elt.getAttribute("publisher") or "" | |
563 ) | |
564 if not author and microblog_data["author_jid"]: | |
565 # FIXME: temporary workaround for missing author name, would be | |
566 # better to use directly JID's identity (to be done from frontends?) | |
567 try: | |
568 microblog_data["author"] = jid.JID(microblog_data["author_jid"]).user | |
569 except Exception as e: | |
570 log.warning(f"No author name found, and can't parse author jid: {e}") | |
571 | |
572 if not publisher: | |
573 log.debug("No publisher attribute, we can't verify author jid") | |
574 microblog_data["author_jid_verified"] = False | |
575 elif jid.JID(publisher).userhostJID() == jid.JID(uri).userhostJID(): | |
576 microblog_data["author_jid_verified"] = True | |
577 else: | |
578 if "repeated" not in extra: | |
579 log.warning( | |
580 "item atom:uri differ from publisher attribute, spoofing " | |
581 "attempt ? atom:uri = {} publisher = {}".format( | |
582 uri, item_elt.getAttribute("publisher") | |
583 ) | |
584 ) | |
585 microblog_data["author_jid_verified"] = False | |
586 # email | |
587 try: | |
588 email_elt = next(author_elt.elements(NS_ATOM, "email")) | |
589 except StopIteration: | |
590 pass | |
591 else: | |
592 microblog_data["author_email"] = str(email_elt) | |
593 | |
594 if not microblog_data.get("author_jid"): | |
595 if publisher: | |
596 microblog_data["author_jid"] = publisher | |
597 microblog_data["author_jid_verified"] = True | |
598 else: | |
599 iq_elt = xml_tools.find_ancestor(item_elt, "iq", C.NS_STREAM) | |
600 microblog_data["author_jid"] = iq_elt["from"] | |
601 microblog_data["author_jid_verified"] = False | |
602 | |
603 # categories | |
604 categories = [ | |
605 category_elt.getAttribute("term", "") | |
606 for category_elt in entry_elt.elements(NS_ATOM, "category") | |
607 ] | |
608 microblog_data["tags"] = categories | |
609 | |
610 ## the trigger ## | |
611 # if other plugins have things to add or change | |
612 yield self.host.trigger.point( | |
613 "XEP-0277_item2data", item_elt, entry_elt, microblog_data | |
614 ) | |
615 | |
616 defer.returnValue(microblog_data) | |
617 | |
618 async def mb_data_2_entry_elt(self, client, mb_data, item_id, service, node): | |
619 """Convert a data dict to en entry usable to create an item | |
620 | |
621 @param mb_data: data dict as given by bridge method. | |
622 @param item_id(unicode): id of the item to use | |
623 @param service(jid.JID, None): pubsub service where the item is sent | |
624 Needed to construct Atom id | |
625 @param node(unicode): pubsub node where the item is sent | |
626 Needed to construct Atom id | |
627 @return: deferred which fire domish.Element | |
628 """ | |
629 entry_elt = domish.Element((NS_ATOM, "entry")) | |
630 extra = mb_data.get("extra", {}) | |
631 | |
632 ## language ## | |
633 if "language" in mb_data: | |
634 entry_elt[(C.NS_XML, "lang")] = mb_data["language"].strip() | |
635 | |
636 ## content and title ## | |
637 synt = self.host.plugins["TEXT_SYNTAXES"] | |
638 | |
639 for elem_name in ("title", "content"): | |
640 for type_ in ["", "_rich", "_xhtml"]: | |
641 attr = f"{elem_name}{type_}" | |
642 if attr in mb_data: | |
643 elem = entry_elt.addElement(elem_name) | |
644 if type_: | |
645 if type_ == "_rich": # convert input from current syntax to XHTML | |
646 xml_content = await synt.convert( | |
647 mb_data[attr], synt.get_current_syntax(client.profile), "XHTML" | |
648 ) | |
649 if f"{elem_name}_xhtml" in mb_data: | |
650 raise failure.Failure( | |
651 exceptions.DataError( | |
652 _( | |
653 "Can't have xhtml and rich content at the same time" | |
654 ) | |
655 ) | |
656 ) | |
657 else: | |
658 xml_content = mb_data[attr] | |
659 | |
660 div_elt = xml_tools.ElementParser()( | |
661 xml_content, namespace=C.NS_XHTML | |
662 ) | |
663 if ( | |
664 div_elt.name != "div" | |
665 or div_elt.uri != C.NS_XHTML | |
666 or div_elt.attributes | |
667 ): | |
668 # we need a wrapping <div/> at the top with XHTML namespace | |
669 wrap_div_elt = domish.Element((C.NS_XHTML, "div")) | |
670 wrap_div_elt.addChild(div_elt) | |
671 div_elt = wrap_div_elt | |
672 elem.addChild(div_elt) | |
673 elem["type"] = "xhtml" | |
674 if elem_name not in mb_data: | |
675 # there is raw text content, which is mandatory | |
676 # so we create one from xhtml content | |
677 elem_txt = entry_elt.addElement(elem_name) | |
678 text_content = await self.host.plugins[ | |
679 "TEXT_SYNTAXES" | |
680 ].convert( | |
681 xml_content, | |
682 self.host.plugins["TEXT_SYNTAXES"].SYNTAX_XHTML, | |
683 self.host.plugins["TEXT_SYNTAXES"].SYNTAX_TEXT, | |
684 False, | |
685 ) | |
686 elem_txt.addContent(text_content) | |
687 elem_txt["type"] = "text" | |
688 | |
689 else: # raw text only needs to be escaped to get HTML-safe sequence | |
690 elem.addContent(mb_data[attr]) | |
691 elem["type"] = "text" | |
692 | |
693 try: | |
694 next(entry_elt.elements(NS_ATOM, "title")) | |
695 except StopIteration: | |
696 # we have no title element which is mandatory | |
697 # so we transform content element to title | |
698 elems = list(entry_elt.elements(NS_ATOM, "content")) | |
699 if not elems: | |
700 raise exceptions.DataError( | |
701 "There must be at least one content or title element" | |
702 ) | |
703 for elem in elems: | |
704 elem.name = "title" | |
705 | |
706 ## attachments ## | |
707 attachments = extra.get(C.KEY_ATTACHMENTS) | |
708 if attachments: | |
709 for attachment in attachments: | |
710 try: | |
711 url = attachment["url"] | |
712 except KeyError: | |
713 try: | |
714 url = next( | |
715 s['url'] for s in attachment["sources"] if 'url' in s | |
716 ) | |
717 except (StopIteration, KeyError): | |
718 log.warning( | |
719 f'"url" missing in attachment, ignoring: {attachment}' | |
720 ) | |
721 continue | |
722 | |
723 if not url.startswith("http"): | |
724 log.warning(f"non HTTP URL in attachment, ignoring: {attachment}") | |
725 continue | |
726 link_elt = entry_elt.addElement("link") | |
727 # XXX: "uri" is set in self._manage_comments if not already existing | |
728 link_elt["href"] = url | |
729 if attachment.get("external", False): | |
730 # this is a link to an external data such as a website | |
731 link_elt["rel"] = "related" | |
732 else: | |
733 # this is an attached file | |
734 link_elt["rel"] = "enclosure" | |
735 for key, attr in ( | |
736 ("media_type", "type"), | |
737 ("desc", "title"), | |
738 ("size", "lenght") | |
739 ): | |
740 value = attachment.get(key) | |
741 if value: | |
742 link_elt[attr] = str(value) | |
743 | |
744 ## author ## | |
745 author_elt = entry_elt.addElement("author") | |
746 try: | |
747 author_name = mb_data["author"] | |
748 except KeyError: | |
749 # FIXME: must use better name | |
750 author_name = client.jid.user | |
751 author_elt.addElement("name", content=author_name) | |
752 | |
753 try: | |
754 author_jid_s = mb_data["author_jid"] | |
755 except KeyError: | |
756 author_jid_s = client.jid.userhost() | |
757 author_elt.addElement("uri", content="xmpp:{}".format(author_jid_s)) | |
758 | |
759 try: | |
760 author_jid_s = mb_data["author_email"] | |
761 except KeyError: | |
762 pass | |
763 | |
764 ## published/updated time ## | |
765 current_time = time.time() | |
766 entry_elt.addElement( | |
767 "updated", content=utils.xmpp_date(float(mb_data.get("updated", current_time))) | |
768 ) | |
769 entry_elt.addElement( | |
770 "published", | |
771 content=utils.xmpp_date(float(mb_data.get("published", current_time))), | |
772 ) | |
773 | |
774 ## categories ## | |
775 for tag in mb_data.get('tags', []): | |
776 category_elt = entry_elt.addElement("category") | |
777 category_elt["term"] = tag | |
778 | |
779 ## id ## | |
780 entry_id = mb_data.get( | |
781 "id", | |
782 xmpp_uri.build_xmpp_uri( | |
783 "pubsub", | |
784 path=service.full() if service is not None else client.jid.userhost(), | |
785 node=node, | |
786 item=item_id, | |
787 ), | |
788 ) | |
789 entry_elt.addElement("id", content=entry_id) # | |
790 | |
791 ## comments ## | |
792 for comments_data in mb_data.get('comments', []): | |
793 link_elt = entry_elt.addElement("link") | |
794 # XXX: "uri" is set in self._manage_comments if not already existing | |
795 link_elt["href"] = comments_data["uri"] | |
796 link_elt["rel"] = "replies" | |
797 link_elt["title"] = "comments" | |
798 | |
799 if "repeated" in extra: | |
800 try: | |
801 repeated = extra["repeated"] | |
802 link_elt = entry_elt.addElement("link") | |
803 link_elt["rel"] = "via" | |
804 link_elt["href"] = repeated["uri"] | |
805 except KeyError as e: | |
806 log.warning( | |
807 f"invalid repeated element({e}): {extra['repeated']}" | |
808 ) | |
809 | |
810 ## final item building ## | |
811 item_elt = pubsub.Item(id=item_id, payload=entry_elt) | |
812 | |
813 ## the trigger ## | |
814 # if other plugins have things to add or change | |
815 self.host.trigger.point( | |
816 "XEP-0277_data2entry", client, mb_data, entry_elt, item_elt | |
817 ) | |
818 | |
819 return item_elt | |
820 | |
821 ## publish/preview ## | |
822 | |
823 def is_comment_node(self, node: str) -> bool: | |
824 """Indicate if the node is prefixed with comments namespace""" | |
825 return node.startswith(NS_COMMENT_PREFIX) | |
826 | |
827 def get_parent_item(self, item_id: str) -> str: | |
828 """Return parent of a comment node | |
829 | |
830 @param item_id: a comment node | |
831 """ | |
832 if not self.is_comment_node(item_id): | |
833 raise ValueError("This node is not a comment node") | |
834 return item_id[len(NS_COMMENT_PREFIX):] | |
835 | |
836 def get_comments_node(self, item_id): | |
837 """Generate comment node | |
838 | |
839 @param item_id(unicode): id of the parent item | |
840 @return (unicode): comment node to use | |
841 """ | |
842 return f"{NS_COMMENT_PREFIX}{item_id}" | |
843 | |
844 def get_comments_service(self, client, parent_service=None): | |
845 """Get prefered PubSub service to create comment node | |
846 | |
847 @param pubsub_service(jid.JID, None): PubSub service of the parent item | |
848 @param return((D)jid.JID, None): PubSub service to use | |
849 """ | |
850 if parent_service is not None: | |
851 if parent_service.user: | |
852 # we are on a PEP | |
853 if parent_service.host == client.jid.host: | |
854 # it's our server, we use already found client.pubsub_service below | |
855 pass | |
856 else: | |
857 # other server, let's try to find a non PEP service there | |
858 d = self.host.find_service_entity( | |
859 client, "pubsub", "service", parent_service | |
860 ) | |
861 d.addCallback(lambda entity: entity or parent_service) | |
862 else: | |
863 # parent is already on a normal Pubsub service, we re-use it | |
864 return defer.succeed(parent_service) | |
865 | |
866 return defer.succeed( | |
867 client.pubsub_service if client.pubsub_service is not None else parent_service | |
868 ) | |
869 | |
870 async def _manage_comments(self, client, mb_data, service, node, item_id, access=None): | |
871 """Check comments keys in mb_data and create comments node if necessary | |
872 | |
873 if a comments node metadata is set in the mb_data['comments'] list, it is used | |
874 otherwise it is generated (if allow_comments is True). | |
875 @param mb_data(dict): microblog mb_data | |
876 @param service(jid.JID, None): PubSub service of the parent item | |
877 @param node(unicode): node of the parent item | |
878 @param item_id(unicode): id of the parent item | |
879 @param access(unicode, None): access model | |
880 None to use same access model as parent item | |
881 """ | |
882 allow_comments = mb_data.pop("allow_comments", None) | |
883 if allow_comments is None: | |
884 if "comments" in mb_data: | |
885 mb_data["allow_comments"] = True | |
886 else: | |
887 # no comments set or requested, nothing to do | |
888 return | |
889 elif allow_comments == False: | |
890 if "comments" in mb_data: | |
891 log.warning( | |
892 "comments are not allowed but there is already a comments node, " | |
893 "it may be lost: {uri}".format( | |
894 uri=mb_data["comments"] | |
895 ) | |
896 ) | |
897 del mb_data["comments"] | |
898 return | |
899 | |
900 # we have usually a single comment node, but the spec allow several, so we need to | |
901 # handle this in a list | |
902 if len(mb_data.setdefault('comments', [])) == 0: | |
903 # we need at least one comment node | |
904 comments_data = {} | |
905 mb_data['comments'].append({}) | |
906 | |
907 if access is None: | |
908 # TODO: cache access models per service/node | |
909 parent_node_config = await self._p.getConfiguration(client, service, node) | |
910 access = parent_node_config.get(self._p.OPT_ACCESS_MODEL, self._p.ACCESS_OPEN) | |
911 | |
912 options = { | |
913 self._p.OPT_ACCESS_MODEL: access, | |
914 self._p.OPT_MAX_ITEMS: "max", | |
915 self._p.OPT_PERSIST_ITEMS: 1, | |
916 self._p.OPT_DELIVER_PAYLOADS: 1, | |
917 self._p.OPT_SEND_ITEM_SUBSCRIBE: 1, | |
918 # FIXME: would it make sense to restrict publish model to subscribers? | |
919 self._p.OPT_PUBLISH_MODEL: self._p.ACCESS_OPEN, | |
920 } | |
921 | |
922 # if other plugins need to change the options | |
923 self.host.trigger.point("XEP-0277_comments", client, mb_data, options) | |
924 | |
925 for comments_data in mb_data['comments']: | |
926 uri = comments_data.get('uri') | |
927 comments_node = comments_data.get('node') | |
928 try: | |
929 comments_service = jid.JID(comments_data["service"]) | |
930 except KeyError: | |
931 comments_service = None | |
932 | |
933 if uri: | |
934 uri_service, uri_node = self.parse_comment_url(uri) | |
935 if ((comments_node is not None and comments_node!=uri_node) | |
936 or (comments_service is not None and comments_service!=uri_service)): | |
937 raise ValueError( | |
938 f"Incoherence between comments URI ({uri}) and comments_service " | |
939 f"({comments_service}) or comments_node ({comments_node})") | |
940 comments_data['service'] = comments_service = uri_service | |
941 comments_data['node'] = comments_node = uri_node | |
942 else: | |
943 if not comments_node: | |
944 comments_node = self.get_comments_node(item_id) | |
945 comments_data['node'] = comments_node | |
946 if comments_service is None: | |
947 comments_service = await self.get_comments_service(client, service) | |
948 if comments_service is None: | |
949 comments_service = client.jid.userhostJID() | |
950 comments_data['service'] = comments_service | |
951 | |
952 comments_data['uri'] = xmpp_uri.build_xmpp_uri( | |
953 "pubsub", | |
954 path=comments_service.full(), | |
955 node=comments_node, | |
956 ) | |
957 | |
958 try: | |
959 await self._p.createNode(client, comments_service, comments_node, options) | |
960 except error.StanzaError as e: | |
961 if e.condition == "conflict": | |
962 log.info( | |
963 "node {} already exists on service {}".format( | |
964 comments_node, comments_service | |
965 ) | |
966 ) | |
967 else: | |
968 raise e | |
969 else: | |
970 if access == self._p.ACCESS_WHITELIST: | |
971 # for whitelist access we need to copy affiliations from parent item | |
972 comments_affiliations = await self._p.get_node_affiliations( | |
973 client, service, node | |
974 ) | |
975 # …except for "member", that we transform to publisher | |
976 # because we wants members to be able to write to comments | |
977 for jid_, affiliation in list(comments_affiliations.items()): | |
978 if affiliation == "member": | |
979 comments_affiliations[jid_] == "publisher" | |
980 | |
981 await self._p.set_node_affiliations( | |
982 client, comments_service, comments_node, comments_affiliations | |
983 ) | |
984 | |
985 def friendly_id(self, data): | |
986 """Generate a user friendly id from title or content""" | |
987 # TODO: rich content should be converted to plain text | |
988 id_base = regex.url_friendly_text( | |
989 data.get('title') | |
990 or data.get('title_rich') | |
991 or data.get('content') | |
992 or data.get('content_rich') | |
993 or '' | |
994 ) | |
995 return f"{id_base}-{token_urlsafe(3)}" | |
996 | |
997 def _mb_send(self, service, node, data, profile_key): | |
998 service = jid.JID(service) if service else None | |
999 node = node if node else NS_MICROBLOG | |
1000 client = self.host.get_client(profile_key) | |
1001 data = data_format.deserialise(data) | |
1002 return defer.ensureDeferred(self.send(client, data, service, node)) | |
1003 | |
1004 async def send( | |
1005 self, | |
1006 client: SatXMPPEntity, | |
1007 data: dict, | |
1008 service: Optional[jid.JID] = None, | |
1009 node: Optional[str] = NS_MICROBLOG | |
1010 ) -> Optional[str]: | |
1011 """Send XEP-0277's microblog data | |
1012 | |
1013 @param data: microblog data (must include at least a "content" or a "title" key). | |
1014 see http://wiki.goffi.org/wiki/Bridge_API_-_Microblogging/en for details | |
1015 @param service: PubSub service where the microblog must be published | |
1016 None to publish on profile's PEP | |
1017 @param node: PubSub node to use (defaut to microblog NS) | |
1018 None is equivalend as using default value | |
1019 @return: ID of the published item | |
1020 """ | |
1021 # TODO: check that all data keys are used, this would avoid sending publicly a private message | |
1022 # by accident (e.g. if group plugin is not loaded, and "group*" key are not used) | |
1023 if service is None: | |
1024 service = client.jid.userhostJID() | |
1025 if node is None: | |
1026 node = NS_MICROBLOG | |
1027 | |
1028 item_id = data.get("id") | |
1029 if item_id is None: | |
1030 if data.get("user_friendly_id", True): | |
1031 item_id = self.friendly_id(data) | |
1032 else: | |
1033 item_id = str(shortuuid.uuid()) | |
1034 | |
1035 try: | |
1036 await self._manage_comments(client, data, service, node, item_id, access=None) | |
1037 except error.StanzaError: | |
1038 log.warning("Can't create comments node for item {}".format(item_id)) | |
1039 item = await self.mb_data_2_entry_elt(client, data, item_id, service, node) | |
1040 | |
1041 if not await self.host.trigger.async_point( | |
1042 "XEP-0277_send", client, service, node, item, data | |
1043 ): | |
1044 return None | |
1045 | |
1046 extra = {} | |
1047 for key in ("encrypted", "encrypted_for", "signed"): | |
1048 value = data.get(key) | |
1049 if value is not None: | |
1050 extra[key] = value | |
1051 | |
1052 await self._p.publish(client, service, node, [item], extra=extra) | |
1053 return item_id | |
1054 | |
1055 def _mb_repeat( | |
1056 self, | |
1057 service_s: str, | |
1058 node: str, | |
1059 item: str, | |
1060 extra_s: str, | |
1061 profile_key: str | |
1062 ) -> defer.Deferred: | |
1063 service = jid.JID(service_s) if service_s else None | |
1064 node = node if node else NS_MICROBLOG | |
1065 client = self.host.get_client(profile_key) | |
1066 extra = data_format.deserialise(extra_s) | |
1067 d = defer.ensureDeferred( | |
1068 self.repeat(client, item, service, node, extra) | |
1069 ) | |
1070 # [repeat] can return None, and we always need a str | |
1071 d.addCallback(lambda ret: ret or "") | |
1072 return d | |
1073 | |
1074 async def repeat( | |
1075 self, | |
1076 client: SatXMPPEntity, | |
1077 item: str, | |
1078 service: Optional[jid.JID] = None, | |
1079 node: str = NS_MICROBLOG, | |
1080 extra: Optional[dict] = None, | |
1081 ) -> Optional[str]: | |
1082 """Re-publish a post from somewhere else | |
1083 | |
1084 This is a feature often name "share" or "boost", it is generally used to make a | |
1085 publication more visible by sharing it with our own audience | |
1086 """ | |
1087 if service is None: | |
1088 service = client.jid.userhostJID() | |
1089 | |
1090 # we first get the post to repeat | |
1091 items, __ = await self._p.get_items( | |
1092 client, | |
1093 service, | |
1094 node, | |
1095 item_ids = [item] | |
1096 ) | |
1097 if not items: | |
1098 raise exceptions.NotFound( | |
1099 f"no item found at node {node!r} on {service} with ID {item!r}" | |
1100 ) | |
1101 item_elt = items[0] | |
1102 try: | |
1103 entry_elt = next(item_elt.elements(NS_ATOM, "entry")) | |
1104 except StopIteration: | |
1105 raise exceptions.DataError( | |
1106 "post to repeat is not a XEP-0277 blog item" | |
1107 ) | |
1108 | |
1109 # we want to be sure that we have an author element | |
1110 try: | |
1111 author_elt = next(entry_elt.elements(NS_ATOM, "author")) | |
1112 except StopIteration: | |
1113 author_elt = entry_elt.addElement("author") | |
1114 | |
1115 try: | |
1116 next(author_elt.elements(NS_ATOM, "name")) | |
1117 except StopIteration: | |
1118 author_elt.addElement("name", content=service.user) | |
1119 | |
1120 try: | |
1121 next(author_elt.elements(NS_ATOM, "uri")) | |
1122 except StopIteration: | |
1123 entry_elt.addElement( | |
1124 "uri", content=xmpp_uri.build_xmpp_uri(None, path=service.full()) | |
1125 ) | |
1126 | |
1127 # we add the link indicating that it's a repeated post | |
1128 link_elt = entry_elt.addElement("link") | |
1129 link_elt["rel"] = "via" | |
1130 link_elt["href"] = xmpp_uri.build_xmpp_uri( | |
1131 "pubsub", path=service.full(), node=node, item=item | |
1132 ) | |
1133 | |
1134 return await self._p.send_item( | |
1135 client, | |
1136 client.jid.userhostJID(), | |
1137 NS_MICROBLOG, | |
1138 entry_elt | |
1139 ) | |
1140 | |
1141 def _mb_preview(self, service, node, data, profile_key): | |
1142 service = jid.JID(service) if service else None | |
1143 node = node if node else NS_MICROBLOG | |
1144 client = self.host.get_client(profile_key) | |
1145 data = data_format.deserialise(data) | |
1146 d = defer.ensureDeferred(self.preview(client, data, service, node)) | |
1147 d.addCallback(data_format.serialise) | |
1148 return d | |
1149 | |
1150 async def preview( | |
1151 self, | |
1152 client: SatXMPPEntity, | |
1153 data: dict, | |
1154 service: Optional[jid.JID] = None, | |
1155 node: Optional[str] = NS_MICROBLOG | |
1156 ) -> dict: | |
1157 """Preview microblog data without publishing them | |
1158 | |
1159 params are the same as for [send] | |
1160 @return: microblog data as would be retrieved from published item | |
1161 """ | |
1162 if node is None: | |
1163 node = NS_MICROBLOG | |
1164 | |
1165 item_id = data.get("id", "") | |
1166 | |
1167 # we have to serialise then deserialise to be sure that all triggers are called | |
1168 item_elt = await self.mb_data_2_entry_elt(client, data, item_id, service, node) | |
1169 item_elt.uri = pubsub.NS_PUBSUB | |
1170 return await self.item_2_mb_data(client, item_elt, service, node) | |
1171 | |
1172 | |
1173 ## retract ## | |
1174 | |
1175 def _mb_retract(self, service_jid_s, nodeIdentifier, itemIdentifier, profile_key): | |
1176 """Call self._p._retract_item, but use default node if node is empty""" | |
1177 return self._p._retract_item( | |
1178 service_jid_s, | |
1179 nodeIdentifier or NS_MICROBLOG, | |
1180 itemIdentifier, | |
1181 True, | |
1182 profile_key, | |
1183 ) | |
1184 | |
1185 ## get ## | |
1186 | |
1187 def _mb_get_serialise(self, data): | |
1188 items, metadata = data | |
1189 metadata['items'] = items | |
1190 return data_format.serialise(metadata) | |
1191 | |
1192 def _mb_get(self, service="", node="", max_items=10, item_ids=None, extra="", | |
1193 profile_key=C.PROF_KEY_NONE): | |
1194 """ | |
1195 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit | |
1196 @param item_ids (list[unicode]): list of item IDs | |
1197 """ | |
1198 client = self.host.get_client(profile_key) | |
1199 service = jid.JID(service) if service else None | |
1200 max_items = None if max_items == C.NO_LIMIT else max_items | |
1201 extra = self._p.parse_extra(data_format.deserialise(extra)) | |
1202 d = defer.ensureDeferred( | |
1203 self.mb_get(client, service, node or None, max_items, item_ids, | |
1204 extra.rsm_request, extra.extra) | |
1205 ) | |
1206 d.addCallback(self._mb_get_serialise) | |
1207 return d | |
1208 | |
1209 async def mb_get( | |
1210 self, | |
1211 client: SatXMPPEntity, | |
1212 service: Optional[jid.JID] = None, | |
1213 node: Optional[str] = None, | |
1214 max_items: Optional[int] = 10, | |
1215 item_ids: Optional[List[str]] = None, | |
1216 rsm_request: Optional[rsm.RSMRequest] = None, | |
1217 extra: Optional[Dict[str, Any]] = None | |
1218 ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: | |
1219 """Get some microblogs | |
1220 | |
1221 @param service(jid.JID, None): jid of the publisher | |
1222 None to get profile's PEP | |
1223 @param node(unicode, None): node to get (or microblog node if None) | |
1224 @param max_items(int): maximum number of item to get, None for no limit | |
1225 ignored if rsm_request is set | |
1226 @param item_ids (list[unicode]): list of item IDs | |
1227 @param rsm_request (rsm.RSMRequest): RSM request data | |
1228 @param extra (dict): extra data | |
1229 | |
1230 @return: a deferred couple with the list of items and metadatas. | |
1231 """ | |
1232 if node is None: | |
1233 node = NS_MICROBLOG | |
1234 if rsm_request: | |
1235 max_items = None | |
1236 items_data = await self._p.get_items( | |
1237 client, | |
1238 service, | |
1239 node, | |
1240 max_items=max_items, | |
1241 item_ids=item_ids, | |
1242 rsm_request=rsm_request, | |
1243 extra=extra, | |
1244 ) | |
1245 mb_data_list, metadata = await self._p.trans_items_data_d( | |
1246 items_data, partial(self.item_2_mb_data, client, service=service, node=node)) | |
1247 encrypted = metadata.pop("encrypted", None) | |
1248 if encrypted is not None: | |
1249 for mb_data in mb_data_list: | |
1250 try: | |
1251 mb_data["encrypted"] = encrypted[mb_data["id"]] | |
1252 except KeyError: | |
1253 pass | |
1254 return (mb_data_list, metadata) | |
1255 | |
1256 def _mb_rename(self, service, node, item_id, new_id, profile_key): | |
1257 return defer.ensureDeferred(self.mb_rename( | |
1258 self.host.get_client(profile_key), | |
1259 jid.JID(service) if service else None, | |
1260 node or None, | |
1261 item_id, | |
1262 new_id | |
1263 )) | |
1264 | |
1265 async def mb_rename( | |
1266 self, | |
1267 client: SatXMPPEntity, | |
1268 service: Optional[jid.JID], | |
1269 node: Optional[str], | |
1270 item_id: str, | |
1271 new_id: str | |
1272 ) -> None: | |
1273 if not node: | |
1274 node = NS_MICROBLOG | |
1275 await self._p.rename_item(client, service, node, item_id, new_id) | |
1276 | |
1277 def parse_comment_url(self, node_url): | |
1278 """Parse a XMPP URI | |
1279 | |
1280 Determine the fields comments_service and comments_node of a microblog data | |
1281 from the href attribute of an entry's link element. For example this input: | |
1282 xmpp:sat-pubsub.example.net?;node=urn%3Axmpp%3Acomments%3A_af43b363-3259-4b2a-ba4c-1bc33aa87634__urn%3Axmpp%3Agroupblog%3Asomebody%40example.net | |
1283 will return(JID(u'sat-pubsub.example.net'), 'urn:xmpp:comments:_af43b363-3259-4b2a-ba4c-1bc33aa87634__urn:xmpp:groupblog:somebody@example.net') | |
1284 @return (tuple[jid.JID, unicode]): service and node | |
1285 """ | |
1286 try: | |
1287 parsed_url = xmpp_uri.parse_xmpp_uri(node_url) | |
1288 service = jid.JID(parsed_url["path"]) | |
1289 node = parsed_url["node"] | |
1290 except Exception as e: | |
1291 raise exceptions.DataError(f"Invalid comments link: {e}") | |
1292 | |
1293 return (service, node) | |
1294 | |
1295 ## configure ## | |
1296 | |
1297 def mb_access_set(self, access="presence", profile_key=C.PROF_KEY_NONE): | |
1298 """Create a microblog node on PEP with given access | |
1299 | |
1300 If the node already exists, it change options | |
1301 @param access: Node access model, according to xep-0060 #4.5 | |
1302 @param profile_key: profile key | |
1303 """ | |
1304 # FIXME: check if this mehtod is need, deprecate it if not | |
1305 client = self.host.get_client(profile_key) | |
1306 | |
1307 _options = { | |
1308 self._p.OPT_ACCESS_MODEL: access, | |
1309 self._p.OPT_MAX_ITEMS: "max", | |
1310 self._p.OPT_PERSIST_ITEMS: 1, | |
1311 self._p.OPT_DELIVER_PAYLOADS: 1, | |
1312 self._p.OPT_SEND_ITEM_SUBSCRIBE: 1, | |
1313 } | |
1314 | |
1315 def cb(result): | |
1316 # Node is created with right permission | |
1317 log.debug(_("Microblog node has now access %s") % access) | |
1318 | |
1319 def fatal_err(s_error): | |
1320 # Something went wrong | |
1321 log.error(_("Can't set microblog access")) | |
1322 raise NodeAccessChangeException() | |
1323 | |
1324 def err_cb(s_error): | |
1325 # If the node already exists, the condition is "conflict", | |
1326 # else we have an unmanaged error | |
1327 if s_error.value.condition == "conflict": | |
1328 # d = self.host.plugins["XEP-0060"].deleteNode(client, client.jid.userhostJID(), NS_MICROBLOG) | |
1329 # d.addCallback(lambda x: create_node().addCallback(cb).addErrback(fatal_err)) | |
1330 change_node_options().addCallback(cb).addErrback(fatal_err) | |
1331 else: | |
1332 fatal_err(s_error) | |
1333 | |
1334 def create_node(): | |
1335 return self._p.createNode( | |
1336 client, client.jid.userhostJID(), NS_MICROBLOG, _options | |
1337 ) | |
1338 | |
1339 def change_node_options(): | |
1340 return self._p.setOptions( | |
1341 client.jid.userhostJID(), | |
1342 NS_MICROBLOG, | |
1343 client.jid.userhostJID(), | |
1344 _options, | |
1345 profile_key=profile_key, | |
1346 ) | |
1347 | |
1348 create_node().addCallback(cb).addErrback(err_cb) | |
1349 | |
1350 ## methods to manage several stanzas/jids at once ## | |
1351 | |
1352 # common | |
1353 | |
1354 def _get_client_and_node_data(self, publishers_type, publishers, profile_key): | |
1355 """Helper method to construct node_data from publishers_type/publishers | |
1356 | |
1357 @param publishers_type: type of the list of publishers, one of: | |
1358 C.ALL: get all jids from roster, publishers is not used | |
1359 C.GROUP: get jids from groups | |
1360 C.JID: use publishers directly as list of jids | |
1361 @param publishers: list of publishers, according to "publishers_type" (None, | |
1362 list of groups or list of jids) | |
1363 @param profile_key: %(doc_profile_key)s | |
1364 """ | |
1365 client = self.host.get_client(profile_key) | |
1366 if publishers_type == C.JID: | |
1367 jids_set = set(publishers) | |
1368 else: | |
1369 jids_set = client.roster.get_jids_set(publishers_type, publishers) | |
1370 if publishers_type == C.ALL: | |
1371 try: | |
1372 # display messages from salut-a-toi@libervia.org or other PEP services | |
1373 services = self.host.plugins["EXTRA-PEP"].get_followed_entities( | |
1374 profile_key | |
1375 ) | |
1376 except KeyError: | |
1377 pass # plugin is not loaded | |
1378 else: | |
1379 if services: | |
1380 log.debug( | |
1381 "Extra PEP followed entities: %s" | |
1382 % ", ".join([str(service) for service in services]) | |
1383 ) | |
1384 jids_set.update(services) | |
1385 | |
1386 node_data = [] | |
1387 for jid_ in jids_set: | |
1388 node_data.append((jid_, NS_MICROBLOG)) | |
1389 return client, node_data | |
1390 | |
1391 def _check_publishers(self, publishers_type, publishers): | |
1392 """Helper method to deserialise publishers coming from bridge | |
1393 | |
1394 publishers_type(unicode): type of the list of publishers, one of: | |
1395 publishers: list of publishers according to type | |
1396 @return: deserialised (publishers_type, publishers) tuple | |
1397 """ | |
1398 if publishers_type == C.ALL: | |
1399 if publishers: | |
1400 raise failure.Failure( | |
1401 ValueError( | |
1402 "Can't use publishers with {} type".format(publishers_type) | |
1403 ) | |
1404 ) | |
1405 else: | |
1406 publishers = None | |
1407 elif publishers_type == C.JID: | |
1408 publishers[:] = [jid.JID(publisher) for publisher in publishers] | |
1409 return publishers_type, publishers | |
1410 | |
1411 # subscribe # | |
1412 | |
1413 def _mb_subscribe_to_many(self, publishers_type, publishers, profile_key): | |
1414 """ | |
1415 | |
1416 @return (str): session id: Use pubsub.getSubscribeRTResult to get the results | |
1417 """ | |
1418 publishers_type, publishers = self._check_publishers(publishers_type, publishers) | |
1419 return self.mb_subscribe_to_many(publishers_type, publishers, profile_key) | |
1420 | |
1421 def mb_subscribe_to_many(self, publishers_type, publishers, profile_key): | |
1422 """Subscribe microblogs for a list of groups or jids | |
1423 | |
1424 @param publishers_type: type of the list of publishers, one of: | |
1425 C.ALL: get all jids from roster, publishers is not used | |
1426 C.GROUP: get jids from groups | |
1427 C.JID: use publishers directly as list of jids | |
1428 @param publishers: list of publishers, according to "publishers_type" (None, list | |
1429 of groups or list of jids) | |
1430 @param profile: %(doc_profile)s | |
1431 @return (str): session id | |
1432 """ | |
1433 client, node_data = self._get_client_and_node_data( | |
1434 publishers_type, publishers, profile_key | |
1435 ) | |
1436 return self._p.subscribe_to_many( | |
1437 node_data, client.jid.userhostJID(), profile_key=profile_key | |
1438 ) | |
1439 | |
1440 # get # | |
1441 | |
    def _mb_get_from_many_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
        """Get real-time results for mb_get_from_many session

        @param session_id: id of the real-time deferred session
        @param return (tuple): (remaining, results) where:
            - remaining is the number of still expected results
            - results is a list of tuple with
                - service (unicode): pubsub service
                - node (unicode): pubsub node
                - failure (unicode): empty string in case of success, error message else
                - items_data(list): data as returned by [mb_get]
                - items_metadata(dict): metadata as returned by [mb_get]
        @param profile_key: %(doc_profile_key)s
        """

        client = self.host.get_client(profile_key)

        def onSuccess(items_data):
            """convert items elements to list of microblog data in items_data"""
            d = self._p.trans_items_data_d(
                items_data,
                # FIXME: service and node should be used here
                partial(self.item_2_mb_data, client),
                serialise=True
            )
            # prepend an empty failure string so success and error results share the
            # same (failure, payload) tuple shape
            d.addCallback(lambda serialised: ("", serialised))
            return d

        d = self._p.get_rt_results(
            session_id,
            on_success=onSuccess,
            on_error=lambda failure: (str(failure.value), ([], {})),
            profile=client.profile,
        )
        # flatten the per-(service, node) results dict into the list of 5-tuples
        # documented above, for consumption by bridge frontends
        d.addCallback(
            lambda ret: (
                ret[0],
                [
                    (service.full(), node, failure, items, metadata)
                    for (service, node), (success, (failure, (items, metadata))) in ret[
                        1
                    ].items()
                ],
            )
        )
        return d
1488 | |
1489 def _mb_get_from_many(self, publishers_type, publishers, max_items=10, extra_dict=None, | |
1490 profile_key=C.PROF_KEY_NONE): | |
1491 """ | |
1492 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit | |
1493 """ | |
1494 max_items = None if max_items == C.NO_LIMIT else max_items | |
1495 publishers_type, publishers = self._check_publishers(publishers_type, publishers) | |
1496 extra = self._p.parse_extra(extra_dict) | |
1497 return self.mb_get_from_many( | |
1498 publishers_type, | |
1499 publishers, | |
1500 max_items, | |
1501 extra.rsm_request, | |
1502 extra.extra, | |
1503 profile_key, | |
1504 ) | |
1505 | |
1506 def mb_get_from_many(self, publishers_type, publishers, max_items=None, rsm_request=None, | |
1507 extra=None, profile_key=C.PROF_KEY_NONE): | |
1508 """Get the published microblogs for a list of groups or jids | |
1509 | |
1510 @param publishers_type (str): type of the list of publishers (one of "GROUP" or | |
1511 "JID" or "ALL") | |
1512 @param publishers (list): list of publishers, according to publishers_type (list | |
1513 of groups or list of jids) | |
1514 @param max_items (int): optional limit on the number of retrieved items. | |
1515 @param rsm_request (rsm.RSMRequest): RSM request data, common to all publishers | |
1516 @param extra (dict): Extra data | |
1517 @param profile_key: profile key | |
1518 @return (str): RT Deferred session id | |
1519 """ | |
1520 # XXX: extra is unused here so far | |
1521 client, node_data = self._get_client_and_node_data( | |
1522 publishers_type, publishers, profile_key | |
1523 ) | |
1524 return self._p.get_from_many( | |
1525 node_data, max_items, rsm_request, profile_key=profile_key | |
1526 ) | |
1527 | |
1528 # comments # | |
1529 | |
1530 def _mb_get_from_many_with_comments_rt_result_serialise(self, data): | |
1531 """Serialisation of result | |
1532 | |
1533 This is probably the longest method name of whole SàT ecosystem ^^ | |
1534 @param data(dict): data as received by rt_sessions | |
1535 @return (tuple): see [_mb_get_from_many_with_comments_rt_result] | |
1536 """ | |
1537 ret = [] | |
1538 data_iter = iter(data[1].items()) | |
1539 for (service, node), (success, (failure_, (items_data, metadata))) in data_iter: | |
1540 items = [] | |
1541 for item, item_metadata in items_data: | |
1542 item = data_format.serialise(item) | |
1543 items.append((item, item_metadata)) | |
1544 ret.append(( | |
1545 service.full(), | |
1546 node, | |
1547 failure_, | |
1548 items, | |
1549 metadata)) | |
1550 | |
1551 return data[0], ret | |
1552 | |
1553 def _mb_get_from_many_with_comments_rt_result(self, session_id, | |
1554 profile_key=C.PROF_KEY_DEFAULT): | |
1555 """Get real-time results for [mb_get_from_many_with_comments] session | |
1556 | |
1557 @param session_id: id of the real-time deferred session | |
1558 @param return (tuple): (remaining, results) where: | |
1559 - remaining is the number of still expected results | |
1560 - results is a list of 5-tuple with | |
1561 - service (unicode): pubsub service | |
1562 - node (unicode): pubsub node | |
1563 - failure (unicode): empty string in case of success, error message else | |
1564 - items(list[tuple(dict, list)]): list of 2-tuple with | |
1565 - item(dict): item microblog data | |
1566 - comments_list(list[tuple]): list of 5-tuple with | |
1567 - service (unicode): pubsub service where the comments node is | |
1568 - node (unicode): comments node | |
1569 - failure (unicode): empty in case of success, else error message | |
1570 - comments(list[dict]): list of microblog data | |
1571 - comments_metadata(dict): metadata of the comment node | |
1572 - metadata(dict): original node metadata | |
1573 @param profile_key: %(doc_profile_key)s | |
1574 """ | |
1575 profile = self.host.get_client(profile_key).profile | |
1576 d = self.rt_sessions.get_results(session_id, profile=profile) | |
1577 d.addCallback(self._mb_get_from_many_with_comments_rt_result_serialise) | |
1578 return d | |
1579 | |
1580 def _mb_get_from_many_with_comments(self, publishers_type, publishers, max_items=10, | |
1581 max_comments=C.NO_LIMIT, extra_dict=None, | |
1582 extra_comments_dict=None, profile_key=C.PROF_KEY_NONE): | |
1583 """ | |
1584 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit | |
1585 @param max_comments(int): maximum number of comments to get, C.NO_LIMIT for no | |
1586 limit | |
1587 """ | |
1588 max_items = None if max_items == C.NO_LIMIT else max_items | |
1589 max_comments = None if max_comments == C.NO_LIMIT else max_comments | |
1590 publishers_type, publishers = self._check_publishers(publishers_type, publishers) | |
1591 extra = self._p.parse_extra(extra_dict) | |
1592 extra_comments = self._p.parse_extra(extra_comments_dict) | |
1593 return self.mb_get_from_many_with_comments( | |
1594 publishers_type, | |
1595 publishers, | |
1596 max_items, | |
1597 max_comments or None, | |
1598 extra.rsm_request, | |
1599 extra.extra, | |
1600 extra_comments.rsm_request, | |
1601 extra_comments.extra, | |
1602 profile_key, | |
1603 ) | |
1604 | |
1605 def mb_get_from_many_with_comments(self, publishers_type, publishers, max_items=None, | |
1606 max_comments=None, rsm_request=None, extra=None, | |
1607 rsm_comments=None, extra_comments=None, | |
1608 profile_key=C.PROF_KEY_NONE): | |
1609 """Helper method to get the microblogs and their comments in one shot | |
1610 | |
1611 @param publishers_type (str): type of the list of publishers (one of "GROUP" or | |
1612 "JID" or "ALL") | |
1613 @param publishers (list): list of publishers, according to publishers_type (list | |
1614 of groups or list of jids) | |
1615 @param max_items (int): optional limit on the number of retrieved items. | |
1616 @param max_comments (int): maximum number of comments to retrieve | |
1617 @param rsm_request (rsm.RSMRequest): RSM request for initial items only | |
1618 @param extra (dict): extra configuration for initial items only | |
1619 @param rsm_comments (rsm.RSMRequest): RSM request for comments only | |
1620 @param extra_comments (dict): extra configuration for comments only | |
1621 @param profile_key: profile key | |
1622 @return (str): RT Deferred session id | |
1623 """ | |
1624 # XXX: this method seems complicated because it do a couple of treatments | |
1625 # to serialise and associate the data, but it make life in frontends side | |
1626 # a lot easier | |
1627 | |
1628 client, node_data = self._get_client_and_node_data( | |
1629 publishers_type, publishers, profile_key | |
1630 ) | |
1631 | |
1632 def get_comments(items_data): | |
1633 """Retrieve comments and add them to the items_data | |
1634 | |
1635 @param items_data: serialised items data | |
1636 @return (defer.Deferred): list of items where each item is associated | |
1637 with a list of comments data (service, node, list of items, metadata) | |
1638 """ | |
1639 items, metadata = items_data | |
1640 items_dlist = [] # deferred list for items | |
1641 for item in items: | |
1642 dlist = [] # deferred list for comments | |
1643 for key, value in item.items(): | |
1644 # we look for comments | |
1645 if key.startswith("comments") and key.endswith("_service"): | |
1646 prefix = key[: key.find("_")] | |
1647 service_s = value | |
1648 service = jid.JID(service_s) | |
1649 node = item["{}{}".format(prefix, "_node")] | |
1650 # time to get the comments | |
1651 d = defer.ensureDeferred( | |
1652 self._p.get_items( | |
1653 client, | |
1654 service, | |
1655 node, | |
1656 max_comments, | |
1657 rsm_request=rsm_comments, | |
1658 extra=extra_comments, | |
1659 ) | |
1660 ) | |
1661 # then serialise | |
1662 d.addCallback( | |
1663 lambda items_data: self._p.trans_items_data_d( | |
1664 items_data, | |
1665 partial( | |
1666 self.item_2_mb_data, client, service=service, node=node | |
1667 ), | |
1668 serialise=True | |
1669 ) | |
1670 ) | |
1671 # with failure handling | |
1672 d.addCallback( | |
1673 lambda serialised_items_data: ("",) + serialised_items_data | |
1674 ) | |
1675 d.addErrback(lambda failure: (str(failure.value), [], {})) | |
1676 # and associate with service/node (needed if there are several | |
1677 # comments nodes) | |
1678 d.addCallback( | |
1679 lambda serialised, service_s=service_s, node=node: ( | |
1680 service_s, | |
1681 node, | |
1682 ) | |
1683 + serialised | |
1684 ) | |
1685 dlist.append(d) | |
1686 # we get the comments | |
1687 comments_d = defer.gatherResults(dlist) | |
1688 # and add them to the item data | |
1689 comments_d.addCallback( | |
1690 lambda comments_data, item=item: (item, comments_data) | |
1691 ) | |
1692 items_dlist.append(comments_d) | |
1693 # we gather the items + comments in a list | |
1694 items_d = defer.gatherResults(items_dlist) | |
1695 # and add the metadata | |
1696 items_d.addCallback(lambda items_completed: (items_completed, metadata)) | |
1697 return items_d | |
1698 | |
1699 deferreds = {} | |
1700 for service, node in node_data: | |
1701 d = deferreds[(service, node)] = defer.ensureDeferred(self._p.get_items( | |
1702 client, service, node, max_items, rsm_request=rsm_request, extra=extra | |
1703 )) | |
1704 d.addCallback( | |
1705 lambda items_data: self._p.trans_items_data_d( | |
1706 items_data, | |
1707 partial(self.item_2_mb_data, client, service=service, node=node), | |
1708 ) | |
1709 ) | |
1710 d.addCallback(get_comments) | |
1711 d.addCallback(lambda items_comments_data: ("", items_comments_data)) | |
1712 d.addErrback(lambda failure: (str(failure.value), ([], {}))) | |
1713 | |
1714 return self.rt_sessions.new_session(deferreds, client.profile) | |
1715 | |
1716 | |
@implementer(iwokkel.IDisco)
class XEP_0277_handler(XMPPHandler):
    """Stream handler advertising XEP-0277 microblog support through service discovery."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        # advertise the microblog namespace as a supported feature
        return [disco.DiscoFeature(NS_MICROBLOG)]

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        # no disco items are exposed by this plugin
        return []