comparison libervia/backend/plugins/plugin_xep_0277.py @ 4071:4b842c1fb686

refactoring: renamed `sat` package to `libervia.backend`
author Goffi <goffi@goffi.org>
date Fri, 02 Jun 2023 11:49:51 +0200
parents sat/plugins/plugin_xep_0277.py@524856bd7b19
children 0e48181d50ab
comparison
equal deleted inserted replaced
4070:d10748475025 4071:4b842c1fb686
1 #!/usr/bin/env python3
2
3 # SAT plugin for microblogging over XMPP (xep-0277)
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)
5
6 # This program is free software: you can redistribute it and/or modify
7 # it under the terms of the GNU Affero General Public License as published by
8 # the Free Software Foundation, either version 3 of the License, or
9 # (at your option) any later version.
10
11 # This program is distributed in the hope that it will be useful,
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 # GNU Affero General Public License for more details.
15
16 # You should have received a copy of the GNU Affero General Public License
17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18
import calendar
import time
from functools import partial
from mimetypes import guess_type
from secrets import token_urlsafe
from typing import Any, Dict, List, Optional, Tuple

import dateutil
import shortuuid

from twisted.words.protocols.jabber import jid, error
from twisted.words.protocols.jabber.xmlstream import XMPPHandler
from twisted.words.xish import domish
from twisted.internet import defer
from twisted.python import failure

# XXX: sat_tmp.wokkel.pubsub is actually used instead of wokkel version
from wokkel import pubsub
from wokkel import disco, iwokkel, rsm
from zope.interface import implementer

from libervia.backend.core.i18n import _
from libervia.backend.core.constants import Const as C
from libervia.backend.core.log import getLogger
from libervia.backend.core import exceptions
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.tools import xml_tools
from libervia.backend.tools import sat_defer
from libervia.backend.tools import utils
from libervia.backend.tools.common import data_format
from libervia.backend.tools.common import uri as xmpp_uri
from libervia.backend.tools.common import regex
51
52
log = getLogger(__name__)


# XMPP namespaces used by this plugin
NS_MICROBLOG = "urn:xmpp:microblog:0"
NS_ATOM = "http://www.w3.org/2005/Atom"
NS_PUBSUB_EVENT = f"{pubsub.NS_PUBSUB}#event"
# comment nodes are named by appending the parent item id to this prefix
NS_COMMENT_PREFIX = f"{NS_MICROBLOG}:comments/"


# metadata used by the backend's plugin loader
PLUGIN_INFO = {
    C.PI_NAME: "Microblogging over XMPP Plugin",
    C.PI_IMPORT_NAME: "XEP-0277",
    C.PI_TYPE: "XEP",
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_PROTOCOLS: ["XEP-0277"],
    C.PI_DEPENDENCIES: ["XEP-0163", "XEP-0060", "TEXT_SYNTAXES"],
    C.PI_RECOMMENDATIONS: ["XEP-0059", "EXTRA-PEP", "PUBSUB_CACHE"],
    C.PI_MAIN: "XEP_0277",
    C.PI_HANDLER: "yes",
    C.PI_DESCRIPTION: _("""Implementation of microblogging Protocol"""),
}
74
75
class NodeAccessChangeException(Exception):
    # NOTE(review): never raised in this part of the file; purpose inferred from
    # the name (failure while changing a pubsub node's access model) — confirm
    # against the rest of the module.
    pass
78
79
80 class XEP_0277(object):
81 namespace = NS_MICROBLOG
82 NS_ATOM = NS_ATOM
83
    def __init__(self, host):
        """Initialise the plugin: namespaces, cache analyser and bridge methods.

        @param host: backend host instance, used to access other plugins and the
            frontend bridge
        """
        log.info(_("Microblogging plugin initialization"))
        self.host = host
        host.register_namespace("microblog", NS_MICROBLOG)
        self._p = self.host.plugins[
            "XEP-0060"
        ]  # this facilitate the access to pubsub plugin
        # if the pubsub cache plugin is loaded, register an analyser so blog
        # nodes are recognised, parsed and kept in sync
        ps_cache = self.host.plugins.get("PUBSUB_CACHE")
        if ps_cache is not None:
            ps_cache.register_analyser(
                {
                    "name": "XEP-0277",
                    "node": NS_MICROBLOG,
                    "namespace": NS_ATOM,
                    "type": "blog",
                    "to_sync": True,
                    "parser": self.item_2_mb_data,
                    "match_cb": self._cache_node_match_cb,
                }
            )
        self.rt_sessions = sat_defer.RTDeferredSessions()
        # get publish/retract notifications for the microblog node
        self.host.plugins["XEP-0060"].add_managed_node(
            NS_MICROBLOG, items_cb=self._items_received
        )

        # bridge methods exposed to frontends
        host.bridge.add_method(
            "mb_send",
            ".plugin",
            in_sign="ssss",
            out_sign="s",
            method=self._mb_send,
            async_=True,
        )
        host.bridge.add_method(
            "mb_repeat",
            ".plugin",
            in_sign="sssss",
            out_sign="s",
            method=self._mb_repeat,
            async_=True,
        )
        host.bridge.add_method(
            "mb_preview",
            ".plugin",
            in_sign="ssss",
            out_sign="s",
            method=self._mb_preview,
            async_=True,
        )
        host.bridge.add_method(
            "mb_retract",
            ".plugin",
            in_sign="ssss",
            out_sign="",
            method=self._mb_retract,
            async_=True,
        )
        host.bridge.add_method(
            "mb_get",
            ".plugin",
            in_sign="ssiasss",
            out_sign="s",
            method=self._mb_get,
            async_=True,
        )
        host.bridge.add_method(
            "mb_rename",
            ".plugin",
            in_sign="sssss",
            out_sign="",
            method=self._mb_rename,
            async_=True,
        )
        host.bridge.add_method(
            "mb_access_set",
            ".plugin",
            in_sign="ss",
            out_sign="",
            method=self.mb_access_set,
            async_=True,
        )
        host.bridge.add_method(
            "mb_subscribe_to_many",
            ".plugin",
            in_sign="sass",
            out_sign="s",
            method=self._mb_subscribe_to_many,
        )
        host.bridge.add_method(
            "mb_get_from_many_rt_result",
            ".plugin",
            in_sign="ss",
            out_sign="(ua(sssasa{ss}))",
            method=self._mb_get_from_many_rt_result,
            async_=True,
        )
        host.bridge.add_method(
            "mb_get_from_many",
            ".plugin",
            in_sign="sasia{ss}s",
            out_sign="s",
            method=self._mb_get_from_many,
        )
        host.bridge.add_method(
            "mb_get_from_many_with_comments_rt_result",
            ".plugin",
            in_sign="ss",
            out_sign="(ua(sssa(sa(sssasa{ss}))a{ss}))",
            method=self._mb_get_from_many_with_comments_rt_result,
            async_=True,
        )
        host.bridge.add_method(
            "mb_get_from_many_with_comments",
            ".plugin",
            in_sign="sasiia{ss}a{ss}s",
            out_sign="s",
            method=self._mb_get_from_many_with_comments,
        )
        host.bridge.add_method(
            "mb_is_comment_node",
            ".plugin",
            in_sign="s",
            out_sign="b",
            method=self.is_comment_node,
        )
