Mercurial > libervia-backend
view libervia/backend/memory/cache.py @ 4338:7c0b7ecb816f
component email gateway: Add a pubsub service:
a pubsub service is implemented to retrieve and manage attachments using XEP-0498.
rel 453
author | Goffi <goffi@goffi.org> |
---|---|
date | Tue, 03 Dec 2024 00:13:23 +0100 |
parents | e11b13418ba6 |
children |
line wrap: on
line source
#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from io import BufferedIOBase
import mimetypes
from pathlib import Path
import time
from typing import Any

from pydantic import BaseModel, ValidationError

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import regex


log = getLogger(__name__)

# Suffix of the JSON file holding the metadata of a cached file.
CACHE_METADATA_EXT = ".cache.json"
# Extension used for the cached file when none can be guessed from the MIME type.
DEFAULT_EXT = ".raw"


class CacheMetadata(BaseModel):
    """Metadata stored next to each cached file (in a ``*.cache.json`` file)."""

    # source of the cache (see [Cache.cache_data])
    source: str
    # unique identifier of the cached data
    uid: str
    # name of the cached file, relative to the cache directory
    filename: str
    # creation timestamp (seconds since epoch)
    creation: int
    # end of life timestamp (seconds since epoch)
    eol: int
    # number of seconds added to the current time when eol is computed
    max_age: int = C.DEFAULT_MAX_AGE
    original_filename: str | None = None
    mime_type: str | None = None
    # timestamp of the last [Cache.get_metadata] call which updated eol
    last_access: int | None = None


