1190 lines
No EOL
46 KiB
Python
1190 lines
No EOL
46 KiB
Python
import os
|
|
import time
|
|
import uuid
|
|
import requests
|
|
import json
|
|
import datetime
|
|
import logging
|
|
import zipfile
|
|
import random
|
|
import threading
|
|
from google import genai
|
|
from google.genai import types
|
|
from google.auth.exceptions import DefaultCredentialsError
|
|
from utils.auth import get_google_credentials
|
|
from utils.storage import download_blob, delete_blob, validate_and_convert_image, upload_image_to_gcs, cleanup_image_files
|
|
from config import Config
|
|
|
|
# Configure logging at import time and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Job queue and processing limits.
CONCURRENT_JOB_LIMIT = 2  # upper bound on simultaneously processing jobs
# MAX_QUEUE_SIZE_PER_USER removed - unlimited queue
MAX_RETRIES = 3  # copied onto every job as 'max_retries'

# In-memory state (in production, use Redis or a database).
job_status: dict = {}       # job_id -> dict describing that job
job_queue: list = []        # global queue of waiting ids: [job_id1, job_id2, ...]
processing_jobs: list = []  # currently active job ids (max CONCURRENT_JOB_LIMIT)
user_job_counts: dict = {}  # jobs tracked per user: {user_email: count}
|
|
|
|
def send_usage_webhook(user_email: str, prompt: str) -> None:
    """
    Send usage data to the tracking webhook (best effort).

    When ``Config.WEBHOOK_ENABLED`` is falsy the call only logs and returns.
    Any exception raised while building or posting the payload is logged and
    swallowed, so a webhook failure never interrupts the caller.

    Args:
        user_email: Email of the user who processed the request
        prompt: The prompt used for processing
    """
    if not Config.WEBHOOK_ENABLED:
        logger.info("Webhook disabled, skipping usage tracking")
        return

    try:
        current_datetime = datetime.datetime.now().isoformat()

        webhook_data = {
            "tool": "VEO3",
            "date": current_datetime,
            "user": user_email,
            "model": "VEO3",
            "settings": "no settings",
            "subTool": "no subTool",
            "prompt": prompt,
            "negativePrompt": "no NEGATIVE_PROMPT",
            "image": "no image",
        }

        logger.info(f"Sending usage data to webhook for user: {user_email}")

        response = requests.post(
            Config.WEBHOOK_URL,
            headers={"Content-Type": "application/json"},
            data=json.dumps(webhook_data),
            timeout=Config.WEBHOOK_TIMEOUT,
        )

        # Any 2xx answer (200, 201, 202, 204, ...) means the receiver accepted
        # the payload; only non-2xx responses are reported as failures.
        if 200 <= response.status_code < 300:
            logger.info("Successfully sent usage data to webhook")
        else:
            logger.warning(f"Webhook request failed with status code: {response.status_code}")
            logger.warning(f"Response: {response.text}")

    except Exception as e:
        # Don't raise the exception - webhook failure shouldn't block the main flow
        logger.error(f"Error sending usage data to webhook: {str(e)}")
|
|
|
|
def get_user_queue_count(user_email: str) -> int:
    """Return the number of jobs currently counted for ``user_email`` (0 if none)."""
    if user_email in user_job_counts:
        return user_job_counts[user_email]
    return 0
|
|
|
|
def can_add_to_queue(user_email: str) -> bool:
    """Report whether ``user_email`` may enqueue another job.

    The queue is unlimited, so the answer is always ``True`` regardless of
    the user.
    """
    # No limits
    return True
|
|
|
|
def add_to_queue(job_id: str, user_email: str) -> None:
    """Append ``job_id`` to the global queue and increment the user's job count.

    Args:
        job_id: Identifier of the job to enqueue.
        user_email: User the job is counted against in ``user_job_counts``.
    """
    count_before = user_job_counts.get(user_email, 0)
    job_queue.append(job_id)
    user_job_counts[user_email] = count_before + 1

    # The job was just appended, so the queue length is its 1-based position.
    logger.info(f"Added job {job_id} to queue for user {user_email}. Queue position: {len(job_queue)}")
|
|
|
|
def remove_from_queue(job_id: str, user_email: str) -> None:
    """Drop ``job_id`` from the waiting queue (if present) and decrement the user's count.

    The per-user count never goes below zero, and the user's entry is removed
    from ``user_job_counts`` once it reaches zero.

    Args:
        job_id: Identifier of the job to remove.
        user_email: User whose job count is decremented.
    """
    try:
        job_queue.remove(job_id)
    except ValueError:
        # Not queued any more (for example it was already started).
        pass

    remaining = max(0, user_job_counts.get(user_email, 0) - 1)
    if remaining:
        user_job_counts[user_email] = remaining
    else:
        # Zero jobs left: do not keep an empty entry for this user.
        user_job_counts.pop(user_email, None)
|
|
|
|
def get_queue_position(job_id: str) -> int:
    """Return the 1-based position of ``job_id`` in the queue, or -1 if it is not queued."""
    for position, queued_id in enumerate(job_queue, start=1):
        if queued_id == job_id:
            return position
    return -1
|
|
|
|
def start_next_job() -> None:
    """Move the oldest queued job into processing when a slot is free.

    Returns without doing anything when ``CONCURRENT_JOB_LIMIT`` jobs are
    already active or the queue is empty. Otherwise the first queued job is
    added to ``processing_jobs``, its status entry (if any) is marked as
    processing, and ``process_video_generation`` is launched for it on a
    daemon thread.
    """
    slot_available = len(processing_jobs) < CONCURRENT_JOB_LIMIT
    if not slot_available or not job_queue:
        return

    job_id = job_queue.pop(0)
    processing_jobs.append(job_id)

    # Update job status
    if job_id in job_status:
        entry = job_status[job_id]
        entry['status'] = 'processing'
        entry['queue_position'] = 0
        entry['started_at'] = datetime.datetime.now().isoformat()

    # Start processing in background thread
    worker = threading.Thread(
        target=process_video_generation,
        args=(job_id,),
        daemon=True,
    )
    worker.start()

    logger.info(f"Started processing job {job_id}")