209
210 def get_handler(self, client):
211 return XEP_0277_handler()
212
213 def _cache_node_match_cb(
214 self,
215 client: SatXMPPEntity,
216 analyse: dict,
217 ) -> None:
218 """Check is analysed node is a comment and fill analyse accordingly"""
219 if analyse["node"].startswith(NS_COMMENT_PREFIX):
220 analyse["subtype"] = "comment"
221
222 def _check_features_cb(self, available):
223 return {"available": C.BOOL_TRUE}
224
225 def _check_features_eb(self, fail):
226 return {"available": C.BOOL_FALSE}
227
228 def features_get(self, profile):
229 client = self.host.get_client(profile)
230 d = self.host.check_features(client, [], identity=("pubsub", "pep"))
231 d.addCallbacks(self._check_features_cb, self._check_features_eb)
232 return d
233
234 ## plugin management methods ##
235
    def _items_received(self, client, itemsEvent):
        """Callback which manage items notifications (publish + retract)"""

        def manage_item(data, event):
            # forward the serialised event to frontends through the bridge
            self.host.bridge.ps_event(
                C.PS_MICROBLOG,
                itemsEvent.sender.full(),
                itemsEvent.nodeIdentifier,
                event,
                data_format.serialise(data),
                client.profile,
            )

        for item in itemsEvent.items:
            if item.name == C.PS_ITEM:
                # FIXME: service and node should be used here
                # parse errors are deliberately dropped (best-effort notification):
                # the errback silences any item_2_mb_data failure
                self.item_2_mb_data(client, item, None, None).addCallbacks(
                    manage_item, lambda failure: None, (C.PS_PUBLISH,)
                )
            elif item.name == C.PS_RETRACT:
                manage_item({"id": item["id"]}, C.PS_RETRACT)
            else:
                raise exceptions.InternalError("Invalid event value")
