veo3/backend/video_generator.py
2025-11-04 02:31:40 +05:30

1190 lines
No EOL
46 KiB
Python

import os
import time
import uuid
import requests
import json
import datetime
import logging
import zipfile
import random
import threading
from google import genai
from google.genai import types
from google.auth.exceptions import DefaultCredentialsError
from utils.auth import get_google_credentials
from utils.storage import download_blob, delete_blob, validate_and_convert_image, upload_image_to_gcs, cleanup_image_files
from config import Config
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Job queue and processing configuration
# Maximum number of job IDs start_next_job() lets into processing_jobs at once.
CONCURRENT_JOB_LIMIT = 2
# MAX_QUEUE_SIZE_PER_USER removed - unlimited queue
# Number of automatic retries process_video_generation() makes for errors it
# classifies as transient (HTTP 500/503, timeouts, connection errors).
MAX_RETRIES = 3
# In-memory storage (in production, use Redis or database)
# NOTE(review): these structures are read and mutated by callers of the queue
# helpers and by the daemon worker threads started in start_next_job()
# (via complete_job()), with no lock guarding them.
# job_id -> job dict holding parameters, progress and results.
job_status = {}
job_queue = [] # Global queue: [job_id1, job_id2, ...]
processing_jobs = [] # Currently active jobs (max CONCURRENT_JOB_LIMIT)
user_job_counts = {} # Track jobs per user: {user_email: count}
def send_usage_webhook(user_email: str, prompt: str) -> None:
    """
    Send usage data to the tracking webhook.

    This is best-effort: any error is logged and swallowed, so a webhook
    outage never blocks video generation. Any 2xx response counts as success
    (e.g. 201 Created or 204 No Content), not only 200.

    Args:
        user_email: Email of the user who processed the request.
        prompt: The prompt used for processing.
    """
    if not Config.WEBHOOK_ENABLED:
        logger.info("Webhook disabled, skipping usage tracking")
        return
    try:
        current_datetime = datetime.datetime.now().isoformat()
        webhook_data = {
            "tool": "VEO3",
            "date": current_datetime,
            "user": user_email,
            "model": "VEO3",
            "settings": "no settings",
            "subTool": "no subTool",
            "prompt": prompt,
            "negativePrompt": "no NEGATIVE_PROMPT",
            "image": "no image"
        }
        logger.info(f"Sending usage data to webhook for user: {user_email}")
        response = requests.post(
            Config.WEBHOOK_URL,
            headers={"Content-Type": "application/json"},
            data=json.dumps(webhook_data),
            timeout=Config.WEBHOOK_TIMEOUT
        )
        # Any 2xx status means the webhook accepted the payload.
        if 200 <= response.status_code < 300:
            logger.info("Successfully sent usage data to webhook")
        else:
            logger.warning(f"Webhook request failed with status code: {response.status_code}")
            logger.warning(f"Response: {response.text}")
    except Exception as e:
        logger.error(f"Error sending usage data to webhook: {str(e)}")
        # Don't raise the exception - webhook failure shouldn't block the main flow
def get_user_queue_count(user_email: str) -> int:
    """Return how many jobs are currently counted against *user_email* (0 when none)."""
    try:
        return user_job_counts[user_email]
    except KeyError:
        return 0
def can_add_to_queue(user_email: str) -> bool:
    """
    Report whether *user_email* may enqueue another job.

    The per-user queue cap has been removed, so every user is always allowed.
    """
    unlimited_queue = True
    return unlimited_queue
def add_to_queue(job_id: str, user_email: str) -> None:
    """Append *job_id* to the tail of the global queue and increment *user_email*'s job count."""
    # Both globals are mutated in place (never rebound), so no `global` statement is needed.
    job_queue.append(job_id)
    previous_count = user_job_counts.get(user_email, 0)
    user_job_counts[user_email] = previous_count + 1
    position = len(job_queue)
    logger.info(f"Added job {job_id} to queue for user {user_email}. Queue position: {position}")
def remove_from_queue(job_id: str, user_email: str) -> None:
    """
    Drop *job_id* from the pending queue (if present) and decrement the user's job count.

    The count is decremented even when the job is no longer in the queue
    (for example, because it already moved to processing). It never drops
    below zero, and the user's entry is removed entirely once it hits zero.
    """
    if job_id in job_queue:
        job_queue.remove(job_id)
    remaining = max(0, user_job_counts.get(user_email, 0) - 1)
    if remaining == 0:
        user_job_counts.pop(user_email, None)
    else:
        user_job_counts[user_email] = remaining
def get_queue_position(job_id: str) -> int:
    """Return the 1-based position of *job_id* in the pending queue, or -1 if it is not queued."""
    try:
        zero_based = job_queue.index(job_id)
    except ValueError:
        return -1
    return zero_based + 1
def start_next_job() -> None:
    """
    Start queued jobs while there is spare processing capacity.

    Job IDs are popped from the head of ``job_queue`` (FIFO) until either
    ``CONCURRENT_JOB_LIMIT`` jobs are in ``processing_jobs`` or the queue is
    empty. Each started job is marked ``processing`` and handed to
    ``process_video_generation`` on a daemon thread.

    A queued ID whose job is no longer in ``job_status`` is discarded instead
    of started. ``process_video_generation`` returns immediately for such IDs
    without calling ``complete_job``, so starting one would leave a dead
    entry in ``processing_jobs`` holding a slot forever.
    """
    while len(processing_jobs) < CONCURRENT_JOB_LIMIT and job_queue:
        next_job_id = job_queue.pop(0)
        job = job_status.get(next_job_id)
        if job is None:
            logger.warning(f"Skipping queued job {next_job_id}: not found in job status")
            continue
        processing_jobs.append(next_job_id)
        # Update job status
        job.update({
            'status': 'processing',
            'queue_position': 0,
            'started_at': datetime.datetime.now().isoformat()
        })
        # Start processing in background thread
        threading.Thread(
            target=process_video_generation,
            args=(next_job_id,),
            daemon=True
        ).start()
        logger.info(f"Started processing job {next_job_id}")
def complete_job(job_id: str) -> None:
    """
    Release *job_id*'s processing slot (if it holds one), then try to start the next queued job.
    """
    try:
        processing_jobs.remove(job_id)
    except ValueError:
        # The job held no slot (e.g. it was already cancelled or deleted); nothing to release.
        pass
    start_next_job()
    logger.info(f"Completed job {job_id}. Processing jobs: {len(processing_jobs)}")
