Mercurial repository: libervia-backend
File: libervia/backend/plugins/plugin_misc_url_preview.py @ 4103:eaa0daa7f834
Description: plugin URL preview: URL preview first draft

author:     Goffi <goffi@goffi.org>
date:       Tue, 27 Jun 2023 15:48:15 +0200
children:   0d7bb4df2343
comparison: 4102:c0bb4b3fdccf → 4103:eaa0daa7f834
#!/usr/bin/env python3


# Libervia plugin to handle URL previews
# Copyright (C) 2009-2022 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from dataclasses import dataclass
import fnmatch
import json
from textwrap import dedent
from typing import Callable, Dict, List, Optional, Union
from urllib import parse

from lxml import etree
import treq
from twisted.internet import defer

from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.exceptions import ConflictError
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import data_format
from libervia.backend.tools.common.async_utils import async_lru

log = getLogger(__name__)

PLUGIN_INFO = {
    C.PI_NAME: "Preview",
    C.PI_IMPORT_NAME: "Preview",
    C.PI_TYPE: C.PLUG_TYPE_MISC,
    C.PI_PROTOCOLS: ["Open Graph", "oEmbed"],
    C.PI_DEPENDENCIES: ["TEXT_SYNTAXES"],
    C.PI_MAIN: "Preview",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: dedent(
        _(
            """\
            Retrieves and provides a preview of URLs using various protocols. Most web
            pages are handled through the Open Graph protocol, with specialized handlers
            for YouTube (oEmbed), Wikipedia and Invidious.
            """
        )
    ),
}

OG_TAGS = [
    "title",
    "type",
    "image",
    "url",
    "audio",
    "description",
    "determiner",
    "locale",
    "locale:alternate",
    "site_name",
    "video",
]


class PreviewFetchError(Exception):
    pass


@dataclass
class Protocol:
    name: str
    callback: Callable
    priority: int


class Preview:
    protocols: Dict[str, Protocol] = {}
    domain_protocols: Dict[str, str] = {}

    def __init__(self, host):
        log.info(_("Preview plugin initialization"))
        self.host = host

        # generic protocols

        self.register("open_graph", self.fetch_open_graph_data, priority=100)
        self.register("oembed", self.fetch_generic_oembed_data, priority=50)
        self.register("generic", self.fetch_generic_data, priority=0)

        # domain specific protocols

        self.register("oembed-youtube", self.fetch_youtube_oembed_data, priority=-100)
        self.register_domain_protocol(
            ["www.youtube.com", "youtu.be", "m.youtube.com"], "oembed-youtube"
        )

        self.register("wikipedia", self.fetch_wikipedia_data, priority=-80)
        self.register_domain_protocol(["*.wikipedia.org"], "wikipedia")

        self.register("invidious", self.fetch_invidious_data, priority=-90)
        self.register_domain_protocol(
            ["yewtu.be", "www.yewtu.be", "invidious.fdn.fr"],
            "invidious",
        )

        # bridge methods

        host.bridge.add_method(
            "url_preview_get",
            ".plugin",
            in_sign="sss",
            out_sign="s",
            method=self._url_preview_get,
            async_=True,
        )
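
        # Illustrative bridge call from a frontend (a sketch only; the exact binding
        # names depend on the frontend, this is not part of the plugin itself):
        #
        #     preview_raw = await bridge.url_preview_get(
        #         "https://example.org/article", "", profile
        #     )
        #     preview = data_format.deserialise(preview_raw)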

    # API

    def _url_preview_get(
        self, url: str, options: str, profile_key: str
    ) -> defer.Deferred:
        client = self.host.get_client(profile_key)
        d = defer.ensureDeferred(
            self.get_preview_data(client, url, data_format.deserialise(options))
        )
        d.addCallback(data_format.serialise)
        return d

    @async_lru()
    async def get_preview_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch preview data from a url using registered protocols

        @param url: The url to fetch the preview data from
        @param options: Additional options that may be used while fetching preview data
        @return: A dictionary containing the preview data or None if no data could be
            fetched
        """
        parsed_url = parse.urlparse(url)
        domain = parsed_url.netloc

        preview_data: Optional[dict] = None
        matched_protocol = None
        for registered_domain, registered_protocol in self.domain_protocols.items():
            if fnmatch.fnmatch(domain, registered_domain):
                matched_protocol = registered_protocol
                break

        if matched_protocol is not None:
            callback = self.protocols[matched_protocol].callback
            preview_data = await callback(client, url, options)
        else:
            for name, protocol in sorted(
                self.protocols.items(), key=lambda item: item[1].priority, reverse=True
            ):
                try:
                    preview_data = await protocol.callback(client, url, options)
                except Exception as e:
                    log.warning(f"Can't run protocol {name} for {url}: {e}")
                else:
                    if preview_data is not None:
                        matched_protocol = protocol.name
                        break

        if preview_data is not None:
            preview_data["protocol"] = matched_protocol
            # we don't clean HTML for YouTube, as we need JavaScript to make it work,
            # nor for Invidious, as we generate it ourselves
            if "html" in preview_data:
                if matched_protocol in ("oembed-youtube", "invidious"):
                    # this flag indicates that we know the source of the HTML and
                    # should be able to trust it. This will add `allow-scripts` and
                    # `allow-same-origin` to the preview <iframe> "sandbox" attribute
                    preview_data["html_known"] = True
                else:
                    preview_data["html_known"] = False
                    clean_xhtml = self.host.plugins["TEXT_SYNTAXES"].clean_xhtml
                    try:
                        preview_data["html"] = clean_xhtml(preview_data["html"])
                    except Exception as e:
                        log.warning(f"Can't clean html data: {e}\n{preview_data}")
                        del preview_data["html"]

        return preview_data

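    # Resolution example (illustrative): for "https://youtu.be/xyz" the domain map
    # selects "oembed-youtube" directly; for an unmapped domain, protocols run in
    # descending priority order (open_graph 100, oembed 50, generic 0) until one
    # returns data or all have been tried.
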
    @classmethod
    def register(cls, name: str, callback: Callable, priority: int = 0):
        """Register a protocol to retrieve preview data

        The registered callback should return a dictionary of preview data if available,
        or None otherwise.

        @param name: Unique name of the protocol
        @param callback: Async callback function to fetch preview data
        @param priority: Priority of the protocol, with higher numbers indicating higher
            priority
        @return: None
        """
        if name in cls.protocols:
            raise ConflictError(f"Protocol with the name {name} is already registered.")

        cls.protocols[name] = Protocol(name=name, callback=callback, priority=priority)

    @classmethod
    def register_domain_protocol(
        cls, domains: Union[str, List[str]], protocol_name: str
    ):
        """Register a protocol for a specific domain or list of domains

        @param domains: The domain name or list of domain names
        @param protocol_name: The name of the protocol to be associated with the
            domain(s)
        @return: None
        """
        protocol_name = protocol_name.replace(" ", "").lower()
        if protocol_name not in cls.protocols:
            raise ConflictError(
                f"Protocol with the name {protocol_name} is not registered."
            )

        if isinstance(domains, str):
            domains = [domains]

        for domain in domains:
            domain = domain.strip()
            if not domain:
                log.warning("empty string used as domain, ignoring")
                continue
            cls.domain_protocols[domain] = protocol_name
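
    # Usage sketch (hypothetical third-party code, not part of this plugin): another
    # plugin could hook in its own protocol and map domains to it:
    #
    #     async def fetch_example_data(client, url, options):
    #         ...  # return a preview dict, or None
    #
    #     Preview.register("example", fetch_example_data, priority=-50)
    #     Preview.register_domain_protocol(["example.org"], "example")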

    # Open Graph

    async def fetch_open_graph_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch Open Graph data from a url

        This method implements the Open Graph protocol, details of which can be found
        at: http://ogp.me/

        @param url: The url to fetch the Open Graph data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the Open Graph data or None if no data could be
            fetched
        """
        resp = await treq.get(url)

        if resp.code == 200:
            html = await resp.text()
            parser = etree.HTMLParser()
            tree = etree.fromstring(html, parser)

            # Extract Open Graph data
            metadata = {}
            for tag in OG_TAGS:
                og_el = tree.find(f'.//meta[@property="og:{tag}"]')
                if og_el is not None:
                    metadata[tag] = og_el.get("content")

            if metadata:
                if "site_name" in metadata and "provider_name" not in metadata:
                    metadata["provider_name"] = metadata["site_name"]
                return metadata

            return None
        else:
            raise PreviewFetchError(
                f"Failed to fetch preview for {url}, status code: {resp.code}"
            )
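
    # Illustrative input/output: a page containing
    #     <meta property="og:title" content="Example"/>
    #     <meta property="og:image" content="https://example.org/img.png"/>
    # would yield {"title": "Example", "image": "https://example.org/img.png"},
    # plus "provider_name" when og:site_name is present.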

    # oEmbed

    async def _fetch_oembed_data(self, oembed_url: str) -> Optional[dict]:
        """Fetch oEmbed data from a given oEmbed URL

        @param oembed_url: The url to fetch the oEmbed data from
        @return: A dictionary containing the oEmbed data or None if no data could be
            fetched
        """
        resp = await treq.get(oembed_url)
        if resp.code == 200:
            return json.loads(await resp.text())
        else:
            raise PreviewFetchError(
                f"Failed to fetch oEmbed preview for {oembed_url}, status code: "
                f"{resp.code}"
            )

    async def fetch_youtube_oembed_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch YouTube oEmbed data from a url

        @param url: The url to fetch the YouTube oEmbed data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the YouTube oEmbed data or None if no data could
            be fetched
        """
        oembed_url = f"https://www.youtube.com/oembed?url={parse.quote(url)}&format=json"
        data = await self._fetch_oembed_data(oembed_url)
        if data is not None and "html" in data:
            html = data["html"]
            root = etree.HTML(html)
            iframe_elt = root.xpath("//iframe")
            if iframe_elt:
                iframe_elt[0].attrib["style"] = (
                    "position: absolute; top: 0; left: 0; width: 100%; height: 100%;"
                )
                data["html"] = etree.tostring(root, method="html", encoding="unicode")
            else:
                log.warning("No <iframe> found in the YouTube oEmbed response")

        return data
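
    # Illustrative: for url="https://www.youtube.com/watch?v=abc" the endpoint
    # queried is https://www.youtube.com/oembed?url=...&format=json; its "html"
    # field carries the embed <iframe>, restyled above to fill its container. Note
    # that etree.tostring() re-serialises the whole parsed tree, including the
    # <html>/<body> wrapper added by the parser; get_preview_data flags this HTML
    # as "html_known" instead of cleaning it.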

    async def fetch_generic_oembed_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch generic oEmbed data from a url

        @param url: The url to fetch the oEmbed data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the oEmbed data or None if no data could be
            fetched
        """
        resp = await treq.get(url)
        if resp.code == 200:
            html = await resp.text()
            parser = etree.HTMLParser()
            tree = etree.fromstring(html, parser)

            # Find oEmbed URL
            oembed_link = tree.find('.//link[@type="application/json+oembed"]')
            if oembed_link is not None:
                oembed_url = oembed_link.get("href")
                return await self._fetch_oembed_data(oembed_url)
            else:
                return None
        else:
            raise PreviewFetchError(
                f"Failed to fetch preview for {url}, status code: {resp.code}"
            )
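
    # Discovery note: the <link type="application/json+oembed"> lookup above is the
    # standard oEmbed discovery mechanism (https://oembed.com/); providers may also
    # advertise an XML variant (type "text/xml+oembed"), which this draft does not
    # handle.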

    async def fetch_generic_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch generic data from a url

        This method attempts to extract the title, description, and author metadata from
        the HTML of the page. If none of these can be found, the method returns None.

        @param url: The url to fetch the generic data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the generic data or None if no data could be
            fetched
        """
        resp = await treq.get(url)
        if resp.code == 200:
            html = await resp.text()
            parser = etree.HTMLParser()
            tree = etree.fromstring(html, parser)

            # Find title, description, and author metadata
            title_el = tree.find(".//title")
            desc_el = tree.find('.//meta[@name="description"]')
            author_el = tree.find('.//meta[@name="author"]')

            parsed_url = parse.urlparse(url)
            metadata = {
                "title": title_el.text if title_el is not None else "",
                "description": desc_el.get("content") if desc_el is not None else "",
                "author_name": author_el.get("content") if author_el is not None else "",
                "url": url,
                "provider_name": parsed_url.netloc,
                "provider_url": f"{parsed_url.scheme}://{parsed_url.netloc}",
            }

            # "url" and "provider_*" are always set, so only the extracted fields
            # determine whether anything useful was found
            extracted = (
                metadata["title"], metadata["description"], metadata["author_name"]
            )
            return metadata if any(extracted) else None
        else:
            raise PreviewFetchError(
                f"Failed to fetch generic preview for {url}, status code: {resp.code}"
            )
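
    # Illustrative result for a minimal page titled "Example" at https://example.org/
    # with no meta tags: {"title": "Example", "description": "", "author_name": "",
    # "url": "https://example.org/", "provider_name": "example.org",
    # "provider_url": "https://example.org"}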

    # Wikipedia

    async def fetch_wikipedia_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch Wikipedia data from a url

        This method uses the MediaWiki Action API, details of which can be found at:
        https://www.mediawiki.org/wiki/API:Main_page

        @param url: The url to fetch the Wikipedia data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the Wikipedia data or None if no data could be
            fetched
        """
        parsed_url = parse.urlparse(url)
        page_name = parsed_url.path.split("/")[-1]

        # Use the Wikipedia API to get a summary of the page and a preview image
        api_url = (
            f"https://{parsed_url.netloc}/w/api.php?format=json&action=query&"
            f"prop=extracts|pageimages&exintro&explaintext&redirects=1&piprop=thumbnail"
            f"&pithumbsize=300&titles={page_name}"
        )

        resp = await treq.get(api_url)
        if resp.code == 200:
            data = json.loads(await resp.text())
            # The page ID is the first key in the "pages" dictionary
            page_id = next(iter(data["query"]["pages"].keys()))
            page = data["query"]["pages"][page_id]

            # The API returns a page flagged "missing" if the page does not exist
            if "missing" in page:
                return None

            return {
                "provider_name": "Wikipedia",
                "provider_url": "https://www.wikipedia.org",
                "title": page.get("title"),
                "description": page.get("extract"),
                "url": url,
                # .get() on the empty-dict default already yields None when no
                # thumbnail is available
                "image": page.get("thumbnail", {}).get("source"),
            }
        else:
            raise PreviewFetchError(
                f"Failed to fetch Wikipedia preview for {url}, status code: {resp.code}"
            )
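
    # Illustrative: for https://fr.wikipedia.org/wiki/XMPP, page_name is "XMPP" and
    # the query goes to fr.wikipedia.org itself, so the extract is returned in the
    # language of the visited wiki edition.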

    # Invidious

    async def fetch_invidious_data(
        self, client: SatXMPPEntity, url: str, options: dict
    ) -> Optional[dict]:
        """Fetch Invidious data from a url and generate an HTML iframe

        @param url: The url to fetch the Invidious data from
        @param options: Additional options that may be used while fetching data
        @return: A dictionary containing the Invidious data or None if no data could be
            fetched
        """
        parsed_url = parse.urlparse(url)
        if "watch" in parsed_url.path:
            video_id = parse.parse_qs(parsed_url.query).get("v", [None])[0]
        else:
            video_id = parsed_url.path.strip("/")
        if not video_id:
            log.warning(f"Can't extract video ID from {url}")
            return None

        invidious_api_url = f"https://{parsed_url.netloc}/api/v1/videos/{video_id}"

        resp = await treq.get(invidious_api_url)
        if resp.code == 200:
            video_data = await resp.json()
            # construct the iframe HTML code
            html = (
                '<iframe'
                ' width="100%"'
                ' height="auto"'
                f' src="https://{parsed_url.netloc}/embed/{video_id}"'
                ' frameborder="0"'
                ' allow="'
                ' accelerometer;'
                ' autoplay;'
                ' clipboard-write;'
                ' encrypted-media;'
                ' gyroscope;'
                ' picture-in-picture"'
                ' style="'
                ' position: absolute;'
                ' top: 0;'
                ' left: 0;'
                ' width: 100%;'
                ' height: 100%;"'
                ' allowfullscreen></iframe>'
            )
            # structure the data to be returned
            # (an empty thumbnail list must not crash the preview)
            thumbnails = video_data.get("videoThumbnails") or [{}]
            data = {
                "title": video_data.get("title"),
                "description": video_data.get("description"),
                "url": url,
                "image": thumbnails[0].get("url"),
                "provider_name": "Invidious",
                "provider_url": f"https://{parsed_url.netloc}",
                "html": html,
                "author_name": video_data.get("author"),
                "author_url": (
                    f"https://{parsed_url.netloc}/channel/{video_data.get('authorId')}"
                ),
            }
            return data
        else:
            log.warning(f"Unable to fetch video data from Invidious API for {video_id}")
            return None
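
    # Handled URL shapes (illustrative): https://yewtu.be/watch?v=VIDEO_ID and
    # https://yewtu.be/VIDEO_ID; the generated embed iframe points back at the same
    # Invidious instance that served the original link.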