259
260 ## data/item transformation ##
261
262 @defer.inlineCallbacks
263 def item_2_mb_data(
264 self,
265 client: SatXMPPEntity,
266 item_elt: domish.Element,
267 service: Optional[jid.JID],
268 # FIXME: node is Optional until all calls to item_2_mb_data set properly service
269 # and node. Once done, the Optional must be removed here
270 node: Optional[str]
271 ) -> dict:
272 """Convert an XML Item to microblog data
273
274 @param item_elt: microblog item element
275 @param service: PubSub service where the item has been retrieved
276 profile's PEP is used when service is None
277 @param node: PubSub node where the item has been retrieved
278 if None, "uri" won't be set
279 @return: microblog data
280 """
281 if service is None:
282 service = client.jid.userhostJID()
283
284 extra: Dict[str, Any] = {}
285 microblog_data: Dict[str, Any] = {
286 "service": service.full(),
287 "extra": extra
288 }
289
290 def check_conflict(key, increment=False):
291 """Check if key is already in microblog data
292
293 @param key(unicode): key to check
294 @param increment(bool): if suffix the key with an increment
295 instead of raising an exception
296 @raise exceptions.DataError: the key already exists
297 (not raised if increment is True)
298 """
299 if key in microblog_data:
300 if not increment:
301 raise failure.Failure(
302 exceptions.DataError(
303 "key {} is already present for item {}"
304 ).format(key, item_elt["id"])
305 )
306 else:
307 idx = 1 # the idx 0 is the key without suffix
308 fmt = "{}#{}"
309 new_key = fmt.format(key, idx)
310 while new_key in microblog_data:
311 idx += 1
312 new_key = fmt.format(key, idx)
313 key = new_key
314 return key
315
316 @defer.inlineCallbacks
317 def parseElement(elem):
318 """Parse title/content elements and fill microblog_data accordingly"""
319 type_ = elem.getAttribute("type")
320 if type_ == "xhtml":
321 data_elt = elem.firstChildElement()
322 if data_elt is None:
323 raise failure.Failure(
324 exceptions.DataError(
325 "XHML content not wrapped in a <div/> element, this is not "
326 "standard !"
327 )
328 )
329 if data_elt.uri != C.NS_XHTML:
330 raise failure.Failure(
331 exceptions.DataError(
332 _("Content of type XHTML must declare its namespace!")
333 )
334 )
335 key = check_conflict("{}_xhtml".format(elem.name))
336 data = data_elt.toXml()
337 microblog_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].clean_xhtml(
338 data
339 )
340 else:
341 key = check_conflict(elem.name)
342 microblog_data[key] = str(elem)
343
344 id_ = item_elt.getAttribute("id", "") # there can be no id for transient nodes
345 microblog_data["id"] = id_
346 if item_elt.uri not in (pubsub.NS_PUBSUB, NS_PUBSUB_EVENT):
347 msg = "Unsupported namespace {ns} in pubsub item {id_}".format(
348 ns=item_elt.uri, id_=id_
349 )
350 log.warning(msg)
351 raise failure.Failure(exceptions.DataError(msg))
352
353 try:
354 entry_elt = next(item_elt.elements(NS_ATOM, "entry"))
355 except StopIteration:
356 msg = "No atom entry found in the pubsub item {}".format(id_)
357 raise failure.Failure(exceptions.DataError(msg))
358
359 # uri
360 # FIXME: node should alway be set in the future, check FIXME in method signature
361 if node is not None:
362 microblog_data["node"] = node
363 microblog_data['uri'] = xmpp_uri.build_xmpp_uri(
364 "pubsub",
365 path=service.full(),
366 node=node,
367 item=id_,
368 )
369
370 # language
371 try:
372 microblog_data["language"] = entry_elt[(C.NS_XML, "lang")].strip()
373 except KeyError:
374 pass
375
376 # atom:id
377 try:
378 id_elt = next(entry_elt.elements(NS_ATOM, "id"))
379 except StopIteration:
380 msg = ("No atom id found in the pubsub item {}, this is not standard !"
381 .format(id_))
382 log.warning(msg)
383 microblog_data["atom_id"] = ""
384 else:
385 microblog_data["atom_id"] = str(id_elt)
386
387 # title/content(s)
388
389 # FIXME: ATOM and XEP-0277 only allow 1 <title/> element
390 # but in the wild we have some blogs with several ones
391 # so we don't respect the standard for now (it doesn't break
392 # anything anyway), and we'll find a better option later
393 # try:
394 # title_elt = entry_elt.elements(NS_ATOM, 'title').next()
395 # except StopIteration:
396 # msg = u'No atom title found in the pubsub item {}'.format(id_)
397 # raise failure.Failure(exceptions.DataError(msg))
398 title_elts = list(entry_elt.elements(NS_ATOM, "title"))
399 if not title_elts:
400 msg = "No atom title found in the pubsub item {}".format(id_)
401 raise failure.Failure(exceptions.DataError(msg))
402 for title_elt in title_elts:
403 yield parseElement(title_elt)
404
405 # FIXME: as for <title/>, Atom only authorise at most 1 content
406 # but XEP-0277 allows several ones. So for no we handle as
407 # if more than one can be present
408 for content_elt in entry_elt.elements(NS_ATOM, "content"):
409 yield parseElement(content_elt)
410
411 # we check that text content is present
412 for key in ("title", "content"):
413 if key not in microblog_data and ("{}_xhtml".format(key)) in microblog_data:
414 log.warning(
415 "item {id_} provide a {key}_xhtml data but not a text one".format(
416 id_=id_, key=key
417 )
418 )
419 # ... and do the conversion if it's not
420 microblog_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].convert(
421 microblog_data["{}_xhtml".format(key)],
422 self.host.plugins["TEXT_SYNTAXES"].SYNTAX_XHTML,
423 self.host.plugins["TEXT_SYNTAXES"].SYNTAX_TEXT,
424 False,
425 )
426
427 if "content" not in microblog_data:
428 # use the atom title data as the microblog body content
429 microblog_data["content"] = microblog_data["title"]
430 del microblog_data["title"]
431 if "title_xhtml" in microblog_data:
432 microblog_data["content_xhtml"] = microblog_data["title_xhtml"]
433 del microblog_data["title_xhtml"]
434
435 # published/updated dates
436 try:
437 updated_elt = next(entry_elt.elements(NS_ATOM, "updated"))
438 except StopIteration:
439 msg = "No atom updated element found in the pubsub item {}".format(id_)
440 raise failure.Failure(exceptions.DataError(msg))
441 microblog_data["updated"] = calendar.timegm(
442 dateutil.parser.parse(str(updated_elt)).utctimetuple()
443 )
444 try:
445 published_elt = next(entry_elt.elements(NS_ATOM, "published"))
446 except StopIteration:
447 microblog_data["published"] = microblog_data["updated"]
448 else:
449 microblog_data["published"] = calendar.timegm(
450 dateutil.parser.parse(str(published_elt)).utctimetuple()
451 )
452
453 # links
454 comments = microblog_data['comments'] = []
455 for link_elt in entry_elt.elements(NS_ATOM, "link"):
456 href = link_elt.getAttribute("href")
457 if not href:
458 log.warning(
459 f'missing href in <link> element: {link_elt.toXml()}'
460 )
461 continue
462 rel = link_elt.getAttribute("rel")
463 if (rel == "replies" and link_elt.getAttribute("title") == "comments"):
464 uri = href
465 comments_data = {
466 "uri": uri,
467 }
468 try:
469 comment_service, comment_node = self.parse_comment_url(uri)
470 except Exception as e:
471 log.warning(f"Can't parse comments url: {e}")
472 continue
473 else:
474 comments_data["service"] = comment_service.full()
475 comments_data["node"] = comment_node
476 comments.append(comments_data)
477 elif rel == "via":
478 try:
479 repeater_jid = jid.JID(item_elt["publisher"])
480 except (KeyError, RuntimeError):
481 try:
482 # we look for stanza element which is at the root, meaning that it
483 # has not parent
484 top_elt = item_elt.parent
485 while top_elt.parent is not None:
486 top_elt = top_elt.parent
487 repeater_jid = jid.JID(top_elt["from"])
488 except (AttributeError, RuntimeError):
489 # we should always have either the "publisher" attribute or the
490 # stanza available
491 log.error(
492 f"Can't find repeater of the post: {item_elt.toXml()}"
493 )
494 continue
495
496 extra["repeated"] = {
497 "by": repeater_jid.full(),
498 "uri": href
499 }
500 elif rel in ("related", "enclosure"):
501 attachment: Dict[str, Any] = {
502 "sources": [{"url": href}]
503 }
504 if rel == "related":
505 attachment["external"] = True
506 for attr, key in (
507 ("type", "media_type"),
508 ("title", "desc"),
509 ):
510 value = link_elt.getAttribute(attr)
511 if value:
512 attachment[key] = value
513 try:
514 attachment["size"] = int(link_elt.attributes["lenght"])
515 except (KeyError, ValueError):
516 pass
517 if "media_type" not in attachment:
518 media_type = guess_type(href, False)[0]
519 if media_type is not None:
520 attachment["media_type"] = media_type
521
522 attachments = extra.setdefault("attachments", [])
523 attachments.append(attachment)
524 else:
525 log.warning(
526 f"Unmanaged link element: {link_elt.toXml()}"
527 )
528
529 # author
530 publisher = item_elt.getAttribute("publisher")
531 try:
532 author_elt = next(entry_elt.elements(NS_ATOM, "author"))
533 except StopIteration:
534 log.debug("Can't find author element in item {}".format(id_))
535 else:
536 # name
537 try:
538 name_elt = next(author_elt.elements(NS_ATOM, "name"))
539 except StopIteration:
540 log.warning(
541 "No name element found in author element of item {}".format(id_)
542 )
543 author = None
544 else:
545 author = microblog_data["author"] = str(name_elt).strip()
546 # uri
547 try:
548 uri_elt = next(author_elt.elements(NS_ATOM, "uri"))
549 except StopIteration:
550 log.debug(
551 "No uri element found in author element of item {}".format(id_)
552 )
553 if publisher:
554 microblog_data["author_jid"] = publisher
555 else:
556 uri = str(uri_elt)
557 if uri.startswith("xmpp:"):
558 uri = uri[5:]
559 microblog_data["author_jid"] = uri
560 else:
561 microblog_data["author_jid"] = (
562 item_elt.getAttribute("publisher") or ""
563 )
564 if not author and microblog_data["author_jid"]:
565 # FIXME: temporary workaround for missing author name, would be
566 # better to use directly JID's identity (to be done from frontends?)
567 try:
568 microblog_data["author"] = jid.JID(microblog_data["author_jid"]).user
569 except Exception as e:
570 log.warning(f"No author name found, and can't parse author jid: {e}")
571
572 if not publisher:
573 log.debug("No publisher attribute, we can't verify author jid")
574 microblog_data["author_jid_verified"] = False
575 elif jid.JID(publisher).userhostJID() == jid.JID(uri).userhostJID():
576 microblog_data["author_jid_verified"] = True
577 else:
578 if "repeated" not in extra:
579 log.warning(
580 "item atom:uri differ from publisher attribute, spoofing "
581 "attempt ? atom:uri = {} publisher = {}".format(
582 uri, item_elt.getAttribute("publisher")
583 )
584 )
585 microblog_data["author_jid_verified"] = False
586 # email
587 try:
588 email_elt = next(author_elt.elements(NS_ATOM, "email"))
589 except StopIteration:
590 pass
591 else:
592 microblog_data["author_email"] = str(email_elt)
593
594 if not microblog_data.get("author_jid"):
595 if publisher:
596 microblog_data["author_jid"] = publisher
597 microblog_data["author_jid_verified"] = True
598 else:
599 iq_elt = xml_tools.find_ancestor(item_elt, "iq", C.NS_STREAM)
600 microblog_data["author_jid"] = iq_elt["from"]
601 microblog_data["author_jid_verified"] = False
602
603 # categories
604 categories = [
605 category_elt.getAttribute("term", "")
606 for category_elt in entry_elt.elements(NS_ATOM, "category")
607 ]
608 microblog_data["tags"] = categories
609
610 ## the trigger ##
611 # if other plugins have things to add or change
612 yield self.host.trigger.point(
613 "XEP-0277_item2data", item_elt, entry_elt, microblog_data
614 )
615
616 defer.returnValue(microblog_data)
617
    async def mb_data_2_entry_elt(self, client, mb_data, item_id, service, node):
        """Convert a data dict to en entry usable to create an item

        @param mb_data: data dict as given by bridge method.
        @param item_id(unicode): id of the item to use
        @param service(jid.JID, None): pubsub service where the item is sent
            Needed to construct Atom id
        @param node(unicode): pubsub node where the item is sent
            Needed to construct Atom id
        @return: deferred which fire domish.Element
        """
        entry_elt = domish.Element((NS_ATOM, "entry"))
        extra = mb_data.get("extra", {})

        ## language ##
        if "language" in mb_data:
            entry_elt[(C.NS_XML, "lang")] = mb_data["language"].strip()

        ## content and title ##
        synt = self.host.plugins["TEXT_SYNTAXES"]

        for elem_name in ("title", "content"):
            for type_ in ["", "_rich", "_xhtml"]:
                attr = f"{elem_name}{type_}"
                if attr in mb_data:
                    elem = entry_elt.addElement(elem_name)
                    if type_:
                        if type_ == "_rich":  # convert input from current syntax to XHTML
                            xml_content = await synt.convert(
                                mb_data[attr], synt.get_current_syntax(client.profile), "XHTML"
                            )
                            if f"{elem_name}_xhtml" in mb_data:
                                raise failure.Failure(
                                    exceptions.DataError(
                                        _(
                                            "Can't have xhtml and rich content at the same time"
                                        )
                                    )
                                )
                        else:
                            xml_content = mb_data[attr]

                        div_elt = xml_tools.ElementParser()(
                            xml_content, namespace=C.NS_XHTML
                        )
                        if (
                            div_elt.name != "div"
                            or div_elt.uri != C.NS_XHTML
                            or div_elt.attributes
                        ):
                            # we need a wrapping <div/> at the top with XHTML namespace
                            wrap_div_elt = domish.Element((C.NS_XHTML, "div"))
                            wrap_div_elt.addChild(div_elt)
                            div_elt = wrap_div_elt
                        elem.addChild(div_elt)
                        elem["type"] = "xhtml"
                        if elem_name not in mb_data:
                            # there is raw text content, which is mandatory
                            # so we create one from xhtml content
                            elem_txt = entry_elt.addElement(elem_name)
                            text_content = await self.host.plugins[
                                "TEXT_SYNTAXES"
                            ].convert(
                                xml_content,
                                self.host.plugins["TEXT_SYNTAXES"].SYNTAX_XHTML,
                                self.host.plugins["TEXT_SYNTAXES"].SYNTAX_TEXT,
                                False,
                            )
                            elem_txt.addContent(text_content)
                            elem_txt["type"] = "text"

                    else:  # raw text only needs to be escaped to get HTML-safe sequence
                        elem.addContent(mb_data[attr])
                        elem["type"] = "text"

        try:
            next(entry_elt.elements(NS_ATOM, "title"))
        except StopIteration:
            # we have no title element which is mandatory
            # so we transform content element to title
            elems = list(entry_elt.elements(NS_ATOM, "content"))
            if not elems:
                raise exceptions.DataError(
                    "There must be at least one content or title element"
                )
            for elem in elems:
                elem.name = "title"

        ## attachments ##
        attachments = extra.get(C.KEY_ATTACHMENTS)
        if attachments:
            for attachment in attachments:
                try:
                    url = attachment["url"]
                except KeyError:
                    # fall back to the first source which has an "url" key
                    try:
                        url = next(
                            s['url'] for s in attachment["sources"] if 'url' in s
                        )
                    except (StopIteration, KeyError):
                        log.warning(
                            f'"url" missing in attachment, ignoring: {attachment}'
                        )
                        continue

                if not url.startswith("http"):
                    log.warning(f"non HTTP URL in attachment, ignoring: {attachment}")
                    continue
                link_elt = entry_elt.addElement("link")
                link_elt["href"] = url
                if attachment.get("external", False):
                    # this is a link to an external data such as a website
                    link_elt["rel"] = "related"
                else:
                    # this is an attached file
                    link_elt["rel"] = "enclosure"
                for key, attr in (
                    ("media_type", "type"),
                    ("desc", "title"),
                    # XXX: "lenght" (sic) is the attribute actually written and read
                    #     by this plugin (see item_2_mb_data); kept for compatibility
                    #     even though standard Atom uses "length"
                    ("size", "lenght")
                ):
                    value = attachment.get(key)
                    if value:
                        link_elt[attr] = str(value)

        ## author ##
        author_elt = entry_elt.addElement("author")
        try:
            author_name = mb_data["author"]
        except KeyError:
            # FIXME: must use better name
            author_name = client.jid.user
        author_elt.addElement("name", content=author_name)

        try:
            author_jid_s = mb_data["author_jid"]
        except KeyError:
            author_jid_s = client.jid.userhost()
        author_elt.addElement("uri", content="xmpp:{}".format(author_jid_s))

        # NOTE(review): "author_email" is read but never serialised, and the
        # target variable name (author_jid_s) looks like a copy/paste slip —
        # no <email/> element is ever emitted; confirm whether this is intended
        try:
            author_jid_s = mb_data["author_email"]
        except KeyError:
            pass

        ## published/updated time ##
        current_time = time.time()
        entry_elt.addElement(
            "updated", content=utils.xmpp_date(float(mb_data.get("updated", current_time)))
        )
        entry_elt.addElement(
            "published",
            content=utils.xmpp_date(float(mb_data.get("published", current_time))),
        )

        ## categories ##
        for tag in mb_data.get('tags', []):
            category_elt = entry_elt.addElement("category")
            category_elt["term"] = tag

        ## id ##
        entry_id = mb_data.get(
            "id",
            xmpp_uri.build_xmpp_uri(
                "pubsub",
                path=service.full() if service is not None else client.jid.userhost(),
                node=node,
                item=item_id,
            ),
        )
        entry_elt.addElement("id", content=entry_id)

        ## comments ##
        for comments_data in mb_data.get('comments', []):
            link_elt = entry_elt.addElement("link")
            # XXX: "uri" is set in self._manage_comments if not already existing
            link_elt["href"] = comments_data["uri"]
            link_elt["rel"] = "replies"
            link_elt["title"] = "comments"

        if "repeated" in extra:
            try:
                repeated = extra["repeated"]
                link_elt = entry_elt.addElement("link")
                link_elt["rel"] = "via"
                link_elt["href"] = repeated["uri"]
            except KeyError as e:
                log.warning(
                    f"invalid repeated element({e}): {extra['repeated']}"
                )

        ## final item building ##
        item_elt = pubsub.Item(id=item_id, payload=entry_elt)

        ## the trigger ##
        # if other plugins have things to add or change
        self.host.trigger.point(
            "XEP-0277_data2entry", client, mb_data, entry_elt, item_elt
        )

        return item_elt
