#!/usr/bin/env python3
"""
Update PSD Text Layers Using Adobe API

This script uses the Adobe Photoshop API to update text layers in PSD files.
It works with the API-ready JSON files created by extract_and_update_json.py
which contain the correct internal layer IDs needed by Adobe's API.

Usage:
    python update_text_with_api.py --json-path /path/to/file-api-ready.json
    python update_text_with_api.py --directory /path/to/directory
"""

import os
import sys
import json
import time
import argparse
import glob
from pathlib import Path

# Import local modules
import config
from adobe_token import AdobeTokenManager
from adobe_ps_api import AdobeAPI
from gcs_storage import GCSStorage

# Suffix that marks a JSON file as prepared for the API.
API_READY_SUFFIX = "-api-ready.json"

# Photoshop API endpoint used to edit text layers.
TEXT_ENDPOINT = "https://image.adobe.io/pie/psdService/text"

# Font file extensions searched for in the local "fonts" directory.
FONT_EXTENSIONS = (".ttf", ".otf", ".ttc")


def find_api_ready_json_files(directory):
    """
    Find all API-ready JSON files in a directory

    Args:
        directory: The directory to search

    Returns:
        Sorted list of API-ready JSON file paths
    """
    # Escape the directory so characters such as "[" or "*" in its name are
    # matched literally instead of being interpreted as glob syntax.
    pattern = os.path.join(glob.escape(directory), "*" + API_READY_SUFFIX)
    # Sort so the processing order is the same on every run and platform.
    return sorted(glob.glob(pattern))


def load_json_file(json_path):
    """
    Load a JSON file

    Args:
        json_path: Path to the JSON file

    Returns:
        The loaded JSON data, or None if the file could not be read or parsed
    """
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # OSError covers missing/unreadable files; ValueError covers both
        # invalid JSON and invalid UTF-8.
        print(f"Error loading JSON file {json_path}: {str(e)}")
        return None


def prepare_text_layer_updates(json_data):
    """
    Prepare text layer updates from JSON data

    Only layers that carry a non-empty 'updatedText' differing from 'text'
    produce an update.

    Args:
        json_data: The loaded JSON data

    Returns:
        List of layer updates for the API
    """
    text_layer_updates = []

    # "or []" also tolerates an explicit null for textLayers.
    for layer in json_data.get('textLayers') or []:
        updated_text = layer.get('updatedText')
        if not updated_text or layer.get('text') == updated_text:
            continue

        # Font information is optional.
        style_info = layer.get('styleInfo') or {}
        font_name = style_info.get('font')
        font_size = style_info.get('size')

        text_obj = {"content": updated_text}

        # Styles are only sent when both the font and its size are known.
        if font_name and font_size:
            # NOTE(review): this divides the pixel size by 72 to obtain
            # "points"; a px -> pt conversion normally depends on the
            # document resolution. Left unchanged - confirm the intended unit.
            font_size_pts = float(font_size) / 72.0

            text_obj["characterStyles"] = [
                {
                    "fontPostScriptName": font_name,  # Original font name
                    "size": font_size_pts
                }
            ]
            text_obj["paragraphStyles"] = [
                {
                    "alignment": style_info.get('alignment', 'left')
                }
            ]
            print(f"Added font '{font_name}' size {font_size_pts}pts (converted from {font_size}px) for layer '{layer.get('name')}'")

        text_layer_updates.append({
            "name": layer.get('name', ''),
            "text": text_obj
        })

    return text_layer_updates


def _resolve_psd_path(json_path, document_name, psd_path):
    """Return the best guess for the PSD location (it may still not exist)."""
    if psd_path.startswith("~"):
        return os.path.expanduser(psd_path)
    if os.path.isabs(psd_path):
        return psd_path

    # Relative path: look next to the JSON file, first under the document
    # name (or the relative path itself), then under the JSON file's own name.
    json_dir = os.path.dirname(os.path.abspath(json_path))
    json_base = os.path.basename(json_path).replace(API_READY_SUFFIX, '')
    candidates = (
        os.path.join(json_dir, document_name or psd_path),
        os.path.join(json_dir, f"{json_base}.psd"),
    )
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return psd_path


def _find_global_font(json_data):
    """Return the font of the first text layer that declares one, else None."""
    for layer in json_data.get('textLayers') or []:
        style_info = layer.get('styleInfo') or {}
        if style_info.get('font'):
            return style_info.get('font')
    return None


