changeset 4103:eaa0daa7f834

plugin URL preview: URL preview first draft
author Goffi <goffi@goffi.org>
date Tue, 27 Jun 2023 15:48:15 +0200
parents c0bb4b3fdccf
children 58bbc100f13b
files libervia/backend/plugins/plugin_misc_url_preview.py
diffstat 1 files changed, 503 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/libervia/backend/plugins/plugin_misc_url_preview.py	Tue Jun 27 15:48:15 2023 +0200
@@ -0,0 +1,503 @@
+#!/usr/bin/env python3
+
+
+# Libervia plugin to handle URL previews
+# Copyright (C) 2009-2023 Jérôme Poisson (goffi@goffi.org)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+from dataclasses import dataclass
+import json
+from textwrap import dedent
+from typing import Callable, Dict, List, Optional, Union
+from urllib import parse
+import fnmatch
+
+from lxml import etree
+import treq
+from twisted.internet import defer
+
+from libervia.backend.core.constants import Const as C
+from libervia.backend.core.core_types import SatXMPPEntity
+from libervia.backend.core.exceptions import ConflictError, NotFound
+from libervia.backend.core.i18n import _
+from libervia.backend.core.log import getLogger
+from libervia.backend.tools.common import data_format
+from libervia.backend.tools.common.async_utils import async_lru
+
+log = getLogger(__name__)
+
+PLUGIN_INFO = {
+    C.PI_NAME: "Preview",
+    C.PI_IMPORT_NAME: "Preview",
+    C.PI_TYPE: C.PLUG_TYPE_MISC,
+    C.PI_PROTOCOLS: ["Open Graph", "oEmbed"],
+    C.PI_DEPENDENCIES: ["TEXT_SYNTAXES"],
+    C.PI_MAIN: "Preview",
+    C.PI_HANDLER: "no",
+    C.PI_DESCRIPTION: dedent(
+        _(
+            """\
+    Retrieves and provides previews of URLs using various protocols. Most web pages
+    are handled through the Open Graph protocol, with oEmbed discovery and generic
+    HTML metadata as fallbacks. Specialized handlers are implemented for YouTube
+    (oEmbed), Wikipedia and Invidious.
+    """
+        )
+    ),
+}
+
+OG_TAGS = [
+    "title",
+    "type",
+    "image",
+    "url",
+    "audio",
+    "description",
+    "determiner",
+    "locale",
+    "locale:alternate",
+    "site_name",
+    "video",
+]
+
+
+class PreviewFetchError(Exception):
+    pass
+
+
+@dataclass
+class Protocol:
+    name: str
+    callback: Callable
+    priority: int
+
+
+class Preview:
+    protocols: Dict[str, Protocol] = {}
+    domain_protocols: Dict[str, str] = {}
+
+    def __init__(self, host):
+        log.info(_("Preview plugin initialization"))
+        self.host = host
+
+        # generic protocols
+
+        self.register("open_graph", self.fetch_open_graph_data, priority=100)
+        self.register("oembed", self.fetch_generic_oembed_data, priority=50)
+        self.register("generic", self.fetch_generic_data, priority=0)
+
+        # domain specific protocols
+
+        self.register("oembed-youtube", self.fetch_youtube_oembed_data, priority=-100)
+        self.register_domain_protocol(
+            ["www.youtube.com", "youtu.be", "m.youtube.com"], "oembed-youtube"
+        )
+
+        self.register("wikipedia", self.fetch_wikipedia_data, priority=-80)
+        self.register_domain_protocol(["*.wikipedia.org"], "wikipedia")
+
+        self.register("invidious", self.fetch_invidious_data, priority=-90)
+        self.register_domain_protocol(
+            ["yewtu.be", "www.yewtu.be", "invidious.fdn.fr"],
+            "invidious"
+        )
+
+        # bridge methods
+
+        host.bridge.add_method(
+            "url_preview_get",
+            ".plugin",
+            in_sign="sss",
+            out_sign="s",
+            method=self._url_preview_get,
+            async_=True,
+        )
+
+    # API
+
+    def _url_preview_get(
+        self, url: str, options: str, profile_key: str
+    ) -> defer.Deferred:
+        client = self.host.get_client(profile_key)
+        d = defer.ensureDeferred(
+            self.get_preview_data(client, url, data_format.deserialise(options))
+        )
+        d.addCallback(data_format.serialise)
+        return d
+
+    @async_lru()
+    async def get_preview_data(
+        self, client: SatXMPPEntity, url: str, options: dict
+    ) -> Optional[dict]:
+        """Fetch preview data from a url using registered protocols
+
+        @param url: The url to fetch the preview data from
+        @param options: Additional options that may be used while fetching preview data
+        @return: A dictionary containing the preview data or None if no data could be
+            fetched
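+
+        The returned mapping typically contains keys such as "title", "description",
+        "url", "image" and "provider_name" (see the protocol callbacks below), plus a
+        "protocol" key with the name of the protocol actually used.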
+        """
+        parsed_url = parse.urlparse(url)
+        domain = parsed_url.netloc
+
+        preview_data: Optional[dict] = None
+        matched_protocol = None
+        for registered_domain, registered_protocol in self.domain_protocols.items():
+            if fnmatch.fnmatch(domain, registered_domain):
+                matched_protocol = registered_protocol
+                break
+
+        if matched_protocol is not None:
+            callback = self.protocols[matched_protocol].callback
+            preview_data = await callback(client, url, options)
+        else:
+            for name, protocol in sorted(
+                self.protocols.items(), key=lambda item: item[1].priority, reverse=True
+            ):
+                try:
+                    preview_data = await protocol.callback(client, url, options)
+                except Exception as e:
+                    log.warning(f"Can't run protocol {name} for {url}: {e}")
+                else:
+                    if preview_data is not None:
+                        matched_protocol = protocol.name
+                        break
+
+        if preview_data is not None:
+            preview_data["protocol"] = matched_protocol
+            # we don't clean the HTML for YouTube, as JavaScript is needed to make it
+            # work, nor for Invidious, as we generate the HTML ourselves
+            if "html" in preview_data:
+                if matched_protocol in ("oembed-youtube", "invidious"):
+                    # this flag indicates that we know the source of the HTML and
+                    # should be able to trust it. This will add `allow-scripts` and
+                    # `allow-same-origin` to the preview <iframe> "sandbox" attribute
+                    preview_data["html_known"] = True
+                else:
+                    preview_data["html_known"] = False
+                    clean_xhtml = self.host.plugins["TEXT_SYNTAXES"].clean_xhtml
+                    try:
+                        preview_data["html"] = clean_xhtml(preview_data["html"])
+                    except Exception as e:
+                        log.warning(f"Can't clean html data: {e}\n{preview_data}")
+                        del preview_data["html"]
+
+        return preview_data
+
+    @classmethod
+    def register(cls, name: str, callback: Callable, priority: int = 0):
+        """Register a protocol to retrieve preview data
+
+        The registered callback should return a dictionary of preview data if available,
+        or None otherwise.
+
+        @param name: Unique name of the protocol
+        @param callback: Async callback function to fetch preview data
+        @param priority: Priority of the protocol, with higher numbers indicating higher
+            priority
+        @return: None
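+
+        A minimal usage sketch (the protocol name and callback are illustrative)::
+
+            async def fetch_example_data(client, url, options):
+                return {"title": "Example"}  # or None if no preview is available
+
+            Preview.register("example", fetch_example_data, priority=10)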
+        """
+        if name in cls.protocols:
+            raise ConflictError(f"Protocol with the name {name} is already registered.")
+
+        cls.protocols[name] = Protocol(name=name, callback=callback, priority=priority)
+
+    @classmethod
+    def register_domain_protocol(cls, domains: Union[str, List[str]], protocol_name: str):
+        """Register a protocol for a specific domain or list of domains
+
+        @param domains: The domain name or list of domain names
+        @param protocol_name: The name of the protocol to be associated with the domain(s)
+        @return: None
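+
+        Domains are matched with fnmatch, so shell-style wildcards are accepted, e.g.
+        (the domain here is illustrative)::
+
+            Preview.register_domain_protocol(["*.example.org"], "open_graph")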
+        """
+        protocol_name = protocol_name.replace(" ", "").lower()
+        if protocol_name not in cls.protocols:
+            raise NotFound(
+                f"Protocol with the name {protocol_name} is not registered."
+            )
+
+        if isinstance(domains, str):
+            domains = [domains]
+
+        for domain in domains:
+            domain = domain.strip()
+            if not domain:
+                log.warning("empty string used as domain, ignoring")
+                continue
+            cls.domain_protocols[domain] = protocol_name
+
+    # Open Graph
+
+    async def fetch_open_graph_data(
+        self, client: SatXMPPEntity, url: str, options: dict
+    ) -> Optional[dict]:
+        """Fetch Open Graph data from a url
+
+        This method implements the Open Graph protocol, details of which can be found at:
+        http://ogp.me/
+
+        @param url: The url to fetch the Open Graph data from
+        @param options: Additional options that may be used while fetching data
+        @return: A dictionary containing the Open Graph data or None if no data could be
+            fetched
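+
+        The data is extracted from the page's <meta> elements (values here are
+        illustrative)::
+
+            <meta property="og:title" content="Page title"/>
+            <meta property="og:image" content="https://site.example/image.jpg"/>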
+        """
+        resp = await treq.get(url)
+
+        if resp.code == 200:
+            html = await resp.text()
+            parser = etree.HTMLParser()
+            tree = etree.fromstring(html, parser)
+
+            # Extract Open Graph data
+            metadata = {}
+            for tag in OG_TAGS:
+                og_el = tree.find(f'.//meta[@property="og:{tag}"]')
+                if og_el is not None:
+                    metadata[tag] = og_el.get("content")
+
+            if metadata:
+                if "site_name" in metadata and not "provider_name" in metadata:
+                    metadata["provider_name"] = metadata["site_name"]
+                return metadata
+
+            return None
+        else:
+            raise PreviewFetchError(
+                f"Failed to fetch preview for {url}, status code: {resp.code}"
+            )
+
+    # oEmbed
+
+    async def _fetch_oembed_data(self, oembed_url: str) -> Optional[dict]:
+        """Fetch oEmbed data from a given oEmbed URL
+
+        @param oembed_url: The url to fetch the oEmbed data from
+        @return: A dictionary containing the oEmbed data or None if no data could be
+            fetched
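+
+        A typical (abridged) oEmbed answer is a JSON object like::
+
+            {"version": "1.0", "type": "video", "title": "some title",
+             "html": "<iframe ...></iframe>"}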
+        """
+        resp = await treq.get(oembed_url)
+        if resp.code == 200:
+            return json.loads(await resp.text())
+        else:
+            raise PreviewFetchError(
+                f"Failed to fetch oEmbed preview for {oembed_url}, status code: "
+                f"{resp.code}"
+            )
+
+    async def fetch_youtube_oembed_data(
+        self, client: SatXMPPEntity, url: str, options: dict
+    ) -> Optional[dict]:
+        """Fetch YouTube oEmbed data from a url
+
+        @param url: The url to fetch the YouTube oEmbed data from
+        @param options: Additional options that may be used while fetching data
+        @return: A dictionary containing the YouTube oEmbed data or None if no data could
+            be fetched
+        """
+        oembed_url = f"https://www.youtube.com/oembed?url={parse.quote(url)}&format=json"
+        data = await self._fetch_oembed_data(oembed_url)
+        if data is not None and 'html' in data:
+            html = data['html']
+            root = etree.HTML(html)
+            iframe_elt = root.xpath('//iframe')
+            if iframe_elt:
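+                # force the <iframe> to fill its container (the absolute positioning
+                # assumes a relatively positioned wrapper on the frontend side)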
+                iframe_elt[0].attrib['style'] = (
+                    'position: absolute; top: 0; left: 0; width: 100%; height: 100%;'
+                )
+                data['html'] = etree.tostring(root, method='html', encoding='unicode')
+            else:
+                log.warning("No <iframe> found in the YouTube oEmbed response")
+
+        return data
+
+    async def fetch_generic_oembed_data(
+        self, client: SatXMPPEntity, url: str, options: dict
+    ) -> Optional[dict]:
+        """Fetch generic oEmbed data from a url
+
+        @param url: The url to fetch the oEmbed data from
+        @param options: Additional options that may be used while fetching data
+        @return: A dictionary containing the oEmbed data or None if no data could be
+            fetched
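+
+        Discovery relies on the oEmbed <link> element in the page, e.g. (the provider
+        URL is illustrative)::
+
+            <link type="application/json+oembed"
+                  href="https://provider.example/oembed?format=json"/>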
+        """
+        resp = await treq.get(url)
+        if resp.code == 200:
+            html = await resp.text()
+            parser = etree.HTMLParser()
+            tree = etree.fromstring(html, parser)
+
+            # Find oEmbed URL
+            oembed_link = tree.find('.//link[@type="application/json+oembed"]')
+            if oembed_link is not None:
+                oembed_url = oembed_link.get("href")
+                return await self._fetch_oembed_data(oembed_url)
+            else:
+                return None
+        else:
+            raise PreviewFetchError(
+                f"Failed to fetch preview for {url}, status code: {resp.code}"
+            )
+
+
+    async def fetch_generic_data(
+        self, client: SatXMPPEntity, url: str, options: dict
+    ) -> Optional[dict]:
+        """Fetch generic data from a url
+
+        This method attempts to extract the title, description, and author metadata from
+        the HTML of the page. If these data cannot be found, the method will return None.
+
+        @param url: The url to fetch the generic data from
+        @param options: Additional options that may be used while fetching data
+        @return: A dictionary containing the generic data or None if no data could be
+            fetched
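+
+        The data is looked up in standard HTML elements, e.g.::
+
+            <title>Page title</title>
+            <meta name="description" content="Page description"/>
+            <meta name="author" content="Page author"/>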
+        """
+        resp = await treq.get(url)
+        if resp.code == 200:
+            html = await resp.text()
+            parser = etree.HTMLParser()
+            tree = etree.fromstring(html, parser)
+
+            # Find title, description, and author metadata
+            title_el = tree.find(".//title")
+            desc_el = tree.find('.//meta[@name="description"]')
+            author_el = tree.find('.//meta[@name="author"]')
+
+            parsed_url = parse.urlparse(url)
+            metadata = {
+                "title": title_el.text if title_el is not None else "",
+                "description": desc_el.get("content") if desc_el is not None else "",
+                "author_name": author_el.get("content") if author_el is not None else "",
+                "url": url,
+                "provider_name": parsed_url.netloc,
+                "provider_url": f"{parsed_url.scheme}://{parsed_url.netloc}",
+            }
+
+            # "url" and provider data are always filled, so we only check the fields
+            # actually extracted from the page
+            if metadata["title"] or metadata["description"] or metadata["author_name"]:
+                return metadata
+            return None
+        else:
+            raise PreviewFetchError(
+                f"Failed to fetch generic preview for {url}, status code: {resp.code}"
+            )
+
+    # Wikipedia
+
+    async def fetch_wikipedia_data(
+        self, client: SatXMPPEntity, url: str, options: dict
+    ) -> Optional[dict]:
+        """Fetch Wikipedia data from a url
+
+        This method implements the Wikipedia API, details of which can be found at:
+        https://www.mediawiki.org/wiki/API:Main_page
+
+        @param url: The url to fetch the Wikipedia data from
+        @param options: Additional options that may be used while fetching data
+        @return: A dictionary containing the Wikipedia data or None if no data could be
+            fetched
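+
+        The expected answer is a JSON document shaped like (abridged)::
+
+            {"query": {"pages": {"<page id>": {
+                "title": "<title>", "extract": "<summary>",
+                "thumbnail": {"source": "<image url>"}}}}}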
+        """
+        parsed_url = parse.urlparse(url)
+        page_name = parsed_url.path.split("/")[-1]
+
+        # Use the Wikipedia API to get a summary of the page and a preview image
+        api_url = (
+            f"https://{parsed_url.netloc}/w/api.php?format=json&action=query&"
+            f"prop=extracts|pageimages&exintro&explaintext&redirects=1&piprop=thumbnail"
+            f"&pithumbsize=300&titles={page_name}"
+        )
+
+        resp = await treq.get(api_url)
+        if resp.code == 200:
+            data = json.loads(await resp.text())
+            # The page ID is the first key in the "pages" dictionary
+            page_id = next(iter(data["query"]["pages"].keys()))
+            page = data["query"]["pages"][page_id]
+
+            # The API may return a page with a missing title or extract if the page does
+            # not exist
+            if "missing" in page:
+                return None
+
+            return {
+                "provider_name": "Wikipedia",
+                "provider_url": "https://www.wikipedia.org",
+                "title": page.get("title"),
+                "description": page.get("extract"),
+                "url": url,
+                "image": page.get("thumbnail", {}).get("source")
+                if "thumbnail" in page
+                else None,
+            }
+        else:
+            raise PreviewFetchError(
+                f"Failed to fetch Wikipedia preview for {url}, status code: {resp.code}"
+            )
+
+    # Invidious
+
+    async def fetch_invidious_data(
+        self, client: SatXMPPEntity, url: str, options: dict
+    ) -> Optional[dict]:
+        """Fetch Invidious data from a url and generate the HTML <iframe> code
+
+        @param url: The url to fetch the Invidious data from
+        @param options: Additional options that may be used while fetching data
+        @return: A dictionary containing the Invidious data or None if no data could
+            be fetched
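+
+        Both URL forms used by Invidious instances are handled:
+        "https://<instance>/watch?v=<video id>" and "https://<instance>/<video id>".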
+        """
+        parsed_url = parse.urlparse(url)
+        if 'watch' in parsed_url.path:
+            video_id = parse.parse_qs(parsed_url.query).get('v', [None])[0]
+        else:
+            video_id = parsed_url.path.strip('/')
+        if not video_id:
+            log.warning(f"Can't extract video ID from {url}")
+            return None
+
+        invidious_api_url = f"https://{parsed_url.netloc}/api/v1/videos/{video_id}"
+
+        resp = await treq.get(invidious_api_url)
+        if resp.code == 200:
+            video_data = await resp.json()
+            # construct the iframe html code
+            html = (
+                f'<iframe'
+                f'    width="100%"'
+                f'    height="auto"'
+                f'    src="https://{parsed_url.netloc}/embed/{video_id}"'
+                f'    frameborder="0" '
+                f'    allow="'
+                f'        accelerometer;'
+                f'        autoplay;'
+                f'        clipboard-write;'
+                f'        encrypted-media;'
+                f'        gyroscope;'
+                f'        picture-in-picture"'
+                f'    style="'
+                f'        position: absolute;'
+                f'        top: 0;'
+                f'        left: 0;'
+                f'        width: 100%;'
+                f'        height: 100%;"'
+                f'    allowfullscreen></iframe>'
+            )
+            # structure the data to be returned
+            # "videoThumbnails" may be missing or empty, so we can't index it blindly
+            thumbnails = video_data.get("videoThumbnails") or [{}]
+            data = {
+                "title": video_data.get("title"),
+                "description": video_data.get("description"),
+                "url": url,
+                "image": thumbnails[0].get("url"),
+                "provider_name": "Invidious",
+                "provider_url": f"https://{parsed_url.netloc}",
+                "html": html,
+                "author_name": video_data.get("author"),
+                "author_url": (
+                    f"https://{parsed_url.netloc}/channel/{video_data.get('authorId')}"
+                ),
+            }
+            return data
+        else:
+            log.warning(f"Unable to fetch video data from Invidious API for {video_id}")
+            return None