forge/backend/app/services/video_generator.py

860 lines
32 KiB
Python

"""Video Generator Service - Runway and Google Veo
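Runway Models (keys of RUNWAY_MODELS below):
- veo3: Veo 3 via Runway, text or image to video (default)
- veo3.1: Veo 3.1 via Runway
- gen4_turbo: Gen-4 Turbo, high-fidelity image-to-video (requires an input image)
- Any other name (e.g. the legacy gen3_alpha / gen3_alpha_turbo / gen4) falls back to veo3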
Runway Features:
- text_to_video: Generate from text prompt
- image_to_video: Generate from starting image
- camera_control: Pan, tilt, zoom, roll with intensity (-10 to 10)
- motion_brush: Define motion areas with direction
- first_frame/last_frame: Control start and end frames
Google Veo Models (December 2025):
- veo-3.1-generate-preview: Latest with native audio, 720p/1080p, reference images
- veo-3.1-fast-generate-preview: Speed-optimized variant with audio
- veo-3.0-generate-001: Stable Veo 3 with audio
- veo-3.0-fast-generate-001: Fast Veo 3 variant
- veo-2.0-generate-001: Legacy, supports 2 outputs per request
Veo 3/3.1 Features:
- Native audio generation with soundtrack, dialogue, ambient sounds
- first_frame: Starting image for video (image-to-video)
- last_frame: Ending image for video (creates frame interpolation)
- reference_images: Up to 3 images for character/style/asset consistency
- video_extension: Extend existing videos up to 20 times
- negative_prompt: Describe unwanted elements
- aspect_ratio: 16:9, 9:16
- resolution: 720p, 1080p (Veo 3.1 only)
- duration: 4, 6, or 8 seconds
- person_generation: Control adult face generation
Audio Prompt Techniques (Veo 3+):
- Dialogue: Use quotation marks ("She whispered, 'Hello'")
- Sound Effects: Explicit descriptions (tires screeching loudly)
- Ambient Noise: Environmental details (eerie hum in background)
"""
import httpx
import os
import base64
from uuid import uuid4
from datetime import datetime
import asyncio
from typing import Optional, Dict, Any, List, Tuple
from app.database import SessionLocal
from app.models.job import Job
from app.models.asset import Asset
from app.config import settings
import logging
logger = logging.getLogger(__name__)
# Runway model catalogue, keyed by the model id accepted in job input.
# "api_model" is the identifier sent to the Runway API; "image_only" marks
# models that cannot run from a text prompt alone.
RUNWAY_MODELS = {
    "veo3": dict(
        name="Veo 3 (Runway)",
        api_model="veo3",
        description="Text or Image to Video",
        supports_camera_control=False,
        supports_motion_brush=False,
        max_duration=10,
        resolutions=["1280x768", "768x1280"],
        default=True,
    ),
    "veo3.1": dict(
        name="Veo 3.1 (Runway)",
        api_model="veo3.1",
        description="Latest Veo 3.1 Model",
        supports_camera_control=False,
        supports_motion_brush=False,
        max_duration=10,
        resolutions=["1280x768", "768x1280"],
    ),
    "gen4_turbo": dict(
        name="Gen-4 Turbo (Image Only)",
        api_model="gen4_turbo",
        description="High Fidelity Image-to-Video",
        supports_camera_control=True,
        supports_motion_brush=True,
        max_duration=10,
        resolutions=["1280x768", "768x1280"],
        image_only=True,
    ),
}
# Veo model catalogue (December 2025), keyed by the model id accepted in job
# input. Each entry lists the capability flags plus the resolutions and
# durations (seconds) accepted for that model; an entry carrying "alias_for"
# is resolved to the named model before use.
VEO_MODELS = {
    "veo-3.1-generate-preview": dict(
        name="Veo 3.1",
        description="Latest with native audio, 720p/1080p, reference images",
        supports_audio=True,
        supports_first_last_frame=True,
        supports_reference_images=True,
        supports_extension=True,
        resolutions=["720p", "1080p"],
        durations=[4, 6, 8],
        max_references=3,
    ),
    "veo-3.1-fast-generate-preview": dict(
        name="Veo 3.1 Fast",
        description="Speed-optimized with audio ($0.40/sec)",
        supports_audio=True,
        supports_first_last_frame=True,
        supports_reference_images=True,
        supports_extension=True,
        resolutions=["720p", "1080p"],
        durations=[4, 6, 8],
        max_references=3,
    ),
    "veo-3.0-generate-001": dict(
        name="Veo 3",
        description="Stable Veo 3 with native audio",
        supports_audio=True,
        supports_first_last_frame=True,
        supports_reference_images=False,
        supports_extension=False,
        resolutions=["720p", "1080p"],
        durations=[4, 6, 8],
        max_references=0,
    ),
    "veo-3.0-fast-generate-001": dict(
        name="Veo 3 Fast",
        description="Fast Veo 3 variant with audio",
        supports_audio=True,
        supports_first_last_frame=True,
        supports_reference_images=False,
        supports_extension=False,
        resolutions=["720p"],
        durations=[4, 6, 8],
        max_references=0,
    ),
    "veo-2.0-generate-001": dict(
        name="Veo 2",
        description="Legacy model, supports 2 outputs per request",
        supports_audio=False,
        supports_first_last_frame=True,
        supports_reference_images=False,
        supports_extension=False,
        resolutions=["720p"],
        durations=[5, 6, 8],
        max_references=0,
    ),
    # Aliases
    "vo3": dict(
        name="Veo 3.1 (Alias)",
        description="Alias for Veo 3.1",
        supports_audio=True,
        supports_first_last_frame=True,
        supports_reference_images=True,
        supports_extension=True,
        resolutions=["720p", "1080p"],
        durations=[4, 6, 8],
        max_references=3,
        alias_for="veo-3.1-generate-preview",
    ),
}
async def generate(job_id: str):
    """Generate a video for a job using Runway or Google Veo.

    Loads the job, dispatches to the provider-specific generator, writes the
    returned MP4 to ``<settings.storage_path>/videos``, creates an ``Asset``
    row (with a best-effort thumbnail), records model usage, and marks the job
    ``completed``. Any failure is logged and stored on the job as
    ``status="failed"`` / ``error_message``; it is not re-raised. If no job
    with ``job_id`` exists the function returns without doing anything.

    Input parameters (read from ``job.input_data``):
    - provider: 'runway' or 'veo' (default 'runway')
    - prompt: Text description
    - model: Specific model to use
    - duration: Video length in seconds
    - aspect_ratio: e.g. '16:9' or '9:16'
    - resolution: Provider-specific resolution string
    Runway-specific:
    - camera_control: {pan, tilt, zoom, roll} with values -10 to 10
    - motion_brush: [{area_mask, direction, intensity}]
    - frame_position: 'first' or 'last' for input image
      NOTE(review): the Runway path in this file does not read these three
      keys - confirm whether they are still supported.
    Veo-specific:
    - first_frame_asset_id: Asset ID for starting frame
    - last_frame_asset_id: Asset ID for ending frame
    - reference_asset_ids: List of asset IDs for reference (max 3)
    """
    db = SessionLocal()
    # Bound before the try block so the failure handler can tell whether the
    # job was ever loaded (the lookup query itself may raise).
    job = None
    try:
        job = db.query(Job).filter(Job.id == job_id).first()
        if not job:
            return

        input_data = job.input_data or {}
        provider = input_data.get("provider", "runway")
        prompt = input_data.get("prompt", "")

        job.progress = 10
        job.api_provider = provider
        db.commit()

        if provider == "runway":
            video_data, filename = await _generate_runway(job, input_data, db)
        elif provider == "veo":
            video_data, filename = await _generate_veo(job, input_data, db)
        else:
            raise ValueError(f"Unknown video provider: {provider}")

        # A provider that yields nothing must not be reported as a success.
        if not video_data or not filename:
            raise ValueError(f"Video provider '{provider}' returned no video data")

        # Effective request parameters, applying the per-provider defaults.
        used_duration = input_data.get("duration")
        used_resolution = input_data.get("resolution")
        used_aspect = input_data.get("aspect_ratio") or "16:9"
        if provider == "runway":
            used_duration = used_duration or 5
            used_resolution = used_resolution or "1280x768"
        else:
            used_duration = used_duration or 8
            used_resolution = used_resolution or "720p"

        # Save video. The filename may embed a caller-supplied model string,
        # so strip any directory component before joining it to the storage path.
        filename = os.path.basename(filename)
        storage_path = os.path.join(settings.storage_path, "videos")
        os.makedirs(storage_path, exist_ok=True)
        file_path = os.path.join(storage_path, filename)
        with open(file_path, "wb") as f:
            f.write(video_data)

        # Generate thumbnail (best effort: a failure here must not fail the job)
        thumbnail_path = None
        try:
            from app.utils.video import generate_video_thumbnail
            thumb_filename = f"{os.path.splitext(filename)[0]}_thumb.jpg"
            thumb_path = os.path.join(storage_path, thumb_filename)
            if generate_video_thumbnail(file_path, thumb_path, timestamp=1.0):
                thumbnail_path = thumb_path
                logger.info(f"Generated thumbnail for video: {thumb_path}")
        except Exception as e:
            logger.warning(f"Failed to generate thumbnail: {e}")

        # Create asset
        asset = Asset(
            user_id=job.user_id,
            project_id=job.project_id,
            original_filename=filename,
            stored_filename=filename,
            file_path=file_path,
            thumbnail_path=thumbnail_path,
            file_type="video",
            mime_type="video/mp4",
            file_size_bytes=len(video_data),
            # Use the provider-aware default rather than a flat 5 seconds.
            duration_seconds=used_duration,
            source_module="video_generator",
            source_job_id=job.id,
            asset_metadata={
                "prompt": prompt,
                "provider": provider,
                "model": job.api_model,
            },
        )
        db.add(asset)
        db.commit()
        db.refresh(asset)
        job.output_asset_ids = [asset.id]
        job.output_data = {"asset_id": str(asset.id), "file_path": file_path}

        # Log usage (best effort: accounting problems must not fail the job)
        try:
            from app.utils.logging import log_model_usage
            duration_ms = 0
            if job.started_at:
                # NOTE(review): assumes job.started_at is a naive UTC datetime,
                # matching datetime.utcnow() - confirm against the Job model.
                elapsed = datetime.utcnow() - job.started_at
                duration_ms = int(elapsed.total_seconds() * 1000)
            log_model_usage(
                db=db,
                job_id=str(job.id),
                user_id=str(job.user_id),
                module="video_generator",
                action="generate",
                provider=provider,
                model=job.api_model,
                usage_stats={
                    "seconds": used_duration,
                    "processing_time_ms": duration_ms,
                },
                request_metadata={
                    "prompt": prompt,
                    "resolution": used_resolution,
                    "duration": used_duration,
                    "aspect_ratio": used_aspect,
                },
                response_metadata={
                    "output_assets": [str(asset.id)],
                    "filenames": [filename],
                },
            )
        except Exception as log_e:
            logger.error(f"Failed to log video generation usage: {log_e}")

        job.progress = 100
        job.status = "completed"
        job.completed_at = datetime.utcnow()
        db.commit()
    except Exception as e:
        logger.error(f"Video generation failed for job {job_id}: {e}", exc_info=True)
        try:
            # A failed flush/commit leaves the session unusable until it is
            # rolled back, so do that before recording the failure.
            db.rollback()
            if job is not None:
                job.status = "failed"
                job.error_message = str(e)
                db.commit()
        except Exception as status_e:
            logger.error(f"Could not record failure for job {job_id}: {status_e}")
    finally:
        db.close()
async def _generate_runway(job, input_data: dict, db) -> Tuple[Optional[bytes], Optional[str]]:
    """Generate a video through the Runway SDK.

    Runs image-to-video when the job's first input asset exists on disk,
    otherwise text-to-video. Blocking SDK calls are run in worker threads.

    Args:
        job: Job row; ``api_model``, ``api_request_id`` and ``progress`` are
            updated and committed as the task advances.
        input_data: Request parameters. Reads ``prompt``, ``model``,
            ``duration``, ``resolution`` and, when no resolution is given,
            ``aspect_ratio``.
        db: Open database session used for asset lookup and commits.

    Returns:
        ``(video_bytes, filename)`` for the finished video.

    Raises:
        ValueError: If an image-only model has no input image, or the Runway
            task does not succeed.
        Exception: SDK and HTTP errors are logged and re-raised unchanged.
    """
    from runwayml import RunwayML, TaskFailedError

    prompt = input_data.get("prompt", "")
    model = input_data.get("model") or "gen3_alpha_turbo"

    # Unknown model names (including the default above) fall back to "veo3".
    api_model = RUNWAY_MODELS.get(model, {}).get("api_model", "veo3")
    is_gen4 = "gen4" in api_model

    # Veo models on Runway only validate with 8 seconds. Decide from the model
    # actually sent to the API, so a fallback to veo3 also gets 8 seconds.
    if "veo" in api_model.lower():
        duration = 8
    else:
        duration = min(input_data.get("duration") or 5, 10)

    # Ratios accepted by Veo and Gen-4 Turbo, validated via error logs:
    # ['1280:720', '720:1280', '1104:832', '832:1104', '960:960', '1584:672']
    # An explicit resolution decides the orientation; aspect_ratio is only
    # consulted when no resolution was supplied.
    resolution = input_data.get("resolution")
    if resolution:
        is_portrait = any(
            marker in resolution for marker in ("768x1280", "9:16", "720x1280")
        )
    else:
        is_portrait = input_data.get("aspect_ratio") == "9:16"
    if is_portrait:
        ratio, target_dims = "720:1280", (720, 1280)
    else:
        ratio, target_dims = "1280:720", (1280, 720)

    job.api_model = api_model
    db.commit()

    # Get input image
    image_data = None
    mime_type = "image/png"
    if job.input_asset_ids:
        input_asset = db.query(Asset).filter(Asset.id == job.input_asset_ids[0]).first()
        if input_asset and os.path.exists(input_asset.file_path):
            mime_type = input_asset.mime_type or "image/png"
            with open(input_asset.file_path, "rb") as f:
                raw_bytes = f.read()
            # Gen-4 Turbo needs exact dimensions; Veo does not strictly need a resize.
            if is_gen4:
                try:
                    from PIL import Image, ImageOps
                    import io
                    with Image.open(io.BytesIO(raw_bytes)) as img:
                        # Aspect-fill crop to the target size; avoids distortion
                        # by trimming the edges instead of stretching.
                        img_resized = ImageOps.fit(
                            img, target_dims, method=Image.Resampling.LANCZOS
                        )
                        out_io = io.BytesIO()
                        img_resized.save(out_io, format="PNG")
                    raw_bytes = out_io.getvalue()
                    mime_type = "image/png"
                    logger.info(f"Smart-cropped input image to {target_dims} for Gen-4 Turbo")
                except Exception as e:
                    # Best effort: fall back to sending the original bytes.
                    logger.warning(f"Failed to resize/crop image: {e}")
            image_data = base64.b64encode(raw_bytes).decode()

    # Validate model constraints
    if is_gen4 and not image_data:
        raise ValueError("Gen-4 Turbo (Image Only) requires an input image. Please upload a file.")

    # The SDK appends /v1 itself, so the base URL carries no version suffix.
    client = RunwayML(
        api_key=settings.runway_api_key,
        base_url="https://api.dev.runwayml.com"
    )
    try:
        # snake_case keys matching the Python SDK signature
        kwargs = {
            "model": api_model,
            "duration": duration,
            "ratio": ratio,
            "prompt_text": prompt or "A clear high quality video",
        }
        if image_data:
            kwargs["prompt_image"] = f"data:{mime_type};base64,{image_data}"
            logger.info(f"Runway SDK: Starting Image-to-Video with kwargs={list(kwargs.keys())}")
            task = await asyncio.to_thread(client.image_to_video.create, **kwargs)
        else:
            logger.info(f"Runway SDK: Starting Text-to-Video with kwargs={list(kwargs.keys())}")
            task = await asyncio.to_thread(client.text_to_video.create, **kwargs)

        job.api_request_id = task.id
        job.progress = 30
        db.commit()
        logger.info(f"Runway Task Started: {task.id}")

        # Poll with the SDK helper in a worker thread
        final_task = await asyncio.to_thread(
            lambda: client.tasks.retrieve(task.id).wait_for_task_output()
        )
        job.progress = 90
        db.commit()

        if final_task.status != 'SUCCEEDED' or not final_task.output:
            error_msg = getattr(final_task, 'error', 'Unknown error')
            logger.error(f"Runway Task Failed: {error_msg}")
            raise ValueError(f"Runway generation failed: {error_msg}")

        output_url = final_task.output[0]
        logger.info(f"Runway Task Succeeded. URL: {output_url}")
        async with httpx.AsyncClient() as http_client:
            # Video files are large: allow a generous timeout and refuse to
            # store an HTTP error body as if it were an MP4.
            video_response = await http_client.get(
                output_url, follow_redirects=True, timeout=300
            )
            video_response.raise_for_status()
        # Name the file after the resolved model, which comes from the known
        # table, rather than the raw caller-supplied string.
        filename = f"runway_{api_model}_{uuid4()}.mp4"
        return video_response.content, filename
    except TaskFailedError as e:
        logger.error(f"Runway Task Failed Error: {e}")
        raise ValueError(f"Runway task failed: {str(e)}") from e
    except Exception as e:
        logger.error(f"Runway SDK/API Error: {e}", exc_info=True)
        raise
async def _generate_veo(job, input_data: dict, db) -> Tuple[Optional[bytes], Optional[str]]:
    """Generate a video with Google Veo via the REST ``predictLongRunning`` API.

    Starts a long-running operation, polls it every 5 seconds for up to 10
    minutes, then downloads the resulting file.

    Supports:
    - Text to video
    - First frame image (``first_frame_asset_id``, falling back to the job's
      first input asset)
    - Last frame image (``last_frame_asset_id``)
    - Reference images (up to 3, only for models flagged
      ``supports_reference_images``)
    - Negative prompts, aspect ratio, person generation
    - Duration and resolution, validated against the model's allowed values
      and sent only when the caller supplied them

    Video extension (``extend_video_asset_id``) is not implemented; a warning
    is logged and the key is ignored.

    Audio prompting:
    - Use quotation marks for dialogue: "She said, 'Hello'"
    - Describe sound effects: "tires screeching loudly"
    - Add ambient sounds: "quiet forest with birds chirping"

    Args:
        job: Job row; ``api_model`` and ``progress`` are updated and committed.
        input_data: Request parameters as described above.
        db: Open database session used for asset lookup and commits.

    Returns:
        ``(video_bytes, filename)`` for the finished video.

    Raises:
        ValueError: For an invalid model name, or any API, polling, or
            download failure (wrapped as "Veo generation error: ...").
    """
    prompt = input_data.get("prompt", "")
    model = input_data.get("model") or "veo-3.1-generate-preview"

    # Handle aliases
    alias_target = VEO_MODELS.get(model, {}).get("alias_for")
    if alias_target:
        model = alias_target

    # The model name is interpolated into the request URL path, so reject
    # anything that is not a plain identifier.
    if not isinstance(model, str) or not all(ch.isalnum() or ch in "._-" for ch in model):
        raise ValueError(f"Invalid Veo model name: {model!r}")

    # Unknown models are validated against the Veo 3.1 capabilities.
    model_config = VEO_MODELS.get(model, VEO_MODELS["veo-3.1-generate-preview"])

    # Validate duration and resolution against the model's allowed values
    duration = input_data.get("duration", 8)
    valid_durations = model_config.get("durations", [4, 6, 8])
    if duration not in valid_durations:
        duration = max(valid_durations)
    resolution = input_data.get("resolution", "720p")
    valid_resolutions = model_config.get("resolutions", ["720p"])
    if resolution not in valid_resolutions:
        resolution = valid_resolutions[0]

    aspect_ratio = input_data.get("aspect_ratio") or "16:9"
    negative_prompt = input_data.get("negative_prompt") or ""
    person_generation = input_data.get("person_generation")  # "allow_adult" or None

    # Frame control; fall back to the standard input asset (image-to-video mode)
    first_frame_asset_id = input_data.get("first_frame_asset_id")
    if not first_frame_asset_id and job.input_asset_ids:
        first_frame_asset_id = job.input_asset_ids[0]
    last_frame_asset_id = input_data.get("last_frame_asset_id")

    # Reference images: max 3, and only for models that support them
    reference_asset_ids = list(input_data.get("reference_asset_ids") or [])[:3]
    if reference_asset_ids and not model_config.get("supports_reference_images"):
        logger.warning(f"Veo model {model} does not support reference images; ignoring them")
        reference_asset_ids = []

    if input_data.get("extend_video_asset_id"):
        logger.warning("Veo video extension is not implemented; ignoring extend_video_asset_id")

    job.api_model = model
    db.commit()

    try:
        job.progress = 20
        db.commit()

        # Build instance
        instance = {"prompt": prompt}
        if first_frame_asset_id:
            b64, mime = _veo_read_image_b64(db, first_frame_asset_id)
            if b64:
                instance["image"] = {"bytesBase64Encoded": b64, "mimeType": mime}

        # Build parameters
        params = {"sampleCount": 1, "aspectRatio": aspect_ratio}
        if negative_prompt:
            params["negativePrompt"] = negative_prompt
        if person_generation:
            params["personGeneration"] = person_generation
        # Send duration/resolution only when the caller asked for them, so
        # requests that relied on the API defaults are unchanged.
        if input_data.get("duration") is not None:
            params["durationSeconds"] = duration
        if input_data.get("resolution") and ("3.1" in model or "3.0" in model):
            params["resolution"] = resolution

        if last_frame_asset_id:
            b64, mime = _veo_read_image_b64(db, last_frame_asset_id)
            if b64:
                params["lastFrame"] = {
                    "image": {"bytesBase64Encoded": b64, "mimeType": mime}
                }

        ref_imgs = []
        for ref_id in reference_asset_ids:
            b64, mime = _veo_read_image_b64(db, ref_id)
            if b64:
                ref_imgs.append({
                    "image": {"bytesBase64Encoded": b64, "mimeType": mime},
                    "referenceType": "asset",
                })
        if ref_imgs:
            params["referenceImages"] = ref_imgs

        # Log parameter names only: the values include base64 image payloads.
        logger.info(
            f"Veo Generation Request: Model={model} Prompt='{prompt}' "
            f"Params={sorted(params)} HasFirstFrame={'image' in instance}"
        )
        job.progress = 40
        db.commit()

        api_base = "https://generativelanguage.googleapis.com/v1beta"
        # Send the key as a header so it never appears in URLs, which can end
        # up in logs and exception messages.
        auth_headers = {"x-goog-api-key": settings.google_api_key}
        payload = {"instances": [instance], "parameters": params}

        logger.info(f"Veo Generation sending raw predictLongRunning request to {model}")
        async with httpx.AsyncClient() as http:
            # Start operation
            resp = await http.post(
                f"{api_base}/models/{model}:predictLongRunning",
                json=payload,
                headers=auth_headers,
                timeout=120,
            )
            if resp.status_code != 200:
                logger.error(f"Veo API Error {resp.status_code}: {resp.text}")
                raise ValueError(f"Veo API Error {resp.status_code}: {resp.text}")
            op_data = resp.json()
            if "name" not in op_data:
                raise ValueError(f"No operation name returned: {op_data}")
            op_name = op_data["name"]
            logger.info(f"Veo Operation Started: {op_name}")

            job.progress = 50
            db.commit()

            # Polling loop
            poll_url = f"{api_base}/{op_name}"
            max_attempts = 120  # 10 minutes (5s interval)
            final_op = None
            for attempt in range(max_attempts):
                await asyncio.sleep(5)
                try:
                    poll_resp = await http.get(poll_url, headers=auth_headers, timeout=30)
                except httpx.HTTPError as poll_error:
                    # A transient network error should not abort the whole job.
                    logger.warning(f"Polling request failed: {poll_error}")
                    continue
                if poll_resp.status_code != 200:
                    logger.warning(f"Polling failed: {poll_resp.status_code} {poll_resp.text}")
                    continue
                op_status = poll_resp.json()
                if attempt % 5 == 0:
                    logger.info(f"Veo Operation Status: done={op_status.get('done', False)}")
                if op_status.get("done", False):
                    final_op = op_status
                    break
                # Update progress (climbs from 50 and is capped at 95)
                job.progress = int(min(50 + (attempt * 0.4), 95))
                db.commit()

            if not final_op:
                raise ValueError("Veo generation timed out")
            if "error" in final_op:
                raise ValueError(f"Veo Operation Failed: {final_op['error']}")

            # The finished operation may carry its payload under 'response'
            # or 'result'; search it for any file URI.
            response_body = final_op.get("response", {}) or final_op.get("result", {})
            video_uri = _veo_find_video_uri(response_body)
            if not video_uri:
                gvr_dump = "Missing"
                if isinstance(response_body, dict) and "generateVideoResponse" in response_body:
                    gvr_dump = str(response_body["generateVideoResponse"])
                response_keys = list(response_body.keys()) if isinstance(response_body, dict) else []
                logger.error(f"Could not find video URI. Final Op: {final_op}")
                raise ValueError(
                    f"No video URI found. GVR was: {gvr_dump}. Full Response keys: {response_keys}"
                )

            logger.info(f"Veo generated URI: {video_uri}. Downloading via HTTPX...")
            # Only Generative Language file URLs need (and should get) the API key.
            download_headers = (
                auth_headers if "generativelanguage.googleapis.com" in video_uri else {}
            )
            # Increased timeout for large video downloads
            resp = await http.get(
                video_uri, headers=download_headers, follow_redirects=True, timeout=300
            )
            if resp.status_code != 200:
                logger.error(f"Failed to download video: {resp.status_code} {resp.text}")
                raise ValueError(
                    f"Failed to download video from {video_uri}: Status {resp.status_code}"
                )
            video_data = resp.content

        if not video_data:
            raise ValueError(f"Downloaded video from {video_uri} is empty")

        # Timestamps have one-second granularity, so add a random suffix to
        # keep concurrent jobs from overwriting each other's files.
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"veo_{model}_{timestamp}_{uuid4().hex[:8]}.mp4"
        return video_data, filename
    except Exception as e:
        logger.error(f"Veo Unexpected Error: {e}", exc_info=True)
        raise ValueError(f"Veo generation error: {str(e)}") from e


def _veo_read_image_b64(db, asset_id) -> Tuple[Optional[str], Optional[str]]:
    """Return ``(base64_data, mime_type)`` for an image asset, or ``(None, None)``.

    The MIME type is derived from the file extension (.jpg/.jpeg and .webp
    are recognised; anything else is treated as PNG).
    """
    asset = db.query(Asset).filter(Asset.id == asset_id).first()
    if not asset or not os.path.exists(asset.file_path):
        return None, None
    path_lower = asset.file_path.lower()
    if path_lower.endswith((".jpg", ".jpeg")):
        mime = "image/jpeg"
    elif path_lower.endswith(".webp"):
        mime = "image/webp"
    else:
        mime = "image/png"
    with open(asset.file_path, "rb") as f:
        return base64.b64encode(f.read()).decode("utf-8"), mime


def _veo_find_video_uri(data) -> Optional[str]:
    """Depth-first search of a JSON-like structure for the first video file URI."""
    if isinstance(data, str):
        # Likely pattern: https://generativelanguage.googleapis.com/v1beta/files/NAME
        if "/files/" in data and "googleapis.com" in data:
            return data
        return None
    if isinstance(data, dict):
        for key, value in data.items():
            if key in ("uri", "fileUri", "videoUri") and isinstance(value, str):
                return value
            found = _veo_find_video_uri(value)
            if found:
                return found
        return None
    if isinstance(data, list):
        for item in data:
            found = _veo_find_video_uri(item)
            if found:
                return found
    return None
async def extend_video(job_id: str):
    """Extend an existing video using Veo scene extension (placeholder).

    Validates that the job names an existing source asset and records the
    provider/model on the job. The extension call itself is not implemented:
    no new video is produced and the job is marked ``completed`` without any
    output. Failures are logged and stored on the job as ``status="failed"``;
    nothing is re-raised. If no job with ``job_id`` exists the function
    returns without doing anything.

    Input parameters (read from ``job.input_data``):
    - source_asset_id: Asset ID of the video to extend (required)
    - prompt: Text description for the continuation
    - extension_seconds: Requested extra length, capped at 8 (default 4)
    """
    db = SessionLocal()
    # Bound before the try block so the failure handler can tell whether the
    # job was ever loaded (the lookup query itself may raise).
    job = None
    try:
        job = db.query(Job).filter(Job.id == job_id).first()
        if not job:
            return

        input_data = job.input_data or {}
        source_asset_id = input_data.get("source_asset_id")
        prompt = input_data.get("prompt", "")
        # `or 4` also covers an explicit null, which would break min().
        extension_seconds = min(input_data.get("extension_seconds") or 4, 8)

        if not source_asset_id:
            raise ValueError("No source video provided for extension")
        source_asset = db.query(Asset).filter(Asset.id == source_asset_id).first()
        if not source_asset:
            raise ValueError("Source video not found")

        job.progress = 10
        job.api_provider = "veo"
        job.api_model = "veo-3.1-generate-preview"
        db.commit()

        # TODO: call Veo's scene extension API, which extends a video by
        # building on the final seconds of the previous clip.
        # NOTE(review): until then the job completes without output - confirm
        # whether callers would rather see it fail.
        logger.warning(
            f"Veo video extension is not implemented; job {job_id} "
            f"(source={source_asset_id}, seconds={extension_seconds}, "
            f"prompt='{prompt}') completes without output"
        )

        job.progress = 100
        job.status = "completed"
        job.completed_at = datetime.utcnow()
        db.commit()
    except Exception as e:
        logger.error(f"Video extension failed for job {job_id}: {e}", exc_info=True)
        try:
            # A failed flush/commit leaves the session unusable until it is
            # rolled back, so do that before recording the failure.
            db.rollback()
            if job is not None:
                job.status = "failed"
                job.error_message = str(e)
                db.commit()
        except Exception as status_e:
            logger.error(f"Could not record failure for job {job_id}: {status_e}")
    finally:
        db.close()
def get_available_models() -> Dict[str, Any]:
    """Return the Runway and Veo model catalogues keyed by provider name.

    The values are the module-level ``RUNWAY_MODELS`` and ``VEO_MODELS``
    dictionaries themselves, not copies.
    """
    catalogue: Dict[str, Any] = dict(runway=RUNWAY_MODELS, veo=VEO_MODELS)
    return catalogue