Mercurial > libervia-backend
comparison sat/plugins/plugin_misc_download.py @ 3922:0ff265725489
plugin XEP-0447: handle attachment and download:
- plugin XEP-0447 can now be used in message attachments and to retrieve an attachment
- plugin attach: `attachment` being processed is added to `extra` so the handler can inspect it
- plugin attach: `size` is added to attachment
- plugin download: a whole attachment dict is now used in `download` and
`file_download`/`file_download_complete`. `download_uri` can be used as a shortcut when
just a URI is used. In addition to URI scheme handlers, whole attachment handlers can now
be registered with `register_download_handler`
- plugin XEP-0363: the `file_http_upload` `XEP-0363_upload_size` trigger has been renamed to
`XEP-0363_upload_pre_slot` and now uses a dict with arguments, allowing not only the size
but also the filename to be modified, which is necessary for encryption (the filename may
be hidden from the URL this way).
- plugin XEP-0446: fix wrong element name
- plugin XEP-0447: source handler can now be registered (`url-data` is registered by
default)
- plugin XEP-0447: source parsing has been put in a separate `parse_sources_elt` method,
as it may be useful to do it independently (notably with XEP-0448)
- plugin XEP-0447: parse received message and complete attachments when suitable
- plugin XEP-0447: can now be used with message attachments
- plugin XEP-0447: can now be used with attachments download
- renamed `options` arguments to `extra` for consistency
- some style changes (progressive move from legacy camelCase to PEP8 snake_case)
- some typing
rel 379
author | Goffi <goffi@goffi.org> |
---|---|
date | Thu, 06 Oct 2022 16:02:05 +0200 |
parents | be6d91572633 |
children | 524856bd7b19 |
comparison
equal
deleted
inserted
replaced
3921:cc2705225778 | 3922:0ff265725489 |
---|---|
14 # GNU Affero General Public License for more details. | 14 # GNU Affero General Public License for more details. |
15 | 15 |
16 # You should have received a copy of the GNU Affero General Public License | 16 # You should have received a copy of the GNU Affero General Public License |
17 # along with this program. If not, see <http://www.gnu.org/licenses/>. | 17 # along with this program. If not, see <http://www.gnu.org/licenses/>. |
18 | 18 |
19 import hashlib | |
19 from pathlib import Path | 20 from pathlib import Path |
20 from urllib.parse import urlparse, unquote | 21 from typing import Any, Dict, Optional, Union, Tuple, Callable |
21 import hashlib | 22 from urllib.parse import unquote, urlparse |
23 | |
22 import treq | 24 import treq |
23 from twisted.internet import defer | 25 from twisted.internet import defer |
24 from twisted.words.protocols.jabber import error as jabber_error | 26 from twisted.words.protocols.jabber import error as jabber_error |
25 from sat.core.i18n import _, D_ | 27 |
28 from sat.core import exceptions | |
26 from sat.core.constants import Const as C | 29 from sat.core.constants import Const as C |
30 from sat.core.core_types import SatXMPPEntity | |
31 from sat.core.i18n import D_, _ | |
27 from sat.core.log import getLogger | 32 from sat.core.log import getLogger |
28 from sat.core import exceptions | |
29 from sat.tools import xml_tools | 33 from sat.tools import xml_tools |
34 from sat.tools import stream | |
30 from sat.tools.common import data_format | 35 from sat.tools.common import data_format |
31 from sat.tools import stream | |
32 from sat.tools.web import treq_client_no_ssl | 36 from sat.tools.web import treq_client_no_ssl |
33 | 37 |
34 log = getLogger(__name__) | 38 log = getLogger(__name__) |
35 | 39 |
36 | 40 |
37 PLUGIN_INFO = { | 41 PLUGIN_INFO = { |
38 C.PI_NAME: "File Download", | 42 C.PI_NAME: "File Download", |
39 C.PI_IMPORT_NAME: "DOWNLOAD", | 43 C.PI_IMPORT_NAME: "DOWNLOAD", |
40 C.PI_TYPE: C.PLUG_TYPE_MISC, | 44 C.PI_TYPE: C.PLUG_TYPE_MISC, |
45 C.PI_MODES: C.PLUG_MODE_BOTH, | |
41 C.PI_MAIN: "DownloadPlugin", | 46 C.PI_MAIN: "DownloadPlugin", |
42 C.PI_HANDLER: "no", | 47 C.PI_HANDLER: "no", |
43 C.PI_DESCRIPTION: _("""File download management"""), | 48 C.PI_DESCRIPTION: _("""File download management"""), |
44 } | 49 } |
45 | 50 |
51 self.host = host | 56 self.host = host |
52 host.bridge.addMethod( | 57 host.bridge.addMethod( |
53 "fileDownload", | 58 "fileDownload", |
54 ".plugin", | 59 ".plugin", |
55 in_sign="ssss", | 60 in_sign="ssss", |
56 out_sign="a{ss}", | 61 out_sign="s", |
57 method=self._fileDownload, | 62 method=self._fileDownload, |
58 async_=True, | 63 async_=True, |
59 ) | 64 ) |
60 host.bridge.addMethod( | 65 host.bridge.addMethod( |
61 "fileDownloadComplete", | 66 "fileDownloadComplete", |
64 out_sign="s", | 69 out_sign="s", |
65 method=self._fileDownloadComplete, | 70 method=self._fileDownloadComplete, |
66 async_=True, | 71 async_=True, |
67 ) | 72 ) |
68 self._download_callbacks = {} | 73 self._download_callbacks = {} |
69 self.registerScheme('http', self.downloadHTTP) | 74 self._scheme_callbacks = {} |
70 self.registerScheme('https', self.downloadHTTP) | 75 self.register_scheme('http', self.download_http) |
71 | 76 self.register_scheme('https', self.download_http) |
72 def _fileDownload(self, uri, dest_path, options_s, profile): | 77 |
73 client = self.host.getClient(profile) | 78 def _fileDownload( |
74 options = data_format.deserialise(options_s) | 79 self, attachment_s: str, dest_path: str, extra_s: str, profile: str |
75 | 80 ) -> defer.Deferred: |
76 return defer.ensureDeferred(self.fileDownload( | 81 d = defer.ensureDeferred(self.file_download( |
77 client, uri, Path(dest_path), options | 82 self.host.getClient(profile), |
83 data_format.deserialise(attachment_s), | |
84 Path(dest_path), | |
85 data_format.deserialise(extra_s) | |
78 )) | 86 )) |
79 | 87 d.addCallback(lambda ret: data_format.serialise(ret)) |
80 async def fileDownload(self, client, uri, dest_path, options=None): | 88 return d |
89 | |
90 async def file_download( | |
91 self, | |
92 client: SatXMPPEntity, | |
93 attachment: Dict[str, Any], | |
94 dest_path: Path, | |
95 extra: Optional[Dict[str, Any]] = None | |
96 ) -> Dict[str, Any]: | |
81 """Download a file using best available method | 97 """Download a file using best available method |
82 | 98 |
83 parameters are the same as for [download] | 99 parameters are the same as for [download] |
84 @return (dict): action dictionary, with progress id in case of success, else xmlui | 100 @return (dict): action dictionary, with progress id in case of success, else xmlui |
85 message | 101 message |
86 """ | 102 """ |
87 try: | 103 try: |
88 progress_id, __ = await self.download(client, uri, dest_path, options) | 104 progress_id, __ = await self.download(client, attachment, dest_path, extra) |
89 except Exception as e: | 105 except Exception as e: |
90 if (isinstance(e, jabber_error.StanzaError) | 106 if (isinstance(e, jabber_error.StanzaError) |
91 and e.condition == 'not-acceptable'): | 107 and e.condition == 'not-acceptable'): |
92 reason = e.text | 108 reason = e.text |
93 else: | 109 else: |
100 ).toXml() | 116 ).toXml() |
101 } | 117 } |
102 else: | 118 else: |
103 return {"progress": progress_id} | 119 return {"progress": progress_id} |
104 | 120 |
105 def _fileDownloadComplete(self, uri, dest_path, options_s, profile): | 121 def _fileDownloadComplete( |
106 client = self.host.getClient(profile) | 122 self, attachment_s: str, dest_path: str, extra_s: str, profile: str |
107 options = data_format.deserialise(options_s) | 123 ) -> defer.Deferred: |
108 | 124 d = defer.ensureDeferred(self.file_download_complete( |
109 d = defer.ensureDeferred(self.fileDownloadComplete( | 125 self.host.getClient(profile), |
110 client, uri, dest_path, options | 126 data_format.deserialise(attachment_s), |
127 Path(dest_path), | |
128 data_format.deserialise(extra_s) | |
111 )) | 129 )) |
112 d.addCallback(lambda path: str(path)) | 130 d.addCallback(lambda path: str(path)) |
113 return d | 131 return d |
114 | 132 |
115 async def fileDownloadComplete(self, client, uri, dest_path, options=None): | 133 async def file_download_complete( |
134 self, | |
135 client: SatXMPPEntity, | |
136 attachment: Dict[str, Any], | |
137 dest_path: Path, | |
138 extra: Optional[Dict[str, Any]] = None | |
139 ) -> str: | |
116 """Helper method to fully download a file and return its path | 140 """Helper method to fully download a file and return its path |
117 | 141 |
118 parameters are the same as for [download] | 142 parameters are the same as for [download] |
119 @return (str): path to the downloaded file | 143 @return (str): path to the downloaded file |
120 use empty string to store the file in cache | 144 use empty string to store the file in cache |
121 """ | 145 """ |
122 __, download_d = await self.download(client, uri, dest_path, options) | 146 __, download_d = await self.download(client, attachment, dest_path, extra) |
123 dest_path = await download_d | 147 dest_path = await download_d |
124 return dest_path | 148 return dest_path |
125 | 149 |
126 async def download(self, client, uri, dest_path, options=None): | 150 async def download_uri( |
127 """Send a file using best available method | 151 self, |
128 | 152 client: SatXMPPEntity, |
129 @param uri(str): URI to the file to download | 153 uri: str, |
130 @param dest_path(str, Path): where the file must be downloaded | 154 dest_path: Union[Path, str], |
131 if empty string, the file will be stored in local path | 155 extra: Optional[Dict[str, Any]] = None |
132 @param options(dict, None): options depending on scheme handler | 156 ) -> Tuple[str, defer.Deferred]: |
133 Some common options: | 157 if extra is None: |
134 - ignore_tls_errors(bool): True to ignore SSL/TLS certificate verification | 158 extra = {} |
135 used only if HTTPS transport is needed | |
136 @return (tuple[unicode,D(unicode)]): progress_id and a Deferred which fire | |
137 download URL when download is finished | |
138 progress_id can be empty string if the file already exist and is not | |
139 downloaded again (can happen if cache is used with empty dest_path) | |
140 """ | |
141 if options is None: | |
142 options = {} | |
143 | |
144 uri_parsed = urlparse(uri, 'http') | 159 uri_parsed = urlparse(uri, 'http') |
145 if dest_path: | 160 if dest_path: |
146 dest_path = Path(dest_path) | 161 dest_path = Path(dest_path) |
147 cache_uid = None | 162 cache_uid = None |
148 else: | 163 else: |
169 | 184 |
170 # should we check certificates? | 185 # should we check certificates? |
171 check_certificate = self.host.memory.getParamA( | 186 check_certificate = self.host.memory.getParamA( |
172 "check_certificate", "Connection", profile_key=client.profile) | 187 "check_certificate", "Connection", profile_key=client.profile) |
173 if not check_certificate: | 188 if not check_certificate: |
174 options['ignore_tls_errors'] = True | 189 extra['ignore_tls_errors'] = True |
175 log.warning( | 190 log.warning( |
176 _("certificate check disabled for download, this is dangerous!")) | 191 _("certificate check disabled for download, this is dangerous!")) |
177 | 192 |
178 try: | 193 try: |
179 callback = self._download_callbacks[uri_parsed.scheme] | 194 callback = self._scheme_callbacks[uri_parsed.scheme] |
180 except KeyError: | 195 except KeyError: |
181 raise exceptions.NotFound(f"Can't find any handler for uri {uri}") | 196 raise exceptions.NotFound(f"Can't find any handler for uri {uri}") |
182 else: | 197 else: |
183 try: | 198 try: |
184 progress_id, download_d = await callback( | 199 progress_id, download_d = await callback( |
185 client, uri_parsed, dest_path, options) | 200 client, uri_parsed, dest_path, extra) |
186 except Exception as e: | 201 except Exception as e: |
187 log.warning(_( | 202 log.warning(_( |
188 "Can't download URI {uri}: {reason}").format( | 203 "Can't download URI {uri}: {reason}").format( |
189 uri=uri, reason=e)) | 204 uri=uri, reason=e)) |
190 if cache_uid is not None: | 205 if cache_uid is not None: |
193 dest_path.unlink() | 208 dest_path.unlink() |
194 raise e | 209 raise e |
195 download_d.addCallback(lambda __: dest_path) | 210 download_d.addCallback(lambda __: dest_path) |
196 return progress_id, download_d | 211 return progress_id, download_d |
197 | 212 |
198 def registerScheme(self, scheme, download_cb): | 213 |
214 async def download( | |
215 self, | |
216 client: SatXMPPEntity, | |
217 attachment: Dict[str, Any], | |
218 dest_path: Union[Path, str], | |
219 extra: Optional[Dict[str, Any]] = None | |
220 ) -> Tuple[str, defer.Deferred]: | |
221 """Download a file from URI using suitable method | |
222 | |
223 @param uri: URI to the file to download | |
224 @param dest_path: where the file must be downloaded | |
225 if empty string, the file will be stored in local path | |
226 @param extra: options depending on scheme handler | |
227 Some common options: | |
228 - ignore_tls_errors(bool): True to ignore SSL/TLS certificate verification | |
229 used only if HTTPS transport is needed | |
230 @return: ``progress_id`` and a Deferred which fire download URL when download is | |
231 finished. | |
232 ``progress_id`` can be empty string if the file already exist and is not | |
233 downloaded again (can happen if cache is used with empty ``dest_path``). | |
234 """ | |
235 uri = attachment.get("uri") | |
236 if uri: | |
237 return await self.download_uri(client, uri, dest_path, extra) | |
238 else: | |
239 for source in attachment.get("sources", []): | |
240 source_type = source.get("type") | |
241 if not source_type: | |
242 log.warning( | |
243 "source type is missing for source: {source}\nattachment: " | |
244 f"{attachment}" | |
245 ) | |
246 continue | |
247 try: | |
248 cb = self._download_callbacks[source_type] | |
249 except KeyError: | |
250 log.warning( | |
251 f"no source handler registered for {source_type!r}" | |
252 ) | |
253 else: | |
254 try: | |
255 return await cb(client, attachment, source, dest_path, extra) | |
256 except exceptions.CancelError as e: | |
257 # the handler can't or doesn't want to handle this source | |
258 log.debug( | |
259 f"Following source handling by {cb} has been cancelled ({e}):" | |
260 f"{source}" | |
261 ) | |
262 | |
263 log.warning( | |
264 "no source could be handled, we can't download the attachment:\n" | |
265 f"{attachment}" | |
266 ) | |
267 raise exceptions.FeatureNotFound("no handler could manage the attachment") | |
268 | |
269 def register_download_handler( | |
270 self, | |
271 source_type: str, | |
272 callback: Callable[ | |
273 [ | |
274 SatXMPPEntity, Dict[str, Any], Dict[str, Any], Union[str, Path], | |
275 Dict[str, Any] | |
276 ], | |
277 Tuple[str, defer.Deferred] | |
278 ] | |
279 ) -> None: | |
280 """Register a handler to manage a type of attachment source | |
281 | |
282 @param source_type: ``type`` of source handled | |
283 This is usually the namespace of the protocol used | |
284 @param callback: method to call to manage the source. | |
285 Call arguments are the same as for [download], with an extra ``source`` dict | |
286 which is used just after ``attachment`` to give a quick reference to the | |
287 source used. | |
288 The callabke must return a tuple with: | |
289 - progress ID | |
290 - a Deferred which fire whant the file is fully downloaded | |
291 """ | |
292 if source_type is self._download_callbacks: | |
293 raise exceptions.ConflictError( | |
294 f"The is already a callback registered for source type {source_type!r}" | |
295 ) | |
296 self._download_callbacks[source_type] = callback | |
297 | |
298 def register_scheme(self, scheme: str, download_cb: Callable) -> None: | |
199 """Register an URI scheme handler | 299 """Register an URI scheme handler |
200 | 300 |
201 @param scheme(unicode): URI scheme this callback is handling | 301 @param scheme: URI scheme this callback is handling |
202 @param download_cb(callable): callback to download a file | 302 @param download_cb: callback to download a file |
203 arguments are: | 303 arguments are: |
204 - (SatXMPPClient) client | 304 - (SatXMPPClient) client |
205 - (urllib.parse.SplitResult) parsed URI | 305 - (urllib.parse.SplitResult) parsed URI |
206 - (Path) destination path where the file must be downloaded | 306 - (Path) destination path where the file must be downloaded |
207 - (dict) options | 307 - (dict) options |
208 must return a tuple with progress_id and a Deferred which fire when download | 308 must return a tuple with progress_id and a Deferred which fire when download |
209 is finished | 309 is finished |
210 """ | 310 """ |
211 if scheme in self._download_callbacks: | 311 if scheme in self._scheme_callbacks: |
212 raise exceptions.ConflictError( | 312 raise exceptions.ConflictError( |
213 f"A method with scheme {scheme!r} is already registered" | 313 f"A method with scheme {scheme!r} is already registered" |
214 ) | 314 ) |
215 self._download_callbacks[scheme] = download_cb | 315 self._scheme_callbacks[scheme] = download_cb |
216 | 316 |
217 def unregister(self, scheme): | 317 def unregister(self, scheme): |
218 try: | 318 try: |
219 del self._download_callbacks[scheme] | 319 del self._scheme_callbacks[scheme] |
220 except KeyError: | 320 except KeyError: |
221 raise exceptions.NotFound(f"No callback registered for scheme {scheme!r}") | 321 raise exceptions.NotFound(f"No callback registered for scheme {scheme!r}") |
222 | 322 |
223 def errbackDownload(self, file_obj, download_d, resp): | 323 def errback_download(self, file_obj, download_d, resp): |
224 """Set file_obj and download deferred appropriatly after a network error | 324 """Set file_obj and download deferred appropriatly after a network error |
225 | 325 |
226 @param file_obj(SatFile): file where the download must be done | 326 @param file_obj(SatFile): file where the download must be done |
227 @param download_d(Deferred): deffered which must be fired on complete download | 327 @param download_d(Deferred): deffered which must be fired on complete download |
228 @param resp(treq.response.IResponse): treq response | 328 @param resp(treq.response.IResponse): treq response |
229 """ | 329 """ |
230 msg = f"HTTP error ({resp.code}): {resp.phrase.decode()}" | 330 msg = f"HTTP error ({resp.code}): {resp.phrase.decode()}" |
231 file_obj.close(error=msg) | 331 file_obj.close(error=msg) |
232 download_d.errback(exceptions.NetworkError(msg)) | 332 download_d.errback(exceptions.NetworkError(msg)) |
233 | 333 |
234 async def downloadHTTP(self, client, uri_parsed, dest_path, options): | 334 async def download_http(self, client, uri_parsed, dest_path, options): |
235 url = uri_parsed.geturl() | 335 url = uri_parsed.geturl() |
236 | 336 |
237 if options.get('ignore_tls_errors', False): | 337 if options.get('ignore_tls_errors', False): |
238 log.warning( | 338 log.warning( |
239 "TLS certificate check disabled, this is highly insecure" | 339 "TLS certificate check disabled, this is highly insecure" |
262 if resp.code == 200: | 362 if resp.code == 200: |
263 d = treq.collect(resp, file_obj.write) | 363 d = treq.collect(resp, file_obj.write) |
264 d.addBoth(lambda _: file_obj.close()) | 364 d.addBoth(lambda _: file_obj.close()) |
265 else: | 365 else: |
266 d = defer.Deferred() | 366 d = defer.Deferred() |
267 self.errbackDownload(file_obj, d, resp) | 367 self.errback_download(file_obj, d, resp) |
268 return progress_id, d | 368 return progress_id, d |