Mercurial > libervia-backend
comparison libervia/backend/plugins/plugin_misc_download.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_misc_download.py@524856bd7b19 |
children | 0d7bb4df2343 |
comparison
equal
deleted
inserted
replaced
4070:d10748475025 | 4071:4b842c1fb686 |
---|---|
1 #!/usr/bin/env python3 | |
2 | |
3 # SAT plugin for downloading files | |
4 # Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org) | |
5 | |
6 # This program is free software: you can redistribute it and/or modify | |
7 # it under the terms of the GNU Affero General Public License as published by | |
8 # the Free Software Foundation, either version 3 of the License, or | |
9 # (at your option) any later version. | |
10 | |
11 # This program is distributed in the hope that it will be useful, | |
12 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
14 # GNU Affero General Public License for more details. | |
15 | |
16 # You should have received a copy of the GNU Affero General Public License | |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | |
18 | |
19 import hashlib | |
20 from pathlib import Path | |
21 from typing import Any, Dict, Optional, Union, Tuple, Callable | |
22 from urllib.parse import unquote, urlparse | |
23 | |
24 import treq | |
25 from twisted.internet import defer | |
26 from twisted.words.protocols.jabber import error as jabber_error | |
27 | |
28 from libervia.backend.core import exceptions | |
29 from libervia.backend.core.constants import Const as C | |
30 from libervia.backend.core.core_types import SatXMPPEntity | |
31 from libervia.backend.core.i18n import D_, _ | |
32 from libervia.backend.core.log import getLogger | |
33 from libervia.backend.tools import xml_tools | |
34 from libervia.backend.tools import stream | |
35 from libervia.backend.tools.common import data_format | |
36 from libervia.backend.tools.web import treq_client_no_ssl | |
37 | |
# module logger
log = getLogger(__name__)


