Mercurial > libervia-backend
view libervia/backend/plugins/plugin_misc_url_preview.py @ 4303:a7ec325246fb
component email-gateway: first draft:
Initial implementation of the Email Gateway.
This component uses XEP-0100 for registration. Upon registration and subsequent startups,
a connection is made to registered IMAP services, and incoming emails (in `INBOX`
mailboxes) are immediately forwarded as XMPP messages.
In the opposite direction, an SMTP connection is established to send emails on incoming
XMPP messages.
rel 449
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 06 Sep 2024 18:07:17 +0200 |
parents | 0d7bb4df2343 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia plugin to retrieve URL previews
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from dataclasses import dataclass
import json
from textwrap import dedent
from typing import Callable, Dict, List, Optional, Union
from urllib import parse
import fnmatch

from lxml import etree
import treq
from twisted.internet import defer

from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.exceptions import ConflictError
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import data_format
from libervia.backend.tools.common.async_utils import async_lru

log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "Preview",
    C.PI_IMPORT_NAME: "Preview",
    C.PI_TYPE: C.PLUG_TYPE_MISC,
    C.PI_PROTOCOLS: ["Open Graph", "oEmbed"],
    C.PI_DEPENDENCIES: ["TEXT_SYNTAXES"],
    C.PI_MAIN: "Preview",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: dedent(
        _(
            """\
    Retrieves and provides a preview of URLs using various protocols. Initially, it
    uses the Open Graph protocol for most web pages. Specialized handlers are
    implemented for YouTube using the oEmbed protocol.
    """
        )
    ),
}

# Open Graph properties (without the "og:" prefix) looked up in fetched pages
OG_TAGS = [
    "title",
    "type",
    "image",
    "url",
    "audio",
    "description",
    "determiner",
    "locale",
    "locale:alternate",
    "site_name",
    "video",
]


class PreviewFetchError(Exception):
    """Raised when a remote resource needed for a preview can't be fetched"""


@dataclass
class Protocol:
    """A registered way to retrieve preview data"""

    # unique name used as key in ``Preview.protocols``
    name: str
    # async callable taking (client, url, options), returning a dict or None
    callback: Callable
    # generic protocols are tried from the highest priority to the lowest
    priority: int


