comparison sat/plugins/plugin_xep_0277.py @ 4020:d8a1219e913f

plugin XEP-0277: handle "related" and "enclosure" links. These links are used for attachments: "enclosure" for files, "related" for any other kind of data, such as an external website (in which case the `external` key is set in the attachment).
author Goffi <goffi@goffi.org>
date Thu, 23 Mar 2023 15:32:10 +0100
parents 86efd854dee1
children 78b5f356900c
comparison
equal deleted inserted replaced
4019:7bf7677b893d 4020:d8a1219e913f
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. 17 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 18
19 import time 19 import time
20 import dateutil 20 import dateutil
21 import calendar 21 import calendar
22 from mimetypes import guess_type
22 from secrets import token_urlsafe 23 from secrets import token_urlsafe
23 from typing import List, Optional, Dict, Tuple, Union, Any, Dict 24 from typing import List, Optional, Dict, Tuple, Any, Dict
24 from functools import partial 25 from functools import partial
25 26
26 import shortuuid 27 import shortuuid
27 28
28 from twisted.words.protocols.jabber import jid, error 29 from twisted.words.protocols.jabber import jid, error
450 ) 451 )
451 452
452 # links 453 # links
453 comments = microblog_data['comments'] = [] 454 comments = microblog_data['comments'] = []
454 for link_elt in entry_elt.elements(NS_ATOM, "link"): 455 for link_elt in entry_elt.elements(NS_ATOM, "link"):
456 href = link_elt.getAttribute("href")
457 if not href:
458 log.warning(
459 f'missing href in <link> element: {link_elt.toXml()}'
460 )
461 continue
455 rel = link_elt.getAttribute("rel") 462 rel = link_elt.getAttribute("rel")
456 if (rel == "replies" and link_elt.getAttribute("title") == "comments"): 463 if (rel == "replies" and link_elt.getAttribute("title") == "comments"):
457 uri = link_elt["href"] 464 uri = href
458 comments_data = { 465 comments_data = {
459 "uri": uri, 466 "uri": uri,
460 } 467 }
461 try: 468 try:
462 comment_service, comment_node = self.parseCommentUrl(uri) 469 comment_service, comment_node = self.parseCommentUrl(uri)
466 else: 473 else:
467 comments_data["service"] = comment_service.full() 474 comments_data["service"] = comment_service.full()
468 comments_data["node"] = comment_node 475 comments_data["node"] = comment_node
469 comments.append(comments_data) 476 comments.append(comments_data)
470 elif rel == "via": 477 elif rel == "via":
471 href = link_elt.getAttribute("href")
472 if not href:
473 log.warning(
474 f'missing href in "via" <link> element: {link_elt.toXml()}'
475 )
476 continue
477 try: 478 try:
478 repeater_jid = jid.JID(item_elt["publisher"]) 479 repeater_jid = jid.JID(item_elt["publisher"])
479 except (KeyError, RuntimeError): 480 except (KeyError, RuntimeError):
480 try: 481 try:
481 # we look for stanza element which is at the root, meaning that it 482 # we look for stanza element which is at the root, meaning that it
494 495
495 extra["repeated"] = { 496 extra["repeated"] = {
496 "by": repeater_jid.full(), 497 "by": repeater_jid.full(),
497 "uri": href 498 "uri": href
498 } 499 }
500 elif rel in ("related", "enclosure"):
501 attachment: Dict[str, Any] = {
502 "sources": [{"url": href}]
503 }
504 if rel == "related":
505 attachment["external"] = True
506 for attr, key in (
507 ("type", "media_type"),
508 ("title", "desc"),
509 ):
510 value = link_elt.getAttribute(attr)
511 if value:
512 attachment[key] = value
513 try:
514 attachment["size"] = int(link_elt.attributes["lenght"])
515 except (KeyError, ValueError):
516 pass
517 if "media_type" not in attachment:
518 media_type = guess_type(href, False)[0]
519 if media_type is not None:
520 attachment["media_type"] = media_type
521
522 attachments = extra.setdefault("attachments", [])
523 attachments.append(attachment)
499 else: 524 else:
500 title = link_elt.getAttribute("title", "")
501 href = link_elt.getAttribute("href", "")
502 log.warning( 525 log.warning(
503 "Unmanaged link element: rel={rel} title={title} href={href}".format( 526 f"Unmanaged link element: {link_elt.toXml()}"
504 rel=rel, title=title, href=href
505 )
506 ) 527 )
507 528
508 # author 529 # author
509 publisher = item_elt.getAttribute("publisher") 530 publisher = item_elt.getAttribute("publisher")
510 try: 531 try:
604 @param node(unicode): pubsub node where the item is sent 625 @param node(unicode): pubsub node where the item is sent
605 Needed to construct Atom id 626 Needed to construct Atom id
606 @return: deferred which fire domish.Element 627 @return: deferred which fire domish.Element
607 """ 628 """
608 entry_elt = domish.Element((NS_ATOM, "entry")) 629 entry_elt = domish.Element((NS_ATOM, "entry"))
630 extra = mb_data.get("extra", {})
609 631
610 ## language ## 632 ## language ##
611 if "language" in mb_data: 633 if "language" in mb_data:
612 entry_elt[(C.NS_XML, "lang")] = mb_data["language"].strip() 634 entry_elt[(C.NS_XML, "lang")] = mb_data["language"].strip()
613 635
614 ## content and title ## 636 ## content and title ##
615 synt = self.host.plugins["TEXT_SYNTAXES"] 637 synt = self.host.plugins["TEXT_SYNTAXES"]
616 638
617 for elem_name in ("title", "content"): 639 for elem_name in ("title", "content"):
618 for type_ in ["", "_rich", "_xhtml"]: 640 for type_ in ["", "_rich", "_xhtml"]:
619 attr = "{}{}".format(elem_name, type_) 641 attr = f"{elem_name}{type_}"
620 if attr in mb_data: 642 if attr in mb_data:
621 elem = entry_elt.addElement(elem_name) 643 elem = entry_elt.addElement(elem_name)
622 if type_: 644 if type_:
623 if type_ == "_rich": # convert input from current syntax to XHTML 645 if type_ == "_rich": # convert input from current syntax to XHTML
624 xml_content = await synt.convert( 646 xml_content = await synt.convert(
625 mb_data[attr], synt.getCurrentSyntax(client.profile), "XHTML" 647 mb_data[attr], synt.getCurrentSyntax(client.profile), "XHTML"
626 ) 648 )
627 if "{}_xhtml".format(elem_name) in mb_data: 649 if f"{elem_name}_xhtml" in mb_data:
628 raise failure.Failure( 650 raise failure.Failure(
629 exceptions.DataError( 651 exceptions.DataError(
630 _( 652 _(
631 "Can't have xhtml and rich content at the same time" 653 "Can't have xhtml and rich content at the same time"
632 ) 654 )
679 "There must be at least one content or title element" 701 "There must be at least one content or title element"
680 ) 702 )
681 for elem in elems: 703 for elem in elems:
682 elem.name = "title" 704 elem.name = "title"
683 705
706 ## attachments ##
707 attachments = extra.get(C.KEY_ATTACHMENTS)
708 if attachments:
709 for attachment in attachments:
710 try:
711 url = attachment["url"]
712 except KeyError:
713 try:
714 url = next(
715 s['url'] for s in attachment["sources"] if 'url' in s
716 )
717 except (StopIteration, KeyError):
718 log.warning(
719 f'"url" missing in attachment, ignoring: {attachment}'
720 )
721 continue
722
723 if not url.startswith("http"):
724 log.warning(f"non HTTP URL in attachment, ignoring: {attachment}")
725 continue
726 link_elt = entry_elt.addElement("link")
727 # XXX: "uri" is set in self._manageComments if not already existing
728 link_elt["href"] = url
729 if attachment.get("external", False):
730 # this is a link to an external data such as a website
731 link_elt["rel"] = "related"
732 else:
733 # this is an attached file
734 link_elt["rel"] = "enclosure"
735 for key, attr in (
736 ("media_type", "type"),
737 ("desc", "title"),
738 ("size", "lenght")
739 ):
740 value = attachment.get(key)
741 if value:
742 link_elt[attr] = str(value)
743
684 ## author ## 744 ## author ##
685 author_elt = entry_elt.addElement("author") 745 author_elt = entry_elt.addElement("author")
686 try: 746 try:
687 author_name = mb_data["author"] 747 author_name = mb_data["author"]
688 except KeyError: 748 except KeyError:
734 # XXX: "uri" is set in self._manageComments if not already existing 794 # XXX: "uri" is set in self._manageComments if not already existing
735 link_elt["href"] = comments_data["uri"] 795 link_elt["href"] = comments_data["uri"]
736 link_elt["rel"] = "replies" 796 link_elt["rel"] = "replies"
737 link_elt["title"] = "comments" 797 link_elt["title"] = "comments"
738 798
739 extra = mb_data.get("extra", {})
740 if "repeated" in extra: 799 if "repeated" in extra:
741 try: 800 try:
742 repeated = extra["repeated"] 801 repeated = extra["repeated"]
743 link_elt = entry_elt.addElement("link") 802 link_elt = entry_elt.addElement("link")
744 link_elt["rel"] = "via" 803 link_elt["rel"] = "via"