Local and cloud-based workflows for extracting and updating text layers in PSD files via ExtendScript and Adobe PS API. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
471 lines
No EOL
18 KiB
Python
471 lines
No EOL
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Google Cloud Storage Integration for Adobe Photoshop API
|
|
-------------------------------------------------------
|
|
|
|
This module provides functionality to:
|
|
1. Upload files to Google Cloud Storage
|
|
2. Generate signed URLs for Adobe Photoshop API
|
|
3. Download processed files from GCS
|
|
|
|
Required for Adobe Photoshop API which expects external storage with signed URLs.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
import logging
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
# Import Google Cloud Storage libraries
|
|
try:
|
|
from google.cloud import storage
|
|
from google.oauth2 import service_account
|
|
except ImportError:
|
|
raise ImportError(
|
|
"Google Cloud Storage libraries not installed. "
|
|
"Please install with: pip install google-cloud-storage"
|
|
)
|
|
|
|
# Logging setup: INFO-level, timestamped records configured on the root logger
# at import time; this module then logs through its own named logger.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
_LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)
logger = logging.getLogger(__name__)

# Fallback service account key location, used when no key_path is supplied:
# a file named gcs_key.json sitting in the same directory as this module.
_MODULE_DIR = os.path.dirname(__file__)
DEFAULT_KEY_PATH = os.path.join(_MODULE_DIR, "gcs_key.json")
|
|
|
|
class GCSStorage:
    """Google Cloud Storage integration for Adobe Photoshop API

    Wraps a single bucket and offers upload, download, signed-URL generation,
    prefix cleanup and polling for an expected output object.

    upload_file, download_file, cleanup_files and check_output_file report
    their outcome as a dictionary carrying a boolean "success" and a
    human-readable "message". The constructor and get_signed_url raise instead.
    """

    def __init__(self, bucket_name: str, key_path: Optional[str] = None, key_json: Optional[Dict] = None):
        """
        Initialize the Google Cloud Storage client

        Credentials are resolved in this order: key_json, then key_path (if
        the file exists), then DEFAULT_KEY_PATH (if the file exists).

        Args:
            bucket_name: Name of the GCS bucket to use
            key_path: Path to service account key JSON file (optional)
            key_json: Service account key as a dictionary (optional); when
                given it is also written to DEFAULT_KEY_PATH for later runs

        Raises:
            ValueError: If none of the three credential sources is usable
            Exception: Whatever the client raises while opening the bucket
                (logged, then re-raised unchanged)
        """
        self.bucket_name = bucket_name

        # Initialize credentials
        if key_json:
            # Use provided key JSON directly
            self.credentials = service_account.Credentials.from_service_account_info(key_json)
            self._save_key_file(key_json)  # Save for future use
            logger.info("Using provided service account key JSON for GCS")
        elif key_path and os.path.exists(key_path):
            # Use key file from provided path
            self.credentials = service_account.Credentials.from_service_account_file(key_path)
            logger.info(f"Using service account key from: {key_path}")
        elif os.path.exists(DEFAULT_KEY_PATH):
            if key_path:
                # The caller asked for a specific key; make the fallback visible
                # instead of silently authenticating with a different file.
                logger.warning(
                    f"Service account key not found at {key_path}; "
                    f"falling back to default key"
                )
            # Use default key file
            self.credentials = service_account.Credentials.from_service_account_file(DEFAULT_KEY_PATH)
            logger.info(f"Using default service account key from: {DEFAULT_KEY_PATH}")
        else:
            raise ValueError(
                "No valid service account credentials provided. Please provide either "
                "key_path to a JSON file or key_json as a dictionary."
            )

        # Initialize the client
        self.client = storage.Client(credentials=self.credentials)

        # Get the bucket
        try:
            self.bucket = self.client.get_bucket(bucket_name)
            logger.info(f"Successfully connected to GCS bucket: {bucket_name}")
        except Exception as e:
            logger.error(f"Error connecting to GCS bucket {bucket_name}: {str(e)}")
            raise

    def _save_key_file(self, key_json: Dict) -> None:
        """Save the key JSON to the default path for future use

        Best effort: any failure is logged as a warning and swallowed so that
        an unwritable module directory does not prevent authentication.
        """
        try:
            # Create the file with owner-only permissions from the start so
            # the secret is never readable by others, not even briefly.
            fd = os.open(DEFAULT_KEY_PATH, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w') as f:
                # A pre-existing file keeps its old mode; tighten it before
                # the key material is written.
                os.chmod(DEFAULT_KEY_PATH, 0o600)
                json.dump(key_json, f)
            logger.info(f"Saved service account key to: {DEFAULT_KEY_PATH}")
        except Exception as e:
            logger.warning(f"Could not save service account key to file: {str(e)}")

    def upload_file(self, local_path: str, remote_path: Optional[str] = None) -> Dict[str, Any]:
        """
        Upload a file to Google Cloud Storage

        Besides uploading, this prepares the two signed URLs a processing job
        needs: a read URL for the uploaded object and a write URL for an
        output object named "output-<remote_path>".

        Args:
            local_path: Path to the local file to upload
            remote_path: Path in GCS bucket (if None, uses the basename of local_path)

        Returns:
            Dictionary with upload details and signed URLs. On failure only
            "success" (False) and "message" are present.
        """
        # Check if file exists
        if not os.path.exists(local_path):
            logger.error(f"Local file not found: {local_path}")
            return {
                "success": False,
                "message": f"Local file not found: {local_path}"
            }

        # Determine remote path
        if not remote_path:
            remote_path = os.path.basename(local_path)

        # Object the write URL points at; returned so callers can poll for it.
        output_path = f"output-{remote_path}"

        # Create a blob
        blob = self.bucket.blob(remote_path)

        try:
            # Upload the file
            logger.info(f"Uploading {local_path} to gs://{self.bucket_name}/{remote_path}")
            blob.upload_from_filename(local_path)

            # Generate signed URLs
            download_url = self.get_signed_url(remote_path, action="read")
            upload_url = self.get_signed_url(output_path, action="write")

            return {
                "success": True,
                "message": f"File uploaded successfully: {local_path} -> gs://{self.bucket_name}/{remote_path}",
                "bucket": self.bucket_name,
                "remote_path": remote_path,
                "output_path": output_path,
                "size": os.path.getsize(local_path),
                "content_type": blob.content_type,
                "download_url": download_url,
                "upload_url": upload_url,
                "created": blob.time_created,
            }
        except Exception as e:
            logger.error(f"Error uploading file to GCS: {str(e)}")
            return {
                "success": False,
                "message": f"Error uploading file to GCS: {str(e)}"
            }

    def get_signed_url(self, blob_name: str, action: str = "read",
                       expiration: int = 3600, content_type: Optional[str] = None) -> str:
        """
        Generate a signed URL for a blob

        Args:
            blob_name: Name of the blob (file path within bucket)
            action: 'read' for download, 'write' for upload (case-insensitive)
            expiration: URL expiration time in seconds (default: 1 hour)
            content_type: Content type for uploads (if known). When omitted
                and blob_name ends in ".psd", "image/vnd.adobe.photoshop" is
                used. Only applied to write URLs.

        Returns:
            Signed URL string

        Raises:
            ValueError: If action is neither 'read' nor 'write'
            Exception: Whatever the signing call raises (logged, re-raised)
        """
        # Validate the request before touching the bucket.
        methods = {"read": "GET", "write": "PUT"}
        method = methods.get(action.lower())
        if method is None:
            raise ValueError(f"Invalid action '{action}'. Must be 'read' or 'write'.")

        blob = self.bucket.blob(blob_name)

        # Default content-type for PSD files if not specified
        if not content_type and blob_name.lower().endswith('.psd'):
            content_type = "image/vnd.adobe.photoshop"

        # Generate the signed URL
        try:
            logger.info(f"Generating {action} signed URL for gs://{self.bucket_name}/{blob_name}")

            url = blob.generate_signed_url(
                version="v4",
                expiration=timedelta(seconds=expiration),
                method=method,
                # The content type is only part of the signature for uploads.
                content_type=content_type if method == "PUT" else None,
            )

            logger.info(f"Generated signed URL with expiration of {expiration} seconds")
            return url
        except Exception as e:
            logger.error(f"Error generating signed URL: {str(e)}")
            raise

    def download_file(self, remote_path: str, local_path: str, cleanup: bool = False) -> Dict[str, Any]:
        """
        Download a file from Google Cloud Storage

        Args:
            remote_path: Path in GCS bucket
            local_path: Path to save the file locally
            cleanup: Whether to delete the remote file after successful download

        Returns:
            Dictionary with download details. "cleaned_up" is True only when
            the remote object was actually deleted. On failure only "success"
            (False) and "message" are present.
        """
        try:
            # get_blob checks existence and loads the object's metadata in one
            # request, so content_type/time_created below are populated.
            blob = self.bucket.get_blob(remote_path)
            if blob is None:
                logger.error(f"Remote file not found: gs://{self.bucket_name}/{remote_path}")
                return {
                    "success": False,
                    "message": f"Remote file not found: gs://{self.bucket_name}/{remote_path}"
                }

            # Create local directory if needed
            os.makedirs(os.path.dirname(os.path.abspath(local_path)), exist_ok=True)

            # Download the file
            logger.info(f"Downloading gs://{self.bucket_name}/{remote_path} to {local_path}")
            blob.download_to_filename(local_path)

            # Cleanup if requested; a failed delete does not fail the download
            cleaned_up = False
            if cleanup:
                try:
                    logger.info(f"Deleting remote file after successful download: gs://{self.bucket_name}/{remote_path}")
                    blob.delete()
                    cleaned_up = True
                except Exception as del_err:
                    logger.warning(f"Error deleting remote file: {str(del_err)}")

            return {
                "success": True,
                "message": f"File downloaded successfully: gs://{self.bucket_name}/{remote_path} -> {local_path}",
                "bucket": self.bucket_name,
                "remote_path": remote_path,
                "local_path": local_path,
                "size": os.path.getsize(local_path),
                "content_type": blob.content_type,
                "created": blob.time_created,
                "cleaned_up": cleaned_up
            }
        except Exception as e:
            logger.error(f"Error downloading file from GCS: {str(e)}")
            return {
                "success": False,
                "message": f"Error downloading file from GCS: {str(e)}"
            }

    def cleanup_files(self, prefix: str) -> Dict[str, Any]:
        """
        Delete all files with a given prefix from the bucket

        Args:
            prefix: Prefix of files to delete. Must be non-empty: an empty
                prefix would match every object in the bucket, so it is
                rejected rather than treated as "delete everything".

        Returns:
            Dictionary with cleanup results ("deleted_count" and
            "total_count" on success). Individual delete failures are logged
            and skipped; they lower deleted_count but do not fail the call.
        """
        if not prefix:
            logger.error("Refusing to clean up with an empty prefix (would delete the whole bucket)")
            return {
                "success": False,
                "message": "Error cleaning up files: prefix must not be empty"
            }

        try:
            deleted_count = 0
            # List all blobs with the prefix
            blobs = list(self.bucket.list_blobs(prefix=prefix))

            logger.info(f"Found {len(blobs)} files with prefix '{prefix}' to clean up")

            # Delete each blob
            for blob in blobs:
                try:
                    blob.delete()
                    deleted_count += 1
                except Exception as e:
                    logger.warning(f"Error deleting blob {blob.name}: {str(e)}")

            return {
                "success": True,
                "message": f"Successfully deleted {deleted_count} files with prefix '{prefix}'",
                "deleted_count": deleted_count,
                "total_count": len(blobs)
            }
        except Exception as e:
            logger.error(f"Error cleaning up files with prefix '{prefix}': {str(e)}")
            return {
                "success": False,
                "message": f"Error cleaning up files: {str(e)}"
            }

    def check_output_file(self, expected_output_path: str,
                          wait_time: int = 300, check_interval: int = 10) -> Dict[str, Any]:
        """
        Check if an output file exists and wait for it if needed

        The object is looked up at least once, even when wait_time is 0, and
        the final sleep is shortened so the call never outlasts wait_time by
        a whole check_interval.

        Args:
            expected_output_path: Path in GCS bucket to check for
            wait_time: Maximum time to wait in seconds (default: 5 minutes)
            check_interval: Time between checks in seconds (default: 10 seconds)

        Returns:
            Dictionary with file details if found, error if not
        """
        started = time.monotonic()
        deadline = started + wait_time

        logger.info(f"Waiting for output file: gs://{self.bucket_name}/{expected_output_path}")

        while True:
            # get_blob returns None while the object is missing and a blob
            # with loaded metadata (size, content type, ...) once it exists.
            blob = self.bucket.get_blob(expected_output_path)
            if blob is not None:
                logger.info(f"Output file found after {int(time.monotonic() - started)} seconds")
                return {
                    "success": True,
                    "message": f"Output file found: gs://{self.bucket_name}/{expected_output_path}",
                    "bucket": self.bucket_name,
                    "remote_path": expected_output_path,
                    "size": blob.size,
                    "content_type": blob.content_type,
                    "created": blob.time_created,
                    "download_url": self.get_signed_url(expected_output_path)
                }

            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break

            logger.info(f"Output file not found yet, waiting {check_interval} seconds...")
            # Never sleep past the deadline (or for a negative duration).
            time.sleep(max(0, min(check_interval, remaining)))

        logger.error(f"Output file not found after {wait_time} seconds")
        return {
            "success": False,
            "message": f"Timeout waiting for output file: gs://{self.bucket_name}/{expected_output_path}"
        }
|
|
|
|
def save_service_account_key(key_data: Dict[str, Any], key_path: str = DEFAULT_KEY_PATH) -> bool:
    """
    Save a service account key to a file

    The file is created with owner-only (0o600) permissions from the start,
    so the key is never readable by other users, not even for the moment
    between creating the file and restricting it.

    Args:
        key_data: Dictionary containing the service account key data
        key_path: Path to save the key file (default: DEFAULT_KEY_PATH)

    Returns:
        True if successful, False otherwise (the error is logged, not raised)
    """
    try:
        fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            # The mode passed to os.open only applies to newly created files;
            # tighten a pre-existing file before the secret is written.
            os.chmod(key_path, 0o600)
            json.dump(key_data, f, indent=2)

        logger.info(f"Service account key saved to: {key_path}")
        return True
    except Exception as e:
        logger.error(f"Error saving service account key: {str(e)}")
        return False
|
|
|
|
# Simple CLI for testing
def _parse_cli_args(argv=None):
    """Parse the command line for the test CLI.

    --bucket and --key-path are accepted both before and after the
    sub-command, so the invocations shown in the help epilog work as written.

    Args:
        argv: Argument list without the program name (None means sys.argv[1:])

    Returns:
        The parsed namespace, or None when no sub-command was given (the
        help text has been printed in that case).

    Raises:
        SystemExit: Raised by argparse on invalid input or a missing --bucket
    """
    import argparse

    parser = argparse.ArgumentParser(
        description='Google Cloud Storage utilities for Adobe Photoshop API',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Upload a file and generate signed URLs
  python gcs_storage.py upload /path/to/file.psd --bucket my-bucket

  # Generate signed URLs for existing file
  python gcs_storage.py url my-file.psd --bucket my-bucket

  # Download a file from GCS
  python gcs_storage.py download my-file.psd /path/to/save.psd --bucket my-bucket
"""
    )

    def add_common_arguments(target, **overrides):
        # Common arguments, registered on the main parser and on every
        # sub-command parser.
        target.add_argument('--bucket', help='GCS bucket name to use', **overrides)
        target.add_argument('--key-path', help='Path to service account key JSON file', **overrides)

    add_common_arguments(parser)

    subparsers = parser.add_subparsers(dest='command', help='Command to run')

    # Upload command
    upload_parser = subparsers.add_parser('upload', help='Upload a file to GCS')
    upload_parser.add_argument('local_path', help='Path to the local file to upload')
    upload_parser.add_argument('--remote-path',
                               help='Path in GCS bucket (defaults to filename)')

    # URL command
    url_parser = subparsers.add_parser('url', help='Generate signed URLs for a file')
    url_parser.add_argument('blob_name', help='Name of the blob in GCS')
    url_parser.add_argument('--action', choices=['read', 'write'], default='read',
                            help='URL action type (read or write)')
    url_parser.add_argument('--expiration', type=int, default=3600,
                            help='URL expiration time in seconds')
    url_parser.add_argument('--content-type',
                            help='Content type for uploads')

    # Download command
    download_parser = subparsers.add_parser('download', help='Download a file from GCS')
    download_parser.add_argument('remote_path', help='Path in GCS bucket')
    download_parser.add_argument('local_path', help='Path to save the file locally')

    for sub in (upload_parser, url_parser, download_parser):
        # SUPPRESS keeps an unset sub-command option out of the sub-namespace,
        # so it cannot overwrite a value given before the sub-command.
        add_common_arguments(sub, default=argparse.SUPPRESS)

    args = parser.parse_args(argv)

    if not args.command:
        parser.print_help()
        return None

    # Enforced here rather than with required=True because the option may
    # legitimately appear on either side of the sub-command.
    if not args.bucket:
        parser.error('the following arguments are required: --bucket')

    return args


def main(argv=None) -> int:
    """Run the test CLI and return the process exit code (0 ok, 1 failure)."""
    args = _parse_cli_args(argv)
    if args is None:
        return 1

    # Initialize storage. The local name deliberately differs from the
    # imported google.cloud `storage` module so that module is not rebound.
    try:
        gcs = GCSStorage(args.bucket, key_path=args.key_path)
    except Exception as e:
        print(f"\nError initializing GCS storage: {str(e)}")
        return 1

    # Execute command
    if args.command == 'upload':
        result = gcs.upload_file(args.local_path, args.remote_path)

        if not result['success']:
            print(f"\nError uploading file: {result['message']}")
            return 1

        print(f"\nFile uploaded successfully:")
        print(f" Local file: {args.local_path}")
        print(f" Bucket: {result['bucket']}")
        print(f" Remote path: {result['remote_path']}")
        print(f" Size: {result['size']} bytes")
        print(f"\nDownload URL (expires in 1 hour):")
        print(f" {result['download_url']}")
        print(f"\nUpload URL for output (expires in 1 hour):")
        print(f" {result['upload_url']}")

    elif args.command == 'url':
        try:
            url = gcs.get_signed_url(
                args.blob_name,
                action=args.action,
                expiration=args.expiration,
                content_type=args.content_type
            )
        except Exception as e:
            print(f"\nError generating signed URL: {str(e)}")
            return 1

        print(f"\nSigned URL generated successfully:")
        print(f" Bucket: {args.bucket}")
        print(f" Blob: {args.blob_name}")
        print(f" Action: {args.action}")
        print(f" Expires in: {args.expiration} seconds")
        print(f"\nURL:")
        print(f" {url}")

    elif args.command == 'download':
        result = gcs.download_file(args.remote_path, args.local_path)

        if not result['success']:
            print(f"\nError downloading file: {result['message']}")
            return 1

        print(f"\nFile downloaded successfully:")
        print(f" Bucket: {result['bucket']}")
        print(f" Remote path: {result['remote_path']}")
        print(f" Local file: {result['local_path']}")
        print(f" Size: {result['size']} bytes")

    return 0


if __name__ == "__main__":
    import sys

    sys.exit(main())