377 lines
No EOL
15 KiB
Python
377 lines
No EOL
15 KiB
Python
import os
|
|
import time
|
|
import uuid
|
|
import tempfile
|
|
from PIL import Image
|
|
from google.cloud import storage
|
|
from utils.auth import get_google_credentials
|
|
from config import Config
|
|
|
|
def download_blob(bucket_name: str, source_blob_name: str, destination_file_name: str):
|
|
"""Downloads a blob from the bucket."""
|
|
credentials = get_google_credentials()
|
|
|
|
if credentials:
|
|
storage_client = storage.Client(project=Config.PROJECT_ID, credentials=credentials)
|
|
else:
|
|
storage_client = storage.Client(project=Config.PROJECT_ID)
|
|
|
|
bucket = storage_client.bucket(bucket_name)
|
|
blob = bucket.blob(source_blob_name)
|
|
|
|
# Create local directory if it doesn't exist
|
|
os.makedirs(os.path.dirname(destination_file_name), exist_ok=True)
|
|
|
|
try:
|
|
blob.download_to_filename(destination_file_name, timeout=600) # 10 minute timeout for video downloads
|
|
print(f"Video downloaded from gs://{bucket_name}/{source_blob_name} to {destination_file_name}")
|
|
return destination_file_name
|
|
except Exception as e:
|
|
print(f"Error downloading video: {e}")
|
|
raise
|
|
|
|
def delete_blob(bucket_name: str, blob_name: str):
|
|
"""Deletes a blob from the bucket."""
|
|
credentials = get_google_credentials()
|
|
|
|
if credentials:
|
|
storage_client = storage.Client(project=Config.PROJECT_ID, credentials=credentials)
|
|
else:
|
|
storage_client = storage.Client(project=Config.PROJECT_ID)
|
|
|
|
bucket = storage_client.bucket(bucket_name)
|
|
blob = bucket.blob(blob_name)
|
|
|
|
try:
|
|
blob.delete()
|
|
print(f"Blob {blob_name} deleted from bucket {bucket_name}")
|
|
return True
|
|
except Exception as e:
|
|
print(f"Error deleting blob: {e}")
|
|
return False
|
|
|
|
def validate_and_convert_image(image_path: str) -> dict:
|
|
"""
|
|
Validate image and convert to JPEG format for Veo 3 API compatibility.
|
|
Supports all formats that Pillow can open and converts them to JPEG.
|
|
Automatically converts unsupported aspect ratios to 16:9 or 9:16 using smart cropping.
|
|
Returns dict with validation result, converted image path, and image info.
|
|
"""
|
|
try:
|
|
print(f"DEBUG: validate_and_convert_image called with: {image_path}")
|
|
print(f"DEBUG: Image file exists: {os.path.exists(image_path)}")
|
|
print(f"DEBUG: Current working directory: {os.getcwd()}")
|
|
print(f"DEBUG: TEMP_DOWNLOAD_PATH: {Config.TEMP_DOWNLOAD_PATH}")
|
|
|
|
with Image.open(image_path) as img:
|
|
# Check file size
|
|
file_size = os.path.getsize(image_path)
|
|
if file_size > Config.MAX_IMAGE_SIZE:
|
|
return {
|
|
'valid': False,
|
|
'error': f'Image too large: {file_size} bytes. Maximum: {Config.MAX_IMAGE_SIZE} bytes'
|
|
}
|
|
|
|
# Get original dimensions
|
|
original_width, original_height = img.size
|
|
original_aspect_ratio = original_width / original_height
|
|
|
|
# Check if image meets minimum resolution requirements before cropping
|
|
min_width, min_height = Config.MIN_IMAGE_RESOLUTION
|
|
if original_width < min_width or original_height < min_height:
|
|
return {
|
|
'valid': False,
|
|
'error': f'Image resolution too low: {original_width}x{original_height}. Minimum: {min_width}x{min_height}'
|
|
}
|
|
|
|
# Determine target aspect ratio and crop if necessary
|
|
target_16_9 = 16/9 # 1.778
|
|
target_9_16 = 9/16 # 0.5625
|
|
|
|
# Check if already close to supported ratios (within 5% tolerance)
|
|
if abs(original_aspect_ratio - target_16_9) < 0.09:
|
|
# Already close to 16:9
|
|
target_ratio = target_16_9
|
|
target_ratio_name = '16:9'
|
|
processed_img = img.copy()
|
|
elif abs(original_aspect_ratio - target_9_16) < 0.03:
|
|
# Already close to 9:16
|
|
target_ratio = target_9_16
|
|
target_ratio_name = '9:16'
|
|
processed_img = img.copy()
|
|
else:
|
|
# Need to crop to a supported aspect ratio
|
|
# Choose the ratio that requires the least cropping
|
|
crop_loss_16_9 = abs(original_aspect_ratio - target_16_9)
|
|
crop_loss_9_16 = abs(original_aspect_ratio - target_9_16)
|
|
|
|
if crop_loss_16_9 < crop_loss_9_16:
|
|
target_ratio = target_16_9
|
|
target_ratio_name = '16:9'
|
|
else:
|
|
target_ratio = target_9_16
|
|
target_ratio_name = '9:16'
|
|
|
|
# Calculate crop dimensions
|
|
if original_aspect_ratio > target_ratio:
|
|
# Image is wider than target, crop width
|
|
new_width = int(original_height * target_ratio)
|
|
new_height = original_height
|
|
left = (original_width - new_width) // 2
|
|
top = 0
|
|
else:
|
|
# Image is taller than target, crop height
|
|
new_width = original_width
|
|
new_height = int(original_width / target_ratio)
|
|
left = 0
|
|
top = (original_height - new_height) // 2
|
|
|
|
# Perform center crop
|
|
crop_box = (left, top, left + new_width, top + new_height)
|
|
processed_img = img.crop(crop_box)
|
|
|
|
print(f"Converted aspect ratio from {original_aspect_ratio:.2f} to {target_ratio_name} by cropping from {original_width}x{original_height} to {new_width}x{new_height}")
|
|
|
|
# Verify final dimensions meet minimum requirements
|
|
final_width, final_height = processed_img.size
|
|
if final_width < min_width or final_height < min_height:
|
|
return {
|
|
'valid': False,
|
|
'error': f'After aspect ratio conversion, image resolution is too low: {final_width}x{final_height}. Minimum: {min_width}x{min_height}. Try using a higher resolution image.'
|
|
}
|
|
|
|
# Convert to RGB if necessary (for JPEG compatibility)
|
|
if processed_img.mode in ('RGBA', 'LA', 'P'):
|
|
# Create white background for transparent images
|
|
rgb_img = Image.new('RGB', processed_img.size, (255, 255, 255))
|
|
if processed_img.mode == 'P':
|
|
processed_img = processed_img.convert('RGBA')
|
|
rgb_img.paste(processed_img, mask=processed_img.split()[-1] if processed_img.mode in ('RGBA', 'LA') else None)
|
|
processed_img = rgb_img
|
|
elif processed_img.mode != 'RGB':
|
|
processed_img = processed_img.convert('RGB')
|
|
|
|
# Create temporary file for converted image in our configured temp directory
|
|
print(f"DEBUG: About to create temp directory: {Config.TEMP_DOWNLOAD_PATH}")
|
|
os.makedirs(Config.TEMP_DOWNLOAD_PATH, exist_ok=True)
|
|
print(f"DEBUG: Temp directory created successfully")
|
|
temp_filename = f"veo_img_{int(time.time())}_{os.getpid()}.jpg"
|
|
temp_path = os.path.join(Config.TEMP_DOWNLOAD_PATH, temp_filename)
|
|
print(f"DEBUG: Will save converted image to: {temp_path}")
|
|
|
|
# Save as JPEG with high quality
|
|
processed_img.save(temp_path, 'JPEG', quality=95, optimize=True)
|
|
|
|
return {
|
|
'valid': True,
|
|
'original_format': img.format if hasattr(img, 'format') else 'Unknown',
|
|
'converted_path': temp_path,
|
|
'size': (final_width, final_height),
|
|
'original_size': (original_width, original_height),
|
|
'aspect_ratio': target_ratio_name,
|
|
'file_size': os.path.getsize(temp_path),
|
|
'mime_type': 'image/jpeg',
|
|
'was_cropped': target_ratio_name != f"{original_width}:{original_height}" and abs(original_aspect_ratio - target_ratio) > 0.01
|
|
}
|
|
|
|
except Exception as e:
|
|
return {
|
|
'valid': False,
|
|
'error': f'Failed to process image: {str(e)}'
|
|
}
|
|
|
|
def validate_image(image_path: str) -> dict:
|
|
"""
|
|
Legacy function for backward compatibility.
|
|
Use validate_and_convert_image for new code.
|
|
"""
|
|
result = validate_and_convert_image(image_path)
|
|
if result['valid'] and 'converted_path' in result:
|
|
# Clean up temporary file for legacy calls
|
|
try:
|
|
os.remove(result['converted_path'])
|
|
except:
|
|
pass
|
|
return {k: v for k, v in result.items() if k != 'converted_path'}
|
|
|
|
def upload_image_to_gcs(image_path: str, job_id: str) -> str:
|
|
"""
|
|
Upload image to GCS after validating and converting to JPEG format.
|
|
Returns the GCS URI of the uploaded image.
|
|
"""
|
|
# Validate and convert image
|
|
validation_result = validate_and_convert_image(image_path)
|
|
if not validation_result['valid']:
|
|
raise ValueError(f"Image validation failed: {validation_result['error']}")
|
|
|
|
converted_image_path = validation_result['converted_path']
|
|
|
|
try:
|
|
credentials = get_google_credentials()
|
|
|
|
if credentials:
|
|
storage_client = storage.Client(project=Config.PROJECT_ID, credentials=credentials)
|
|
else:
|
|
storage_client = storage.Client(project=Config.PROJECT_ID)
|
|
|
|
bucket = storage_client.bucket(Config.OUTPUT_GCS_BUCKET_NAME)
|
|
|
|
# Create unique blob name with .jpg extension
|
|
timestamp = int(time.time())
|
|
blob_name = f"{Config.TEMP_IMAGE_GCS_PATH}/{job_id}_{timestamp}.jpg"
|
|
|
|
blob = bucket.blob(blob_name)
|
|
|
|
# Upload converted image with timeout
|
|
blob.upload_from_filename(converted_image_path, timeout=300) # 5 minute timeout
|
|
|
|
# Set content type to JPEG
|
|
blob.content_type = 'image/jpeg'
|
|
blob.patch()
|
|
|
|
gcs_uri = f"gs://{Config.OUTPUT_GCS_BUCKET_NAME}/{blob_name}"
|
|
print(f"Image converted from {validation_result['original_format']} and uploaded to: {gcs_uri}")
|
|
return gcs_uri
|
|
|
|
except Exception as e:
|
|
print(f"Error uploading image to GCS: {e}")
|
|
raise
|
|
finally:
|
|
# Clean up temporary converted file
|
|
try:
|
|
os.remove(converted_image_path)
|
|
except Exception as cleanup_error:
|
|
print(f"Warning: Could not clean up temporary file {converted_image_path}: {cleanup_error}")
|
|
|
|
def upload_reference_images_to_gcs(image_paths: list, job_id: str) -> list:
|
|
"""
|
|
Upload multiple reference images to GCS after validating and converting to JPEG format.
|
|
Returns a list of GCS URIs for the uploaded images.
|
|
|
|
Args:
|
|
image_paths: List of local image file paths (max 3)
|
|
job_id: Job ID for unique naming
|
|
|
|
Returns:
|
|
List of GCS URIs
|
|
"""
|
|
if not image_paths:
|
|
return []
|
|
|
|
if len(image_paths) > 3:
|
|
raise ValueError("Maximum 3 reference images allowed")
|
|
|
|
gcs_uris = []
|
|
converted_files = []
|
|
|
|
try:
|
|
credentials = get_google_credentials()
|
|
|
|
if credentials:
|
|
storage_client = storage.Client(project=Config.PROJECT_ID, credentials=credentials)
|
|
else:
|
|
storage_client = storage.Client(project=Config.PROJECT_ID)
|
|
|
|
bucket = storage_client.bucket(Config.OUTPUT_GCS_BUCKET_NAME)
|
|
|
|
for idx, image_path in enumerate(image_paths):
|
|
# Validate and convert image
|
|
validation_result = validate_and_convert_image(image_path)
|
|
if not validation_result['valid']:
|
|
raise ValueError(f"Reference image {idx+1} validation failed: {validation_result['error']}")
|
|
|
|
converted_image_path = validation_result['converted_path']
|
|
converted_files.append(converted_image_path)
|
|
|
|
# Create unique blob name with index
|
|
timestamp = int(time.time())
|
|
blob_name = f"{Config.TEMP_IMAGE_GCS_PATH}/{job_id}_ref{idx+1}_{timestamp}.jpg"
|
|
|
|
blob = bucket.blob(blob_name)
|
|
|
|
# Upload converted image with timeout
|
|
blob.upload_from_filename(converted_image_path, timeout=300) # 5 minute timeout
|
|
|
|
# Set content type to JPEG
|
|
blob.content_type = 'image/jpeg'
|
|
blob.patch()
|
|
|
|
gcs_uri = f"gs://{Config.OUTPUT_GCS_BUCKET_NAME}/{blob_name}"
|
|
gcs_uris.append(gcs_uri)
|
|
print(f"Reference image {idx+1} converted from {validation_result['original_format']} and uploaded to: {gcs_uri}")
|
|
|
|
return gcs_uris
|
|
|
|
except Exception as e:
|
|
print(f"Error uploading reference images to GCS: {e}")
|
|
raise
|
|
finally:
|
|
# Clean up temporary converted files
|
|
for converted_path in converted_files:
|
|
try:
|
|
if os.path.exists(converted_path):
|
|
os.remove(converted_path)
|
|
except Exception as cleanup_error:
|
|
print(f"Warning: Could not clean up temporary file {converted_path}: {cleanup_error}")
|
|
|
|
def cleanup_image_files(job_id: str, local_path: str = None, gcs_blob_name: str = None) -> bool:
|
|
"""
|
|
Clean up image files from local storage and GCS.
|
|
"""
|
|
success = True
|
|
|
|
# Delete local file
|
|
if local_path and os.path.exists(local_path):
|
|
try:
|
|
os.remove(local_path)
|
|
print(f"Deleted local image: {local_path}")
|
|
except Exception as e:
|
|
print(f"Error deleting local image: {e}")
|
|
success = False
|
|
|
|
# Delete GCS file
|
|
if gcs_blob_name:
|
|
try:
|
|
delete_blob(Config.OUTPUT_GCS_BUCKET_NAME, gcs_blob_name)
|
|
except Exception as e:
|
|
print(f"Error deleting GCS image: {e}")
|
|
success = False
|
|
|
|
return success
|
|
|
|
def cleanup_multiple_images(job_id: str, local_paths: list = None, gcs_blob_names: list = None) -> bool:
|
|
"""
|
|
Clean up multiple image files from local storage and GCS.
|
|
|
|
Args:
|
|
job_id: Job ID for logging
|
|
local_paths: List of local file paths to delete
|
|
gcs_blob_names: List of GCS blob names to delete
|
|
|
|
Returns:
|
|
True if all cleanups succeeded, False otherwise
|
|
"""
|
|
success = True
|
|
|
|
# Delete local files
|
|
if local_paths:
|
|
for local_path in local_paths:
|
|
if local_path and os.path.exists(local_path):
|
|
try:
|
|
os.remove(local_path)
|
|
print(f"Deleted local image: {local_path}")
|
|
except Exception as e:
|
|
print(f"Error deleting local image: {e}")
|
|
success = False
|
|
|
|
# Delete GCS files
|
|
if gcs_blob_names:
|
|
for gcs_blob_name in gcs_blob_names:
|
|
if gcs_blob_name:
|
|
try:
|
|
delete_blob(Config.OUTPUT_GCS_BUCKET_NAME, gcs_blob_name)
|
|
except Exception as e:
|
|
print(f"Error deleting GCS image: {e}")
|
|
success = False
|
|
|
|
return success |