Mercurial > libervia-backend
comparison sat/plugins/plugin_xep_0277.py @ 4020:d8a1219e913f
plugin XEP-0277: handle "related" and "enclosure" links:
those links are used for attachments, "enclosure" for files, "related" when it's other
kind of data, such as an external website (in this case the `external` key is set in the
attchment).
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 23 Mar 2023 15:32:10 +0100 |
parents | 86efd854dee1 |
children | 78b5f356900c |
comparison
equal
deleted
inserted
replaced
4019:7bf7677b893d | 4020:d8a1219e913f |
---|---|
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 import time | 19 import time |
20 import dateutil | 20 import dateutil |
21 import calendar | 21 import calendar |
22 from mimetypes import guess_type | |
22 from secrets import token_urlsafe | 23 from secrets import token_urlsafe |
23 from typing import List, Optional, Dict, Tuple, Union, Any, Dict | 24 from typing import List, Optional, Dict, Tuple, Any, Dict |
24 from functools import partial | 25 from functools import partial |
25 | 26 |
26 import shortuuid | 27 import shortuuid |
27 | 28 |
28 from twisted.words.protocols.jabber import jid, error | 29 from twisted.words.protocols.jabber import jid, error |
450 ) | 451 ) |
451 | 452 |
452 # links | 453 # links |
453 comments = microblog_data['comments'] = [] | 454 comments = microblog_data['comments'] = [] |
454 for link_elt in entry_elt.elements(NS_ATOM, "link"): | 455 for link_elt in entry_elt.elements(NS_ATOM, "link"): |
456 href = link_elt.getAttribute("href") | |
457 if not href: | |
458 log.warning( | |
459 f'missing href in <link> element: {link_elt.toXml()}' | |
460 ) | |
461 continue | |
455 rel = link_elt.getAttribute("rel") | 462 rel = link_elt.getAttribute("rel") |
456 if (rel == "replies" and link_elt.getAttribute("title") == "comments"): | 463 if (rel == "replies" and link_elt.getAttribute("title") == "comments"): |
457 uri = link_elt["href"] | 464 uri = href |
458 comments_data = { | 465 comments_data = { |
459 "uri": uri, | 466 "uri": uri, |
460 } | 467 } |
461 try: | 468 try: |
462 comment_service, comment_node = self.parseCommentUrl(uri) | 469 comment_service, comment_node = self.parseCommentUrl(uri) |
466 else: | 473 else: |
467 comments_data["service"] = comment_service.full() | 474 comments_data["service"] = comment_service.full() |
468 comments_data["node"] = comment_node | 475 comments_data["node"] = comment_node |
469 comments.append(comments_data) | 476 comments.append(comments_data) |
470 elif rel == "via": | 477 elif rel == "via": |
471 href = link_elt.getAttribute("href") | |
472 if not href: | |
473 log.warning( | |
474 f'missing href in "via" <link> element: {link_elt.toXml()}' | |
475 ) | |
476 continue | |
477 try: | 478 try: |
478 repeater_jid = jid.JID(item_elt["publisher"]) | 479 repeater_jid = jid.JID(item_elt["publisher"]) |
479 except (KeyError, RuntimeError): | 480 except (KeyError, RuntimeError): |
480 try: | 481 try: |
481 # we look for stanza element which is at the root, meaning that it | 482 # we look for stanza element which is at the root, meaning that it |
494 | 495 |
495 extra["repeated"] = { | 496 extra["repeated"] = { |
496 "by": repeater_jid.full(), | 497 "by": repeater_jid.full(), |
497 "uri": href | 498 "uri": href |
498 } | 499 } |
500 elif rel in ("related", "enclosure"): | |
501 attachment: Dict[str, Any] = { | |
502 "sources": [{"url": href}] | |
503 } | |
504 if rel == "related": | |
505 attachment["external"] = True | |
506 for attr, key in ( | |
507 ("type", "media_type"), | |
508 ("title", "desc"), | |
509 ): | |
510 value = link_elt.getAttribute(attr) | |
511 if value: | |
512 attachment[key] = value | |
513 try: | |
514 attachment["size"] = int(link_elt.attributes["lenght"]) | |
515 except (KeyError, ValueError): | |
516 pass | |
517 if "media_type" not in attachment: | |
518 media_type = guess_type(href, False)[0] | |
519 if media_type is not None: | |
520 attachment["media_type"] = media_type | |
521 | |
522 attachments = extra.setdefault("attachments", []) | |
523 attachments.append(attachment) | |
499 else: | 524 else: |
500 title = link_elt.getAttribute("title", "") | |
501 href = link_elt.getAttribute("href", "") | |
502 log.warning( | 525 log.warning( |
503 "Unmanaged link element: rel={rel} title={title} href={href}".format( | 526 f"Unmanaged link element: {link_elt.toXml()}" |
504 rel=rel, title=title, href=href | |
505 ) | |
506 ) | 527 ) |
507 | 528 |
508 # author | 529 # author |
509 publisher = item_elt.getAttribute("publisher") | 530 publisher = item_elt.getAttribute("publisher") |
510 try: | 531 try: |
604 @param node(unicode): pubsub node where the item is sent | 625 @param node(unicode): pubsub node where the item is sent |
605 Needed to construct Atom id | 626 Needed to construct Atom id |
606 @return: deferred which fire domish.Element | 627 @return: deferred which fire domish.Element |
607 """ | 628 """ |
608 entry_elt = domish.Element((NS_ATOM, "entry")) | 629 entry_elt = domish.Element((NS_ATOM, "entry")) |
630 extra = mb_data.get("extra", {}) | |
609 | 631 |
610 ## language ## | 632 ## language ## |
611 if "language" in mb_data: | 633 if "language" in mb_data: |
612 entry_elt[(C.NS_XML, "lang")] = mb_data["language"].strip() | 634 entry_elt[(C.NS_XML, "lang")] = mb_data["language"].strip() |
613 | 635 |
614 ## content and title ## | 636 ## content and title ## |
615 synt = self.host.plugins["TEXT_SYNTAXES"] | 637 synt = self.host.plugins["TEXT_SYNTAXES"] |
616 | 638 |
617 for elem_name in ("title", "content"): | 639 for elem_name in ("title", "content"): |
618 for type_ in ["", "_rich", "_xhtml"]: | 640 for type_ in ["", "_rich", "_xhtml"]: |
619 attr = "{}{}".format(elem_name, type_) | 641 attr = f"{elem_name}{type_}" |
620 if attr in mb_data: | 642 if attr in mb_data: |
621 elem = entry_elt.addElement(elem_name) | 643 elem = entry_elt.addElement(elem_name) |
622 if type_: | 644 if type_: |
623 if type_ == "_rich": # convert input from current syntax to XHTML | 645 if type_ == "_rich": # convert input from current syntax to XHTML |
624 xml_content = await synt.convert( | 646 xml_content = await synt.convert( |
625 mb_data[attr], synt.getCurrentSyntax(client.profile), "XHTML" | 647 mb_data[attr], synt.getCurrentSyntax(client.profile), "XHTML" |
626 ) | 648 ) |
627 if "{}_xhtml".format(elem_name) in mb_data: | 649 if f"{elem_name}_xhtml" in mb_data: |
628 raise failure.Failure( | 650 raise failure.Failure( |
629 exceptions.DataError( | 651 exceptions.DataError( |
630 _( | 652 _( |
631 "Can't have xhtml and rich content at the same time" | 653 "Can't have xhtml and rich content at the same time" |
632 ) | 654 ) |
679 "There must be at least one content or title element" | 701 "There must be at least one content or title element" |
680 ) | 702 ) |
681 for elem in elems: | 703 for elem in elems: |
682 elem.name = "title" | 704 elem.name = "title" |
683 | 705 |
706 ## attachments ## | |
707 attachments = extra.get(C.KEY_ATTACHMENTS) | |
708 if attachments: | |
709 for attachment in attachments: | |
710 try: | |
711 url = attachment["url"] | |
712 except KeyError: | |
713 try: | |
714 url = next( | |
715 s['url'] for s in attachment["sources"] if 'url' in s | |
716 ) | |
717 except (StopIteration, KeyError): | |
718 log.warning( | |
719 f'"url" missing in attachment, ignoring: {attachment}' | |
720 ) | |
721 continue | |
722 | |
723 if not url.startswith("http"): | |
724 log.warning(f"non HTTP URL in attachment, ignoring: {attachment}") | |
725 continue | |
726 link_elt = entry_elt.addElement("link") | |
727 # XXX: "uri" is set in self._manageComments if not already existing | |
728 link_elt["href"] = url | |
729 if attachment.get("external", False): | |
730 # this is a link to an external data such as a website | |
731 link_elt["rel"] = "related" | |
732 else: | |
733 # this is an attached file | |
734 link_elt["rel"] = "enclosure" | |
735 for key, attr in ( | |
736 ("media_type", "type"), | |
737 ("desc", "title"), | |
738 ("size", "lenght") | |
739 ): | |
740 value = attachment.get(key) | |
741 if value: | |
742 link_elt[attr] = str(value) | |
743 | |
684 ## author ## | 744 ## author ## |
685 author_elt = entry_elt.addElement("author") | 745 author_elt = entry_elt.addElement("author") |
686 try: | 746 try: |
687 author_name = mb_data["author"] | 747 author_name = mb_data["author"] |
688 except KeyError: | 748 except KeyError: |
734 # XXX: "uri" is set in self._manageComments if not already existing | 794 # XXX: "uri" is set in self._manageComments if not already existing |
735 link_elt["href"] = comments_data["uri"] | 795 link_elt["href"] = comments_data["uri"] |
736 link_elt["rel"] = "replies" | 796 link_elt["rel"] = "replies" |
737 link_elt["title"] = "comments" | 797 link_elt["title"] = "comments" |
738 | 798 |
739 extra = mb_data.get("extra", {}) | |
740 if "repeated" in extra: | 799 if "repeated" in extra: |
741 try: | 800 try: |
742 repeated = extra["repeated"] | 801 repeated = extra["repeated"] |
743 link_elt = entry_elt.addElement("link") | 802 link_elt = entry_elt.addElement("link") |
744 link_elt["rel"] = "via" | 803 link_elt["rel"] = "via" |