def cancel_job(job_id: str) -> bool:
    """
    Cancel a queued job, or a processing job that is still flagged as cancellable.

    Queued jobs are removed from the queue, which releases the user's job
    count, and are marked ``cancelled``.

    A job in ``processing_jobs`` is only cancelled while its ``can_cancel``
    flag is truthy. ``process_video_generation`` clears that flag as soon as
    it starts ("Cannot cancel once processing starts"), and nothing here stops
    its worker thread. Releasing the slot after that point would let another
    job start alongside a worker that keeps running and later overwrites the
    ``cancelled`` status.

    Args:
        job_id: ID of the job to cancel.

    Returns:
        True if the job was cancelled. False if it does not exist, is neither
        queued nor processing, or can no longer be cancelled.
    """
    job = job_status.get(job_id)
    if not job:
        return False
    user_email = job.get('user_email', 'anonymous')
    # Cancel queued jobs
    if job['status'] == 'queued':
        remove_from_queue(job_id, user_email)
        job.update({
            'status': 'cancelled',
            'message': 'Job cancelled by user',
            'progress': 0,
            'can_cancel': False,
            'cancelled_at': datetime.datetime.now().isoformat()
        })
        logger.info(f"Queued job {job_id} cancelled by user")
        return True
    # Cancel processing jobs (only while the worker still allows it)
    if job_id in processing_jobs:
        if not job.get('can_cancel', True):
            logger.info(f"Processing job {job_id} can no longer be cancelled")
            return False
        processing_jobs.remove(job_id)
        job.update({
            'status': 'cancelled',
            'message': 'Processing cancelled by user',
            'progress': 0,
            'can_cancel': False,
            'cancelled_at': datetime.datetime.now().isoformat()
        })
        # NOTE(review): the worker thread is not signalled to stop; it still
        # calls remove_from_queue() itself when it finishes.
        remove_from_queue(job_id, user_email)
        # Start next job if available
        start_next_job()
        logger.info(f"Processing job {job_id} cancelled by user")
        return True
    return False
def retry_job(job_id: str) -> bool:
    """
    Re-queue a failed or cancelled job.

    The automatic retry counter and the ``final_failure`` marker left by the
    previous run are reset. Otherwise a job that exhausted its automatic
    retries would get none on the user-requested attempt, and it would keep
    reporting a final failure while queued again.

    Args:
        job_id: ID of the job to retry.

    Returns:
        True if the job was re-queued. False if it does not exist or is not
        in a ``failed``/``cancelled`` state.
    """
    job = job_status.get(job_id)
    if not job:
        return False
    # Only retry failed or cancelled jobs
    if job['status'] not in ['failed', 'cancelled']:
        return False
    user_email = job.get('user_email', 'anonymous')
    # Reset job status and re-queue
    job.update({
        'status': 'queued',
        'message': 'Job re-queued for processing...',
        'progress': 0,
        'can_cancel': True,
        'error': None,
        'cancelled_at': None,
        'retry_count': 0
    })
    job.pop('final_failure', None)
    # Add back to queue
    add_to_queue(job_id, user_email)
    job['queue_position'] = get_queue_position(job_id)
    # Try to start immediately if there's capacity
    start_next_job()
    logger.info(f"Job {job_id} retried by user")
    return True
def delete_job_completely(job_id: str) -> bool:
    """
    Completely delete a job: queue bookkeeping, local files and GCS objects.

    GCS deletion is best-effort per object. A failure on one blob is logged
    and does not stop the remaining blobs from being attempted.

    Args:
        job_id: ID of the job to delete.

    Returns:
        True if the job existed and was deleted, False if it was not found.
    """
    import shutil
    job = job_status.get(job_id)
    if not job:
        return False
    user_email = job.get('user_email', 'anonymous')
    # Only queued/processing jobs still count against user_job_counts. Finished
    # jobs were already decremented (by process_video_generation or cancel_job),
    # and decrementing again would under-count the user's other active jobs.
    was_active = job_id in job_queue or job_id in processing_jobs
    # Remove from all tracking
    if job_id in job_queue:
        job_queue.remove(job_id)
    if job_id in processing_jobs:
        processing_jobs.remove(job_id)
        # Start next job if we freed up a processing slot
        start_next_job()
    # Clean up local files
    try:
        job_folder = os.path.join(Config.TEMP_DOWNLOAD_PATH, f"job_{job_id}")
        if os.path.exists(job_folder):
            shutil.rmtree(job_folder)
            logger.info(f"Deleted local folder: {job_folder}")
    except Exception as e:
        logger.error(f"Error deleting local folder for job {job_id}: {e}")
    # Collect GCS objects to delete as (bucket, blob) pairs.
    blobs_to_delete = []
    # Uploaded input images live in the output bucket.
    output_bucket = Config.OUTPUT_GCS_BUCKET_NAME
    for key in ('image_blob_name', 'last_frame_blob_name'):
        if job.get(key):
            blobs_to_delete.append((output_bucket, job[key]))
    for blob_name in job.get('reference_image_blob_names') or []:
        if blob_name:
            blobs_to_delete.append((output_bucket, blob_name))
    # Generated videos: use the recorded per-video URIs. 'gcs_uri' normally holds
    # the output *prefix* (ending in '/'), which is not itself a blob, so it is
    # only used when it names a concrete object.
    video_uris = list(job.get('gcs_video_uris') or [])
    gcs_uri = job.get('gcs_uri')
    if gcs_uri and not gcs_uri.endswith('/'):
        video_uris.append(gcs_uri)
    for uri in video_uris:
        if not uri or not uri.startswith('gs://'):
            continue
        bucket_name, _, blob_name = uri[len('gs://'):].partition('/')
        if bucket_name and blob_name:
            blobs_to_delete.append((bucket_name, blob_name))
    for bucket_name, blob_name in blobs_to_delete:
        try:
            delete_blob(bucket_name, blob_name)
        except Exception as e:
            logger.error(f"Error deleting GCS blob gs://{bucket_name}/{blob_name} for job {job_id}: {e}")
    # Update user job count (only if the job was still counted as active)
    if was_active:
        remove_from_queue(job_id, user_email)
    # Remove from job status completely
    job_status.pop(job_id, None)
    logger.info(f"Job {job_id} completely deleted")
    return True