820
821 ## publish/preview ##
822
823 def is_comment_node(self, node: str) -> bool:
824 """Indicate if the node is prefixed with comments namespace"""
825 return node.startswith(NS_COMMENT_PREFIX)
826
827 def get_parent_item(self, item_id: str) -> str:
828 """Return parent of a comment node
829
830 @param item_id: a comment node
831 """
832 if not self.is_comment_node(item_id):
833 raise ValueError("This node is not a comment node")
834 return item_id[len(NS_COMMENT_PREFIX):]
835
836 def get_comments_node(self, item_id):
837 """Generate comment node
838
839 @param item_id(unicode): id of the parent item
840 @return (unicode): comment node to use
841 """
842 return f"{NS_COMMENT_PREFIX}{item_id}"
843
844 def get_comments_service(self, client, parent_service=None):
845 """Get prefered PubSub service to create comment node
846
847 @param pubsub_service(jid.JID, None): PubSub service of the parent item
848 @param return((D)jid.JID, None): PubSub service to use
849 """
850 if parent_service is not None:
851 if parent_service.user:
852 # we are on a PEP
853 if parent_service.host == client.jid.host:
854 #  it's our server, we use already found client.pubsub_service below
855 pass
856 else:
857 # other server, let's try to find a non PEP service there
858 d = self.host.find_service_entity(
859 client, "pubsub", "service", parent_service
860 )
861 d.addCallback(lambda entity: entity or parent_service)
862 else:
863 # parent is already on a normal Pubsub service, we re-use it
864 return defer.succeed(parent_service)
865
866 return defer.succeed(
867 client.pubsub_service if client.pubsub_service is not None else parent_service
868 )
869
870 async def _manage_comments(self, client, mb_data, service, node, item_id, access=None):
871 """Check comments keys in mb_data and create comments node if necessary
872
873 if a comments node metadata is set in the mb_data['comments'] list, it is used
874 otherwise it is generated (if allow_comments is True).
875 @param mb_data(dict): microblog mb_data
876 @param service(jid.JID, None): PubSub service of the parent item
877 @param node(unicode): node of the parent item
878 @param item_id(unicode): id of the parent item
879 @param access(unicode, None): access model
880 None to use same access model as parent item
881 """
882 allow_comments = mb_data.pop("allow_comments", None)
883 if allow_comments is None:
884 if "comments" in mb_data:
885 mb_data["allow_comments"] = True
886 else:
887 # no comments set or requested, nothing to do
888 return
889 elif allow_comments == False:
890 if "comments" in mb_data:
891 log.warning(
892 "comments are not allowed but there is already a comments node, "
893 "it may be lost: {uri}".format(
894 uri=mb_data["comments"]
895 )
896 )
897 del mb_data["comments"]
898 return
899
900 # we have usually a single comment node, but the spec allow several, so we need to
901 # handle this in a list
902 if len(mb_data.setdefault('comments', [])) == 0:
903 # we need at least one comment node
904 comments_data = {}
905 mb_data['comments'].append({})
906
907 if access is None:
908 # TODO: cache access models per service/node
909 parent_node_config = await self._p.getConfiguration(client, service, node)
910 access = parent_node_config.get(self._p.OPT_ACCESS_MODEL, self._p.ACCESS_OPEN)
911
912 options = {
913 self._p.OPT_ACCESS_MODEL: access,
914 self._p.OPT_MAX_ITEMS: "max",
915 self._p.OPT_PERSIST_ITEMS: 1,
916 self._p.OPT_DELIVER_PAYLOADS: 1,
917 self._p.OPT_SEND_ITEM_SUBSCRIBE: 1,
918 # FIXME: would it make sense to restrict publish model to subscribers?
919 self._p.OPT_PUBLISH_MODEL: self._p.ACCESS_OPEN,
920 }
921
922 # if other plugins need to change the options
923 self.host.trigger.point("XEP-0277_comments", client, mb_data, options)
924
925 for comments_data in mb_data['comments']:
926 uri = comments_data.get('uri')
927 comments_node = comments_data.get('node')
928 try:
929 comments_service = jid.JID(comments_data["service"])
930 except KeyError:
931 comments_service = None
932
933 if uri:
934 uri_service, uri_node = self.parse_comment_url(uri)
935 if ((comments_node is not None and comments_node!=uri_node)
936 or (comments_service is not None and comments_service!=uri_service)):
937 raise ValueError(
938 f"Incoherence between comments URI ({uri}) and comments_service "
939 f"({comments_service}) or comments_node ({comments_node})")
940 comments_data['service'] = comments_service = uri_service
941 comments_data['node'] = comments_node = uri_node
942 else:
943 if not comments_node:
944 comments_node = self.get_comments_node(item_id)
945 comments_data['node'] = comments_node
946 if comments_service is None:
947 comments_service = await self.get_comments_service(client, service)
948 if comments_service is None:
949 comments_service = client.jid.userhostJID()
950 comments_data['service'] = comments_service
951
952 comments_data['uri'] = xmpp_uri.build_xmpp_uri(
953 "pubsub",
954 path=comments_service.full(),
955 node=comments_node,
956 )
957
958 try:
959 await self._p.createNode(client, comments_service, comments_node, options)
960 except error.StanzaError as e:
961 if e.condition == "conflict":
962 log.info(
963 "node {} already exists on service {}".format(
964 comments_node, comments_service
965 )
966 )
967 else:
968 raise e
969 else:
970 if access == self._p.ACCESS_WHITELIST:
971 # for whitelist access we need to copy affiliations from parent item
972 comments_affiliations = await self._p.get_node_affiliations(
973 client, service, node
974 )
975 # …except for "member", that we transform to publisher
976 # because we wants members to be able to write to comments
977 for jid_, affiliation in list(comments_affiliations.items()):
978 if affiliation == "member":
979 comments_affiliations[jid_] == "publisher"
980
981 await self._p.set_node_affiliations(
982 client, comments_service, comments_node, comments_affiliations
983 )
984
985 def friendly_id(self, data):
986 """Generate a user friendly id from title or content"""
987 # TODO: rich content should be converted to plain text
988 id_base = regex.url_friendly_text(
989 data.get('title')
990 or data.get('title_rich')
991 or data.get('content')
992 or data.get('content_rich')
993 or ''
994 )
995 return f"{id_base}-{token_urlsafe(3)}"
996
997 def _mb_send(self, service, node, data, profile_key):
998 service = jid.JID(service) if service else None
999 node = node if node else NS_MICROBLOG
1000 client = self.host.get_client(profile_key)
1001 data = data_format.deserialise(data)
1002 return defer.ensureDeferred(self.send(client, data, service, node))
1003
    async def send(
        self,
        client: SatXMPPEntity,
        data: dict,
        service: Optional[jid.JID] = None,
        node: Optional[str] = NS_MICROBLOG
    ) -> Optional[str]:
        """Send XEP-0277's microblog data

        @param data: microblog data (must include at least a "content" or a "title" key).
            see http://wiki.goffi.org/wiki/Bridge_API_-_Microblogging/en for details
        @param service: PubSub service where the microblog must be published
            None to publish on profile's PEP
        @param node: PubSub node to use (default to microblog NS)
            None is equivalent as using default value
        @return: ID of the published item, or None if a trigger cancelled the
            publication
        """
        # TODO: check that all data keys are used, this would avoid sending publicly a private message
        # by accident (e.g. if group plugin is not loaded, and "group*" key are not used)
        if service is None:
            service = client.jid.userhostJID()
        if node is None:
            node = NS_MICROBLOG

        item_id = data.get("id")
        if item_id is None:
            # a user-friendly ID is generated unless explicitly disabled
            if data.get("user_friendly_id", True):
                item_id = self.friendly_id(data)
            else:
                item_id = str(shortuuid.uuid())

        # comments node creation is best-effort: a failure must not prevent the
        # publication of the main item
        try:
            await self._manage_comments(client, data, service, node, item_id, access=None)
        except error.StanzaError:
            log.warning("Can't create comments node for item {}".format(item_id))
        item = await self.mb_data_2_entry_elt(client, data, item_id, service, node)

        # a trigger returning False cancels the publication entirely
        if not await self.host.trigger.async_point(
            "XEP-0277_send", client, service, node, item, data
        ):
            return None

        # forward encryption/signature related flags to the pubsub plugin
        extra = {}
        for key in ("encrypted", "encrypted_for", "signed"):
            value = data.get(key)
            if value is not None:
                extra[key] = value

        await self._p.publish(client, service, node, [item], extra=extra)
        return item_id
