adobe-ps-scripts-loreal/gcs_storage.py
DJP 4a192a8c97 Initial commit: Adobe Photoshop API text management scripts
Local and cloud-based workflows for extracting and updating
text layers in PSD files via ExtendScript and Adobe PS API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 13:46:52 -05:00

471 lines
No EOL
18 KiB
Python

#!/usr/bin/env python3
"""
Google Cloud Storage Integration for Adobe Photoshop API
-------------------------------------------------------
This module provides functionality to:
1. Upload files to Google Cloud Storage
2. Generate signed URLs for Adobe Photoshop API
3. Download processed files from GCS
Required for Adobe Photoshop API which expects external storage with signed URLs.
"""
import os
import json
import time
import logging
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta
from pathlib import Path
# Import Google Cloud Storage libraries
try:
from google.cloud import storage
from google.oauth2 import service_account
except ImportError:
raise ImportError(
"Google Cloud Storage libraries not installed. "
"Please install with: pip install google-cloud-storage"
)
# Logging setup: INFO level, records rendered as "<timestamp> - <level> - <message>".
_LOG_SETTINGS = {
    "level": logging.INFO,
    "format": '%(asctime)s - %(levelname)s - %(message)s',
    "datefmt": '%Y-%m-%d %H:%M:%S',
}
logging.basicConfig(**_LOG_SETTINGS)
logger = logging.getLogger(__name__)

