diff --git a/backend/routes/api.py b/backend/routes/api.py index 292badf..67cb81b 100644 --- a/backend/routes/api.py +++ b/backend/routes/api.py @@ -2,9 +2,10 @@ import os import time import threading import tempfile +import datetime from werkzeug.utils import secure_filename from flask import Blueprint, request, jsonify, send_file -from video_generator import generate_video_async, get_job_status, cleanup_job_files +from video_generator import generate_video_async, get_job_status, cleanup_job_files, get_user_jobs, get_queue_status, cancel_job, can_add_to_queue from config import Config api_bp = Blueprint('api', __name__) @@ -182,15 +183,25 @@ def generate_video(): print(f"DEBUG: Error cleaning up image: {e}") return jsonify({'error': 'Seed must be a number between 0 and 4294967295'}), 400 - # Validate sample count - if not isinstance(sample_count, int) or sample_count < 1 or sample_count > 2: + # Validate sample count (increased limit to 10) + if not isinstance(sample_count, int) or sample_count < 1 or sample_count > 10: if image_path and os.path.exists(image_path): try: os.remove(image_path) os.rmdir(os.path.join(Config.TEMP_DOWNLOAD_PATH, f"job_{job_id}")) except Exception as e: print(f"DEBUG: Error cleaning up image: {e}") - return jsonify({'error': 'Sample count must be between 1 and 2'}), 400 + return jsonify({'error': 'Sample count must be between 1 and 10'}), 400 + + # Check queue limit for user + if not can_add_to_queue(user_email): + if image_path and os.path.exists(image_path): + try: + os.remove(image_path) + os.rmdir(os.path.join(Config.TEMP_DOWNLOAD_PATH, f"job_{job_id}")) + except Exception as e: + print(f"DEBUG: Error cleaning up image: {e}") + return jsonify({'error': 'Queue limit exceeded. Maximum 4 jobs per user allowed.'}), 400 # Start video generation print(f"DEBUG: About to call generate_video_async with job_id: {job_id} and image_path: {image_path}") @@ -397,4 +408,83 @@ def cleanup_job(job_id): else: return jsonify({'error': 'Failed to clean up some files'}), 500 except Exception as e: - return jsonify({'error': f'Failed to cleanup: {str(e)}'}), 500 \ No newline at end of file + return jsonify({'error': f'Failed to cleanup: {str(e)}'}), 500 + +@api_bp.route('/user-jobs', methods=['GET']) +def get_user_job_list(): + """Get all jobs for the current user.""" + try: + # Get user email from request (you may need to adjust this based on your auth system) + user_email = request.args.get('user_email', 'anonymous') + jobs = get_user_jobs(user_email) + return jsonify({'jobs': jobs}), 200 + except Exception as e: + return jsonify({'error': f'Failed to get user jobs: {str(e)}'}), 500 + +@api_bp.route('/queue-status', methods=['GET']) +def get_queue_status_endpoint(): + """Get overall queue status.""" + try: + status = get_queue_status() + return jsonify(status), 200 + except Exception as e: + return jsonify({'error': f'Failed to get queue status: {str(e)}'}), 500 + +@api_bp.route('/cancel/', methods=['DELETE']) +def cancel_job_endpoint(job_id): + """Cancel a queued job.""" + try: + success = cancel_job(job_id) + if success: + return jsonify({'message': 'Job cancelled successfully'}), 200 + else: + return jsonify({'error': 'Job could not be cancelled (not found or already processing)'}), 400 + except Exception as e: + return jsonify({'error': f'Failed to cancel job: {str(e)}'}), 500 + +@api_bp.route('/test-queue', methods=['GET']) +def test_queue_endpoints(): + """Test endpoint to verify queue routes are working.""" + return jsonify({ + 'message': 'Queue endpoints are working', + 'timestamp': datetime.datetime.now().isoformat(), + 'available_endpoints': [ + '/api/user-jobs', + '/api/queue-status', + '/api/cancel/', + '/api/download//video/' + ] + }), 200 + +@api_bp.route('/download//video/', methods=['GET']) +def download_individual_video(job_id, video_index): + """Download an individual video from a multi-video job.""" + try: + job = get_job_status(job_id) + + if job['status'] == 'not_found': + return jsonify({'error': 'Job not found'}), 404 + + if job['status'] != 'completed': + return jsonify({'error': 'Video not ready for download'}), 400 + + individual_videos = job.get('individual_video_paths', []) + if video_index < 1 or video_index > len(individual_videos): + return jsonify({'error': f'Video index {video_index} out of range. Available: 1-{len(individual_videos)}'}), 400 + + video_path = individual_videos[video_index - 1] # Convert to 0-based index + + if not os.path.exists(video_path): + return jsonify({'error': f'Video file not found: {video_path}'}), 404 + + download_name = f'video_{video_index}_{job_id}.mp4' + + return send_file( + video_path, + as_attachment=True, + download_name=download_name, + mimetype='video/mp4' + ) + + except Exception as e: + return jsonify({'error': f'Failed to download video: {str(e)}'}), 500 \ No newline at end of file diff --git a/backend/temp_downloads/job_485524df-1d69-4dc0-8970-4952c397077b/generated_video_1.mp4 b/backend/temp_downloads/job_485524df-1d69-4dc0-8970-4952c397077b/generated_video_1.mp4 new file mode 100644 index 0000000..1fa0a2e Binary files /dev/null and b/backend/temp_downloads/job_485524df-1d69-4dc0-8970-4952c397077b/generated_video_1.mp4 differ diff --git a/backend/temp_downloads/job_485524df-1d69-4dc0-8970-4952c397077b/generated_video_2.mp4 b/backend/temp_downloads/job_485524df-1d69-4dc0-8970-4952c397077b/generated_video_2.mp4 new file mode 100644 index 0000000..8e49eed Binary files /dev/null and b/backend/temp_downloads/job_485524df-1d69-4dc0-8970-4952c397077b/generated_video_2.mp4 differ diff --git a/backend/temp_downloads/job_485524df-1d69-4dc0-8970-4952c397077b/generated_video_3.mp4 b/backend/temp_downloads/job_485524df-1d69-4dc0-8970-4952c397077b/generated_video_3.mp4 new file mode 100644 index 0000000..1544f31 Binary files /dev/null and b/backend/temp_downloads/job_485524df-1d69-4dc0-8970-4952c397077b/generated_video_3.mp4 differ diff --git a/backend/utils/storage.py b/backend/utils/storage.py index da1809d..411d006 100644 --- a/backend/utils/storage.py +++ b/backend/utils/storage.py @@ -23,7 +23,7 @@ def download_blob(bucket_name: str, source_blob_name: str, destination_file_name os.makedirs(os.path.dirname(destination_file_name), exist_ok=True) try: - blob.download_to_filename(destination_file_name) + blob.download_to_filename(destination_file_name, timeout=600) # 10 minute timeout for video downloads print(f"Video downloaded from gs://{bucket_name}/{source_blob_name} to {destination_file_name}") return destination_file_name except Exception as e: @@ -222,8 +222,8 @@ def upload_image_to_gcs(image_path: str, job_id: str) -> str: blob = bucket.blob(blob_name) - # Upload converted image - blob.upload_from_filename(converted_image_path) + # Upload converted image with timeout + blob.upload_from_filename(converted_image_path, timeout=300) # 5 minute timeout # Set content type to JPEG blob.content_type = 'image/jpeg' diff --git a/backend/video_generator.py b/backend/video_generator.py index dc62735..c461b71 100644 --- a/backend/video_generator.py +++ b/backend/video_generator.py @@ -7,6 +7,7 @@ import datetime import logging import zipfile import random +import threading from google import genai from google.genai import types from google.auth.exceptions import DefaultCredentialsError @@ -18,8 +19,16 @@ from config import Config logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# In-memory storage for job status (in production, use Redis or database) +# Job queue and processing configuration +CONCURRENT_JOB_LIMIT = 2 +MAX_QUEUE_SIZE_PER_USER = 4 +MAX_RETRIES = 3 + +# In-memory storage (in production, use Redis or database) job_status = {} +job_queue = [] # Global queue: [job_id1, job_id2, ...] +processing_jobs = [] # Currently active jobs (max CONCURRENT_JOB_LIMIT) +user_job_counts = {} # Track jobs per user: {user_email: count} def send_usage_webhook(user_email: str, prompt: str) -> None: """ @@ -67,6 +76,103 @@ def send_usage_webhook(user_email: str, prompt: str) -> None: logger.error(f"Error sending usage data to webhook: {str(e)}") # Don't raise the exception - webhook failure shouldn't block the main flow +def get_user_queue_count(user_email: str) -> int: + """Get the current queue count for a user.""" + return user_job_counts.get(user_email, 0) + +def can_add_to_queue(user_email: str) -> bool: + """Check if user can add more jobs to queue.""" + return get_user_queue_count(user_email) < MAX_QUEUE_SIZE_PER_USER + +def add_to_queue(job_id: str, user_email: str) -> None: + """Add job to queue and update user count.""" + global job_queue, user_job_counts + job_queue.append(job_id) + user_job_counts[user_email] = user_job_counts.get(user_email, 0) + 1 + logger.info(f"Added job {job_id} to queue for user {user_email}. Queue position: {len(job_queue)}") + +def remove_from_queue(job_id: str, user_email: str) -> None: + """Remove job from queue and update user count.""" + global job_queue, user_job_counts + if job_id in job_queue: + job_queue.remove(job_id) + user_job_counts[user_email] = max(0, user_job_counts.get(user_email, 0) - 1) + if user_job_counts[user_email] == 0: + del user_job_counts[user_email] + +def get_queue_position(job_id: str) -> int: + """Get the position of a job in the queue (1-based).""" + try: + return job_queue.index(job_id) + 1 + except ValueError: + return -1 + +def start_next_job() -> None: + """Start the next job from queue if there's capacity.""" + global processing_jobs, job_queue + + if len(processing_jobs) >= CONCURRENT_JOB_LIMIT: + return + + if not job_queue: + return + + next_job_id = job_queue.pop(0) + processing_jobs.append(next_job_id) + + # Update job status + if next_job_id in job_status: + job_status[next_job_id].update({ + 'status': 'processing', + 'queue_position': 0, + 'started_at': datetime.datetime.now().isoformat() + }) + + # Start processing in background thread + threading.Thread( + target=process_video_generation, + args=(next_job_id,), + daemon=True + ).start() + + logger.info(f"Started processing job {next_job_id}") + +def complete_job(job_id: str) -> None: + """Mark job as complete and start next queued job.""" + global processing_jobs + + if job_id in processing_jobs: + processing_jobs.remove(job_id) + + # Start next job if available + start_next_job() + logger.info(f"Completed job {job_id}. Processing jobs: {len(processing_jobs)}") + +def cancel_job(job_id: str) -> bool: + """Cancel a queued job.""" + job = job_status.get(job_id) + if not job: + return False + + user_email = job.get('user_email', 'unknown') + + # Can only cancel queued jobs + if job['status'] != 'queued': + return False + + # Remove from queue + remove_from_queue(job_id, user_email) + + # Update status + job_status[job_id].update({ + 'status': 'cancelled', + 'message': 'Job cancelled by user', + 'cancelled_at': datetime.datetime.now().isoformat() + }) + + logger.info(f"Cancelled job {job_id} for user {user_email}") + return True + def generate_video_async( prompt: str, job_id: str = None, @@ -81,24 +187,83 @@ def generate_video_async( generate_audio: bool = True ) -> str: """ - Start video generation asynchronously and return job ID. + Queue video generation job and return job ID. """ + # Check if user can add more jobs + if not can_add_to_queue(user_email): + raise Exception(f"Queue limit exceeded. Maximum {MAX_QUEUE_SIZE_PER_USER} jobs per user.") + if job_id is None: job_id = str(uuid.uuid4()) - # Initialize job status + # Initialize job status with queue info job_status[job_id] = { - 'status': 'starting', + 'user_email': user_email, + 'prompt': prompt, + 'status': 'queued', 'progress': 0, - 'message': 'Initializing video generation...', + 'message': 'Job queued for processing...', + 'queue_position': 0, # Will be updated when added to queue + 'retry_count': 0, + 'max_retries': MAX_RETRIES, + 'videos_requested': sample_count, + 'videos_completed': [], + 'individual_video_paths': [], + 'can_cancel': True, + 'created_at': datetime.datetime.now().isoformat(), + 'started_at': None, 'video_path': None, 'gcs_uri': None, 'error': None, 'image_gcs_uri': None, 'image_blob_name': None, - 'local_image_path': None + 'local_image_path': image_path, + 'model_name': model_name, + 'video_length_sec': video_length_sec, + 'aspect_ratio': aspect_ratio, + 'person_generation': person_generation, + 'seed': seed, + 'generate_audio': generate_audio } + # Add to queue + add_to_queue(job_id, user_email) + job_status[job_id]['queue_position'] = get_queue_position(job_id) + + # Try to start immediately if there's capacity + start_next_job() + + return job_id + +def process_video_generation(job_id: str) -> None: + """ + Process video generation for a specific job (called from queue processor). + """ + job = job_status.get(job_id) + if not job: + logger.error(f"Job {job_id} not found in status") + return + + # Extract parameters from job + prompt = job['prompt'] + user_email = job['user_email'] + model_name = job['model_name'] + video_length_sec = job['video_length_sec'] + aspect_ratio = job['aspect_ratio'] + sample_count = job['videos_requested'] + person_generation = job['person_generation'] + image_path = job['local_image_path'] + seed = job['seed'] + generate_audio = job['generate_audio'] + + # Update status to starting + job_status[job_id].update({ + 'status': 'starting', + 'progress': 0, + 'message': 'Initializing video generation...', + 'can_cancel': False # Cannot cancel once processing starts + }) + # Validate and upload image if provided image_gcs_uri = None image_blob_name = None @@ -114,7 +279,9 @@ def generate_video_async( 'message': f'Image validation failed: {validation_result["error"]}', 'error': validation_result['error'] }) - return job_id + complete_job(job_id) + remove_from_queue(job_id, user_email) + return # Auto-select aspect ratio based on the converted image detected_aspect_ratio = validation_result['aspect_ratio'] @@ -150,7 +317,9 @@ def generate_video_async( 'message': error_message, 'error': error_message }) - return job_id + complete_job(job_id) + remove_from_queue(job_id, user_email) + return try: # Validate model selection @@ -164,7 +333,9 @@ def generate_video_async( 'error': error_message }) logger.warning(f"Model validation failed for job {job_id}: {selected_model}") - return job_id + complete_job(job_id) + remove_from_queue(job_id, user_email) + return # Validate aspect ratio - Support both 16:9 and 9:16 if aspect_ratio not in ["16:9", "9:16"]: @@ -176,7 +347,9 @@ def generate_video_async( 'error': error_message }) logger.warning(f"Aspect ratio validation failed for job {job_id}: {aspect_ratio}") - return job_id + complete_job(job_id) + remove_from_queue(job_id, user_email) + return # Set up authentication credentials = get_google_credentials() @@ -317,7 +490,9 @@ def generate_video_async( 'message': error_message, 'error': str(operation.error) }) - return job_id + complete_job(job_id) + remove_from_queue(job_id, user_email) + return # Collect all generated videos from all operations all_generated_videos = [] @@ -347,7 +522,9 @@ def generate_video_async( 'error': error_message }) - return job_id + complete_job(job_id) + remove_from_queue(job_id, user_email) + return # Add videos from this operation operation_videos = operation.response.generated_videos @@ -391,7 +568,9 @@ def generate_video_async( 'message': error_message, 'error': error_message }) - return job_id + complete_job(job_id) + remove_from_queue(job_id, user_email) + return video_uris.append(video.video.uri) print(f"Videos generated successfully. GCS URIs: {video_uris}") @@ -423,7 +602,9 @@ def generate_video_async( 'message': error_message, 'error': error_message }) - return job_id + complete_job(job_id) + remove_from_queue(job_id, user_email) + return path_parts = gcs_video_uri.replace("gs://", "").split("/", 1) source_bucket_name = path_parts[0] @@ -438,15 +619,21 @@ def generate_video_async( print(f"Downloaded video {i+1}: {local_video_path}") - # Determine if we need to create zip first + # Determine if we need to create zip has_image = image_path is not None video_count = len(downloaded_video_paths) - will_need_zip = video_count > 1 or has_image + should_use_zip = video_count > 1 or has_image zip_filename = None files_added_to_zip = [] - if will_need_zip: + print(f"=== ZIP CREATION DECISION ===") + print(f"has_image: {has_image}") + print(f"video_count: {video_count}") + print(f"should_use_zip: {should_use_zip}") + print("==============================") + + if should_use_zip: # Create a zip file inside the job folder zip_filename = os.path.join(download_folder, "all_videos.zip") print(f"Creating zip file (needed for {video_count} videos, has_image={has_image}): {zip_filename}") @@ -476,18 +663,8 @@ def generate_video_async( else: print(f"Skipping zip creation (single video, no image): {video_count} videos, has_image={has_image}") - # Determine download strategy - has_image = image_path is not None - video_count = len(downloaded_video_paths) - print(f"=== DOWNLOAD STRATEGY DECISION ===") - print(f"has_image: {has_image}") - print(f"video_count: {video_count}") - print(f"image_path: {image_path}") - print(f"downloaded_video_paths: {downloaded_video_paths}") - - # Use zip for multiple videos OR when image is present - should_use_zip = video_count > 1 or has_image + print(f"zip_filename: {zip_filename}") print(f"should_use_zip: {should_use_zip}") print("===================================") @@ -524,10 +701,12 @@ def generate_video_async( 'message': message, 'video_path': primary_download_path, 'video_paths': downloaded_video_paths, + 'individual_video_paths': downloaded_video_paths, # For individual downloads 'download_folder': download_folder, 'is_zip': should_use_zip, 'download_type': download_type, 'video_count': video_count, + 'videos_completed': downloaded_video_paths, 'has_image': has_image }) @@ -537,25 +716,130 @@ def generate_video_async( # Clean up temporary image files if image_path and image_blob_name: cleanup_image_files(job_id, image_path, image_blob_name) + + # Mark job as completed in queue + complete_job(job_id) + remove_from_queue(job_id, user_email) except Exception as e: - error_message = f"An error occurred during video generation: {str(e)}" - job_status[job_id].update({ - 'status': 'failed', - 'progress': 0, - 'message': error_message, - 'error': str(e) - }) + # Check if this is a retryable error and we haven't exceeded max retries + job = job_status.get(job_id, {}) + retry_count = job.get('retry_count', 0) - # Clean up image files on failure - if image_path and image_blob_name: - cleanup_image_files(job_id, image_path, image_blob_name) - - return job_id + is_retryable_error = ( + '503' in str(e) or + '500' in str(e) or + 'timeout' in str(e).lower() or + 'connection' in str(e).lower() or + 'timeouterror' in str(e).lower() or + 'connection aborted' in str(e).lower() or + 'write operation timed out' in str(e).lower() + ) + + if is_retryable_error and retry_count < MAX_RETRIES: + # Retry the job + retry_count += 1 + # Determine error type for user-friendly message + error_type = "Network timeout" + if 'timeout' in str(e).lower() or 'timeouterror' in str(e).lower(): + error_type = "Upload timeout" + elif 'connection' in str(e).lower(): + error_type = "Connection error" + elif '503' in str(e) or '500' in str(e): + error_type = "Server error" + + job_status[job_id].update({ + 'status': f'retry_{retry_count}_of_{MAX_RETRIES}', + 'progress': 0, + 'message': f'{error_type} - Retry attempt {retry_count} of {MAX_RETRIES} in progress...', + 'retry_count': retry_count, + 'error': f'Previous attempt failed: {str(e)}' + }) + + logger.info(f"Retrying job {job_id}, attempt {retry_count}/{MAX_RETRIES}. Error: {str(e)}") + + # Wait before retry (exponential backoff) + retry_delays = [30, 60, 120] # 30s, 1min, 2min + delay = retry_delays[min(retry_count - 1, len(retry_delays) - 1)] + + logger.info(f"Waiting {delay} seconds before retry...") + time.sleep(delay) + + # Retry by calling this function again + logger.info(f"Starting retry {retry_count} for job {job_id}") + try: + process_video_generation(job_id) + except RecursionError: + logger.error(f"Recursion error detected for job {job_id}, treating as final failure") + job_status[job_id].update({ + 'status': 'failed', + 'progress': 0, + 'message': 'System error during retry processing', + 'error': 'Retry system failure', + 'final_failure': True + }) + complete_job(job_id) + remove_from_queue(job_id, user_email) + else: + # Final failure + attempts_msg = f" (after {retry_count} retries)" if retry_count > 0 else "" + error_message = f"Video generation failed{attempts_msg}: {str(e)}" + + job_status[job_id].update({ + 'status': 'failed', + 'progress': 0, + 'message': error_message, + 'error': str(e), + 'final_failure': True + }) + + logger.error(f"Job {job_id} failed after {retry_count} retries: {str(e)}") + + # Clean up image files on failure + if image_path and image_blob_name: + cleanup_image_files(job_id, image_path, image_blob_name) + + # Mark job as completed in queue (even if failed) + complete_job(job_id) + remove_from_queue(job_id, user_email) def get_job_status(job_id: str) -> dict: - """Get the status of a video generation job.""" - return job_status.get(job_id, {'status': 'not_found', 'message': 'Job not found'}) + """Get the status of a video generation job with queue information.""" + job = job_status.get(job_id) + if not job: + return {'status': 'not_found', 'message': 'Job not found'} + + # Add current queue position if still queued + if job['status'] == 'queued': + job['queue_position'] = get_queue_position(job_id) + + return job + +def get_user_jobs(user_email: str) -> list: + """Get all jobs for a specific user.""" + user_jobs = [] + for job_id, job in job_status.items(): + if job.get('user_email') == user_email: + job_copy = job.copy() + job_copy['job_id'] = job_id + # Add current queue position if still queued + if job_copy['status'] == 'queued': + job_copy['queue_position'] = get_queue_position(job_id) + user_jobs.append(job_copy) + + # Sort by creation time (newest first) + user_jobs.sort(key=lambda x: x.get('created_at', ''), reverse=True) + return user_jobs + +def get_queue_status() -> dict: + """Get overall queue status.""" + return { + 'queue_length': len(job_queue), + 'processing_jobs': len(processing_jobs), + 'concurrent_limit': CONCURRENT_JOB_LIMIT, + 'queued_jobs': job_queue.copy(), + 'active_jobs': processing_jobs.copy() + } def cleanup_job_files(job_id: str) -> bool: """Clean up local and GCS files for a job.""" diff --git a/frontend/src/components/ProgressIndicator.jsx b/frontend/src/components/ProgressIndicator.jsx index 2a83632..e692083 100644 --- a/frontend/src/components/ProgressIndicator.jsx +++ b/frontend/src/components/ProgressIndicator.jsx @@ -29,12 +29,18 @@ const ProgressIndicator = ({ const getStatusChip = () => { const statusConfig = { + [JOB_STATUS.QUEUED]: { label: 'Queued', color: 'info' }, [JOB_STATUS.STARTING]: { label: 'Starting', color: 'info' }, + [JOB_STATUS.UPLOADING_IMAGE]: { label: 'Uploading Image', color: 'info' }, [JOB_STATUS.GENERATING]: { label: 'Generating', color: 'info' }, [JOB_STATUS.PROCESSING]: { label: 'Processing', color: 'warning' }, [JOB_STATUS.DOWNLOADING]: { label: 'Downloading', color: 'warning' }, [JOB_STATUS.COMPLETED]: { label: 'Completed', color: 'success' }, - [JOB_STATUS.FAILED]: { label: 'Failed', color: 'error' } + [JOB_STATUS.FAILED]: { label: 'Failed', color: 'error' }, + [JOB_STATUS.CANCELLED]: { label: 'Cancelled', color: 'default' }, + [JOB_STATUS.RETRY_1_OF_3]: { label: 'Retry 1/3', color: 'warning' }, + [JOB_STATUS.RETRY_2_OF_3]: { label: 'Retry 2/3', color: 'warning' }, + [JOB_STATUS.RETRY_3_OF_3]: { label: 'Retry 3/3', color: 'warning' } }; const config = statusConfig[status] || { label: 'Unknown', color: 'default' }; diff --git a/frontend/src/components/QueueManager.jsx b/frontend/src/components/QueueManager.jsx new file mode 100644 index 0000000..914fcb6 --- /dev/null +++ b/frontend/src/components/QueueManager.jsx @@ -0,0 +1,304 @@ +import React, { useState, useEffect } from 'react'; +import { + Paper, + Typography, + Box, + List, + ListItem, + Chip, + Button, + LinearProgress, + Grid, + Divider, + Alert, + IconButton, + Tooltip +} from '@mui/material'; +import { + CheckCircleRounded, + ErrorRounded, + HourglassEmptyRounded, + PlayArrowRounded, + CancelRounded, + DownloadRounded, + RefreshRounded +} from '@mui/icons-material'; +import { JOB_STATUS, API_BASE_URL } from '../utils/constants'; +import { getUserJobs, getQueueStatus, cancelJob } from '../services/api'; + +const QueueManager = ({ userEmail }) => { + const [jobs, setJobs] = useState([]); + const [queueStatus, setQueueStatus] = useState({}); + const [loading, setLoading] = useState(true); + const [error, setError] = useState(''); + + const fetchUserJobs = async () => { + try { + console.log('Fetching user jobs for:', userEmail); + const data = await getUserJobs(userEmail); + console.log('User jobs data:', data); + setJobs(data.jobs || []); + } catch (err) { + console.error('fetchUserJobs error:', err); + setError(`Failed to load jobs: ${err.message}`); + } + }; + + const fetchQueueStatusData = async () => { + try { + console.log('Fetching queue status'); + const data = await getQueueStatus(); + console.log('Queue status data:', data); + setQueueStatus(data); + } catch (err) { + console.error('fetchQueueStatus error:', err); + } + }; + + const handleCancelJob = async (jobId) => { + try { + await cancelJob(jobId); + await fetchUserJobs(); // Refresh the list + } catch (err) { + setError(`Failed to cancel job: ${err.message}`); + } + }; + + useEffect(() => { + const loadData = async () => { + setLoading(true); + await Promise.all([fetchUserJobs(), fetchQueueStatusData()]); + setLoading(false); + }; + + loadData(); + + // Poll for updates every 2 seconds + const interval = setInterval(() => { + fetchUserJobs(); + fetchQueueStatusData(); + }, 2000); + + return () => clearInterval(interval); + }, [userEmail]); + + const getStatusIcon = (status) => { + if (status === JOB_STATUS.COMPLETED) return ; + if (status === JOB_STATUS.FAILED) return ; + if (status === JOB_STATUS.CANCELLED) return ; + if (status === JOB_STATUS.QUEUED) return ; + if (status.startsWith('retry_')) return ; + return ; + }; + + const getStatusColor = (status) => { + if (status === JOB_STATUS.COMPLETED) return 'success'; + if (status === JOB_STATUS.FAILED) return 'error'; + if (status === JOB_STATUS.CANCELLED) return 'default'; + if (status === JOB_STATUS.QUEUED) return 'info'; + if (status.startsWith('retry_')) return 'warning'; + return 'primary'; + }; + + const getProgressValue = (job) => { + if (job.status === JOB_STATUS.COMPLETED) return 100; + if (job.status === JOB_STATUS.FAILED || job.status === JOB_STATUS.CANCELLED) return 0; + return job.progress || 0; + }; + + const renderDownloadButtons = (job) => { + if (job.status !== JOB_STATUS.COMPLETED) return null; + + const videoCount = job.video_count || 1; + const buttons = []; + + // Main download button (zip for multiple videos, single video for one) + buttons.push( + + ); + + // Individual download buttons for multiple videos + if (videoCount > 1) { + for (let i = 1; i <= videoCount; i++) { + buttons.push( + + ); + } + } + + return {buttons}; + }; + + if (loading) { + return ( + + Job Queue + + + ); + } + + return ( + + + Job Queue + + { fetchUserJobs(); fetchQueueStatusData(); }}> + + + + + + {/* Queue Status Summary */} + {queueStatus.queue_length !== undefined && ( + + Queue: {queueStatus.queue_length} waiting • + Processing: {queueStatus.processing_jobs}/{queueStatus.concurrent_limit} • + Your jobs: {jobs.length}/4 + + )} + + {error && ( + setError('')}> + {error} + + )} + + {jobs.length === 0 ? ( + + No jobs found. Submit a video generation request to see it here. + + ) : ( + + {jobs.map((job, index) => ( + + + + + {getStatusIcon(job.status)} + + {job.videos_requested} video{job.videos_requested > 1 ? 's' : ''} + + + {job.status === JOB_STATUS.QUEUED && job.queue_position > 0 && ( + + )} + + + + {job.can_cancel && job.status === JOB_STATUS.QUEUED && ( + + )} + + + + + {job.prompt.length > 100 ? `${job.prompt.substring(0, 100)}...` : job.prompt} + + + + {job.message || 'Processing...'} + + + {/* Progress Bar */} + + + {/* Download Buttons */} + {renderDownloadButtons(job)} + + {/* Job Details */} + + + + Created: {new Date(job.created_at).toLocaleString()} + + + {job.started_at && ( + + + Started: {new Date(job.started_at).toLocaleString()} + + + )} + + + Model: {job.model_name?.includes('fast') ? 'Fast' : 'Standard'} + + + + + Length: {job.video_length_sec}s + + + + + {/* Retry Information */} + {job.retry_count > 0 && ( + + Retry attempt {job.retry_count} of {job.max_retries} +
+ {job.status.includes('retry') && job.message && ( + {job.message} + )} + {job.error && job.error.includes('timeout') && ( +
+ Note: This is likely due to network connectivity issues. The system will automatically retry with longer timeouts. +
+ )} +
+ )} + + {/* Final Failure Information */} + {job.final_failure && ( + + Failed after {job.retry_count} retries. {job.error} + + )} +
+ {index < jobs.length - 1 && } +
+ ))} +
+ )} +
+ ); +}; + +export default QueueManager; \ No newline at end of file diff --git a/frontend/src/components/VideoForm.jsx b/frontend/src/components/VideoForm.jsx index a8bd7af..d1a3151 100644 --- a/frontend/src/components/VideoForm.jsx +++ b/frontend/src/components/VideoForm.jsx @@ -130,8 +130,8 @@ const VideoForm = ({ onSubmit, isGenerating }) => { newErrors.seed = 'Seed must be a number between 0 and 4294967295'; } - if (formData.sampleCount < 1 || formData.sampleCount > 2) { - newErrors.sampleCount = 'Sample count must be between 1 and 2'; + if (formData.sampleCount < 1 || formData.sampleCount > 10) { + newErrors.sampleCount = 'Sample count must be between 1 and 10'; } @@ -207,6 +207,8 @@ const VideoForm = ({ onSubmit, isGenerating }) => { Supported: JPG, PNG • Max: 10MB • Min: 720p +
+ Tip: Smaller images (under 2MB) upload faster and are less likely to timeout
) : ( diff --git a/frontend/src/components/VideoGenerator.jsx b/frontend/src/components/VideoGenerator.jsx index e8bc919..0177996 100644 --- a/frontend/src/components/VideoGenerator.jsx +++ b/frontend/src/components/VideoGenerator.jsx @@ -1,8 +1,9 @@ import React from 'react'; -import { Box } from '@mui/material'; +import { Box, Grid } from '@mui/material'; import { useIsAuthenticated, useMsal } from '@azure/msal-react'; import VideoForm from './VideoForm'; import ProgressIndicator from './ProgressIndicator'; +import QueueManager from './QueueManager'; import { useVideoGeneration } from '../hooks/useVideoGeneration'; import { useMockIsAuthenticated, useMockMsal } from './DevAuthWrapper'; @@ -43,20 +44,32 @@ const VideoGenerator = () => { return ( - - - + + {/* Left Column - Video Form */} + + + + + + + + + {/* Right Column - Queue Manager */} + + + + ); }; diff --git a/frontend/src/services/api.js b/frontend/src/services/api.js index 31b28fd..a48f4ce 100644 --- a/frontend/src/services/api.js +++ b/frontend/src/services/api.js @@ -72,14 +72,34 @@ export const downloadVideo = async (jobId) => { responseType: 'blob', }); + // Get the filename from Content-Disposition header or determine from content type + let filename = `generated_video_${jobId}.mp4`; + let mimeType = 'video/mp4'; + + const contentDisposition = response.headers['content-disposition']; + if (contentDisposition) { + const filenameMatch = contentDisposition.match(/filename="?(.+)"?/); + if (filenameMatch) { + filename = filenameMatch[1]; + } + } + + const contentType = response.headers['content-type']; + if (contentType) { + mimeType = contentType; + if (contentType.includes('application/zip')) { + filename = filename.replace('.mp4', '.zip'); + } + } + // Create blob URL for download - const blob = new Blob([response.data], { type: 'video/mp4' }); + const blob = new Blob([response.data], { type: mimeType }); const url = window.URL.createObjectURL(blob); // Create download link const link = document.createElement('a'); link.href = url; - link.download = `generated_video_${jobId}.mp4`; + link.download = filename; document.body.appendChild(link); link.click(); @@ -98,4 +118,42 @@ export const cleanupJob = async (jobId) => { export const healthCheck = async () => { const response = await apiClient.get('/health'); return response.data; +}; + +export const getUserJobs = async (userEmail) => { + const response = await apiClient.get(`/api/user-jobs?user_email=${encodeURIComponent(userEmail)}`); + return response.data; +}; + +export const getQueueStatus = async () => { + const response = await apiClient.get('/api/queue-status'); + return response.data; +}; + +export const cancelJob = async (jobId) => { + const response = await apiClient.delete(`/api/cancel/${jobId}`); + return response.data; +}; + +export const downloadIndividualVideo = async (jobId, videoIndex) => { + const response = await apiClient.get(`/api/download/${jobId}/video/${videoIndex}`, { + responseType: 'blob', + }); + + // Create blob URL for download + const blob = new Blob([response.data], { type: 'video/mp4' }); + const url = window.URL.createObjectURL(blob); + + // Create download link + const link = document.createElement('a'); + link.href = url; + link.download = `video_${videoIndex}_${jobId}.mp4`; + document.body.appendChild(link); + link.click(); + + // Cleanup + document.body.removeChild(link); + window.URL.revokeObjectURL(url); + + return true; }; \ No newline at end of file diff --git a/frontend/src/utils/constants.js b/frontend/src/utils/constants.js index 26f99fd..e102703 100644 --- a/frontend/src/utils/constants.js +++ b/frontend/src/utils/constants.js @@ -34,12 +34,21 @@ export const VIDEO_GENERATION_OPTIONS = { { value: 8, label: '8 seconds' } ], sampleCounts: [ - { value: 1, label: '1' }, - { value: 2, label: '2' } + { value: 1, label: '1 video' }, + { value: 2, label: '2 videos' }, + { value: 3, label: '3 videos' }, + { value: 4, label: '4 videos' }, + { value: 5, label: '5 videos' }, + { value: 6, label: '6 videos' }, + { value: 7, label: '7 videos' }, + { value: 8, label: '8 videos' }, + { value: 9, label: '9 videos' }, + { value: 10, label: '10 videos' } ] }; export const JOB_STATUS = { + QUEUED: 'queued', STARTING: 'starting', UPLOADING_IMAGE: 'uploading_image', GENERATING: 'generating', @@ -47,7 +56,11 @@ export const JOB_STATUS = { DOWNLOADING: 'downloading', COMPLETED: 'completed', FAILED: 'failed', - NOT_FOUND: 'not_found' + NOT_FOUND: 'not_found', + CANCELLED: 'cancelled', + RETRY_1_OF_3: 'retry_1_of_3', + RETRY_2_OF_3: 'retry_2_of_3', + RETRY_3_OF_3: 'retry_3_of_3' }; export const IMAGE_UPLOAD_CONFIG = {