def _upload_matching_fonts(api, fonts_dir, font_names):
    """Upload local font files matching the given names; return their descriptors."""
    font_files = []
    for font in font_names:
        # Escape the font name so glob metacharacters in it match literally.
        stems = (glob.escape(font), glob.escape(font.replace('-', '')))
        patterns = [f"{stem}*{ext}" for stem in stems for ext in FONT_EXTENSIONS]
        # Fallback patterns carried over unchanged from the original search.
        patterns += ["*Futura*.ttc", "Futura.ttc"]

        matches = []
        for pattern in patterns:
            matches.extend(glob.glob(os.path.join(fonts_dir, pattern)))

        # Several patterns can hit the same file; upload each file only once
        # per font while keeping the discovery order.
        for font_file in dict.fromkeys(matches):
            if not os.path.exists(font_file):
                continue
            print(f"Found font file for {font}: {os.path.basename(font_file)}")
            # Font upload is best-effort: a failure must not abort the update.
            try:
                font_remote_path = f"fonts/{os.path.basename(font_file)}"
                font_upload_result = api.upload_font_to_cloud(font_file, font_remote_path)
                if font_upload_result.get("success"):
                    font_files.append({
                        "href": font_upload_result.get("download_url"),
                        "fontFamily": font
                    })
                    print(f"Uploaded font file for {font}")
            except Exception as font_err:
                print(f"Error uploading font file {font_file}: {str(font_err)}")
    return font_files


def _extract_job_status(status_data):
    """Return (status, error_message) from a job-status payload."""
    error_message = (status_data.get('error') or {}).get('message', 'Unknown error')

    # Top-level status, as the original code expected it.
    status = status_data.get('status', '')
    if status:
        return status, error_message

    # NOTE(review): the Photoshop job-status payload is documented as carrying
    # "status" (and "errors") per entry of "outputs"; confirm the field names
    # against the API reference.
    outputs = [o for o in (status_data.get('outputs') or []) if isinstance(o, dict)]
    for output in outputs:
        if output.get('status') == 'failed':
            errors = output.get('errors') or {}
            detail = errors.get('title') or errors.get('detail') or errors.get('type')
            return 'failed', detail or error_message
    if outputs and all(o.get('status') == 'succeeded' for o in outputs):
        return 'succeeded', error_message
    return (outputs[0].get('status', '') if outputs else ''), error_message


def _poll_job_status(status_url, access_token, max_checks=10, interval=5):
    """Poll the job until it finishes; return an error message if it failed, else None."""
    import requests

    print("Checking processing status...")
    for check_count in range(1, max_checks + 1):
        print(f"Status check {check_count}/{max_checks}...")

        # Wait before checking
        time.sleep(interval)

        status_response = requests.get(
            status_url,
            headers={
                "x-api-key": config.ADOBE_CLIENT_ID,
                "Authorization": f"Bearer {access_token}"
            },
            timeout=20
        )
        if status_response.status_code != 200:
            continue

        status, error_message = _extract_job_status(status_response.json())
        print(f"Processing status: {status}")
        if status == 'succeeded':
            print("Processing completed successfully!")
            return None
        if status == 'failed':
            print(f"Processing failed: {error_message}")
            return error_message

    # Only reached when no poll reported a final state.
    print("Maximum status checks reached")
    return None


def _download_processed_file(psd_path, output_path, upload_result):
    """Fetch the processed PSD from cloud storage and return the result dictionary."""
    output_dir = os.path.dirname(psd_path)
    # The original flow creates this directory even though the download below
    # targets the PSD's own folder; kept so the on-disk layout is unchanged.
    os.makedirs(os.path.join(output_dir, "processed"), exist_ok=True)

    print("Waiting for output file to be available...")
    time.sleep(5)  # Short wait

    gcs_storage = GCSStorage(config.GCS_BUCKET_NAME, key_path=config.GCS_KEY_PATH)

    output_check = gcs_storage.check_output_file(output_path, wait_time=60)
    if not output_check.get("success"):
        print(f"Output file not found: {output_check.get('message')}")
        return {
            "success": True,
            "message": "Text update request accepted but output file not found",
            "error": output_check.get('message')
        }

    # Download the processed file - use the same folder as original
    output_file_path = os.path.join(output_dir, f"api_updated_{os.path.basename(psd_path)}")
    print(f"Downloading processed file to: {output_file_path}")
    download_result = gcs_storage.download_file(output_path, output_file_path, cleanup=True)

    # Also clean up the uploaded input; failure here is only a warning.
    try:
        input_path = upload_result.get("input_path")
        if input_path:
            gcs_storage.cleanup_files(os.path.dirname(input_path))
    except Exception as clean_err:
        print(f"Warning: Error cleaning up temporary files: {str(clean_err)}")

    if download_result.get("success"):
        print(f"Successfully downloaded processed file: {output_file_path}")
        return {
            "success": True,
            "message": "Text update successful",
            "processed_file": output_file_path
        }

    print(f"Failed to download processed file: {download_result.get('message')}")
    return {
        "success": True,
        "message": "Text update successful but failed to download processed file",
        "error": download_result.get('message')
    }


