import base64
import hashlib
import os
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

import aiofiles

from app.config import settings


class StorageService:
    """Service for storing and retrieving proof files.

    Files live on the local filesystem under ``settings.FILE_STORAGE_PATH``
    and are addressed by a relative, slash-separated *storage key*.
    """

    # MIME type -> file extension used when naming stored proof files.
    _EXTENSION_MAP = {
        "image/png": ".png",
        "image/jpeg": ".jpg",
        "image/jpg": ".jpg",
        "image/gif": ".gif",
        "image/webp": ".webp",
        "image/svg+xml": ".svg",
        "application/pdf": ".pdf",
    }
    # Extension used for any MIME type missing from ``_EXTENSION_MAP``.
    _DEFAULT_EXTENSION = ".bin"

    def __init__(self) -> None:
        self.storage_path = Path(settings.FILE_STORAGE_PATH)
        self._ensure_storage_exists()

    def _ensure_storage_exists(self) -> None:
        """Ensure the storage directory exists."""
        self.storage_path.mkdir(parents=True, exist_ok=True)

    def _generate_storage_key(
        self,
        campaign_id: uuid.UUID,
        proof_name: str,
        version: int,
        file_extension: str,
    ) -> str:
        """Generate a storage key for a proof file.

        The key has the form
        ``<YYYY>/<MM>/<campaign_id>/<safe_proof_name>_v<version><ext>``,
        where the date is the current UTC year/month and every character of
        ``proof_name`` that is not alphanumeric, ``-`` or ``_`` is replaced
        by ``_``.
        """
        # datetime.utcnow() is deprecated; an aware UTC "now" formats the same.
        date_prefix = datetime.now(timezone.utc).strftime("%Y/%m")
        safe_proof_name = "".join(
            c if c.isalnum() or c in "-_" else "_" for c in proof_name
        )
        filename = f"{safe_proof_name}_v{version}{file_extension}"
        return f"{date_prefix}/{campaign_id}/{filename}"

    def _get_file_path(self, storage_key: str) -> Path:
        """Get the full file path for a storage key.

        Raises:
            ValueError: If ``storage_key`` contains a NUL byte or resolves to
                a location outside the storage directory (for example via
                ``..`` segments or an absolute path).
        """
        if "\0" in storage_key:
            raise ValueError("Storage key must not contain NUL bytes")

        file_path = self.storage_path / storage_key

        # Lexical containment check (symlinks are not followed): the
        # normalised target must sit at or below the normalised storage root.
        root = os.path.abspath(self.storage_path)
        target = os.path.abspath(file_path)
        if os.path.commonpath([root, target]) != root:
            raise ValueError(
                f"Storage key escapes the storage directory: {storage_key!r}"
            )
        return file_path

    async def _write_file(self, storage_key: str, file_data: bytes) -> None:
        """Write ``file_data`` to the path for ``storage_key``.

        Parent directories are created as needed; an existing file at the
        same path is overwritten.
        """
        file_path = self._get_file_path(storage_key)
        file_path.parent.mkdir(parents=True, exist_ok=True)
        async with aiofiles.open(file_path, "wb") as f:
            await f.write(file_data)

    async def store_file(
        self,
        file_data: bytes,
        campaign_id: uuid.UUID,
        proof_name: str,
        version: int,
        file_type: str,
    ) -> str:
        """Store a file and return the storage key.

        Args:
            file_data: Raw bytes to write.
            campaign_id: Campaign the proof belongs to; part of the key.
            proof_name: Proof name; sanitised before use in the filename.
            version: Proof version; appended to the filename as ``_v<n>``.
            file_type: MIME type used to pick the file extension. Types not
                in the extension map are stored with ``.bin``.

        Returns:
            The storage key under which the file was written.
        """
        extension = self._EXTENSION_MAP.get(file_type, self._DEFAULT_EXTENSION)
        storage_key = self._generate_storage_key(
            campaign_id=campaign_id,
            proof_name=proof_name,
            version=version,
            file_extension=extension,
        )
        await self._write_file(storage_key, file_data)
        return storage_key

    async def get_file(self, storage_key: str) -> Optional[bytes]:
        """Retrieve a file by its storage key.

        Returns:
            The file contents, or ``None`` when no file exists for the key
            or the key is rejected by ``_get_file_path``.
        """
        try:
            file_path = self._get_file_path(storage_key)
        except ValueError:
            # A key outside the storage root cannot name a stored file.
            return None

        # Open directly instead of checking exists() first, so a file removed
        # between the check and the open still yields None.
        try:
            async with aiofiles.open(file_path, "rb") as f:
                return await f.read()
        except (FileNotFoundError, NotADirectoryError):
            return None

    async def delete_file(self, storage_key: str) -> bool:
        """Delete a file by its storage key.

        Returns:
            ``True`` if a file was removed, ``False`` when no file exists
            for the key or the key is rejected by ``_get_file_path``.
        """
        try:
            file_path = self._get_file_path(storage_key)
        except ValueError:
            return False

        # Remove directly instead of checking exists() first, so a concurrent
        # delete results in False rather than an exception.
        try:
            os.remove(file_path)
        except (FileNotFoundError, NotADirectoryError):
            return False
        return True

    def get_file_url(self, storage_key: str) -> str:
        """Get a URL to access the file (for local storage, returns a relative path)."""
        return f"/files/{storage_key}"

    async def generate_thumbnail_data_url(
        self,
        file_data: bytes,
        file_type: str,
    ) -> str:
        """Generate a data URL for the file (for small previews).

        The full ``file_data`` is base64-encoded as-is; no resizing is done.
        """
        b64_data = base64.b64encode(file_data).decode("utf-8")
        return f"data:{file_type};base64,{b64_data}"

    def get_checksum(self, file_data: bytes) -> str:
        """Calculate the MD5 checksum of file data as a hex string.

        NOTE(review): MD5 is kept so the output format does not change; it
        should not be relied on for security purposes.
        """
        return hashlib.md5(file_data).hexdigest()

    async def store_kb_document(
        self,
        file_data: bytes,
        kb_id: uuid.UUID,
        doc_id: uuid.UUID,
        filename: str,
        mime_type: str,
    ) -> str:
        """Store a knowledge base source document and return the storage key.

        The key has the form ``kb/<kb_id>/<doc_id>_<safe_name>``, where every
        character of ``filename`` that is not alphanumeric, ``-``, ``_`` or
        ``.`` is replaced by ``_``. ``mime_type`` is accepted but not used
        when building the key.
        """
        safe_name = "".join(
            c if c.isalnum() or c in "-_." else "_" for c in filename
        )
        storage_key = f"kb/{kb_id}/{doc_id}_{safe_name}"
        await self._write_file(storage_key, file_data)
        return storage_key


# Singleton instance
storage_service = StorageService()