#!/usr/bin/env python3 """ Google Cloud Storage Integration for Adobe Photoshop API ------------------------------------------------------- This module provides functionality to: 1. Upload files to Google Cloud Storage 2. Generate signed URLs for Adobe Photoshop API 3. Download processed files from GCS Required for Adobe Photoshop API which expects external storage with signed URLs. """ import os import json import time import logging from typing import Dict, List, Any, Optional, Tuple from datetime import datetime, timedelta from pathlib import Path # Import Google Cloud Storage libraries try: from google.cloud import storage from google.oauth2 import service_account except ImportError: raise ImportError( "Google Cloud Storage libraries not installed. " "Please install with: pip install google-cloud-storage" ) # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # Default service account key path - will be used if key_path is not provided DEFAULT_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json") class GCSStorage: """Google Cloud Storage integration for Adobe Photoshop API""" def __init__(self, bucket_name: str, key_path: Optional[str] = None, key_json: Optional[Dict] = None): """ Initialize the Google Cloud Storage client Args: bucket_name: Name of the GCS bucket to use key_path: Path to service account key JSON file (optional) key_json: Service account key as a dictionary (optional) Either key_path or key_json must be provided, or DEFAULT_KEY_PATH will be used. """ self.bucket_name = bucket_name # Initialize credentials if key_json: # Use provided key JSON directly self.credentials = service_account.Credentials.from_service_account_info(key_json) self._save_key_file(key_json) # Save for future use logger.info(f"Using provided service account key JSON for GCS") elif key_path and os.path.exists(key_path): # Use key file from provided path self.credentials = service_account.Credentials.from_service_account_file(key_path) logger.info(f"Using service account key from: {key_path}") elif os.path.exists(DEFAULT_KEY_PATH): # Use default key file self.credentials = service_account.Credentials.from_service_account_file(DEFAULT_KEY_PATH) logger.info(f"Using default service account key from: {DEFAULT_KEY_PATH}") else: raise ValueError( "No valid service account credentials provided. Please provide either " "key_path to a JSON file or key_json as a dictionary." ) # Initialize the client self.client = storage.Client(credentials=self.credentials) # Get the bucket try: self.bucket = self.client.get_bucket(bucket_name) logger.info(f"Successfully connected to GCS bucket: {bucket_name}") except Exception as e: logger.error(f"Error connecting to GCS bucket {bucket_name}: {str(e)}") raise def _save_key_file(self, key_json: Dict) -> None: """Save the key JSON to the default path for future use""" try: with open(DEFAULT_KEY_PATH, 'w') as f: json.dump(key_json, f) # Secure the file - only owner can read and write os.chmod(DEFAULT_KEY_PATH, 0o600) logger.info(f"Saved service account key to: {DEFAULT_KEY_PATH}") except Exception as e: logger.warning(f"Could not save service account key to file: {str(e)}") def upload_file(self, local_path: str, remote_path: Optional[str] = None) -> Dict[str, Any]: """ Upload a file to Google Cloud Storage Args: local_path: Path to the local file to upload remote_path: Path in GCS bucket (if None, uses the basename of local_path) Returns: Dictionary with upload details and signed URLs """ # Check if file exists if not os.path.exists(local_path): logger.error(f"Local file not found: {local_path}") return { "success": False, "message": f"Local file not found: {local_path}" } # Determine remote path if not remote_path: remote_path = os.path.basename(local_path) # Create a blob blob = self.bucket.blob(remote_path) try: # Upload the file logger.info(f"Uploading {local_path} to gs://{self.bucket_name}/{remote_path}") blob.upload_from_filename(local_path) # Generate signed URLs download_url = self.get_signed_url(remote_path, action="read") upload_url = self.get_signed_url(f"output-{remote_path}", action="write") return { "success": True, "message": f"File uploaded successfully: {local_path} -> gs://{self.bucket_name}/{remote_path}", "bucket": self.bucket_name, "remote_path": remote_path, "size": os.path.getsize(local_path), "content_type": blob.content_type, "download_url": download_url, "upload_url": upload_url, "created": blob.time_created, } except Exception as e: logger.error(f"Error uploading file to GCS: {str(e)}") return { "success": False, "message": f"Error uploading file to GCS: {str(e)}" } def get_signed_url(self, blob_name: str, action: str = "read", expiration: int = 3600, content_type: str = None) -> str: """ Generate a signed URL for a blob Args: blob_name: Name of the blob (file path within bucket) action: 'read' for download, 'write' for upload expiration: URL expiration time in seconds (default: 1 hour) content_type: Content type for uploads (if known) Returns: Signed URL string """ blob = self.bucket.blob(blob_name) # Determine method based on action if action.lower() == "read": method = "GET" elif action.lower() == "write": method = "PUT" else: raise ValueError(f"Invalid action '{action}'. Must be 'read' or 'write'.") # Set content type for uploads if action.lower() == "write" and content_type: blob.content_type = content_type # Default content-type for PSD files if not specified if not content_type and blob_name.lower().endswith('.psd'): content_type = "image/vnd.adobe.photoshop" # Generate the signed URL try: logger.info(f"Generating {action} signed URL for gs://{self.bucket_name}/{blob_name}") url = blob.generate_signed_url( version="v4", expiration=timedelta(seconds=expiration), method=method, content_type=content_type if method == "PUT" else None, ) logger.info(f"Generated signed URL with expiration of {expiration} seconds") return url except Exception as e: logger.error(f"Error generating signed URL: {str(e)}") raise def download_file(self, remote_path: str, local_path: str, cleanup: bool = False) -> Dict[str, Any]: """ Download a file from Google Cloud Storage Args: remote_path: Path in GCS bucket local_path: Path to save the file locally cleanup: Whether to delete the remote file after successful download Returns: Dictionary with download details """ # Create a blob blob = self.bucket.blob(remote_path) try: # Check if blob exists if not blob.exists(): logger.error(f"Remote file not found: gs://{self.bucket_name}/{remote_path}") return { "success": False, "message": f"Remote file not found: gs://{self.bucket_name}/{remote_path}" } # Create local directory if needed os.makedirs(os.path.dirname(os.path.abspath(local_path)), exist_ok=True) # Download the file logger.info(f"Downloading gs://{self.bucket_name}/{remote_path} to {local_path}") blob.download_to_filename(local_path) # Cleanup if requested if cleanup: try: logger.info(f"Deleting remote file after successful download: gs://{self.bucket_name}/{remote_path}") blob.delete() except Exception as del_err: logger.warning(f"Error deleting remote file: {str(del_err)}") return { "success": True, "message": f"File downloaded successfully: gs://{self.bucket_name}/{remote_path} -> {local_path}", "bucket": self.bucket_name, "remote_path": remote_path, "local_path": local_path, "size": os.path.getsize(local_path), "content_type": blob.content_type, "created": blob.time_created, "cleaned_up": cleanup } except Exception as e: logger.error(f"Error downloading file from GCS: {str(e)}") return { "success": False, "message": f"Error downloading file from GCS: {str(e)}" } def cleanup_files(self, prefix: str) -> Dict[str, Any]: """ Delete all files with a given prefix from the bucket Args: prefix: Prefix of files to delete Returns: Dictionary with cleanup results """ try: deleted_count = 0 # List all blobs with the prefix blobs = list(self.bucket.list_blobs(prefix=prefix)) logger.info(f"Found {len(blobs)} files with prefix '{prefix}' to clean up") # Delete each blob for blob in blobs: try: blob.delete() deleted_count += 1 except Exception as e: logger.warning(f"Error deleting blob {blob.name}: {str(e)}") return { "success": True, "message": f"Successfully deleted {deleted_count} files with prefix '{prefix}'", "deleted_count": deleted_count, "total_count": len(blobs) } except Exception as e: logger.error(f"Error cleaning up files with prefix '{prefix}': {str(e)}") return { "success": False, "message": f"Error cleaning up files: {str(e)}" } def check_output_file(self, expected_output_path: str, wait_time: int = 300, check_interval: int = 10) -> Dict[str, Any]: """ Check if an output file exists and wait for it if needed Args: expected_output_path: Path in GCS bucket to check for wait_time: Maximum time to wait in seconds (default: 5 minutes) check_interval: Time between checks in seconds (default: 10 seconds) Returns: Dictionary with file details if found, error if not """ blob = self.bucket.blob(expected_output_path) start_time = time.time() end_time = start_time + wait_time logger.info(f"Waiting for output file: gs://{self.bucket_name}/{expected_output_path}") while time.time() < end_time: if blob.exists(): logger.info(f"Output file found after {int(time.time() - start_time)} seconds") return { "success": True, "message": f"Output file found: gs://{self.bucket_name}/{expected_output_path}", "bucket": self.bucket_name, "remote_path": expected_output_path, "size": blob.size, "content_type": blob.content_type, "created": blob.time_created, "download_url": self.get_signed_url(expected_output_path) } logger.info(f"Output file not found yet, waiting {check_interval} seconds...") time.sleep(check_interval) logger.error(f"Output file not found after {wait_time} seconds") return { "success": False, "message": f"Timeout waiting for output file: gs://{self.bucket_name}/{expected_output_path}" } def save_service_account_key(key_data: Dict[str, Any], key_path: str = DEFAULT_KEY_PATH) -> bool: """ Save a service account key to a file Args: key_data: Dictionary containing the service account key data key_path: Path to save the key file (default: DEFAULT_KEY_PATH) Returns: True if successful, False otherwise """ try: with open(key_path, 'w') as f: json.dump(key_data, f, indent=2) # Secure the file - only owner can read and write os.chmod(key_path, 0o600) logger.info(f"Service account key saved to: {key_path}") return True except Exception as e: logger.error(f"Error saving service account key: {str(e)}") return False # Simple CLI for testing if __name__ == "__main__": import argparse parser = argparse.ArgumentParser( description='Google Cloud Storage utilities for Adobe Photoshop API', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Upload a file and generate signed URLs python gcs_storage.py upload /path/to/file.psd --bucket my-bucket # Generate signed URLs for existing file python gcs_storage.py url my-file.psd --bucket my-bucket # Download a file from GCS python gcs_storage.py download my-file.psd /path/to/save.psd --bucket my-bucket """ ) # Common arguments parser.add_argument('--bucket', required=True, help='GCS bucket name to use') parser.add_argument('--key-path', help='Path to service account key JSON file') subparsers = parser.add_subparsers(dest='command', help='Command to run') # Upload command upload_parser = subparsers.add_parser('upload', help='Upload a file to GCS') upload_parser.add_argument('local_path', help='Path to the local file to upload') upload_parser.add_argument('--remote-path', help='Path in GCS bucket (defaults to filename)') # URL command url_parser = subparsers.add_parser('url', help='Generate signed URLs for a file') url_parser.add_argument('blob_name', help='Name of the blob in GCS') url_parser.add_argument('--action', choices=['read', 'write'], default='read', help='URL action type (read or write)') url_parser.add_argument('--expiration', type=int, default=3600, help='URL expiration time in seconds') url_parser.add_argument('--content-type', help='Content type for uploads') # Download command download_parser = subparsers.add_parser('download', help='Download a file from GCS') download_parser.add_argument('remote_path', help='Path in GCS bucket') download_parser.add_argument('local_path', help='Path to save the file locally') args = parser.parse_args() if not args.command: parser.print_help() exit(1) # Initialize storage storage = GCSStorage(args.bucket, key_path=args.key_path) # Execute command if args.command == 'upload': result = storage.upload_file(args.local_path, args.remote_path) if result['success']: print(f"\nFile uploaded successfully:") print(f" Local file: {args.local_path}") print(f" Bucket: {result['bucket']}") print(f" Remote path: {result['remote_path']}") print(f" Size: {result['size']} bytes") print(f"\nDownload URL (expires in 1 hour):") print(f" {result['download_url']}") print(f"\nUpload URL for output (expires in 1 hour):") print(f" {result['upload_url']}") else: print(f"\nError uploading file: {result['message']}") exit(1) elif args.command == 'url': try: url = storage.get_signed_url( args.blob_name, action=args.action, expiration=args.expiration, content_type=args.content_type ) print(f"\nSigned URL generated successfully:") print(f" Bucket: {args.bucket}") print(f" Blob: {args.blob_name}") print(f" Action: {args.action}") print(f" Expires in: {args.expiration} seconds") print(f"\nURL:") print(f" {url}") except Exception as e: print(f"\nError generating signed URL: {str(e)}") exit(1) elif args.command == 'download': result = storage.download_file(args.remote_path, args.local_path) if result['success']: print(f"\nFile downloaded successfully:") print(f" Bucket: {result['bucket']}") print(f" Remote path: {result['remote_path']}") print(f" Local file: {result['local_path']}") print(f" Size: {result['size']} bytes") else: print(f"\nError downloading file: {result['message']}") exit(1)