#!/usr/bin/env python3
"""
Adobe Photoshop API Text Update - Complete Payload Example
---------------------------------------------------------
This script demonstrates how to structure a complete text update payload
for the Adobe Photoshop API (https://image.adobe.io/pie/psdService/text)
based on the official documentation.

Use this to test updates on a single file.
"""

import os
import sys
import json
import time
import logging
import argparse
import requests
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# Import local config, token manager, and GCS storage
# (kept after logging.basicConfig so the original import order is preserved)
import config
from adobe_token import AdobeTokenManager
from gcs_storage import GCSStorage

# Initialize token manager
token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)

# GCS bucket configuration
GCS_BUCKET_NAME = "lor-txt-tmp-bkt-26"
GCS_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json")

# Adobe text-edit endpoint and request/poll tuning
ADOBE_TEXT_ENDPOINT = "https://image.adobe.io/pie/psdService/text"
REQUEST_TIMEOUT_SECONDS = 30
STATUS_POLL_INTERVAL_SECONDS = 5
STATUS_POLL_MAX_ATTEMPTS = 12  # 12 x 5 s = 60 seconds of polling
OUTPUT_WAIT_SECONDS = 60

# Initialize GCS storage (stays None when the key file is missing or unusable)
gcs_storage = None
if os.path.exists(GCS_KEY_PATH):
    try:
        gcs_storage = GCSStorage(GCS_BUCKET_NAME, key_path=GCS_KEY_PATH)
        logger.info(f"GCS storage initialized with bucket: {GCS_BUCKET_NAME}")
    except Exception as e:
        logger.error(f"Error initializing GCS storage: {str(e)}")


def _build_layer_updates(text_layers: List[Any]) -> List[Dict[str, Any]]:
    """Return one update entry per layer whose 'updatedText' differs from 'text'."""
    layer_updates = []
    for layer in text_layers:
        if not isinstance(layer, dict):
            # Malformed entry: nothing to read 'text'/'updatedText' from.
            continue
        updated_text = layer.get('updatedText')
        # Only process layers with changed text
        if updated_text and layer.get('text') != updated_text:
            layer_updates.append({
                # The layer is identified by name.
                "name": layer.get('name', ''),
                # Only the content is sent; no style keys are specified so the
                # existing formatting is not overridden by this payload.
                "text": {"content": updated_text},
            })
    return layer_updates


