"""TTL cache with per-key async locks.

Decisions:

- cachetools.TTLCache handles the time-based eviction; we wrap it with
  per-key asyncio.Lock instances so that the first concurrent caller does
  the load and the rest wait. Without this we'd stampede Airtable on cache
  expiry.
- A single asyncio.Lock guards lock-dict mutations.
- Per-key locks are referenced weakly, so a lock goes away once no caller
  is holding it and the lock table cannot grow with every key ever seen.
"""

from __future__ import annotations

import asyncio
import weakref
from typing import Awaitable, Callable, Hashable, TypeVar

from cachetools import TTLCache

T = TypeVar("T")


class TTLAsyncCache:
    """Async wrapper around a ``TTLCache`` that serialises loads per key."""

    def __init__(self, ttl: int, maxsize: int = 32) -> None:
        """Create an empty cache.

        Args:
            ttl: Time-to-live handed to ``TTLCache``.
            maxsize: Maximum number of entries handed to ``TTLCache``.
        """
        self._cache: TTLCache = TTLCache(maxsize=maxsize, ttl=ttl)
        # Weak values: an entry exists only while some coroutine still holds
        # the lock, so locks for idle keys are dropped automatically instead
        # of accumulating for the lifetime of the cache.
        self._locks: weakref.WeakValueDictionary[Hashable, asyncio.Lock] = (
            weakref.WeakValueDictionary()
        )
        self._locks_guard = asyncio.Lock()

    async def _get_lock(self, key: Hashable) -> asyncio.Lock:
        """Return the lock for *key*, creating and registering it if needed."""
        async with self._locks_guard:
            lock = self._locks.get(key)
            if lock is None:
                lock = asyncio.Lock()
                self._locks[key] = lock
            # The caller must keep a reference for as long as it uses the
            # lock; that reference is what keeps the table entry alive.
            return lock

    def peek(self, key: Hashable) -> T | None:
        """Return the cached value for *key*, or ``None`` if there is none.

        Never triggers a load.
        """
        # Index directly instead of test-then-read so that an entry expiring
        # between a membership test and the lookup cannot raise.
        try:
            return self._cache[key]
        except KeyError:
            return None

    def invalidate(self, key: Hashable) -> None:
        """Drop the cached value for *key*; a missing key is not an error."""
        self._cache.pop(key, None)

    async def get_or_set(
        self,
        key: Hashable,
        loader: Callable[[], Awaitable[T]],
    ) -> T:
        """Return the cached value for *key*, loading it on a miss.

        Concurrent callers for the same key share one lock, so ``loader`` is
        awaited by one caller at a time and later callers reuse the stored
        result.

        Args:
            key: Cache key.
            loader: Zero-argument callable returning an awaitable that
                produces the value to cache.

        Returns:
            The cached or freshly loaded value.

        Raises:
            Exception: Whatever ``loader`` raises; nothing is cached then.
        """
        # Fast path — no lock needed if value present. A single indexing
        # operation is used rather than ``in`` followed by ``[]`` because a
        # TTL entry can expire between the two and turn a hit into KeyError.
        try:
            return self._cache[key]
        except KeyError:
            pass

        lock = await self._get_lock(key)
        async with lock:
            # Re-check inside the lock; another coroutine may have loaded it.
            try:
                return self._cache[key]
            except KeyError:
                pass
            value = await loader()
            self._cache[key] = value
            return value