forge/backend/app/api/v1/assets.py

389 lines
13 KiB
Python

"""Asset API Routes"""
from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Query
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from sqlalchemy import desc
from typing import List, Optional
from uuid import UUID, uuid4
import os
import shutil
from PIL import Image
import io
from app.database import get_db
from app.models.asset import Asset
from app.models.user import User
from app.schemas.asset import AssetCreate, AssetResponse, AssetUpdate
from app.config import settings
router = APIRouter()
THUMBNAIL_SIZE = (256, 256)
THUMBNAIL_QUALITY = 85
def get_file_type(mime_type: str) -> str:
"""Determine file type from mime type"""
if mime_type.startswith("image/"):
return "image"
elif mime_type.startswith("video/"):
return "video"
elif mime_type.startswith("audio/"):
return "audio"
else:
return "document"
def generate_thumbnail(file_path: str, file_type: str, asset_id: str) -> Optional[str]:
"""Generate a thumbnail for an asset"""
try:
thumbnail_dir = os.path.join(settings.storage_path, "thumbnails")
os.makedirs(thumbnail_dir, exist_ok=True)
thumbnail_path = os.path.join(thumbnail_dir, f"{asset_id}.jpg")
if file_type == "image":
with Image.open(file_path) as img:
img.thumbnail(THUMBNAIL_SIZE, Image.Resampling.LANCZOS)
# Convert to RGB if necessary (for PNG with alpha)
if img.mode in ('RGBA', 'LA', 'P'):
img = img.convert('RGB')
img.save(thumbnail_path, 'JPEG', quality=THUMBNAIL_QUALITY)
return thumbnail_path
elif file_type == "video":
try:
import ffmpeg
# Try creating thumbnail at 1s mark
(
ffmpeg
.input(file_path, ss=1)
.output(thumbnail_path, vframes=1, vf=f'scale={THUMBNAIL_SIZE[0]}:-1')
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True)
)
return thumbnail_path
except Exception as e:
print(f"Failed to generate video thumbnail: {e}")
return None
except Exception as e:
print(f"Failed to generate thumbnail: {e}")
return None
@router.get("/", response_model=List[AssetResponse])
def get_assets(
skip: int = 0,
limit: int = 50,
file_type: Optional[str] = None,
module: Optional[str] = None,
db: Session = Depends(get_db)
):
"""Get all assets with optional filtering"""
query = db.query(Asset)
if file_type:
query = query.filter(Asset.file_type == file_type)
if module:
query = query.filter(Asset.source_module == module)
assets = query.order_by(Asset.created_at.desc()).offset(skip).limit(limit).all()
return assets
@router.get("/library")
def get_asset_library(
file_types: Optional[str] = Query(None, description="Comma-separated file types: image,video,audio"),
search: Optional[str] = None,
page: int = Query(1, ge=1),
limit: int = Query(20, le=100),
db: Session = Depends(get_db)
):
"""Get user's asset library with thumbnails for selection in tools"""
# Get test user for now
user = db.query(User).filter(User.email == "test@forge.ai").first()
query = db.query(Asset).filter(Asset.is_temporary == False)
if user:
query = query.filter(Asset.user_id == user.id)
if file_types:
types = [t.strip() for t in file_types.split(",")]
query = query.filter(Asset.file_type.in_(types))
if search:
query = query.filter(Asset.original_filename.ilike(f"%{search}%"))
total = query.count()
assets = query.order_by(desc(Asset.created_at)).offset((page - 1) * limit).limit(limit).all()
return {
"items": [
{
"id": str(a.id),
"filename": a.original_filename or a.stored_filename,
"file_type": a.file_type,
"mime_type": a.mime_type,
"width": a.width,
"height": a.height,
"thumbnail_url": f"/api/v1/assets/{a.id}/thumbnail" if a.thumbnail_path else None,
"file_url": f"/api/v1/assets/{a.id}/download",
"created_at": a.created_at.isoformat(),
"source_module": a.source_module
}
for a in assets
],
"total": total,
"page": page,
"limit": limit,
"pages": (total + limit - 1) // limit
}
@router.get("/{asset_id}/thumbnail")
def get_asset_thumbnail(asset_id: UUID, db: Session = Depends(get_db)):
"""Get asset thumbnail for fast preview"""
asset = db.query(Asset).filter(Asset.id == asset_id).first()
if not asset:
raise HTTPException(status_code=404, detail="Asset not found")
# If thumbnail exists, serve it
if asset.thumbnail_path and os.path.exists(asset.thumbnail_path):
return FileResponse(asset.thumbnail_path, media_type="image/jpeg")
# Generate thumbnail on-demand if it doesn't exist
if asset.file_type in ["image", "video"] and os.path.exists(asset.file_path):
thumbnail_path = generate_thumbnail(asset.file_path, asset.file_type, str(asset.id))
if thumbnail_path:
asset.thumbnail_path = thumbnail_path
db.commit()
return FileResponse(thumbnail_path, media_type="image/jpeg")
# Fallback: serve original (not ideal but works)
if os.path.exists(asset.file_path):
return FileResponse(asset.file_path, media_type=asset.mime_type)
raise HTTPException(status_code=404, detail="Thumbnail not available")
@router.get("/{asset_id}", response_model=AssetResponse)
def get_asset(asset_id: UUID, db: Session = Depends(get_db)):
"""Get asset by ID"""
asset = db.query(Asset).filter(Asset.id == asset_id).first()
if not asset:
raise HTTPException(status_code=404, detail="Asset not found")
return asset
@router.get("/{asset_id}/download")
def download_asset(asset_id: UUID, db: Session = Depends(get_db)):
"""Download an asset file"""
asset = db.query(Asset).filter(Asset.id == asset_id).first()
if not asset:
raise HTTPException(status_code=404, detail="Asset not found")
file_path = asset.file_path
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail="File not found on disk")
return FileResponse(
file_path,
filename=asset.original_filename or asset.stored_filename,
media_type=asset.mime_type
)
async def process_upload(
file: UploadFile,
db: Session,
user: Optional[User] = None,
project_id: Optional[str] = None,
source_module: Optional[str] = None,
is_temporary: bool = False,
overwrite: bool = False
) -> Asset:
"""Core logic for uploading/saving an asset"""
# Check for duplicates if not temporary
existing_asset = None
if not is_temporary and user:
existing_asset = db.query(Asset).filter(
Asset.user_id == user.id,
Asset.original_filename == file.filename,
Asset.is_temporary == False
).first()
if existing_asset:
if not overwrite:
# Return conflict with existing ID
# We interpret 409 specially in frontend
raise HTTPException(
status_code=409,
detail={"message": "File exists", "asset_id": str(existing_asset.id)}
)
else:
# Overwrite: Delete existing file on disk but KEEP the record
if os.path.exists(existing_asset.file_path):
try:
os.remove(existing_asset.file_path)
except OSError:
pass
if existing_asset.thumbnail_path and os.path.exists(existing_asset.thumbnail_path):
try:
os.remove(existing_asset.thumbnail_path)
except OSError:
pass
# Reuse the existing ID
asset_id = existing_asset.id
# Determine file type
file_type = get_file_type(file.content_type)
# Generate unique ID if new, otherwise reuse
if not 'asset_id' in locals():
asset_id = uuid4()
ext = os.path.splitext(file.filename)[1] if file.filename else ""
stored_filename = f"{asset_id}{ext}"
# Determine storage path
storage_dir = os.path.join(settings.storage_path, f"{file_type}s")
os.makedirs(storage_dir, exist_ok=True)
file_path = os.path.join(storage_dir, stored_filename)
# Save file
try:
with open(file_path, "wb") as buffer:
# Read in chunks to handle large files
while content := await file.read(1024 * 1024):
buffer.write(content)
await file.seek(0) # Reset cursor
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to save file: {str(e)}")
# Extract metadata
width, height, duration_seconds = None, None, None
try:
if file_type == "image":
with Image.open(file_path) as img:
width, height = img.size
elif file_type == "video":
# Placeholder for video metadata
# In production, use ffmpeg probe
from app.utils.video import extract_video_metadata
metadata = extract_video_metadata(file_path)
width = metadata.get('width')
height = metadata.get('height')
duration_seconds = metadata.get('duration_seconds')
except Exception:
pass # Ignore metadata extraction errors
# Generate thumbnail
thumbnail_path = generate_thumbnail(file_path, file_type, str(asset_id))
# Create or Update Asset record
file_size = os.path.getsize(file_path)
if existing_asset:
# Update existing record
existing_asset.stored_filename = stored_filename
existing_asset.file_path = file_path
existing_asset.thumbnail_path = thumbnail_path
existing_asset.file_type = file_type
existing_asset.mime_type = file.content_type
existing_asset.file_size_bytes = file_size
existing_asset.width = width
existing_asset.height = height
existing_asset.duration_seconds = duration_seconds
existing_asset.source_module = source_module
# Don't update project_id unless specified? For now keep it simple.
if project_id:
existing_asset.project_id = UUID(project_id)
db.commit()
db.refresh(existing_asset)
return existing_asset
else:
# Create new record
asset = Asset(
id=asset_id,
user_id=user.id if user else None,
project_id=UUID(project_id) if project_id else None,
original_filename=file.filename,
stored_filename=stored_filename,
file_path=file_path,
thumbnail_path=thumbnail_path,
file_type=file_type,
mime_type=file.content_type,
file_size_bytes=file_size,
width=width,
height=height,
duration_seconds=duration_seconds,
source_module=source_module,
is_temporary=is_temporary
)
db.add(asset)
db.commit()
db.refresh(asset)
return asset
@router.post("/upload", response_model=AssetResponse)
async def upload_asset(
file: UploadFile = File(...),
project_id: Optional[str] = Form(None),
source_module: Optional[str] = Form(None),
is_temporary: bool = Form(False),
overwrite: bool = Form(False),
db: Session = Depends(get_db)
):
"""Upload a new asset"""
# Get test user
user = db.query(User).filter(User.email == "test@forge.ai").first()
return await process_upload(
file=file,
db=db,
user=user,
project_id=project_id,
source_module=source_module,
is_temporary=is_temporary,
overwrite=overwrite
)
@router.patch("/{asset_id}", response_model=AssetResponse)
def update_asset(asset_id: UUID, asset_update: AssetUpdate, db: Session = Depends(get_db)):
"""Update asset details (e.g. rename)"""
asset = db.query(Asset).filter(Asset.id == asset_id).first()
if not asset:
raise HTTPException(status_code=404, detail="Asset not found")
if asset_update.original_filename:
asset.original_filename = asset_update.original_filename
db.commit()
db.refresh(asset)
return asset
@router.delete("/{asset_id}")
def delete_asset(asset_id: UUID, db: Session = Depends(get_db)):
"""Delete an asset"""
asset = db.query(Asset).filter(Asset.id == asset_id).first()
if not asset:
raise HTTPException(status_code=404, detail="Asset not found")
# Delete file from disk
if os.path.exists(asset.file_path):
os.remove(asset.file_path)
# Delete from database
db.delete(asset)
db.commit()
return {"message": "Asset deleted"}