class Preview:
    """Retrieve preview data of URLs through pluggable protocols

    Protocols and domain associations are stored at class level, so they are shared
    by every instance.
    """

    protocols: Dict[str, Protocol] = {}
    domain_protocols: Dict[str, str] = {}

    def __init__(self, host):
        """Register the built-in protocols and the bridge method

        @param host: backend host, used for bridge, clients and plugins access
        """
        log.info(_("Preview plugin initialization"))
        self.host = host

        # generic protocols

        self.register("open_graph", self.fetch_open_graph_data, priority=100)
        self.register("oembed", self.fetch_generic_oembed_data, priority=50)
        self.register("generic", self.fetch_generic_data, priority=0)

        # domain specific protocols

        self.register("oembed-youtube", self.fetch_youtube_oembed_data, priority=-100)
        self.register_domain_protocol(
            ["www.youtube.com", "youtu.be", "m.youtube.com"], "oembed-youtube"
        )

        self.register("wikipedia", self.fetch_wikipedia_data, priority=-80)
        self.register_domain_protocol(["*.wikipedia.org"], "wikipedia")

        self.register("invidious", self.fetch_invidious_data, priority=-90)
        self.register_domain_protocol(
            ["yewtu.be", "www.yewtu.be", "invidious.fdn.fr"], "invidious"
        )

        # bridge methods

        host.bridge.add_method(
            "url_preview_get",
            ".plugin",
            in_sign="sss",
            out_sign="s",
            method=self._url_preview_get,
            async_=True,
        )

    # API

    def _url_preview_get(
        self, url: str, options: str, profile_key: str
    ) -> defer.Deferred:
        """Bridge entry point: (de)serialise arguments around get_preview_data

        @param url: The url to fetch the preview data from
        @param options: Serialised options, decoded with data_format.deserialise
        @param profile_key: Key of the profile requesting the preview
        @return: Deferred firing with the serialised preview data
        """
        client = self.host.get_client(profile_key)
        d = defer.ensureDeferred(
            self.get_preview_data(client, url, data_format.deserialise(options))
        )
        d.addCallback(data_format.serialise)
        return d

    @async_lru()
    async def get_preview_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch preview data from a url using registered protocols

        If the domain of the URL matches a registered domain pattern, only the
        associated protocol is used and its exceptions are propagated. Otherwise
        generic protocols are tried from highest to lowest priority, failures being
        logged, until one of them returns data.

        @param url: The url to fetch the preview data from
        @param options: Additional options that may be used while fetching preview data
        @return: A dictionary containing the preview data or None if no data could be
            fetched
        """
        parsed_url = parse.urlparse(url)
        # host names are case-insensitive: compare lowercased values, and use
        # fnmatchcase so the result doesn't depend on the platform's path rules
        domain = parsed_url.netloc.lower()

        preview_data: Optional[dict] = None
        matched_protocol = None
        for registered_domain, registered_protocol in self.domain_protocols.items():
            if fnmatch.fnmatchcase(domain, registered_domain.lower()):
                matched_protocol = registered_protocol
                break

        if matched_protocol is not None:
            callback = self.protocols[matched_protocol].callback
            preview_data = await callback(client, url, options)
        else:
            for name, protocol in sorted(
                self.protocols.items(), key=lambda item: item[1].priority, reverse=True
            ):
                try:
                    preview_data = await protocol.callback(client, url, options)
                except Exception as e:
                    # a failing protocol must not prevent trying the next ones
                    log.warning(f"Can't run protocol {name} for {url}: {e}")
                else:
                    if preview_data is not None:
                        matched_protocol = protocol.name
                        break

        if preview_data is not None:
            preview_data["protocol"] = matched_protocol
            # we don't clean html for youtube as we need Javascript to make it work, and
            # for invidious as we generate it ourself
            if "html" in preview_data:
                if matched_protocol in ("oembed-youtube", "invidious"):
                    # this flag indicate that we know the source of HTML and we should be
                    # able to trust it. This will add `allow-scripts` and
                    # `allow-same-origin` in the preview <iframe> "sandbox" attribute
                    preview_data["html_known"] = True
                else:
                    preview_data["html_known"] = False
                    clean_xhtml = self.host.plugins["TEXT_SYNTAXES"].clean_xhtml
                    try:
                        preview_data["html"] = clean_xhtml(preview_data["html"])
                    except Exception as e:
                        # untrusted HTML which can't be cleaned is dropped
                        log.warning(f"Can't clean html data: {e}\n{preview_data}")
                        del preview_data["html"]

        return preview_data

    @classmethod
    def register(cls, name: str, callback: Callable, priority: int = 0):
        """Register a protocol to retrieve preview data

        The registered callback should return a dictionary of preview data if
        available, or None otherwise.

        @param name: Unique name of the protocol
        @param callback: Async callback function to fetch preview data
        @param priority: Priority of the protocol, with higher numbers indicating
            higher priority
        @return: None
        @raise ConflictError: a protocol with this name is already registered
        """
        if name in cls.protocols:
            raise ConflictError(f"Protocol with the name {name} is already registered.")

        cls.protocols[name] = Protocol(name=name, callback=callback, priority=priority)

    @classmethod
    def register_domain_protocol(cls, domains: Union[str, List[str]], protocol_name: str):
        """Register a protocol for a specific domain or list of domains

        @param domains: The domain name or list of domain names, fnmatch-style
            wildcards are allowed (e.g. "*.wikipedia.org")
        @param protocol_name: The name of the protocol to be associated with the
            domain(s)
        @return: None
        @raise ConflictError: no protocol is registered with this name
        """
        if protocol_name not in cls.protocols:
            # ``register`` stores names verbatim, so the exact name is tried first;
            # the normalised form (no space, lowercase) is kept as a fallback for
            # callers relying on it
            normalised_name = protocol_name.replace(" ", "").lower()
            if normalised_name not in cls.protocols:
                raise ConflictError(
                    f"Protocol with the name {normalised_name} is not registered."
                )
            protocol_name = normalised_name

        if isinstance(domains, str):
            domains = [domains]

        for domain in domains:
            domain = domain.strip()
            if not domain:
                log.warning("empty string used as domain, ignoring")
                continue
            cls.domain_protocols[domain] = protocol_name

    # Open Graph

    async def fetch_open_graph_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch Open Graph data from a url

        This method implements the Open Graph protocol, details of which can be found
        at: http://ogp.me/

        @param url: The url to fetch the Open Graph data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the Open Graph data or None if no data could
            be fetched
        @raise PreviewFetchError: the server didn't answer with a 200 status code
        """
        resp = await treq.get(url)

        if resp.code == 200:
            html = await resp.text()
            parser = etree.HTMLParser()
            tree = etree.fromstring(html, parser)

            # Extract Open Graph data
            metadata = {}
            for tag in OG_TAGS:
                og_el = tree.find('.//meta[@property="og:{tag}"]'.format(tag=tag))
                if og_el is not None:
                    metadata[tag] = og_el.get("content")

            if metadata:
                # "provider_name" is the key used by the other protocols
                if "site_name" in metadata and "provider_name" not in metadata:
                    metadata["provider_name"] = metadata["site_name"]
                return metadata

            return None
        else:
            raise PreviewFetchError(
                f"Failed to fetch preview for {url}, status code: {resp.code}"
            )

    # oEmbed

    async def _fetch_oembed_data(self, oembed_url: str) -> Optional[dict]:
        """Fetch oEmbed data from a given oEmbed URL

        @param oembed_url: The url to fetch the oEmbed data from
        @return: The decoded JSON payload of the oEmbed endpoint
        @raise PreviewFetchError: the server didn't answer with a 200 status code
        """
        resp = await treq.get(oembed_url)
        if resp.code == 200:
            return json.loads(await resp.text())
        else:
            raise PreviewFetchError(
                f"Failed to fetch oEmbed preview for {oembed_url}, status code: "
                f"{resp.code}"
            )

    async def fetch_youtube_oembed_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch YouTube oEmbed data from a url

        The <iframe> of the returned HTML is restyled to fill its container.

        @param url: The url to fetch the YouTube oEmbed data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the YouTube oEmbed data or None if no data
            could be fetched
        @raise PreviewFetchError: the oEmbed endpoint didn't answer with a 200 status
            code
        """
        oembed_url = f"https://www.youtube.com/oembed?url={parse.quote(url)}&format=json"
        data = await self._fetch_oembed_data(oembed_url)

        if data is not None and "html" in data:
            html = data["html"]
            root = etree.HTML(html)
            iframe_elt = root.xpath("//iframe")
            if iframe_elt:
                iframe_elt[0].attrib[
                    "style"
                ] = "position: absolute; top: 0; left: 0; width: 100%; height: 100%;"
                data["html"] = etree.tostring(root, method="html", encoding="unicode")
            else:
                log.warning("No <iframe> found in the YouTube oEmbed response")

        return data

    async def fetch_generic_oembed_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch generic oEmbed data from a url

        The page is fetched to discover its JSON oEmbed endpoint, which is then
        requested.

        @param url: The url to fetch the oEmbed data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the oEmbed data or None if no data could be
            fetched
        @raise PreviewFetchError: a server didn't answer with a 200 status code
        """
        resp = await treq.get(url)
        if resp.code == 200:
            html = await resp.text()
            parser = etree.HTMLParser()
            tree = etree.fromstring(html, parser)

            # Find oEmbed URL
            oembed_link = tree.find('.//link[@type="application/json+oembed"]')
            href = oembed_link.get("href") if oembed_link is not None else None
            if not href:
                # no discovery link, or a link without target
                return None
            # the href may be relative to the page; an absolute one is left untouched
            return await self._fetch_oembed_data(parse.urljoin(url, href))
        else:
            raise PreviewFetchError(
                f"Failed to fetch preview for {url}, status code: {resp.code}"
            )

    async def fetch_generic_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch generic data from a url

        This method attempts to extract the title, description, and author metadata
        from the HTML of the page. If these data cannot be found, the method will
        return None.

        @param url: The url to fetch the generic data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the generic data or None if no data could be
            fetched
        @raise PreviewFetchError: the server didn't answer with a 200 status code
        """
        resp = await treq.get(url)
        if resp.code == 200:
            html = await resp.text()
            parser = etree.HTMLParser()
            tree = etree.fromstring(html, parser)

            # Find title, description, and author metadata
            title_el = tree.find(".//title")
            desc_el = tree.find('.//meta[@name="description"]')
            author_el = tree.find('.//meta[@name="author"]')

            title = title_el.text if title_el is not None else ""
            description = desc_el.get("content") if desc_el is not None else ""
            author_name = author_el.get("content") if author_el is not None else ""

            # "url" and "provider_*" are always set, so only values actually
            # extracted from the page tell whether there is something to preview
            if not (title or description or author_name):
                return None

            parsed_url = parse.urlparse(url)
            return {
                "title": title,
                "description": description,
                "author_name": author_name,
                "url": url,
                "provider_name": parsed_url.netloc,
                "provider_url": f"{parsed_url.scheme}://{parsed_url.netloc}",
            }
        else:
            raise PreviewFetchError(
                f"Failed to fetch generic preview for {url}, status code: {resp.code}"
            )

    # Wikipedia

    async def fetch_wikipedia_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch Wikipedia data from a url

        This method implements the Wikipedia API, details of which can be found at:
        https://www.mediawiki.org/wiki/API:Main_page

        @param url: The url to fetch the Wikipedia data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the Wikipedia data or None if no data could be
            fetched
        @raise PreviewFetchError: the API didn't answer with a 200 status code
        """
        parsed_url = parse.urlparse(url)
        # the page name is the last segment of the path (e.g. "/wiki/<page name>")
        page_name = parsed_url.path.split("/")[-1]
        if not page_name:
            log.warning(f"Can't extract page name from {url}")
            return None

        # Use the Wikipedia API to get a summary of the page and a preview image
        api_url = (
            f"https://{parsed_url.netloc}/w/api.php?format=json&action=query&"
            f"prop=extracts|pageimages&exintro&explaintext&redirects=1&piprop=thumbnail"
            f"&pithumbsize=300&titles={page_name}"
        )

        resp = await treq.get(api_url)
        if resp.code == 200:
            data = json.loads(await resp.text())
            pages = data.get("query", {}).get("pages")
            if not pages:
                # error payload, or answer without any page
                return None

            # Pages are indexed by page ID, a single title has been requested so the
            # first entry is the one we are interested in
            page = next(iter(pages.values()))

            # The API may return a page with a missing title or extract if the page does
            # not exist
            if "missing" in page:
                return None

            return {
                "provider_name": "Wikipedia",
                "provider_url": "https://www.wikipedia.org",
                "title": page.get("title"),
                "description": page.get("extract"),
                "url": url,
                "image": page.get("thumbnail", {}).get("source"),
            }
        else:
            raise PreviewFetchError(
                f"Failed to fetch Wikipedia preview for {url}, status code: {resp.code}"
            )

    # Invidious

    async def fetch_invidious_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch Invidious data from a url and generate HTML iframe.

        @param url: The url to fetch the Invidious data from.
        @param options: Additional options that may be used while fetching data.
        @return: A dictionary containing the Invidious data or None if no data could be
            fetched.
        """
        parsed_url = parse.urlparse(url)
        if "watch" in parsed_url.path:
            video_id = parse.parse_qs(parsed_url.query).get("v", [None])[0]
        else:
            # short ("/<id>") and embed ("/embed/<id>") forms: the ID is the last
            # segment of the path
            video_id = parsed_url.path.strip("/").split("/")[-1]
        if not video_id:
            log.warning(f"Can't extract video ID from {url}")
            return None

        # The generated HTML is flagged as trusted by get_preview_data and is not
        # cleaned, so the ID (which comes from the URL) must not be able to break
        # out of the "src" attribute. Quoting is a no-op on well-formed IDs.
        quoted_id = parse.quote(video_id, safe="")

        invidious_api_url = f"https://{parsed_url.netloc}/api/v1/videos/{quoted_id}"

        resp = await treq.get(invidious_api_url)
        if resp.code == 200:
            video_data = await resp.json()
            # construct the iframe html code
            html = (
                "<iframe"
                ' width="100%"'
                ' height="auto"'
                f' src="https://{parsed_url.netloc}/embed/{quoted_id}"'
                ' frameborder="0" '
                ' allow="'
                " accelerometer;"
                " autoplay;"
                " clipboard-write;"
                " encrypted-media;"
                " gyroscope;"
                ' picture-in-picture"'
                ' style="'
                " position: absolute;"
                " top: 0;"
                " left: 0;"
                " width: 100%;"
                ' height: 100%;"'
                " allowfullscreen></iframe>"
            )

            # the list of thumbnails may be missing or empty
            thumbnails = video_data.get("videoThumbnails") or [{}]
            author_id = video_data.get("authorId")

            # structure the data to be returned
            data = {
                "title": video_data.get("title"),
                "description": video_data.get("description"),
                "url": url,
                "image": thumbnails[0].get("url"),
                "provider_name": "Invidious",
                "provider_url": f"https://{parsed_url.netloc}",
                "html": html,
                "author_name": video_data.get("author"),
                # no channel link can be built without the author ID
                "author_url": (
                    f"https://{parsed_url.netloc}/channel/{author_id}"
                    if author_id
                    else None
                ),
            }
            return data
        else:
            log.warning(f"Unable to fetch video data from Invidious API for {video_id}")
            return None