import os
import time
import uuid
import tempfile
from typing import Optional

from PIL import Image
from google.cloud import storage

from utils.auth import get_google_credentials
from config import Config

# Aspect ratios accepted without cropping, and the absolute ratio distance
# (roughly 5% of each target) within which an image is treated as already
# matching that ratio.
_RATIO_16_9 = 16 / 9  # 1.778
_RATIO_9_16 = 9 / 16  # 0.5625
_TOLERANCE_16_9 = 0.09
_TOLERANCE_9_16 = 0.03


def _get_storage_client():
    """Return a storage client for Config.PROJECT_ID.

    Explicit credentials from get_google_credentials() are used when that
    call returns a truthy value; otherwise the client is built without them.
    """
    credentials = get_google_credentials()
    if credentials:
        return storage.Client(project=Config.PROJECT_ID, credentials=credentials)
    return storage.Client(project=Config.PROJECT_ID)


def _remove_temp_file(path: str) -> None:
    """Best-effort removal of a temporary file; failures are logged, not raised."""
    try:
        if os.path.exists(path):
            os.remove(path)
    except OSError as cleanup_error:
        print(f"Warning: Could not clean up temporary file {path}: {cleanup_error}")


def _upload_jpeg(bucket, blob_name: str, local_path: str) -> str:
    """Upload a local JPEG file to ``bucket`` as ``blob_name`` and return its gs:// URI."""
    blob = bucket.blob(blob_name)
    # Passing content_type with the upload avoids a second patch() round trip.
    blob.upload_from_filename(local_path, content_type='image/jpeg', timeout=300)  # 5 minute timeout
    return f"gs://{Config.OUTPUT_GCS_BUCKET_NAME}/{blob_name}"


def _fit_to_supported_ratio(img):
    """Return ``(image, ratio_name, was_cropped)`` for a 16:9 or 9:16 result.

    Images already within tolerance of a supported ratio are copied unchanged.
    Anything else is center-cropped to whichever supported ratio is numerically
    closer to the original width/height ratio.
    """
    width, height = img.size
    aspect_ratio = width / height

    if abs(aspect_ratio - _RATIO_16_9) < _TOLERANCE_16_9:
        return img.copy(), '16:9', False
    if abs(aspect_ratio - _RATIO_9_16) < _TOLERANCE_9_16:
        return img.copy(), '9:16', False

    # Pick the supported ratio with the smaller distance from the original.
    if abs(aspect_ratio - _RATIO_16_9) < abs(aspect_ratio - _RATIO_9_16):
        target_ratio, target_ratio_name = _RATIO_16_9, '16:9'
    else:
        target_ratio, target_ratio_name = _RATIO_9_16, '9:16'

    if aspect_ratio > target_ratio:
        # Image is wider than target: keep full height, trim the sides.
        new_width = int(height * target_ratio)
        new_height = height
        left = (width - new_width) // 2
        top = 0
    else:
        # Image is taller than target: keep full width, trim top and bottom.
        new_width = width
        new_height = int(width / target_ratio)
        left = 0
        top = (height - new_height) // 2

    cropped = img.crop((left, top, left + new_width, top + new_height))
    print(f"Converted aspect ratio from {aspect_ratio:.2f} to {target_ratio_name} by cropping from {width}x{height} to {new_width}x{new_height}")
    return cropped, target_ratio_name, True


def _flatten_to_rgb(img):
    """Return ``img`` as an RGB image, compositing transparency onto white."""
    if img.mode in ('RGBA', 'LA', 'P'):
        background = Image.new('RGB', img.size, (255, 255, 255))
        if img.mode == 'P':
            img = img.convert('RGBA')
        # Use the last band (alpha) as the paste mask when one is present.
        mask = img.split()[-1] if img.mode in ('RGBA', 'LA') else None
        background.paste(img, mask=mask)
        return background
    if img.mode != 'RGB':
        return img.convert('RGB')
    return img


def download_blob(bucket_name: str, source_blob_name: str, destination_file_name: str):
    """Downloads a blob from the bucket.

    Creates the destination's parent directory when the path has one, then
    downloads with a 10 minute timeout.

    Returns:
        ``destination_file_name`` on success.

    Raises:
        Any exception raised by the download is logged and re-raised.
    """
    storage_client = _get_storage_client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(source_blob_name)

    # Create local directory if it doesn't exist. A bare filename has an
    # empty dirname, and os.makedirs('') would raise.
    destination_dir = os.path.dirname(destination_file_name)
    if destination_dir:
        os.makedirs(destination_dir, exist_ok=True)

    try:
        blob.download_to_filename(destination_file_name, timeout=600)  # 10 minute timeout for video downloads
        print(f"Video downloaded from gs://{bucket_name}/{source_blob_name} to {destination_file_name}")
        return destination_file_name
    except Exception as e:
        print(f"Error downloading video: {e}")
        raise


