feature of multi-processes

This commit is contained in:
Manish Tanwar 2025-10-08 19:03:04 +05:30
parent e10df6feb3
commit bd76e4ef67
12 changed files with 843 additions and 73 deletions

View file

@ -2,9 +2,10 @@ import os
import time
import threading
import tempfile
import datetime
from werkzeug.utils import secure_filename
from flask import Blueprint, request, jsonify, send_file
from video_generator import generate_video_async, get_job_status, cleanup_job_files
from video_generator import generate_video_async, get_job_status, cleanup_job_files, get_user_jobs, get_queue_status, cancel_job, can_add_to_queue
from config import Config
api_bp = Blueprint('api', __name__)
@ -182,15 +183,25 @@ def generate_video():
print(f"DEBUG: Error cleaning up image: {e}")
return jsonify({'error': 'Seed must be a number between 0 and 4294967295'}), 400
# Validate sample count
if not isinstance(sample_count, int) or sample_count < 1 or sample_count > 2:
# Validate sample count (increased limit to 10)
if not isinstance(sample_count, int) or sample_count < 1 or sample_count > 10:
if image_path and os.path.exists(image_path):
try:
os.remove(image_path)
os.rmdir(os.path.join(Config.TEMP_DOWNLOAD_PATH, f"job_{job_id}"))
except Exception as e:
print(f"DEBUG: Error cleaning up image: {e}")
return jsonify({'error': 'Sample count must be between 1 and 2'}), 400
return jsonify({'error': 'Sample count must be between 1 and 10'}), 400
# Check queue limit for user
if not can_add_to_queue(user_email):
if image_path and os.path.exists(image_path):
try:
os.remove(image_path)
os.rmdir(os.path.join(Config.TEMP_DOWNLOAD_PATH, f"job_{job_id}"))
except Exception as e:
print(f"DEBUG: Error cleaning up image: {e}")
return jsonify({'error': 'Queue limit exceeded. Maximum 4 jobs per user allowed.'}), 400
# Start video generation
print(f"DEBUG: About to call generate_video_async with job_id: {job_id} and image_path: {image_path}")
@ -397,4 +408,83 @@ def cleanup_job(job_id):
else:
return jsonify({'error': 'Failed to clean up some files'}), 500
except Exception as e:
return jsonify({'error': f'Failed to cleanup: {str(e)}'}), 500
return jsonify({'error': f'Failed to cleanup: {str(e)}'}), 500
@api_bp.route('/user-jobs', methods=['GET'])
def get_user_job_list():
"""Get all jobs for the current user."""
try:
# Get user email from request (you may need to adjust this based on your auth system)
user_email = request.args.get('user_email', 'anonymous')
jobs = get_user_jobs(user_email)
return jsonify({'jobs': jobs}), 200
except Exception as e:
return jsonify({'error': f'Failed to get user jobs: {str(e)}'}), 500
@api_bp.route('/queue-status', methods=['GET'])
def get_queue_status_endpoint():
"""Get overall queue status."""
try:
status = get_queue_status()
return jsonify(status), 200
except Exception as e:
return jsonify({'error': f'Failed to get queue status: {str(e)}'}), 500
@api_bp.route('/cancel/<job_id>', methods=['DELETE'])
def cancel_job_endpoint(job_id):
"""Cancel a queued job."""
try:
success = cancel_job(job_id)
if success:
return jsonify({'message': 'Job cancelled successfully'}), 200
else:
return jsonify({'error': 'Job could not be cancelled (not found or already processing)'}), 400
except Exception as e:
return jsonify({'error': f'Failed to cancel job: {str(e)}'}), 500
@api_bp.route('/test-queue', methods=['GET'])
def test_queue_endpoints():
"""Test endpoint to verify queue routes are working."""
return jsonify({
'message': 'Queue endpoints are working',
'timestamp': datetime.datetime.now().isoformat(),
'available_endpoints': [
'/api/user-jobs',
'/api/queue-status',
'/api/cancel/<job_id>',
'/api/download/<job_id>/video/<index>'
]
}), 200
@api_bp.route('/download/<job_id>/video/<int:video_index>', methods=['GET'])
def download_individual_video(job_id, video_index):
"""Download an individual video from a multi-video job."""
try:
job = get_job_status(job_id)
if job['status'] == 'not_found':
return jsonify({'error': 'Job not found'}), 404
if job['status'] != 'completed':
return jsonify({'error': 'Video not ready for download'}), 400
individual_videos = job.get('individual_video_paths', [])
if video_index < 1 or video_index > len(individual_videos):
return jsonify({'error': f'Video index {video_index} out of range. Available: 1-{len(individual_videos)}'}), 400
video_path = individual_videos[video_index - 1] # Convert to 0-based index
if not os.path.exists(video_path):
return jsonify({'error': f'Video file not found: {video_path}'}), 404
download_name = f'video_{video_index}_{job_id}.mp4'
return send_file(
video_path,
as_attachment=True,
download_name=download_name,
mimetype='video/mp4'
)
except Exception as e:
return jsonify({'error': f'Failed to download video: {str(e)}'}), 500