def generate_video_async(
    prompt: str,
    job_id: str = None,
    model_name: str = None,
    video_length_sec: int = 8,
    aspect_ratio: str = "16:9",
    sample_count: int = 1,
    person_generation: str = "dont_allow",
    image_path: str = None,
    last_frame_path: str = None,
    reference_image_paths: list = None,
    user_email: str = "anonymous",
    seed: int = None,
    generate_audio: bool = True
) -> str:
    """
    Queue a video generation job and return its job ID.

    The parameters are only recorded on a new ``job_status`` entry here.
    ``process_video_generation`` validates and uses them later, when the job
    is started from the queue.

    Args:
        prompt: Text prompt for the video.
        job_id: ID to use for the job; a random UUID4 string is generated when None.
        model_name: Model ID; when None, processing falls back to Config.MODEL_ID.
        video_length_sec: Requested video duration in seconds.
        aspect_ratio: Requested aspect ratio (processing accepts "16:9" or "9:16").
        sample_count: Number of videos to generate (stored as ``videos_requested``).
        person_generation: Value passed through as the ``person_generation`` setting.
        image_path: Optional local path to a first-frame image.
        last_frame_path: Optional local path to a last-frame image.
        reference_image_paths: Optional list of local reference image paths.
        user_email: Owner of the job, used for per-user counts and listings.
        seed: Optional seed; processing picks a random one when None.
        generate_audio: Passed through as the ``generate_audio`` setting.

    Returns:
        The job ID.

    Raises:
        Exception: If can_add_to_queue() rejects the user.
    """
    # Check if user can add more jobs
    if not can_add_to_queue(user_email):
        # The per-user cap constant (MAX_QUEUE_SIZE_PER_USER) was removed from
        # this module, so the message must not reference it (that would raise
        # NameError instead of this error).
        raise Exception("Queue limit exceeded for this user.")
    if job_id is None:
        job_id = str(uuid.uuid4())
    # Initialize job status with queue info
    job_status[job_id] = {
        'user_email': user_email,
        'prompt': prompt,
        'status': 'queued',
        'progress': 0,
        'message': 'Job queued for processing...',
        'queue_position': 0, # Will be updated when added to queue
        'retry_count': 0,
        'max_retries': MAX_RETRIES,
        'videos_requested': sample_count,
        'videos_completed': [],
        'individual_video_paths': [],
        'can_cancel': True,
        'created_at': datetime.datetime.now().isoformat(),
        'started_at': None,
        'video_path': None,
        'gcs_uri': None,
        'error': None,
        'image_gcs_uri': None,
        'image_blob_name': None,
        'local_image_path': image_path,
        'last_frame_gcs_uri': None,
        'last_frame_blob_name': None,
        'local_last_frame_path': last_frame_path,
        'reference_image_gcs_uris': None,
        'reference_image_blob_names': None,
        'local_reference_image_paths': reference_image_paths,
        'model_name': model_name,
        'video_length_sec': video_length_sec,
        'aspect_ratio': aspect_ratio,
        'person_generation': person_generation,
        'seed': seed,
        'generate_audio': generate_audio
    }
    # Add to queue
    add_to_queue(job_id, user_email)
    job_status[job_id]['queue_position'] = get_queue_position(job_id)
    # Try to start immediately if there's capacity
    start_next_job()
    return job_id
