#!/usr/bin/env python3 """ Debug Text Layer Update with Adobe Photoshop API This script attempts to update a specific text layer in a PSD file using different layer identification techniques to determine what the Adobe Photoshop API requires for successful text updates. """ import os import json import time import requests import logging from pathlib import Path # Import local modules import config from adobe_token import AdobeTokenManager from gcs_storage import GCSStorage # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # Initialize token manager token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET) # GCS bucket configuration GCS_BUCKET_NAME = "lor-txt-tmp-bkt-26" # The bucket to use for temporary storage GCS_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json") # Initialize GCS storage if the key exists gcs_storage = None if os.path.exists(GCS_KEY_PATH): try: gcs_storage = GCSStorage(GCS_BUCKET_NAME, key_path=GCS_KEY_PATH) logger.info(f"GCS storage initialized with bucket: {GCS_BUCKET_NAME}") except Exception as e: logger.error(f"Error initializing GCS storage: {str(e)}") logger.warning("Continuing without GCS storage - some features may not work") else: logger.warning(f"GCS key file not found at {GCS_KEY_PATH} - some features may not work") def load_json_file(json_path: str) -> dict: """Load and parse a JSON file""" try: with open(json_path, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logger.error(f"Error loading JSON file {json_path}: {str(e)}") return {} def upload_psd_to_gcs(psd_path: str) -> dict: """Upload a PSD file to GCS and return signed URLs""" logger.info(f"Uploading PSD file to GCS: {psd_path}") # Check if file exists if not os.path.exists(psd_path): logger.error(f"PSD file not found: {psd_path}") return {"success": False, "message": f"PSD file not found: {psd_path}"} # Check if GCS storage is initialized if not gcs_storage: logger.error("GCS storage not initialized - cannot upload file") return { "success": False, "message": "GCS storage not initialized - check GCS key file and bucket configuration" } try: # Generate a timestamp-based unique path to avoid collisions file_name = os.path.basename(psd_path) timestamp = int(time.time()) remote_path = f"adobe_ps/{timestamp}_{file_name}" output_path = f"adobe_ps/output_{timestamp}_{file_name}" # Upload the file to GCS logger.info(f"Uploading file to GCS: {remote_path}") upload_result = gcs_storage.upload_file(psd_path, remote_path) if not upload_result.get("success"): logger.error(f"Failed to upload file to GCS: {upload_result.get('message')}") return upload_result # Generate signed URLs for input and output input_url = upload_result.get("download_url") # Generate a specific output URL try: output_url = gcs_storage.get_signed_url( output_path, action="write", content_type="image/vnd.adobe.photoshop" ) except Exception as e: logger.error(f"Error generating output signed URL: {str(e)}") return { "success": False, "message": f"Error generating output signed URL: {str(e)}" } logger.info(f"File uploaded successfully: {file_name}") logger.info(f"Input URL (read): {input_url[:60]}...{input_url[-20:]}") logger.info(f"Output URL (write): {output_url[:60]}...{output_url[-20:]}") return { "success": True, "message": f"PSD file uploaded successfully: {file_name}", "file_name": file_name, "bucket": GCS_BUCKET_NAME, "remote_path": remote_path, "output_path": output_path, "input_url": input_url, "output_url": output_url } except Exception as e: logger.error(f"Error uploading PSD file: {str(e)}") return { "success": False, "message": f"Error uploading PSD file: {str(e)}" } def update_text_with_api(input_url: str, output_url: str, layer_updates: list, api_key: str, access_token: str) -> dict: """ Send a text update request to the Adobe Photoshop API Args: input_url: Signed URL for the input PSD file output_url: Signed URL for the output PSD file layer_updates: List of layer updates to apply api_key: Adobe API key access_token: Adobe access token Returns: Dictionary with the API response details """ # Endpoint for text editing endpoint = "https://image.adobe.io/pie/psdService/text" # Set headers headers = { "x-api-key": api_key, "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } # Create the payload payload = { "inputs": [ { "href": input_url, "storage": "external" } ], "options": { "layers": layer_updates }, "outputs": [ { "href": output_url, "storage": "external", "type": "image/vnd.adobe.photoshop" } ] } # Log the request details logger.info(f"Sending text update request to: {endpoint}") logger.info(f"Request payload: {json.dumps(payload, indent=2)}") try: # Make the API request response = requests.post( endpoint, headers=headers, json=payload, timeout=30 ) # Log response logger.info(f"Response status: {response.status_code}") if response.text: try: resp_json = response.json() logger.info(f"Response content: {json.dumps(resp_json, indent=2)}") # Check for status URL for async operations if response.status_code == 202 and '_links' in resp_json and 'self' in resp_json.get('_links', {}): status_url = resp_json.get('_links', {}).get('self', {}).get('href') logger.info(f"Status URL: {status_url}") # Poll the status URL status_result = poll_status_url(status_url, access_token, api_key) # Return combined result return { "success": response.status_code == 202 or response.status_code == 200, "status_code": response.status_code, "message": f"API request {'accepted' if response.status_code == 202 else 'successful'}", "response": resp_json, "status_polling": status_result } except: logger.info(f"Response content: {response.text}") return { "success": response.status_code == 202 or response.status_code == 200, "status_code": response.status_code, "message": f"API request {'accepted' if response.status_code == 202 else 'successful'}", "response": response.text } except Exception as e: logger.error(f"Error making API request: {str(e)}") return { "success": False, "status_code": 0, "message": f"Error making API request: {str(e)}" } def poll_status_url(status_url: str, access_token: str, api_key: str, max_retries: int = 10, retry_interval: int = 5) -> dict: """ Poll the status URL to check processing status Args: status_url: URL to check status access_token: Adobe access token api_key: Adobe API key max_retries: Maximum number of retries retry_interval: Seconds between retries Returns: Dictionary with status details """ headers = { "x-api-key": api_key, "Authorization": f"Bearer {access_token}" } retry_count = 0 while retry_count < max_retries: try: logger.info(f"Checking status URL: Attempt {retry_count + 1}") # Wait before checking time.sleep(retry_interval) # Make request response = requests.get(status_url, headers=headers, timeout=20) if response.status_code == 200: try: status_data = response.json() status = status_data.get('status', '') logger.info(f"Status: {status}") logger.info(f"Status data: {json.dumps(status_data, indent=2)}") if status == 'succeeded': return { "success": True, "status": status, "message": "Processing completed successfully", "data": status_data } elif status == 'failed': error_message = status_data.get('error', {}).get('message', 'Unknown error') return { "success": False, "status": status, "message": f"Processing failed: {error_message}", "data": status_data } elif status == 'processing' or status == 'pending': logger.info(f"Still processing... waiting for completion") except Exception as e: logger.error(f"Error parsing status response: {str(e)}") else: logger.warning(f"Status check returned code: {response.status_code}") retry_count += 1 except Exception as e: logger.error(f"Error checking status: {str(e)}") retry_count += 1 return { "success": False, "status": "unknown", "message": f"Failed to get status after {max_retries} attempts" } def download_processed_file(output_path: str, local_directory: str) -> dict: """ Download a processed file from GCS Args: output_path: Remote path in GCS local_directory: Local directory to save the file Returns: Dictionary with download result """ if not gcs_storage: return { "success": False, "message": "GCS storage not initialized" } try: # Check if the file exists and wait for it logger.info(f"Checking for processed file: {output_path}") output_check = gcs_storage.check_output_file(output_path, wait_time=60) if not output_check.get("success"): return output_check # Create processed directory if needed processed_dir = os.path.join(local_directory, "processed") os.makedirs(processed_dir, exist_ok=True) # Set output filename output_filename = os.path.basename(output_path) if output_filename.startswith("output_"): output_filename = output_filename[7:] # Remove "output_" prefix local_path = os.path.join(processed_dir, output_filename) # Download the file logger.info(f"Downloading file to: {local_path}") download_result = gcs_storage.download_file(output_path, local_path) return download_result except Exception as e: logger.error(f"Error downloading processed file: {str(e)}") return { "success": False, "message": f"Error downloading processed file: {str(e)}" } def main(): """Main function to test various layer identification methods""" # Target files psd_path = "/Users/daveporter/Desktop/CODING-2024/Adobe-API-PS-scripts/TESTFILES/BATCH/Vichy-Product-Skincare-Liftactiv -Collagen 16 Bonding Serum -30ml-3337875912600-Safety.psd" json_path = "/Users/daveporter/Desktop/CODING-2024/Adobe-API-PS-scripts/TESTFILES/BATCH/Vichy-Product-Skincare-Liftactiv -Collagen 16 Bonding Serum -30ml-3337875912600-Safety-textonly-updated.json" # Load the JSON data json_data = load_json_file(json_path) if not json_data or "textLayers" not in json_data: logger.error("Invalid JSON data or no text layers found") return text_layers = json_data.get("textLayers", []) # Get access token try: access_token, _ = token_manager.get_token(config.DEFAULT_SCOPES) logger.info(f"Got access token: {access_token[:15]}...{access_token[-15:]}") except Exception as e: logger.error(f"Error getting access token: {str(e)}") return # Upload PSD to GCS upload_result = upload_psd_to_gcs(psd_path) if not upload_result.get("success"): logger.error(f"Failed to upload PSD file: {upload_result.get('message')}") return input_url = upload_result.get("input_url") output_url = upload_result.get("output_url") output_path = upload_result.get("output_path") # Make an initial check of the layer data logger.info(f"Found {len(text_layers)} text layers in JSON data:") for idx, layer in enumerate(text_layers): layer_id = layer.get("id") layer_name = layer.get("name", "") original_text = layer.get("text", "") updated_text = layer.get("updatedText", "") logger.info(f"Layer {idx+1}: ID={layer_id}, Name=\"{layer_name}\"") logger.info(f" Original: \"{original_text}\"") logger.info(f" Updated: \"{updated_text}\"") # Test different layer identification methods test_methods = [ { "name": "Standard Method (ID as Integer)", "description": "Using the layer ID as an integer", "layer_updates": [ { "id": 1, # ID from JSON file for first layer (converted to integer) "text": { "content": text_layers[1].get("updatedText", "") # Use updated text from second layer (HYPOALLERGENIC) } } ] }, { "name": "ID from JSON Method", "description": "Using the exact ID from the JSON file", "layer_updates": [ { "id": 2, # ID from JSON file for second layer (converted to integer) "text": { "content": text_layers[0].get("updatedText", "") # Use updated text from first layer (DESIGNED FOR) } } ] }, { "name": "Using Name Method", "description": "Identifying layers by name instead of ID", "layer_updates": [ { "name": "HYPOALLERGENIC FORMULA", # Exact layer name from JSON "text": { "content": text_layers[1].get("updatedText", "") # Use updated text from this layer } } ] }, { "name": "Path Method", "description": "Using the layer path from JSON", "layer_updates": [ { "path": "13. Safety awards/seals/Groupe 17/DESIGNED FOR SENSITIVE SKIN", # Layer path from JSON "text": { "content": text_layers[0].get("updatedText", "") # Use updated text from this layer } } ] }, { "name": "Multiple Updates Method", "description": "Updating multiple layers at once", "layer_updates": [ { "id": 1, # First layer "text": { "content": text_layers[1].get("updatedText", "") # HYPOALLERGENIC layer text } }, { "id": 2, # Second layer "text": { "content": text_layers[0].get("updatedText", "") # DESIGNED FOR layer text } } ] } ] # Run tests for test in test_methods: logger.info("\n" + "="*50) logger.info(f"TESTING: {test['name']}") logger.info(f"Description: {test['description']}") logger.info("="*50) # Send API request result = update_text_with_api( input_url, output_url, test["layer_updates"], config.ADOBE_CLIENT_ID, access_token ) # Check result if result.get("success"): logger.info(f"API request successful with status {result.get('status_code')}") # If operation was accepted (202), download the processed file if result.get("status_code") == 202: logger.info("Waiting for processing to complete...") time.sleep(10) # Give Adobe API time to process # Download the processed file output_dir = os.path.dirname(psd_path) download_result = download_processed_file(output_path, output_dir) if download_result.get("success"): logger.info(f"Downloaded processed file to: {download_result.get('file_path')}") else: logger.error(f"Failed to download processed file: {download_result.get('message')}") else: logger.error(f"API request failed with status {result.get('status_code')}: {result.get('message')}") logger.info("\nAll tests completed") if __name__ == "__main__": main()