view libervia/backend/plugins/plugin_misc_url_preview.py @ 4303:a7ec325246fb

component email-gateway: first draft: Initial implementation of the Email Gateway. This component uses XEP-0100 for registration. Upon registration and subsequent startups, a connection is made to registered IMAP services, and incoming emails (in `INBOX` mailboxes) are immediately forwarded as XMPP messages. In the opposite direction, an SMTP connection is established to send emails on incoming XMPP messages. rel 449
author Goffi <goffi@goffi.org>
date Fri, 06 Sep 2024 18:07:17 +0200
parents 0d7bb4df2343
children
line wrap: on
line source

#!/usr/bin/env python3


# Libervia plugin to handle events
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from dataclasses import dataclass
import json
from textwrap import dedent
from typing import Callable, Dict, List, Optional, Union
from urllib import parse
import fnmatch

from lxml import etree
import treq
from twisted.internet import defer

from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.exceptions import ConflictError
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import data_format
from libervia.backend.tools.common.async_utils import async_lru

log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "Preview",
    C.PI_IMPORT_NAME: "Preview",
    C.PI_TYPE: C.PLUG_TYPE_MISC,
    C.PI_PROTOCOLS: ["Open Graph", "oEmbed"],
    C.PI_DEPENDENCIES: ["TEXT_SYNTAXES"],
    C.PI_MAIN: "Preview",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: dedent(
        _(
            """\
    Retrieves and provides a preview of URLs using various protocols. Initially, it
    uses the Open Graph protocol for most web pages. Specialized handlers are
    implemented for YouTube using the oEmbed protocol.
    """
        )
    ),
}

OG_TAGS = [
    "title",
    "type",
    "image",
    "url",
    "audio",
    "description",
    "determiner",
    "locale",
    "locale:alternate",
    "site_name",
    "video",
]


class PreviewFetchError(Exception):
    pass


@dataclass
class Protocol:
    name: str
    callback: Callable
    priority: int