|
|
|
|
def complete_job(job_id: str) -> None:
    """Release ``job_id``'s processing slot and let the next queued job start.

    The job is removed from ``processing_jobs`` if it is listed there;
    ``start_next_job`` is called either way.
    """
    try:
        processing_jobs.remove(job_id)
    except ValueError:
        # The job was not (or is no longer) in the processing list.
        pass

    # Start next job if available
    start_next_job()

    active_count = len(processing_jobs)
    logger.info(f"Completed job {job_id}. Processing jobs: {active_count}")
|
|
|
|
def cancel_job(job_id: str) -> bool:
    """Cancel a job that is either waiting in the queue or being processed.

    Args:
        job_id: Identifier of the job to cancel.

    Returns:
        True if the job was marked as cancelled; False if the job is unknown
        or is neither queued nor listed in ``processing_jobs``.
    """
    job = job_status.get(job_id)
    if not job:
        return False

    user_email = job.get('user_email', 'anonymous')
    is_queued = job['status'] == 'queued'

    if is_queued:
        # Cancel queued jobs: take the job out of the queue first.
        remove_from_queue(job_id, user_email)
        cancel_message = 'Job cancelled by user'
    elif job_id in processing_jobs:
        # Cancel processing jobs: free the processing slot first.
        # NOTE(review): nothing here signals the worker thread itself.
        processing_jobs.remove(job_id)
        cancel_message = 'Processing cancelled by user'
    else:
        return False

    job_status[job_id].update({
        'status': 'cancelled',
        'message': cancel_message,
        'progress': 0,
        'can_cancel': False,
        'cancelled_at': datetime.datetime.now().isoformat()
    })

    if is_queued:
        logger.info(f"Queued job {job_id} cancelled by user")
        return True

    remove_from_queue(job_id, user_email)

    # Start next job if available
    start_next_job()

    logger.info(f"Processing job {job_id} cancelled by user")
    return True
|
|
|
|
def retry_job(job_id: str) -> bool:
    """Put a failed or cancelled job back on the queue.

    The job's status, message, progress, cancel flag and error fields are
    reset; other fields (including ``retry_count``) are left as they are.

    Args:
        job_id: Identifier of the job to retry.

    Returns:
        True when the job was re-queued; False when the job is unknown or its
        status is anything other than 'failed' or 'cancelled'.
    """
    job = job_status.get(job_id)
    # Only retry failed or cancelled jobs
    if not job or job['status'] not in ('failed', 'cancelled'):
        return False

    user_email = job.get('user_email', 'anonymous')

    # Reset job status and re-queue
    reset_fields = {
        'status': 'queued',
        'message': 'Job re-queued for processing...',
        'progress': 0,
        'can_cancel': True,
        'error': None,
        'cancelled_at': None
    }
    job_status[job_id].update(reset_fields)

    # Add back to queue
    add_to_queue(job_id, user_email)
    job_status[job_id]['queue_position'] = get_queue_position(job_id)

    # Try to start immediately if there's capacity
    start_next_job()

    logger.info(f"Job {job_id} retried by user")
    return True
|
|
|
|
def delete_job_completely(job_id: str) -> bool:
    """Completely delete a job and all associated files.

    Removes the job from the queue and the processing list, deletes its local
    ``job_<id>`` folder under ``Config.TEMP_DOWNLOAD_PATH``, deletes every GCS
    blob recorded on the job (first-frame image, last-frame image, reference
    images and the output location), updates the user's job count and finally
    drops the job's status entry. File clean-up is best effort: failures are
    logged and do not stop the deletion.

    Args:
        job_id: Identifier of the job to delete.

    Returns:
        True if the job existed and was deleted, False if it is unknown.
    """
    import shutil

    job = job_status.get(job_id)
    if not job:
        return False

    user_email = job.get('user_email', 'anonymous')

    # Remove from all tracking
    if job_id in job_queue:
        job_queue.remove(job_id)
    if job_id in processing_jobs:
        processing_jobs.remove(job_id)
        # Start next job if we freed up a processing slot
        start_next_job()

    # Clean up local files
    try:
        job_folder = os.path.join(Config.TEMP_DOWNLOAD_PATH, f"job_{job_id}")
        if os.path.exists(job_folder):
            shutil.rmtree(job_folder)
            logger.info(f"Deleted local folder: {job_folder}")
    except Exception as e:
        logger.error(f"Error deleting local folder for job {job_id}: {e}")

    # Collect every GCS blob name recorded on the job, not just the first frame.
    blob_names = []
    for key in ('image_blob_name', 'last_frame_blob_name'):
        if job.get(key):
            blob_names.append(job[key])
    # 'reference_image_blob_names' is None until reference images are uploaded.
    blob_names.extend(name for name in (job.get('reference_image_blob_names') or []) if name)

    # Clean up any GCS video files if they exist
    gcs_uri = job.get('gcs_uri')
    if gcs_uri and gcs_uri.startswith('gs://'):
        # gs://bucket/path -> ['gs:', '', 'bucket', 'path']; skip URIs without
        # an object path so the bucket name is never used as a blob name.
        uri_parts = gcs_uri.split('/', 3)
        if len(uri_parts) == 4 and uri_parts[3]:
            blob_names.append(uri_parts[3])

    # Delete each blob independently so one failure (e.g. a blob that was
    # already cleaned up) does not prevent the remaining deletions.
    for blob_name in blob_names:
        try:
            delete_blob(Config.OUTPUT_GCS_BUCKET_NAME, blob_name)
        except Exception as e:
            logger.error(f"Error deleting GCS files for job {job_id}: {e}")

    # Update user job count
    remove_from_queue(job_id, user_email)

    # Remove from job status completely (tolerate it having vanished meanwhile)
    job_status.pop(job_id, None)

    logger.info(f"Job {job_id} completely deleted")
    return True