def delete_blob(bucket_name: str, blob_name: str):
    """Deletes a blob from the bucket.

    Returns:
        True when the delete call succeeds, False when it raises (the error
        is printed rather than propagated).
    """
    storage_client = _get_storage_client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(blob_name)

    try:
        blob.delete()
        print(f"Blob {blob_name} deleted from bucket {bucket_name}")
        return True
    except Exception as e:
        print(f"Error deleting blob: {e}")
        return False


def validate_and_convert_image(image_path: str) -> dict:
    """
    Validate image and convert to JPEG format for Veo 3 API compatibility.
    Supports all formats that Pillow can open and converts them to JPEG.
    Automatically converts unsupported aspect ratios to 16:9 or 9:16 using
    a center crop.

    Returns dict with validation result, converted image path, and image info.
    On failure the dict is ``{'valid': False, 'error': <message>}``; this
    function does not raise. On success the caller owns the file at
    ``converted_path`` and is responsible for deleting it.
    """
    try:
        print(f"DEBUG: validate_and_convert_image called with: {image_path}")
        print(f"DEBUG: Image file exists: {os.path.exists(image_path)}")
        print(f"DEBUG: Current working directory: {os.getcwd()}")
        print(f"DEBUG: TEMP_DOWNLOAD_PATH: {Config.TEMP_DOWNLOAD_PATH}")

        with Image.open(image_path) as img:
            # Check file size
            file_size = os.path.getsize(image_path)
            if file_size > Config.MAX_IMAGE_SIZE:
                return {
                    'valid': False,
                    'error': f'Image too large: {file_size} bytes. Maximum: {Config.MAX_IMAGE_SIZE} bytes'
                }

            # Check minimum resolution before any cropping
            original_width, original_height = img.size
            min_width, min_height = Config.MIN_IMAGE_RESOLUTION
            if original_width < min_width or original_height < min_height:
                return {
                    'valid': False,
                    'error': f'Image resolution too low: {original_width}x{original_height}. Minimum: {min_width}x{min_height}'
                }

            processed_img, target_ratio_name, was_cropped = _fit_to_supported_ratio(img)

            # Cropping may have pushed the image below the minimum size
            final_width, final_height = processed_img.size
            if final_width < min_width or final_height < min_height:
                return {
                    'valid': False,
                    'error': f'After aspect ratio conversion, image resolution is too low: {final_width}x{final_height}. Minimum: {min_width}x{min_height}. Try using a higher resolution image.'
                }

            # JPEG has no alpha channel, so flatten to RGB first
            processed_img = _flatten_to_rgb(processed_img)

            # Create temporary file for converted image in our configured temp directory
            print(f"DEBUG: About to create temp directory: {Config.TEMP_DOWNLOAD_PATH}")
            os.makedirs(Config.TEMP_DOWNLOAD_PATH, exist_ok=True)
            print("DEBUG: Temp directory created successfully")

            # The random suffix keeps conversions that happen within the same
            # second in the same process from overwriting each other.
            temp_filename = f"veo_img_{int(time.time())}_{os.getpid()}_{uuid.uuid4().hex}.jpg"
            temp_path = os.path.join(Config.TEMP_DOWNLOAD_PATH, temp_filename)
            print(f"DEBUG: Will save converted image to: {temp_path}")

            # Save as JPEG with high quality
            processed_img.save(temp_path, 'JPEG', quality=95, optimize=True)

            return {
                'valid': True,
                'original_format': img.format if hasattr(img, 'format') else 'Unknown',
                'converted_path': temp_path,
                'size': (final_width, final_height),
                'original_size': (original_width, original_height),
                'aspect_ratio': target_ratio_name,
                'file_size': os.path.getsize(temp_path),
                'mime_type': 'image/jpeg',
                # True only when pixels were actually removed, not merely when
                # the original ratio differed slightly from the target.
                'was_cropped': was_cropped
            }

    except Exception as e:
        return {
            'valid': False,
            'error': f'Failed to process image: {str(e)}'
        }


def validate_image(image_path: str) -> dict:
    """
    Legacy function for backward compatibility.
    Use validate_and_convert_image for new code.

    Runs the same validation, deletes the temporary converted file, and
    returns the result dict without the 'converted_path' key.
    """
    result = validate_and_convert_image(image_path)
    if result['valid'] and 'converted_path' in result:
        # Legacy callers never see the converted file, so remove it here.
        _remove_temp_file(result['converted_path'])
    return {k: v for k, v in result.items() if k != 'converted_path'}


def upload_image_to_gcs(image_path: str, job_id: str) -> str:
    """
    Upload image to GCS after validating and converting to JPEG format.
    Returns the GCS URI of the uploaded image.

    Raises:
        ValueError: if validation/conversion fails.
        Any exception raised while uploading is logged and re-raised.
    """
    # Validate and convert image
    validation_result = validate_and_convert_image(image_path)
    if not validation_result['valid']:
        raise ValueError(f"Image validation failed: {validation_result['error']}")

    converted_image_path = validation_result['converted_path']

    try:
        storage_client = _get_storage_client()
        bucket = storage_client.bucket(Config.OUTPUT_GCS_BUCKET_NAME)

        # Create unique blob name with .jpg extension
        timestamp = int(time.time())
        blob_name = f"{Config.TEMP_IMAGE_GCS_PATH}/{job_id}_{timestamp}.jpg"

        gcs_uri = _upload_jpeg(bucket, blob_name, converted_image_path)
        print(f"Image converted from {validation_result['original_format']} and uploaded to: {gcs_uri}")
        return gcs_uri

    except Exception as e:
        print(f"Error uploading image to GCS: {e}")
        raise
    finally:
        # Clean up temporary converted file
        _remove_temp_file(converted_image_path)


def upload_reference_images_to_gcs(image_paths: list, job_id: str) -> list:
    """
    Upload multiple reference images to GCS after validating and converting to JPEG format.
    Returns a list of GCS URIs for the uploaded images.

    Args:
        image_paths: List of local image file paths (max 3)
        job_id: Job ID for unique naming

    Returns:
        List of GCS URIs (empty when ``image_paths`` is empty or None)

    Raises:
        ValueError: if more than 3 paths are given or any image fails validation.
        Any exception raised while uploading is logged and re-raised.
    """
    if not image_paths:
        return []

    if len(image_paths) > 3:
        raise ValueError("Maximum 3 reference images allowed")

    gcs_uris = []
    converted_files = []

    try:
        storage_client = _get_storage_client()
        bucket = storage_client.bucket(Config.OUTPUT_GCS_BUCKET_NAME)

        for idx, image_path in enumerate(image_paths):
            # Validate and convert image
            validation_result = validate_and_convert_image(image_path)
            if not validation_result['valid']:
                raise ValueError(f"Reference image {idx+1} validation failed: {validation_result['error']}")

            converted_image_path = validation_result['converted_path']
            converted_files.append(converted_image_path)

            # Create unique blob name with index
            timestamp = int(time.time())
            blob_name = f"{Config.TEMP_IMAGE_GCS_PATH}/{job_id}_ref{idx+1}_{timestamp}.jpg"

            gcs_uri = _upload_jpeg(bucket, blob_name, converted_image_path)
            gcs_uris.append(gcs_uri)
            print(f"Reference image {idx+1} converted from {validation_result['original_format']} and uploaded to: {gcs_uri}")

        return gcs_uris

    except Exception as e:
        print(f"Error uploading reference images to GCS: {e}")
        raise
    finally:
        # Clean up temporary converted files, including after a mid-loop failure
        for converted_path in converted_files:
            _remove_temp_file(converted_path)


def _delete_local_image(local_path: str) -> bool:
    """Delete a local image if it exists; return False only when deletion fails."""
    if not (local_path and os.path.exists(local_path)):
        return True
    try:
        os.remove(local_path)
        print(f"Deleted local image: {local_path}")
        return True
    except Exception as e:
        print(f"Error deleting local image: {e}")
        return False


def _delete_gcs_image(gcs_blob_name: str) -> bool:
    """Delete a blob from the output bucket; return False when deletion fails."""
    if not gcs_blob_name:
        return True
    try:
        # delete_blob reports failure through its return value, so it must be
        # checked in addition to catching exceptions from client creation.
        return bool(delete_blob(Config.OUTPUT_GCS_BUCKET_NAME, gcs_blob_name))
    except Exception as e:
        print(f"Error deleting GCS image: {e}")
        return False


def cleanup_image_files(job_id: str, local_path: Optional[str] = None, gcs_blob_name: Optional[str] = None) -> bool:
    """
    Clean up image files from local storage and GCS.

    Args:
        job_id: Job ID (currently unused by the cleanup itself)
        local_path: Local file path to delete, if any
        gcs_blob_name: GCS blob name to delete, if any

    Returns:
        True if every requested cleanup succeeded, False otherwise
    """
    # Attempt both deletions even if the first one fails.
    local_ok = _delete_local_image(local_path)
    gcs_ok = _delete_gcs_image(gcs_blob_name)
    return local_ok and gcs_ok


def cleanup_multiple_images(job_id: str, local_paths: Optional[list] = None, gcs_blob_names: Optional[list] = None) -> bool:
    """
    Clean up multiple image files from local storage and GCS.

    Args:
        job_id: Job ID (currently unused by the cleanup itself)
        local_paths: List of local file paths to delete
        gcs_blob_names: List of GCS blob names to delete

    Returns:
        True if all cleanups succeeded, False otherwise
    """
    success = True

    # Delete local files; keep going after a failure so everything is attempted
    for local_path in local_paths or []:
        if not _delete_local_image(local_path):
            success = False

    # Delete GCS files
    for gcs_blob_name in gcs_blob_names or []:
        if not _delete_gcs_image(gcs_blob_name):
            success = False

    return success