def process_video_generation(job_id: str) -> None:
    """
    Process video generation for a specific job (called from queue processor).

    Runs on the daemon thread that start_next_job() starts for ``job_id``.
    All inputs are read from ``job_status[job_id]`` (populated by
    generate_video_async). Progress and results are written back into that
    same dict. The status moves through ``starting``, then
    ``uploading_image`` (only when images are supplied), ``generating``,
    ``processing`` and ``downloading`` to ``completed``. Otherwise it ends
    as ``failed`` or ``retry_<n>_of_<MAX_RETRIES>``.

    Every normal exit path after the initial job lookup calls complete_job()
    and remove_from_queue(), releasing the processing slot and the user's
    job count. The retry path is the exception: it delegates that to the
    recursive attempt it starts.

    Image-upload errors are handled by their own try blocks and fail the job
    immediately. Exceptions inside the main generation ``try`` are classified
    by their text. Ones that look transient (500/503, timeout, connection)
    are retried up to MAX_RETRIES times, with a 30/60/120 second back-off,
    by calling this function recursively. Anything else is a final failure.
    """
    job = job_status.get(job_id)
    if not job:
        # NOTE(review): this early return does not call complete_job(), so if
        # job_id is still listed in processing_jobs it keeps its slot.
        logger.error(f"Job {job_id} not found in status")
        return
    # Extract parameters from job
    prompt = job['prompt']
    user_email = job['user_email']
    model_name = job['model_name']
    video_length_sec = job['video_length_sec']
    aspect_ratio = job['aspect_ratio']
    sample_count = job['videos_requested']
    person_generation = job['person_generation']
    image_path = job['local_image_path']
    last_frame_path = job['local_last_frame_path']
    reference_image_paths = job['local_reference_image_paths']
    seed = job['seed']
    generate_audio = job['generate_audio']
    # Validate Veo 3.1 feature constraints
    # NOTE(review): these checks use the requested aspect_ratio. The first-frame
    # branch below may still replace it with the ratio detected from the image,
    # after the 16:9 reference-image check has already passed.
    if last_frame_path or (reference_image_paths and len(reference_image_paths) > 0):
        if video_length_sec != 8:
            error_message = "Veo 3.1 features (last frame interpolation and reference images) require 8-second duration"
            job_status[job_id].update({
                'status': 'failed',
                'progress': 0,
                'message': error_message,
                'error': error_message
            })
            complete_job(job_id)
            remove_from_queue(job_id, user_email)
            return
        if reference_image_paths and len(reference_image_paths) > 0 and aspect_ratio != "16:9":
            error_message = "Reference images feature requires 16:9 aspect ratio"
            job_status[job_id].update({
                'status': 'failed',
                'progress': 0,
                'message': error_message,
                'error': error_message
            })
            complete_job(job_id)
            remove_from_queue(job_id, user_email)
            return
    # Update status to starting
    job_status[job_id].update({
        'status': 'starting',
        'progress': 0,
        'message': 'Initializing video generation...',
        'can_cancel': False # Cannot cancel once processing starts
    })
    # Validate and upload image if provided
    # GCS locations of uploaded inputs; the blob names are also used for cleanup.
    image_gcs_uri = None
    image_blob_name = None
    last_frame_gcs_uri = None
    last_frame_blob_name = None
    reference_image_gcs_uris = []
    reference_image_blob_names = []
    print(f"DEBUG: generate_video_async received image_path: {image_path}")
    print(f"DEBUG: generate_video_async received last_frame_path: {last_frame_path}")
    print(f"DEBUG: generate_video_async received reference_image_paths: {reference_image_paths}")
    if image_path:
        try:
            # Validate and convert image to JPEG
            validation_result = validate_and_convert_image(image_path)
            if not validation_result['valid']:
                job_status[job_id].update({
                    'status': 'failed',
                    'progress': 0,
                    'message': f'First frame validation failed: {validation_result["error"]}',
                    'error': validation_result['error']
                })
                complete_job(job_id)
                remove_from_queue(job_id, user_email)
                return
            # Auto-select aspect ratio based on the converted image
            # (overrides the requested ratio when the detected one is supported)
            detected_aspect_ratio = validation_result['aspect_ratio']
            if detected_aspect_ratio in ['16:9', '9:16']:
                aspect_ratio = detected_aspect_ratio
                print(f"Using detected aspect ratio: {aspect_ratio}")
            if validation_result.get('was_cropped'):
                print(f"Image was automatically cropped from {validation_result['original_size']} to {validation_result['size']} to fit {aspect_ratio} aspect ratio")
            # Upload image to GCS
            job_status[job_id].update({
                'status': 'uploading_image',
                'progress': 5,
                'message': 'Uploading first frame image...'
            })
            image_gcs_uri = upload_image_to_gcs(image_path, job_id)
            # Blob name = URI with the "gs://<output bucket>/" prefix stripped.
            image_blob_name = image_gcs_uri.replace(f"gs://{Config.OUTPUT_GCS_BUCKET_NAME}/", "")
            job_status[job_id].update({
                'image_gcs_uri': image_gcs_uri,
                'image_blob_name': image_blob_name,
                'local_image_path': image_path,
                'detected_aspect_ratio': detected_aspect_ratio,
                'image_was_cropped': validation_result.get('was_cropped', False)
            })
        except Exception as e:
            error_message = f"Failed to process first frame image: {str(e)}"
            job_status[job_id].update({
                'status': 'failed',
                'progress': 0,
                'message': error_message,
                'error': error_message
            })
            complete_job(job_id)
            remove_from_queue(job_id, user_email)
            return
    # Upload last frame if provided (Veo 3.1 feature)
    if last_frame_path:
        try:
            job_status[job_id].update({
                'status': 'uploading_image',
                'progress': 7,
                'message': 'Uploading last frame image...'
            })
            last_frame_gcs_uri = upload_image_to_gcs(last_frame_path, f"{job_id}_last")
            last_frame_blob_name = last_frame_gcs_uri.replace(f"gs://{Config.OUTPUT_GCS_BUCKET_NAME}/", "")
            job_status[job_id].update({
                'last_frame_gcs_uri': last_frame_gcs_uri,
                'last_frame_blob_name': last_frame_blob_name,
                'local_last_frame_path': last_frame_path
            })
            print(f"Uploaded last frame: {last_frame_gcs_uri}")
        except Exception as e:
            error_message = f"Failed to process last frame image: {str(e)}"
            job_status[job_id].update({
                'status': 'failed',
                'progress': 0,
                'message': error_message,
                'error': error_message
            })
            complete_job(job_id)
            remove_from_queue(job_id, user_email)
            return
    # Upload reference images if provided (Veo 3.1 feature)
    if reference_image_paths and len(reference_image_paths) > 0:
        try:
            job_status[job_id].update({
                'status': 'uploading_image',
                'progress': 8,
                'message': f'Uploading {len(reference_image_paths)} reference image(s)...'
            })
            from utils.storage import upload_reference_images_to_gcs
            reference_image_gcs_uris = upload_reference_images_to_gcs(reference_image_paths, job_id)
            reference_image_blob_names = [uri.replace(f"gs://{Config.OUTPUT_GCS_BUCKET_NAME}/", "") for uri in reference_image_gcs_uris]
            job_status[job_id].update({
                'reference_image_gcs_uris': reference_image_gcs_uris,
                'reference_image_blob_names': reference_image_blob_names,
                'local_reference_image_paths': reference_image_paths
            })
            print(f"Uploaded {len(reference_image_gcs_uris)} reference images")
        except Exception as e:
            error_message = f"Failed to process reference images: {str(e)}"
            job_status[job_id].update({
                'status': 'failed',
                'progress': 0,
                'message': error_message,
                'error': error_message
            })
            complete_job(job_id)
            remove_from_queue(job_id, user_email)
            return
    # Main generation phase. Exceptions raised here go to the retry/final-failure
    # handler below.
    # NOTE(review): the failure returns inside this try skip the uploaded-image
    # cleanup that the success path and the final-failure handler perform.
    try:
        # Validate model selection
        selected_model = model_name or Config.MODEL_ID
        if selected_model not in Config.SUPPORTED_MODELS:
            error_message = f"Unsupported model '{selected_model}'. Supported models: {', '.join(Config.SUPPORTED_MODELS.keys())}"
            job_status[job_id].update({
                'status': 'failed',
                'progress': 0,
                'message': error_message,
                'error': error_message
            })
            logger.warning(f"Model validation failed for job {job_id}: {selected_model}")
            complete_job(job_id)
            remove_from_queue(job_id, user_email)
            return
        # Validate aspect ratio - Support both 16:9 and 9:16
        if aspect_ratio not in ["16:9", "9:16"]:
            error_message = f"Unsupported aspect ratio '{aspect_ratio}'. Supported ratios: 16:9 (landscape) and 9:16 (portrait)."
            job_status[job_id].update({
                'status': 'failed',
                'progress': 0,
                'message': error_message,
                'error': error_message
            })
            logger.warning(f"Aspect ratio validation failed for job {job_id}: {aspect_ratio}")
            complete_job(job_id)
            remove_from_queue(job_id, user_email)
            return
        # Set up authentication
        credentials = get_google_credentials()
        # Configure the GenAI client for Vertex AI
        # (explicit credentials only when get_google_credentials() returned some)
        if credentials:
            client = genai.Client(
                vertexai=True,
                project=Config.PROJECT_ID,
                location=Config.REGION,
                credentials=credentials
            )
        else:
            client = genai.Client(
                vertexai=True,
                project=Config.PROJECT_ID,
                location=Config.REGION
            )
        # Define a unique prefix for this generation job in GCS
        # (first 30 prompt chars with spaces/slashes replaced, plus a Unix timestamp)
        timestamp = int(time.time())
        safe_prompt = prompt[:30].replace(' ', '_').replace('/', '_').replace('\\', '_')
        output_gcs_uri = f"gs://{Config.OUTPUT_GCS_BUCKET_NAME}/veo_outputs/{safe_prompt}_{timestamp}/"
        job_status[job_id].update({
            'status': 'generating',
            'progress': 10,
            'message': 'Starting video generation...',
            'gcs_uri': output_gcs_uri
        })
        # DEBUG: Log environment and SDK info
        print("=== ENVIRONMENT DEBUG ===")
        import sys
        print(f"Python version: {sys.version}")
        try:
            import google.genai
            print(f"google-genai version: {google.genai.__version__}")
        except AttributeError:
            print("google-genai version not available")
        print(f"Model ID: {selected_model}")
        print(f"Model Name: {Config.SUPPORTED_MODELS[selected_model]['name']}")
        print(f"Model Speed: {Config.SUPPORTED_MODELS[selected_model]['speed']}")
        print(f"Price per second: ${Config.SUPPORTED_MODELS[selected_model]['price_per_second']}")
        print(f"Output GCS URI: {output_gcs_uri}")
        print("=== END ENVIRONMENT DEBUG ===")
        # Start the video generation operation using the updated SDK
        print("Submitting video generation request...")
        # Build the video generation request
        config_kwargs = {
            'person_generation': person_generation,
            'aspect_ratio': aspect_ratio,
            'output_gcs_uri': output_gcs_uri,
            'duration_seconds': video_length_sec,
            'generate_audio': generate_audio
        }
        # Generate random seed if not provided
        # (0..4294967295, i.e. the unsigned 32-bit range)
        if seed is None:
            seed = random.randint(0, 4294967295)
            print(f"Generated random seed: {seed}")
        else:
            print(f"Using provided seed: {seed}")
        # Add seed to config
        config_kwargs['seed'] = seed
        # Veo 3.1 features - Add to config (not request_kwargs)
        if last_frame_gcs_uri:
            config_kwargs['last_frame'] = types.Image(
                gcs_uri=last_frame_gcs_uri,
                mime_type='image/jpeg'
            )
            print(f"✓ Using last frame for interpolation: {last_frame_gcs_uri}")
        if reference_image_gcs_uris and len(reference_image_gcs_uris) > 0:
            config_kwargs['reference_images'] = [
                types.VideoGenerationReferenceImage(
                    image=types.Image(gcs_uri=uri, mime_type='image/jpeg'),
                    reference_type='asset'
                ) for uri in reference_image_gcs_uris
            ]
            print(f"✓ Using {len(reference_image_gcs_uris)} reference image(s) for content guidance")
        request_kwargs = {
            'model': selected_model,
            'prompt': prompt,
            'config': types.GenerateVideosConfig(**config_kwargs)
        }
        # Add first frame image if provided
        if image_gcs_uri:
            request_kwargs['image'] = types.Image(
                gcs_uri=image_gcs_uri,
                mime_type='image/jpeg' # Always JPEG after conversion
            )
            print(f"✓ Using first frame image: {image_gcs_uri}")
        # Make multiple API calls to ensure we get the requested number of videos
        # Each call typically generates 1 video, especially when using an image
        # (so exactly one identical generate_videos request is issued per requested video)
        calls_needed = sample_count
        print(f"=== VIDEO GENERATION SETUP ===")
        print(f"Requested videos: {sample_count}")
        print(f"API calls needed: {calls_needed}")
        print(f"Making {calls_needed} API call(s) to ensure we get {sample_count} video(s)")
        print("===================================")
        operations = []
        for i in range(calls_needed):
            print(f"Starting API call {i+1}/{calls_needed}...")
            operation = client.models.generate_videos(**request_kwargs)
            operations.append(operation)
            print(f"Operation {i+1} started: {operation.name}")
        job_status[job_id].update({
            'status': 'processing',
            'progress': 20,
            'message': f'Generating {sample_count} video(s) via {calls_needed} API call(s)...',
            'operation_names': [op.name for op in operations]
        })
        # Poll all operations for completion
        # NOTE(review): there is no overall deadline; this loop polls every 30 s
        # until every operation reports done.
        completed_operations = []
        while len(completed_operations) < calls_needed:
            print(f"Checking progress... {len(completed_operations)}/{calls_needed} operations completed")
            time.sleep(30) # Check every 30 seconds
            for i, operation in enumerate(operations):
                if operation not in completed_operations:
                    # Refresh operation status
                    updated_operation = client.operations.get(operation)
                    # Store the refreshed object so the membership test above
                    # skips it once it has been appended to completed_operations.
                    operations[i] = updated_operation
                    if updated_operation.done:
                        completed_operations.append(updated_operation)
                        print(f"Operation {i + 1} completed!")
            # Update progress
            progress = 20 + int((len(completed_operations) / calls_needed) * 60) # 20-80%
            job_status[job_id].update({
                'progress': progress,
                'message': f'Video generation in progress... {len(completed_operations)}/{calls_needed} operations completed'
            })
        # Check for errors in any operation
        # (the first failed operation fails the whole job)
        for i, operation in enumerate(operations):
            if operation.error:
                error_message = f"Video generation operation {i+1} failed: {operation.error}"
                print(error_message)
                job_status[job_id].update({
                    'status': 'failed',
                    'progress': 0,
                    'message': error_message,
                    'error': str(operation.error)
                })
                complete_job(job_id)
                remove_from_queue(job_id, user_email)
                return
        # Collect all generated videos from all operations
        all_generated_videos = []
        for i, operation in enumerate(operations):
            print(f"=== PROCESSING OPERATION {i+1} RESULTS ===")
            if not operation.response or not hasattr(operation.response, 'generated_videos') or not operation.response.generated_videos:
                # Check if content was filtered by safety settings
                if (hasattr(operation.response, 'rai_media_filtered_count') and
                    operation.response.rai_media_filtered_count > 0 and
                    hasattr(operation.response, 'rai_media_filtered_reasons')):
                    filtered_reasons = operation.response.rai_media_filtered_reasons
                    error_message = f"Video generation operation {i+1} blocked by content safety filters:\n\n{filtered_reasons[0] if filtered_reasons else 'Content filtered by safety settings'}"
                    job_status[job_id].update({
                        'status': 'failed',
                        'progress': 0,
                        'message': error_message,
                        'error': 'Content filtered by safety settings'
                    })
                else:
                    error_message = f"Operation {i+1} completed, but no video URIs found in the response."
                    job_status[job_id].update({
                        'status': 'failed',
                        'progress': 0,
                        'message': error_message,
                        'error': error_message
                    })
                complete_job(job_id)
                remove_from_queue(job_id, user_email)
                return
            # Add videos from this operation
            operation_videos = operation.response.generated_videos
            print(f"Operation {i+1} generated {len(operation_videos)} videos")
            for j, video in enumerate(operation_videos):
                if hasattr(video, 'video') and hasattr(video.video, 'uri'):
                    print(f" Video {j+1}: {video.video.uri}")
            all_generated_videos.extend(operation_videos)
            print(f"Total videos collected so far: {len(all_generated_videos)}")
        print(f"=== FINAL VIDEO SELECTION ===")
        print(f"Total videos generated: {len(all_generated_videos)}")
        print(f"Videos requested: {sample_count}")
        # Take exactly the number of videos requested
        generated_videos = all_generated_videos[:sample_count]
        print(f"Videos to download: {len(generated_videos)}")
        print("===============================")
        # DEBUG: Log video objects details
        print("=== VIDEO OBJECTS DEBUG ===")
        print(f"Total videos generated: {len(all_generated_videos)}")
        print(f"Requested sample count: {sample_count}")
        print(f"Videos to use: {len(generated_videos)}")
        for i, video in enumerate(generated_videos):
            print(f"Video {i+1} object: {video}")
            print(f"Video {i+1} type: {type(video)}")
            if hasattr(video, 'video') and hasattr(video.video, 'uri'):
                print(f"Video {i+1} URI: {video.video.uri}")
        print("=== END VIDEO DEBUG ===")
        # Validate all videos have URIs
        video_uris = []
        for i, video in enumerate(generated_videos):
            if not hasattr(video, 'video') or not hasattr(video.video, 'uri'):
                error_message = f"Video {i+1} does not contain video URI at expected path."
                job_status[job_id].update({
                    'status': 'failed',
                    'progress': 0,
                    'message': error_message,
                    'error': error_message
                })
                complete_job(job_id)
                remove_from_queue(job_id, user_email)
                return
            video_uris.append(video.video.uri)
        print(f"Videos generated successfully. GCS URIs: {video_uris}")
        job_status[job_id].update({
            'status': 'downloading',
            'progress': 90,
            'message': f'Downloading {len(video_uris)} video(s)...',
            'gcs_video_uris': video_uris
        })
        # Always use job-specific folder for consistency
        download_folder = os.path.join(Config.TEMP_DOWNLOAD_PATH, f"job_{job_id}")
        if not os.path.exists(download_folder):
            os.makedirs(download_folder, exist_ok=True)
            print(f"Created job-specific folder for videos: {download_folder}")
        else:
            print(f"Using existing job-specific folder for videos: {download_folder}")
        downloaded_video_paths = []
        for i, gcs_video_uri in enumerate(video_uris):
            # Parse GCS URI to get bucket name and blob name
            if not gcs_video_uri.startswith("gs://"):
                error_message = f"Invalid GCS URI received: {gcs_video_uri}"
                job_status[job_id].update({
                    'status': 'failed',
                    'progress': 0,
                    'message': error_message,
                    'error': error_message
                })
                complete_job(job_id)
                remove_from_queue(job_id, user_email)
                return
            path_parts = gcs_video_uri.replace("gs://", "").split("/", 1)
            source_bucket_name = path_parts[0]
            source_blob_name = path_parts[1]
            # Construct local file name in the same folder as the image
            video_filename = os.path.join(download_folder, f"generated_video_{i+1}.mp4")
            # Download the video
            local_video_path = download_blob(source_bucket_name, source_blob_name, video_filename)
            downloaded_video_paths.append(local_video_path)
            print(f"Downloaded video {i+1}: {local_video_path}")
        # Determine if we need to create zip
        # (a zip is produced for multiple videos or whenever a first-frame image was used)
        has_image = image_path is not None
        video_count = len(downloaded_video_paths)
        should_use_zip = video_count > 1 or has_image
        zip_filename = None
        files_added_to_zip = []
        print(f"=== ZIP CREATION DECISION ===")
        print(f"has_image: {has_image}")
        print(f"video_count: {video_count}")
        print(f"should_use_zip: {should_use_zip}")
        print("==============================")
        if should_use_zip:
            # Create a zip file inside the job folder
            zip_filename = os.path.join(download_folder, "all_videos.zip")
            print(f"Creating zip file (needed for {video_count} videos, has_image={has_image}): {zip_filename}")
            print(f"Source folder: {download_folder}")
            with zipfile.ZipFile(zip_filename, 'w') as zipf:
                # Add all video files from the download folder (excluding the zip itself)
                # NOTE(review): this adds every file found under the job folder,
                # not only the downloaded videos, and flattens subfolder paths.
                for root, dirs, files in os.walk(download_folder):
                    print(f"Walking directory: {root}, found {len(files)} files")
                    for file in files:
                        if file == "all_videos.zip": # Skip the zip file itself
                            continue
                        file_path = os.path.join(root, file)
                        if os.path.exists(file_path):
                            # Add file to zip with relative path (just the filename)
                            arcname = os.path.basename(file_path)
                            zipf.write(file_path, arcname)
                            files_added_to_zip.append(arcname)
                            print(f"Added to zip: {file} as {arcname} (size: {os.path.getsize(file_path)} bytes)")
                        else:
                            print(f"Warning: File not found: {file_path}")
            zip_size = os.path.getsize(zip_filename) if os.path.exists(zip_filename) else 0
            print(f"Created zip file: {zip_filename}")
            print(f"Zip file size: {zip_size} bytes")
            print(f"Files in zip: {files_added_to_zip}")
        else:
            print(f"Skipping zip creation (single video, no image): {video_count} videos, has_image={has_image}")
        print(f"=== DOWNLOAD STRATEGY DECISION ===")
        print(f"zip_filename: {zip_filename}")
        print(f"should_use_zip: {should_use_zip}")
        print("===================================")
        if should_use_zip:
            # Validate zip file exists and is not empty
            if os.path.exists(zip_filename) and os.path.getsize(zip_filename) > 0:
                primary_download_path = zip_filename
                download_type = 'zip'
                message = f'Video generation completed successfully! Generated {video_count} video(s).'
                if has_image:
                    message += ' Download includes image and videos.'
                print(f"Using zip file for download: {zip_filename}")
            else:
                print(f"ERROR: Zip file missing or empty: {zip_filename}")
                # Fallback to first video file
                primary_download_path = downloaded_video_paths[0] if downloaded_video_paths else None
                download_type = 'mp4'
                message = f'Video generation completed successfully! Generated {video_count} video(s). (Zip creation failed, using first video)'
        else:
            # Single video, no image - provide direct mp4 download
            primary_download_path = downloaded_video_paths[0]
            download_type = 'mp4'
            message = f'Video generation completed successfully! Generated 1 video.'
            print(f"Using single video file for download: {primary_download_path}")
        print(f"Final download decision:")
        print(f" primary_download_path: {primary_download_path}")
        print(f" download_type: {download_type}")
        print(f" should_use_zip: {should_use_zip}")
        job_status[job_id].update({
            'status': 'completed',
            'progress': 100,
            'message': message,
            'video_path': primary_download_path,
            'video_paths': downloaded_video_paths,
            'individual_video_paths': downloaded_video_paths, # For individual downloads
            'download_folder': download_folder,
            'is_zip': should_use_zip,
            'download_type': download_type,
            'video_count': video_count,
            'videos_completed': downloaded_video_paths,
            'has_image': has_image
        })
        # Send usage data to webhook for tracking
        send_usage_webhook(user_email, prompt)
        # Clean up temporary image files
        if image_path and image_blob_name:
            cleanup_image_files(job_id, image_path, image_blob_name)
        # Clean up last frame if provided
        if last_frame_path and last_frame_blob_name:
            cleanup_image_files(job_id, last_frame_path, last_frame_blob_name)
        # Clean up reference images if provided
        if reference_image_paths and reference_image_blob_names:
            from utils.storage import cleanup_multiple_images
            cleanup_multiple_images(job_id, reference_image_paths, reference_image_blob_names)
        # Mark job as completed in queue
        complete_job(job_id)
        remove_from_queue(job_id, user_email)
    except Exception as e:
        # Check if this is a retryable error and we haven't exceeded max retries
        # (retry_count lives in job_status so it survives the recursive re-entry)
        job = job_status.get(job_id, {})
        retry_count = job.get('retry_count', 0)
        # Transient-looking errors are detected by substring match on the message.
        is_retryable_error = (
            '503' in str(e) or
            '500' in str(e) or
            'timeout' in str(e).lower() or
            'connection' in str(e).lower() or
            'timeouterror' in str(e).lower() or
            'connection aborted' in str(e).lower() or
            'write operation timed out' in str(e).lower()
        )
        if is_retryable_error and retry_count < MAX_RETRIES:
            # Retry the job
            retry_count += 1
            # Determine error type for user-friendly message
            error_type = "Network timeout"
            if 'timeout' in str(e).lower() or 'timeouterror' in str(e).lower():
                error_type = "Upload timeout"
            elif 'connection' in str(e).lower():
                error_type = "Connection error"
            elif '503' in str(e) or '500' in str(e):
                error_type = "Server error"
            job_status[job_id].update({
                'status': f'retry_{retry_count}_of_{MAX_RETRIES}',
                'progress': 0,
                'message': f'{error_type} - Retry attempt {retry_count} of {MAX_RETRIES} in progress...',
                'retry_count': retry_count,
                'error': f'Previous attempt failed: {str(e)}'
            })
            logger.info(f"Retrying job {job_id}, attempt {retry_count}/{MAX_RETRIES}. Error: {str(e)}")
            # Wait before retry (exponential backoff)
            retry_delays = [30, 60, 120] # 30s, 1min, 2min
            delay = retry_delays[min(retry_count - 1, len(retry_delays) - 1)]
            logger.info(f"Waiting {delay} seconds before retry...")
            time.sleep(delay)
            # Retry by calling this function again
            # NOTE(review): the retry re-enters from the top, so validation and
            # image uploads run again; blobs uploaded by this attempt are not
            # cleaned up on this path. The recursive call is responsible for
            # complete_job()/remove_from_queue().
            logger.info(f"Starting retry {retry_count} for job {job_id}")
            try:
                process_video_generation(job_id)
            except RecursionError:
                logger.error(f"Recursion error detected for job {job_id}, treating as final failure")
                job_status[job_id].update({
                    'status': 'failed',
                    'progress': 0,
                    'message': 'System error during retry processing',
                    'error': 'Retry system failure',
                    'final_failure': True
                })
                complete_job(job_id)
                remove_from_queue(job_id, user_email)
        else:
            # Final failure
            attempts_msg = f" (after {retry_count} retries)" if retry_count > 0 else ""
            error_message = f"Video generation failed{attempts_msg}: {str(e)}"
            job_status[job_id].update({
                'status': 'failed',
                'progress': 0,
                'message': error_message,
                'error': str(e),
                'final_failure': True
            })
            logger.error(f"Job {job_id} failed after {retry_count} retries: {str(e)}")
            # Clean up image files on failure
            if image_path and image_blob_name:
                cleanup_image_files(job_id, image_path, image_blob_name)
            # Clean up last frame on failure
            if last_frame_path and last_frame_blob_name:
                cleanup_image_files(job_id, last_frame_path, last_frame_blob_name)
            # Clean up reference images on failure
            if reference_image_paths and reference_image_blob_names:
                from utils.storage import cleanup_multiple_images
                cleanup_multiple_images(job_id, reference_image_paths, reference_image_blob_names)
            # Mark job as completed in queue (even if failed)
            complete_job(job_id)
            remove_from_queue(job_id, user_email)