1054
1055 def _mb_repeat(
1056 self,
1057 service_s: str,
1058 node: str,
1059 item: str,
1060 extra_s: str,
1061 profile_key: str
1062 ) -> defer.Deferred:
1063 service = jid.JID(service_s) if service_s else None
1064 node = node if node else NS_MICROBLOG
1065 client = self.host.get_client(profile_key)
1066 extra = data_format.deserialise(extra_s)
1067 d = defer.ensureDeferred(
1068 self.repeat(client, item, service, node, extra)
1069 )
1070 # [repeat] can return None, and we always need a str
1071 d.addCallback(lambda ret: ret or "")
1072 return d
1073
1074 async def repeat(
1075 self,
1076 client: SatXMPPEntity,
1077 item: str,
1078 service: Optional[jid.JID] = None,
1079 node: str = NS_MICROBLOG,
1080 extra: Optional[dict] = None,
1081 ) -> Optional[str]:
1082 """Re-publish a post from somewhere else
1083
1084 This is a feature often name "share" or "boost", it is generally used to make a
1085 publication more visible by sharing it with our own audience
1086 """
1087 if service is None:
1088 service = client.jid.userhostJID()
1089
1090 # we first get the post to repeat
1091 items, __ = await self._p.get_items(
1092 client,
1093 service,
1094 node,
1095 item_ids = [item]
1096 )
1097 if not items:
1098 raise exceptions.NotFound(
1099 f"no item found at node {node!r} on {service} with ID {item!r}"
1100 )
1101 item_elt = items[0]
1102 try:
1103 entry_elt = next(item_elt.elements(NS_ATOM, "entry"))
1104 except StopIteration:
1105 raise exceptions.DataError(
1106 "post to repeat is not a XEP-0277 blog item"
1107 )
1108
1109 # we want to be sure that we have an author element
1110 try:
1111 author_elt = next(entry_elt.elements(NS_ATOM, "author"))
1112 except StopIteration:
1113 author_elt = entry_elt.addElement("author")
1114
1115 try:
1116 next(author_elt.elements(NS_ATOM, "name"))
1117 except StopIteration:
1118 author_elt.addElement("name", content=service.user)
1119
1120 try:
1121 next(author_elt.elements(NS_ATOM, "uri"))
1122 except StopIteration:
1123 entry_elt.addElement(
1124 "uri", content=xmpp_uri.build_xmpp_uri(None, path=service.full())
1125 )
1126
1127 # we add the link indicating that it's a repeated post
1128 link_elt = entry_elt.addElement("link")
1129 link_elt["rel"] = "via"
1130 link_elt["href"] = xmpp_uri.build_xmpp_uri(
1131 "pubsub", path=service.full(), node=node, item=item
1132 )
1133
1134 return await self._p.send_item(
1135 client,
1136 client.jid.userhostJID(),
1137 NS_MICROBLOG,
1138 entry_elt
1139 )
1140
1141 def _mb_preview(self, service, node, data, profile_key):
1142 service = jid.JID(service) if service else None
1143 node = node if node else NS_MICROBLOG
1144 client = self.host.get_client(profile_key)
1145 data = data_format.deserialise(data)
1146 d = defer.ensureDeferred(self.preview(client, data, service, node))
1147 d.addCallback(data_format.serialise)
1148 return d
1149
1150 async def preview(
1151 self,
1152 client: SatXMPPEntity,
1153 data: dict,
1154 service: Optional[jid.JID] = None,
1155 node: Optional[str] = NS_MICROBLOG
1156 ) -> dict:
1157 """Preview microblog data without publishing them
1158
1159 params are the same as for [send]
1160 @return: microblog data as would be retrieved from published item
1161 """
1162 if node is None:
1163 node = NS_MICROBLOG
1164
1165 item_id = data.get("id", "")
1166
1167 # we have to serialise then deserialise to be sure that all triggers are called
1168 item_elt = await self.mb_data_2_entry_elt(client, data, item_id, service, node)
1169 item_elt.uri = pubsub.NS_PUBSUB
1170 return await self.item_2_mb_data(client, item_elt, service, node)
1171
1172
1173 ## retract ##
1174
1175 def _mb_retract(self, service_jid_s, nodeIdentifier, itemIdentifier, profile_key):
1176 """Call self._p._retract_item, but use default node if node is empty"""
1177 return self._p._retract_item(
1178 service_jid_s,
1179 nodeIdentifier or NS_MICROBLOG,
1180 itemIdentifier,
1181 True,
1182 profile_key,
1183 )
1184
1185 ## get ##
1186
1187 def _mb_get_serialise(self, data):
1188 items, metadata = data
1189 metadata['items'] = items
1190 return data_format.serialise(metadata)
1191
1192 def _mb_get(self, service="", node="", max_items=10, item_ids=None, extra="",
1193 profile_key=C.PROF_KEY_NONE):
1194 """
1195 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
1196 @param item_ids (list[unicode]): list of item IDs
1197 """
1198 client = self.host.get_client(profile_key)
1199 service = jid.JID(service) if service else None
1200 max_items = None if max_items == C.NO_LIMIT else max_items
1201 extra = self._p.parse_extra(data_format.deserialise(extra))
1202 d = defer.ensureDeferred(
1203 self.mb_get(client, service, node or None, max_items, item_ids,
1204 extra.rsm_request, extra.extra)
1205 )
1206 d.addCallback(self._mb_get_serialise)
1207 return d
1208
1209 async def mb_get(
1210 self,
1211 client: SatXMPPEntity,
1212 service: Optional[jid.JID] = None,
1213 node: Optional[str] = None,
1214 max_items: Optional[int] = 10,
1215 item_ids: Optional[List[str]] = None,
1216 rsm_request: Optional[rsm.RSMRequest] = None,
1217 extra: Optional[Dict[str, Any]] = None
1218 ) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]:
1219 """Get some microblogs
1220
1221 @param service(jid.JID, None): jid of the publisher
1222 None to get profile's PEP
1223 @param node(unicode, None): node to get (or microblog node if None)
1224 @param max_items(int): maximum number of item to get, None for no limit
1225 ignored if rsm_request is set
1226 @param item_ids (list[unicode]): list of item IDs
1227 @param rsm_request (rsm.RSMRequest): RSM request data
1228 @param extra (dict): extra data
1229
1230 @return: a deferred couple with the list of items and metadatas.
1231 """
1232 if node is None:
1233 node = NS_MICROBLOG
1234 if rsm_request:
1235 max_items = None
1236 items_data = await self._p.get_items(
1237 client,
1238 service,
1239 node,
1240 max_items=max_items,
1241 item_ids=item_ids,
1242 rsm_request=rsm_request,
1243 extra=extra,
1244 )
1245 mb_data_list, metadata = await self._p.trans_items_data_d(
1246 items_data, partial(self.item_2_mb_data, client, service=service, node=node))
1247 encrypted = metadata.pop("encrypted", None)
1248 if encrypted is not None:
1249 for mb_data in mb_data_list:
1250 try:
1251 mb_data["encrypted"] = encrypted[mb_data["id"]]
1252 except KeyError:
1253 pass
1254 return (mb_data_list, metadata)
1255
1256 def _mb_rename(self, service, node, item_id, new_id, profile_key):
1257 return defer.ensureDeferred(self.mb_rename(
1258 self.host.get_client(profile_key),
1259 jid.JID(service) if service else None,
1260 node or None,
1261 item_id,
1262 new_id
1263 ))
1264
1265 async def mb_rename(
1266 self,
1267 client: SatXMPPEntity,
1268 service: Optional[jid.JID],
1269 node: Optional[str],
1270 item_id: str,
1271 new_id: str
1272 ) -> None:
1273 if not node:
1274 node = NS_MICROBLOG
1275 await self._p.rename_item(client, service, node, item_id, new_id)
1276
1277 def parse_comment_url(self, node_url):
1278 """Parse a XMPP URI
1279
1280 Determine the fields comments_service and comments_node of a microblog data
1281 from the href attribute of an entry's link element. For example this input:
1282 xmpp:sat-pubsub.example.net?;node=urn%3Axmpp%3Acomments%3A_af43b363-3259-4b2a-ba4c-1bc33aa87634__urn%3Axmpp%3Agroupblog%3Asomebody%40example.net
1283 will return(JID(u'sat-pubsub.example.net'), 'urn:xmpp:comments:_af43b363-3259-4b2a-ba4c-1bc33aa87634__urn:xmpp:groupblog:somebody@example.net')
1284 @return (tuple[jid.JID, unicode]): service and node
1285 """
1286 try:
1287 parsed_url = xmpp_uri.parse_xmpp_uri(node_url)
1288 service = jid.JID(parsed_url["path"])
1289 node = parsed_url["node"]
1290 except Exception as e:
1291 raise exceptions.DataError(f"Invalid comments link: {e}")
1292
1293 return (service, node)
1294
1295 ## configure ##
1296
    def mb_access_set(self, access="presence", profile_key=C.PROF_KEY_NONE):
        """Create a microblog node on PEP with given access

        If the node already exists, it changes its options instead
        @param access: Node access model, according to xep-0060 #4.5
        @param profile_key: profile key
        @raise NodeAccessChangeException: node creation and option change both failed
        """
        #  FIXME: check if this method is needed, deprecate it if not
        client = self.host.get_client(profile_key)

        # options used both on creation and when updating an existing node
        _options = {
            self._p.OPT_ACCESS_MODEL: access,
            self._p.OPT_MAX_ITEMS: "max",
            self._p.OPT_PERSIST_ITEMS: 1,
            self._p.OPT_DELIVER_PAYLOADS: 1,
            self._p.OPT_SEND_ITEM_SUBSCRIBE: 1,
        }

        def cb(result):
            # Node is created with right permission
            log.debug(_("Microblog node has now access %s") % access)

        def fatal_err(s_error):
            # Something went wrong
            log.error(_("Can't set microblog access"))
            raise NodeAccessChangeException()

        def err_cb(s_error):
            # If the node already exists, the condition is "conflict",
            # else we have an unmanaged error
            if s_error.value.condition == "conflict":
                # d = self.host.plugins["XEP-0060"].deleteNode(client, client.jid.userhostJID(), NS_MICROBLOG)
                # d.addCallback(lambda x: create_node().addCallback(cb).addErrback(fatal_err))
                change_node_options().addCallback(cb).addErrback(fatal_err)
            else:
                fatal_err(s_error)

        def create_node():
            # first attempt: create the PEP microblog node with the wanted options
            return self._p.createNode(
                client, client.jid.userhostJID(), NS_MICROBLOG, _options
            )

        def change_node_options():
            # fallback when the node already exists: push the options to it
            return self._p.setOptions(
                client.jid.userhostJID(),
                NS_MICROBLOG,
                client.jid.userhostJID(),
                _options,
                profile_key=profile_key,
            )

        create_node().addCallback(cb).addErrback(err_cb)