class Preview:
    protocols: Dict[str, Protocol] = {}
    domain_protocols: Dict[str, str] = {}

    def __init__(self, host):
        log.info(_("Preview plugin initialization"))
        self.host = host

        # generic protocols

        self.register("open_graph", self.fetch_open_graph_data, priority=100)
        self.register("oembed", self.fetch_generic_oembed_data, priority=50)
        self.register("generic", self.fetch_generic_data, priority=0)

        # domain specific protocols

        self.register("oembed-youtube", self.fetch_youtube_oembed_data, priority=-100)
        self.register_domain_protocol(
            ["www.youtube.com", "youtu.be", "m.youtube.com"], "oembed-youtube"
        )

        self.register("wikipedia", self.fetch_wikipedia_data, priority=-80)
        self.register_domain_protocol(["*.wikipedia.org"], "wikipedia")

        self.register("invidious", self.fetch_invidious_data, priority=-90)
        self.register_domain_protocol(
            ["yewtu.be", "www.yewtu.be", "invidious.fdn.fr"], "invidious"
        )

        # bridge methods

        host.bridge.add_method(
            "url_preview_get",
            ".plugin",
            in_sign="sss",
            out_sign="s",
            method=self._url_preview_get,
            async_=True,
        )

    # API

    def _url_preview_get(
        self, url: str, options: str, profile_key: str
    ) -> defer.Deferred:
        client = self.host.get_client(profile_key)
        d = defer.ensureDeferred(
            self.get_preview_data(client, url, data_format.deserialise(options))
        )
        d.addCallback(data_format.serialise)
        return d

    @async_lru()
    async def get_preview_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch preview data from a url using registered protocols

        @param url: The url to fetch the preview data from
        @param options: Additional options that may be used while fetching preview data
        @return: A dictionary containing the preview data or None if no data could be
            fetched
        """
        parsed_url = parse.urlparse(url)
        domain = parsed_url.netloc

        preview_data: Optional[dict] = None
        matched_protocol = None
        for registered_domain, registered_protocol in self.domain_protocols.items():
            if fnmatch.fnmatch(domain, registered_domain):
                matched_protocol = registered_protocol
                break

        if matched_protocol is not None:
            callback = self.protocols[matched_protocol].callback
            preview_data = await callback(client, url, options)
        else:
            for name, protocol in sorted(
                self.protocols.items(), key=lambda item: item[1].priority, reverse=True
            ):
                try:
                    preview_data = await protocol.callback(client, url, options)
                except Exception as e:
                    log.warning(f"Can't run protocol {name} for {url}: {e}")
                else:
                    if preview_data is not None:
                        matched_protocol = protocol.name
                        break

        if preview_data is not None:
            preview_data["protocol"] = matched_protocol
            # we don't clean html for youtube as we need Javascript to make it work, and
            # for invidious as we generate it ourself
            if "html" in preview_data:
                if matched_protocol in ("oembed-youtube", "invidious"):
                    # this flag indicate that we know the source of HTML and we should be
                    # able to trust it. This will add `allow-scripts` and
                    # `allow-same-origin` in the preview <iframe> "sandbox" attribute
                    preview_data["html_known"] = True
                else:
                    preview_data["html_known"] = False
                    clean_xhtml = self.host.plugins["TEXT_SYNTAXES"].clean_xhtml
                    try:
                        preview_data["html"] = clean_xhtml(preview_data["html"])
                    except Exception as e:
                        log.warning(f"Can't clean html data: {e}\n{preview_data}")
                        del preview_data["html"]

        return preview_data

    @classmethod
    def register(cls, name: str, callback: Callable, priority: int = 0):
        """Register a protocol to retrieve preview data

        The registered callback should return a dictionary of preview data if available,
        or None otherwise.

        @param name: Unique name of the protocol
        @param callback: Async callback function to fetch preview data
        @param priority: Priority of the protocol, with higher numbers indicating higher
            priority
        @return: None
        """
        if name in cls.protocols:
            raise ConflictError(f"Protocol with the name {name} is already registered.")

        cls.protocols[name] = Protocol(name=name, callback=callback, priority=priority)

    @classmethod
    def register_domain_protocol(cls, domains: Union[str, List[str]], protocol_name: str):
        """Register a protocol for a specific domain or list of domains

        @param domains: The domain name or list of domain names
        @param protocol_name: The name of the protocol to be associated with the domain(s)
        @return: None
        """
        protocol_name = protocol_name.replace(" ", "").lower()
        if protocol_name not in cls.protocols:
            raise ConflictError(
                f"Protocol with the name {protocol_name} is not registered."
            )

        if isinstance(domains, str):
            domains = [domains]

        for domain in domains:
            domain = domain.strip()
            if not domain:
                log.warning("empty string used as domain, ignoring")
                continue
            cls.domain_protocols[domain] = protocol_name

    # Open Graph

    async def fetch_open_graph_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch Open Graph data from a url

        This method implements the Open Graph protocol, details of which can be found at:
        http://ogp.me/

        @param url: The url to fetch the Open Graph data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the Open Graph data or None if no data could be
            fetched
        """
        resp = await treq.get(url)

        if resp.code == 200:
            html = await resp.text()
            parser = etree.HTMLParser()
            tree = etree.fromstring(html, parser)

            # Extract Open Graph data
            metadata = {}
            for tag in OG_TAGS:
                og_el = tree.find('.//meta[@property="og:{tag}"]'.format(tag=tag))
                if og_el is not None:
                    metadata[tag] = og_el.get("content")

            if metadata:
                if "site_name" in metadata and not "provider_name" in metadata:
                    metadata["provider_name"] = metadata["site_name"]
                return metadata

            return None
        else:
            raise PreviewFetchError(
                f"Failed to fetch preview for {url}, status code: {resp.code}"
            )

    # oEmbed

    async def _fetch_oembed_data(self, oembed_url: str) -> Optional[dict]:
        """Fetch oEmbed data from a given oEmbed URL

        @param oembed_url: The url to fetch the oEmbed data from
        @return: A dictionary containing the oEmbed data or None if no data could be
            fetched
        """
        resp = await treq.get(oembed_url)
        if resp.code == 200:
            return json.loads(await resp.text())
        else:
            raise PreviewFetchError(
                f"Failed to fetch oEmbed preview for {oembed_url}, status code: "
                f"{resp.code}"
            )

    async def fetch_youtube_oembed_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch YouTube oEmbed data from a url

        @param url: The url to fetch the YouTube oEmbed data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the YouTube oEmbed data or None if no data could
            be fetched
        """
        oembed_url = f"https://www.youtube.com/oembed?url={parse.quote(url)}&format=json"
        data = await self._fetch_oembed_data(oembed_url)
        if data is not None and "html" in data:
            html = data["html"]
            root = etree.HTML(html)
            iframe_elt = root.xpath("//iframe")
            if iframe_elt:
                iframe_elt[0].attrib[
                    "style"
                ] = "position: absolute; top: 0; left: 0; width: 100%; height: 100%;"
                data["html"] = etree.tostring(root, method="html", encoding="unicode")
            else:
                log.warning("No <iframe> found in the YouTube oEmbed response")

        return data

    async def fetch_generic_oembed_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch generic oEmbed data from a url

        @param url: The url to fetch the oEmbed data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the oEmbed data or None if no data could be
            fetched
        """
        resp = await treq.get(url)
        if resp.code == 200:
            html = await resp.text()
            parser = etree.HTMLParser()
            tree = etree.fromstring(html, parser)

            # Find oEmbed URL
            oembed_link = tree.find('.//link[@type="application/json+oembed"]')
            if oembed_link is not None:
                oembed_url = oembed_link.get("href")
                return await self._fetch_oembed_data(oembed_url)
            else:
                return None
        else:
            raise PreviewFetchError(
                f"Failed to fetch preview for {url}, status code: {resp.code}"
            )

    async def fetch_generic_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch generic data from a url

        This method attempts to extract the title, description, and author metadata from
        the HTML of the page. If these data cannot be found, the method will return None.

        @param url: The url to fetch the generic data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the generic data or None if no data could be
            fetched
        """
        resp = await treq.get(url)
        if resp.code == 200:
            html = await resp.text()
            parser = etree.HTMLParser()
            tree = etree.fromstring(html, parser)

            # Find title, description, and author metadata
            title_el = tree.find(".//title")
            desc_el = tree.find('.//meta[@name="description"]')
            author_el = tree.find('.//meta[@name="author"]')

            metadata = {
                "title": title_el.text if title_el is not None else "",
                "description": desc_el.get("content") if desc_el is not None else "",
                "author_name": author_el.get("content") if author_el is not None else "",
                "url": url,
                "provider_name": parse.urlparse(url).netloc,
                "provider_url": f"{parse.urlparse(url).scheme}://{parse.urlparse(url).netloc}",
            }

            return metadata if any(metadata.values()) else None
        else:
            raise PreviewFetchError(
                f"Failed to fetch generic preview for {url}, status code: {resp.code}"
            )

    # Wikipedia

    async def fetch_wikipedia_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch Wikipedia data from a url

        This method implements the Wikipedia API, details of which can be found at:
        https://www.mediawiki.org/wiki/API:Main_page

        @param url: The url to fetch the Wikipedia data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the Wikipedia data or None if no data could be
            fetched
        """
        parsed_url = parse.urlparse(url)
        page_name = parsed_url.path.split("/")[-1]

        # Use the Wikipedia API to get a summary of the page and a preview image
        api_url = (
            f"https://{parsed_url.netloc}/w/api.php?format=json&action=query&"
            f"prop=extracts|pageimages&exintro&explaintext&redirects=1&piprop=thumbnail"
            f"&pithumbsize=300&titles={page_name}"
        )

        resp = await treq.get(api_url)
        if resp.code == 200:
            data = json.loads(await resp.text())
            # The page ID is the first key in the "pages" dictionary
            page_id = next(iter(data["query"]["pages"].keys()))
            page = data["query"]["pages"][page_id]

            # The API may return a page with a missing title or extract if the page does
            # not exist
            if "missing" in page:
                return None

            return {
                "provider_name": "Wikipedia",
                "provider_url": "https://www.wikipedia.org",
                "title": page.get("title"),
                "description": page.get("extract"),
                "url": url,
                "image": (
                    page.get("thumbnail", {}).get("source")
                    if "thumbnail" in page
                    else None
                ),
            }
        else:
            raise PreviewFetchError(
                f"Failed to fetch Wikipedia preview for {url}, status code: {resp.code}"
            )

    # Invidious

    async def fetch_invidious_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """
        Fetch Invidious data from a url and generate HTML iframe.

        @param url: The url to fetch the Invidious data from.
        @param options: Additional options that may be used while fetching data.
        @return: A dictionary containing the Invidious data or None if no data could be fetched.
        """
        parsed_url = parse.urlparse(url)
        if "watch" in parsed_url.path:
            video_id = parse.parse_qs(parsed_url.query).get("v", [None])[0]
        else:
            video_id = parsed_url.path.strip("/")
        if not video_id:
            log.warning(f"Can't extract video ID from {url}")
            return None

        invidious_api_url = f"https://{parsed_url.netloc}/api/v1/videos/{video_id}"

        resp = await treq.get(invidious_api_url)
        if resp.code == 200:
            video_data = await resp.json()
            # construct the iframe html code
            html = (
                f"<iframe"
                f'    width="100%"'
                f'    height="auto"'
                f'    src="https://{parsed_url.netloc}/embed/{video_id}"'
                f'    frameborder="0" '
                f'    allow="'
                f"        accelerometer;"
                f"        autoplay;"
                f"        clipboard-write;"
                f"        encrypted-media;"
                f"        gyroscope;"
                f'        picture-in-picture"'
                f'    style="'
                f"        position: absolute;"
                f"        top: 0;"
                f"        left: 0;"
                f"        width: 100%;"
                f'        height: 100%;"'
                f"    allowfullscreen></iframe>"
            )
            # structure the data to be returned
            data = {
                "title": video_data.get("title"),
                "description": video_data.get("description"),
                "url": url,
                "image": video_data.get("videoThumbnails", [{}])[0].get("url"),
                "provider_name": "Invidious",
                "provider_url": f"https://{parsed_url.netloc}",
                "html": html,
                "author_name": video_data.get("author"),
                "author_url": f"https://{parsed_url.netloc}/channel/{video_data.get('authorId')}",
            }
            return data
        else:
            log.warning(f"Unable to fetch video data from Invidious API for {video_id}")
            return None