forge/backend/app/api/v1/assets.py
DJP 7a804e896d Initial commit - FORGE AI unified platform
Features:
- Image generation (OpenAI, Gemini, Leonardo, Bria, Stability, Flux)
- Nano Banana iterative editing
- Video generation and upscaling
- Audio TTS, STT, sound effects (ElevenLabs)
- Text prompt studio and alt text
- User authentication with JWT/cookies
- Admin panel with voice management
- Job queue with Celery
- PostgreSQL + Redis backend
- Next.js 15 + FastAPI architecture

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2025-12-09 20:39:00 -05:00

267 lines
8.5 KiB
Python

"""Asset API Routes"""
import io
import logging
import os
import shutil
from typing import List, Optional
from uuid import UUID, uuid4

from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Query
from fastapi.responses import FileResponse
from PIL import Image
from sqlalchemy import desc
from sqlalchemy.orm import Session

from app.database import get_db
from app.models.asset import Asset
from app.models.user import User
from app.schemas.asset import AssetCreate, AssetResponse
from app.config import settings
# Router that the asset endpoints in this module are registered on.
router = APIRouter()
# (width, height) bounding box in pixels handed to Image.thumbnail().
THUMBNAIL_SIZE = (256, 256)
# JPEG quality value used when saving generated thumbnails.
THUMBNAIL_QUALITY = 85
def get_file_type(mime_type: Optional[str]) -> str:
    """Map a MIME type to one of the coarse asset categories.

    Args:
        mime_type: MIME type such as ``"image/png"``. ``None`` or an empty
            string is accepted because an uploaded part may not carry a
            Content-Type at all.

    Returns:
        ``"image"``, ``"video"`` or ``"audio"`` when the MIME type has that
        top-level type, otherwise ``"document"``.
    """
    if not mime_type:
        return "document"

    # MIME types are case-insensitive, so normalise before matching.
    normalized = mime_type.strip().lower()
    for category in ("image", "video", "audio"):
        if normalized.startswith(f"{category}/"):
            return category
    return "document"
def generate_thumbnail(file_path: str, file_type: str, asset_id: str) -> Optional[str]:
    """Generate a JPEG thumbnail for an asset.

    The thumbnail is written to
    ``<settings.storage_path>/thumbnails/<asset_id>.jpg`` and is scaled to fit
    inside ``THUMBNAIL_SIZE``.

    Args:
        file_path: Path of the source file on disk.
        file_type: Asset category; only ``"image"`` currently yields a
            thumbnail.
        asset_id: Identifier used as the thumbnail's file name.

    Returns:
        The path of the written thumbnail, or ``None`` when the file type is
        unsupported or generation fails. Failures are logged, never raised,
        because thumbnails are a best-effort extra.
    """
    if file_type != "image":
        # Video thumbnails would need ffmpeg (not implemented yet), e.g.:
        #   ffmpeg -i input.mp4 -vframes 1 -f image2 output.jpg
        return None

    try:
        thumbnail_dir = os.path.join(settings.storage_path, "thumbnails")
        os.makedirs(thumbnail_dir, exist_ok=True)
        thumbnail_path = os.path.join(thumbnail_dir, f"{asset_id}.jpg")

        with Image.open(file_path) as source:
            # JPEG cannot store alpha or palette data, so anything that is
            # not already RGB or greyscale is converted to RGB. Converting
            # BEFORE resizing matters: Pillow resizes palette ("P") and
            # bilevel ("1") images with NEAREST regardless of the filter
            # requested, which gives visibly worse thumbnails.
            if source.mode in ("RGB", "L"):
                thumb = source
            else:
                thumb = source.convert("RGB")
            thumb.thumbnail(THUMBNAIL_SIZE, Image.Resampling.LANCZOS)
            thumb.save(thumbnail_path, "JPEG", quality=THUMBNAIL_QUALITY)
        return thumbnail_path
    except Exception:
        # Deliberately broad: a corrupt or exotic image must not fail the
        # request that triggered thumbnail generation.
        logging.getLogger(__name__).exception(
            "Failed to generate thumbnail for asset %s", asset_id
        )
        return None
@router.get("/", response_model=List[AssetResponse])
def get_assets(
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=0),
    file_type: Optional[str] = None,
    module: Optional[str] = None,
    db: Session = Depends(get_db),
):
    """Get all assets with optional filtering, newest first.

    Args:
        skip: Number of rows to skip; must be >= 0.
        limit: Maximum number of rows to return; must be >= 0.
        file_type: When given, only assets with this ``file_type`` are
            returned.
        module: When given, only assets whose ``source_module`` equals this
            value are returned.
        db: Database session.

    Returns:
        The matching ``Asset`` rows ordered by ``created_at`` descending.
    """
    # NOTE(review): unlike the /library endpoint, this query is not scoped to
    # a user, and `limit` has no upper bound - confirm both are intended.
    query = db.query(Asset)
    if file_type:
        query = query.filter(Asset.file_type == file_type)
    if module:
        query = query.filter(Asset.source_module == module)
    return query.order_by(Asset.created_at.desc()).offset(skip).limit(limit).all()
