#!/usr/bin/env python3 """ Adobe Photoshop API Integration Script -------------------------------------- This script provides functionality to interact with the Adobe Photoshop API to update text layers in PSD files from JSON data. It uses the credentials provided to authenticate with the Adobe API and perform operations on Photoshop documents using external storage (Google Cloud Storage). The workflow is: 1. Upload PSD file to Google Cloud Storage (GCS) 2. Generate signed URLs for input and output 3. Send API request to Adobe Photoshop API with these URLs 4. Download processed file from GCS Requirements: - Python 3.6+ - requests library (pip install requests) - google-cloud-storage library (pip install google-cloud-storage) """ import os import sys import json import time import logging import argparse import requests from pathlib import Path from typing import Dict, List, Any, Optional, Tuple # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # Import local config, token manager, and GCS storage import config from adobe_token import AdobeTokenManager from gcs_storage import GCSStorage # Initialize token manager token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET) # GCS bucket configuration GCS_BUCKET_NAME = "lor-txt-tmp-bkt-26" # The bucket to use for temporary storage GCS_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json") # Initialize GCS storage if the key exists gcs_storage = None if os.path.exists(GCS_KEY_PATH): try: gcs_storage = GCSStorage(GCS_BUCKET_NAME, key_path=GCS_KEY_PATH) logger.info(f"GCS storage initialized with bucket: {GCS_BUCKET_NAME}") except Exception as e: logger.error(f"Error initializing GCS storage: {str(e)}") logger.warning("Continuing without GCS storage - some features may not work") else: logger.warning(f"GCS key file not found at {GCS_KEY_PATH} - some features may not work") # These will be populated dynamically when needed API_KEY = config.ADOBE_CLIENT_ID ACCESS_TOKEN = None # Will be fetched using token_manager when needed class AdobeAPI: """Diagnostic class for Adobe API connectivity""" def __init__(self, api_key: str = None, access_token: str = None, client_secret: str = None, force_new_token: bool = True): """Initialize the Adobe API client""" self.api_key = api_key or API_KEY # If client_secret is provided, we'll use it to get a token self.client_secret = client_secret or config.ADOBE_CLIENT_SECRET global ACCESS_TOKEN # Declare global variable # Always generate a fresh token for API requests if self.client_secret and force_new_token: try: # Force token generation even if a cached one exists logger.info("Forcing generation of a fresh Adobe API token") token_manager.token_cache = {} # Clear the cache to force new token ACCESS_TOKEN, token_data = token_manager.get_token(config.DEFAULT_SCOPES) logger.info(f"Successfully generated fresh token for Adobe API: {ACCESS_TOKEN[:15]}...{ACCESS_TOKEN[-15:]}") self.access_token = ACCESS_TOKEN except Exception as e: logger.error(f"Failed to obtain fresh access token: {str(e)}") # Fall back to the provided static token if available self.access_token = access_token or ACCESS_TOKEN # Otherwise use provided token or get from cache if available elif not access_token and self.client_secret: try: ACCESS_TOKEN, _ = token_manager.get_token(config.DEFAULT_SCOPES) self.access_token = ACCESS_TOKEN except Exception as e: logger.error(f"Failed to obtain access token: {str(e)}") # Fall back to the provided static token if available self.access_token = access_token or ACCESS_TOKEN else: self.access_token = access_token or ACCESS_TOKEN # Set up headers with token self.headers = { "x-api-key": self.api_key, "Authorization": f"Bearer {self.access_token}", "Content-Type": "application/json" } # Print token for debugging if self.access_token: logger.info(f"Using Adobe access token: {self.access_token[:15]}...{self.access_token[-15:]}") # Validate the credentials self._validate_credentials() def _validate_credentials(self) -> bool: """Validate the API credentials by making a test request""" try: # Check if access token is available if not self.access_token: logger.error("No access token available for validation") # Continue anyway for testing purposes logger.warning("Continuing without token for testing purposes") return True # Get user information as a simple validation request logger.info(f"Testing authentication with token: {self.access_token[:20]}...{self.access_token[-20:]}") try: response = requests.get( "https://ims-na1.adobelogin.com/ims/userinfo", headers={"Authorization": f"Bearer {self.access_token}"}, timeout=20 # Increase timeout for potentially slow connections ) logger.info(f"Authentication response status: {response.status_code}") if response.text: try: # Try to parse as JSON resp_json = response.json() logger.info(f"Authentication response: {json.dumps(resp_json, indent=2)}") except: # Not JSON, log as text logger.info(f"Authentication response: {response.text}") if response.status_code == 200: user_info = response.json() logger.info(f"Successfully authenticated with Adobe API as: {user_info.get('name', 'Unknown User')}") return True else: logger.warning(f"Authentication check returned: {response.status_code}") # Continue anyway for testing logger.warning("Continuing despite authentication response for testing") return True except Exception as req_err: logger.error(f"Error making authentication request: {str(req_err)}") # Continue anyway for testing purposes logger.warning("Continuing despite authentication request error for testing") return True except Exception as e: logger.error(f"Error in validation process: {str(e)}") # Continue anyway for testing purposes logger.warning("Continuing despite validation error for testing") return True def list_documents(self) -> List[Dict[str, Any]]: """List documents available in Creative Cloud Assets""" # This is a diagnostic tool to check which Adobe API endpoints are available from your network endpoints = [ "https://developer.adobe.com/apis", # Public Adobe developer site "https://ims-na1.adobelogin.com/ims/userinfo", # Auth API - known to work # Photoshop REST API endpoints (correct per Adobe documentation) "https://image.adobe.io/pie/psdService", # Base endpoint "https://image.adobe.io/pie/psdService/actionJSON", # Text editing via actionJSON "https://image.adobe.io/pie/psdService/productCrop", # Product crop endpoint # Other potential endpoints to test (may not all be accessible) "https://firefly-api.adobe.io/v2", # Firefly API base endpoint "https://firefly-api.adobe.io/v2/upload", # Firefly Upload endpoint # Incorrect endpoints that should be avoided based on Adobe's clarification "https://photoshop.adobe.io/api", # Not used for Photoshop REST API "https://photoshop-services.adobe.io/api", # Not used for Photoshop REST API "https://firefly-api.adobe.io/v2/photoshop/editText" # Not the correct endpoint structure ] logger.info("Testing connectivity to Adobe API endpoints...") # Test connectivity to adobe.com first try: test_response = requests.get("https://www.adobe.com", timeout=15) logger.info(f"Adobe.com connectivity test: {test_response.status_code}") except Exception as conn_err: logger.error(f"Network connectivity issue: Cannot connect to Adobe.com - {str(conn_err)}") logger.error("Please check your internet connection and DNS configuration.") # Continue testing other endpoints even if adobe.com fails logger.info("Continuing with other endpoint tests despite connectivity issue...") # Test each possible endpoint results = [] for endpoint in endpoints: try: logger.info(f"Testing endpoint: {endpoint}") response = requests.get( endpoint, headers=self.headers, timeout=10 ) status = response.status_code logger.info(f" Response status: {status}") if response.text and len(response.text) < 500: logger.info(f" Response content: {response.text}") elif response.text: logger.info(f" Response content length: {len(response.text)} bytes") results.append({ "endpoint": endpoint, "status": status, "accessible": status != 0 # Any response is better than nothing }) except requests.exceptions.ConnectionError as e: logger.info(f" Connection error: {str(e)}") results.append({ "endpoint": endpoint, "status": 0, "accessible": False, "error": "Connection error" }) except Exception as e: logger.info(f" Error: {str(e)}") results.append({ "endpoint": endpoint, "status": 0, "accessible": False, "error": str(e) }) # Summary logger.info("\nEndpoint Accessibility Summary:") for result in results: status = "✓ Available" if result["accessible"] else "✗ Not available" logger.info(f"{status}: {result['endpoint']} (Status: {result['status']})") return results def test_text_edit(self) -> Dict[str, Any]: """Test the Adobe Photoshop Text Edit API functionality This method attempts to use the Photoshop Text Edit API to modify text in a PSD file. It tries multiple endpoints based on Adobe's current API structure. Returns: Dict containing results of the test attempts """ logger.info("Testing Adobe Photoshop Text Edit API functionality...") # Test the correct Adobe Photoshop REST API endpoints edit_endpoints = [ "https://image.adobe.io/pie/psdService/actionJSON", # Primary endpoint for text editing (actionJSON) "https://image.adobe.io/pie/psdService/text" # Alternative endpoint (may be deprecated) ] # Test with different payload formats to see which one might work # Based on documentation, we need to use external storage with presigned URLs # Example test URLs (these are fake but follow the required pattern) test_s3_url = "https://my-test-bucket.s3.amazonaws.com/test-document.psd" test_output_url = "https://my-test-bucket.s3.amazonaws.com/output-test-document.psd" # Alternative storage test URLs test_azure_url = "https://mystorageaccount.blob.core.windows.net/mycontainer/test-document.psd?sig=signature" test_azure_output = "https://mystorageaccount.blob.core.windows.net/mycontainer/output-test-document.psd?sig=signature" payloads = [ # Format 1 - S3 style with external storage - CORRECTED PER DOCUMENTATION { "inputs": [ { "href": test_s3_url, "storage": "external" } ], "options": { "layers": [ { "id": 1, # Must be integer "text": { # Must be object "content": "Updated text from API test" } } ] }, "outputs": [ { "href": test_output_url, "storage": "external", "type": "image/vnd.adobe.photoshop" } ] }, # Format 2 - Azure style with external storage { "inputs": [ { "href": test_azure_url, "storage": "external" } ], "options": { "layers": [ { "id": 1, # Integer ID "text": { # Object with content "content": "Updated text from API test" } } ] }, "outputs": [ { "href": test_azure_output, "storage": "external", "type": "image/vnd.adobe.photoshop" } ] }, # Format 3 - Older format field name with external storage { "inputs": [ { "href": test_s3_url, "storage": "external" } ], "options": { "textLayers": [ # Testing with older field name { "id": 1, # Integer ID "text": { # Object with content "content": "Updated text from API test" } } ] }, "outputs": [ { "href": test_output_url, "storage": "external", "type": "image/vnd.adobe.photoshop" } ] }, # Format 4 - Testing with adobe storage (original approach) - likely to fail { "inputs": [ { "href": "/temp/test-document-id.psd", "storage": "adobe" } ], "options": { "layers": [ { "id": 1, "text": { "content": "Updated text from API test" } } ] }, "outputs": [ { "href": "/temp/output-test-document-id.psd", "storage": "adobe", "type": "image/vnd.adobe.photoshop" } ] } ] results = [] for endpoint in edit_endpoints: try: logger.info(f"Testing text edit at endpoint: {endpoint}") # First try a GET request to see if endpoint exists get_response = requests.get( endpoint, headers=self.headers, timeout=10 ) logger.info(f" GET response status: {get_response.status_code}") # Try each payload format to see if any works for i, payload in enumerate(payloads): try: logger.info(f" Testing payload format {i+1}...") # Clone headers and add content type explicitly request_headers = self.headers.copy() request_headers["Content-Type"] = "application/json" # Try a POST request with the sample payload post_response = requests.post( endpoint, headers=request_headers, json=payload, timeout=10 ) status = post_response.status_code logger.info(f" POST response status for format {i+1}: {status}") # Try to extract more detailed error information if post_response.text: if len(post_response.text) < 500: logger.info(f" POST response content for format {i+1}: {post_response.text}") else: logger.info(f" POST response content length for format {i+1}: {len(post_response.text)} bytes") # Try to parse JSON responses try: resp_json = post_response.json() if "error" in resp_json: logger.info(f" Error details: {resp_json['error']}") elif "errors" in resp_json: logger.info(f" Error details: {resp_json['errors']}") except: # If it's not JSON, extract a sample logger.info(f" Response excerpt: {post_response.text[:200]}...") # Record result for this payload format results.append({ "endpoint": endpoint, "format": i+1, "get_status": get_response.status_code, "post_status": status, "successful": 200 <= status < 300 }) # If successful, no need to try more payloads if 200 <= status < 300: logger.info(f" Format {i+1} successful, stopping tests for this endpoint") break except Exception as payload_error: logger.info(f" Error testing format {i+1}: {str(payload_error)}") continue except requests.exceptions.ConnectionError as e: logger.info(f" Connection error: {str(e)}") results.append({ "endpoint": endpoint, "format": "N/A", "get_status": 0, "post_status": 0, "successful": False, "error": "Connection error" }) except Exception as e: logger.info(f" Error: {str(e)}") results.append({ "endpoint": endpoint, "format": "N/A", "get_status": 0, "post_status": 0, "successful": False, "error": str(e) }) # Summary logger.info("\nText Edit API Test Summary:") # Group results by endpoint for a cleaner summary by_endpoint = {} for result in results: endpoint = result["endpoint"] if endpoint not in by_endpoint: by_endpoint[endpoint] = [] by_endpoint[endpoint].append(result) for endpoint, endpoint_results in by_endpoint.items(): logger.info(f"Endpoint: {endpoint}") # Show GET status only once per endpoint get_status = endpoint_results[0]["get_status"] if endpoint_results else 0 logger.info(f" GET Status: {get_status}") # Show each format result for result in endpoint_results: if result.get("format") == "N/A": status = "✗ Error" logger.info(f" {status}: {result.get('error', 'Unknown error')}") else: status = "✓ Successful" if result.get("successful") else "✗ Failed" logger.info(f" Format {result.get('format')}: {status} (POST: {result['post_status']})") return results def upload_font_to_cloud(self, font_file_path: str, remote_path: str = None) -> Dict[str, Any]: """ Uploads a font file to Google Cloud Storage and generates signed URLs Args: font_file_path: Path to the font file to upload remote_path: Optional remote path to use (defaults to fonts/filename) Returns: Dictionary with upload result info including signed URLs """ logger.info(f"Uploading font file to Google Cloud Storage: {font_file_path}") # Check if file exists if not os.path.exists(font_file_path): logger.error(f"Font file not found: {font_file_path}") return { "success": False, "message": f"Font file not found: {font_file_path}" } # Check if GCS storage is initialized if not gcs_storage: logger.error("GCS storage not initialized - cannot upload file") return { "success": False, "message": "GCS storage not initialized - check GCS key file and bucket configuration" } # Get file name file_name = os.path.basename(font_file_path) try: # Use provided remote path or generate one if not remote_path: remote_path = f"fonts/{file_name}" # Upload the file to GCS logger.info(f"Uploading font file to GCS: {remote_path}") upload_result = gcs_storage.upload_file(font_file_path, remote_path) if not upload_result.get("success"): logger.error(f"Failed to upload font file to GCS: {upload_result.get('message')}") return upload_result # Generate signed URL for reading the font download_url = upload_result.get("download_url") logger.info(f"Font file uploaded successfully: {file_name}") logger.info(f"Font URL (read): {download_url[:60]}...{download_url[-20:]}") return { "success": True, "message": f"Font file uploaded successfully: {file_name}", "file_name": file_name, "bucket": GCS_BUCKET_NAME, "remote_path": remote_path, "download_url": download_url } except Exception as e: logger.error(f"Error uploading font file: {str(e)}") return { "success": False, "message": f"Error uploading font file: {str(e)}" } def upload_psd_to_cloud(self, psd_file_path: str) -> Dict[str, Any]: """ Uploads a PSD file to Google Cloud Storage and generates signed URLs Args: psd_file_path: Path to the PSD file to upload Returns: Dictionary with upload result info including signed URLs """ logger.info(f"Uploading PSD file to Google Cloud Storage: {psd_file_path}") # Check if file exists if not os.path.exists(psd_file_path): logger.error(f"PSD file not found: {psd_file_path}") return { "success": False, "message": f"PSD file not found: {psd_file_path}" } # Check if GCS storage is initialized if not gcs_storage: logger.error("GCS storage not initialized - cannot upload file") return { "success": False, "message": "GCS storage not initialized - check GCS key file and bucket configuration" } # Get file name file_name = os.path.basename(psd_file_path) try: # Generate a timestamp-based unique path to avoid collisions timestamp = int(time.time()) remote_path = f"adobe_ps/{timestamp}_{file_name}" output_path = f"adobe_ps/output_{timestamp}_{file_name}" # Upload the file to GCS logger.info(f"Uploading file to GCS: {remote_path}") upload_result = gcs_storage.upload_file(psd_file_path, remote_path) if not upload_result.get("success"): logger.error(f"Failed to upload file to GCS: {upload_result.get('message')}") return upload_result # Generate signed URLs for input and output input_url = upload_result.get("download_url") # Generate a specific output URL try: output_url = gcs_storage.get_signed_url( output_path, action="write", content_type="image/vnd.adobe.photoshop" ) except Exception as e: logger.error(f"Error generating output signed URL: {str(e)}") return { "success": False, "message": f"Error generating output signed URL: {str(e)}" } logger.info(f"File uploaded successfully: {file_name}") logger.info(f"Input URL (read): {input_url[:60]}...{input_url[-20:]}") logger.info(f"Output URL (write): {output_url[:60]}...{output_url[-20:]}") return { "success": True, "message": f"PSD file uploaded successfully: {file_name}", "file_name": file_name, "bucket": GCS_BUCKET_NAME, "remote_path": remote_path, "input_path": remote_path, # Add input_path for proper cleanup "output_path": output_path, "input_url": input_url, "output_url": output_url } except Exception as e: logger.error(f"Error uploading PSD file: {str(e)}") return { "success": False, "message": f"Error uploading PSD file: {str(e)}" } def update_text_from_json(self, json_file_path: str, upload_psd: bool = True) -> Dict[str, Any]: """ Updates text in a PSD document in Adobe Cloud using text data from a JSON file Args: json_file_path: Path to the JSON file containing text layer data upload_psd: Whether to upload the PSD file to Adobe Cloud first Returns: Dictionary with the result of the operation """ logger.info(f"Updating text from JSON file: {json_file_path}") # Endpoint for text editing - use the correct Photoshop REST API endpoint # According to our testing, the text endpoint is the correct one to use # for updating text layers in PSD files endpoint = "https://image.adobe.io/pie/psdService/text" # The actionJSON endpoint requires different payload format, not using it for now fallback_endpoint = "https://image.adobe.io/pie/psdService/actionJSON" try: # Load JSON data with open(json_file_path, 'r', encoding='utf-8') as f: json_data = json.load(f) # Extract document name and text layers document_name = json_data.get('documentName', '') psd_path = json_data.get('psdPath', '') text_layers = json_data.get('textLayers', []) if not document_name or not text_layers: logger.error("JSON file does not contain required document name or text layers") return { "success": False, "message": "JSON file does not contain required document name or text layers" } logger.info(f"Document name: {document_name}") logger.info(f"Found {len(text_layers)} text layers in JSON") # Handle the PSD path if psd_path.startswith("~"): psd_path = os.path.expanduser(psd_path) elif not os.path.isabs(psd_path): # Try to find the PSD next to the JSON file json_dir = os.path.dirname(os.path.abspath(json_file_path)) psd_path = os.path.join(json_dir, document_name) # Use document_name if psd_path is not found if not os.path.exists(psd_path) and document_name: json_dir = os.path.dirname(os.path.abspath(json_file_path)) possible_psd_path = os.path.join(json_dir, document_name) if os.path.exists(possible_psd_path): psd_path = possible_psd_path # If upload_psd is True and psd_path exists, upload the file document_id = None if upload_psd and os.path.exists(psd_path): logger.info(f"Found PSD file: {psd_path}") upload_result = self.upload_psd_to_cloud(psd_path) if upload_result.get("success"): # We don't need a document ID when using GCS, just the URLs logger.info(f"File successfully uploaded to GCS") else: logger.error(f"Failed to upload PSD file: {upload_result.get('message')}") return upload_result else: # For testing when not uploading, use a placeholder URLs logger.warning("No upload performed - API will likely return an error without valid URLs") # We don't need a document ID when using external storage # Prepare updates for each text layer text_layer_updates = [] for layer in text_layers: # Only include layers that have updatedText that differs from the original text if layer.get('updatedText') and layer.get('text') != layer.get('updatedText'): # Extract layer ID and convert to integer - REQUIRED by the API layer_id = layer.get('id') layer_name = layer.get('name', '') # Try to find a reliable ID if layer_id is not None and layer_id != '': try: # Try to convert to integer as required by the API layer_id = int(layer_id) except (ValueError, TypeError): # If conversion fails, try to derive from name layer_id = hash(layer_name) % 1000 # Create a deterministic ID from name else: # For specific named layers, assign known IDs if "HYPOALLERGENIC" in layer_name: layer_id = 10 # Try with a specific ID for this layer elif "DESIGNED FOR" in layer_name: layer_id = 20 # Try with a specific ID for this layer else: # Default fallback - try layer position in array as ID layer_id = text_layers.index(layer) + 1 # Format the text as an object with content field text_layer_updates.append({ "id": layer_id, # Must be integer when possible "text": { # Must be object with content field "content": layer.get('updatedText') } }) if not text_layer_updates: logger.info("No text updates needed - all layers are unchanged") return { "success": True, "message": "No text updates needed" } logger.info(f"Prepared {len(text_layer_updates)} text layer updates") # Prepare the API request # Use the uploaded file's storage path if available storage_path = upload_result.get("storage_path", f"uploads/{document_name}") if upload_psd and upload_result.get("success") else None # Format inputs and outputs according to the API requirements # Using Google Cloud Storage signed URLs if upload_result and upload_result.get("success"): # Use GCS signed URLs from the upload result input_url = upload_result.get("input_url") output_url = upload_result.get("output_url") # Create the payload with proper URLs payload = { "inputs": [ { "href": input_url, # GCS signed URL for input file "storage": "external" # External storage (GCS) } ], "options": { "layers": text_layer_updates # Using the corrected format with id (integer) and text as object }, "outputs": [ { "href": output_url, # GCS signed URL for output file "storage": "external", "type": "image/vnd.adobe.photoshop" } ] } logger.info("Using GCS signed URLs for Adobe Photoshop API request") else: # Failed to get GCS signed URLs - using placeholder for testing # Log warning logger.warning("No GCS signed URLs available - using placeholder URLs for testing") logger.warning("This request will fail with the Adobe API - for testing only") # Example GCS-style URLs for documentation fake_gcs_url = f"https://storage.googleapis.com/{GCS_BUCKET_NAME}" input_file = f"placeholder/{document_name or 'test-document.psd'}" output_file = f"placeholder/output-{document_name or 'test-document.psd'}" payload = { "inputs": [ { "href": f"{fake_gcs_url}/{input_file}", "storage": "external" } ], "options": { "layers": text_layer_updates # Using the corrected format with id (integer) and text as object }, "outputs": [ { "href": f"{fake_gcs_url}/{output_file}", "storage": "external", "type": "image/vnd.adobe.photoshop" } ] } # Set appropriate headers headers = self.headers.copy() headers["Content-Type"] = "application/json" # Log the request details for debugging logger.debug(f"Request payload: {json.dumps(payload, indent=2)}") # Make the API request to the primary endpoint logger.info(f"Sending text update request to primary endpoint: {endpoint}") logger.info(f"Request headers: {json.dumps(headers, indent=2)}") logger.info(f"Request payload: {json.dumps(payload, indent=2)}") response = requests.post( endpoint, headers=headers, json=payload, timeout=30 ) # Log response details logger.info(f"Primary endpoint response status: {response.status_code}") if response.text: try: resp_json = response.json() logger.info(f"Response content: {json.dumps(resp_json, indent=2)}") except: # Not JSON, log as text (first 1000 chars) logger.info(f"Response content: {response.text[:1000]}") # If primary endpoint fails, try the fallback endpoint if response.status_code == 404: logger.info(f"Primary endpoint returned 404, trying fallback endpoint: {fallback_endpoint}") # Make the fallback request fallback_response = requests.post( fallback_endpoint, headers=headers, json=payload, timeout=30 ) # Log fallback response details logger.info(f"Fallback endpoint response status: {fallback_response.status_code}") if fallback_response.text: try: resp_json = fallback_response.json() logger.info(f"Fallback response content: {json.dumps(resp_json, indent=2)}") except: # Not JSON, log as text (first 1000 chars) logger.info(f"Fallback response content: {fallback_response.text[:1000]}") # Use the fallback response response = fallback_response # Process response if response.status_code == 200 or response.status_code == 202: logger.info("Text update API request successful") # Process API response try: result = response.json() # For 202 Accepted response, include status URL for async processing if response.status_code == 202 and '_links' in result and 'self' in result.get('_links', {}): status_url = result.get('_links', {}).get('self', {}).get('href') logger.info(f"Request accepted for processing. Status URL: {status_url}") # Check status and wait for completion if status_url: logger.info("Checking processing status...") # Poll the status URL until processing is complete max_retries = 10 retry_count = 0 while retry_count < max_retries: try: # Wait to avoid overwhelming the API time.sleep(5) # Check status status_response = requests.get( status_url, headers=self.headers, timeout=30 ) if status_response.status_code == 200: status_data = status_response.json() status = status_data.get('status', '') logger.info(f"Processing status: {status}") if status == 'succeeded': logger.info("Processing completed successfully!") break elif status == 'failed': logger.error(f"Processing failed: {status_data.get('error', {}).get('message', 'Unknown error')}") break retry_count += 1 except Exception as e: logger.error(f"Error checking status: {str(e)}") retry_count += 1 api_result = { "success": True, "message": "Text update request accepted and processing", "result": result, "status_code": response.status_code } except Exception as e: logger.error(f"Error parsing API response: {str(e)}") api_result = { "success": True, "message": "Text update request accepted", "response": response.text, "status_code": response.status_code } # For successful uploads, try to download the output file if upload_result and upload_result.get("success") and gcs_storage: output_path = upload_result.get("output_path") if output_path: logger.info(f"Checking for processed output file: {output_path}") # Wait for the output file to be available (up to 5 minutes) output_check = gcs_storage.check_output_file(output_path, wait_time=300) if output_check.get("success"): # Download the processed file output_dir = os.path.dirname(psd_path) processed_dir = os.path.join(output_dir, "processed") # Create processed directory if it doesn't exist if not os.path.exists(processed_dir): os.makedirs(processed_dir, exist_ok=True) output_filename = f"processed_{os.path.basename(psd_path)}" output_file_path = os.path.join(processed_dir, output_filename) logger.info(f"Downloading processed file to: {output_file_path}") download_result = gcs_storage.download_file(output_path, output_file_path) if download_result.get("success"): logger.info(f"Successfully downloaded processed file: {output_file_path}") api_result["processed_file"] = output_file_path api_result["message"] += f" and downloaded to {output_file_path}" else: logger.warning(f"Failed to download processed file: {download_result.get('message')}") api_result["output_download_failed"] = download_result.get("message") else: logger.warning(f"Output file not found: {output_check.get('message')}") api_result["output_check_failed"] = output_check.get("message") return api_result else: logger.error(f"Text update failed with status {response.status_code}") error_message = response.text try: error_data = response.json() error_message = error_data.get('message', error_data.get('title', response.text)) except: pass return { "success": False, "status_code": response.status_code, "message": f"Text update failed: {error_message}", "response": response.text, "upload_result": upload_result if upload_result and upload_result.get("success") else None } except Exception as e: logger.error(f"Error updating text: {str(e)}") return { "success": False, "message": f"Error updating text: {str(e)}" } def parse_arguments(): """Parse command line arguments""" parser = argparse.ArgumentParser( description='Adobe Photoshop API Script for text extraction and updating', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Test connectivity to Adobe API endpoints python adobe_ps_api.py test-api # Test text editing API functionality python adobe_ps_api.py test-text-edit # Test Google Cloud Storage integration python adobe_ps_api.py test-gcs /path/to/file.psd # Update text in a PSD file using a JSON file python adobe_ps_api.py update-text /path/to/extracted_text.json # Generate a new token and store it in the cache python adobe_ps_api.py generate-token --client-secret your_client_secret Adobe Photoshop API Workflow: 1. PSD files are uploaded to Google Cloud Storage (GCS) 2. Signed URLs are generated for both input and output files 3. These URLs are provided to the Adobe Photoshop API 4. The API processes the input file and writes the output file to GCS 5. The script downloads the processed output file when available """ ) # Common options parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging') parser.add_argument('--client-secret', '-s', help='Client secret for Adobe API authentication') subparsers = parser.add_subparsers(dest='command', help='Command to run') # Test API connectivity command test_parser = subparsers.add_parser('test-api', help='Test connectivity to Adobe API endpoints') test_parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging') test_parser.add_argument('--client-secret', '-s', help='Client secret for Adobe API authentication') # Test text editing API command text_edit_parser = subparsers.add_parser('test-text-edit', help='Test Photoshop text editing API functionality') text_edit_parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging') text_edit_parser.add_argument('--client-secret', '-s', help='Client secret for Adobe API authentication') # Test GCS upload command gcs_test_parser = subparsers.add_parser('test-gcs', help='Test Google Cloud Storage integration') gcs_test_parser.add_argument('psd_file', help='Path to PSD file to upload') gcs_test_parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging') # Update text command update_parser = subparsers.add_parser('update-text', help='Update text in a PSD file using a JSON file') update_parser.add_argument('json_file', help='Path to JSON file with text layer data') update_parser.add_argument('--dry-run', '-d', action='store_true', help='Preview text updates without making API calls') update_parser.add_argument('--no-upload', '-n', action='store_true', help='Skip uploading the PSD file (uses placeholder URLs)') update_parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging') update_parser.add_argument('--client-secret', '-s', help='Client secret for Adobe API authentication') update_parser.add_argument('--download-output', '-o', action='store_true', help='Download the processed output file if available') # Generate token command token_parser = subparsers.add_parser('generate-token', help='Generate a new Adobe API token') token_parser.add_argument('--client-secret', '-s', required=True, help='Client secret for Adobe API authentication') token_parser.add_argument('--scopes', default=config.DEFAULT_SCOPES, help=f'Comma-separated list of scopes (default: {config.DEFAULT_SCOPES})') token_parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging') return parser.parse_args() def main(): """Main function""" args = parse_arguments() # Set logging level based on verbose flag if args.verbose: logger.setLevel(logging.DEBUG) # Set log format to include more details for handler in logger.handlers: handler.setFormatter(logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s', '%Y-%m-%d %H:%M:%S' )) # Update config with client secret if provided if hasattr(args, 'client_secret') and args.client_secret: config.ADOBE_CLIENT_SECRET = args.client_secret # Handle token generation separately if args.command == 'generate-token': try: # Initialize token manager with client credentials token_mgr = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET) # Get a new token with specified scopes access_token, token_data = token_mgr.get_token(args.scopes) # Display token information print("\nToken Generated Successfully") print("-" * 40) print(f"Access Token: {access_token[:15]}...{access_token[-15:]}") print(f"Token Type: {token_data.get('token_type', 'bearer')}") print(f"Expires In: {token_data.get('expires_in')} seconds ({int(token_data.get('expires_in', 0))/86400:.1f} days)") print(f"Scope: {token_data.get('scope', args.scopes)}") # Verify the token user_info = token_mgr.verify_token(access_token) if user_info: print(f"Token verified for user/account: {user_info.get('sub', 'Unknown')}") print("\nToken has been saved in cache and will be used for future API calls.") else: print("\nWarning: Token verification failed. The token may not be valid.") except Exception as e: print(f"Error generating token: {str(e)}") sys.exit(1) else: # Initialize API client with client secret if provided # First try to get a valid token try: token, token_data = token_manager.get_token(config.DEFAULT_SCOPES) print(f"\nUsing token: {token[:20]}...{token[-20:]}") except Exception as e: print(f"Error getting token: {str(e)}") api = AdobeAPI(client_secret=config.ADOBE_CLIENT_SECRET) # Execute command if args.command == 'test-api': api.list_documents() elif args.command == 'test-text-edit': api.test_text_edit() elif args.command == 'test-gcs': # Test GCS upload functionality psd_file = args.psd_file if not os.path.exists(psd_file): print(f"Error: PSD file '{psd_file}' does not exist.") sys.exit(1) print(f"\nTesting GCS upload with file: {psd_file}") # Check if GCS storage is initialized if not gcs_storage: print(f"Error: GCS storage not initialized. Check that gcs_key.json exists.") sys.exit(1) # Upload the file result = api.upload_psd_to_cloud(psd_file) if result['success']: print(f"\nFile uploaded successfully to Google Cloud Storage") print(f" Bucket: {result['bucket']}") print(f" Remote path: {result['remote_path']}") print(f"\nGenerated signed URLs for Adobe Photoshop API:") print(f" Input URL: {result['input_url'][:60]}...{result['input_url'][-20:]}") print(f" Output URL: {result['output_url'][:60]}...{result['output_url'][-20:]}") print(f"\nThese URLs are valid for approximately 1 hour.") else: print(f"\nError uploading file: {result['message']}") sys.exit(1) elif args.command == 'update-text': json_file = args.json_file if not os.path.exists(json_file): print(f"Error: JSON file '{json_file}' does not exist.") sys.exit(1) if args.dry_run: print(f"\nDRY RUN: Would update text using {json_file}") try: with open(json_file, 'r', encoding='utf-8') as f: json_data = json.load(f) document_name = json_data.get('documentName', 'Unknown document') text_layers = json_data.get('textLayers', []) print(f"\nDocument: {document_name}") print(f"Found {len(text_layers)} text layers in JSON file") updates_needed = 0 for i, layer in enumerate(text_layers): if layer.get('updatedText') and layer.get('text') != layer.get('updatedText'): updates_needed += 1 original = layer.get('text', '') updated = layer.get('updatedText', '') print(f"\nLayer {i+1}: {layer.get('name', 'Unnamed')}") print(f" Original: {original[:50]}{'...' if len(original) > 50 else ''}") print(f" Updated: {updated[:50]}{'...' if len(updated) > 50 else ''}") if updates_needed == 0: print("\nNo text updates needed - all layers are unchanged") else: print(f"\nWould update {updates_needed} text layers") except Exception as e: print(f"Error reading JSON file: {str(e)}") sys.exit(1) else: # Determine whether to upload the PSD file upload_psd = not args.no_upload if upload_psd: if not gcs_storage: print(f"\nWarning: GCS storage not initialized. Using placeholder URLs.") print(f"Adobe API calls will likely fail without valid pre-signed URLs.") print(f"Check that gcs_key.json exists and is valid.") print(f"\nUploading PSD and updating text using API with {json_file}") else: print(f"\nUpdating text using API with {json_file} (without PSD upload)") print(f"Note: This mode uses placeholder URLs and will likely fail with Adobe API") result = api.update_text_from_json(json_file, upload_psd=upload_psd) if result['success']: if 'status_code' in result and result['status_code'] == 202: print(f"Success: {result['message']}") print(f"The request has been accepted and is being processed asynchronously.") # If we have status URL information if 'result' in result and '_links' in result['result']: status_url = result['result'].get('_links', {}).get('self', {}).get('href') if status_url: print(f"Status URL: {status_url}") print(f"You can check the status at this URL for updates.") else: print(f"Success: {result['message']}") # Check if processed file was downloaded if 'processed_file' in result: print(f"Processed file downloaded to: {result['processed_file']}") else: print(f"Error: {result['message']}") if 'status_code' in result: print(f"Status code: {result['status_code']}") # Check if we have upload details despite API failure if 'upload_result' in result and result['upload_result']: print(f"\nNote: File was successfully uploaded to GCS despite API error") print(f" Remote path: {result['upload_result'].get('remote_path')}") sys.exit(1) else: print("\nNo command specified. Use --help to see available commands.") print("\nAvailable commands:") print(" test-api - Test connectivity to Adobe API endpoints") print(" test-text-edit - Test the text editing API functionality") print(" test-gcs - Test Google Cloud Storage upload and URL generation") print(" update-text - Update text in a PSD file using a JSON file") print(" generate-token - Generate a new Adobe API token using client credentials") print("\nNote: The API endpoints used in this script may not be accessible from all networks.") print("This could be due to DNS or network restrictions. If you are unable to connect to") print("the Adobe API endpoints, please check with your network administrator or try from a different network.") if __name__ == "__main__": main()