1349
1350 ## methods to manage several stanzas/jids at once ##
1351
1352 # common
1353
1354 def _get_client_and_node_data(self, publishers_type, publishers, profile_key):
1355 """Helper method to construct node_data from publishers_type/publishers
1356
1357 @param publishers_type: type of the list of publishers, one of:
1358 C.ALL: get all jids from roster, publishers is not used
1359 C.GROUP: get jids from groups
1360 C.JID: use publishers directly as list of jids
1361 @param publishers: list of publishers, according to "publishers_type" (None,
1362 list of groups or list of jids)
1363 @param profile_key: %(doc_profile_key)s
1364 """
1365 client = self.host.get_client(profile_key)
1366 if publishers_type == C.JID:
1367 jids_set = set(publishers)
1368 else:
1369 jids_set = client.roster.get_jids_set(publishers_type, publishers)
1370 if publishers_type == C.ALL:
1371 try:
1372 # display messages from salut-a-toi@libervia.org or other PEP services
1373 services = self.host.plugins["EXTRA-PEP"].get_followed_entities(
1374 profile_key
1375 )
1376 except KeyError:
1377 pass # plugin is not loaded
1378 else:
1379 if services:
1380 log.debug(
1381 "Extra PEP followed entities: %s"
1382 % ", ".join([str(service) for service in services])
1383 )
1384 jids_set.update(services)
1385
1386 node_data = []
1387 for jid_ in jids_set:
1388 node_data.append((jid_, NS_MICROBLOG))
1389 return client, node_data
1390
1391 def _check_publishers(self, publishers_type, publishers):
1392 """Helper method to deserialise publishers coming from bridge
1393
1394 publishers_type(unicode): type of the list of publishers, one of:
1395 publishers: list of publishers according to type
1396 @return: deserialised (publishers_type, publishers) tuple
1397 """
1398 if publishers_type == C.ALL:
1399 if publishers:
1400 raise failure.Failure(
1401 ValueError(
1402 "Can't use publishers with {} type".format(publishers_type)
1403 )
1404 )
1405 else:
1406 publishers = None
1407 elif publishers_type == C.JID:
1408 publishers[:] = [jid.JID(publisher) for publisher in publishers]
1409 return publishers_type, publishers
1410
1411 # subscribe #
1412
1413 def _mb_subscribe_to_many(self, publishers_type, publishers, profile_key):
1414 """
1415
1416 @return (str): session id: Use pubsub.getSubscribeRTResult to get the results
1417 """
1418 publishers_type, publishers = self._check_publishers(publishers_type, publishers)
1419 return self.mb_subscribe_to_many(publishers_type, publishers, profile_key)
1420
1421 def mb_subscribe_to_many(self, publishers_type, publishers, profile_key):
1422 """Subscribe microblogs for a list of groups or jids
1423
1424 @param publishers_type: type of the list of publishers, one of:
1425 C.ALL: get all jids from roster, publishers is not used
1426 C.GROUP: get jids from groups
1427 C.JID: use publishers directly as list of jids
1428 @param publishers: list of publishers, according to "publishers_type" (None, list
1429 of groups or list of jids)
1430 @param profile: %(doc_profile)s
1431 @return (str): session id
1432 """
1433 client, node_data = self._get_client_and_node_data(
1434 publishers_type, publishers, profile_key
1435 )
1436 return self._p.subscribe_to_many(
1437 node_data, client.jid.userhostJID(), profile_key=profile_key
1438 )
1439
1440 # get #
1441
    def _mb_get_from_many_rt_result(self, session_id, profile_key=C.PROF_KEY_DEFAULT):
        """Get real-time results for mb_get_from_many session

        @param session_id: id of the real-time deferred session
        @param return (tuple): (remaining, results) where:
            - remaining is the number of still expected results
            - results is a list of tuple with
                - service (unicode): pubsub service
                - node (unicode): pubsub node
                - failure (unicode): empty string in case of success, error message else
                - items_data(list): data as returned by [mb_get]
                - items_metadata(dict): metadata as returned by [mb_get]
        @param profile_key: %(doc_profile_key)s
        """

        client = self.host.get_client(profile_key)

        def onSuccess(items_data):
            """convert items elements to list of microblog data in items_data"""
            d = self._p.trans_items_data_d(
                items_data,
                # FIXME: service and node should be used here
                partial(self.item_2_mb_data, client),
                serialise=True
            )
            # prefix with an empty failure message, mirroring on_error's shape
            d.addCallback(lambda serialised: ("", serialised))
            return d

        d = self._p.get_rt_results(
            session_id,
            on_success=onSuccess,
            on_error=lambda failure: (str(failure.value), ([], {})),
            profile=client.profile,
        )
        # flatten the session results into the bridge-friendly tuples described
        # in the docstring; each raw value has the shape
        # (success, (failure_message, (items, metadata)))
        d.addCallback(
            lambda ret: (
                ret[0],
                [
                    (service.full(), node, failure, items, metadata)
                    for (service, node), (success, (failure, (items, metadata))) in ret[
                        1
                    ].items()
                ],
            )
        )
        return d