@router.get("/library")
def get_asset_library(
    file_types: Optional[str] = Query(None, description="Comma-separated file types: image,video,audio"),
    search: Optional[str] = None,
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db),
):
    """Get the user's asset library with thumbnails for selection in tools.

    Args:
        file_types: Comma-separated ``file_type`` values to include. Blank
            entries are ignored.
        search: Case-insensitive substring to look for in the original
            filename. ``%`` and ``_`` are matched literally.
        page: 1-based page number.
        limit: Page size, between 1 and 100.
        db: Database session.

    Returns:
        A dict with the page of ``items`` plus ``total``, ``page``, ``limit``
        and ``pages`` (total number of pages).
    """
    # Get test user for now
    # NOTE(review): when this user does not exist the query is not scoped to
    # any user at all - replace with the authenticated user.
    user = db.query(User).filter(User.email == "test@forge.ai").first()

    query = db.query(Asset).filter(Asset.is_temporary.is_(False))
    if user:
        query = query.filter(Asset.user_id == user.id)

    if file_types:
        wanted_types = [t.strip() for t in file_types.split(",") if t.strip()]
        if wanted_types:
            query = query.filter(Asset.file_type.in_(wanted_types))

    if search:
        # Escape LIKE metacharacters so user input is matched literally
        # instead of acting as a wildcard pattern.
        escaped = (
            search.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
        )
        query = query.filter(
            Asset.original_filename.ilike(f"%{escaped}%", escape="\\")
        )

    total = query.count()
    assets = (
        query.order_by(desc(Asset.created_at))
        .offset((page - 1) * limit)
        .limit(limit)
        .all()
    )

    return {
        "items": [
            {
                "id": str(a.id),
                "filename": a.original_filename or a.stored_filename,
                "file_type": a.file_type,
                "mime_type": a.mime_type,
                "width": a.width,
                "height": a.height,
                "thumbnail_url": f"/api/v1/assets/{a.id}/thumbnail" if a.thumbnail_path else None,
                "file_url": f"/api/v1/assets/{a.id}/download",
                "created_at": a.created_at.isoformat() if a.created_at else None,
                "source_module": a.source_module,
            }
            for a in assets
        ],
        "total": total,
        "page": page,
        "limit": limit,
        # Ceiling division; `limit` is validated to be >= 1 above.
        "pages": (total + limit - 1) // limit,
    }
@router.get("/{asset_id}/thumbnail")
def get_asset_thumbnail(asset_id: UUID, db: Session = Depends(get_db)):
    """Serve a preview image for an asset.

    Resolution order: the stored thumbnail if its file exists; otherwise a
    thumbnail generated on demand (image assets only), whose path is then
    saved on the asset; otherwise the original file itself.

    Raises:
        HTTPException: 404 when the asset does not exist, or when neither a
            thumbnail nor the original file is available on disk.
    """
    record = db.query(Asset).filter(Asset.id == asset_id).first()
    if not record:
        raise HTTPException(status_code=404, detail="Asset not found")

    # Fast path: a thumbnail was generated earlier and is still on disk.
    stored_thumb = record.thumbnail_path
    if stored_thumb and os.path.exists(stored_thumb):
        return FileResponse(stored_thumb, media_type="image/jpeg")

    # No usable thumbnail yet: build one now for image assets.
    is_image = record.file_type == "image"
    if is_image and os.path.exists(record.file_path):
        fresh_thumb = generate_thumbnail(
            record.file_path, record.file_type, str(record.id)
        )
        if fresh_thumb:
            record.thumbnail_path = fresh_thumb
            db.commit()
            return FileResponse(fresh_thumb, media_type="image/jpeg")

    # Last resort: hand back the original file (not ideal but works).
    if not os.path.exists(record.file_path):
        raise HTTPException(status_code=404, detail="Thumbnail not available")
    return FileResponse(record.file_path, media_type=record.mime_type)
@router.get("/{asset_id}", response_model=AssetResponse)
def get_asset(asset_id: UUID, db: Session = Depends(get_db)):
    """Return the asset identified by ``asset_id``.

    Raises:
        HTTPException: 404 when no asset has that ID.
    """
    matching = db.query(Asset).filter(Asset.id == asset_id)
    found = matching.first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Asset not found")
@router.get("/{asset_id}/download")
def download_asset(asset_id: UUID, db: Session = Depends(get_db)):
    """Send an asset's file to the client as a download.

    The download is named after the original filename, falling back to the
    stored filename when no original name was recorded.

    Raises:
        HTTPException: 404 when the asset does not exist or its file is
            missing from disk.
    """
    record = db.query(Asset).filter(Asset.id == asset_id).first()
    if not record:
        raise HTTPException(status_code=404, detail="Asset not found")

    disk_path = record.file_path
    if os.path.exists(disk_path):
        download_name = record.original_filename or record.stored_filename
        return FileResponse(
            disk_path, filename=download_name, media_type=record.mime_type
        )
    raise HTTPException(status_code=404, detail="File not found on disk")