View file

@ -23,7 +23,7 @@ def download_blob(bucket_name: str, source_blob_name: str, destination_file_name
os.makedirs(os.path.dirname(destination_file_name), exist_ok=True)
try:
blob.download_to_filename(destination_file_name)
blob.download_to_filename(destination_file_name, timeout=600) # 10 minute timeout for video downloads
print(f"Video downloaded from gs://{bucket_name}/{source_blob_name} to {destination_file_name}")
return destination_file_name
except Exception as e:
@ -222,8 +222,8 @@ def upload_image_to_gcs(image_path: str, job_id: str) -> str:
blob = bucket.blob(blob_name)
# Upload converted image
blob.upload_from_filename(converted_image_path)
# Upload converted image with timeout
blob.upload_from_filename(converted_image_path, timeout=300) # 5 minute timeout
# Set content type to JPEG
blob.content_type = 'image/jpeg'

View file

@ -7,6 +7,7 @@ import datetime
import logging
import zipfile
import random
import threading
from google import genai
from google.genai import types
from google.auth.exceptions import DefaultCredentialsError
@ -18,8 +19,16 @@ from config import Config
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# In-memory storage for job status (in production, use Redis or database)
# Job queue and processing configuration
CONCURRENT_JOB_LIMIT = 2
MAX_QUEUE_SIZE_PER_USER = 4
MAX_RETRIES = 3
# In-memory storage (in production, use Redis or database)
job_status = {}
job_queue = [] # Global queue: [job_id1, job_id2, ...]
processing_jobs = [] # Currently active jobs (max CONCURRENT_JOB_LIMIT)
user_job_counts = {} # Track jobs per user: {user_email: count}
def send_usage_webhook(user_email: str, prompt: str) -> None:
"""
@ -67,6 +76,103 @@ def send_usage_webhook(user_email: str, prompt: str) -> None:
logger.error(f"Error sending usage data to webhook: {str(e)}")
# Don't raise the exception - webhook failure shouldn't block the main flow
def get_user_queue_count(user_email: str) -> int:
"""Get the current queue count for a user."""
return user_job_counts.get(user_email, 0)
def can_add_to_queue(user_email: str) -> bool:
"""Check if user can add more jobs to queue."""
return get_user_queue_count(user_email) < MAX_QUEUE_SIZE_PER_USER
def add_to_queue(job_id: str, user_email: str) -> None:
"""Add job to queue and update user count."""
global job_queue, user_job_counts
job_queue.append(job_id)
user_job_counts[user_email] = user_job_counts.get(user_email, 0) + 1
logger.info(f"Added job {job_id} to queue for user {user_email}. Queue position: {len(job_queue)}")
def remove_from_queue(job_id: str, user_email: str) -> None:
"""Remove job from queue and update user count."""
global job_queue, user_job_counts
if job_id in job_queue:
job_queue.remove(job_id)
user_job_counts[user_email] = max(0, user_job_counts.get(user_email, 0) - 1)
if user_job_counts[user_email] == 0:
del user_job_counts[user_email]
def get_queue_position(job_id: str) -> int:
"""Get the position of a job in the queue (1-based)."""
try:
return job_queue.index(job_id) + 1
except ValueError:
return -1
def start_next_job() -> None:
"""Start the next job from queue if there's capacity."""
global processing_jobs, job_queue
if len(processing_jobs) >= CONCURRENT_JOB_LIMIT:
return
if not job_queue:
return
next_job_id = job_queue.pop(0)
processing_jobs.append(next_job_id)
# Update job status
if next_job_id in job_status:
job_status[next_job_id].update({
'status': 'processing',
'queue_position': 0,
'started_at': datetime.datetime.now().isoformat()
})
# Start processing in background thread
threading.Thread(
target=process_video_generation,
args=(next_job_id,),
daemon=True
).start()
logger.info(f"Started processing job {next_job_id}")
def complete_job(job_id: str) -> None:
"""Mark job as complete and start next queued job."""
global processing_jobs
if job_id in processing_jobs:
processing_jobs.remove(job_id)
# Start next job if available
start_next_job()
logger.info(f"Completed job {job_id}. Processing jobs: {len(processing_jobs)}")
def cancel_job(job_id: str) -> bool:
"""Cancel a queued job."""
job = job_status.get(job_id)
if not job:
return False
user_email = job.get('user_email', 'unknown')
# Can only cancel queued jobs
if job['status'] != 'queued':
return False
# Remove from queue
remove_from_queue(job_id, user_email)
# Update status
job_status[job_id].update({
'status': 'cancelled',
'message': 'Job cancelled by user',
'cancelled_at': datetime.datetime.now().isoformat()
})
logger.info(f"Cancelled job {job_id} for user {user_email}")
return True
def generate_video_async(
prompt: str,
job_id: str = None,
@ -81,24 +187,83 @@ def generate_video_async(
generate_audio: bool = True
) -> str:
"""
Start video generation asynchronously and return job ID.
Queue video generation job and return job ID.
"""
# Check if user can add more jobs
if not can_add_to_queue(user_email):
raise Exception(f"Queue limit exceeded. Maximum {MAX_QUEUE_SIZE_PER_USER} jobs per user.")
if job_id is None:
job_id = str(uuid.uuid4())
# Initialize job status
# Initialize job status with queue info
job_status[job_id] = {
'status': 'starting',
'user_email': user_email,
'prompt': prompt,
'status': 'queued',
'progress': 0,
'message': 'Initializing video generation...',
'message': 'Job queued for processing...',
'queue_position': 0, # Will be updated when added to queue
'retry_count': 0,
'max_retries': MAX_RETRIES,
'videos_requested': sample_count,
'videos_completed': [],
'individual_video_paths': [],
'can_cancel': True,
'created_at': datetime.datetime.now().isoformat(),
'started_at': None,
'video_path': None,
'gcs_uri': None,
'error': None,
'image_gcs_uri': None,
'image_blob_name': None,
'local_image_path': None
'local_image_path': image_path,
'model_name': model_name,
'video_length_sec': video_length_sec,
'aspect_ratio': aspect_ratio,
'person_generation': person_generation,
'seed': seed,
'generate_audio': generate_audio
}
# Add to queue
add_to_queue(job_id, user_email)
job_status[job_id]['queue_position'] = get_queue_position(job_id)
# Try to start immediately if there's capacity
start_next_job()
return job_id
def process_video_generation(job_id: str) -> None:
"""
Process video generation for a specific job (called from queue processor).
"""
job = job_status.get(job_id)
if not job:
logger.error(f"Job {job_id} not found in status")
return
# Extract parameters from job
prompt = job['prompt']
user_email = job['user_email']
model_name = job['model_name']
video_length_sec = job['video_length_sec']
aspect_ratio = job['aspect_ratio']
sample_count = job['videos_requested']
person_generation = job['person_generation']
image_path = job['local_image_path']
seed = job['seed']
generate_audio = job['generate_audio']
# Update status to starting
job_status[job_id].update({
'status': 'starting',
'progress': 0,
'message': 'Initializing video generation...',
'can_cancel': False # Cannot cancel once processing starts
})
# Validate and upload image if provided
image_gcs_uri = None
image_blob_name = None
@ -114,7 +279,9 @@ def generate_video_async(
'message': f'Image validation failed: {validation_result["error"]}',
'error': validation_result['error']
})
return job_id
complete_job(job_id)
remove_from_queue(job_id, user_email)
return
# Auto-select aspect ratio based on the converted image
detected_aspect_ratio = validation_result['aspect_ratio']
@ -150,7 +317,9 @@ def generate_video_async(
'message': error_message,
'error': error_message
})
return job_id
complete_job(job_id)
remove_from_queue(job_id, user_email)
return
try:
# Validate model selection
@ -164,7 +333,9 @@ def generate_video_async(
'error': error_message
})
logger.warning(f"Model validation failed for job {job_id}: {selected_model}")
return job_id
complete_job(job_id)
remove_from_queue(job_id, user_email)
return
# Validate aspect ratio - Support both 16:9 and 9:16
if aspect_ratio not in ["16:9", "9:16"]:
@ -176,7 +347,9 @@ def generate_video_async(
'error': error_message
})
logger.warning(f"Aspect ratio validation failed for job {job_id}: {aspect_ratio}")
return job_id
complete_job(job_id)
remove_from_queue(job_id, user_email)
return
# Set up authentication
credentials = get_google_credentials()
@ -317,7 +490,9 @@ def generate_video_async(
'message': error_message,
'error': str(operation.error)
})
return job_id
complete_job(job_id)
remove_from_queue(job_id, user_email)
return
# Collect all generated videos from all operations
all_generated_videos = []
@ -347,7 +522,9 @@ def generate_video_async(
'error': error_message
})
return job_id
complete_job(job_id)
remove_from_queue(job_id, user_email)
return
# Add videos from this operation
operation_videos = operation.response.generated_videos
@ -391,7 +568,9 @@ def generate_video_async(
'message': error_message,
'error': error_message
})
return job_id
complete_job(job_id)
remove_from_queue(job_id, user_email)
return
video_uris.append(video.video.uri)
print(f"Videos generated successfully. GCS URIs: {video_uris}")
@ -423,7 +602,9 @@ def generate_video_async(
'message': error_message,
'error': error_message
})
return job_id
complete_job(job_id)
remove_from_queue(job_id, user_email)
return
path_parts = gcs_video_uri.replace("gs://", "").split("/", 1)
source_bucket_name = path_parts[0]
@ -438,15 +619,21 @@ def generate_video_async(
print(f"Downloaded video {i+1}: {local_video_path}")
# Determine if we need to create zip first
# Determine if we need to create zip
has_image = image_path is not None
video_count = len(downloaded_video_paths)
will_need_zip = video_count > 1 or has_image
should_use_zip = video_count > 1 or has_image
zip_filename = None
files_added_to_zip = []
if will_need_zip:
print(f"=== ZIP CREATION DECISION ===")
print(f"has_image: {has_image}")
print(f"video_count: {video_count}")
print(f"should_use_zip: {should_use_zip}")
print("==============================")
if should_use_zip:
# Create a zip file inside the job folder
zip_filename = os.path.join(download_folder, "all_videos.zip")
print(f"Creating zip file (needed for {video_count} videos, has_image={has_image}): {zip_filename}")
@ -476,18 +663,8 @@ def generate_video_async(
else:
print(f"Skipping zip creation (single video, no image): {video_count} videos, has_image={has_image}")
# Determine download strategy
has_image = image_path is not None
video_count = len(downloaded_video_paths)
print(f"=== DOWNLOAD STRATEGY DECISION ===")
print(f"has_image: {has_image}")
print(f"video_count: {video_count}")
print(f"image_path: {image_path}")
print(f"downloaded_video_paths: {downloaded_video_paths}")
# Use zip for multiple videos OR when image is present
should_use_zip = video_count > 1 or has_image
print(f"zip_filename: {zip_filename}")
print(f"should_use_zip: {should_use_zip}")
print("===================================")
@ -524,10 +701,12 @@ def generate_video_async(
'message': message,
'video_path': primary_download_path,
'video_paths': downloaded_video_paths,
'individual_video_paths': downloaded_video_paths, # For individual downloads
'download_folder': download_folder,
'is_zip': should_use_zip,
'download_type': download_type,
'video_count': video_count,
'videos_completed': downloaded_video_paths,
'has_image': has_image
})
@ -537,25 +716,130 @@ def generate_video_async(
# Clean up temporary image files
if image_path and image_blob_name:
cleanup_image_files(job_id, image_path, image_blob_name)
# Mark job as completed in queue
complete_job(job_id)
remove_from_queue(job_id, user_email)
except Exception as e:
error_message = f"An error occurred during video generation: {str(e)}"
job_status[job_id].update({
'status': 'failed',
'progress': 0,
'message': error_message,
'error': str(e)
})
# Check if this is a retryable error and we haven't exceeded max retries
job = job_status.get(job_id, {})
retry_count = job.get('retry_count', 0)
# Clean up image files on failure
if image_path and image_blob_name:
cleanup_image_files(job_id, image_path, image_blob_name)
return job_id
is_retryable_error = (
'503' in str(e) or
'500' in str(e) or
'timeout' in str(e).lower() or
'connection' in str(e).lower() or
'timeouterror' in str(e).lower() or
'connection aborted' in str(e).lower() or
'write operation timed out' in str(e).lower()
)
if is_retryable_error and retry_count < MAX_RETRIES:
# Retry the job
retry_count += 1
# Determine error type for user-friendly message
error_type = "Network timeout"
if 'timeout' in str(e).lower() or 'timeouterror' in str(e).lower():
error_type = "Upload timeout"
elif 'connection' in str(e).lower():
error_type = "Connection error"
elif '503' in str(e) or '500' in str(e):
error_type = "Server error"
job_status[job_id].update({
'status': f'retry_{retry_count}_of_{MAX_RETRIES}',
'progress': 0,
'message': f'{error_type} - Retry attempt {retry_count} of {MAX_RETRIES} in progress...',
'retry_count': retry_count,
'error': f'Previous attempt failed: {str(e)}'
})
logger.info(f"Retrying job {job_id}, attempt {retry_count}/{MAX_RETRIES}. Error: {str(e)}")
# Wait before retry (exponential backoff)
retry_delays = [30, 60, 120] # 30s, 1min, 2min
delay = retry_delays[min(retry_count - 1, len(retry_delays) - 1)]
logger.info(f"Waiting {delay} seconds before retry...")
time.sleep(delay)
# Retry by calling this function again
logger.info(f"Starting retry {retry_count} for job {job_id}")
try:
process_video_generation(job_id)
except RecursionError:
logger.error(f"Recursion error detected for job {job_id}, treating as final failure")
job_status[job_id].update({
'status': 'failed',
'progress': 0,
'message': 'System error during retry processing',
'error': 'Retry system failure',
'final_failure': True
})
complete_job(job_id)
remove_from_queue(job_id, user_email)
else:
# Final failure
attempts_msg = f" (after {retry_count} retries)" if retry_count > 0 else ""
error_message = f"Video generation failed{attempts_msg}: {str(e)}"
job_status[job_id].update({
'status': 'failed',
'progress': 0,
'message': error_message,
'error': str(e),
'final_failure': True
})
logger.error(f"Job {job_id} failed after {retry_count} retries: {str(e)}")
# Clean up image files on failure
if image_path and image_blob_name:
cleanup_image_files(job_id, image_path, image_blob_name)
# Mark job as completed in queue (even if failed)
complete_job(job_id)
remove_from_queue(job_id, user_email)
def get_job_status(job_id: str) -> dict:
"""Get the status of a video generation job."""
return job_status.get(job_id, {'status': 'not_found', 'message': 'Job not found'})
"""Get the status of a video generation job with queue information."""
job = job_status.get(job_id)
if not job:
return {'status': 'not_found', 'message': 'Job not found'}
# Add current queue position if still queued
if job['status'] == 'queued':
job['queue_position'] = get_queue_position(job_id)
return job
def get_user_jobs(user_email: str) -> list:
"""Get all jobs for a specific user."""
user_jobs = []
for job_id, job in job_status.items():
if job.get('user_email') == user_email:
job_copy = job.copy()
job_copy['job_id'] = job_id
# Add current queue position if still queued
if job_copy['status'] == 'queued':
job_copy['queue_position'] = get_queue_position(job_id)
user_jobs.append(job_copy)
# Sort by creation time (newest first)
user_jobs.sort(key=lambda x: x.get('created_at', ''), reverse=True)
return user_jobs
def get_queue_status() -> dict:
"""Get overall queue status."""
return {
'queue_length': len(job_queue),
'processing_jobs': len(processing_jobs),
'concurrent_limit': CONCURRENT_JOB_LIMIT,
'queued_jobs': job_queue.copy(),
'active_jobs': processing_jobs.copy()
}
def cleanup_job_files(job_id: str) -> bool:
"""Clean up local and GCS files for a job."""

View file

@ -29,12 +29,18 @@ const ProgressIndicator = ({
const getStatusChip = () => {
const statusConfig = {
[JOB_STATUS.QUEUED]: { label: 'Queued', color: 'info' },
[JOB_STATUS.STARTING]: { label: 'Starting', color: 'info' },
[JOB_STATUS.UPLOADING_IMAGE]: { label: 'Uploading Image', color: 'info' },
[JOB_STATUS.GENERATING]: { label: 'Generating', color: 'info' },
[JOB_STATUS.PROCESSING]: { label: 'Processing', color: 'warning' },
[JOB_STATUS.DOWNLOADING]: { label: 'Downloading', color: 'warning' },
[JOB_STATUS.COMPLETED]: { label: 'Completed', color: 'success' },
[JOB_STATUS.FAILED]: { label: 'Failed', color: 'error' }
[JOB_STATUS.FAILED]: { label: 'Failed', color: 'error' },
[JOB_STATUS.CANCELLED]: { label: 'Cancelled', color: 'default' },
[JOB_STATUS.RETRY_1_OF_3]: { label: 'Retry 1/3', color: 'warning' },
[JOB_STATUS.RETRY_2_OF_3]: { label: 'Retry 2/3', color: 'warning' },
[JOB_STATUS.RETRY_3_OF_3]: { label: 'Retry 3/3', color: 'warning' }
};
const config = statusConfig[status] || { label: 'Unknown', color: 'default' };

View file

@ -0,0 +1,304 @@
import React, { useState, useEffect } from 'react';
import {
Paper,
Typography,
Box,
List,
ListItem,
Chip,
Button,
LinearProgress,
Grid,
Divider,
Alert,
IconButton,
Tooltip
} from '@mui/material';
import {
CheckCircleRounded,
ErrorRounded,
HourglassEmptyRounded,
PlayArrowRounded,
CancelRounded,
DownloadRounded,
RefreshRounded
} from '@mui/icons-material';
import { JOB_STATUS, API_BASE_URL } from '../utils/constants';
import { getUserJobs, getQueueStatus, cancelJob } from '../services/api';
const QueueManager = ({ userEmail }) => {
const [jobs, setJobs] = useState([]);
const [queueStatus, setQueueStatus] = useState({});
const [loading, setLoading] = useState(true);
const [error, setError] = useState('');
const fetchUserJobs = async () => {
try {
console.log('Fetching user jobs for:', userEmail);
const data = await getUserJobs(userEmail);
console.log('User jobs data:', data);
setJobs(data.jobs || []);
} catch (err) {
console.error('fetchUserJobs error:', err);
setError(`Failed to load jobs: ${err.message}`);
}
};
const fetchQueueStatusData = async () => {
try {
console.log('Fetching queue status');
const data = await getQueueStatus();
console.log('Queue status data:', data);
setQueueStatus(data);
} catch (err) {
console.error('fetchQueueStatus error:', err);
}
};
const handleCancelJob = async (jobId) => {
try {
await cancelJob(jobId);
await fetchUserJobs(); // Refresh the list
} catch (err) {
setError(`Failed to cancel job: ${err.message}`);
}
};
useEffect(() => {
const loadData = async () => {
setLoading(true);
await Promise.all([fetchUserJobs(), fetchQueueStatusData()]);
setLoading(false);
};
loadData();
// Poll for updates every 2 seconds
const interval = setInterval(() => {
fetchUserJobs();
fetchQueueStatusData();
}, 2000);
return () => clearInterval(interval);
}, [userEmail]);
const getStatusIcon = (status) => {
if (status === JOB_STATUS.COMPLETED) return <CheckCircleRounded color="success" />;
if (status === JOB_STATUS.FAILED) return <ErrorRounded color="error" />;
if (status === JOB_STATUS.CANCELLED) return <CancelRounded color="disabled" />;
if (status === JOB_STATUS.QUEUED) return <HourglassEmptyRounded color="info" />;
if (status.startsWith('retry_')) return <RefreshRounded color="warning" />;
return <PlayArrowRounded color="primary" />;
};
const getStatusColor = (status) => {
if (status === JOB_STATUS.COMPLETED) return 'success';
if (status === JOB_STATUS.FAILED) return 'error';
if (status === JOB_STATUS.CANCELLED) return 'default';
if (status === JOB_STATUS.QUEUED) return 'info';
if (status.startsWith('retry_')) return 'warning';
return 'primary';
};
const getProgressValue = (job) => {
if (job.status === JOB_STATUS.COMPLETED) return 100;
if (job.status === JOB_STATUS.FAILED || job.status === JOB_STATUS.CANCELLED) return 0;
return job.progress || 0;
};
const renderDownloadButtons = (job) => {
if (job.status !== JOB_STATUS.COMPLETED) return null;
const videoCount = job.video_count || 1;
const buttons = [];
// Main download button (zip for multiple videos, single video for one)
buttons.push(
<Button
key="main-download"
variant="contained"
size="small"
startIcon={<DownloadRounded />}
href={`${API_BASE_URL}/api/download/${job.job_id}`}
sx={{ mr: 1, mb: 1 }}
>
Download {videoCount > 1 ? 'All' : 'Video'}
</Button>
);
// Individual download buttons for multiple videos
if (videoCount > 1) {
for (let i = 1; i <= videoCount; i++) {
buttons.push(
<Button
key={`video-${i}`}
variant="outlined"
size="small"
href={`${API_BASE_URL}/api/download/${job.job_id}/video/${i}`}
sx={{ mr: 0.5, mb: 1 }}
>
Video {i}
</Button>
);
}
}
return <Box sx={{ mt: 1 }}>{buttons}</Box>;
};
if (loading) {
return (
<Paper elevation={2} sx={{ p: 3 }}>
<Typography variant="h6" gutterBottom>Job Queue</Typography>
<LinearProgress />
</Paper>
);
}
return (
<Paper elevation={2} sx={{ p: 3 }}>
<Box sx={{ display: 'flex', justifyContent: 'space-between', alignItems: 'center', mb: 2 }}>
<Typography variant="h6">Job Queue</Typography>
<Tooltip title="Refresh">
<IconButton onClick={() => { fetchUserJobs(); fetchQueueStatusData(); }}>
<RefreshRounded />
</IconButton>
</Tooltip>
</Box>
{/* Queue Status Summary */}
{queueStatus.queue_length !== undefined && (
<Alert severity="info" sx={{ mb: 2 }}>
Queue: {queueStatus.queue_length} waiting
Processing: {queueStatus.processing_jobs}/{queueStatus.concurrent_limit}
Your jobs: {jobs.length}/4
</Alert>
)}
{error && (
<Alert severity="error" sx={{ mb: 2 }} onClose={() => setError('')}>
{error}
</Alert>
)}
{jobs.length === 0 ? (
<Typography color="text.secondary" sx={{ textAlign: 'center', py: 4 }}>
No jobs found. Submit a video generation request to see it here.
</Typography>
) : (
<List>
{jobs.map((job, index) => (
<React.Fragment key={job.job_id}>
<ListItem sx={{ flexDirection: 'column', alignItems: 'stretch', py: 2 }}>
<Box sx={{ display: 'flex', alignItems: 'center', justifyContent: 'space-between', width: '100%', mb: 1 }}>
<Box sx={{ display: 'flex', alignItems: 'center', gap: 1 }}>
{getStatusIcon(job.status)}
<Typography variant="subtitle2" sx={{ fontWeight: 'bold' }}>
{job.videos_requested} video{job.videos_requested > 1 ? 's' : ''}
</Typography>
<Chip
label={job.status.replace(/_/g, ' ').toUpperCase()}
color={getStatusColor(job.status)}
size="small"
/>
{job.status === JOB_STATUS.QUEUED && job.queue_position > 0 && (
<Chip
label={`Position: ${job.queue_position}`}
variant="outlined"
size="small"
/>
)}
</Box>
<Box sx={{ display: 'flex', gap: 1 }}>
{job.can_cancel && job.status === JOB_STATUS.QUEUED && (
<Button
variant="outlined"
color="error"
size="small"
startIcon={<CancelRounded />}
onClick={() => handleCancelJob(job.job_id)}
>
Cancel
</Button>
)}
</Box>
</Box>
<Typography variant="body2" color="text.secondary" sx={{ mb: 1 }}>
{job.prompt.length > 100 ? `${job.prompt.substring(0, 100)}...` : job.prompt}
</Typography>
<Typography variant="caption" color="text.secondary" sx={{ mb: 1 }}>
{job.message || 'Processing...'}
</Typography>
{/* Progress Bar */}
<LinearProgress
variant="determinate"
value={getProgressValue(job)}
sx={{ mb: 1 }}
/>
{/* Download Buttons */}
{renderDownloadButtons(job)}
{/* Job Details */}
<Grid container spacing={1} sx={{ mt: 1 }}>
<Grid item xs={6} sm={3}>
<Typography variant="caption" color="text.secondary">
Created: {new Date(job.created_at).toLocaleString()}
</Typography>
</Grid>
{job.started_at && (
<Grid item xs={6} sm={3}>
<Typography variant="caption" color="text.secondary">
Started: {new Date(job.started_at).toLocaleString()}
</Typography>
</Grid>
)}
<Grid item xs={6} sm={3}>
<Typography variant="caption" color="text.secondary">
Model: {job.model_name?.includes('fast') ? 'Fast' : 'Standard'}
</Typography>
</Grid>
<Grid item xs={6} sm={3}>
<Typography variant="caption" color="text.secondary">
Length: {job.video_length_sec}s
</Typography>
</Grid>
</Grid>
{/* Retry Information */}
{job.retry_count > 0 && (
<Alert severity="warning" sx={{ mt: 1 }}>
<strong>Retry attempt {job.retry_count} of {job.max_retries}</strong>
<br />
{job.status.includes('retry') && job.message && (
<span>{job.message}</span>
)}
{job.error && job.error.includes('timeout') && (
<div style={{ marginTop: '8px', fontSize: '0.9em' }}>
<strong>Note:</strong> This is likely due to network connectivity issues. The system will automatically retry with longer timeouts.
</div>
)}
</Alert>
)}
{/* Final Failure Information */}
{job.final_failure && (
<Alert severity="error" sx={{ mt: 1 }}>
Failed after {job.retry_count} retries. {job.error}
</Alert>
)}
</ListItem>
{index < jobs.length - 1 && <Divider />}
</React.Fragment>
))}
</List>
)}
</Paper>
);
};
export default QueueManager;

View file

@ -130,8 +130,8 @@ const VideoForm = ({ onSubmit, isGenerating }) => {
newErrors.seed = 'Seed must be a number between 0 and 4294967295';
}
if (formData.sampleCount < 1 || formData.sampleCount > 2) {
newErrors.sampleCount = 'Sample count must be between 1 and 2';
if (formData.sampleCount < 1 || formData.sampleCount > 10) {
newErrors.sampleCount = 'Sample count must be between 1 and 10';
}
@ -207,6 +207,8 @@ const VideoForm = ({ onSubmit, isGenerating }) => {
</label>
<Typography variant="caption" display="block" sx={{ mt: 1 }}>
Supported: JPG, PNG Max: 10MB Min: 720p
<br />
<em>Tip: Smaller images (under 2MB) upload faster and are less likely to timeout</em>
</Typography>
</Box>
) : (

View file

@ -1,8 +1,9 @@
import React from 'react';
import { Box } from '@mui/material';
import { Box, Grid } from '@mui/material';
import { useIsAuthenticated, useMsal } from '@azure/msal-react';
import VideoForm from './VideoForm';
import ProgressIndicator from './ProgressIndicator';
import QueueManager from './QueueManager';
import { useVideoGeneration } from '../hooks/useVideoGeneration';
import { useMockIsAuthenticated, useMockMsal } from './DevAuthWrapper';
@ -43,20 +44,32 @@ const VideoGenerator = () => {
return (
<Box>
<VideoForm
onSubmit={handleFormSubmit}
isGenerating={isGenerating}
/>
<ProgressIndicator
status={status}
progress={progress}
message={message}
error={error}
downloadLinks={downloadLinks}
onDownload={handleDownload}
onReset={reset}
/>
<Grid container spacing={3}>
{/* Left Column - Video Form */}
<Grid item xs={12} lg={6}>
<VideoForm
onSubmit={handleFormSubmit}
isGenerating={isGenerating}
/>
<Box sx={{ mt: 3 }}>
<ProgressIndicator
status={status}
progress={progress}
message={message}
error={error}
downloadLinks={downloadLinks}
onDownload={handleDownload}
onReset={reset}
/>
</Box>
</Grid>
{/* Right Column - Queue Manager */}
<Grid item xs={12} lg={6}>
<QueueManager userEmail={getUserEmail()} />
</Grid>
</Grid>
</Box>
);
};

View file

@ -72,14 +72,34 @@ export const downloadVideo = async (jobId) => {
responseType: 'blob',
});
// Get the filename from Content-Disposition header or determine from content type
let filename = `generated_video_${jobId}.mp4`;
let mimeType = 'video/mp4';
const contentDisposition = response.headers['content-disposition'];
if (contentDisposition) {
const filenameMatch = contentDisposition.match(/filename="?(.+)"?/);
if (filenameMatch) {
filename = filenameMatch[1];
}
}
const contentType = response.headers['content-type'];
if (contentType) {
mimeType = contentType;
if (contentType.includes('application/zip')) {
filename = filename.replace('.mp4', '.zip');
}
}
// Create blob URL for download
const blob = new Blob([response.data], { type: 'video/mp4' });
const blob = new Blob([response.data], { type: mimeType });
const url = window.URL.createObjectURL(blob);
// Create download link
const link = document.createElement('a');
link.href = url;
link.download = `generated_video_${jobId}.mp4`;
link.download = filename;
document.body.appendChild(link);
link.click();
@ -98,4 +118,42 @@ export const cleanupJob = async (jobId) => {
export const healthCheck = async () => {
const response = await apiClient.get('/health');
return response.data;
};
export const getUserJobs = async (userEmail) => {
const response = await apiClient.get(`/api/user-jobs?user_email=${encodeURIComponent(userEmail)}`);
return response.data;
};
export const getQueueStatus = async () => {
const response = await apiClient.get('/api/queue-status');
return response.data;
};
export const cancelJob = async (jobId) => {
const response = await apiClient.delete(`/api/cancel/${jobId}`);
return response.data;
};
export const downloadIndividualVideo = async (jobId, videoIndex) => {
const response = await apiClient.get(`/api/download/${jobId}/video/${videoIndex}`, {
responseType: 'blob',
});
// Create blob URL for download
const blob = new Blob([response.data], { type: 'video/mp4' });
const url = window.URL.createObjectURL(blob);
// Create download link
const link = document.createElement('a');
link.href = url;
link.download = `video_${videoIndex}_${jobId}.mp4`;
document.body.appendChild(link);
link.click();
// Cleanup
document.body.removeChild(link);
window.URL.revokeObjectURL(url);
return true;
};

View file

@ -34,12 +34,21 @@ export const VIDEO_GENERATION_OPTIONS = {
{ value: 8, label: '8 seconds' }
],
sampleCounts: [
{ value: 1, label: '1' },
{ value: 2, label: '2' }
{ value: 1, label: '1 video' },
{ value: 2, label: '2 videos' },
{ value: 3, label: '3 videos' },
{ value: 4, label: '4 videos' },
{ value: 5, label: '5 videos' },
{ value: 6, label: '6 videos' },
{ value: 7, label: '7 videos' },
{ value: 8, label: '8 videos' },
{ value: 9, label: '9 videos' },
{ value: 10, label: '10 videos' }
]
};
export const JOB_STATUS = {
QUEUED: 'queued',
STARTING: 'starting',
UPLOADING_IMAGE: 'uploading_image',
GENERATING: 'generating',
@ -47,7 +56,11 @@ export const JOB_STATUS = {
DOWNLOADING: 'downloading',
COMPLETED: 'completed',
FAILED: 'failed',
NOT_FOUND: 'not_found'
NOT_FOUND: 'not_found',
CANCELLED: 'cancelled',
RETRY_1_OF_3: 'retry_1_of_3',
RETRY_2_OF_3: 'retry_2_of_3',
RETRY_3_OF_3: 'retry_3_of_3'
};
export const IMAGE_UPLOAD_CONFIG = {