160 lines
No EOL
5.5 KiB
Python
160 lines
No EOL
5.5 KiB
Python
import json
|
|
import hashlib
|
|
from typing import Optional, Any
|
|
from ..config.database import get_redis
|
|
from ..config.settings import settings
|
|
|
|
class CacheService:
|
|
def __init__(self):
|
|
self.redis = None
|
|
self.enabled = settings.cache_enabled
|
|
|
|
async def get_redis(self):
|
|
"""Get Redis client"""
|
|
if not self.redis:
|
|
self.redis = get_redis()
|
|
return self.redis
|
|
|
|
def _generate_key(self, prefix: str, *args) -> str:
|
|
"""Generate cache key"""
|
|
key_data = f"{prefix}:{':'.join(str(arg) for arg in args)}"
|
|
return hashlib.md5(key_data.encode()).hexdigest()
|
|
|
|
async def get(self, key: str) -> Optional[Any]:
|
|
"""Get value from cache"""
|
|
if not self.enabled:
|
|
return None
|
|
|
|
redis_client = await self.get_redis()
|
|
if not redis_client:
|
|
return None
|
|
|
|
try:
|
|
cached_data = await redis_client.get(key)
|
|
if cached_data:
|
|
return json.loads(cached_data)
|
|
except Exception as e:
|
|
print(f"Cache get error: {e}")
|
|
|
|
return None
|
|
|
|
async def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
|
|
"""Set value in cache"""
|
|
if not self.enabled:
|
|
return False
|
|
|
|
redis_client = await self.get_redis()
|
|
if not redis_client:
|
|
return False
|
|
|
|
try:
|
|
serialized_value = json.dumps(value, default=str)
|
|
ttl = ttl or settings.cache_ttl
|
|
await redis_client.setex(key, ttl, serialized_value)
|
|
return True
|
|
except Exception as e:
|
|
print(f"Cache set error: {e}")
|
|
return False
|
|
|
|
async def delete(self, key: str) -> bool:
|
|
"""Delete value from cache"""
|
|
if not self.enabled:
|
|
return False
|
|
|
|
redis_client = await self.get_redis()
|
|
if not redis_client:
|
|
return False
|
|
|
|
try:
|
|
await redis_client.delete(key)
|
|
return True
|
|
except Exception as e:
|
|
print(f"Cache delete error: {e}")
|
|
return False
|
|
|
|
async def clear_pattern(self, pattern: str) -> bool:
|
|
"""Clear cache entries matching pattern"""
|
|
if not self.enabled:
|
|
return False
|
|
|
|
redis_client = await self.get_redis()
|
|
if not redis_client:
|
|
return False
|
|
|
|
try:
|
|
keys = await redis_client.keys(pattern)
|
|
if keys:
|
|
await redis_client.delete(*keys)
|
|
return True
|
|
except Exception as e:
|
|
print(f"Cache clear pattern error: {e}")
|
|
return False
|
|
|
|
def get_chat_cache_key(self, query: str, index_id: str) -> str:
|
|
"""Generate cache key for chat responses"""
|
|
return self._generate_key("chat", query, index_id)
|
|
|
|
def get_document_cache_key(self, index_id: str) -> str:
|
|
"""Generate cache key for document lists"""
|
|
return self._generate_key("documents", index_id)
|
|
|
|
def get_index_cache_key(self, user_id: str) -> str:
|
|
"""Generate cache key for user indices"""
|
|
return self._generate_key("indices", user_id)
|
|
|
|
def get_all_cache_keys_for_index(self, index_id: str) -> list[str]:
|
|
"""Get all cache keys that should be cleared for a specific index"""
|
|
return [
|
|
self.get_document_cache_key(index_id),
|
|
# Chat cache keys are query-specific, so we'll use pattern matching for those
|
|
]
|
|
|
|
async def clear_index_cache(self, index_id: str) -> bool:
|
|
"""Clear only cache entries for specific index"""
|
|
if not self.enabled:
|
|
return False
|
|
|
|
try:
|
|
# Clear document cache for this index
|
|
document_key = self.get_document_cache_key(index_id)
|
|
await self.delete(document_key)
|
|
|
|
# Clear chat cache entries for this index using targeted pattern
|
|
# Pattern: chat:*:index_id (since chat keys are "chat:query:index_id")
|
|
chat_pattern = f"*:{index_id}"
|
|
redis_client = await self.get_redis()
|
|
if redis_client:
|
|
# Get all keys and filter for chat keys with this index_id
|
|
all_keys = await redis_client.keys("*")
|
|
chat_keys_to_delete = []
|
|
|
|
for key in all_keys:
|
|
key_str = key.decode() if isinstance(key, bytes) else str(key)
|
|
# Check if this is a chat cache key for our index
|
|
if key_str.endswith(f":{index_id}") and "chat" in key_str:
|
|
chat_keys_to_delete.append(key)
|
|
|
|
if chat_keys_to_delete:
|
|
await redis_client.delete(*chat_keys_to_delete)
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
print(f"Error clearing cache for index {index_id}: {e}")
|
|
return False
|
|
|
|
async def clear_user_index_cache(self, user_id: str) -> bool:
|
|
"""Clear index cache for a specific user"""
|
|
if not self.enabled:
|
|
return False
|
|
|
|
try:
|
|
user_index_key = self.get_index_cache_key(user_id)
|
|
await self.delete(user_index_key)
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error clearing user index cache for {user_id}: {e}")
|
|
return False
|
|
|
|
# Global cache instance
|
|
cache = CacheService() |