Mercurial > libervia-backend
diff libervia/backend/plugins/plugin_misc_download.py @ 4071:4b842c1fb686
refactoring: renamed `sat` package to `libervia.backend`
author | Goffi <goffi@goffi.org> |
---|---|
date | Fri, 02 Jun 2023 11:49:51 +0200 |
parents | sat/plugins/plugin_misc_download.py@524856bd7b19 |
children | 0d7bb4df2343 |
line wrap: on
line diff
#!/usr/bin/env python3

# SAT plugin for downloading files
# Copyright (C) 2009-2021 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import hashlib
from pathlib import Path
from typing import Any, Dict, Optional, Union, Tuple, Callable
from urllib.parse import unquote, urlparse

import treq
from twisted.internet import defer
from twisted.words.protocols.jabber import error as jabber_error

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.core_types import SatXMPPEntity
from libervia.backend.core.i18n import D_, _
from libervia.backend.core.log import getLogger
from libervia.backend.tools import xml_tools
from libervia.backend.tools import stream
from libervia.backend.tools.common import data_format
from libervia.backend.tools.web import treq_client_no_ssl

log = getLogger(__name__)


PLUGIN_INFO = {
    C.PI_NAME: "File Download",
    C.PI_IMPORT_NAME: "DOWNLOAD",
    C.PI_TYPE: C.PLUG_TYPE_MISC,
    C.PI_MODES: C.PLUG_MODE_BOTH,
    C.PI_MAIN: "DownloadPlugin",
    C.PI_HANDLER: "no",
    C.PI_DESCRIPTION: _("""File download management"""),
}