def update_text_with_api(json_path):
    """
    Update text layers using the Adobe API

    Loads the API-ready JSON, locates the PSD, uploads it, sends the text
    edit request, waits for the job and downloads the processed file next to
    the original PSD as "api_updated_<name>".

    Args:
        json_path: Path to the API-ready JSON file

    Returns:
        Dictionary with the result of the operation. It always has "success"
        and "message"; "processed_file" and "error" are added when relevant.
        Errors are reported through this dictionary rather than raised.
    """
    print(f"\nProcessing JSON file: {os.path.basename(json_path)}")

    # Load the JSON data
    json_data = load_json_file(json_path)
    if not json_data:
        return {
            "success": False,
            "message": f"Failed to load JSON file: {json_path}"
        }

    # Get document information ("or ''" also tolerates explicit nulls)
    document_name = json_data.get('documentName') or ''
    psd_path = json_data.get('psdPath') or ''
    if not document_name and not psd_path:
        return {
            "success": False,
            "message": "JSON file does not contain required document name or PSD path"
        }

    psd_path = _resolve_psd_path(json_path, document_name, psd_path)
    if not os.path.exists(psd_path):
        return {
            "success": False,
            "message": f"PSD file not found: {psd_path}"
        }
    print(f"Using PSD file: {os.path.basename(psd_path)}")

    # Prepare the text layer updates
    text_layer_updates = prepare_text_layer_updates(json_data)
    if not text_layer_updates:
        print("No text updates needed - all layers are unchanged")
        return {
            "success": True,
            "message": "No text updates needed"
        }
    print(f"Prepared {len(text_layer_updates)} text layer updates")

    # Get token for API
    token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)
    try:
        access_token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
    except Exception as e:
        return {
            "success": False,
            "message": f"Error getting access token: {str(e)}"
        }
    # Deliberately not echoing any part of the bearer token to the console.
    print("Got access token")

    # Initialize the API client and upload the PSD
    api = AdobeAPI(client_secret=config.ADOBE_CLIENT_SECRET)
    upload_result = api.upload_psd_to_cloud(psd_path)
    if not upload_result.get("success"):
        return {
            "success": False,
            "message": f"Failed to upload PSD file: {upload_result.get('message')}"
        }

    output_path = upload_result.get("output_path")

    # Prepare the API request payload
    payload = {
        "inputs": [
            {
                "href": upload_result.get("input_url"),
                "storage": "external"
            }
        ],
        "options": {
            "layers": text_layer_updates
        },
        "outputs": [
            {
                "href": upload_result.get("output_url"),
                "storage": "external",
                "type": "image/vnd.adobe.photoshop"
            }
        ]
    }

    # Add font management options
    global_font = _find_global_font(json_data)
    if global_font:
        payload["options"]["globalFont"] = global_font
        # Options supported by API: 'fail' or 'useDefault'
        payload["options"]["manageMissingFonts"] = "useDefault"
        print(f"Using global font: {global_font}")

        fonts_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "fonts")
        if os.path.exists(fonts_dir):
            layer_fonts = [
                (layer.get('styleInfo') or {}).get('font')
                for layer in json_data.get('textLayers') or []
            ]
            # Drop empty names and duplicates, keeping first-seen order, so
            # the same font is not searched for and uploaded repeatedly.
            font_names = list(dict.fromkeys(
                name for name in [global_font, *layer_fonts] if name
            ))
            font_files = _upload_matching_fonts(api, fonts_dir, font_names)

            # The Adobe API doesn't support fontFiles in the options
            # Instead, we'll rely on characterStyles with the correct fontPostScriptName
            if font_files:
                print(f"Found {len(font_files)} font files, but cannot attach them to the request")

    headers = {
        "x-api-key": config.ADOBE_CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    print(f"Sending API request to: {TEXT_ENDPOINT}")
    print("Request payload:")
    print(json.dumps(payload.get("options", {}), indent=2))

    # Everything from here on is wrapped so that network, storage or parsing
    # errors surface as a failure dictionary instead of an exception.
    try:
        import requests

        response = requests.post(
            TEXT_ENDPOINT,
            headers=headers,
            json=payload,
            timeout=30
        )
        print(f"Response status: {response.status_code}")

        # Parse the body once; a non-JSON body is printed raw and treated as
        # "no structured data" rather than as a request failure.
        try:
            resp_json = response.json()
        except ValueError:
            resp_json = None
            print(f"Response: {response.text}")
        else:
            print(f"Response: {json.dumps(resp_json, indent=2)}")
        resp_data = resp_json if isinstance(resp_json, dict) else {}

        if response.status_code not in (200, 202):
            error_message = resp_data.get('message', "Unknown error")
            return {
                "success": False,
                "message": f"API request failed with status {response.status_code}: {error_message}"
            }

        print("API request successful")

        # For 202 responses, follow the status URL until the job finishes
        status_url = ((resp_data.get('_links') or {}).get('self') or {}).get('href')
        if response.status_code == 202 and status_url:
            print(f"Status URL: {status_url}")
            failure = _poll_job_status(status_url, access_token)
            if failure is not None:
                return {
                    "success": False,
                    "message": f"Processing failed: {failure}"
                }

        return _download_processed_file(psd_path, output_path, upload_result)
    except Exception as e:
        return {
            "success": False,
            "message": f"Error making API request: {str(e)}"
        }


def process_directory(directory):
    """
    Process all API-ready JSON files in a directory

    Args:
        directory: Directory containing API-ready JSON files

    Returns:
        Dictionary with results for each file, keyed by JSON path
    """
    print(f"Scanning directory for API-ready JSON files: {directory}")

    json_files = find_api_ready_json_files(directory)
    if not json_files:
        print("No API-ready JSON files found in the directory")
        return {}

    print(f"Found {len(json_files)} API-ready JSON files to process")

    results = {}
    for json_path in json_files:
        print("\n" + "="*70)
        print(f"Processing: {os.path.basename(json_path)}")
        print("="*70)

        result = update_text_with_api(json_path)
        results[json_path] = result

        print("-"*70)
        print(f"Result: {result.get('message')}")
        if 'processed_file' in result:
            print(f"Processed File: {result.get('processed_file')}")
        if 'error' in result:
            print(f"Error: {result.get('error')}")
        print("-"*70)

    return results


def _print_summary(results):
    """Print the succeeded/failed overview for a non-empty results mapping."""
    success_count = sum(1 for r in results.values() if r.get('success'))
    failed_count = len(results) - success_count
    print(f"\nProcessed {len(results)} files: {success_count} succeeded, {failed_count} failed")

    print("\nSuccessful files:")
    for path, result in results.items():
        if result.get('success'):
            print(f" - {os.path.basename(path)}")
            if 'processed_file' in result:
                print(f" => {os.path.basename(result.get('processed_file'))}")

    if failed_count > 0:
        print("\nFailed files:")
        for path, result in results.items():
            if not result.get('success'):
                print(f" - {os.path.basename(path)}: {result.get('message')}")


def main():
    """Main function: parse the command line and process one file or a directory."""
    parser = argparse.ArgumentParser(
        description="Update PSD text layers using Adobe API with correct internal layer IDs",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Process a specific API-ready JSON file:
  python update_text_with_api.py --json-path /path/to/file-api-ready.json

  # Process all API-ready JSON files in a directory:
  python update_text_with_api.py --directory /path/to/files

  # Process files in the current directory:
  python update_text_with_api.py
"""
    )
    parser.add_argument('--json-path', help='Path to the API-ready JSON file')
    parser.add_argument('--directory', help='Directory containing API-ready JSON files to process')
    args = parser.parse_args()

    # Check arguments and determine what to process
    if args.json_path:
        # Process a specific JSON file
        if not os.path.exists(args.json_path):
            print(f"Error: JSON file not found: {args.json_path}")
            return

        print(f"Processing single JSON file: {args.json_path}")
        result = update_text_with_api(args.json_path)

        print("\nResult:")
        print(f"Success: {result.get('success')}")
        print(f"Message: {result.get('message')}")
        if 'processed_file' in result:
            print(f"Processed File: {result.get('processed_file')}")
        if not result.get('success'):
            # Failure results carry their detail in "message"; fall back to
            # it so the line is informative instead of always "Unknown error".
            detail = result.get('error') or result.get('message') or 'Unknown error'
            print(f"Error: {detail}")
    elif args.directory:
        # Process all JSON files in a directory
        if not os.path.isdir(args.directory):
            print(f"Error: Directory not found: {args.directory}")
            return

        results = process_directory(args.directory)
        if results:
            _print_summary(results)
    else:
        # Process the current directory
        results = process_directory(os.getcwd())
        if results:
            _print_summary(results)
        else:
            print("\nNo API-ready JSON files found in the current directory.")
            print("First run extract_and_update_json.py to create API-ready JSON files.")


if __name__ == "__main__":
    main()