changeset 3818:2863345c9bbb

core (memory/cache): type hints + filename fix: cached file filename is now always UID + extension to avoid collision. `filename` argument as been renamed to `original_filename`, it is store if present, and used to guess media type when necessary.
author Goffi <goffi@goffi.org>
date Wed, 29 Jun 2022 11:54:53 +0200
parents 998c5318230f
children 4f02e339d184
files sat/memory/cache.py
diffstat 1 files changed, 55 insertions(+), 41 deletions(-) [+]
line wrap: on
line diff
--- a/sat/memory/cache.py	Wed Jun 29 11:49:03 2022 +0200
+++ b/sat/memory/cache.py	Wed Jun 29 11:54:53 2022 +0200
@@ -17,14 +17,17 @@
 # You should have received a copy of the GNU Affero General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
-import pickle as pickle
+from io import BufferedIOBase
 import mimetypes
+from pathlib import Path
+import pickle as pickle
 import time
-from pathlib import Path
+from typing import Any, Dict, Optional
+
+from sat.core import exceptions
+from sat.core.constants import Const as C
 from sat.core.i18n import _
 from sat.core.log import getLogger
-from sat.core.constants import Const as C
-from sat.core import exceptions
 from sat.tools.common import regex
 
 
@@ -38,7 +41,7 @@
 
     def __init__(self, host, profile):
         """
-        @param profile(unicode, None): ame of the profile to set the cache for
+        @param profile(unicode, None): name of the profile to set the cache for
             if None, the cache will be common for all profiles
         """
         self.profile = profile
@@ -105,11 +108,11 @@
                 purged.add(cache_file)
                 purged.add(filepath)
 
-    def getPath(self, filename):
+    def getPath(self, filename: str) -> Path:
         """return cached file URL
 
-        @param filename(str): cached file name (cache data or actual file)
-        @return (Path): path to the cached file
+        @param filename: cached file name (cache data or actual file)
+        @return: path to the cached file
         """
         if not filename or "/" in filename:
             log.error(
@@ -118,7 +121,7 @@
             raise exceptions.DataError("Invalid char found")
         return self.cache_dir / filename
 
-    def getMetadata(self, uid, update_eol=True):
+    def getMetadata(self, uid: str, update_eol: bool = True) -> Optional[Dict[str, Any]]:
         """Retrieve metadata for cached data
 
         @param uid(unicode): unique identifier of file
@@ -173,7 +176,7 @@
         cache_data["path"] = self.getPath(cache_data["filename"])
         return cache_data
 
-    def getFilePath(self, uid):
+    def getFilePath(self, uid: str) -> Path:
         """Retrieve absolute path to file
 
         @param uid(unicode): unique identifier of file
@@ -212,53 +215,64 @@
         cache_file.unlink()
         log.debug(f"cache with uid {uid!r} has been removed")
 
-    def cacheData(self, source, uid, mime_type=None, max_age=None, filename=None):
+    def cacheData(
+        self,
+        source: str,
+        uid: str,
+        mime_type: Optional[str] = None,
+        max_age: Optional[int] = None,
+        original_filename: Optional[str] = None
+    ) -> BufferedIOBase:
         """create cache metadata and file object to use for actual data
 
-        @param source(unicode): source of the cache (should be plugin's import_name)
-        @param uid(unicode): an identifier of the file which must be unique
-        @param mime_type(unicode): MIME type of the file to cache
+        @param source: source of the cache (should be plugin's import_name)
+        @param uid: an identifier of the file which must be unique
+        @param mime_type: MIME type of the file to cache
             it will be used notably to guess file extension
             It may be autogenerated if filename is specified
-        @param max_age(int, None): maximum age in seconds
+        @param max_age: maximum age in seconds
             the cache metadata will have an "eol" (end of life)
             None to use default value
             0 to ignore cache (file will be re-downloaded on each access)
-        @param filename: if not None, will be used as filename
-            else one will be generated from uid and guessed extension
-        @return(file): file object opened in write mode
-            you have to close it yourself (hint: use with statement)
+        @param original_filename: if not None, will be used to retrieve file extension and
+            guess
+            mime type, and stored in "original_filename"
+        @return: file object opened in write mode
+            you have to close it yourself (hint: use ``with`` statement)
         """
-        cache_url = self.getPath(uid)
-        if filename is None:
-            if mime_type:
-                ext = mimetypes.guess_extension(mime_type, strict=False)
-                if ext is None:
-                    log.warning(
-                        "can't find extension for MIME type {}".format(mime_type)
-                    )
-                    ext = DEFAULT_EXT
-                elif ext == ".jpe":
-                    ext = ".jpg"
-            else:
-                ext = DEFAULT_EXT
-                mime_type = None
-            filename = uid + ext
-        elif mime_type is None:
-            # we have filename but not MIME type, we try to guess the later
-            mime_type = mimetypes.guess_type(filename, strict=False)[0]
         if max_age is None:
             max_age = C.DEFAULT_MAX_AGE
-        now = int(time.time())
         cache_data = {
             "source": source,
+            # we also store max_age for updating eol
+            "max_age": max_age,
+        }
+        cache_url = self.getPath(uid)
+        if original_filename is not None:
+            cache_data["original_filename"] = original_filename
+            if mime_type is None:
+                # we have original_filename but not MIME type, we try to guess the later
+                mime_type = mimetypes.guess_type(original_filename, strict=False)[0]
+        if mime_type:
+            ext = mimetypes.guess_extension(mime_type, strict=False)
+            if ext is None:
+                log.warning(
+                    "can't find extension for MIME type {}".format(mime_type)
+                )
+                ext = DEFAULT_EXT
+            elif ext == ".jpe":
+                ext = ".jpg"
+        else:
+            ext = DEFAULT_EXT
+            mime_type = None
+        filename = uid + ext
+        now = int(time.time())
+        cache_data.update({
             "filename": filename,
             "creation": now,
             "eol": now + max_age,
-            # we also store max_age for updating eol
-            "max_age": max_age,
             "mime_type": mime_type,
-        }
+        })
         file_path = self.getPath(filename)
 
         with open(cache_url, "wb") as f: