"""Asset API Routes"""
import io
import logging
import os
import shutil
from typing import List, Optional
from uuid import UUID, uuid4

from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Query
from fastapi.responses import FileResponse
from sqlalchemy.orm import Session
from sqlalchemy import desc
from PIL import Image

from app.database import get_db
from app.models.asset import Asset
from app.models.user import User
from app.schemas.asset import AssetCreate, AssetResponse, AssetUpdate
from app.config import settings

logger = logging.getLogger(__name__)

router = APIRouter()

THUMBNAIL_SIZE = (256, 256)
THUMBNAIL_QUALITY = 85
# Uploads are streamed to disk in chunks of this many bytes.
UPLOAD_CHUNK_SIZE = 1024 * 1024


def get_file_type(mime_type: Optional[str]) -> str:
    """Determine file type from mime type.

    Returns "image", "video" or "audio" when the mime type starts with the
    matching prefix, and "document" for everything else, including a
    missing (``None`` or empty) mime type.
    """
    if not mime_type:
        return "document"
    if mime_type.startswith("image/"):
        return "image"
    if mime_type.startswith("video/"):
        return "video"
    if mime_type.startswith("audio/"):
        return "audio"
    return "document"


def _remove_file_quietly(path: Optional[str]) -> None:
    """Remove ``path`` from disk, ignoring a missing file and logging other errors."""
    if not path:
        return
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        logger.warning("Failed to remove %s: %s", path, exc)


def generate_thumbnail(file_path: str, file_type: str, asset_id: str) -> Optional[str]:
    """Generate a thumbnail for an asset.

    The thumbnail is written to ``<storage_path>/thumbnails/<asset_id>.jpg``.
    Only "image" and "video" file types are handled; any other type, and any
    failure, yields ``None``.

    Returns:
        The thumbnail path on success, otherwise ``None``.
    """
    try:
        thumbnail_dir = os.path.join(settings.storage_path, "thumbnails")
        os.makedirs(thumbnail_dir, exist_ok=True)
        thumbnail_path = os.path.join(thumbnail_dir, f"{asset_id}.jpg")

        if file_type == "image":
            with Image.open(file_path) as img:
                img.thumbnail(THUMBNAIL_SIZE, Image.Resampling.LANCZOS)
                # Convert to RGB if necessary (for PNG with alpha)
                thumb = img.convert("RGB") if img.mode in ("RGBA", "LA", "P") else img
                thumb.save(thumbnail_path, "JPEG", quality=THUMBNAIL_QUALITY)
            return thumbnail_path

        if file_type == "video":
            try:
                # Imported lazily so a missing ffmpeg binding only disables
                # video thumbnails instead of breaking the whole module.
                import ffmpeg

                # Try creating thumbnail at 1s mark
                (
                    ffmpeg
                    .input(file_path, ss=1)
                    .output(thumbnail_path, vframes=1, vf=f"scale={THUMBNAIL_SIZE[0]}:-1")
                    .overwrite_output()
                    .run(capture_stdout=True, capture_stderr=True)
                )
                return thumbnail_path
            except Exception as e:
                logger.warning("Failed to generate video thumbnail: %s", e)
                return None
    except Exception as e:
        # Thumbnails are best-effort: callers treat None as "no thumbnail".
        logger.warning("Failed to generate thumbnail: %s", e)
        return None

    return None


@router.get("/", response_model=List[AssetResponse])
def get_assets(
    skip: int = 0,
    limit: int = 50,
    file_type: Optional[str] = None,
    module: Optional[str] = None,
    db: Session = Depends(get_db)
):
    """Get all assets with optional filtering.

    Results are ordered newest first and can be narrowed by ``file_type``
    and by ``module`` (matched against ``Asset.source_module``).
    """
    query = db.query(Asset)

    if file_type:
        query = query.filter(Asset.file_type == file_type)
    if module:
        query = query.filter(Asset.source_module == module)

    return query.order_by(Asset.created_at.desc()).offset(skip).limit(limit).all()


@router.get("/library")
def get_asset_library(
    file_types: Optional[str] = Query(None, description="Comma-separated file types: image,video,audio"),
    search: Optional[str] = None,
    page: int = Query(1, ge=1),
    limit: int = Query(20, ge=1, le=100),
    db: Session = Depends(get_db)
):
    """Get user's asset library with thumbnails for selection in tools.

    Returns a dict with the current page of non-temporary assets under
    "items" plus "total", "page", "limit" and "pages".
    """
    # Get test user for now
    user = db.query(User).filter(User.email == "test@forge.ai").first()

    query = db.query(Asset).filter(Asset.is_temporary == False)  # noqa: E712 (SQL expression)

    if user:
        query = query.filter(Asset.user_id == user.id)

    if file_types:
        types = [t.strip() for t in file_types.split(",")]
        query = query.filter(Asset.file_type.in_(types))

    if search:
        query = query.filter(Asset.original_filename.ilike(f"%{search}%"))

    total = query.count()
    assets = (
        query.order_by(desc(Asset.created_at))
        .offset((page - 1) * limit)
        .limit(limit)
        .all()
    )

    return {
        "items": [
            {
                "id": str(a.id),
                "filename": a.original_filename or a.stored_filename,
                "file_type": a.file_type,
                "mime_type": a.mime_type,
                "width": a.width,
                "height": a.height,
                "thumbnail_url": f"/api/v1/assets/{a.id}/thumbnail" if a.thumbnail_path else None,
                "file_url": f"/api/v1/assets/{a.id}/download",
                "created_at": a.created_at.isoformat(),
                "source_module": a.source_module
            }
            for a in assets
        ],
        "total": total,
        "page": page,
        "limit": limit,
        # Ceiling division; limit >= 1 is enforced by the Query validator.
        "pages": (total + limit - 1) // limit
    }


@router.get("/{asset_id}/thumbnail")
def get_asset_thumbnail(asset_id: UUID, db: Session = Depends(get_db)):
    """Get asset thumbnail for fast preview.

    Serves the stored thumbnail when it exists, otherwise tries to generate
    one on demand for images and videos, and finally falls back to the
    original file.

    Raises:
        HTTPException: 404 when the asset is unknown or nothing can be served.
    """
    asset = db.query(Asset).filter(Asset.id == asset_id).first()
    if not asset:
        raise HTTPException(status_code=404, detail="Asset not found")

    # If thumbnail exists, serve it
    if asset.thumbnail_path and os.path.exists(asset.thumbnail_path):
        return FileResponse(asset.thumbnail_path, media_type="image/jpeg")

    # Generate thumbnail on-demand if it doesn't exist
    if asset.file_type in ["image", "video"] and os.path.exists(asset.file_path):
        thumbnail_path = generate_thumbnail(asset.file_path, asset.file_type, str(asset.id))
        if thumbnail_path:
            asset.thumbnail_path = thumbnail_path
            db.commit()
            return FileResponse(thumbnail_path, media_type="image/jpeg")

    # Fallback: serve original (not ideal but works)
    if os.path.exists(asset.file_path):
        return FileResponse(asset.file_path, media_type=asset.mime_type)

    raise HTTPException(status_code=404, detail="Thumbnail not available")


@router.get("/{asset_id}", response_model=AssetResponse)
def get_asset(asset_id: UUID, db: Session = Depends(get_db)):
    """Get asset by ID.

    Raises:
        HTTPException: 404 when no asset has this ID.
    """
    asset = db.query(Asset).filter(Asset.id == asset_id).first()
    if not asset:
        raise HTTPException(status_code=404, detail="Asset not found")
    return asset


@router.get("/{asset_id}/download")
def download_asset(asset_id: UUID, db: Session = Depends(get_db)):
    """Download an asset file.

    Raises:
        HTTPException: 404 when the asset is unknown or its file is missing
            on disk.
    """
    asset = db.query(Asset).filter(Asset.id == asset_id).first()
    if not asset:
        raise HTTPException(status_code=404, detail="Asset not found")

    file_path = asset.file_path
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found on disk")

    return FileResponse(
        file_path,
        filename=asset.original_filename or asset.stored_filename,
        media_type=asset.mime_type
    )


async def process_upload(
    file: UploadFile,
    db: Session,
    user: Optional[User] = None,
    project_id: Optional[str] = None,
    source_module: Optional[str] = None,
    is_temporary: bool = False,
    overwrite: bool = False
) -> Asset:
    """Core logic for uploading/saving an asset.

    For a non-temporary upload by a known user, an existing asset with the
    same original filename is treated as a duplicate: it is rejected with
    409 unless ``overwrite`` is set, in which case its record (and ID) is
    reused and its files are replaced.

    Returns:
        The created or updated ``Asset`` record.

    Raises:
        HTTPException: 400 for a ``project_id`` that is not a valid UUID,
            409 for a duplicate without ``overwrite``, 500 when the file
            cannot be written.
    """
    # Validate input before touching the disk, so a bad project_id cannot
    # leave an orphaned file (or a record whose old file was already removed).
    project_uuid = None
    if project_id:
        try:
            project_uuid = UUID(project_id)
        except ValueError as exc:
            raise HTTPException(status_code=400, detail="Invalid project_id") from exc

    # Check for duplicates if not temporary
    existing_asset = None
    if not is_temporary and user:
        existing_asset = db.query(Asset).filter(
            Asset.user_id == user.id,
            Asset.original_filename == file.filename,
            Asset.is_temporary == False  # noqa: E712 (SQL expression)
        ).first()

    if existing_asset and not overwrite:
        # Return conflict with existing ID
        # We interpret 409 specially in frontend
        raise HTTPException(
            status_code=409,
            detail={"message": "File exists", "asset_id": str(existing_asset.id)}
        )

    # Overwrite reuses the existing ID and record; otherwise mint a new ID.
    asset_id = existing_asset.id if existing_asset else uuid4()
    previous_file_path = existing_asset.file_path if existing_asset else None
    previous_thumbnail_path = existing_asset.thumbnail_path if existing_asset else None

    # Determine file type
    file_type = get_file_type(file.content_type)

    ext = os.path.splitext(file.filename)[1] if file.filename else ""
    stored_filename = f"{asset_id}{ext}"

    # Determine storage path
    storage_dir = os.path.join(settings.storage_path, f"{file_type}s")
    os.makedirs(storage_dir, exist_ok=True)
    file_path = os.path.join(storage_dir, stored_filename)

    # Save file. The upload is streamed to a temporary sibling and moved into
    # place only once complete, so a failed overwrite keeps the old file.
    temp_path = f"{file_path}.part"
    try:
        with open(temp_path, "wb") as buffer:
            # Read in chunks to handle large files
            while content := await file.read(UPLOAD_CHUNK_SIZE):
                buffer.write(content)
        os.replace(temp_path, file_path)
        await file.seek(0)  # Reset cursor
    except Exception as e:
        _remove_file_quietly(temp_path)
        raise HTTPException(status_code=500, detail=f"Failed to save file: {str(e)}") from e

    # Overwrite: drop the previous files now that the new one is safely on disk.
    if previous_file_path:
        try:
            replaced_in_place = os.path.samefile(previous_file_path, file_path)
        except OSError:
            # Previous file is already gone; nothing to compare against.
            replaced_in_place = False
        if not replaced_in_place:
            _remove_file_quietly(previous_file_path)
    # The thumbnail path is derived from the asset ID, so a stale thumbnail
    # would otherwise survive when the new upload produces none.
    _remove_file_quietly(previous_thumbnail_path)

    # Extract metadata
    width, height, duration_seconds = None, None, None
    try:
        if file_type == "image":
            with Image.open(file_path) as img:
                width, height = img.size
        elif file_type == "video":
            # Placeholder for video metadata
            # In production, use ffmpeg probe
            from app.utils.video import extract_video_metadata
            metadata = extract_video_metadata(file_path)
            width = metadata.get('width')
            height = metadata.get('height')
            duration_seconds = metadata.get('duration_seconds')
    except Exception as e:
        # Metadata is optional; keep the upload and record what went wrong.
        logger.warning("Failed to extract metadata for %s: %s", file_path, e)

    # Generate thumbnail
    thumbnail_path = generate_thumbnail(file_path, file_type, str(asset_id))

    # Create or Update Asset record
    file_size = os.path.getsize(file_path)

    if existing_asset:
        # Update existing record
        existing_asset.stored_filename = stored_filename
        existing_asset.file_path = file_path
        existing_asset.thumbnail_path = thumbnail_path
        existing_asset.file_type = file_type
        existing_asset.mime_type = file.content_type
        existing_asset.file_size_bytes = file_size
        existing_asset.width = width
        existing_asset.height = height
        existing_asset.duration_seconds = duration_seconds
        existing_asset.source_module = source_module
        # Only move the asset to another project when one was specified.
        if project_uuid:
            existing_asset.project_id = project_uuid

        db.commit()
        db.refresh(existing_asset)
        return existing_asset

    # Create new record
    asset = Asset(
        id=asset_id,
        user_id=user.id if user else None,
        project_id=project_uuid,
        original_filename=file.filename,
        stored_filename=stored_filename,
        file_path=file_path,
        thumbnail_path=thumbnail_path,
        file_type=file_type,
        mime_type=file.content_type,
        file_size_bytes=file_size,
        width=width,
        height=height,
        duration_seconds=duration_seconds,
        source_module=source_module,
        is_temporary=is_temporary
    )

    db.add(asset)
    db.commit()
    db.refresh(asset)
    return asset


@router.post("/upload", response_model=AssetResponse)
async def upload_asset(
    file: UploadFile = File(...),
    project_id: Optional[str] = Form(None),
    source_module: Optional[str] = Form(None),
    is_temporary: bool = Form(False),
    overwrite: bool = Form(False),
    db: Session = Depends(get_db)
):
    """Upload a new asset.

    Delegates to ``process_upload`` on behalf of the test user.
    """
    # Get test user
    user = db.query(User).filter(User.email == "test@forge.ai").first()

    return await process_upload(
        file=file,
        db=db,
        user=user,
        project_id=project_id,
        source_module=source_module,
        is_temporary=is_temporary,
        overwrite=overwrite
    )


@router.patch("/{asset_id}", response_model=AssetResponse)
def update_asset(asset_id: UUID, asset_update: AssetUpdate, db: Session = Depends(get_db)):
    """Update asset details (e.g. rename).

    Raises:
        HTTPException: 404 when no asset has this ID.
    """
    asset = db.query(Asset).filter(Asset.id == asset_id).first()
    if not asset:
        raise HTTPException(status_code=404, detail="Asset not found")

    # An empty or missing filename leaves the current name untouched.
    if asset_update.original_filename:
        asset.original_filename = asset_update.original_filename

    db.commit()
    db.refresh(asset)
    return asset


@router.delete("/{asset_id}")
def delete_asset(asset_id: UUID, db: Session = Depends(get_db)):
    """Delete an asset.

    Removes the database record, then the asset file and its thumbnail
    from disk.

    Raises:
        HTTPException: 404 when no asset has this ID.
    """
    asset = db.query(Asset).filter(Asset.id == asset_id).first()
    if not asset:
        raise HTTPException(status_code=404, detail="Asset not found")

    # Capture paths first: the instance is unusable once deleted and committed.
    file_path = asset.file_path
    thumbnail_path = asset.thumbnail_path

    # Delete from database first, so a failed commit never leaves a record
    # pointing at files that were already removed.
    db.delete(asset)
    db.commit()

    # Delete files from disk (best-effort; a leftover file is harmless).
    _remove_file_quietly(file_path)
    _remove_file_quietly(thumbnail_path)

    return {"message": "Asset deleted"}