|
|
|
|
def generate_video_async(
    prompt: str,
    job_id: str = None,
    model_name: str = None,
    video_length_sec: int = 8,
    aspect_ratio: str = "16:9",
    sample_count: int = 1,
    person_generation: str = "dont_allow",
    image_path: str = None,
    last_frame_path: str = None,
    reference_image_paths: list = None,
    user_email: str = "anonymous",
    seed: int = None,
    generate_audio: bool = True
) -> str:
    """
    Queue video generation job and return job ID.

    A status entry holding all request parameters is created in
    ``job_status``, the job is appended to the queue, and ``start_next_job``
    is called so the job starts right away when a processing slot is free.

    Args:
        prompt: Text prompt for the generation.
        job_id: Identifier to use for the job; a UUID4 string is generated
            when omitted.
        model_name: Model identifier stored on the job (may be None).
        video_length_sec: Requested video duration in seconds.
        aspect_ratio: Requested aspect ratio.
        sample_count: Number of videos requested (stored as 'videos_requested').
        person_generation: Person-generation setting stored on the job.
        image_path: Local path of an optional first-frame image.
        last_frame_path: Local path of an optional last-frame image.
        reference_image_paths: Local paths of optional reference images.
        user_email: User the job is counted against.
        seed: Optional seed stored on the job.
        generate_audio: Audio-generation flag stored on the job.

    Returns:
        The job ID under which the job was queued.

    Raises:
        Exception: If ``can_add_to_queue`` rejects the user.
    """
    # Check if user can add more jobs
    if not can_add_to_queue(user_email):
        # The message must not reference MAX_QUEUE_SIZE_PER_USER: that constant
        # was removed, so interpolating it would raise NameError instead.
        raise Exception("Queue limit exceeded. Cannot queue more jobs for this user.")

    if job_id is None:
        job_id = str(uuid.uuid4())

    # Initialize job status with queue info
    job_status[job_id] = {
        'user_email': user_email,
        'prompt': prompt,
        'status': 'queued',
        'progress': 0,
        'message': 'Job queued for processing...',
        'queue_position': 0,  # Will be updated when added to queue
        'retry_count': 0,
        'max_retries': MAX_RETRIES,
        'videos_requested': sample_count,
        'videos_completed': [],
        'individual_video_paths': [],
        'can_cancel': True,
        'created_at': datetime.datetime.now().isoformat(),
        'started_at': None,
        'video_path': None,
        'gcs_uri': None,
        'error': None,
        'image_gcs_uri': None,
        'image_blob_name': None,
        'local_image_path': image_path,
        'last_frame_gcs_uri': None,
        'last_frame_blob_name': None,
        'local_last_frame_path': last_frame_path,
        'reference_image_gcs_uris': None,
        'reference_image_blob_names': None,
        'local_reference_image_paths': reference_image_paths,
        'model_name': model_name,
        'video_length_sec': video_length_sec,
        'aspect_ratio': aspect_ratio,
        'person_generation': person_generation,
        'seed': seed,
        'generate_audio': generate_audio
    }

    # Add to queue
    add_to_queue(job_id, user_email)
    job_status[job_id]['queue_position'] = get_queue_position(job_id)

    # Try to start immediately if there's capacity
    start_next_job()

    return job_id