def get_job_status(job_id: str) -> dict:
    """
    Get the status of a video generation job with queue information.

    The live queue position is computed on every call and attached to a
    shallow copy of the stored record. It is never written back into
    ``job_status``, where it would go stale once the job leaves the queue
    (``get_user_jobs`` copies stored records, so a stale value would leak
    there too).

    Args:
        job_id: ID of the job to look up.

    Returns:
        A shallow copy of the job record, with ``queue_position`` set when the
        job's status is ``'queued'``; or ``{'status': 'not_found', ...}`` when
        no record exists for ``job_id``.
    """
    job = job_status.get(job_id)
    if not job:
        return {'status': 'not_found', 'message': 'Job not found'}
    # Work on a copy so callers never mutate (or see us mutate) shared state.
    result = job.copy()
    if result.get('status') == 'queued':
        result['queue_position'] = get_queue_position(job_id)
    return result
def get_user_jobs(user_email: str) -> list:
    """
    Collect every tracked job belonging to ``user_email``.

    Each entry is a shallow copy of the stored job record with its ``job_id``
    added. Queued jobs also get their current ``queue_position``. The list is
    ordered by ``created_at`` descending (newest first); records without
    ``created_at`` sort as the empty string.
    """
    matching = []
    for jid, record in job_status.items():
        if record.get('user_email') != user_email:
            continue
        entry = {**record, 'job_id': jid}
        # Queue position changes over time, so compute it fresh per call.
        if entry['status'] == 'queued':
            entry['queue_position'] = get_queue_position(jid)
        matching.append(entry)
    return sorted(matching, key=lambda j: j.get('created_at', ''), reverse=True)
