feature of multi-processes

This commit is contained in:
Manish Tanwar 2025-10-08 19:28:02 +05:30
parent bd76e4ef67
commit d9fcfaa01d
9 changed files with 176 additions and 47 deletions

View file

@ -11,4 +11,10 @@ FLASK_DEBUG=True
PORT=7394
# CORS Configuration - Allow localhost for development
FRONTEND_URL=http://localhost:3000
FRONTEND_URL=http://localhost:3000
# File Cleanup Configuration (optimized for development - faster cleanup; delays are in seconds, LARGE_FILE_THRESHOLD is in MB)
CLEANUP_DELAY_SMALL_FILES=10
CLEANUP_DELAY_LARGE_FILES=20
LARGE_FILE_THRESHOLD=50
CLEANUP_GCS_BY_DEFAULT=false

View file

@ -55,4 +55,10 @@ class Config:
# Webhook Configuration
WEBHOOK_URL = os.getenv('WEBHOOK_URL', 'https://hook.us1.make.celonis.com/8ri1h8b2he4wudp2jku69mgcxumzxf3v')
WEBHOOK_ENABLED = os.getenv('WEBHOOK_ENABLED', 'true').lower() in ['true', '1', 'yes']
WEBHOOK_TIMEOUT = int(os.getenv('WEBHOOK_TIMEOUT', 10)) # seconds
WEBHOOK_TIMEOUT = int(os.getenv('WEBHOOK_TIMEOUT', 10)) # seconds
# File Cleanup Configuration
CLEANUP_DELAY_SMALL_FILES = int(os.getenv('CLEANUP_DELAY_SMALL_FILES', 15)) # seconds
CLEANUP_DELAY_LARGE_FILES = int(os.getenv('CLEANUP_DELAY_LARGE_FILES', 30)) # seconds
LARGE_FILE_THRESHOLD = int(os.getenv('LARGE_FILE_THRESHOLD', 100)) * 1024 * 1024 # MB to bytes
CLEANUP_GCS_BY_DEFAULT = os.getenv('CLEANUP_GCS_BY_DEFAULT', 'true').lower() in ['true', '1', 'yes']

View file

@ -286,14 +286,30 @@ def download_video(job_id):
if not video_path or not os.path.exists(video_path):
return jsonify({'error': f'Video file not found at path: {video_path}'}), 404
# Re-enable cleanup with longer delay for zip files to ensure download completes
# Smart cleanup: delay based on file size and user configuration
def cleanup_after_send():
time.sleep(5) # Additional delay to ensure send_file completes
cleanup_job_files(job_id)
# Brief delay to ensure download starts
time.sleep(2)
# Use configurable cleanup (GCS cleanup can be disabled for faster cleanup)
cleanup_job_files(job_id, cleanup_gcs=Config.CLEANUP_GCS_BY_DEFAULT)
cleanup_thread = threading.Timer(60.0, cleanup_after_send) # 60 second delay
# Determine cleanup delay based on file size
file_size = 0
if os.path.exists(video_path):
file_size = os.path.getsize(video_path)
if file_size > Config.LARGE_FILE_THRESHOLD:
cleanup_delay = Config.CLEANUP_DELAY_LARGE_FILES
else:
cleanup_delay = Config.CLEANUP_DELAY_SMALL_FILES
cleanup_thread = threading.Timer(cleanup_delay, cleanup_after_send)
cleanup_thread.start()
cleanup_type = "full (local + GCS)" if Config.CLEANUP_GCS_BY_DEFAULT else "local only"
print(f"Scheduled {cleanup_type} cleanup in {cleanup_delay}s for job {job_id} (file size: {file_size / (1024*1024):.1f}MB)")
# Determine download type and filename
is_zip = job.get('is_zip', False)
download_type = job.get('download_type', 'zip')
@ -400,9 +416,9 @@ def download_all_videos(job_id):
@api_bp.route('/cleanup/<job_id>', methods=['DELETE'])
def cleanup_job(job_id):
"""Clean up job files manually."""
"""Clean up job files manually (full cleanup including GCS)."""
try:
success = cleanup_job_files(job_id)
success = cleanup_job_files(job_id, cleanup_gcs=True)
if success:
return jsonify({'message': 'Files cleaned up successfully'}), 200
else:
@ -410,6 +426,18 @@ def cleanup_job(job_id):
except Exception as e:
return jsonify({'error': f'Failed to cleanup: {str(e)}'}), 500
@api_bp.route('/cleanup/<job_id>/local', methods=['DELETE'])
def cleanup_job_local_only(job_id):
    """Remove only the local files of a job; GCS cleanup is not requested.

    Delegates to ``cleanup_job_files`` with ``cleanup_gcs=False``.

    Returns:
        A ``(json, status)`` pair: 200 when the helper reports success,
        500 when it reports failure or when any exception is raised.
    """
    try:
        # Guard clause: a falsy result from the helper is reported as a
        # server error.
        if not cleanup_job_files(job_id, cleanup_gcs=False):
            return jsonify({'error': 'Failed to clean up some local files'}), 500
        return jsonify({'message': 'Local files cleaned up successfully (GCS files preserved)'}), 200
    except Exception as e:
        # Route-level boundary: turn any failure into a JSON 500 response.
        return jsonify({'error': f'Failed to cleanup local files: {str(e)}'}), 500
@api_bp.route('/user-jobs', methods=['GET'])
def get_user_job_list():
"""Get all jobs for the current user."""

View file

@ -841,14 +841,21 @@ def get_queue_status() -> dict:
'active_jobs': processing_jobs.copy()
}
def cleanup_job_files(job_id: str) -> bool:
"""Clean up local and GCS files for a job."""
def cleanup_job_files(job_id: str, cleanup_gcs: bool = True) -> bool:
"""
Clean up local and optionally GCS files for a job.
Args:
job_id: The job ID to clean up
cleanup_gcs: Whether to also clean up GCS files (default: True)
"""
job = job_status.get(job_id)
if not job:
print(f"Job {job_id} not found for cleanup")
return False
success = True
start_time = time.time()
# Clean up entire job folder (more efficient than individual files)
download_folder = job.get('download_folder')
@ -881,30 +888,34 @@ def cleanup_job_files(job_id: str) -> bool:
print(f"Error deleting individual video file: {e}")
success = False
# Delete GCS video files (for backwards compatibility, keep the old logic)
if job.get('source_bucket_name') and job.get('source_blob_name'):
try:
delete_blob(job['source_bucket_name'], job['source_blob_name'])
except Exception as e:
print(f"Error deleting GCS file: {e}")
success = False
# Delete multiple GCS video files
if job.get('gcs_video_uris'):
for gcs_uri in job['gcs_video_uris']:
# Delete GCS video files only if requested (can be slow)
if cleanup_gcs:
# Delete GCS video files (for backwards compatibility, keep the old logic)
if job.get('source_bucket_name') and job.get('source_blob_name'):
try:
if gcs_uri.startswith("gs://"):
path_parts = gcs_uri.replace("gs://", "").split("/", 1)
bucket_name = path_parts[0]
blob_name = path_parts[1]
delete_blob(bucket_name, blob_name)
print(f"Deleted GCS file: {gcs_uri}")
delete_blob(job['source_bucket_name'], job['source_blob_name'])
except Exception as e:
print(f"Error deleting GCS file {gcs_uri}: {e}")
print(f"Error deleting GCS file: {e}")
success = False
# Delete multiple GCS video files
if job.get('gcs_video_uris'):
for gcs_uri in job['gcs_video_uris']:
try:
if gcs_uri.startswith("gs://"):
path_parts = gcs_uri.replace("gs://", "").split("/", 1)
bucket_name = path_parts[0]
blob_name = path_parts[1]
delete_blob(bucket_name, blob_name)
print(f"Deleted GCS file: {gcs_uri}")
except Exception as e:
print(f"Error deleting GCS file {gcs_uri}: {e}")
success = False
else:
print("Skipping GCS cleanup (local files only)")
# Delete image files
if job.get('local_image_path') or job.get('image_blob_name'):
# Delete image files (includes both local and GCS)
if cleanup_gcs and (job.get('local_image_path') or job.get('image_blob_name')):
try:
cleanup_image_files(
job_id,
@ -919,4 +930,9 @@ def cleanup_job_files(job_id: str) -> bool:
if success:
del job_status[job_id]
# Log cleanup completion time
cleanup_time = time.time() - start_time
cleanup_type = "full (local + GCS)" if cleanup_gcs else "local only"
print(f"Cleanup completed for job {job_id} in {cleanup_time:.2f}s ({cleanup_type})")
return success

View file

@ -96,19 +96,43 @@ const ProgressIndicator = ({
)}
{status === JOB_STATUS.COMPLETED && (
<Box sx={{ display: 'flex', alignItems: 'center', gap: 2 }}>
<CheckCircleRounded color="success" />
<Typography variant="body1" sx={{ flexGrow: 1 }}>
Your content is ready for download!
</Typography>
<Button
variant="contained"
startIcon={<DownloadRounded />}
onClick={onDownload}
color="success"
>
Download
</Button>
<Box sx={{ mb: 2 }}>
<Box sx={{ display: 'flex', alignItems: 'center', gap: 2, mb: 2 }}>
<CheckCircleRounded color="success" />
<Typography variant="body1" sx={{ flexGrow: 1 }}>
Your content is ready for download!
</Typography>
<Button
variant="contained"
startIcon={<DownloadRounded />}
onClick={onDownload}
color="success"
>
Download
</Button>
</Box>
{/* Show additional download links if multiple videos */}
{downloadLinks && downloadLinks.length > 1 && (
<Box>
<Typography variant="subtitle2" sx={{ mb: 1 }}>
Individual Downloads:
</Typography>
<Box sx={{ display: 'flex', gap: 1, flexWrap: 'wrap' }}>
{downloadLinks.slice(1).map((link, index) => (
<Button
key={index}
variant="outlined"
size="small"
href={link.url}
download
>
{link.label}
</Button>
))}
</Box>
</Box>
)}
</Box>
)}
</Paper>

View file

@ -1,6 +1,6 @@
import { useState } from 'react';
import { generateVideo, checkJobStatus, downloadVideo } from '../services/api';
import { JOB_STATUS } from '../utils/constants';
import { JOB_STATUS, API_BASE_URL } from '../utils/constants';
export const useVideoGeneration = () => {
const [isGenerating, setIsGenerating] = useState(false);
@ -35,13 +35,37 @@ export const useVideoGeneration = () => {
try {
const statusResponse = await checkJobStatus(jobId);
console.log('Status update:', statusResponse);
setStatus(statusResponse.status);
setProgress(statusResponse.progress || 0);
setMessage(statusResponse.message || '');
if (statusResponse.status === JOB_STATUS.COMPLETED) {
setIsGenerating(false);
setDownloadLinks(statusResponse.download_links || []);
// Build download links from the actual backend response structure
const links = [];
if (statusResponse.video_path) {
links.push({
url: `${API_BASE_URL}/api/download/${jobId}`,
label: statusResponse.video_count > 1 ? 'Download All' : 'Download Video',
type: statusResponse.download_type || 'video'
});
}
// Add individual video download links
if (statusResponse.video_count > 1) {
for (let i = 1; i <= statusResponse.video_count; i++) {
links.push({
url: `${API_BASE_URL}/api/download/${jobId}/video/${i}`,
label: `Video ${i}`,
type: 'video'
});
}
}
setDownloadLinks(links);
return;
}
@ -51,9 +75,34 @@ export const useVideoGeneration = () => {
return;
}
if (statusResponse.status === JOB_STATUS.CANCELLED) {
setError('Job was cancelled');
setIsGenerating(false);
return;
}
// Continue polling if still in progress
if ([JOB_STATUS.STARTING, JOB_STATUS.GENERATING, JOB_STATUS.PROCESSING, JOB_STATUS.DOWNLOADING].includes(statusResponse.status)) {
setTimeout(() => pollStatus(jobId), 3000); // Poll every 3 seconds
const activeStatuses = [
JOB_STATUS.QUEUED,
JOB_STATUS.STARTING,
JOB_STATUS.UPLOADING_IMAGE,
JOB_STATUS.GENERATING,
JOB_STATUS.PROCESSING,
JOB_STATUS.DOWNLOADING,
JOB_STATUS.RETRY_1_OF_3,
JOB_STATUS.RETRY_2_OF_3,
JOB_STATUS.RETRY_3_OF_3
];
if (activeStatuses.includes(statusResponse.status)) {
console.log(`Continuing to poll. Status: ${statusResponse.status}, Progress: ${statusResponse.progress}%`);
setTimeout(() => pollStatus(jobId), 2000); // Poll every 2 seconds for better responsiveness
} else {
// Handle unexpected status by stopping polling and showing error
console.error('Unknown status received:', statusResponse.status, 'Full response:', statusResponse);
console.error('Active statuses:', activeStatuses);
setError(`Unknown status: ${statusResponse.status}`);
setIsGenerating(false);
}
} catch (err) {
setError(err.response?.data?.error || err.message || 'Failed to check status');