# Fallback service account key location (gcs_key.json next to this module),
# used when the caller supplies no usable key_path.
_MODULE_DIR = os.path.dirname(__file__)
DEFAULT_KEY_PATH = os.path.join(_MODULE_DIR, "gcs_key.json")
class GCSStorage:
    """Google Cloud Storage integration for Adobe Photoshop API.

    Wraps a single bucket and offers upload, download, signed-URL generation,
    prefix cleanup and polling for an expected output object.  Most methods
    report their outcome as a dictionary carrying a boolean ``"success"`` and a
    human-readable ``"message"``.
    """

    # Signed-URL action -> HTTP verb the URL is signed for.
    _SIGNED_URL_METHODS = {"read": "GET", "write": "PUT"}
    # Content type applied to ``.psd`` upload URLs when the caller names none.
    _PSD_CONTENT_TYPE = "image/vnd.adobe.photoshop"

    def __init__(self, bucket_name: str, key_path: Optional[str] = None, key_json: Optional[Dict] = None):
        """
        Initialize the Google Cloud Storage client and open the bucket.

        Credentials are resolved in this order: ``key_json``, then an existing
        ``key_path``, then ``DEFAULT_KEY_PATH``.

        Args:
            bucket_name: Name of the GCS bucket to use
            key_path: Path to service account key JSON file (optional)
            key_json: Service account key as a dictionary (optional); it is
                also written to DEFAULT_KEY_PATH for future use

        Raises:
            ValueError: If none of the credential sources is usable.
            Exception: Whatever the client raises while fetching the bucket
                (logged, then re-raised).
        """
        self.bucket_name = bucket_name

        if key_json:
            # Use provided key JSON directly
            self.credentials = service_account.Credentials.from_service_account_info(key_json)
            self._save_key_file(key_json)  # Save for future use
            logger.info("Using provided service account key JSON for GCS")
        elif key_path and os.path.exists(key_path):
            # Use key file from provided path
            self.credentials = service_account.Credentials.from_service_account_file(key_path)
            logger.info(f"Using service account key from: {key_path}")
        else:
            if key_path:
                # Make the fallback visible instead of silently ignoring key_path.
                logger.warning(f"Service account key file not found: {key_path}")
            if not os.path.exists(DEFAULT_KEY_PATH):
                raise ValueError(
                    "No valid service account credentials provided. Please provide either "
                    "key_path to a JSON file or key_json as a dictionary."
                )
            # Use default key file
            self.credentials = service_account.Credentials.from_service_account_file(DEFAULT_KEY_PATH)
            logger.info(f"Using default service account key from: {DEFAULT_KEY_PATH}")

        # Initialize the client
        self.client = storage.Client(credentials=self.credentials)

        # Get the bucket
        try:
            self.bucket = self.client.get_bucket(bucket_name)
            logger.info(f"Successfully connected to GCS bucket: {bucket_name}")
        except Exception as e:
            logger.error(f"Error connecting to GCS bucket {bucket_name}: {str(e)}")
            raise

    @staticmethod
    def _http_method(action: str) -> str:
        """Map a signed-URL action ('read' / 'write', any case) to its HTTP verb.

        Raises:
            ValueError: If the action is neither 'read' nor 'write'.
        """
        try:
            return GCSStorage._SIGNED_URL_METHODS[action.lower()]
        except KeyError:
            raise ValueError(f"Invalid action '{action}'. Must be 'read' or 'write'.") from None

    @staticmethod
    def _resolve_content_type(blob_name: str, content_type: Optional[str]) -> Optional[str]:
        """Return the explicit content type, else the PSD default for ``.psd`` names, else None."""
        if content_type:
            return content_type
        if blob_name.lower().endswith('.psd'):
            return GCSStorage._PSD_CONTENT_TYPE
        return content_type

    def _save_key_file(self, key_json: Dict) -> None:
        """Save the key JSON to the default path for future use.

        Best effort: any failure is logged as a warning and swallowed.
        """
        try:
            # Create the file owner-only from the start so the key is never
            # briefly readable under a permissive umask.
            fd = os.open(DEFAULT_KEY_PATH, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w') as f:
                json.dump(key_json, f)
            # The creation mode is ignored for a pre-existing file; tighten it too.
            os.chmod(DEFAULT_KEY_PATH, 0o600)
            logger.info(f"Saved service account key to: {DEFAULT_KEY_PATH}")
        except Exception as e:
            logger.warning(f"Could not save service account key to file: {str(e)}")

    def upload_file(self, local_path: str, remote_path: Optional[str] = None) -> Dict[str, Any]:
        """
        Upload a file to Google Cloud Storage.

        Args:
            local_path: Path to the local file to upload
            remote_path: Path in GCS bucket (if None, uses the basename of local_path)

        Returns:
            Dictionary with upload details, a signed download URL for the
            uploaded object and a signed upload URL for ``output-<remote_path>``.
            On failure only ``success`` (False) and ``message`` are present.
        """
        # Only regular files can be uploaded; a directory is rejected here too.
        if not os.path.isfile(local_path):
            logger.error(f"Local file not found: {local_path}")
            return {
                "success": False,
                "message": f"Local file not found: {local_path}"
            }

        # Determine remote path
        if not remote_path:
            remote_path = os.path.basename(local_path)

        blob = self.bucket.blob(remote_path)
        try:
            logger.info(f"Uploading {local_path} to gs://{self.bucket_name}/{remote_path}")
            blob.upload_from_filename(local_path)

            # Generate signed URLs
            download_url = self.get_signed_url(remote_path, action="read")
            upload_url = self.get_signed_url(f"output-{remote_path}", action="write")
            return {
                "success": True,
                "message": f"File uploaded successfully: {local_path} -> gs://{self.bucket_name}/{remote_path}",
                "bucket": self.bucket_name,
                "remote_path": remote_path,
                "size": os.path.getsize(local_path),
                "content_type": blob.content_type,
                "download_url": download_url,
                "upload_url": upload_url,
                "created": blob.time_created,
            }
        except Exception as e:
            logger.error(f"Error uploading file to GCS: {str(e)}")
            return {
                "success": False,
                "message": f"Error uploading file to GCS: {str(e)}"
            }

    def get_signed_url(self, blob_name: str, action: str = "read",
                       expiration: int = 3600, content_type: Optional[str] = None) -> str:
        """
        Generate a V4 signed URL for a blob.

        Args:
            blob_name: Name of the blob (file path within bucket)
            action: 'read' for download (GET), 'write' for upload (PUT)
            expiration: URL expiration time in seconds (default: 1 hour)
            content_type: Content type for uploads (if known); ``.psd`` names
                default to ``image/vnd.adobe.photoshop``

        Returns:
            Signed URL string

        Raises:
            ValueError: If ``action`` is not 'read' or 'write'.
            Exception: Whatever URL signing raises (logged, then re-raised).
        """
        # Validate before touching the bucket so bad input fails fast.
        method = self._http_method(action)

        # Content type only matters for uploads; resolve the PSD default first
        # so the blob and the signature agree on it.
        if method == "PUT":
            content_type = self._resolve_content_type(blob_name, content_type)

        blob = self.bucket.blob(blob_name)
        if method == "PUT" and content_type:
            blob.content_type = content_type

        try:
            logger.info(f"Generating {action} signed URL for gs://{self.bucket_name}/{blob_name}")
            url = blob.generate_signed_url(
                version="v4",
                expiration=timedelta(seconds=expiration),
                method=method,
                content_type=content_type if method == "PUT" else None,
            )
            logger.info(f"Generated signed URL with expiration of {expiration} seconds")
            return url
        except Exception as e:
            logger.error(f"Error generating signed URL: {str(e)}")
            raise

    def download_file(self, remote_path: str, local_path: str, cleanup: bool = False) -> Dict[str, Any]:
        """
        Download a file from Google Cloud Storage.

        Args:
            remote_path: Path in GCS bucket
            local_path: Path to save the file locally
            cleanup: Whether to delete the remote file after successful download

        Returns:
            Dictionary with download details.  ``cleaned_up`` is True only when
            the remote object was actually deleted.  On failure only
            ``success`` (False) and ``message`` are present.
        """
        try:
            # get_blob fetches the object's metadata and yields None when the
            # object does not exist, so the reported details are populated.
            blob = self.bucket.get_blob(remote_path)
            if blob is None:
                logger.error(f"Remote file not found: gs://{self.bucket_name}/{remote_path}")
                return {
                    "success": False,
                    "message": f"Remote file not found: gs://{self.bucket_name}/{remote_path}"
                }

            # Create local directory if needed
            os.makedirs(os.path.dirname(os.path.abspath(local_path)), exist_ok=True)

            logger.info(f"Downloading gs://{self.bucket_name}/{remote_path} to {local_path}")
            blob.download_to_filename(local_path)

            # Cleanup is best effort: a failed delete does not fail the download.
            cleaned_up = False
            if cleanup:
                try:
                    logger.info(f"Deleting remote file after successful download: gs://{self.bucket_name}/{remote_path}")
                    blob.delete()
                    cleaned_up = True
                except Exception as del_err:
                    logger.warning(f"Error deleting remote file: {str(del_err)}")

            return {
                "success": True,
                "message": f"File downloaded successfully: gs://{self.bucket_name}/{remote_path} -> {local_path}",
                "bucket": self.bucket_name,
                "remote_path": remote_path,
                "local_path": local_path,
                "size": os.path.getsize(local_path),
                "content_type": blob.content_type,
                "created": blob.time_created,
                "cleaned_up": cleaned_up
            }
        except Exception as e:
            logger.error(f"Error downloading file from GCS: {str(e)}")
            return {
                "success": False,
                "message": f"Error downloading file from GCS: {str(e)}"
            }

    def cleanup_files(self, prefix: str) -> Dict[str, Any]:
        """
        Delete all files with a given prefix from the bucket.

        Args:
            prefix: Prefix of files to delete; must be non-empty

        Returns:
            Dictionary with cleanup results (``deleted_count`` and
            ``total_count``).  Individual delete failures are logged and
            skipped rather than aborting the run.
        """
        # An empty prefix matches every object; refuse rather than wipe the bucket.
        if not prefix:
            logger.error("Refusing to clean up files: empty prefix would match every file in the bucket")
            return {
                "success": False,
                "message": "Error cleaning up files: empty prefix would match every file in the bucket",
                "deleted_count": 0,
                "total_count": 0
            }

        try:
            deleted_count = 0
            # List all blobs with the prefix
            blobs = list(self.bucket.list_blobs(prefix=prefix))
            logger.info(f"Found {len(blobs)} files with prefix '{prefix}' to clean up")

            for blob in blobs:
                try:
                    blob.delete()
                    deleted_count += 1
                except Exception as e:
                    logger.warning(f"Error deleting blob {blob.name}: {str(e)}")

            return {
                "success": True,
                "message": f"Successfully deleted {deleted_count} files with prefix '{prefix}'",
                "deleted_count": deleted_count,
                "total_count": len(blobs)
            }
        except Exception as e:
            logger.error(f"Error cleaning up files with prefix '{prefix}': {str(e)}")
            return {
                "success": False,
                "message": f"Error cleaning up files: {str(e)}"
            }

    def check_output_file(self, expected_output_path: str,
                          wait_time: int = 300, check_interval: int = 10) -> Dict[str, Any]:
        """
        Check if an output file exists and wait for it if needed.

        The bucket is polled at least once, then every ``check_interval``
        seconds, with a final check when ``wait_time`` has elapsed.

        Args:
            expected_output_path: Path in GCS bucket to check for
            wait_time: Maximum time to wait in seconds (default: 5 minutes)
            check_interval: Time between checks in seconds (default: 10 seconds)

        Returns:
            Dictionary with file details and a signed download URL if found,
            or ``success`` False with a timeout message if not.  Errors raised
            by the storage client are not caught here.
        """
        started = time.monotonic()
        deadline = started + wait_time
        logger.info(f"Waiting for output file: gs://{self.bucket_name}/{expected_output_path}")

        while True:
            # get_blob returns None while the object is missing and a blob with
            # loaded metadata (size, content type, creation time) once it exists.
            blob = self.bucket.get_blob(expected_output_path)
            if blob is not None:
                logger.info(f"Output file found after {int(time.monotonic() - started)} seconds")
                return {
                    "success": True,
                    "message": f"Output file found: gs://{self.bucket_name}/{expected_output_path}",
                    "bucket": self.bucket_name,
                    "remote_path": expected_output_path,
                    "size": blob.size,
                    "content_type": blob.content_type,
                    "created": blob.time_created,
                    "download_url": self.get_signed_url(expected_output_path)
                }

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never sleep past the deadline.
            pause = min(check_interval, remaining)
            logger.info(f"Output file not found yet, waiting {pause:g} seconds...")
            time.sleep(pause)

        logger.error(f"Output file not found after {wait_time} seconds")
        return {
            "success": False,
            "message": f"Timeout waiting for output file: gs://{self.bucket_name}/{expected_output_path}"
        }
def save_service_account_key(key_data: Dict[str, Any], key_path: str = DEFAULT_KEY_PATH) -> bool:
    """
    Save a service account key to a file readable and writable by the owner only.

    Args:
        key_data: Dictionary containing the service account key data
        key_path: Path to save the key file (default: DEFAULT_KEY_PATH)

    Returns:
        True if successful, False otherwise (the error is logged, not raised)
    """
    try:
        # Create the file with 0o600 from the start so the key is never
        # briefly readable by others under a permissive umask.
        fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            json.dump(key_data, f, indent=2)
        # The creation mode is ignored for a pre-existing file; tighten it too.
        os.chmod(key_path, 0o600)
        logger.info(f"Service account key saved to: {key_path}")
        return True
    except Exception as e:
        logger.error(f"Error saving service account key: {str(e)}")
        return False
# Simple CLI for testing
def build_arg_parser():
    """Build the command-line parser for the upload / url / download commands.

    ``--bucket`` and ``--key-path`` are accepted both before and after the
    sub-command, so the invocations shown in the epilog parse as written.

    Returns:
        The configured ``argparse.ArgumentParser``.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Google Cloud Storage utilities for Adobe Photoshop API',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Upload a file and generate signed URLs
  python gcs_storage.py upload /path/to/file.psd --bucket my-bucket
  # Generate signed URLs for existing file
  python gcs_storage.py url my-file.psd --bucket my-bucket
  # Download a file from GCS
  python gcs_storage.py download my-file.psd /path/to/save.psd --bucket my-bucket
"""
    )
    # Common arguments (top-level position).  --bucket is enforced in main()
    # rather than with required=True so it may also follow the sub-command.
    parser.add_argument('--bucket',
                        help='GCS bucket name to use')
    parser.add_argument('--key-path',
                        help='Path to service account key JSON file')

    # The same options for use after the sub-command.  SUPPRESS keeps an
    # omitted option from overwriting a value given before the sub-command.
    common = argparse.ArgumentParser(add_help=False)
    common.add_argument('--bucket', default=argparse.SUPPRESS,
                        help='GCS bucket name to use')
    common.add_argument('--key-path', default=argparse.SUPPRESS,
                        help='Path to service account key JSON file')

    subparsers = parser.add_subparsers(dest='command', help='Command to run')

    # Upload command
    upload_parser = subparsers.add_parser('upload', parents=[common],
                                          help='Upload a file to GCS')
    upload_parser.add_argument('local_path', help='Path to the local file to upload')
    upload_parser.add_argument('--remote-path',
                               help='Path in GCS bucket (defaults to filename)')

    # URL command
    url_parser = subparsers.add_parser('url', parents=[common],
                                       help='Generate signed URLs for a file')
    url_parser.add_argument('blob_name', help='Name of the blob in GCS')
    url_parser.add_argument('--action', choices=['read', 'write'], default='read',
                            help='URL action type (read or write)')
    url_parser.add_argument('--expiration', type=int, default=3600,
                            help='URL expiration time in seconds')
    url_parser.add_argument('--content-type',
                            help='Content type for uploads')

    # Download command
    download_parser = subparsers.add_parser('download', parents=[common],
                                            help='Download a file from GCS')
    download_parser.add_argument('remote_path', help='Path in GCS bucket')
    download_parser.add_argument('local_path', help='Path to save the file locally')
    return parser