class Cache:
    """Generic file caching."""

    def __init__(self, host, profile):
        """
        @param host: object whose ``memory.config_get`` is used to retrieve
            "local_dir"
        @param profile(unicode, None): name of the profile to set the cache for
            if None, the cache will be common for all profiles
        """
        self.profile = profile
        path_elts = [host.memory.config_get("", "local_dir"), C.CACHE_DIR]
        if profile:
            path_elts.extend(["profiles", regex.path_escape(profile)])
        else:
            path_elts.append("common")
        self.cache_dir = Path(*path_elts)

        self.cache_dir.mkdir(0o700, parents=True, exist_ok=True)
        self.purge()

    def _remove_file(self, path: Path) -> None:
        """Delete an entry of the cache directory, logging failures.

        Deletion is best-effort: an entry which can't be removed (already gone,
        directory, permission issue) must not abort a whole purge.

        @param path: path of the entry to delete
        """
        try:
            path.unlink()
        except OSError as e:
            log.warning(f"Can't delete {path} from cache: {e}")

    def purge(self):
        """Remove expired, unreadable, and unrelated files from cache."""
        # TODO: this should not be called only on startup, but at regular interval
        #   (e.g. once a day)
        to_delete = set()
        # files belonging to a valid, non expired cache entry
        seen = set()
        now = time.time()
        for cache_data_file in self.cache_dir.glob(f"*{CACHE_METADATA_EXT}"):
            try:
                with cache_data_file.open("r") as f:
                    cache_data = CacheMetadata.model_validate_json(f.read())
                # get_path raises DataError if the stored filename is invalid
                cached_file = self.get_path(cache_data.filename)
            except (
                IOError,
                UnicodeDecodeError,
                ValidationError,
                exceptions.DataError,
            ):
                log.warning(
                    _("Can't read metadata file at {path}, deleting it.").format(
                        path=cache_data_file
                    )
                )
                to_delete.add(cache_data_file)
                continue

            if not cached_file.exists():
                log.warning(
                    f"Cache file {cache_data_file!r} references a non-existent file "
                    f"and will be deleted: {cached_file!r}."
                )
                to_delete.add(cache_data_file)
            elif cache_data.eol < now:
                log.debug(
                    f"Purging expired cache file {cache_data_file} (expired for "
                    f"{int(now - cache_data.eol)}s)"
                )
                # the cached data is removed together with its metadata
                to_delete.update((cache_data_file, cached_file))
            else:
                seen.update((cache_data_file, cached_file))

        # a file still referenced by a valid entry is never deleted
        for file in to_delete - seen:
            log.debug(f"Deleting cache file: {file}")
            self._remove_file(file)

        # the directory listing is materialised before entries are removed
        for file in list(self.cache_dir.iterdir()):
            if file not in seen:
                log.debug(f"Deleting irrelevant file in cache dir: {file}")
                self._remove_file(file)

    def get_path(self, filename: str) -> Path:
        """Return the path of a file located in the cache directory.

        @param filename: cached file name (cache data or actual file)
        @return: path to the cached file
        @raise exceptions.DataError: the name is empty, contains a "/", or is a
            directory reference ("." or "..")
        """
        if not filename or "/" in filename or filename in (".", ".."):
            log.error(
                "invalid char found in file name, hack attempt? name:{}".format(filename)
            )
            raise exceptions.DataError("Invalid char found")
        return self.cache_dir / filename

    def get_metadata(self, uid: str, update_eol: bool = True) -> dict[str, Any] | None:
        """Retrieve metadata for cached data.

        @param uid: unique identifier of cache metadata.
        @param update_eol: True if eol must be extended
            if True, eol is set to current time + max_age (only if the cache is
            not already expired)
        @return: metadata, see [cache_data] for data details, an additional "path"
            key is the full path to cached file.
            None if file is not in cache (or cache is invalid).
        @raise exceptions.InternalError: uid is empty
        """
        uid = uid.strip()
        if not uid:
            raise exceptions.InternalError("uid must not be empty")
        cache_url = self.get_path(f"{uid}{CACHE_METADATA_EXT}")
        if not cache_url.exists():
            return None

        try:
            with cache_url.open("r") as f:
                cache_data = CacheMetadata.model_validate_json(f.read())
        except (IOError, EOFError) as e:
            log.warning(f"Can't read cache at {cache_url}: {e}")
            return None
        except ValidationError:
            log.warning(f"Invalid cache found at {cache_url}")
            return None
        except UnicodeDecodeError:
            log.warning("Invalid encoding, this is not a cache metadata file.")
            return None

        if cache_data.eol < time.time():
            # NOTE: nothing is deleted here, expired files are only reported as
            #   missing; actual deletion is done by [purge].
            log.debug(
                "removing expired cache (expired for {}s)".format(
                    time.time() - cache_data.eol
                )
            )
            return None

        if update_eol:
            now = int(time.time())
            cache_data.last_access = now
            cache_data.eol = now + cache_data.max_age
            with cache_url.open("w") as f:
                f.write(cache_data.model_dump_json(exclude_none=True))

        # FIXME: we convert to dict to be compatible with former method (pre Pydantic).
        #   All call to get_metadata should use directly the Pydantic model in the
        #   future.
        cache_data_dict = cache_data.model_dump()
        cache_data_dict["path"] = self.get_path(cache_data.filename)
        return cache_data_dict

    def get_file_path(self, uid: str) -> Path | None:
        """Retrieve path to cached file.

        The eol of the cache is extended, see [get_metadata].

        @param uid: unique identifier of file
        @return: path to cached file
            None if file is not in cache (or cache is invalid)
        """
        metadata = self.get_metadata(uid)
        if metadata is None:
            return None
        return metadata["path"]

    def remove_from_cache(self, uid: str, metadata=None) -> None:
        """Remove data from cache.

        Nothing is done if [get_metadata] returns None for this uid (missing,
        invalid or expired cache).

        @param uid: unique identifier of the cache file
        @param metadata: unused, kept for backward compatibility
        """
        cache_data = self.get_metadata(uid, update_eol=False)
        if cache_data is None:
            log.debug(f"cache with uid {uid!r} has already expired or been removed")
            return

        try:
            filename = cache_data["filename"]
        except KeyError:
            log.warning(_("missing filename for cache {uid!r}").format(uid=uid))
        else:
            filepath = self.get_path(filename)
            try:
                filepath.unlink()
            except FileNotFoundError:
                log.warning(
                    _("missing file referenced in cache {uid!r}: {filename}").format(
                        uid=uid, filename=filename
                    )
                )

        cache_file = self.get_path(f"{uid}{CACHE_METADATA_EXT}")
        cache_file.unlink()
        log.debug(f"Cache with uid {uid!r} has been removed.")

    def cache_data(
        self,
        source: str,
        uid: str,
        mime_type: str | None = None,
        max_age: int | None = C.DEFAULT_MAX_AGE,
        original_filename: str | None = None,
    ) -> BufferedIOBase:
        """Create cache metadata and file object to use for actual data.

        @param source: source of the cache (should be plugin's import_name)
        @param uid: an identifier of the file which must be unique
        @param mime_type: MIME type of the file to cache
            it will be used notably to guess file extension
            It may be autogenerated if filename is specified
        @param max_age: maximum age in seconds
            the cache metadata will have an "eol" (end of life)
            None to use default value
            0 to ignore cache (file will be re-downloaded on each access)
        @param original_filename: if not None, will be used to retrieve file
            extension and guess mime type, and stored in "original_filename"
        @return: file object opened in write mode
            you have to close it yourself (hint: use ``with`` statement)
        """
        if max_age is None:
            max_age = C.DEFAULT_MAX_AGE

        if original_filename is not None and mime_type is None:
            # we have original_filename but not MIME type, we try to guess the later
            mime_type = mimetypes.guess_type(original_filename, strict=False)[0]

        if mime_type:
            ext = mimetypes.guess_extension(mime_type, strict=False)
            if ext is None:
                log.warning("can't find extension for MIME type {}".format(mime_type))
                ext = DEFAULT_EXT
            elif ext == ".jpe":
                ext = ".jpg"
        else:
            ext = DEFAULT_EXT
            # an empty MIME type is normalised to None
            mime_type = None

        filename = uid + ext
        now = int(time.time())
        metadata = CacheMetadata(
            source=source,
            uid=uid,
            mime_type=mime_type,
            max_age=max_age,
            original_filename=original_filename,
            filename=filename,
            creation=now,
            eol=now + max_age,
        )

        # both paths are validated before anything is written
        cache_metadata_file = self.get_path(f"{uid}{CACHE_METADATA_EXT}")
        file_path = self.get_path(filename)

        with open(cache_metadata_file, "w") as f:
            f.write(metadata.model_dump_json(exclude_none=True))

        return open(file_path, "wb")