|
|
|
|
def process_video_generation(job_id: str) -> None:
|
|
"""
|
|
Process video generation for a specific job (called from queue processor).
|
|
"""
|
|
job = job_status.get(job_id)
|
|
if not job:
|
|
logger.error(f"Job {job_id} not found in status")
|
|
return
|
|
|
|
# Extract parameters from job
|
|
prompt = job['prompt']
|
|
user_email = job['user_email']
|
|
model_name = job['model_name']
|
|
video_length_sec = job['video_length_sec']
|
|
aspect_ratio = job['aspect_ratio']
|
|
sample_count = job['videos_requested']
|
|
person_generation = job['person_generation']
|
|
image_path = job['local_image_path']
|
|
last_frame_path = job['local_last_frame_path']
|
|
reference_image_paths = job['local_reference_image_paths']
|
|
seed = job['seed']
|
|
generate_audio = job['generate_audio']
|
|
|
|
# Validate Veo 3.1 feature constraints
|
|
if last_frame_path or (reference_image_paths and len(reference_image_paths) > 0):
|
|
if video_length_sec != 8:
|
|
error_message = "Veo 3.1 features (last frame interpolation and reference images) require 8-second duration"
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': error_message,
|
|
'error': error_message
|
|
})
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
return
|
|
|
|
if reference_image_paths and len(reference_image_paths) > 0 and aspect_ratio != "16:9":
|
|
error_message = "Reference images feature requires 16:9 aspect ratio"
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': error_message,
|
|
'error': error_message
|
|
})
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
return
|
|
|
|
# Update status to starting
|
|
job_status[job_id].update({
|
|
'status': 'starting',
|
|
'progress': 0,
|
|
'message': 'Initializing video generation...',
|
|
'can_cancel': False # Cannot cancel once processing starts
|
|
})
|
|
|
|
# Validate and upload image if provided
|
|
image_gcs_uri = None
|
|
image_blob_name = None
|
|
last_frame_gcs_uri = None
|
|
last_frame_blob_name = None
|
|
reference_image_gcs_uris = []
|
|
reference_image_blob_names = []
|
|
|
|
print(f"DEBUG: generate_video_async received image_path: {image_path}")
|
|
print(f"DEBUG: generate_video_async received last_frame_path: {last_frame_path}")
|
|
print(f"DEBUG: generate_video_async received reference_image_paths: {reference_image_paths}")
|
|
|
|
if image_path:
|
|
try:
|
|
# Validate and convert image to JPEG
|
|
validation_result = validate_and_convert_image(image_path)
|
|
if not validation_result['valid']:
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': f'First frame validation failed: {validation_result["error"]}',
|
|
'error': validation_result['error']
|
|
})
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
return
|
|
|
|
# Auto-select aspect ratio based on the converted image
|
|
detected_aspect_ratio = validation_result['aspect_ratio']
|
|
if detected_aspect_ratio in ['16:9', '9:16']:
|
|
aspect_ratio = detected_aspect_ratio
|
|
print(f"Using detected aspect ratio: {aspect_ratio}")
|
|
if validation_result.get('was_cropped'):
|
|
print(f"Image was automatically cropped from {validation_result['original_size']} to {validation_result['size']} to fit {aspect_ratio} aspect ratio")
|
|
|
|
# Upload image to GCS
|
|
job_status[job_id].update({
|
|
'status': 'uploading_image',
|
|
'progress': 5,
|
|
'message': 'Uploading first frame image...'
|
|
})
|
|
|
|
image_gcs_uri = upload_image_to_gcs(image_path, job_id)
|
|
image_blob_name = image_gcs_uri.replace(f"gs://{Config.OUTPUT_GCS_BUCKET_NAME}/", "")
|
|
|
|
job_status[job_id].update({
|
|
'image_gcs_uri': image_gcs_uri,
|
|
'image_blob_name': image_blob_name,
|
|
'local_image_path': image_path,
|
|
'detected_aspect_ratio': detected_aspect_ratio,
|
|
'image_was_cropped': validation_result.get('was_cropped', False)
|
|
})
|
|
|
|
except Exception as e:
|
|
error_message = f"Failed to process first frame image: {str(e)}"
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': error_message,
|
|
'error': error_message
|
|
})
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
return
|
|
|
|
# Upload last frame if provided (Veo 3.1 feature)
|
|
if last_frame_path:
|
|
try:
|
|
job_status[job_id].update({
|
|
'status': 'uploading_image',
|
|
'progress': 7,
|
|
'message': 'Uploading last frame image...'
|
|
})
|
|
|
|
last_frame_gcs_uri = upload_image_to_gcs(last_frame_path, f"{job_id}_last")
|
|
last_frame_blob_name = last_frame_gcs_uri.replace(f"gs://{Config.OUTPUT_GCS_BUCKET_NAME}/", "")
|
|
|
|
job_status[job_id].update({
|
|
'last_frame_gcs_uri': last_frame_gcs_uri,
|
|
'last_frame_blob_name': last_frame_blob_name,
|
|
'local_last_frame_path': last_frame_path
|
|
})
|
|
|
|
print(f"Uploaded last frame: {last_frame_gcs_uri}")
|
|
|
|
except Exception as e:
|
|
error_message = f"Failed to process last frame image: {str(e)}"
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': error_message,
|
|
'error': error_message
|
|
})
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
return
|
|
|
|
# Upload reference images if provided (Veo 3.1 feature)
|
|
if reference_image_paths and len(reference_image_paths) > 0:
|
|
try:
|
|
job_status[job_id].update({
|
|
'status': 'uploading_image',
|
|
'progress': 8,
|
|
'message': f'Uploading {len(reference_image_paths)} reference image(s)...'
|
|
})
|
|
|
|
from utils.storage import upload_reference_images_to_gcs
|
|
reference_image_gcs_uris = upload_reference_images_to_gcs(reference_image_paths, job_id)
|
|
reference_image_blob_names = [uri.replace(f"gs://{Config.OUTPUT_GCS_BUCKET_NAME}/", "") for uri in reference_image_gcs_uris]
|
|
|
|
job_status[job_id].update({
|
|
'reference_image_gcs_uris': reference_image_gcs_uris,
|
|
'reference_image_blob_names': reference_image_blob_names,
|
|
'local_reference_image_paths': reference_image_paths
|
|
})
|
|
|
|
print(f"Uploaded {len(reference_image_gcs_uris)} reference images")
|
|
|
|
except Exception as e:
|
|
error_message = f"Failed to process reference images: {str(e)}"
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': error_message,
|
|
'error': error_message
|
|
})
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
return
|
|
|
|
try:
|
|
# Validate model selection
|
|
selected_model = model_name or Config.MODEL_ID
|
|
if selected_model not in Config.SUPPORTED_MODELS:
|
|
error_message = f"Unsupported model '{selected_model}'. Supported models: {', '.join(Config.SUPPORTED_MODELS.keys())}"
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': error_message,
|
|
'error': error_message
|
|
})
|
|
logger.warning(f"Model validation failed for job {job_id}: {selected_model}")
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
return
|
|
|
|
# Validate aspect ratio - Support both 16:9 and 9:16
|
|
if aspect_ratio not in ["16:9", "9:16"]:
|
|
error_message = f"Unsupported aspect ratio '{aspect_ratio}'. Supported ratios: 16:9 (landscape) and 9:16 (portrait)."
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': error_message,
|
|
'error': error_message
|
|
})
|
|
logger.warning(f"Aspect ratio validation failed for job {job_id}: {aspect_ratio}")
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
return
|
|
|
|
# Set up authentication
|
|
credentials = get_google_credentials()
|
|
|
|
# Configure the GenAI client for Vertex AI
|
|
if credentials:
|
|
client = genai.Client(
|
|
vertexai=True,
|
|
project=Config.PROJECT_ID,
|
|
location=Config.REGION,
|
|
credentials=credentials
|
|
)
|
|
else:
|
|
client = genai.Client(
|
|
vertexai=True,
|
|
project=Config.PROJECT_ID,
|
|
location=Config.REGION
|
|
)
|
|
|
|
# Define a unique prefix for this generation job in GCS
|
|
timestamp = int(time.time())
|
|
safe_prompt = prompt[:30].replace(' ', '_').replace('/', '_').replace('\\', '_')
|
|
output_gcs_uri = f"gs://{Config.OUTPUT_GCS_BUCKET_NAME}/veo_outputs/{safe_prompt}_{timestamp}/"
|
|
|
|
job_status[job_id].update({
|
|
'status': 'generating',
|
|
'progress': 10,
|
|
'message': 'Starting video generation...',
|
|
'gcs_uri': output_gcs_uri
|
|
})
|
|
|
|
# DEBUG: Log environment and SDK info
|
|
print("=== ENVIRONMENT DEBUG ===")
|
|
import sys
|
|
print(f"Python version: {sys.version}")
|
|
try:
|
|
import google.genai
|
|
print(f"google-genai version: {google.genai.__version__}")
|
|
except AttributeError:
|
|
print("google-genai version not available")
|
|
print(f"Model ID: {selected_model}")
|
|
print(f"Model Name: {Config.SUPPORTED_MODELS[selected_model]['name']}")
|
|
print(f"Model Speed: {Config.SUPPORTED_MODELS[selected_model]['speed']}")
|
|
print(f"Price per second: ${Config.SUPPORTED_MODELS[selected_model]['price_per_second']}")
|
|
print(f"Output GCS URI: {output_gcs_uri}")
|
|
print("=== END ENVIRONMENT DEBUG ===")
|
|
|
|
# Start the video generation operation using the updated SDK
|
|
print("Submitting video generation request...")
|
|
|
|
# Build the video generation request
|
|
config_kwargs = {
|
|
'person_generation': person_generation,
|
|
'aspect_ratio': aspect_ratio,
|
|
'output_gcs_uri': output_gcs_uri,
|
|
'duration_seconds': video_length_sec,
|
|
'generate_audio': generate_audio
|
|
}
|
|
|
|
# Generate random seed if not provided
|
|
if seed is None:
|
|
seed = random.randint(0, 4294967295)
|
|
print(f"Generated random seed: {seed}")
|
|
else:
|
|
print(f"Using provided seed: {seed}")
|
|
|
|
# Add seed to config
|
|
config_kwargs['seed'] = seed
|
|
|
|
# Veo 3.1 features - Add to config (not request_kwargs)
|
|
if last_frame_gcs_uri:
|
|
config_kwargs['last_frame'] = types.Image(
|
|
gcs_uri=last_frame_gcs_uri,
|
|
mime_type='image/jpeg'
|
|
)
|
|
print(f"✓ Using last frame for interpolation: {last_frame_gcs_uri}")
|
|
|
|
if reference_image_gcs_uris and len(reference_image_gcs_uris) > 0:
|
|
config_kwargs['reference_images'] = [
|
|
types.VideoGenerationReferenceImage(
|
|
image=types.Image(gcs_uri=uri, mime_type='image/jpeg'),
|
|
reference_type='asset'
|
|
) for uri in reference_image_gcs_uris
|
|
]
|
|
print(f"✓ Using {len(reference_image_gcs_uris)} reference image(s) for content guidance")
|
|
|
|
request_kwargs = {
|
|
'model': selected_model,
|
|
'prompt': prompt,
|
|
'config': types.GenerateVideosConfig(**config_kwargs)
|
|
}
|
|
|
|
# Add first frame image if provided
|
|
if image_gcs_uri:
|
|
request_kwargs['image'] = types.Image(
|
|
gcs_uri=image_gcs_uri,
|
|
mime_type='image/jpeg' # Always JPEG after conversion
|
|
)
|
|
print(f"✓ Using first frame image: {image_gcs_uri}")
|
|
|
|
# Make multiple API calls to ensure we get the requested number of videos
|
|
# Each call typically generates 1 video, especially when using an image
|
|
calls_needed = sample_count
|
|
print(f"=== VIDEO GENERATION SETUP ===")
|
|
print(f"Requested videos: {sample_count}")
|
|
print(f"API calls needed: {calls_needed}")
|
|
print(f"Making {calls_needed} API call(s) to ensure we get {sample_count} video(s)")
|
|
print("===================================")
|
|
|
|
operations = []
|
|
for i in range(calls_needed):
|
|
print(f"Starting API call {i+1}/{calls_needed}...")
|
|
operation = client.models.generate_videos(**request_kwargs)
|
|
operations.append(operation)
|
|
print(f"Operation {i+1} started: {operation.name}")
|
|
|
|
job_status[job_id].update({
|
|
'status': 'processing',
|
|
'progress': 20,
|
|
'message': f'Generating {sample_count} video(s) via {calls_needed} API call(s)...',
|
|
'operation_names': [op.name for op in operations]
|
|
})
|
|
|
|
# Poll all operations for completion
|
|
completed_operations = []
|
|
while len(completed_operations) < calls_needed:
|
|
print(f"Checking progress... {len(completed_operations)}/{calls_needed} operations completed")
|
|
time.sleep(30) # Check every 30 seconds
|
|
|
|
for i, operation in enumerate(operations):
|
|
if operation not in completed_operations:
|
|
# Refresh operation status
|
|
updated_operation = client.operations.get(operation)
|
|
operations[i] = updated_operation
|
|
|
|
if updated_operation.done:
|
|
completed_operations.append(updated_operation)
|
|
print(f"Operation {i + 1} completed!")
|
|
|
|
# Update progress
|
|
progress = 20 + int((len(completed_operations) / calls_needed) * 60) # 20-80%
|
|
job_status[job_id].update({
|
|
'progress': progress,
|
|
'message': f'Video generation in progress... {len(completed_operations)}/{calls_needed} operations completed'
|
|
})
|
|
|
|
# Check for errors in any operation
|
|
for i, operation in enumerate(operations):
|
|
if operation.error:
|
|
error_message = f"Video generation operation {i+1} failed: {operation.error}"
|
|
print(error_message)
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': error_message,
|
|
'error': str(operation.error)
|
|
})
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
return
|
|
|
|
# Collect all generated videos from all operations
|
|
all_generated_videos = []
|
|
for i, operation in enumerate(operations):
|
|
print(f"=== PROCESSING OPERATION {i+1} RESULTS ===")
|
|
if not operation.response or not hasattr(operation.response, 'generated_videos') or not operation.response.generated_videos:
|
|
# Check if content was filtered by safety settings
|
|
if (hasattr(operation.response, 'rai_media_filtered_count') and
|
|
operation.response.rai_media_filtered_count > 0 and
|
|
hasattr(operation.response, 'rai_media_filtered_reasons')):
|
|
|
|
filtered_reasons = operation.response.rai_media_filtered_reasons
|
|
error_message = f"Video generation operation {i+1} blocked by content safety filters:\n\n{filtered_reasons[0] if filtered_reasons else 'Content filtered by safety settings'}"
|
|
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': error_message,
|
|
'error': 'Content filtered by safety settings'
|
|
})
|
|
else:
|
|
error_message = f"Operation {i+1} completed, but no video URIs found in the response."
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': error_message,
|
|
'error': error_message
|
|
})
|
|
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
return
|
|
|
|
# Add videos from this operation
|
|
operation_videos = operation.response.generated_videos
|
|
print(f"Operation {i+1} generated {len(operation_videos)} videos")
|
|
for j, video in enumerate(operation_videos):
|
|
if hasattr(video, 'video') and hasattr(video.video, 'uri'):
|
|
print(f" Video {j+1}: {video.video.uri}")
|
|
|
|
all_generated_videos.extend(operation_videos)
|
|
print(f"Total videos collected so far: {len(all_generated_videos)}")
|
|
|
|
print(f"=== FINAL VIDEO SELECTION ===")
|
|
print(f"Total videos generated: {len(all_generated_videos)}")
|
|
print(f"Videos requested: {sample_count}")
|
|
|
|
# Take exactly the number of videos requested
|
|
generated_videos = all_generated_videos[:sample_count]
|
|
print(f"Videos to download: {len(generated_videos)}")
|
|
print("===============================")
|
|
|
|
# DEBUG: Log video objects details
|
|
print("=== VIDEO OBJECTS DEBUG ===")
|
|
print(f"Total videos generated: {len(all_generated_videos)}")
|
|
print(f"Requested sample count: {sample_count}")
|
|
print(f"Videos to use: {len(generated_videos)}")
|
|
for i, video in enumerate(generated_videos):
|
|
print(f"Video {i+1} object: {video}")
|
|
print(f"Video {i+1} type: {type(video)}")
|
|
if hasattr(video, 'video') and hasattr(video.video, 'uri'):
|
|
print(f"Video {i+1} URI: {video.video.uri}")
|
|
print("=== END VIDEO DEBUG ===")
|
|
|
|
# Validate all videos have URIs
|
|
video_uris = []
|
|
for i, video in enumerate(generated_videos):
|
|
if not hasattr(video, 'video') or not hasattr(video.video, 'uri'):
|
|
error_message = f"Video {i+1} does not contain video URI at expected path."
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': error_message,
|
|
'error': error_message
|
|
})
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
return
|
|
video_uris.append(video.video.uri)
|
|
|
|
print(f"Videos generated successfully. GCS URIs: {video_uris}")
|
|
|
|
job_status[job_id].update({
|
|
'status': 'downloading',
|
|
'progress': 90,
|
|
'message': f'Downloading {len(video_uris)} video(s)...',
|
|
'gcs_video_uris': video_uris
|
|
})
|
|
|
|
# Always use job-specific folder for consistency
|
|
download_folder = os.path.join(Config.TEMP_DOWNLOAD_PATH, f"job_{job_id}")
|
|
if not os.path.exists(download_folder):
|
|
os.makedirs(download_folder, exist_ok=True)
|
|
print(f"Created job-specific folder for videos: {download_folder}")
|
|
else:
|
|
print(f"Using existing job-specific folder for videos: {download_folder}")
|
|
|
|
downloaded_video_paths = []
|
|
|
|
for i, gcs_video_uri in enumerate(video_uris):
|
|
# Parse GCS URI to get bucket name and blob name
|
|
if not gcs_video_uri.startswith("gs://"):
|
|
error_message = f"Invalid GCS URI received: {gcs_video_uri}"
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': error_message,
|
|
'error': error_message
|
|
})
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
return
|
|
|
|
path_parts = gcs_video_uri.replace("gs://", "").split("/", 1)
|
|
source_bucket_name = path_parts[0]
|
|
source_blob_name = path_parts[1]
|
|
|
|
# Construct local file name in the same folder as the image
|
|
video_filename = os.path.join(download_folder, f"generated_video_{i+1}.mp4")
|
|
|
|
# Download the video
|
|
local_video_path = download_blob(source_bucket_name, source_blob_name, video_filename)
|
|
downloaded_video_paths.append(local_video_path)
|
|
|
|
print(f"Downloaded video {i+1}: {local_video_path}")
|
|
|
|
# Determine if we need to create zip
|
|
has_image = image_path is not None
|
|
video_count = len(downloaded_video_paths)
|
|
should_use_zip = video_count > 1 or has_image
|
|
|
|
zip_filename = None
|
|
files_added_to_zip = []
|
|
|
|
print(f"=== ZIP CREATION DECISION ===")
|
|
print(f"has_image: {has_image}")
|
|
print(f"video_count: {video_count}")
|
|
print(f"should_use_zip: {should_use_zip}")
|
|
print("==============================")
|
|
|
|
if should_use_zip:
|
|
# Create a zip file inside the job folder
|
|
zip_filename = os.path.join(download_folder, "all_videos.zip")
|
|
print(f"Creating zip file (needed for {video_count} videos, has_image={has_image}): {zip_filename}")
|
|
print(f"Source folder: {download_folder}")
|
|
|
|
with zipfile.ZipFile(zip_filename, 'w') as zipf:
|
|
# Add all video files from the download folder (excluding the zip itself)
|
|
for root, dirs, files in os.walk(download_folder):
|
|
print(f"Walking directory: {root}, found {len(files)} files")
|
|
for file in files:
|
|
if file == "all_videos.zip": # Skip the zip file itself
|
|
continue
|
|
file_path = os.path.join(root, file)
|
|
if os.path.exists(file_path):
|
|
# Add file to zip with relative path (just the filename)
|
|
arcname = os.path.basename(file_path)
|
|
zipf.write(file_path, arcname)
|
|
files_added_to_zip.append(arcname)
|
|
print(f"Added to zip: {file} as {arcname} (size: {os.path.getsize(file_path)} bytes)")
|
|
else:
|
|
print(f"Warning: File not found: {file_path}")
|
|
|
|
zip_size = os.path.getsize(zip_filename) if os.path.exists(zip_filename) else 0
|
|
print(f"Created zip file: {zip_filename}")
|
|
print(f"Zip file size: {zip_size} bytes")
|
|
print(f"Files in zip: {files_added_to_zip}")
|
|
else:
|
|
print(f"Skipping zip creation (single video, no image): {video_count} videos, has_image={has_image}")
|
|
|
|
print(f"=== DOWNLOAD STRATEGY DECISION ===")
|
|
print(f"zip_filename: {zip_filename}")
|
|
print(f"should_use_zip: {should_use_zip}")
|
|
print("===================================")
|
|
|
|
if should_use_zip:
|
|
# Validate zip file exists and is not empty
|
|
if os.path.exists(zip_filename) and os.path.getsize(zip_filename) > 0:
|
|
primary_download_path = zip_filename
|
|
download_type = 'zip'
|
|
message = f'Video generation completed successfully! Generated {video_count} video(s).'
|
|
if has_image:
|
|
message += ' Download includes image and videos.'
|
|
print(f"Using zip file for download: {zip_filename}")
|
|
else:
|
|
print(f"ERROR: Zip file missing or empty: {zip_filename}")
|
|
# Fallback to first video file
|
|
primary_download_path = downloaded_video_paths[0] if downloaded_video_paths else None
|
|
download_type = 'mp4'
|
|
message = f'Video generation completed successfully! Generated {video_count} video(s). (Zip creation failed, using first video)'
|
|
else:
|
|
# Single video, no image - provide direct mp4 download
|
|
primary_download_path = downloaded_video_paths[0]
|
|
download_type = 'mp4'
|
|
message = f'Video generation completed successfully! Generated 1 video.'
|
|
print(f"Using single video file for download: {primary_download_path}")
|
|
|
|
print(f"Final download decision:")
|
|
print(f" primary_download_path: {primary_download_path}")
|
|
print(f" download_type: {download_type}")
|
|
print(f" should_use_zip: {should_use_zip}")
|
|
|
|
job_status[job_id].update({
|
|
'status': 'completed',
|
|
'progress': 100,
|
|
'message': message,
|
|
'video_path': primary_download_path,
|
|
'video_paths': downloaded_video_paths,
|
|
'individual_video_paths': downloaded_video_paths, # For individual downloads
|
|
'download_folder': download_folder,
|
|
'is_zip': should_use_zip,
|
|
'download_type': download_type,
|
|
'video_count': video_count,
|
|
'videos_completed': downloaded_video_paths,
|
|
'has_image': has_image
|
|
})
|
|
|
|
# Send usage data to webhook for tracking
|
|
send_usage_webhook(user_email, prompt)
|
|
|
|
# Clean up temporary image files
|
|
if image_path and image_blob_name:
|
|
cleanup_image_files(job_id, image_path, image_blob_name)
|
|
|
|
# Clean up last frame if provided
|
|
if last_frame_path and last_frame_blob_name:
|
|
cleanup_image_files(job_id, last_frame_path, last_frame_blob_name)
|
|
|
|
# Clean up reference images if provided
|
|
if reference_image_paths and reference_image_blob_names:
|
|
from utils.storage import cleanup_multiple_images
|
|
cleanup_multiple_images(job_id, reference_image_paths, reference_image_blob_names)
|
|
|
|
# Mark job as completed in queue
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
|
|
except Exception as e:
|
|
# Check if this is a retryable error and we haven't exceeded max retries
|
|
job = job_status.get(job_id, {})
|
|
retry_count = job.get('retry_count', 0)
|
|
|
|
is_retryable_error = (
|
|
'503' in str(e) or
|
|
'500' in str(e) or
|
|
'timeout' in str(e).lower() or
|
|
'connection' in str(e).lower() or
|
|
'timeouterror' in str(e).lower() or
|
|
'connection aborted' in str(e).lower() or
|
|
'write operation timed out' in str(e).lower()
|
|
)
|
|
|
|
if is_retryable_error and retry_count < MAX_RETRIES:
|
|
# Retry the job
|
|
retry_count += 1
|
|
# Determine error type for user-friendly message
|
|
error_type = "Network timeout"
|
|
if 'timeout' in str(e).lower() or 'timeouterror' in str(e).lower():
|
|
error_type = "Upload timeout"
|
|
elif 'connection' in str(e).lower():
|
|
error_type = "Connection error"
|
|
elif '503' in str(e) or '500' in str(e):
|
|
error_type = "Server error"
|
|
|
|
job_status[job_id].update({
|
|
'status': f'retry_{retry_count}_of_{MAX_RETRIES}',
|
|
'progress': 0,
|
|
'message': f'{error_type} - Retry attempt {retry_count} of {MAX_RETRIES} in progress...',
|
|
'retry_count': retry_count,
|
|
'error': f'Previous attempt failed: {str(e)}'
|
|
})
|
|
|
|
logger.info(f"Retrying job {job_id}, attempt {retry_count}/{MAX_RETRIES}. Error: {str(e)}")
|
|
|
|
# Wait before retry (exponential backoff)
|
|
retry_delays = [30, 60, 120] # 30s, 1min, 2min
|
|
delay = retry_delays[min(retry_count - 1, len(retry_delays) - 1)]
|
|
|
|
logger.info(f"Waiting {delay} seconds before retry...")
|
|
time.sleep(delay)
|
|
|
|
# Retry by calling this function again
|
|
logger.info(f"Starting retry {retry_count} for job {job_id}")
|
|
try:
|
|
process_video_generation(job_id)
|
|
except RecursionError:
|
|
logger.error(f"Recursion error detected for job {job_id}, treating as final failure")
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': 'System error during retry processing',
|
|
'error': 'Retry system failure',
|
|
'final_failure': True
|
|
})
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
else:
|
|
# Final failure
|
|
attempts_msg = f" (after {retry_count} retries)" if retry_count > 0 else ""
|
|
error_message = f"Video generation failed{attempts_msg}: {str(e)}"
|
|
|
|
job_status[job_id].update({
|
|
'status': 'failed',
|
|
'progress': 0,
|
|
'message': error_message,
|
|
'error': str(e),
|
|
'final_failure': True
|
|
})
|
|
|
|
logger.error(f"Job {job_id} failed after {retry_count} retries: {str(e)}")
|
|
|
|
# Clean up image files on failure
|
|
if image_path and image_blob_name:
|
|
cleanup_image_files(job_id, image_path, image_blob_name)
|
|
|
|
# Clean up last frame on failure
|
|
if last_frame_path and last_frame_blob_name:
|
|
cleanup_image_files(job_id, last_frame_path, last_frame_blob_name)
|
|
|
|
# Clean up reference images on failure
|
|
if reference_image_paths and reference_image_blob_names:
|
|
from utils.storage import cleanup_multiple_images
|
|
cleanup_multiple_images(job_id, reference_image_paths, reference_image_blob_names)
|
|
|
|
# Mark job as completed in queue (even if failed)
|
|
complete_job(job_id)
|
|
remove_from_queue(job_id, user_email)
|
|
|
|
def get_job_status(job_id: str) -> dict:
    """Look up a job's status record, refreshing its queue position if queued.

    Args:
        job_id: Identifier of the job to look up.

    Returns:
        The job's entry from ``job_status`` (the stored dict itself, not a
        copy), or a ``not_found`` placeholder dict when there is no entry
        or the entry is empty.
    """
    record = job_status.get(job_id)
    if record:
        # Recompute the queue position on every read while the job is queued.
        if record['status'] == 'queued':
            record['queue_position'] = get_queue_position(job_id)
        return record

    return {'status': 'not_found', 'message': 'Job not found'}
|
|
|
|
def get_user_jobs(user_email: str) -> list:
    """Collect every job belonging to a user, newest first.

    Args:
        user_email: Value compared against each job's ``user_email`` field.

    Returns:
        A list of copies (made with ``.copy()``) of the matching job
        records. Each copy gains a ``job_id`` key, plus a freshly computed
        ``queue_position`` when its status is ``queued``. The list is
        ordered by ``created_at`` descending; records without that key
        sort as the empty string.
    """
    def _snapshot(jid, record):
        # Work on a copy so the stored record is not modified here.
        entry = record.copy()
        entry['job_id'] = jid
        if entry['status'] == 'queued':
            entry['queue_position'] = get_queue_position(jid)
        return entry

    matches = [
        _snapshot(jid, record)
        for jid, record in job_status.items()
        if record.get('user_email') == user_email
    ]

    # Sort by creation time (newest first)
    return sorted(
        matches,
        key=lambda entry: entry.get('created_at', ''),
        reverse=True,
    )
|
|
|
|
def get_queue_status() -> dict:
    """Summarise the global queue state.

    Returns:
        A dict with the number of queued and processing jobs, the
        configured ``CONCURRENT_JOB_LIMIT``, and copies of the queued and
        active job-id lists.
    """
    waiting = job_queue.copy()
    active = processing_jobs.copy()

    summary = {}
    summary['queue_length'] = len(waiting)
    summary['processing_jobs'] = len(active)
    summary['concurrent_limit'] = CONCURRENT_JOB_LIMIT
    summary['queued_jobs'] = waiting
    summary['active_jobs'] = active
    return summary