1488
1489 def _mb_get_from_many(self, publishers_type, publishers, max_items=10, extra_dict=None,
1490 profile_key=C.PROF_KEY_NONE):
1491 """
1492 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
1493 """
1494 max_items = None if max_items == C.NO_LIMIT else max_items
1495 publishers_type, publishers = self._check_publishers(publishers_type, publishers)
1496 extra = self._p.parse_extra(extra_dict)
1497 return self.mb_get_from_many(
1498 publishers_type,
1499 publishers,
1500 max_items,
1501 extra.rsm_request,
1502 extra.extra,
1503 profile_key,
1504 )
1505
1506 def mb_get_from_many(self, publishers_type, publishers, max_items=None, rsm_request=None,
1507 extra=None, profile_key=C.PROF_KEY_NONE):
1508 """Get the published microblogs for a list of groups or jids
1509
1510 @param publishers_type (str): type of the list of publishers (one of "GROUP" or
1511 "JID" or "ALL")
1512 @param publishers (list): list of publishers, according to publishers_type (list
1513 of groups or list of jids)
1514 @param max_items (int): optional limit on the number of retrieved items.
1515 @param rsm_request (rsm.RSMRequest): RSM request data, common to all publishers
1516 @param extra (dict): Extra data
1517 @param profile_key: profile key
1518 @return (str): RT Deferred session id
1519 """
1520 # XXX: extra is unused here so far
1521 client, node_data = self._get_client_and_node_data(
1522 publishers_type, publishers, profile_key
1523 )
1524 return self._p.get_from_many(
1525 node_data, max_items, rsm_request, profile_key=profile_key
1526 )
1527
1528 # comments #
1529
1530 def _mb_get_from_many_with_comments_rt_result_serialise(self, data):
1531 """Serialisation of result
1532
1533 This is probably the longest method name of whole SàT ecosystem ^^
1534 @param data(dict): data as received by rt_sessions
1535 @return (tuple): see [_mb_get_from_many_with_comments_rt_result]
1536 """
1537 ret = []
1538 data_iter = iter(data[1].items())
1539 for (service, node), (success, (failure_, (items_data, metadata))) in data_iter:
1540 items = []
1541 for item, item_metadata in items_data:
1542 item = data_format.serialise(item)
1543 items.append((item, item_metadata))
1544 ret.append((
1545 service.full(),
1546 node,
1547 failure_,
1548 items,
1549 metadata))
1550
1551 return data[0], ret
1552
1553 def _mb_get_from_many_with_comments_rt_result(self, session_id,
1554 profile_key=C.PROF_KEY_DEFAULT):
1555 """Get real-time results for [mb_get_from_many_with_comments] session
1556
1557 @param session_id: id of the real-time deferred session
1558 @param return (tuple): (remaining, results) where:
1559 - remaining is the number of still expected results
1560 - results is a list of 5-tuple with
1561 - service (unicode): pubsub service
1562 - node (unicode): pubsub node
1563 - failure (unicode): empty string in case of success, error message else
1564 - items(list[tuple(dict, list)]): list of 2-tuple with
1565 - item(dict): item microblog data
1566 - comments_list(list[tuple]): list of 5-tuple with
1567 - service (unicode): pubsub service where the comments node is
1568 - node (unicode): comments node
1569 - failure (unicode): empty in case of success, else error message
1570 - comments(list[dict]): list of microblog data
1571 - comments_metadata(dict): metadata of the comment node
1572 - metadata(dict): original node metadata
1573 @param profile_key: %(doc_profile_key)s
1574 """
1575 profile = self.host.get_client(profile_key).profile
1576 d = self.rt_sessions.get_results(session_id, profile=profile)
1577 d.addCallback(self._mb_get_from_many_with_comments_rt_result_serialise)
1578 return d
1579
1580 def _mb_get_from_many_with_comments(self, publishers_type, publishers, max_items=10,
1581 max_comments=C.NO_LIMIT, extra_dict=None,
1582 extra_comments_dict=None, profile_key=C.PROF_KEY_NONE):
1583 """
1584 @param max_items(int): maximum number of item to get, C.NO_LIMIT for no limit
1585 @param max_comments(int): maximum number of comments to get, C.NO_LIMIT for no
1586 limit
1587 """
1588 max_items = None if max_items == C.NO_LIMIT else max_items
1589 max_comments = None if max_comments == C.NO_LIMIT else max_comments
1590 publishers_type, publishers = self._check_publishers(publishers_type, publishers)
1591 extra = self._p.parse_extra(extra_dict)
1592 extra_comments = self._p.parse_extra(extra_comments_dict)
1593 return self.mb_get_from_many_with_comments(
1594 publishers_type,
1595 publishers,
1596 max_items,
1597 max_comments or None,
1598 extra.rsm_request,
1599 extra.extra,
1600 extra_comments.rsm_request,
1601 extra_comments.extra,
1602 profile_key,
1603 )
1604
1605 def mb_get_from_many_with_comments(self, publishers_type, publishers, max_items=None,
1606 max_comments=None, rsm_request=None, extra=None,
1607 rsm_comments=None, extra_comments=None,
1608 profile_key=C.PROF_KEY_NONE):
1609 """Helper method to get the microblogs and their comments in one shot
1610
1611 @param publishers_type (str): type of the list of publishers (one of "GROUP" or
1612 "JID" or "ALL")
1613 @param publishers (list): list of publishers, according to publishers_type (list
1614 of groups or list of jids)
1615 @param max_items (int): optional limit on the number of retrieved items.
1616 @param max_comments (int): maximum number of comments to retrieve
1617 @param rsm_request (rsm.RSMRequest): RSM request for initial items only
1618 @param extra (dict): extra configuration for initial items only
1619 @param rsm_comments (rsm.RSMRequest): RSM request for comments only
1620 @param extra_comments (dict): extra configuration for comments only
1621 @param profile_key: profile key
1622 @return (str): RT Deferred session id
1623 """
1624 # XXX: this method seems complicated because it do a couple of treatments
1625 # to serialise and associate the data, but it make life in frontends side
1626 # a lot easier
1627
1628 client, node_data = self._get_client_and_node_data(
1629 publishers_type, publishers, profile_key
1630 )
1631
1632 def get_comments(items_data):
1633 """Retrieve comments and add them to the items_data
1634
1635 @param items_data: serialised items data
1636 @return (defer.Deferred): list of items where each item is associated
1637 with a list of comments data (service, node, list of items, metadata)
1638 """
1639 items, metadata = items_data
1640 items_dlist = [] # deferred list for items
1641 for item in items:
1642 dlist = [] # deferred list for comments
1643 for key, value in item.items():
1644 # we look for comments
1645 if key.startswith("comments") and key.endswith("_service"):
1646 prefix = key[: key.find("_")]
1647 service_s = value
1648 service = jid.JID(service_s)
1649 node = item["{}{}".format(prefix, "_node")]
1650 # time to get the comments
1651 d = defer.ensureDeferred(
1652 self._p.get_items(
1653 client,
1654 service,
1655 node,
1656 max_comments,
1657 rsm_request=rsm_comments,
1658 extra=extra_comments,
1659 )
1660 )
1661 # then serialise
1662 d.addCallback(
1663 lambda items_data: self._p.trans_items_data_d(
1664 items_data,
1665 partial(
1666 self.item_2_mb_data, client, service=service, node=node
1667 ),
1668 serialise=True
1669 )
1670 )
1671 # with failure handling
1672 d.addCallback(
1673 lambda serialised_items_data: ("",) + serialised_items_data
1674 )
1675 d.addErrback(lambda failure: (str(failure.value), [], {}))
1676 # and associate with service/node (needed if there are several
1677 # comments nodes)
1678 d.addCallback(
1679 lambda serialised, service_s=service_s, node=node: (
1680 service_s,
1681 node,
1682 )
1683 + serialised
1684 )
1685 dlist.append(d)
1686 # we get the comments
1687 comments_d = defer.gatherResults(dlist)
1688 # and add them to the item data
1689 comments_d.addCallback(
1690 lambda comments_data, item=item: (item, comments_data)
1691 )
1692 items_dlist.append(comments_d)
1693 # we gather the items + comments in a list
1694 items_d = defer.gatherResults(items_dlist)
1695 # and add the metadata
1696 items_d.addCallback(lambda items_completed: (items_completed, metadata))
1697 return items_d
1698
1699 deferreds = {}
1700 for service, node in node_data:
1701 d = deferreds[(service, node)] = defer.ensureDeferred(self._p.get_items(
1702 client, service, node, max_items, rsm_request=rsm_request, extra=extra
1703 ))
1704 d.addCallback(
1705 lambda items_data: self._p.trans_items_data_d(
1706 items_data,
1707 partial(self.item_2_mb_data, client, service=service, node=node),
1708 )
1709 )
1710 d.addCallback(get_comments)
1711 d.addCallback(lambda items_comments_data: ("", items_comments_data))
1712 d.addErrback(lambda failure: (str(failure.value), ([], {})))
1713
1714 return self.rt_sessions.new_session(deferreds, client.profile)
1715
1716
@implementer(iwokkel.IDisco)
class XEP_0277_handler(XMPPHandler):
    """Service discovery handler advertising XEP-0277 microblog support."""

    def getDiscoInfo(self, requestor, target, nodeIdentifier=""):
        # advertise the microblog namespace as a supported feature
        features = [disco.DiscoFeature(NS_MICROBLOG)]
        return features

    def getDiscoItems(self, requestor, target, nodeIdentifier=""):
        # no disco items are exposed by this handler
        return []