#!/usr/bin/env python3

# Libervia: an XMPP client
# Copyright (C) 2009-2024 Jérôme Poisson (goffi@goffi.org)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

from io import BufferedIOBase
import mimetypes
from pathlib import Path
import time
from typing import Any

from pydantic import BaseModel, ValidationError

from libervia.backend.core import exceptions
from libervia.backend.core.constants import Const as C
from libervia.backend.core.i18n import _
from libervia.backend.core.log import getLogger
from libervia.backend.tools.common import regex


log = getLogger(__name__)

CACHE_METADATA_EXT = ".cache.json"
DEFAULT_EXT = ".raw"


class CacheMetadata(BaseModel):
    source: str
    uid: str
    filename: str
    creation: int
    eol: int
    max_age: int = C.DEFAULT_MAX_AGE
    original_filename: str | None = None
    mime_type: str | None = None
    last_access: int | None = None

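# For illustration, a metadata file as serialized below with
# ``model_dump_json(exclude_none=True)`` could look like this (all values are
# made up):
#
#     {"source": "XEP-0363", "uid": "abc123", "filename": "abc123.jpg",
#      "creation": 1727000000, "eol": 1727604800, "max_age": 604800,
#      "mime_type": "image/jpeg"}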

class Cache:
    """Generic file caching."""

    def __init__(self, host, profile):
        """
        @param profile(unicode, None): name of the profile to set the cache for
            if None, the cache will be common for all profiles
        """
        self.profile = profile
        path_elts = [host.memory.config_get("", "local_dir"), C.CACHE_DIR]
        if profile:
            path_elts.extend(["profiles", regex.path_escape(profile)])
        else:
            path_elts.append("common")
        self.cache_dir = Path(*path_elts)
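        # For illustration, assuming ``local_dir`` resolves to
        # ``~/.local/share/libervia`` and ``C.CACHE_DIR`` to ``cache`` (both are
        # assumptions depending on configuration and constants), this gives:
        #   profile "alice" -> ~/.local/share/libervia/cache/profiles/alice
        #   no profile      -> ~/.local/share/libervia/cache/common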

        self.cache_dir.mkdir(0o700, parents=True, exist_ok=True)
        self.purge()

    def purge(self):
        """Remove expired, unreadable, and unrelated files from the cache directory."""
        # TODO: this should not be called only on startup, but at a regular
        #   interval (e.g. once a day); see the sketch after this method.
        to_delete = set()
        seen = set()
        now = time.time()
        for cache_data_file in self.cache_dir.glob(f"*{CACHE_METADATA_EXT}"):
            try:
                with cache_data_file.open("r") as f:
                    cache_data = CacheMetadata.model_validate_json(f.read())
            except (IOError, ValidationError):
                log.warning(
                    _("Can't read metadata file at {path}, deleting it.").format(
                        path=cache_data_file
                    )
                )
                to_delete.add(cache_data_file)
                continue
            else:
                cached_file = self.get_path(cache_data.filename)
                if not cached_file.exists():
                    log.warning(
                        f"Metadata file {cache_data_file} references a non-existent "
                        f"file ({cached_file}) and will be deleted."
                    )
                    to_delete.add(cache_data_file)
                elif cache_data.eol < now:
                    log.debug(
                        f"Purging expired cache file {cache_data_file} (expired for "
                        f"{int(now - cache_data.eol)}s)"
                    )
                    to_delete.add(cache_data_file)
                    # The cached file itself is expired too, delete it as well.
                    to_delete.add(cached_file)
                seen.add(cached_file)
            seen.add(cache_data_file)

        for file in to_delete:
            log.debug(f"Deleting cache file: {file}")
            file.unlink()

        for file in self.cache_dir.iterdir():
            if file not in seen:
                log.debug(f"Deleting irrelevant file in cache dir: {file}")
                file.unlink()
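
    # A sketch of how the TODO at the top of this method could be addressed with
    # Twisted's ``LoopingCall`` (the backend runs on Twisted); this is
    # illustrative only and not wired in anywhere:
    #
    #     from twisted.internet.task import LoopingCall
    #
    #     purge_loop = LoopingCall(self.purge)
    #     purge_loop.start(60 * 60 * 24, now=False)  # purge once a day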

    def get_path(self, filename: str) -> Path:
        """Return the path to a cached file.

        @param filename: cached file name (cache metadata or actual file)
        @return: path to the cached file
        """
        if not filename or "/" in filename:
            log.error(
                f"Invalid character found in file name, hack attempt? Name: {filename!r}"
            )
            raise exceptions.DataError("Invalid char found")
        return self.cache_dir / filename

    def get_metadata(self, uid: str, update_eol: bool = True) -> dict[str, Any] | None:
        """Retrieve metadata for cached data.

        @param uid: unique identifier of cache metadata.
        @param update_eol: if True, the "eol" (end of life) is extended:
            max_age is added to it (only if the cache is not already expired).
        @return: metadata, see [cache_data] for details; an additional "path" key
            gives the full path to the cached file.
            None if the file is not in cache (or the cache is invalid).
        """
        uid = uid.strip()
        if not uid:
            raise exceptions.InternalError("uid must not be empty")
        cache_path = self.get_path(f"{uid}{CACHE_METADATA_EXT}")
        if not cache_path.exists():
            return None

        try:
            with cache_path.open("r") as f:
                cache_data = CacheMetadata.model_validate_json(f.read())
        except (IOError, EOFError) as e:
            log.warning(f"Can't read cache at {cache_path}: {e}")
            return None
        except ValidationError:
            log.warning(f"Invalid cache found at {cache_path}")
            return None
        except UnicodeDecodeError as e:
            log.warning(f"Invalid encoding, this is not a cache metadata file: {e}")
            return None

        if cache_data.eol < time.time():
            log.debug(
                f"Ignoring expired cache (expired for "
                f"{int(time.time() - cache_data.eol)}s)."
            )
            return None

        if update_eol:
            now = int(time.time())
            cache_data.last_access = now
            cache_data.eol = now + cache_data.max_age
            with cache_path.open("w") as f:
                f.write(cache_data.model_dump_json(exclude_none=True))

        # FIXME: we convert to a dict to stay compatible with the former
        #   (pre-Pydantic) method. All calls to get_metadata should use the Pydantic
        #   model directly in the future.
        cache_data_dict = cache_data.model_dump()
        cache_data_dict["path"] = self.get_path(cache_data.filename)
        return cache_data_dict

    def get_file_path(self, uid: str) -> Path | None:
        """Retrieve absolute path to file

        @param uid(unicode): unique identifier of file
        @return (unicode, None): absolute path to cached file
            None if file is not in cache (or cache is invalid)
        """
        metadata = self.get_metadata(uid)
        if metadata is not None:
            return metadata["path"]

    def remove_from_cache(self, uid: str, metadata=None) -> None:
        """Remove data from cache

        @param uid(unicode): unique identifier cache file
        """
        cache_data = self.get_metadata(uid, update_eol=False)
        if cache_data is None:
            log.debug(f"cache with uid {uid!r} has already expired or been removed")
            return

        try:
            filename = cache_data["filename"]
        except KeyError:
            log.warning(_("missing filename for cache {uid!r}").format(uid=uid))
        else:
            filepath = self.get_path(filename)
            try:
                filepath.unlink()
            except FileNotFoundError:
                log.warning(
                    _("missing file referenced in cache {uid!r}: {filename}").format(
                        uid=uid, filename=filename
                    )
                )

        cache_file = self.get_path(f"{uid}{CACHE_METADATA_EXT}")
        cache_file.unlink()
        log.debug(f"Cache with uid {uid!r} has been removed.")

    def cache_data(
        self,
        source: str,
        uid: str,
        mime_type: str | None = None,
        max_age: int = C.DEFAULT_MAX_AGE,
        original_filename: str | None = None,
    ) -> BufferedIOBase:
        """Create cache metadata and file object to use for actual data.

        @param source: source of the cache (should be plugin's import_name)
        @param uid: an identifier of the file which must be unique
        @param mime_type: MIME type of the file to cache.
            It will be used notably to guess the file extension.
            It may be guessed from original_filename if not specified.
        @param max_age: maximum age in seconds.
            The cache metadata will get an "eol" (end of life) set accordingly.
            Use 0 to ignore cache (the file will be re-downloaded on each access).
        @param original_filename: if not None, it will be used to guess the file
            extension and the MIME type, and stored in "original_filename".
        @return: file object opened in write mode
            you have to close it yourself (hint: use ``with`` statement)
        """
        if original_filename is not None and mime_type is None:
            # We have original_filename but no MIME type: try to guess the latter.
            mime_type = mimetypes.guess_type(original_filename, strict=False)[0]

        if mime_type:
            ext = mimetypes.guess_extension(mime_type, strict=False)
            if ext is None:
                log.warning(f"Can't find an extension for MIME type {mime_type}")
                ext = DEFAULT_EXT
            elif ext == ".jpe":
                ext = ".jpg"
        else:
            ext = DEFAULT_EXT
            # mime_type may be an empty string here; normalise it to None.
            mime_type = None

        filename = uid + ext
        now = int(time.time())
        metadata = CacheMetadata(
            source=source,
            uid=uid,
            mime_type=mime_type,
            max_age=max_age,
            original_filename=original_filename,
            filename=filename,
            creation=now,
            eol=now + max_age,
        )

        cache_metadata_file = self.get_path(f"{uid}{CACHE_METADATA_EXT}")
        file_path = self.get_path(filename)

        with cache_metadata_file.open("w") as f:
            f.write(metadata.model_dump_json(exclude_none=True))

        return file_path.open("wb")
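
# Illustrative end-to-end usage of this class; ``host``, the profile name and
# all other values are assumptions, not part of this module:
#
#     cache = Cache(host, "some_profile")
#     with cache.cache_data("XEP-0363", "some_uid", mime_type="image/jpeg") as f:
#         f.write(image_data)
#     path = cache.get_file_path("some_uid")  # None once the cache has expired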