|
|
|
|
def cleanup_job_files(job_id: str, cleanup_gcs: bool = True) -> bool:
    """
    Clean up local and optionally GCS files for a job.

    Local files are handled first: the whole ``download_folder`` is removed
    when it exists on disk; otherwise ``video_path`` and every entry of
    ``video_paths`` are removed one by one. When ``cleanup_gcs`` is true the
    source blob, every ``gs://`` URI listed in ``gcs_video_uris`` and the
    job's image files are removed as well. Every step is best-effort: a
    failure is printed and recorded, and the remaining steps still run.

    Args:
        job_id: The job ID to clean up
        cleanup_gcs: Whether to also clean up GCS files (default: True)

    Returns:
        True when every attempted step succeeded; the job's entry is then
        also dropped from ``job_status``. False when the job is unknown or
        any step failed, in which case the entry is kept.
    """
    job = job_status.get(job_id)
    if not job:
        print(f"Job {job_id} not found for cleanup")
        return False

    success = True
    start_time = time.time()

    # Clean up entire job folder (more efficient than individual files)
    download_folder = job.get('download_folder')
    if download_folder and os.path.exists(download_folder):
        try:
            import shutil
            shutil.rmtree(download_folder)
            print(f"Deleted entire job folder: {download_folder}")
        except Exception as e:
            print(f"Error deleting job folder: {e}")
            success = False
    else:
        # Fallback: try to delete individual files if folder not specified
        video_path = job.get('video_path')
        if video_path and os.path.exists(video_path):
            try:
                os.remove(video_path)
                print(f"Deleted main video file: {video_path}")
            except Exception as e:
                print(f"Error deleting main video file: {e}")
                success = False

        # Clean up individual video files
        for video_path in job.get('video_paths') or []:
            if os.path.exists(video_path):
                try:
                    os.remove(video_path)
                    print(f"Deleted individual video file: {video_path}")
                except Exception as e:
                    print(f"Error deleting individual video file: {e}")
                    success = False

    # Delete GCS video files only if requested (can be slow)
    if cleanup_gcs:
        # Delete GCS video files (for backwards compatibility, keep the old logic)
        if job.get('source_bucket_name') and job.get('source_blob_name'):
            try:
                delete_blob(job['source_bucket_name'], job['source_blob_name'])
            except Exception as e:
                print(f"Error deleting GCS file: {e}")
                success = False

        # Delete multiple GCS video files
        scheme = "gs://"
        for gcs_uri in job.get('gcs_video_uris') or []:
            try:
                if gcs_uri.startswith(scheme):
                    # Strip only the leading scheme; split bucket from object
                    # name at the first slash.
                    bucket_name, _, blob_name = gcs_uri[len(scheme):].partition("/")
                    if not bucket_name or not blob_name:
                        # Fail with a readable reason instead of an IndexError.
                        raise ValueError("expected gs://<bucket>/<object>")
                    delete_blob(bucket_name, blob_name)
                    print(f"Deleted GCS file: {gcs_uri}")
            except Exception as e:
                print(f"Error deleting GCS file {gcs_uri}: {e}")
                success = False
    else:
        print("Skipping GCS cleanup (local files only)")

    # Delete image files (includes both local and GCS)
    if cleanup_gcs and (job.get('local_image_path') or job.get('image_blob_name')):
        try:
            cleanup_image_files(
                job_id,
                job.get('local_image_path'),
                job.get('image_blob_name')
            )
        except Exception as e:
            print(f"Error deleting image files: {e}")
            success = False

    # Remove job from memory. pop() rather than del: the entry may already be
    # gone by the time the (potentially slow) cleanup above has finished.
    if success:
        job_status.pop(job_id, None)

    # Log cleanup completion time
    cleanup_time = time.time() - start_time
    cleanup_type = "full (local + GCS)" if cleanup_gcs else "local only"
    print(f"Cleanup completed for job {job_id} in {cleanup_time:.2f}s ({cleanup_type})")

    return success