changeset 4175:30f7513e5590

plugin XEP-0277: generate and parse alternate links with the new `alt_links` data in `extra`
author Goffi <goffi@goffi.org>
date Tue, 05 Dec 2023 13:14:03 +0100
parents 6929dabf3a7e
children 121925996ffb
files libervia/backend/plugins/plugin_xep_0277.py
diffstat 1 files changed, 78 insertions(+), 42 deletions(-) [+]
line wrap: on
line diff
--- a/libervia/backend/plugins/plugin_xep_0277.py	Tue Dec 05 13:13:03 2023 +0100
+++ b/libervia/backend/plugins/plugin_xep_0277.py	Tue Dec 05 13:14:03 2023 +0100
@@ -17,6 +17,7 @@
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 import time
+from urllib.parse import quote, urlparse
 import dateutil
 import calendar
 from mimetypes import guess_type
@@ -282,7 +283,7 @@
             service = client.jid.userhostJID()
 
         extra: Dict[str, Any] = {}
-        microblog_data: Dict[str, Any] = {
+        mb_data: Dict[str, Any] = {
             "service": service.full(),
             "extra": extra
         }
@@ -296,7 +297,7 @@
             @raise exceptions.DataError: the key already exists
                 (not raised if increment is True)
             """
-            if key in microblog_data:
+            if key in mb_data:
                 if not increment:
                     raise failure.Failure(
                         exceptions.DataError(
@@ -307,7 +308,7 @@
                     idx = 1  # the idx 0 is the key without suffix
                     fmt = "{}#{}"
                     new_key = fmt.format(key, idx)
-                    while new_key in microblog_data:
+                    while new_key in mb_data:
                         idx += 1
                         new_key = fmt.format(key, idx)
                     key = new_key
@@ -334,15 +335,15 @@
                     )
                 key = check_conflict("{}_xhtml".format(elem.name))
                 data = data_elt.toXml()
-                microblog_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].clean_xhtml(
+                mb_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].clean_xhtml(
                     data
                 )
             else:
                 key = check_conflict(elem.name)
-                microblog_data[key] = str(elem)
+                mb_data[key] = str(elem)
 
         id_ = item_elt.getAttribute("id", "")  # there can be no id for transient nodes
-        microblog_data["id"] = id_
+        mb_data["id"] = id_
         if item_elt.uri not in (pubsub.NS_PUBSUB, NS_PUBSUB_EVENT):
             msg = "Unsupported namespace {ns} in pubsub item {id_}".format(
                 ns=item_elt.uri, id_=id_
@@ -359,8 +360,8 @@
         # uri
         # FIXME: node should alway be set in the future, check FIXME in method signature
         if node is not None:
-            microblog_data["node"] = node
-            microblog_data['uri'] = xmpp_uri.build_xmpp_uri(
+            mb_data["node"] = node
+            mb_data['uri'] = xmpp_uri.build_xmpp_uri(
                 "pubsub",
                 path=service.full(),
                 node=node,
@@ -369,7 +370,7 @@
 
         # language
         try:
-            microblog_data["language"] = entry_elt[(C.NS_XML, "lang")].strip()
+            mb_data["language"] = entry_elt[(C.NS_XML, "lang")].strip()
         except KeyError:
             pass
 
@@ -380,9 +381,9 @@
             msg = ("No atom id found in the pubsub item {}, this is not standard !"
                    .format(id_))
             log.warning(msg)
-            microblog_data["atom_id"] = ""
+            mb_data["atom_id"] = ""
         else:
-            microblog_data["atom_id"] = str(id_elt)
+            mb_data["atom_id"] = str(id_elt)
 
         # title/content(s)
 
@@ -410,27 +411,27 @@
 
         # we check that text content is present
         for key in ("title", "content"):
-            if key not in microblog_data and ("{}_xhtml".format(key)) in microblog_data:
+            if key not in mb_data and ("{}_xhtml".format(key)) in mb_data:
                 log.warning(
                     "item {id_} provide a {key}_xhtml data but not a text one".format(
                         id_=id_, key=key
                     )
                 )
                 # ... and do the conversion if it's not
-                microblog_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].convert(
-                    microblog_data["{}_xhtml".format(key)],
+                mb_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].convert(
+                    mb_data["{}_xhtml".format(key)],
                     self.host.plugins["TEXT_SYNTAXES"].SYNTAX_XHTML,
                     self.host.plugins["TEXT_SYNTAXES"].SYNTAX_TEXT,
                     False,
                 )
 
-        if "content" not in microblog_data:
+        if "content" not in mb_data:
             # use the atom title data as the microblog body content
-            microblog_data["content"] = microblog_data["title"]
-            del microblog_data["title"]
-            if "title_xhtml" in microblog_data:
-                microblog_data["content_xhtml"] = microblog_data["title_xhtml"]
-                del microblog_data["title_xhtml"]
+            mb_data["content"] = mb_data["title"]
+            del mb_data["title"]
+            if "title_xhtml" in mb_data:
+                mb_data["content_xhtml"] = mb_data["title_xhtml"]
+                del mb_data["title_xhtml"]
 
         # published/updated dates
         try:
@@ -438,20 +439,20 @@
         except StopIteration:
             msg = "No atom updated element found in the pubsub item {}".format(id_)
             raise failure.Failure(exceptions.DataError(msg))
-        microblog_data["updated"] = calendar.timegm(
+        mb_data["updated"] = calendar.timegm(
             dateutil.parser.parse(str(updated_elt)).utctimetuple()
         )
         try:
             published_elt = next(entry_elt.elements(NS_ATOM, "published"))
         except StopIteration:
-            microblog_data["published"] = microblog_data["updated"]
+            mb_data["published"] = mb_data["updated"]
         else:
-            microblog_data["published"] = calendar.timegm(
+            mb_data["published"] = calendar.timegm(
                 dateutil.parser.parse(str(published_elt)).utctimetuple()
             )
 
         # links
-        comments = microblog_data['comments'] = []
+        comments = mb_data['comments'] = []
         for link_elt in entry_elt.elements(NS_ATOM, "link"):
             href = link_elt.getAttribute("href")
             if not href:
@@ -521,6 +522,16 @@
 
                 attachments = extra.setdefault("attachments", [])
                 attachments.append(attachment)
+            elif rel == "alternate":
+                link_data = {"url": href}
+                media_type = link_elt.getAttribute("type") or guess_type(href)[0]
+                if media_type:
+                    link_data["media_type"] = media_type
+                else:
+                    log.warning(
+                        f"Invalid or missing media type for alternate link: {href}"
+                    )
+                extra.setdefault("alt_links", []).append(link_data)
             else:
                 log.warning(
                     f"Unmanaged link element: {link_elt.toXml()}"
@@ -542,7 +553,7 @@
                 )
                 author = None
             else:
-                author = microblog_data["author"] = str(name_elt).strip()
+                author = mb_data["author"] = str(name_elt).strip()
             # uri
             try:
                 uri_elt = next(author_elt.elements(NS_ATOM, "uri"))
@@ -551,29 +562,29 @@
                     "No uri element found in author element of item {}".format(id_)
                 )
                 if publisher:
-                    microblog_data["author_jid"] = publisher
+                    mb_data["author_jid"] = publisher
             else:
                 uri = str(uri_elt)
                 if uri.startswith("xmpp:"):
                     uri = uri[5:]
-                    microblog_data["author_jid"] = uri
+                    mb_data["author_jid"] = uri
                 else:
-                    microblog_data["author_jid"] = (
+                    mb_data["author_jid"] = (
                         item_elt.getAttribute("publisher") or ""
                     )
-                if not author and microblog_data["author_jid"]:
+                if not author and mb_data["author_jid"]:
                     # FIXME: temporary workaround for missing author name, would be
                     #   better to use directly JID's identity (to be done from frontends?)
                     try:
-                        microblog_data["author"] = jid.JID(microblog_data["author_jid"]).user
+                        mb_data["author"] = jid.JID(mb_data["author_jid"]).user
                     except Exception as e:
                         log.warning(f"No author name found, and can't parse author jid: {e}")
 
                 if not publisher:
                     log.debug("No publisher attribute, we can't verify author jid")
-                    microblog_data["author_jid_verified"] = False
+                    mb_data["author_jid_verified"] = False
                 elif jid.JID(publisher).userhostJID() == jid.JID(uri).userhostJID():
-                    microblog_data["author_jid_verified"] = True
+                    mb_data["author_jid_verified"] = True
                 else:
                     if "repeated" not in extra:
                         log.warning(
@@ -582,38 +593,38 @@
                                 uri, item_elt.getAttribute("publisher")
                             )
                         )
-                    microblog_data["author_jid_verified"] = False
+                    mb_data["author_jid_verified"] = False
             # email
             try:
                 email_elt = next(author_elt.elements(NS_ATOM, "email"))
             except StopIteration:
                 pass
             else:
-                microblog_data["author_email"] = str(email_elt)
+                mb_data["author_email"] = str(email_elt)
 
-        if not microblog_data.get("author_jid"):
+        if not mb_data.get("author_jid"):
             if publisher:
-                microblog_data["author_jid"] = publisher
-                microblog_data["author_jid_verified"] = True
+                mb_data["author_jid"] = publisher
+                mb_data["author_jid_verified"] = True
             else:
                 iq_elt = xml_tools.find_ancestor(item_elt, "iq", C.NS_STREAM)
-                microblog_data["author_jid"] = iq_elt["from"]
-                microblog_data["author_jid_verified"] = False
+                mb_data["author_jid"] = iq_elt["from"]
+                mb_data["author_jid_verified"] = False
 
         # categories
         categories = [
             category_elt.getAttribute("term", "")
             for category_elt in entry_elt.elements(NS_ATOM, "category")
         ]
-        microblog_data["tags"] = categories
+        mb_data["tags"] = categories
 
         ## the trigger ##
         # if other plugins have things to add or change
         yield self.host.trigger.point(
-            "XEP-0277_item2data", item_elt, entry_elt, microblog_data
+            "XEP-0277_item2data", item_elt, entry_elt, mb_data
         )
 
-        defer.returnValue(microblog_data)
+        defer.returnValue(mb_data)
 
     async def mb_data_2_entry_elt(self, client, mb_data, item_id, service, node):
         """Convert a data dict to en entry usable to create an item
@@ -741,6 +752,31 @@
                     if value:
                         link_elt[attr]  = str(value)
 
+        ## alternate links ##
+        alt_links = extra.get("alt_links")
+        if alt_links:
+            for link_data in alt_links:
+                url_template = link_data["url"]
+                url = url_template.format(
+                    service=quote(service.full(), safe=""),
+                    node=quote(node, safe=""),
+                    item=quote(item_id, safe="")
+                )
+
+                link_elt = entry_elt.addElement("link")
+                link_elt["href"] = url
+                link_elt["rel"] = "alternate"
+
+                media_type = link_data.get("media_type")
+                if not media_type:
+                    parsed_url = urlparse(url)
+                    if parsed_url.scheme in ["http", "https"]:
+                        media_type = "text/html"
+                    else:
+                        media_type = guess_type(url)[0] or "application/octet-stream"
+
+                link_elt["type"] = media_type
+
         ## author ##
         author_elt = entry_elt.addElement("author")
         try: