115 lines
3.5 KiB
Python
Executable file
115 lines
3.5 KiB
Python
Executable file
import hashlib
|
|
import os
|
|
import uuid
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import aiofiles
|
|
|
|
from app.config import settings
|
|
|
|
|
|
class StorageService:
    """Store, retrieve and delete proof files on the local filesystem.

    Files live under ``settings.FILE_STORAGE_PATH`` and are addressed by a
    relative *storage key* of the form
    ``YYYY/MM/<campaign_id>/<proof_name>_v<version><extension>``.
    """

    # MIME type -> file extension used when building storage keys.
    _EXTENSION_MAP = {
        "image/png": ".png",
        "image/jpeg": ".jpg",
        "image/jpg": ".jpg",
        "image/gif": ".gif",
        "image/webp": ".webp",
        "image/svg+xml": ".svg",
        "application/pdf": ".pdf",
    }
    # Extension used for MIME types that are not in _EXTENSION_MAP.
    _DEFAULT_EXTENSION = ".bin"

    def __init__(self):
        """Read the storage root from settings and create it if missing."""
        self.storage_path = Path(settings.FILE_STORAGE_PATH)
        self._ensure_storage_exists()

    def _ensure_storage_exists(self) -> None:
        """Create the storage directory (and any missing parents)."""
        self.storage_path.mkdir(parents=True, exist_ok=True)

    def _generate_storage_key(
        self,
        campaign_id: uuid.UUID,
        proof_name: str,
        version: int,
        file_extension: str,
    ) -> str:
        """Build the relative storage key for one version of a proof.

        Args:
            campaign_id: Campaign the proof belongs to; used as a directory.
            proof_name: Human-readable proof name. Every character that is
                not alphanumeric, ``-`` or ``_`` is replaced with ``_`` so
                the name cannot introduce path separators.
            version: Version number embedded in the file name.
            file_extension: Extension including the leading dot.

        Returns:
            ``YYYY/MM/<campaign_id>/<safe_name>_v<version><extension>`` where
            the date prefix is the current UTC year and month.
        """
        # Local import: only this method needs ``timezone``.
        from datetime import timezone

        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # the formatted year/month is identical.
        date_prefix = datetime.now(timezone.utc).strftime("%Y/%m")
        safe_proof_name = "".join(
            c if c.isalnum() or c in "-_" else "_" for c in proof_name
        )
        filename = f"{safe_proof_name}_v{version}{file_extension}"
        return f"{date_prefix}/{campaign_id}/{filename}"

    def _get_file_path(self, storage_key: str) -> Path:
        """Map a storage key to its path under the storage root.

        Args:
            storage_key: Relative key as produced by ``_generate_storage_key``.

        Returns:
            ``self.storage_path / storage_key``.

        Raises:
            ValueError: If the key resolves to a location outside the storage
                root (for example via ``..`` segments or an absolute path).
        """
        file_path = self.storage_path / storage_key
        root = self.storage_path.resolve()
        try:
            # Resolve both sides so "..", absolute keys and a symlinked root
            # are all compared on their real locations.
            file_path.resolve().relative_to(root)
        except ValueError:
            raise ValueError(
                f"Storage key escapes storage root: {storage_key!r}"
            ) from None
        return file_path

    async def store_file(
        self,
        file_data: bytes,
        campaign_id: uuid.UUID,
        proof_name: str,
        version: int,
        file_type: str,
    ) -> str:
        """Write ``file_data`` to storage and return its storage key.

        Args:
            file_data: Raw file contents.
            campaign_id: Campaign the proof belongs to.
            proof_name: Proof name; sanitised before use in the file name.
            version: Proof version number.
            file_type: MIME type; selects the file extension. Unknown types
                are stored with a ``.bin`` extension.

        Returns:
            The storage key under which the file was written. An existing
            file with the same key is overwritten.
        """
        extension = self._EXTENSION_MAP.get(file_type, self._DEFAULT_EXTENSION)

        storage_key = self._generate_storage_key(
            campaign_id=campaign_id,
            proof_name=proof_name,
            version=version,
            file_extension=extension,
        )

        file_path = self._get_file_path(storage_key)
        file_path.parent.mkdir(parents=True, exist_ok=True)

        async with aiofiles.open(file_path, "wb") as f:
            await f.write(file_data)

        return storage_key

    async def get_file(self, storage_key: str) -> Optional[bytes]:
        """Read a stored file.

        Args:
            storage_key: Key returned by ``store_file``.

        Returns:
            The file contents, or ``None`` when no file exists for the key
            or the key points outside the storage root.
        """
        try:
            file_path = self._get_file_path(storage_key)
        except ValueError:
            # A key outside the storage root never names a stored file.
            return None

        # Open directly instead of checking exists() first: a concurrent
        # delete between the check and the open would otherwise raise.
        try:
            async with aiofiles.open(file_path, "rb") as f:
                return await f.read()
        except (FileNotFoundError, NotADirectoryError):
            return None

    async def delete_file(self, storage_key: str) -> bool:
        """Delete a stored file.

        Args:
            storage_key: Key returned by ``store_file``.

        Returns:
            ``True`` if a file was removed; ``False`` when no file exists for
            the key or the key points outside the storage root.
        """
        try:
            file_path = self._get_file_path(storage_key)
        except ValueError:
            # Never delete anything outside the storage root.
            return False

        # Remove directly instead of checking exists() first so that a
        # concurrent delete is reported as "not found" rather than raising.
        try:
            os.remove(file_path)
        except (FileNotFoundError, NotADirectoryError):
            return False
        return True

    def get_file_url(self, storage_key: str) -> str:
        """Return the URL for a stored file.

        For local storage this is the relative path ``/files/<storage_key>``.
        """
        return f"/files/{storage_key}"

    async def generate_thumbnail_data_url(
        self,
        file_data: bytes,
        file_type: str,
    ) -> str:
        """Encode a file as a ``data:`` URL (intended for small previews).

        Args:
            file_data: Raw file contents; embedded in full, not resized.
            file_type: MIME type placed in the URL header.

        Returns:
            ``data:<file_type>;base64,<payload>``.
        """
        import base64

        b64_data = base64.b64encode(file_data).decode("utf-8")
        return f"data:{file_type};base64,{b64_data}"

    def get_checksum(self, file_data: bytes) -> str:
        """Return the hexadecimal MD5 digest of ``file_data``.

        NOTE: MD5 is not collision-resistant; treat this as a change-detection
        checksum, not a security guarantee.
        """
        return hashlib.md5(file_data).hexdigest()
|
|
|
|
|
|
# Singleton instance
|
|
# Shared module-level instance. Constructing it at import time reads
# settings.FILE_STORAGE_PATH and creates the storage directory if needed.
storage_service = StorageService()
|