comparison libervia/backend/plugins/plugin_misc_url_preview.py @ 4103:eaa0daa7f834

plugin URL preview: URL preview first draft
author Goffi <goffi@goffi.org>
date Tue, 27 Jun 2023 15:48:15 +0200
parents
children
comparison
equal deleted inserted replaced
4102:c0bb4b3fdccf 4103:eaa0daa7f834
1 #!/usr/bin/env python3
2
3
4 # Libervia plugin to provide URL previews
5 # Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)
6
7 # This program is free software: you can redistribute it and/or modify
8 # it under the terms of the GNU Affero General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU Affero General Public License for more details.
16
17 # You should have received a copy of the GNU Affero General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
19
20 from dataclasses import dataclass
21 import json
22 from textwrap import dedent
23 from typing import Callable, Dict, List, Optional, Union
24 from urllib import parse
25 import fnmatch
26
27 from lxml import etree
28 import treq
29 from twisted.internet import defer
30
31 from libervia.backend.core.constants import Const as C
32 from libervia.backend.core.core_types import SatXMPPEntity
33 from libervia.backend.core.exceptions import ConflictError
34 from libervia.backend.core.i18n import _
35 from libervia.backend.core.log import getLogger
36 from libervia.backend.tools.common import data_format
37 from libervia.backend.tools.common.async_utils import async_lru
38
39 log = getLogger(__name__)
40
# plugin metadata consumed by the Libervia plugin loader
PLUGIN_INFO = {
    C.PI_NAME: "Preview",
    C.PI_IMPORT_NAME: "Preview",
    C.PI_TYPE: C.PLUG_TYPE_MISC,
    C.PI_PROTOCOLS: ["Open Graph", "oEmbed"],
    C.PI_DEPENDENCIES: ["TEXT_SYNTAXES"],
    C.PI_MAIN: "Preview",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: dedent(
        _(
            """\
            Retrieves and provides a preview of URLs using various protocols. Initially, it
            uses the Open Graph protocol for most web pages. Specialized handlers are
            implemented for YouTube using the oEmbed protocol.
            """
        )
    ),
}

# suffixes of the Open Graph <meta property="og:..."> tags extracted from pages
# (see http://ogp.me/ for the meaning of each property)
OG_TAGS = [
    "title",
    "type",
    "image",
    "url",
    "audio",
    "description",
    "determiner",
    "locale",
    "locale:alternate",
    "site_name",
    "video",
]
73
74
class PreviewFetchError(Exception):
    """Raised when preview data can't be retrieved for a URL (e.g. non-200 HTTP status)."""
    pass
77
78
@dataclass
class Protocol:
    """A registered preview protocol."""
    # unique protocol name (e.g. "open_graph", "oembed-youtube")
    name: str
    # async callable(client, url, options) returning a preview dict or None
    callback: Callable
    # higher numbers are tried first when no domain-specific protocol matches
    priority: int
84
85
class Preview:
    """Plugin retrieving URL preview data through pluggable protocols."""
    # registered protocols, keyed by protocol name (class-level: shared by all instances)
    protocols: Dict[str, Protocol] = {}
    # maps an fnmatch-style domain pattern to the name of the protocol handling it
    domain_protocols: Dict[str, str] = {}
