# HG changeset patch # User Goffi # Date 1656496493 -7200 # Node ID 2863345c9bbb6180dd27fc7b6faa026560e5e977 # Parent 998c5318230fa4ea09ad38cac56e144364345031 core (memory/cache): type hints + filename fix: cached file filename is now always UID + extension to avoid collision. `filename` argument as been renamed to `original_filename`, it is store if present, and used to guess media type when necessary. diff -r 998c5318230f -r 2863345c9bbb sat/memory/cache.py --- a/sat/memory/cache.py Wed Jun 29 11:49:03 2022 +0200 +++ b/sat/memory/cache.py Wed Jun 29 11:54:53 2022 +0200 @@ -17,14 +17,17 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -import pickle as pickle +from io import BufferedIOBase import mimetypes +from pathlib import Path +import pickle as pickle import time -from pathlib import Path +from typing import Any, Dict, Optional + +from sat.core import exceptions +from sat.core.constants import Const as C from sat.core.i18n import _ from sat.core.log import getLogger -from sat.core.constants import Const as C -from sat.core import exceptions from sat.tools.common import regex @@ -38,7 +41,7 @@ def __init__(self, host, profile): """ - @param profile(unicode, None): ame of the profile to set the cache for + @param profile(unicode, None): name of the profile to set the cache for if None, the cache will be common for all profiles """ self.profile = profile @@ -105,11 +108,11 @@ purged.add(cache_file) purged.add(filepath) - def getPath(self, filename): + def getPath(self, filename: str) -> Path: """return cached file URL - @param filename(str): cached file name (cache data or actual file) - @return (Path): path to the cached file + @param filename: cached file name (cache data or actual file) + @return: path to the cached file """ if not filename or "/" in filename: log.error( @@ -118,7 +121,7 @@ raise exceptions.DataError("Invalid char found") return self.cache_dir / filename - def getMetadata(self, uid, update_eol=True): + def getMetadata(self, uid: str, update_eol: bool = True) -> Optional[Dict[str, Any]]: """Retrieve metadata for cached data @param uid(unicode): unique identifier of file @@ -173,7 +176,7 @@ cache_data["path"] = self.getPath(cache_data["filename"]) return cache_data - def getFilePath(self, uid): + def getFilePath(self, uid: str) -> Path: """Retrieve absolute path to file @param uid(unicode): unique identifier of file @@ -212,53 +215,64 @@ cache_file.unlink() log.debug(f"cache with uid {uid!r} has been removed") - def cacheData(self, source, uid, mime_type=None, max_age=None, filename=None): + def cacheData( + self, + source: str, + uid: str, + mime_type: Optional[str] = None, + max_age: Optional[int] = None, + original_filename: Optional[str] = None + ) -> BufferedIOBase: """create cache metadata and file object to use for actual data - @param source(unicode): source of the cache (should be plugin's import_name) - @param uid(unicode): an identifier of the file which must be unique - @param mime_type(unicode): MIME type of the file to cache + @param source: source of the cache (should be plugin's import_name) + @param uid: an identifier of the file which must be unique + @param mime_type: MIME type of the file to cache it will be used notably to guess file extension It may be autogenerated if filename is specified - @param max_age(int, None): maximum age in seconds + @param max_age: maximum age in seconds the cache metadata will have an "eol" (end of life) None to use default value 0 to ignore cache (file will be re-downloaded on each access) - @param filename: if not None, will be used as filename - else one will be generated from uid and guessed extension - @return(file): file object opened in write mode - you have to close it yourself (hint: use with statement) + @param original_filename: if not None, will be used to retrieve file extension and + guess + mime type, and stored in "original_filename" + @return: file object opened in write mode + you have to close it yourself (hint: use ``with`` statement) """ - cache_url = self.getPath(uid) - if filename is None: - if mime_type: - ext = mimetypes.guess_extension(mime_type, strict=False) - if ext is None: - log.warning( - "can't find extension for MIME type {}".format(mime_type) - ) - ext = DEFAULT_EXT - elif ext == ".jpe": - ext = ".jpg" - else: - ext = DEFAULT_EXT - mime_type = None - filename = uid + ext - elif mime_type is None: - # we have filename but not MIME type, we try to guess the later - mime_type = mimetypes.guess_type(filename, strict=False)[0] if max_age is None: max_age = C.DEFAULT_MAX_AGE - now = int(time.time()) cache_data = { "source": source, + # we also store max_age for updating eol + "max_age": max_age, + } + cache_url = self.getPath(uid) + if original_filename is not None: + cache_data["original_filename"] = original_filename + if mime_type is None: + # we have original_filename but not MIME type, we try to guess the later + mime_type = mimetypes.guess_type(original_filename, strict=False)[0] + if mime_type: + ext = mimetypes.guess_extension(mime_type, strict=False) + if ext is None: + log.warning( + "can't find extension for MIME type {}".format(mime_type) + ) + ext = DEFAULT_EXT + elif ext == ".jpe": + ext = ".jpg" + else: + ext = DEFAULT_EXT + mime_type = None + filename = uid + ext + now = int(time.time()) + cache_data.update({ "filename": filename, "creation": now, "eol": now + max_age, - # we also store max_age for updating eol - "max_age": max_age, "mime_type": mime_type, - } + }) file_path = self.getPath(filename) with open(cache_url, "wb") as f: