Mercurial > libervia-backend
changeset 4103:eaa0daa7f834
plugin URL preview: URL preview first draft
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 27 Jun 2023 15:48:15 +0200 |
parents | c0bb4b3fdccf |
children | 58bbc100f13b |
files | libervia/backend/plugins/plugin_misc_url_preview.py |
diffstat | 1 files changed, 503 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/libervia/backend/plugins/plugin_misc_url_preview.py Tue Jun 27 15:48:15 2023 +0200 @@ -0,0 +1,503 @@ +#!/usr/bin/env python3 + + +# Libervia plugin to handle events +# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +from dataclasses import dataclass +import json +from textwrap import dedent +from typing import Callable, Dict, List, Optional, Union +from urllib import parse +import fnmatch + +from lxml import etree +import treq +from twisted.internet import defer + +from libervia.backend.core.constants import Const as C +from libervia.backend.core.core_types import SatXMPPEntity +from libervia.backend.core.exceptions import ConflictError +from libervia.backend.core.i18n import _ +from libervia.backend.core.log import getLogger +from libervia.backend.tools.common import data_format +from libervia.backend.tools.common.async_utils import async_lru + +log = getLogger(__name__) + +PLUGIN_INFO = { + C.PI_NAME: "Preview", + C.PI_IMPORT_NAME: "Preview", + C.PI_TYPE: C.PLUG_TYPE_MISC, + C.PI_PROTOCOLS: ["Open Graph", "oEmbed"], + C.PI_DEPENDENCIES: ["TEXT_SYNTAXES"], + C.PI_MAIN: "Preview", + C.PI_HANDLER: "no", + C.PI_DESCRIPTION: dedent( + _( + """\ + Retrieves and provides a preview of URLs using various protocols. Initially, it + uses the Open Graph protocol for most web pages. Specialized handlers are + implemented for YouTube using the oEmbed protocol. + """ + ) + ), +} + +OG_TAGS = [ + "title", + "type", + "image", + "url", + "audio", + "description", + "determiner", + "locale", + "locale:alternate", + "site_name", + "video", +] + + +class PreviewFetchError(Exception): + pass + + +@dataclass +class Protocol: + name: str + callback: Callable + priority: int + + +class Preview: + protocols: Dict[str, Protocol] = {} + domain_protocols: Dict[str, str] = {} + + def __init__(self, host): + log.info(_("Preview plugin initialization")) + self.host = host + + # generic protocols + + self.register("open_graph", self.fetch_open_graph_data, priority=100) + self.register("oembed", self.fetch_generic_oembed_data, priority=50) + self.register("generic", self.fetch_generic_data, priority=0) + + # domain specific protocols + + self.register("oembed-youtube", self.fetch_youtube_oembed_data, priority=-100) + self.register_domain_protocol( + ["www.youtube.com", "youtu.be", "m.youtube.com"], "oembed-youtube" + ) + + self.register("wikipedia", self.fetch_wikipedia_data, priority=-80) + self.register_domain_protocol(["*.wikipedia.org"], "wikipedia") + + self.register("invidious", self.fetch_invidious_data, priority=-90) + self.register_domain_protocol( + ["yewtu.be", "www.yewtu.be", "invidious.fdn.fr"], + "invidious" + ) + + # bridge methods + + host.bridge.add_method( + "url_preview_get", + ".plugin", + in_sign="sss", + out_sign="s", + method=self._url_preview_get, + async_=True, + ) + + # API + + def _url_preview_get(self, url: str, options: str, profile_key: str) -> defer.Deferred: + client = self.host.get_client(profile_key) + d = defer.ensureDeferred( + self.get_preview_data(client, url, data_format.deserialise(options)) + ) + d.addCallback(data_format.serialise) + return d + + @async_lru() + async def get_preview_data( + self, client: SatXMPPEntity, url: str, options: dict + ) -> Optional[dict]: + """Fetch preview data from a url using registered protocols + + @param url: The url to fetch the preview data from + @param options: Additional options that may be used while fetching preview data + @return: A dictionary containing the preview data or None if no data could be + fetched + """ + parsed_url = parse.urlparse(url) + domain = parsed_url.netloc + + preview_data: Optional[dict] = None + matched_protocol = None + for registered_domain, registered_protocol in self.domain_protocols.items(): + if fnmatch.fnmatch(domain, registered_domain): + matched_protocol = registered_protocol + break + + if matched_protocol is not None: + callback = self.protocols[matched_protocol].callback + preview_data = await callback(client, url, options) + else: + for name, protocol in sorted( + self.protocols.items(), key=lambda item: item[1].priority, reverse=True + ): + try: + preview_data = await protocol.callback(client, url, options) + except Exception as e: + log.warning(f"Can't run protocol {name} for {url}: {e}") + else: + if preview_data is not None: + matched_protocol = protocol.name + break + + if preview_data is not None: + preview_data["protocol"] = matched_protocol + # we don't clean html for youtube as we need Javascript to make it work, and + # for invidious as we generate it ourself + if "html" in preview_data: + if matched_protocol in ("oembed-youtube", "invidious"): + # this flag indicate that we know the source of HTML and we should be + # able to trust it. This will add `allow-scripts` and + # `allow-same-origin` in the preview <iframe> "sandbox" attribute + preview_data["html_known"] = True + else: + preview_data["html_known"] = False + clean_xhtml = self.host.plugins["TEXT_SYNTAXES"].clean_xhtml + try: + preview_data["html"] = clean_xhtml(preview_data["html"]) + except Exception as e: + log.warning(f"Can't clean html data: {e}\n{preview_data}") + del preview_data["html"] + + + return preview_data + + @classmethod + def register(cls, name: str, callback: Callable, priority: int = 0): + """Register a protocol to retrieve preview data + + The registered callback should return a dictionary of preview data if available, + or None otherwise. + + @param name: Unique name of the protocol + @param callback: Async callback function to fetch preview data + @param priority: Priority of the protocol, with higher numbers indicating higher + priority + @return: None + """ + if name in cls.protocols: + raise ConflictError(f"Protocol with the name {name} is already registered.") + + cls.protocols[name] = Protocol(name=name, callback=callback, priority=priority) + + @classmethod + def register_domain_protocol(cls, domains: Union[str, List[str]], protocol_name: str): + """Register a protocol for a specific domain or list of domains + + @param domains: The domain name or list of domain names + @param protocol_name: The name of the protocol to be associated with the domain(s) + @return: None + """ + protocol_name = protocol_name.replace(" ", "").lower() + if protocol_name not in cls.protocols: + raise ConflictError( + f"Protocol with the name {protocol_name} is not registered." + ) + + if isinstance(domains, str): + domains = [domains] + + for domain in domains: + domain = domain.strip() + if not domain: + log.warning("empty string used as domain, ignoring") + continue + cls.domain_protocols[domain] = protocol_name + + # Open Graph + + async def fetch_open_graph_data( + self, client: SatXMPPEntity, url: str, options: dict + ) -> Optional[dict]: + """Fetch Open Graph data from a url + + This method implements the Open Graph protocol, details of which can be found at: + http://ogp.me/ + + @param url: The url to fetch the Open Graph data from + @param options: Additional options that may be used while fetching data + @return: A dictionary containing the Open Graph data or None if no data could be + fetched + """ + resp = await treq.get(url) + + if resp.code == 200: + html = await resp.text() + parser = etree.HTMLParser() + tree = etree.fromstring(html, parser) + + # Extract Open Graph data + metadata = {} + for tag in OG_TAGS: + og_el = tree.find('.//meta[@property="og:{tag}"]'.format(tag=tag)) + if og_el is not None: + metadata[tag] = og_el.get("content") + + if metadata: + if "site_name" in metadata and not "provider_name" in metadata: + metadata["provider_name"] = metadata["site_name"] + return metadata + + return None + else: + raise PreviewFetchError( + f"Failed to fetch preview for {url}, status code: {resp.code}" + ) + + # oEmbed + + async def _fetch_oembed_data(self, oembed_url: str) -> Optional[dict]: + """Fetch oEmbed data from a given oEmbed URL + + @param oembed_url: The url to fetch the oEmbed data from + @return: A dictionary containing the oEmbed data or None if no data could be + fetched + """ + resp = await treq.get(oembed_url) + if resp.code == 200: + return json.loads(await resp.text()) + else: + raise PreviewFetchError( + f"Failed to fetch oEmbed preview for {oembed_url}, status code: " + f"{resp.code}" + ) + + async def fetch_youtube_oembed_data( + self, client: SatXMPPEntity, url: str, options: dict + ) -> Optional[dict]: + """Fetch YouTube oEmbed data from a url + + @param url: The url to fetch the YouTube oEmbed data from + @param options: Additional options that may be used while fetching data + @return: A dictionary containing the YouTube oEmbed data or None if no data could + be fetched + """ + oembed_url = f"https://www.youtube.com/oembed?url={parse.quote(url)}&format=json" + data = await self._fetch_oembed_data(oembed_url) + if data is not None and 'html' in data: + html = data['html'] + root = etree.HTML(html) + iframe_elt = root.xpath('//iframe') + if iframe_elt: + iframe_elt[0].attrib['style'] = ( + 'position: absolute; top: 0; left: 0; width: 100%; height: 100%;' + ) + data['html'] = etree.tostring(root, method='html', encoding='unicode') + else: + log.warning("No <iframe> found in the YouTube oEmbed response") + + return data + + async def fetch_generic_oembed_data( + self, client: SatXMPPEntity, url: str, options: dict + ) -> Optional[dict]: + """Fetch generic oEmbed data from a url + + @param url: The url to fetch the oEmbed data from + @param options: Additional options that may be used while fetching data + @return: A dictionary containing the oEmbed data or None if no data could be + fetched + """ + resp = await treq.get(url) + if resp.code == 200: + html = await resp.text() + parser = etree.HTMLParser() + tree = etree.fromstring(html, parser) + + # Find oEmbed URL + oembed_link = tree.find('.//link[@type="application/json+oembed"]') + if oembed_link is not None: + oembed_url = oembed_link.get("href") + return await self._fetch_oembed_data(oembed_url) + else: + return None + else: + raise PreviewFetchError( + f"Failed to fetch preview for {url}, status code: {resp.code}" + ) + + + async def fetch_generic_data( + self, client: SatXMPPEntity, url: str, options: dict + ) -> Optional[dict]: + """Fetch generic data from a url + + This method attempts to extract the title, description, and author metadata from + the HTML of the page. If these data cannot be found, the method will return None. + + @param url: The url to fetch the generic data from + @param options: Additional options that may be used while fetching data + @return: A dictionary containing the generic data or None if no data could be + fetched + """ + resp = await treq.get(url) + if resp.code == 200: + html = await resp.text() + parser = etree.HTMLParser() + tree = etree.fromstring(html, parser) + + # Find title, description, and author metadata + title_el = tree.find(".//title") + desc_el = tree.find('.//meta[@name="description"]') + author_el = tree.find('.//meta[@name="author"]') + + metadata = { + "title": title_el.text if title_el is not None else "", + "description": desc_el.get("content") if desc_el is not None else "", + "author_name": author_el.get("content") if author_el is not None else "", + "url": url, + "provider_name": parse.urlparse(url).netloc, + "provider_url": f"{parse.urlparse(url).scheme}://{parse.urlparse(url).netloc}", + } + + return metadata if any(metadata.values()) else None + else: + raise PreviewFetchError( + f"Failed to fetch generic preview for {url}, status code: {resp.code}" + ) + + # Wikipedia + + async def fetch_wikipedia_data( + self, client: SatXMPPEntity, url: str, options: dict + ) -> Optional[dict]: + """Fetch Wikipedia data from a url + + This method implements the Wikipedia API, details of which can be found at: + https://www.mediawiki.org/wiki/API:Main_page + + @param url: The url to fetch the Wikipedia data from + @param options: Additional options that may be used while fetching data + @return: A dictionary containing the Wikipedia data or None if no data could be + fetched + """ + parsed_url = parse.urlparse(url) + page_name = parsed_url.path.split("/")[-1] + + # Use the Wikipedia API to get a summary of the page and a preview image + api_url = ( + f"https://{parsed_url.netloc}/w/api.php?format=json&action=query&" + f"prop=extracts|pageimages&exintro&explaintext&redirects=1&piprop=thumbnail" + f"&pithumbsize=300&titles={page_name}" + ) + + resp = await treq.get(api_url) + if resp.code == 200: + data = json.loads(await resp.text()) + # The page ID is the first key in the "pages" dictionary + page_id = next(iter(data["query"]["pages"].keys())) + page = data["query"]["pages"][page_id] + + # The API may return a page with a missing title or extract if the page does + # not exist + if "missing" in page: + return None + + return { + "provider_name": "Wikipedia", + "provider_url": "https://www.wikipedia.org", + "title": page.get("title"), + "description": page.get("extract"), + "url": url, + "image": page.get("thumbnail", {}).get("source") + if "thumbnail" in page + else None, + } + else: + raise PreviewFetchError( + f"Failed to fetch Wikipedia preview for {url}, status code: {resp.code}" + ) + + # Invidious + + async def fetch_invidious_data(self, client: SatXMPPEntity, url: str, options: dict) -> Optional[dict]: + """ + Fetch Invidious data from a url and generate HTML iframe. + + @param url: The url to fetch the Invidious data from. + @param options: Additional options that may be used while fetching data. + @return: A dictionary containing the Invidious data or None if no data could be fetched. + """ + parsed_url = parse.urlparse(url) + if 'watch' in parsed_url.path: + video_id = parse.parse_qs(parsed_url.query).get('v', [None])[0] + else: + video_id = parsed_url.path.strip('/') + if not video_id: + log.warning(f"Can't extract video ID from {url}") + return None + + invidious_api_url = f"https://{parsed_url.netloc}/api/v1/videos/{video_id}" + + resp = await treq.get(invidious_api_url) + if resp.code == 200: + video_data = await resp.json() + # construct the iframe html code + html = ( + f'<iframe' + f' width="100%"' + f' height="auto"' + f' src="https://{parsed_url.netloc}/embed/{video_id}"' + f' frameborder="0" ' + f' allow="' + f' accelerometer;' + f' autoplay;' + f' clipboard-write;' + f' encrypted-media;' + f' gyroscope;' + f' picture-in-picture"' + f' style="' + f' position: absolute;' + f' top: 0;' + f' left: 0;' + f' width: 100%;' + f' height: 100%;"' + f' allowfullscreen></iframe>' + ) + # structure the data to be returned + data = { + "title": video_data.get("title"), + "description": video_data.get("description"), + "url": url, + "image": video_data.get("videoThumbnails", [{}])[0].get("url"), + "provider_name": "Invidious", + "provider_url": f"https://{parsed_url.netloc}", + "html": html, + "author_name": video_data.get("author"), + "author_url": f"https://{parsed_url.netloc}/channel/{video_data.get('authorId')}", + } + return data + else: + log.warning(f"Unable to fetch video data from Invidious API for {video_id}") + return None