@router.post("/upload", response_model=AssetResponse)
async def upload_asset(
    file: UploadFile = File(...),
    project_id: Optional[str] = Form(None),
    source_module: Optional[str] = Form(None),
    db: Session = Depends(get_db),
):
    """Upload a new asset.

    Stores the file under ``<settings.storage_path>/<file_type>s/``, records
    its size (and pixel dimensions for images), generates a thumbnail where
    possible and creates the ``Asset`` row.

    Args:
        file: The uploaded file.
        project_id: Optional project UUID, as a string.
        source_module: Optional name of the module the asset came from.
        db: Database session.

    Returns:
        The newly created ``Asset``.

    Raises:
        HTTPException: 422 when ``project_id`` is not a valid UUID.
    """
    # NOTE(review): this coroutine does blocking file and database I/O;
    # consider a plain `def` handler or offloading to a thread.
    log = logging.getLogger(__name__)

    # Validate input before touching the disk so a bad request cannot leave
    # an orphaned file behind.
    project_uuid = None
    if project_id:
        try:
            project_uuid = UUID(project_id)
        except ValueError:
            raise HTTPException(
                status_code=422, detail="project_id must be a valid UUID"
            ) from None

    # Get test user
    user = db.query(User).filter(User.email == "test@forge.ai").first()

    # The uploaded part may carry no Content-Type; fall back to the generic
    # binary type instead of failing on None.
    mime_type = file.content_type or "application/octet-stream"
    file_type = get_file_type(mime_type)

    # Generate unique ID and filename
    asset_id = uuid4()
    ext = os.path.splitext(file.filename)[1] if file.filename else ""
    stored_filename = f"{asset_id}{ext}"

    # Determine storage path
    storage_dir = os.path.join(settings.storage_path, f"{file_type}s")
    os.makedirs(storage_dir, exist_ok=True)
    file_path = os.path.join(storage_dir, stored_filename)

    thumbnail_path = None
    try:
        # Save file
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)

        file_size = os.path.getsize(file_path)

        # Get image dimensions if applicable
        width = None
        height = None
        if file_type == "image":
            try:
                with Image.open(file_path) as img:
                    width, height = img.size
            except Exception:
                # Dimensions are optional metadata; keep the upload but
                # leave a trace of why they are missing.
                log.warning(
                    "Could not read image dimensions for %s",
                    file_path,
                    exc_info=True,
                )

        # Generate thumbnail
        thumbnail_path = generate_thumbnail(file_path, file_type, str(asset_id))

        # Create asset record
        asset = Asset(
            id=asset_id,
            user_id=user.id if user else None,
            project_id=project_uuid,
            original_filename=file.filename,
            stored_filename=stored_filename,
            file_path=file_path,
            thumbnail_path=thumbnail_path,
            file_type=file_type,
            mime_type=mime_type,
            file_size_bytes=file_size,
            width=width,
            height=height,
            source_module=source_module,
        )
        db.add(asset)
        db.commit()
        db.refresh(asset)
    except Exception:
        # Nothing will reference the files if the write or the database
        # step failed, so remove them rather than leak disk space.
        db.rollback()
        for leftover in (file_path, thumbnail_path):
            if not leftover:
                continue
            try:
                os.remove(leftover)
            except FileNotFoundError:
                pass
            except OSError:
                log.warning(
                    "Could not remove %s after failed upload",
                    leftover,
                    exc_info=True,
                )
        raise

    return asset
@router.delete("/{asset_id}")
def delete_asset(asset_id: UUID, db: Session = Depends(get_db)):
    """Delete an asset's database record and its files on disk.

    The record is removed first. The asset file and its thumbnail are then
    deleted on a best-effort basis: a file that is already gone is ignored
    and any other removal error is logged rather than raised.

    Raises:
        HTTPException: 404 when no asset has that ID.
    """
    asset = db.query(Asset).filter(Asset.id == asset_id).first()
    if not asset:
        raise HTTPException(status_code=404, detail="Asset not found")

    # Read the paths before deleting: the instance should not be used once
    # its row has been removed and the session committed.
    stale_paths = [asset.file_path, asset.thumbnail_path]

    # Delete from the database first, so a failed commit cannot leave a
    # record that points at a file which has already been removed.
    db.delete(asset)
    db.commit()

    for path in stale_paths:
        if not path:
            continue
        try:
            os.remove(path)
        except FileNotFoundError:
            # Already gone - nothing to clean up.
            pass
        except OSError:
            logging.getLogger(__name__).warning(
                "Could not remove %s for deleted asset %s",
                path,
                asset_id,
                exc_info=True,
            )

    return {"message": "Asset deleted"}