def _build_payload(input_url: Any, output_url: Any,
                   layer_updates: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Assemble the request body sent to the text endpoint."""
    return {
        "inputs": [
            {
                "storage": "external",
                "href": input_url
            }
        ],
        "options": {
            "manageMissingFonts": "useDefault",
            "layers": layer_updates
        },
        "outputs": [
            {
                "storage": "external",
                "href": output_url,
                "type": "image/vnd.adobe.photoshop",
                "overwrite": True
            }
        ]
    }


def _parse_json_body(response: Any) -> Any:
    """Return the decoded JSON body of *response*, or None when it is not JSON."""
    if not response.text:
        return None
    try:
        return response.json()
    except ValueError:
        # json / simplejson / requests JSON decode errors all derive from ValueError.
        return None


def _extract_error_message(response: Any, body: Any) -> str:
    """Pick the most specific error text available from a failed response."""
    if isinstance(body, dict):
        return body.get('message', body.get('title', response.text))
    return response.text if response.text else f"HTTP {response.status_code}"


def _extract_job_state(status_data: Dict[str, Any]) -> Tuple[str, str]:
    """Return (status, error_message) from a job status document.

    A top-level 'status' key is honoured first. When it is absent the
    per-output entries under 'outputs' are consulted instead.
    NOTE(review): Adobe's status documents appear to report state per output
    ('outputs[].status' / 'outputs[].errors') - confirm against the API reference.
    """
    status = status_data.get('status', '')
    if status:
        error_msg = status_data.get('error', {}).get('message', 'Unknown error')
        return status, error_msg

    outputs = [o for o in (status_data.get('outputs') or []) if isinstance(o, dict)]
    for output in outputs:
        if output.get('status') == 'failed':
            errors = output.get('errors') or {}
            if isinstance(errors, dict):
                detail = errors.get('title') or errors.get('message') or 'Unknown error'
            else:
                detail = str(errors)
            return 'failed', detail
    if outputs and all(o.get('status') == 'succeeded' for o in outputs):
        return 'succeeded', ''
    return (outputs[0].get('status', '') if outputs else ''), ''


def _poll_job_status(status_url: str, access_token: str) -> Tuple[str, str]:
    """Poll *status_url* until the job finishes or the attempts run out.

    Returns:
        ('succeeded', ''), ('failed', error_message) or ('timeout', '').
    """
    poll_headers = {
        "Authorization": f"Bearer {access_token}",
        "x-api-key": config.ADOBE_CLIENT_ID,
    }
    for _ in range(STATUS_POLL_MAX_ATTEMPTS):
        time.sleep(STATUS_POLL_INTERVAL_SECONDS)  # Wait between checks
        status_response = requests.get(
            status_url,
            headers=poll_headers,
            timeout=REQUEST_TIMEOUT_SECONDS
        )
        if status_response.status_code != 200:
            # Previously ignored silently; keep retrying but leave a trace.
            logger.warning(f"Status check returned HTTP {status_response.status_code}")
            continue

        status, error_msg = _extract_job_state(status_response.json())
        logger.info(f"Status: {status}")
        if status == 'succeeded':
            logger.info("Processing completed successfully!")
            return 'succeeded', ''
        if status == 'failed':
            logger.error(f"Processing failed: {error_msg}")
            return 'failed', error_msg
    return 'timeout', ''


def _download_processed_file(output_path: str, psd_path: str) -> Optional[str]:
    """Fetch the processed PSD into a 'processed' folder next to *psd_path*.

    Returns:
        The local path of the downloaded file, or None when the output was not
        found or could not be downloaded (the reason is logged).
    """
    try:
        logger.info(f"Waiting for output file: {output_path}")
        output_check = gcs_storage.check_output_file(output_path, wait_time=OUTPUT_WAIT_SECONDS)
        if not output_check.get("success"):
            logger.warning(f"Output file not found: {output_check.get('message')}")
            return None

        processed_dir = os.path.join(os.path.dirname(psd_path), "processed")
        # exist_ok makes a separate existence check unnecessary.
        os.makedirs(processed_dir, exist_ok=True)
        output_file_path = os.path.join(processed_dir, f"processed_{os.path.basename(psd_path)}")

        logger.info(f"Downloading processed file to: {output_file_path}")
        download_result = gcs_storage.download_file(output_path, output_file_path)
        if download_result.get("success"):
            logger.info(f"Successfully downloaded to: {output_file_path}")
            return output_file_path
        logger.warning(f"Download failed: {download_result.get('message')}")
    except Exception as dl_err:
        # Best effort: a download problem must not mask the API result.
        logger.error(f"Error downloading output: {str(dl_err)}")
    return None


def update_text_with_complete_payload(json_path: str, psd_path: str) -> Dict[str, Any]:
    """
    Update text in a PSD file using the complete payload structure from Adobe documentation.

    The PSD is uploaded to GCS, the text endpoint is called with one update per
    changed layer, the job is polled when the API answers 202, and the result
    is downloaded into a 'processed' directory next to the input PSD.

    Args:
        json_path: Path to the JSON file containing text data. It is read for a
            'textLayers' list whose items carry 'name', 'text' and 'updatedText'.
        psd_path: Path to the PSD file to update

    Returns:
        Dictionary with result info. Always contains 'success' (bool) and
        'message' (str); may also contain 'processed_file', 'api_result' or
        'status_code'. Failures are reported through this dictionary rather
        than raised.
    """
    logger.info(f"Updating text in {psd_path} using data from {json_path}")

    # First check if files exist
    if not os.path.exists(json_path):
        return {"success": False, "message": f"JSON file not found: {json_path}"}
    if not os.path.exists(psd_path):
        return {"success": False, "message": f"PSD file not found: {psd_path}"}

    # Load JSON data
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            json_data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both JSON syntax errors and undecodable bytes.
        return {"success": False, "message": f"Error reading JSON file: {str(e)}"}

    # Get text layers from JSON (a non-object root has no 'textLayers' at all)
    text_layers = json_data.get('textLayers', []) if isinstance(json_data, dict) else []
    if not text_layers:
        return {"success": False, "message": "No text layers found in JSON data"}

    # Upload PSD file to GCS
    if not gcs_storage:
        return {"success": False, "message": "GCS storage not initialized"}

    try:
        # Generate timestamp-based paths
        timestamp = int(time.time())
        psd_name = os.path.basename(psd_path)
        remote_path = f"adobe_ps/{timestamp}_{psd_name}"
        output_path = f"adobe_ps/output_{timestamp}_{psd_name}"

        # Upload file
        upload_result = gcs_storage.upload_file(psd_path, remote_path)
        if not upload_result.get("success"):
            return {"success": False, "message": f"Failed to upload PSD: {upload_result.get('message')}"}

        # Get signed URLs
        input_url = upload_result.get("download_url")
        output_url = gcs_storage.get_signed_url(
            output_path,
            action="write",
            content_type="image/vnd.adobe.photoshop"
        )
    except Exception as e:
        return {"success": False, "message": f"Error with GCS: {str(e)}"}

    # Get authentication token
    try:
        access_token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
    except Exception as e:
        return {"success": False, "message": f"Failed to get access token: {str(e)}"}

    # Prepare text layer updates using complete payload structure
    layer_updates = _build_layer_updates(text_layers)
    if not layer_updates:
        return {"success": False, "message": "No text changes needed"}

    # Build the complete payload according to Adobe documentation
    full_payload = _build_payload(input_url, output_url, layer_updates)

    # Setup request headers
    headers = {
        "x-api-key": config.ADOBE_CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    # The payload embeds signed storage URLs, so it is only dumped at DEBUG
    # level (enable with --verbose) instead of on every run.
    logger.info(f"Prepared {len(layer_updates)} text layer update(s)")
    logger.debug(f"Request payload: {json.dumps(full_payload, indent=2)}")

    # Make the API request to Adobe. Anything unexpected in this section is
    # converted into a failure result so the function never raises.
    try:
        logger.info(f"Sending request to {ADOBE_TEXT_ENDPOINT}")
        response = requests.post(
            ADOBE_TEXT_ENDPOINT,
            headers=headers,
            json=full_payload,
            timeout=REQUEST_TIMEOUT_SECONDS
        )

        # Log response (the body is parsed once and reused below)
        logger.info(f"Response status: {response.status_code}")
        body = _parse_json_body(response)
        if body is not None:
            logger.info(f"Response: {json.dumps(body, indent=2)}")
        elif response.text:
            logger.info(f"Response text: {response.text}")

        # API call failed
        if response.status_code not in (200, 202):
            return {
                "success": False,
                "message": f"Text update failed: {_extract_error_message(response, body)}",
                "status_code": response.status_code
            }

        if not isinstance(body, dict):
            return {"success": False, "message": "Request error: API response body is not a JSON object"}
        result = body

        # For async processing, monitor status
        timed_out = False
        if response.status_code == 202 and '_links' in result:
            status_url = ((result.get('_links') or {}).get('self') or {}).get('href')
            logger.info(f"Request accepted. Status URL: {status_url}")

            if status_url:
                state, error_msg = _poll_job_status(status_url, access_token)
                if state == 'failed':
                    return {"success": False, "message": f"API processing failed: {error_msg}"}
                timed_out = state == 'timeout'

        # Try to download the processed file
        processed_file = _download_processed_file(output_path, psd_path)
        if processed_file:
            return {
                "success": True,
                "message": "Text update completed and output file downloaded",
                "processed_file": processed_file
            }

        if timed_out:
            # The job never reported completion and no output appeared, so
            # claiming success here would be misleading.
            waited = STATUS_POLL_MAX_ATTEMPTS * STATUS_POLL_INTERVAL_SECONDS
            return {
                "success": False,
                "message": f"Timed out after {waited} seconds waiting for the text update to complete",
                "api_result": result
            }

        # Even if download fails, return success for the API call itself
        return {
            "success": True,
            "message": "Text update request processed successfully",
            "api_result": result
        }
    except Exception as e:
        return {"success": False, "message": f"Request error: {str(e)}"}


def main():
    """Parse arguments and run the script"""
    parser = argparse.ArgumentParser(
        description='Update text in a PSD file using a complete payload based on the Adobe API documentation'
    )
    parser.add_argument('json_file', help='Path to JSON file with text data')
    parser.add_argument('psd_file', help='Path to PSD file to update')
    parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')

    args = parser.parse_args()

    # Set verbose logging if requested (also enables the request payload dump)
    if args.verbose:
        logger.setLevel(logging.DEBUG)

    # Run the update
    result = update_text_with_complete_payload(args.json_file, args.psd_file)

    # Print results; a failed update exits with a non-zero status
    if result["success"]:
        print(f"\nSuccess: {result['message']}")
        if "processed_file" in result:
            print(f"Processed file: {result['processed_file']}")
    else:
        print(f"\nError: {result['message']}")
        sys.exit(1)


if __name__ == "__main__":
    main()