89
90 def __init__(self, host):
91 log.info(_("Preview plugin initialization"))
92 self.host = host
93
94 # generic protocols
95
96 self.register("open_graph", self.fetch_open_graph_data, priority=100)
97 self.register("oembed", self.fetch_generic_oembed_data, priority=50)
98 self.register("generic", self.fetch_generic_data, priority=0)
99
100 # domain specific protocols
101
102 self.register("oembed-youtube", self.fetch_youtube_oembed_data, priority=-100)
103 self.register_domain_protocol(
104 ["www.youtube.com", "youtu.be", "m.youtube.com"], "oembed-youtube"
105 )
106
107 self.register("wikipedia", self.fetch_wikipedia_data, priority=-80)
108 self.register_domain_protocol(["*.wikipedia.org"], "wikipedia")
109
110 self.register("invidious", self.fetch_invidious_data, priority=-90)
111 self.register_domain_protocol(
112 ["yewtu.be", "www.yewtu.be", "invidious.fdn.fr"],
113 "invidious"
114 )
115
116 # bridge methods
117
118 host.bridge.add_method(
119 "url_preview_get",
120 ".plugin",
121 in_sign="sss",
122 out_sign="s",
123 method=self._url_preview_get,
124 async_=True,
125 )
126
127 # API
128
129 def _url_preview_get(self, url: str, options: str, profile_key: str) -> defer.Deferred:
130 client = self.host.get_client(profile_key)
131 d = defer.ensureDeferred(
132 self.get_preview_data(client, url, data_format.deserialise(options))
133 )
134 d.addCallback(data_format.serialise)
135 return d
136
    @async_lru()
    async def get_preview_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch preview data from a url using registered protocols

        A domain-specific protocol is used when the URL's netloc matches a
        registered pattern (first match wins); otherwise all protocols are
        tried by decreasing priority until one returns data. Results are
        presumably cached by ``async_lru`` — TODO confirm against
        ``async_utils``.

        @param url: The url to fetch the preview data from
        @param options: Additional options that may be used while fetching preview data
        @return: A dictionary containing the preview data or None if no data could be
            fetched
        """
        parsed_url = parse.urlparse(url)
        domain = parsed_url.netloc

        preview_data: Optional[dict] = None
        matched_protocol: Optional[str] = None
        # NOTE(review): fnmatch is case-sensitive, so a mixed-case netloc
        # (e.g. "WWW.YouTube.com") won't match a lowercase registered pattern —
        # confirm whether `domain` should be lowercased here.
        for registered_domain, registered_protocol in self.domain_protocols.items():
            if fnmatch.fnmatch(domain, registered_domain):
                matched_protocol = registered_protocol
                break

        if matched_protocol is not None:
            # domain-specific protocol: used directly, exceptions propagate
            callback = self.protocols[matched_protocol].callback
            preview_data = await callback(client, url, options)
        else:
            # no domain match: try every protocol, highest priority first
            for name, protocol in sorted(
                self.protocols.items(), key=lambda item: item[1].priority, reverse=True
            ):
                try:
                    preview_data = await protocol.callback(client, url, options)
                except Exception as e:
                    # a failing protocol is skipped, not fatal
                    log.warning(f"Can't run protocol {name} for {url}: {e}")
                else:
                    if preview_data is not None:
                        matched_protocol = protocol.name
                        break

        if preview_data is not None:
            preview_data["protocol"] = matched_protocol
            # we don't clean html for YouTube as we need Javascript to make it work, and
            # for Invidious as we generate it ourselves
            if "html" in preview_data:
                if matched_protocol in ("oembed-youtube", "invidious"):
                    # this flag indicates that we know the source of the HTML and should
                    # be able to trust it. This will add `allow-scripts` and
                    # `allow-same-origin` in the preview <iframe> "sandbox" attribute
                    preview_data["html_known"] = True
                else:
                    preview_data["html_known"] = False
                    clean_xhtml = self.host.plugins["TEXT_SYNTAXES"].clean_xhtml
                    try:
                        preview_data["html"] = clean_xhtml(preview_data["html"])
                    except Exception as e:
                        # drop the html rather than risk serving unsanitised markup
                        log.warning(f"Can't clean html data: {e}\n{preview_data}")
                        del preview_data["html"]


        return preview_data
195
196 @classmethod
197 def register(cls, name: str, callback: Callable, priority: int = 0):
198 """Register a protocol to retrieve preview data
199
200 The registered callback should return a dictionary of preview data if available,
201 or None otherwise.
202
203 @param name: Unique name of the protocol
204 @param callback: Async callback function to fetch preview data
205 @param priority: Priority of the protocol, with higher numbers indicating higher
206 priority
207 @return: None
208 """
209 if name in cls.protocols:
210 raise ConflictError(f"Protocol with the name {name} is already registered.")
211
212 cls.protocols[name] = Protocol(name=name, callback=callback, priority=priority)
213
214 @classmethod
215 def register_domain_protocol(cls, domains: Union[str, List[str]], protocol_name: str):
216 """Register a protocol for a specific domain or list of domains
217
218 @param domains: The domain name or list of domain names
219 @param protocol_name: The name of the protocol to be associated with the domain(s)
220 @return: None
221 """
222 protocol_name = protocol_name.replace(" ", "").lower()
223 if protocol_name not in cls.protocols:
224 raise ConflictError(
225 f"Protocol with the name {protocol_name} is not registered."
226 )
227
228 if isinstance(domains, str):
229 domains = [domains]
230
231 for domain in domains:
232 domain = domain.strip()
233 if not domain:
234 log.warning("empty string used as domain, ignoring")
235 continue
236 cls.domain_protocols[domain] = protocol_name
237
238 # Open Graph
239
240 async def fetch_open_graph_data(
241 self, client: SatXMPPEntity, url: str, options: dict
242 ) -> Optional[dict]:
243 """Fetch Open Graph data from a url
244
245 This method implements the Open Graph protocol, details of which can be found at:
246 http://ogp.me/
247
248 @param url: The url to fetch the Open Graph data from
249 @param options: Additional options that may be used while fetching data
250 @return: A dictionary containing the Open Graph data or None if no data could be
251 fetched
252 """
253 resp = await treq.get(url)
254
255 if resp.code == 200:
256 html = await resp.text()
257 parser = etree.HTMLParser()
258 tree = etree.fromstring(html, parser)
259
260 # Extract Open Graph data
261 metadata = {}
262 for tag in OG_TAGS:
263 og_el = tree.find('.//meta[@property="og:{tag}"]'.format(tag=tag))
264 if og_el is not None:
265 metadata[tag] = og_el.get("content")
266
267 if metadata:
268 if "site_name" in metadata and not "provider_name" in metadata:
269 metadata["provider_name"] = metadata["site_name"]
270 return metadata
271
272 return None
273 else:
274 raise PreviewFetchError(
275 f"Failed to fetch preview for {url}, status code: {resp.code}"
276 )
277
278 # oEmbed
279
280 async def _fetch_oembed_data(self, oembed_url: str) -> Optional[dict]:
281 """Fetch oEmbed data from a given oEmbed URL
282
283 @param oembed_url: The url to fetch the oEmbed data from
284 @return: A dictionary containing the oEmbed data or None if no data could be
285 fetched
286 """
287 resp = await treq.get(oembed_url)
288 if resp.code == 200:
289 return json.loads(await resp.text())
290 else:
291 raise PreviewFetchError(
292 f"Failed to fetch oEmbed preview for {oembed_url}, status code: "
293 f"{resp.code}"
294 )
295
296 async def fetch_youtube_oembed_data(
297 self, client: SatXMPPEntity, url: str, options: dict
298 ) -> Optional[dict]:
299 """Fetch YouTube oEmbed data from a url
300
301 @param url: The url to fetch the YouTube oEmbed data from
302 @param options: Additional options that may be used while fetching data
303 @return: A dictionary containing the YouTube oEmbed data or None if no data could
304 be fetched
305 """
306 oembed_url = f"https://www.youtube.com/oembed?url={parse.quote(url)}&format=json"
307 data = await self._fetch_oembed_data(oembed_url)
308 if data is not None and 'html' in data:
309 html = data['html']
310 root = etree.HTML(html)
311 iframe_elt = root.xpath('//iframe')
312 if iframe_elt:
313 iframe_elt[0].attrib['style'] = (
314 'position: absolute; top: 0; left: 0; width: 100%; height: 100%;'
315 )
316 data['html'] = etree.tostring(root, method='html', encoding='unicode')
317 else:
318 log.warning("No <iframe> found in the YouTube oEmbed response")
319
320 return data
321
322 async def fetch_generic_oembed_data(
323 self, client: SatXMPPEntity, url: str, options: dict
324 ) -> Optional[dict]:
325 """Fetch generic oEmbed data from a url
326
327 @param url: The url to fetch the oEmbed data from
328 @param options: Additional options that may be used while fetching data
329 @return: A dictionary containing the oEmbed data or None if no data could be
330 fetched
331 """
332 resp = await treq.get(url)
333 if resp.code == 200:
334 html = await resp.text()
335 parser = etree.HTMLParser()
336 tree = etree.fromstring(html, parser)
337
338 # Find oEmbed URL
339 oembed_link = tree.find('.//link[@type="application/json+oembed"]')
340 if oembed_link is not None:
341 oembed_url = oembed_link.get("href")
342 return await self._fetch_oembed_data(oembed_url)
343 else:
344 return None
345 else:
346 raise PreviewFetchError(
347 f"Failed to fetch preview for {url}, status code: {resp.code}"
348 )
349
350
351 async def fetch_generic_data(
352 self, client: SatXMPPEntity, url: str, options: dict
353 ) -> Optional[dict]:
354 """Fetch generic data from a url
355
356 This method attempts to extract the title, description, and author metadata from
357 the HTML of the page. If these data cannot be found, the method will return None.
358
359 @param url: The url to fetch the generic data from
360 @param options: Additional options that may be used while fetching data
361 @return: A dictionary containing the generic data or None if no data could be
362 fetched
363 """
364 resp = await treq.get(url)
365 if resp.code == 200:
366 html = await resp.text()
367 parser = etree.HTMLParser()
368 tree = etree.fromstring(html, parser)
369
370 # Find title, description, and author metadata
371 title_el = tree.find(".//title")
372 desc_el = tree.find('.//meta[@name="description"]')
373 author_el = tree.find('.//meta[@name="author"]')
374
375 metadata = {
376 "title": title_el.text if title_el is not None else "",
377 "description": desc_el.get("content") if desc_el is not None else "",
378 "author_name": author_el.get("content") if author_el is not None else "",
379 "url": url,
380 "provider_name": parse.urlparse(url).netloc,
381 "provider_url": f"{parse.urlparse(url).scheme}://{parse.urlparse(url).netloc}",
382 }
383
384 return metadata if any(metadata.values()) else None
385 else:
386 raise PreviewFetchError(
387 f"Failed to fetch generic preview for {url}, status code: {resp.code}"
388 )
389
390 # Wikipedia
391
392 async def fetch_wikipedia_data(
393 self, client: SatXMPPEntity, url: str, options: dict
394 ) -> Optional[dict]:
395 """Fetch Wikipedia data from a url
396
397 This method implements the Wikipedia API, details of which can be found at:
398 https://www.mediawiki.org/wiki/API:Main_page
399
400 @param url: The url to fetch the Wikipedia data from
401 @param options: Additional options that may be used while fetching data
402 @return: A dictionary containing the Wikipedia data or None if no data could be
403 fetched
404 """
405 parsed_url = parse.urlparse(url)
406 page_name = parsed_url.path.split("/")[-1]
407
408 # Use the Wikipedia API to get a summary of the page and a preview image
409 api_url = (
410 f"https://{parsed_url.netloc}/w/api.php?format=json&action=query&"
411 f"prop=extracts|pageimages&exintro&explaintext&redirects=1&piprop=thumbnail"
412 f"&pithumbsize=300&titles={page_name}"
413 )
414
415 resp = await treq.get(api_url)
416 if resp.code == 200:
417 data = json.loads(await resp.text())
418 # The page ID is the first key in the "pages" dictionary
419 page_id = next(iter(data["query"]["pages"].keys()))
420 page = data["query"]["pages"][page_id]
421
422 # The API may return a page with a missing title or extract if the page does
423 # not exist
424 if "missing" in page:
425 return None
426
427 return {
428 "provider_name": "Wikipedia",
429 "provider_url": "https://www.wikipedia.org",
430 "title": page.get("title"),
431 "description": page.get("extract"),
432 "url": url,
433 "image": page.get("thumbnail", {}).get("source")
434 if "thumbnail" in page
435 else None,
436 }
437 else:
438 raise PreviewFetchError(
439 f"Failed to fetch Wikipedia preview for {url}, status code: {resp.code}"
440 )
441
442 # Invidious
443
444 async def fetch_invidious_data(self, client: SatXMPPEntity, url: str, options: dict) -> Optional[dict]:
445 """
446 Fetch Invidious data from a url and generate HTML iframe.
447
448 @param url: The url to fetch the Invidious data from.
449 @param options: Additional options that may be used while fetching data.
450 @return: A dictionary containing the Invidious data or None if no data could be fetched.
451 """
452 parsed_url = parse.urlparse(url)
453 if 'watch' in parsed_url.path:
454 video_id = parse.parse_qs(parsed_url.query).get('v', [None])[0]
455 else:
456 video_id = parsed_url.path.strip('/')
457 if not video_id:
458 log.warning(f"Can't extract video ID from {url}")
459 return None
460
461 invidious_api_url = f"https://{parsed_url.netloc}/api/v1/videos/{video_id}"
462
463 resp = await treq.get(invidious_api_url)
464 if resp.code == 200:
465 video_data = await resp.json()
466 # construct the iframe html code
467 html = (
468 f'<iframe'
469 f' width="100%"'
470 f' height="auto"'
471 f' src="https://{parsed_url.netloc}/embed/{video_id}"'
472 f' frameborder="0" '
473 f' allow="'
474 f' accelerometer;'
475 f' autoplay;'
476 f' clipboard-write;'
477 f' encrypted-media;'
478 f' gyroscope;'
479 f' picture-in-picture"'
480 f' style="'
481 f' position: absolute;'
482 f' top: 0;'
483 f' left: 0;'
484 f' width: 100%;'
485 f' height: 100%;"'
486 f' allowfullscreen></iframe>'
487 )
488 # structure the data to be returned
489 data = {
490 "title": video_data.get("title"),
491 "description": video_data.get("description"),
492 "url": url,
493 "image": video_data.get("videoThumbnails", [{}])[0].get("url"),
494 "provider_name": "Invidious",
495 "provider_url": f"https://{parsed_url.netloc}",
496 "html": html,
497 "author_name": video_data.get("author"),
498 "author_url": f"https://{parsed_url.netloc}/channel/{video_data.get('authorId')}",
499 }
500 return data
501 else:
502 log.warning(f"Unable to fetch video data from Invidious API for {video_id}")
503 return None