# Plugin metadata dict keyed by ``C.PI_*`` constants. ``C.PI_MAIN`` names the class
# defined below; presumably this dict is read by the backend's plugin loader.
PLUGIN_INFO = {
    C.PI_NAME: "File Download",
    C.PI_IMPORT_NAME: "DOWNLOAD",
    C.PI_TYPE: C.PLUG_TYPE_MISC,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_MAIN: "DownloadPlugin",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""File download management"""),
}
50 | |
51 | |
class DownloadPlugin(object):
    """File download management

    Downloads are dispatched either by URI scheme (see [register_scheme]) or by
    attachment source type (see [register_download_handler]). Handlers for the
    ``http`` and ``https`` schemes are registered at initialisation.
    """

    def __init__(self, host):
        log.info(_("plugin Download initialization"))
        self.host = host
        host.bridge.add_method(
            "file_download",
            ".plugin",
            in_sign="ssss",
            out_sign="s",
            method=self._file_download,
            async_=True,
        )
        host.bridge.add_method(
            "file_download_complete",
            ".plugin",
            in_sign="ssss",
            out_sign="s",
            method=self._file_download_complete,
            async_=True,
        )
        # source type => callback, filled by [register_download_handler]
        self._download_callbacks = {}
        # URI scheme => callback, filled by [register_scheme]
        self._scheme_callbacks = {}
        self.register_scheme('http', self.download_http)
        self.register_scheme('https', self.download_http)

    def _file_download(
        self, attachment_s: str, dest_path: str, extra_s: str, profile: str
    ) -> defer.Deferred:
        """Bridge wrapper for [file_download]

        Arguments are deserialised, and the resulting action dict is serialised.
        """
        d = defer.ensureDeferred(self.file_download(
            self.host.get_client(profile),
            data_format.deserialise(attachment_s),
            # Path("") is Path("."), which is truthy: an empty string must be kept
            # as is, so that the file is stored in cache
            Path(dest_path) if dest_path else "",
            data_format.deserialise(extra_s)
        ))
        d.addCallback(data_format.serialise)
        return d

    async def file_download(
        self,
        client: SatXMPPEntity,
        attachment: Dict[str, Any],
        dest_path: Union[Path, str],
        extra: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Download a file using best available method

        parameters are the same as for [download]
        @return: action dictionary:
            - ``{"progress": progress_id}`` in case of success (``progress_id`` is an
              empty string if the file was already in cache)
            - ``{"xmlui": ...}`` with a warning note if the download can't be started
        """
        try:
            progress_id, __ = await self.download(client, attachment, dest_path, extra)
        except Exception as e:
            if (isinstance(e, jabber_error.StanzaError)
                    and e.condition == 'not-acceptable'):
                # the text given by the peer is more explicit, but it may be missing
                reason = e.text or str(e)
            else:
                reason = str(e)
            msg = D_("Can't download file: {reason}").format(reason=reason)
            log.warning(msg)
            return {
                "xmlui": xml_tools.note(
                    msg, D_("Can't download file"), C.XMLUI_DATA_LVL_WARNING
                ).toXml()
            }
        else:
            return {"progress": progress_id}

    def _file_download_complete(
        self, attachment_s: str, dest_path: str, extra_s: str, profile: str
    ) -> defer.Deferred:
        """Bridge wrapper for [file_download_complete]

        Arguments are deserialised, and the resulting path is returned as a string.
        """
        d = defer.ensureDeferred(self.file_download_complete(
            self.host.get_client(profile),
            data_format.deserialise(attachment_s),
            # Path("") is Path("."), which is truthy: an empty string must be kept
            # as is, so that the file is stored in cache
            Path(dest_path) if dest_path else "",
            data_format.deserialise(extra_s)
        ))
        d.addCallback(str)
        return d

    async def file_download_complete(
        self,
        client: SatXMPPEntity,
        attachment: Dict[str, Any],
        dest_path: Union[Path, str],
        extra: Optional[Dict[str, Any]] = None
    ) -> Union[Path, str]:
        """Helper method to fully download a file and return its path

        parameters are the same as for [download] (use an empty ``dest_path`` to
        store the file in cache)
        @return: path to the downloaded file, once the download is finished
        """
        __, download_d = await self.download(client, attachment, dest_path, extra)
        return await download_d

    async def download_uri(
        self,
        client: SatXMPPEntity,
        uri: str,
        dest_path: Union[Path, str],
        extra: Optional[Dict[str, Any]] = None
    ) -> Tuple[str, defer.Deferred]:
        """Download a file from an URI, using the handler registered for its scheme

        @param uri: URI of the file to download (``http`` scheme is assumed if the
            URI has none)
        @param dest_path: where the file must be downloaded
            if empty, the file is stored in cache, and a file already cached for this
            URI is returned without being downloaded again
        @param extra: options for the scheme handler
            ``ignore_tls_errors`` is set to True in this dict if the
            ``check_certificate`` parameter is disabled
        @return: ``progress_id`` and a Deferred which fires with the destination path
            when the download is finished. ``progress_id`` is an empty string if the
            file was already in cache.
        @raise exceptions.NotFound: no handler is registered for the URI scheme
        """
        if extra is None:
            extra = {}
        uri_parsed = urlparse(uri, 'http')
        if dest_path:
            dest_path = Path(dest_path)
            cache_uid = None
        else:
            filename = Path(unquote(uri_parsed.path)).name.strip() or C.FILE_DEFAULT_NAME
            # we don't use Path.suffixes because we don't want to have more than 2
            # suffixes, but we still want to handle suffixes like "tar.gz".
            stem, *suffixes = filename.rsplit('.', 2)
            # we hash the URL to have an unique identifier, and avoid double download
            url_hash = hashlib.sha256(uri_parsed.geturl().encode()).hexdigest()
            cache_uid = f"{stem}_{url_hash}"
            cache_data = client.cache.get_metadata(cache_uid)
            if cache_data is not None:
                # file is already in cache, we return it
                return '', defer.succeed(cache_data['path'])

        # the handler is looked up before creating any cache entry, so that an
        # unknown scheme doesn't leave an empty file registered in cache
        try:
            callback = self._scheme_callbacks[uri_parsed.scheme]
        except KeyError:
            raise exceptions.NotFound(f"Can't find any handler for uri {uri}") from None

        if cache_uid is not None:
            # the file is not in cache
            unique_name = '.'.join([cache_uid] + suffixes)
            with client.cache.cache_data(
                "DOWNLOAD", cache_uid, filename=unique_name) as f:
                # we close the file and only use its name, the file will be opened
                # by the registered callback
                dest_path = Path(f.name)

        # should we check certificates?
        check_certificate = self.host.memory.param_get_a(
            "check_certificate", "Connection", profile_key=client.profile)
        if not check_certificate:
            extra['ignore_tls_errors'] = True
            log.warning(
                _("certificate check disabled for download, this is dangerous!"))

        try:
            progress_id, download_d = await callback(
                client, uri_parsed, dest_path, extra)
        except Exception as e:
            log.warning(_(
                "Can't download URI {uri}: {reason}").format(
                    uri=uri, reason=e))
            # don't keep a partial/empty file around
            if cache_uid is not None:
                client.cache.remove_from_cache(cache_uid)
            elif dest_path.exists():
                dest_path.unlink()
            raise
        download_d.addCallback(lambda __: dest_path)
        return progress_id, download_d

    async def download(
        self,
        client: SatXMPPEntity,
        attachment: Dict[str, Any],
        dest_path: Union[Path, str],
        extra: Optional[Dict[str, Any]] = None
    ) -> Tuple[str, defer.Deferred]:
        """Download a file from an attachment using suitable method

        If the attachment has an ``uri``, [download_uri] is used. Otherwise each
        entry of ``sources`` is tried in order with the handler registered for its
        ``type``, until one of them doesn't raise ``exceptions.CancelError``.

        @param attachment: attachment data, with either an ``uri`` or ``sources``
        @param dest_path: where the file must be downloaded
            if empty string, the file will be stored in cache (for URI downloads)
        @param extra: options depending on scheme/source handler
            Some common options:
                - ignore_tls_errors(bool): True to ignore SSL/TLS certificate
                  verification; used only if HTTPS transport is needed
        @return: ``progress_id`` and a Deferred which fires when download is finished
            (with the destination path for URI downloads).
            ``progress_id`` can be empty string if the file already exist and is not
            downloaded again (can happen if cache is used with empty ``dest_path``).
        @raise exceptions.FeatureNotFound: no source of the attachment could be
            handled
        """
        uri = attachment.get("uri")
        if uri:
            return await self.download_uri(client, uri, dest_path, extra)
        else:
            for source in attachment.get("sources", []):
                source_type = source.get("type")
                if not source_type:
                    log.warning(
                        f"source type is missing for source: {source}\nattachment: "
                        f"{attachment}"
                    )
                    continue
                try:
                    cb = self._download_callbacks[source_type]
                except KeyError:
                    log.warning(
                        f"no source handler registered for {source_type!r}"
                    )
                else:
                    try:
                        return await cb(client, attachment, source, dest_path, extra)
                    except exceptions.CancelError as e:
                        # the handler can't or doesn't want to handle this source
                        log.debug(
                            f"Following source handling by {cb} has been cancelled ({e}):"
                            f"{source}"
                        )

        log.warning(
            "no source could be handled, we can't download the attachment:\n"
            f"{attachment}"
        )
        raise exceptions.FeatureNotFound("no handler could manage the attachment")

    def register_download_handler(
        self,
        source_type: str,
        callback: Callable[
            [
                SatXMPPEntity, Dict[str, Any], Dict[str, Any], Union[str, Path],
                Dict[str, Any]
            ],
            Tuple[str, defer.Deferred]
        ]
    ) -> None:
        """Register a handler to manage a type of attachment source

        @param source_type: ``type`` of source handled
            This is usually the namespace of the protocol used
        @param callback: method to call to manage the source.
            Call arguments are the same as for [download], with an extra ``source``
            dict which is used just after ``attachment`` to give a quick reference to
            the source used.
            The result of the call is awaited, and must be a tuple with:
                - progress ID
                - a Deferred which fires when the file is fully downloaded
            The callback may raise ``exceptions.CancelError`` if it can't or doesn't
            want to handle the source, the next source is then tried.
        @raise exceptions.ConflictError: a callback is already registered for this
            source type
        """
        if source_type in self._download_callbacks:
            raise exceptions.ConflictError(
                f"There is already a callback registered for source type "
                f"{source_type!r}"
            )
        self._download_callbacks[source_type] = callback

    def register_scheme(self, scheme: str, download_cb: Callable) -> None:
        """Register an URI scheme handler

        @param scheme: URI scheme this callback is handling
        @param download_cb: callback to download a file
            arguments are:
                - (SatXMPPClient) client
                - (urllib.parse.ParseResult) parsed URI
                - (Path) destination path where the file must be downloaded
                - (dict) options
            its result is awaited and must be a tuple with progress_id and a Deferred
            which fires when download is finished
        @raise exceptions.ConflictError: a handler is already registered for this
            scheme
        """
        if scheme in self._scheme_callbacks:
            raise exceptions.ConflictError(
                f"A method with scheme {scheme!r} is already registered"
            )
        self._scheme_callbacks[scheme] = download_cb

    def unregister(self, scheme):
        """Remove the handler registered for an URI scheme

        @param scheme: URI scheme to unregister
        @raise exceptions.NotFound: no handler is registered for this scheme
        """
        try:
            del self._scheme_callbacks[scheme]
        except KeyError:
            raise exceptions.NotFound(f"No callback registered for scheme {scheme!r}")

    def errback_download(self, file_obj, download_d, resp):
        """Set file_obj and download deferred appropriately after a network error

        @param file_obj(SatFile): file where the download must be done
        @param download_d(Deferred): deferred which must be fired on complete download
        @param resp(treq.response.IResponse): treq response
        """
        msg = f"HTTP error ({resp.code}): {resp.phrase.decode()}"
        file_obj.close(error=msg)
        download_d.errback(exceptions.NetworkError(msg))

    async def download_http(self, client, uri_parsed, dest_path, options):
        """Download a file using HTTP(S)

        @param client: client of the profile doing the download
        @param uri_parsed: parsed URI of the file
        @param dest_path: path where the file is written
        @param options: if ``ignore_tls_errors`` is True, TLS certificate is not
            checked
        @return: progress ID and a Deferred which fires when the download is complete,
            or fails if the download fails
        """
        url = uri_parsed.geturl()

        if options.get('ignore_tls_errors', False):
            log.warning(
                "TLS certificate check disabled, this is highly insecure"
            )
            treq_client = treq_client_no_ssl
        else:
            treq_client = treq

        # a HEAD request is done first to get the expected size for SatFile
        head_data = await treq_client.head(url)
        try:
            content_length = int(head_data.headers.getRawHeaders('content-length')[0])
        except (KeyError, TypeError, IndexError, ValueError):
            # header missing (getRawHeaders returns None) or not a valid integer
            content_length = None
            log.debug(f"No content lenght found at {url}")
        file_obj = stream.SatFile(
            self.host,
            client,
            dest_path,
            mode="wb",
            size=content_length,
        )

        progress_id = file_obj.uid

        try:
            resp = await treq_client.get(url, unbuffered=True)
        except Exception as e:
            # the request itself failed, we must not leave the file open
            file_obj.close(error=str(e))
            raise
        if resp.code == 200:
            d = treq.collect(resp, file_obj.write)

            def on_error(failure_):
                # close the file with the error, and propagate the failure so that
                # a failed download is not considered as complete
                file_obj.close(error=failure_.getErrorMessage())
                return failure_

            d.addCallbacks(lambda __: file_obj.close(), on_error)
        else:
            d = defer.Deferred()
            self.errback_download(file_obj, d, resp)
        return progress_id, d