def get_queue_status() -> dict:
    """
    Return a snapshot of the global job queue.

    The snapshot holds the number of waiting and active jobs, the
    concurrency limit, and copies of the waiting and active job-ID lists.
    Callers may mutate the lists without affecting the live queue.
    """
    waiting = list(job_queue)
    active = list(processing_jobs)
    snapshot = {}
    snapshot['queue_length'] = len(waiting)
    snapshot['processing_jobs'] = len(active)
    snapshot['concurrent_limit'] = CONCURRENT_JOB_LIMIT
    snapshot['queued_jobs'] = waiting
    snapshot['active_jobs'] = active
    return snapshot
def cleanup_job_files(job_id: str, cleanup_gcs: bool = True) -> bool:
    """
    Clean up local and optionally GCS files for a job.

    Local cleanup removes the job's ``download_folder`` when it exists.
    Otherwise it falls back to deleting ``video_path`` and each path in
    ``video_paths``. When ``cleanup_gcs`` is True it also deletes the source
    blob (``source_bucket_name``/``source_blob_name``), every ``gs://`` URI in
    ``gcs_video_uris``, and the input image via ``cleanup_image_files``.

    Args:
        job_id: The job ID to clean up
        cleanup_gcs: Whether to also clean up GCS files (default: True)

    Returns:
        True if every attempted deletion succeeded; the job record is then
        removed from ``job_status``. False if the job is unknown or any
        deletion failed; the record is then kept so cleanup can be retried.
    """
    job = job_status.get(job_id)
    if not job:
        logger.warning(f"Job {job_id} not found for cleanup")
        return False

    success = True
    start_time = time.time()

    # Clean up entire job folder (more efficient than individual files)
    download_folder = job.get('download_folder')
    if download_folder and os.path.exists(download_folder):
        import shutil
        try:
            shutil.rmtree(download_folder)
            logger.info(f"Deleted entire job folder: {download_folder}")
        except Exception as e:
            logger.error(f"Error deleting job folder: {e}")
            success = False
    else:
        # Fallback: try to delete individual files if folder not specified
        main_video = job.get('video_path')
        if main_video and os.path.exists(main_video):
            try:
                os.remove(main_video)
                logger.info(f"Deleted main video file: {main_video}")
            except Exception as e:
                logger.error(f"Error deleting main video file: {e}")
                success = False
        for video_path in job.get('video_paths') or []:
            if os.path.exists(video_path):
                try:
                    os.remove(video_path)
                    logger.info(f"Deleted individual video file: {video_path}")
                except Exception as e:
                    logger.error(f"Error deleting individual video file: {e}")
                    success = False

    # Delete GCS video files only if requested (can be slow)
    if cleanup_gcs:
        # Single source blob (kept for backwards compatibility with older jobs)
        if job.get('source_bucket_name') and job.get('source_blob_name'):
            try:
                delete_blob(job['source_bucket_name'], job['source_blob_name'])
            except Exception as e:
                logger.error(f"Error deleting GCS file: {e}")
                success = False
        # Multiple GCS video files
        gcs_prefix = "gs://"
        for gcs_uri in job.get('gcs_video_uris') or []:
            try:
                if not gcs_uri.startswith(gcs_prefix):
                    continue
                # Strip only the leading scheme; str.replace would also mangle
                # any "gs://" that appears inside the blob name.
                bucket_name, _, blob_name = gcs_uri[len(gcs_prefix):].partition("/")
                if not bucket_name or not blob_name:
                    raise ValueError("malformed GCS URI (expected gs://bucket/blob)")
                delete_blob(bucket_name, blob_name)
                logger.info(f"Deleted GCS file: {gcs_uri}")
            except Exception as e:
                logger.error(f"Error deleting GCS file {gcs_uri}: {e}")
                success = False
    else:
        logger.info("Skipping GCS cleanup (local files only)")

    # Delete image files (cleanup_image_files handles both local and GCS copies,
    # so it only runs when GCS cleanup was requested)
    if cleanup_gcs and (job.get('local_image_path') or job.get('image_blob_name')):
        try:
            cleanup_image_files(
                job_id,
                job.get('local_image_path'),
                job.get('image_blob_name')
            )
        except Exception as e:
            logger.error(f"Error deleting image files: {e}")
            success = False

    # Remove job from memory only when everything was cleaned; pop() tolerates
    # the record having been removed concurrently by another request.
    if success:
        job_status.pop(job_id, None)

    cleanup_time = time.time() - start_time
    cleanup_type = "full (local + GCS)" if cleanup_gcs else "local only"
    logger.info(f"Cleanup completed for job {job_id} in {cleanup_time:.2f}s ({cleanup_type})")
    return success