def main(argv: Optional[List[str]] = None) -> int:
    """Run the command-line interface.

    Args:
        argv: Argument list to parse; None makes argparse read ``sys.argv``.

    Returns:
        Process exit code: 0 on success, 1 on failure.  A missing ``--bucket``
        exits through ``parser.error`` like any other argparse usage error.
    """
    parser = build_arg_parser()
    args = parser.parse_args(argv)

    if not args.bucket:
        parser.error("the following arguments are required: --bucket")
    if not args.command:
        parser.print_help()
        return 1

    # Initialize storage.  Named ``gcs`` so the imported ``storage`` module
    # that GCSStorage relies on is not shadowed.
    try:
        gcs = GCSStorage(args.bucket, key_path=args.key_path)
    except Exception as e:
        print(f"\nError initializing GCS storage: {str(e)}")
        return 1

    # Execute command
    if args.command == 'upload':
        result = gcs.upload_file(args.local_path, args.remote_path)
        if not result['success']:
            print(f"\nError uploading file: {result['message']}")
            return 1
        print(f"\nFile uploaded successfully:")
        print(f" Local file: {args.local_path}")
        print(f" Bucket: {result['bucket']}")
        print(f" Remote path: {result['remote_path']}")
        print(f" Size: {result['size']} bytes")
        print(f"\nDownload URL (expires in 1 hour):")
        print(f" {result['download_url']}")
        print(f"\nUpload URL for output (expires in 1 hour):")
        print(f" {result['upload_url']}")
    elif args.command == 'url':
        try:
            url = gcs.get_signed_url(
                args.blob_name,
                action=args.action,
                expiration=args.expiration,
                content_type=args.content_type
            )
        except Exception as e:
            print(f"\nError generating signed URL: {str(e)}")
            return 1
        print(f"\nSigned URL generated successfully:")
        print(f" Bucket: {args.bucket}")
        print(f" Blob: {args.blob_name}")
        print(f" Action: {args.action}")
        print(f" Expires in: {args.expiration} seconds")
        print(f"\nURL:")
        print(f" {url}")
    elif args.command == 'download':
        result = gcs.download_file(args.remote_path, args.local_path)
        if not result['success']:
            print(f"\nError downloading file: {result['message']}")
            return 1
        print(f"\nFile downloaded successfully:")
        print(f" Bucket: {result['bucket']}")
        print(f" Remote path: {result['remote_path']}")
        print(f" Local file: {result['local_path']}")
        print(f" Size: {result['size']} bytes")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())