class DownloadPlugin(object):
    """Download files by dispatching to URI scheme or attachment source handlers

    Two registries are kept:
        - ``_scheme_callbacks`` maps an URI scheme to a download callback
          (``http`` and ``https`` are registered on initialisation)
        - ``_download_callbacks`` maps an attachment source ``type`` to a handler
    """

    def __init__(self, host):
        """Register bridge methods and the default HTTP(S) scheme handlers

        @param host: main backend instance; ``host.bridge``, ``host.get_client`` and
            ``host.memory`` are used by this plugin
        """
        log.info(_("plugin Download initialization"))
        self.host = host
        host.bridge.add_method(
            "file_download",
            ".plugin",
            in_sign="ssss",
            out_sign="s",
            method=self._file_download,
            async_=True,
        )
        host.bridge.add_method(
            "file_download_complete",
            ".plugin",
            in_sign="ssss",
            out_sign="s",
            method=self._file_download_complete,
            async_=True,
        )
        self._download_callbacks = {}
        self._scheme_callbacks = {}
        self.register_scheme('http', self.download_http)
        self.register_scheme('https', self.download_http)

    def _file_download(
        self, attachment_s: str, dest_path: str, extra_s: str, profile: str
    ) -> defer.Deferred:
        """Bridge wrapper around [file_download]

        Serialised arguments are deserialised, and the resulting action dictionary is
        serialised again before being returned.
        """
        d = defer.ensureDeferred(self.file_download(
            self.host.get_client(profile),
            data_format.deserialise(attachment_s),
            Path(dest_path),
            data_format.deserialise(extra_s)
        ))
        d.addCallback(data_format.serialise)
        return d

    async def file_download(
        self,
        client: SatXMPPEntity,
        attachment: Dict[str, Any],
        dest_path: Path,
        extra: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Download a file using best available method

        parameters are the same as for [download]
        @return (dict): action dictionary, with progress id in case of success, else xmlui
            message
        """
        try:
            progress_id, __ = await self.download(client, attachment, dest_path, extra)
        except Exception as e:
            # any failure is reported to the frontend as a warning note instead of
            # being raised through the bridge
            if (isinstance(e, jabber_error.StanzaError)
                    and e.condition == 'not-acceptable'):
                reason = e.text
            else:
                reason = str(e)
            msg = D_("Can't download file: {reason}").format(reason=reason)
            log.warning(msg)
            return {
                "xmlui": xml_tools.note(
                    msg, D_("Can't download file"), C.XMLUI_DATA_LVL_WARNING
                ).toXml()
            }
        else:
            return {"progress": progress_id}

    def _file_download_complete(
        self, attachment_s: str, dest_path: str, extra_s: str, profile: str
    ) -> defer.Deferred:
        """Bridge wrapper around [file_download_complete]

        The Deferred fires with the downloaded file path converted to ``str``.
        """
        d = defer.ensureDeferred(self.file_download_complete(
            self.host.get_client(profile),
            data_format.deserialise(attachment_s),
            Path(dest_path),
            data_format.deserialise(extra_s)
        ))
        d.addCallback(str)
        return d

    async def file_download_complete(
        self,
        client: SatXMPPEntity,
        attachment: Dict[str, Any],
        dest_path: Path,
        extra: Optional[Dict[str, Any]] = None
    ) -> Union[Path, str]:
        """Helper method to fully download a file and return its path

        parameters are the same as for [download]
        @param dest_path: where the file must be downloaded
            use empty string to store the file in cache
        @return: path to the downloaded file
            NOTE(review): a ``Path`` is returned when the file is actually downloaded
            through [download_uri]; when the file comes from cache, the value stored
            in the cache metadata is returned as is (type to be confirmed).
        """
        __, download_d = await self.download(client, attachment, dest_path, extra)
        return await download_d

    async def download_uri(
        self,
        client: SatXMPPEntity,
        uri: str,
        dest_path: Union[Path, str],
        extra: Optional[Dict[str, Any]] = None
    ) -> Tuple[str, defer.Deferred]:
        """Download a file from an URI, using the handler registered for its scheme

        @param uri: URI of the file to download
            ``http`` is used as default scheme when the URI has none
        @param dest_path: where the file must be downloaded
            if empty, the file is stored in the client cache
        @param extra: options given to the scheme handler
            ``ignore_tls_errors`` is set to True in this dict when the
            ``check_certificate`` parameter is disabled for the profile
        @return: ``progress_id`` and a Deferred firing the destination path when the
            download is finished.
            ``progress_id`` is an empty string when the file is already in cache.
        @raise exceptions.NotFound: no handler is registered for the URI scheme
        """
        if extra is None:
            extra = {}
        uri_parsed = urlparse(uri, 'http')
        if dest_path:
            dest_path = Path(dest_path)
            cache_uid = None
        else:
            filename = Path(unquote(uri_parsed.path)).name.strip() or C.FILE_DEFAULT_NAME
            # we don't use Path.suffixes because we don't want to have more than 2
            # suffixes, but we still want to handle suffixes like "tar.gz".
            stem, *suffixes = filename.rsplit('.', 2)
            # we hash the URL to have an unique identifier, and avoid double download
            url_hash = hashlib.sha256(uri_parsed.geturl().encode()).hexdigest()
            cache_uid = f"{stem}_{url_hash}"
            cache_data = client.cache.get_metadata(cache_uid)
            if cache_data is not None:
                # file is already in cache, we return it
                download_d = defer.succeed(cache_data['path'])
                return '', download_d
            else:
                # the file is not in cache
                unique_name = '.'.join([cache_uid] + suffixes)
                with client.cache.cache_data(
                    "DOWNLOAD", cache_uid, filename=unique_name) as f:
                    # we close the file and only use its name, the file will be opened
                    # by the registered callback
                    dest_path = Path(f.name)

        # should we check certificates?
        check_certificate = self.host.memory.param_get_a(
            "check_certificate", "Connection", profile_key=client.profile)
        if not check_certificate:
            extra['ignore_tls_errors'] = True
            log.warning(
                _("certificate check disabled for download, this is dangerous!"))

        try:
            callback = self._scheme_callbacks[uri_parsed.scheme]
        except KeyError:
            raise exceptions.NotFound(f"Can't find any handler for uri {uri}")
        else:
            try:
                progress_id, download_d = await callback(
                    client, uri_parsed, dest_path, extra)
            except Exception as e:
                log.warning(_(
                    "Can't download URI {uri}: {reason}").format(
                    uri=uri, reason=e))
                # the destination created for this download is removed, so no empty
                # or partial file is left behind
                if cache_uid is not None:
                    client.cache.remove_from_cache(cache_uid)
                elif dest_path.exists():
                    dest_path.unlink()
                # bare raise keeps the original traceback
                raise
            download_d.addCallback(lambda __: dest_path)
            return progress_id, download_d

    async def download(
        self,
        client: SatXMPPEntity,
        attachment: Dict[str, Any],
        dest_path: Union[Path, str],
        extra: Optional[Dict[str, Any]] = None
    ) -> Tuple[str, defer.Deferred]:
        """Download a file from an attachment using suitable method

        @param attachment: attachment data
            if it has an ``uri`` key, the file is downloaded with [download_uri],
            otherwise each item of ``sources`` is tried in order with the handler
            registered for its ``type``
        @param dest_path: where the file must be downloaded
            if empty string, the file will be stored in local path
        @param extra: options depending on scheme handler
            Some common options:
                - ignore_tls_errors(bool): True to ignore SSL/TLS certificate verification
                  used only if HTTPS transport is needed
        @return: ``progress_id`` and a Deferred which fire download URL when download is
            finished.
            ``progress_id`` can be empty string if the file already exist and is not
            downloaded again (can happen if cache is used with empty ``dest_path``).
        @raise exceptions.FeatureNotFound: no URI is set and no source could be handled
        """
        uri = attachment.get("uri")
        if uri:
            return await self.download_uri(client, uri, dest_path, extra)

        for source in attachment.get("sources", []):
            source_type = source.get("type")
            if not source_type:
                log.warning(
                    f"source type is missing for source: {source}\nattachment: "
                    f"{attachment}"
                )
                continue
            try:
                cb = self._download_callbacks[source_type]
            except KeyError:
                log.warning(
                    f"no source handler registered for {source_type!r}"
                )
            else:
                try:
                    return await cb(client, attachment, source, dest_path, extra)
                except exceptions.CancelError as e:
                    # the handler can't or doesn't want to handle this source
                    log.debug(
                        f"Following source handling by {cb} has been cancelled ({e}):"
                        f"{source}"
                    )

        log.warning(
            "no source could be handled, we can't download the attachment:\n"
            f"{attachment}"
        )
        raise exceptions.FeatureNotFound("no handler could manage the attachment")

    def register_download_handler(
        self,
        source_type: str,
        callback: Callable[
            [
                SatXMPPEntity, Dict[str, Any], Dict[str, Any], Union[str, Path],
                Dict[str, Any]
            ],
            Tuple[str, defer.Deferred]
        ]
    ) -> None:
        """Register a handler to manage a type of attachment source

        @param source_type: ``type`` of source handled
            This is usually the namespace of the protocol used
        @param callback: method to call to manage the source.
            Call arguments are the same as for [download], with an extra ``source`` dict
            which is used just after ``attachment`` to give a quick reference to the
            source used.
            The callable is awaited by [download], and its result must be a tuple with:
                - progress ID
                - a Deferred which fire when the file is fully downloaded
        @raise exceptions.ConflictError: a handler is already registered for this
            source type
        """
        # membership test: an identity test against the dict itself would never match
        # and would let a second registration silently replace the first one
        if source_type in self._download_callbacks:
            raise exceptions.ConflictError(
                f"The is already a callback registered for source type {source_type!r}"
            )
        self._download_callbacks[source_type] = callback

    def register_scheme(self, scheme: str, download_cb: Callable) -> None:
        """Register an URI scheme handler

        @param scheme: URI scheme this callback is handling
        @param download_cb: callback to download a file
            arguments are:
                - (SatXMPPClient) client
                - (urllib.parse.SplitResult) parsed URI
                - (Path) destination path where the file must be downloaded
                - (dict) options
            must return a tuple with progress_id and a Deferred which fire when download
            is finished
        @raise exceptions.ConflictError: a callback is already registered for this scheme
        """
        if scheme in self._scheme_callbacks:
            raise exceptions.ConflictError(
                f"A method with scheme {scheme!r} is already registered"
            )
        self._scheme_callbacks[scheme] = download_cb

    def unregister(self, scheme):
        """Remove the handler registered for an URI scheme

        @param scheme: URI scheme to unregister
        @raise exceptions.NotFound: no callback is registered for this scheme
        """
        try:
            del self._scheme_callbacks[scheme]
        except KeyError:
            raise exceptions.NotFound(f"No callback registered for scheme {scheme!r}")

    def errback_download(self, file_obj, download_d, resp):
        """Set file_obj and download deferred appropriately after a network error

        @param file_obj(SatFile): file where the download must be done
        @param download_d(Deferred): deferred which must be fired on complete download
        @param resp(treq.response.IResponse): treq response
        """
        msg = f"HTTP error ({resp.code}): {resp.phrase.decode()}"
        file_obj.close(error=msg)
        download_d.errback(exceptions.NetworkError(msg))

    async def download_http(self, client, uri_parsed, dest_path, options):
        """Download a file over HTTP or HTTPS

        @param client: client session, given to the destination ``SatFile``
        @param uri_parsed: parsed URI of the file to download
        @param dest_path: destination path where the file must be downloaded
        @param options(dict): download options
            - ignore_tls_errors(bool): True to skip TLS certificate verification
        @return (tuple): ``progress_id`` (uid of the destination ``SatFile``) and a
            Deferred fired when the transfer ends. It fails with
            ``exceptions.NetworkError`` if the response code is not 200, and with the
            transfer error if the body can't be fully collected.
        """
        url = uri_parsed.geturl()

        if options.get('ignore_tls_errors', False):
            log.warning(
                "TLS certificate check disabled, this is highly insecure"
            )
            treq_client = treq_client_no_ssl
        else:
            treq_client = treq

        # a HEAD request is done first to know the size of the file
        head_data = await treq_client.head(url)
        try:
            content_length = int(head_data.headers.getRawHeaders('content-length')[0])
        except (KeyError, TypeError, IndexError, ValueError):
            # header is missing, empty, or not a valid integer: size is unknown
            content_length = None
            log.debug(f"No content lenght found at {url}")
        file_obj = stream.SatFile(
            self.host,
            client,
            dest_path,
            mode="wb",
            size=content_length,
        )

        progress_id = file_obj.uid

        resp = await treq_client.get(url, unbuffered=True)
        if resp.code == 200:

            def close_on_success(__):
                return file_obj.close()

            def close_on_failure(failure):
                # the failure must keep propagating, otherwise a truncated file would
                # be reported as a successful download
                file_obj.close(error=str(failure.value))
                return failure

            d = treq.collect(resp, file_obj.write)
            d.addCallbacks(close_on_success, close_on_failure)
        else:
            d = defer.Deferred()
            self.errback_download(file_obj, d, resp)
        return progress_id, d