# HG changeset patch # User Goffi # Date 1701778443 -3600 # Node ID 30f7513e5590bc31592bda56baf439c7f8ea17d4 # Parent 6929dabf3a7e0e7a26f22b31284c349aac4b5b20 plugin XEP-0277: generate and parse alternate links with the new `alt_links` data in `extra` diff -r 6929dabf3a7e -r 30f7513e5590 libervia/backend/plugins/plugin_xep_0277.py --- a/libervia/backend/plugins/plugin_xep_0277.py Tue Dec 05 13:13:03 2023 +0100 +++ b/libervia/backend/plugins/plugin_xep_0277.py Tue Dec 05 13:14:03 2023 +0100 @@ -17,6 +17,7 @@ # along with this program. If not, see . import time +from urllib.parse import quote, urlparse import dateutil import calendar from mimetypes import guess_type @@ -282,7 +283,7 @@ service = client.jid.userhostJID() extra: Dict[str, Any] = {} - microblog_data: Dict[str, Any] = { + mb_data: Dict[str, Any] = { "service": service.full(), "extra": extra } @@ -296,7 +297,7 @@ @raise exceptions.DataError: the key already exists (not raised if increment is True) """ - if key in microblog_data: + if key in mb_data: if not increment: raise failure.Failure( exceptions.DataError( @@ -307,7 +308,7 @@ idx = 1 # the idx 0 is the key without suffix fmt = "{}#{}" new_key = fmt.format(key, idx) - while new_key in microblog_data: + while new_key in mb_data: idx += 1 new_key = fmt.format(key, idx) key = new_key @@ -334,15 +335,15 @@ ) key = check_conflict("{}_xhtml".format(elem.name)) data = data_elt.toXml() - microblog_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].clean_xhtml( + mb_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].clean_xhtml( data ) else: key = check_conflict(elem.name) - microblog_data[key] = str(elem) + mb_data[key] = str(elem) id_ = item_elt.getAttribute("id", "") # there can be no id for transient nodes - microblog_data["id"] = id_ + mb_data["id"] = id_ if item_elt.uri not in (pubsub.NS_PUBSUB, NS_PUBSUB_EVENT): msg = "Unsupported namespace {ns} in pubsub item {id_}".format( ns=item_elt.uri, id_=id_ @@ -359,8 +360,8 @@ # uri # FIXME: node should 
alway be set in the future, check FIXME in method signature if node is not None: - microblog_data["node"] = node - microblog_data['uri'] = xmpp_uri.build_xmpp_uri( + mb_data["node"] = node + mb_data['uri'] = xmpp_uri.build_xmpp_uri( "pubsub", path=service.full(), node=node, @@ -369,7 +370,7 @@ # language try: - microblog_data["language"] = entry_elt[(C.NS_XML, "lang")].strip() + mb_data["language"] = entry_elt[(C.NS_XML, "lang")].strip() except KeyError: pass @@ -380,9 +381,9 @@ msg = ("No atom id found in the pubsub item {}, this is not standard !" .format(id_)) log.warning(msg) - microblog_data["atom_id"] = "" + mb_data["atom_id"] = "" else: - microblog_data["atom_id"] = str(id_elt) + mb_data["atom_id"] = str(id_elt) # title/content(s) @@ -410,27 +411,27 @@ # we check that text content is present for key in ("title", "content"): - if key not in microblog_data and ("{}_xhtml".format(key)) in microblog_data: + if key not in mb_data and ("{}_xhtml".format(key)) in mb_data: log.warning( "item {id_} provide a {key}_xhtml data but not a text one".format( id_=id_, key=key ) ) # ... 
and do the conversion if it's not - microblog_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].convert( - microblog_data["{}_xhtml".format(key)], + mb_data[key] = yield self.host.plugins["TEXT_SYNTAXES"].convert( + mb_data["{}_xhtml".format(key)], self.host.plugins["TEXT_SYNTAXES"].SYNTAX_XHTML, self.host.plugins["TEXT_SYNTAXES"].SYNTAX_TEXT, False, ) - if "content" not in microblog_data: + if "content" not in mb_data: # use the atom title data as the microblog body content - microblog_data["content"] = microblog_data["title"] - del microblog_data["title"] - if "title_xhtml" in microblog_data: - microblog_data["content_xhtml"] = microblog_data["title_xhtml"] - del microblog_data["title_xhtml"] + mb_data["content"] = mb_data["title"] + del mb_data["title"] + if "title_xhtml" in mb_data: + mb_data["content_xhtml"] = mb_data["title_xhtml"] + del mb_data["title_xhtml"] # published/updated dates try: @@ -438,20 +439,20 @@ except StopIteration: msg = "No atom updated element found in the pubsub item {}".format(id_) raise failure.Failure(exceptions.DataError(msg)) - microblog_data["updated"] = calendar.timegm( + mb_data["updated"] = calendar.timegm( dateutil.parser.parse(str(updated_elt)).utctimetuple() ) try: published_elt = next(entry_elt.elements(NS_ATOM, "published")) except StopIteration: - microblog_data["published"] = microblog_data["updated"] + mb_data["published"] = mb_data["updated"] else: - microblog_data["published"] = calendar.timegm( + mb_data["published"] = calendar.timegm( dateutil.parser.parse(str(published_elt)).utctimetuple() ) # links - comments = microblog_data['comments'] = [] + comments = mb_data['comments'] = [] for link_elt in entry_elt.elements(NS_ATOM, "link"): href = link_elt.getAttribute("href") if not href: @@ -521,6 +522,16 @@ attachments = extra.setdefault("attachments", []) attachments.append(attachment) + elif rel == "alternate": + link_data = {"url": href} + media_type = link_elt.getAttribute("type") or guess_type(href)[0] + if 
media_type: + link_data["media_type"] = media_type + else: + log.warning( + f"Invalid or missing media type for alternate link: {href}" + ) + extra.setdefault("alt_links", []).append(link_data) else: log.warning( f"Unmanaged link element: {link_elt.toXml()}" @@ -542,7 +553,7 @@ ) author = None else: - author = microblog_data["author"] = str(name_elt).strip() + author = mb_data["author"] = str(name_elt).strip() # uri try: uri_elt = next(author_elt.elements(NS_ATOM, "uri")) @@ -551,29 +562,29 @@ "No uri element found in author element of item {}".format(id_) ) if publisher: - microblog_data["author_jid"] = publisher + mb_data["author_jid"] = publisher else: uri = str(uri_elt) if uri.startswith("xmpp:"): uri = uri[5:] - microblog_data["author_jid"] = uri + mb_data["author_jid"] = uri else: - microblog_data["author_jid"] = ( + mb_data["author_jid"] = ( item_elt.getAttribute("publisher") or "" ) - if not author and microblog_data["author_jid"]: + if not author and mb_data["author_jid"]: # FIXME: temporary workaround for missing author name, would be # better to use directly JID's identity (to be done from frontends?) 
try: - microblog_data["author"] = jid.JID(microblog_data["author_jid"]).user + mb_data["author"] = jid.JID(mb_data["author_jid"]).user except Exception as e: log.warning(f"No author name found, and can't parse author jid: {e}") if not publisher: log.debug("No publisher attribute, we can't verify author jid") - microblog_data["author_jid_verified"] = False + mb_data["author_jid_verified"] = False elif jid.JID(publisher).userhostJID() == jid.JID(uri).userhostJID(): - microblog_data["author_jid_verified"] = True + mb_data["author_jid_verified"] = True else: if "repeated" not in extra: log.warning( @@ -582,38 +593,38 @@ uri, item_elt.getAttribute("publisher") ) ) - microblog_data["author_jid_verified"] = False + mb_data["author_jid_verified"] = False # email try: email_elt = next(author_elt.elements(NS_ATOM, "email")) except StopIteration: pass else: - microblog_data["author_email"] = str(email_elt) + mb_data["author_email"] = str(email_elt) - if not microblog_data.get("author_jid"): + if not mb_data.get("author_jid"): if publisher: - microblog_data["author_jid"] = publisher - microblog_data["author_jid_verified"] = True + mb_data["author_jid"] = publisher + mb_data["author_jid_verified"] = True else: iq_elt = xml_tools.find_ancestor(item_elt, "iq", C.NS_STREAM) - microblog_data["author_jid"] = iq_elt["from"] - microblog_data["author_jid_verified"] = False + mb_data["author_jid"] = iq_elt["from"] + mb_data["author_jid_verified"] = False # categories categories = [ category_elt.getAttribute("term", "") for category_elt in entry_elt.elements(NS_ATOM, "category") ] - microblog_data["tags"] = categories + mb_data["tags"] = categories ## the trigger ## # if other plugins have things to add or change yield self.host.trigger.point( - "XEP-0277_item2data", item_elt, entry_elt, microblog_data + "XEP-0277_item2data", item_elt, entry_elt, mb_data ) - defer.returnValue(microblog_data) + defer.returnValue(mb_data) async def mb_data_2_entry_elt(self, client, mb_data, item_id, 
service, node): """Convert a data dict to en entry usable to create an item @@ -741,6 +752,31 @@ if value: link_elt[attr] = str(value) + ## alternate links ## + alt_links = extra.get("alt_links") + if alt_links: + for link_data in alt_links: + url_template = link_data["url"] + url = url_template.format( + service=quote(service.full(), safe=""), + node=quote(node, safe=""), + item=quote(item_id, safe="") + ) + + link_elt = entry_elt.addElement("link") + link_elt["href"] = url + link_elt["rel"] = "alternate" + + media_type = link_data.get("media_type") + if not media_type: + parsed_url = urlparse(url) + if parsed_url.scheme in ["http", "https"]: + media_type = "text/html" + else: + media_type = guess_type(url)[0] or "application/octet-stream" + + link_elt["type"] = media_type + ## author ## author_elt = entry_elt.addElement("author") try: