veo3/backend/utils/storage.py
2025-11-04 02:31:40 +05:30

377 lines
No EOL
15 KiB
Python

import os
import time
import uuid
import tempfile
from PIL import Image
from google.cloud import storage
from utils.auth import get_google_credentials
from config import Config
def download_blob(bucket_name: str, source_blob_name: str, destination_file_name: str):
"""Downloads a blob from the bucket."""
credentials = get_google_credentials()
if credentials:
storage_client = storage.Client(project=Config.PROJECT_ID, credentials=credentials)
else:
storage_client = storage.Client(project=Config.PROJECT_ID)
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(source_blob_name)
# Create local directory if it doesn't exist
os.makedirs(os.path.dirname(destination_file_name), exist_ok=True)
try:
blob.download_to_filename(destination_file_name, timeout=600) # 10 minute timeout for video downloads
print(f"Video downloaded from gs://{bucket_name}/{source_blob_name} to {destination_file_name}")
return destination_file_name
except Exception as e:
print(f"Error downloading video: {e}")
raise
def delete_blob(bucket_name: str, blob_name: str):
"""Deletes a blob from the bucket."""
credentials = get_google_credentials()
if credentials:
storage_client = storage.Client(project=Config.PROJECT_ID, credentials=credentials)
else:
storage_client = storage.Client(project=Config.PROJECT_ID)
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
try:
blob.delete()
print(f"Blob {blob_name} deleted from bucket {bucket_name}")
return True
except Exception as e:
print(f"Error deleting blob: {e}")
return False
def validate_and_convert_image(image_path: str) -> dict:
"""
Validate image and convert to JPEG format for Veo 3 API compatibility.
Supports all formats that Pillow can open and converts them to JPEG.
Automatically converts unsupported aspect ratios to 16:9 or 9:16 using smart cropping.
Returns dict with validation result, converted image path, and image info.
"""
try:
print(f"DEBUG: validate_and_convert_image called with: {image_path}")
print(f"DEBUG: Image file exists: {os.path.exists(image_path)}")
print(f"DEBUG: Current working directory: {os.getcwd()}")
print(f"DEBUG: TEMP_DOWNLOAD_PATH: {Config.TEMP_DOWNLOAD_PATH}")
with Image.open(image_path) as img:
# Check file size
file_size = os.path.getsize(image_path)
if file_size > Config.MAX_IMAGE_SIZE:
return {
'valid': False,
'error': f'Image too large: {file_size} bytes. Maximum: {Config.MAX_IMAGE_SIZE} bytes'
}
# Get original dimensions
original_width, original_height = img.size
original_aspect_ratio = original_width / original_height
# Check if image meets minimum resolution requirements before cropping
min_width, min_height = Config.MIN_IMAGE_RESOLUTION
if original_width < min_width or original_height < min_height:
return {
'valid': False,
'error': f'Image resolution too low: {original_width}x{original_height}. Minimum: {min_width}x{min_height}'
}
# Determine target aspect ratio and crop if necessary
target_16_9 = 16/9 # 1.778
target_9_16 = 9/16 # 0.5625
# Check if already close to supported ratios (within 5% tolerance)
if abs(original_aspect_ratio - target_16_9) < 0.09:
# Already close to 16:9
target_ratio = target_16_9
target_ratio_name = '16:9'
processed_img = img.copy()
elif abs(original_aspect_ratio - target_9_16) < 0.03:
# Already close to 9:16
target_ratio = target_9_16
target_ratio_name = '9:16'
processed_img = img.copy()
else:
# Need to crop to a supported aspect ratio
# Choose the ratio that requires the least cropping
crop_loss_16_9 = abs(original_aspect_ratio - target_16_9)
crop_loss_9_16 = abs(original_aspect_ratio - target_9_16)
if crop_loss_16_9 < crop_loss_9_16:
target_ratio = target_16_9
target_ratio_name = '16:9'
else:
target_ratio = target_9_16
target_ratio_name = '9:16'
# Calculate crop dimensions
if original_aspect_ratio > target_ratio:
# Image is wider than target, crop width
new_width = int(original_height * target_ratio)
new_height = original_height
left = (original_width - new_width) // 2
top = 0
else:
# Image is taller than target, crop height
new_width = original_width
new_height = int(original_width / target_ratio)
left = 0
top = (original_height - new_height) // 2
# Perform center crop
crop_box = (left, top, left + new_width, top + new_height)
processed_img = img.crop(crop_box)
print(f"Converted aspect ratio from {original_aspect_ratio:.2f} to {target_ratio_name} by cropping from {original_width}x{original_height} to {new_width}x{new_height}")
# Verify final dimensions meet minimum requirements
final_width, final_height = processed_img.size
if final_width < min_width or final_height < min_height:
return {
'valid': False,
'error': f'After aspect ratio conversion, image resolution is too low: {final_width}x{final_height}. Minimum: {min_width}x{min_height}. Try using a higher resolution image.'
}
# Convert to RGB if necessary (for JPEG compatibility)
if processed_img.mode in ('RGBA', 'LA', 'P'):
# Create white background for transparent images
rgb_img = Image.new('RGB', processed_img.size, (255, 255, 255))
if processed_img.mode == 'P':
processed_img = processed_img.convert('RGBA')
rgb_img.paste(processed_img, mask=processed_img.split()[-1] if processed_img.mode in ('RGBA', 'LA') else None)
processed_img = rgb_img
elif processed_img.mode != 'RGB':
processed_img = processed_img.convert('RGB')
# Create temporary file for converted image in our configured temp directory
print(f"DEBUG: About to create temp directory: {Config.TEMP_DOWNLOAD_PATH}")
os.makedirs(Config.TEMP_DOWNLOAD_PATH, exist_ok=True)
print(f"DEBUG: Temp directory created successfully")
temp_filename = f"veo_img_{int(time.time())}_{os.getpid()}.jpg"
temp_path = os.path.join(Config.TEMP_DOWNLOAD_PATH, temp_filename)
print(f"DEBUG: Will save converted image to: {temp_path}")
# Save as JPEG with high quality
processed_img.save(temp_path, 'JPEG', quality=95, optimize=True)
return {
'valid': True,
'original_format': img.format if hasattr(img, 'format') else 'Unknown',
'converted_path': temp_path,
'size': (final_width, final_height),
'original_size': (original_width, original_height),
'aspect_ratio': target_ratio_name,
'file_size': os.path.getsize(temp_path),
'mime_type': 'image/jpeg',
'was_cropped': target_ratio_name != f"{original_width}:{original_height}" and abs(original_aspect_ratio - target_ratio) > 0.01
}
except Exception as e:
return {
'valid': False,
'error': f'Failed to process image: {str(e)}'
}
def validate_image(image_path: str) -> dict:
"""
Legacy function for backward compatibility.
Use validate_and_convert_image for new code.
"""
result = validate_and_convert_image(image_path)
if result['valid'] and 'converted_path' in result:
# Clean up temporary file for legacy calls
try:
os.remove(result['converted_path'])
except:
pass
return {k: v for k, v in result.items() if k != 'converted_path'}
def upload_image_to_gcs(image_path: str, job_id: str) -> str:
"""
Upload image to GCS after validating and converting to JPEG format.
Returns the GCS URI of the uploaded image.
"""
# Validate and convert image
validation_result = validate_and_convert_image(image_path)
if not validation_result['valid']:
raise ValueError(f"Image validation failed: {validation_result['error']}")
converted_image_path = validation_result['converted_path']
try:
credentials = get_google_credentials()
if credentials:
storage_client = storage.Client(project=Config.PROJECT_ID, credentials=credentials)
else:
storage_client = storage.Client(project=Config.PROJECT_ID)
bucket = storage_client.bucket(Config.OUTPUT_GCS_BUCKET_NAME)
# Create unique blob name with .jpg extension
timestamp = int(time.time())
blob_name = f"{Config.TEMP_IMAGE_GCS_PATH}/{job_id}_{timestamp}.jpg"
blob = bucket.blob(blob_name)
# Upload converted image with timeout
blob.upload_from_filename(converted_image_path, timeout=300) # 5 minute timeout
# Set content type to JPEG
blob.content_type = 'image/jpeg'
blob.patch()
gcs_uri = f"gs://{Config.OUTPUT_GCS_BUCKET_NAME}/{blob_name}"
print(f"Image converted from {validation_result['original_format']} and uploaded to: {gcs_uri}")
return gcs_uri
except Exception as e:
print(f"Error uploading image to GCS: {e}")
raise
finally:
# Clean up temporary converted file
try:
os.remove(converted_image_path)
except Exception as cleanup_error:
print(f"Warning: Could not clean up temporary file {converted_image_path}: {cleanup_error}")
def upload_reference_images_to_gcs(image_paths: list, job_id: str) -> list:
"""
Upload multiple reference images to GCS after validating and converting to JPEG format.
Returns a list of GCS URIs for the uploaded images.
Args:
image_paths: List of local image file paths (max 3)
job_id: Job ID for unique naming
Returns:
List of GCS URIs
"""
if not image_paths:
return []
if len(image_paths) > 3:
raise ValueError("Maximum 3 reference images allowed")
gcs_uris = []
converted_files = []
try:
credentials = get_google_credentials()
if credentials:
storage_client = storage.Client(project=Config.PROJECT_ID, credentials=credentials)
else:
storage_client = storage.Client(project=Config.PROJECT_ID)
bucket = storage_client.bucket(Config.OUTPUT_GCS_BUCKET_NAME)
for idx, image_path in enumerate(image_paths):
# Validate and convert image
validation_result = validate_and_convert_image(image_path)
if not validation_result['valid']:
raise ValueError(f"Reference image {idx+1} validation failed: {validation_result['error']}")
converted_image_path = validation_result['converted_path']
converted_files.append(converted_image_path)
# Create unique blob name with index
timestamp = int(time.time())
blob_name = f"{Config.TEMP_IMAGE_GCS_PATH}/{job_id}_ref{idx+1}_{timestamp}.jpg"
blob = bucket.blob(blob_name)
# Upload converted image with timeout
blob.upload_from_filename(converted_image_path, timeout=300) # 5 minute timeout
# Set content type to JPEG
blob.content_type = 'image/jpeg'
blob.patch()
gcs_uri = f"gs://{Config.OUTPUT_GCS_BUCKET_NAME}/{blob_name}"
gcs_uris.append(gcs_uri)
print(f"Reference image {idx+1} converted from {validation_result['original_format']} and uploaded to: {gcs_uri}")
return gcs_uris
except Exception as e:
print(f"Error uploading reference images to GCS: {e}")
raise
finally:
# Clean up temporary converted files
for converted_path in converted_files:
try:
if os.path.exists(converted_path):
os.remove(converted_path)
except Exception as cleanup_error:
print(f"Warning: Could not clean up temporary file {converted_path}: {cleanup_error}")
def cleanup_image_files(job_id: str, local_path: str = None, gcs_blob_name: str = None) -> bool:
"""
Clean up image files from local storage and GCS.
"""
success = True
# Delete local file
if local_path and os.path.exists(local_path):
try:
os.remove(local_path)
print(f"Deleted local image: {local_path}")
except Exception as e:
print(f"Error deleting local image: {e}")
success = False
# Delete GCS file
if gcs_blob_name:
try:
delete_blob(Config.OUTPUT_GCS_BUCKET_NAME, gcs_blob_name)
except Exception as e:
print(f"Error deleting GCS image: {e}")
success = False
return success
def cleanup_multiple_images(job_id: str, local_paths: list = None, gcs_blob_names: list = None) -> bool:
"""
Clean up multiple image files from local storage and GCS.
Args:
job_id: Job ID for logging
local_paths: List of local file paths to delete
gcs_blob_names: List of GCS blob names to delete
Returns:
True if all cleanups succeeded, False otherwise
"""
success = True
# Delete local files
if local_paths:
for local_path in local_paths:
if local_path and os.path.exists(local_path):
try:
os.remove(local_path)
print(f"Deleted local image: {local_path}")
except Exception as e:
print(f"Error deleting local image: {e}")
success = False
# Delete GCS files
if gcs_blob_names:
for gcs_blob_name in gcs_blob_names:
if gcs_blob_name:
try:
delete_blob(Config.OUTPUT_GCS_BUCKET_NAME, gcs_blob_name)
except Exception as e:
print(f"Error deleting GCS image: {e}")
success = False
return success