Source code for speasy.core.cache.cache

import os
import re
import shutil
import logging
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional, Union

try:
    import pysciqlop_cache as sc
except ImportError:  # pragma: no cover - platform-specific (WASM has no wheel)
    # No compiled backend (e.g. WASM/Pyodide): fall back to a no-op cache so
    # importing Speasy still works; caching is simply disabled.
    from . import _noop_cache as sc

from .version import str_to_version, version_to_str, Version
from speasy.config import cache as cache_cfg

cache_version = str_to_version("3.0")

log = logging.getLogger(__name__)


[docs] class CacheItem: def __init__(self, data, version, lifetime=None): self.data = data self.version = version if lifetime is not None and isinstance(lifetime, (float, int)): lifetime = timedelta(seconds=lifetime) self.lifetime = lifetime self.created = datetime.now(tz=timezone.utc)
[docs] def bump_creation_time(self) -> "CacheItem": self.created = datetime.now(tz=timezone.utc) return self
def __setstate__(self, state): self.data = state["data"] self.version = state["version"] self.lifetime = state.get("lifetime", None) self.created = state.get("created", datetime.now(tz=timezone.utc))
[docs] def is_expired(self) -> bool: if isinstance(self.lifetime, timedelta): return datetime.now(tz=timezone.utc) > (self.created + self.lifetime) else: return False
def _is_legacy_diskcache_layout(p: Path) -> bool: if not p.exists(): return False return (p / "cache.db").is_file() or bool(list(p.glob("*/cache.db"))) def _migrate_legacy_diskcache(full_path: str) -> bool: """Detect a legacy diskcache layout at ``full_path`` and migrate it to sciqlop-cache format. Returns True if a migration was performed. The legacy cache is renamed to ``<full_path>.diskcache.backup`` and the new cache is written directly at ``full_path`` (sciqlop-cache references external value files by absolute path, so the destination cannot be relocated after writing). On any failure the legacy cache is restored. For large caches this can take minutes — a one-time cost on first launch after upgrading. The legacy backup is kept so the user can verify and delete it manually. """ p = Path(full_path) if not _is_legacy_diskcache_layout(p): return False backup = Path(f"{p}.diskcache.backup") if backup.exists(): log.warning( f"Legacy cache backup already present at {backup}; " f"skipping auto-migration. Move or remove it to retry." ) return False try: from pysciqlop_cache.migrate import migrate except ImportError as e: log.error( f"Detected legacy diskcache layout at {p} but cannot migrate " f"(missing dependency: {e}). Install diskcache once to allow " f"migration, or delete {p} to start fresh." ) return False log.warning( f"Detected legacy diskcache layout at {p}; migrating to sciqlop-cache. " f"This is a one-time operation and may take several minutes for large caches." ) # sciqlop-cache stores large values as external files referenced by # absolute path, so a migrated cache cannot be relocated afterwards. # Rename the legacy cache out of the way, then migrate straight into the # final path; restore the legacy cache on any failure. os.rename(str(p), str(backup)) try: result = migrate(str(backup), str(p)) except Exception: if p.exists(): shutil.rmtree(str(p)) os.rename(str(backup), str(p)) raise log.info( f"Migration complete: {result['migrated']} entries in " f"{result['elapsed_secs']}s. Legacy cache preserved at {backup}; " f"delete it once you've verified the new cache works." ) return True
[docs] class Cache: __slots__ = ["cache_file", "_data", "cache_type"] def __init__(self, cache_path: str = "", cache_type: str = "Cache"): full_path = f"{cache_path}/{cache_type}" _migrate_legacy_diskcache(full_path) if cache_type == "Fanout": self._data = sc.FanoutCache( cache_path=full_path, shard_count=8, max_size=cache_cfg.size() ) elif cache_type == "Cache": self._data = sc.Cache(cache_path=full_path, max_size=cache_cfg.size()) else: raise ValueError(f"Unimplemented cache type: {cache_type}") self.cache_type = cache_type self._data.reset_stats() if self.version < cache_version: self._data.clear() self.version = cache_version @property def version(self): return str_to_version(self._data.get("cache/version", default="0.0.0")) @version.setter def version(self, v: Union[str, Version]): self._data["cache/version"] = v if type(v) is str else version_to_str(v)
[docs] def disk_size(self): return self._data.volume()
[docs] def stats(self): s = self._data.stats() return { "hit": s["hits"], "misses": s["misses"], }
def __len__(self): return len(self._data)
[docs] def keys(self): return list(self._data)
def __contains__(self, item): return item in self._data def __getitem__(self, key): return self._data[key] def __setitem__(self, key, value): self._data[key] = value
[docs] def set(self, key, value, expire=None, tag: Optional[str] = None): self._data.set(key, value, expire=expire, tag=tag)
[docs] def add(self, key, value, expire=None, tag: Optional[str] = None): return self._data.add(key, value, expire=expire, tag=tag)
[docs] def get(self, key, default_value=None): return self._data.get(key, default_value)
[docs] def incr(self, key, delta=1, default=0): return self._data.incr(key, delta, default=default)
[docs] def drop(self, key): self._data.delete(key)
[docs] def evict_tag(self, tag: str): return self._data.evict_tag(tag)
[docs] def drop_matching_entries(self, pattern: Union[str, re.Pattern]): """Drop all cache entries that match a given pattern Parameters ---------- pattern : str or re.Pattern The pattern to match cache keys against """ if isinstance(pattern, str): pattern = re.compile(pattern) for key in filter(pattern.match, self.keys()): log.debug(f"Dropping cache entry {key}") self.drop(key)
[docs] @contextmanager def transact(self, key: Optional[str] = None): """Open a transaction context. For a single Cache the ``key`` argument is ignored. For a FanoutCache the transaction is per-shard and ``key`` selects the shard — atomicity only applies within that shard, so all operations in the block must target the same shard key. """ if self.cache_type == "Fanout": if key is None: raise ValueError( "FanoutCache.transact requires a key to select a shard" ) with self._data.transact(key): yield else: with self._data.transact(): yield
[docs] @contextmanager def lock(self, key: str): lock = sc.Lock(self._data, key) lock.acquire() try: yield lock finally: lock.release()