adobe-ps-scripts-loreal/update_text_with_api.py
DJP 4a192a8c97 Initial commit: Adobe Photoshop API text management scripts
Local and cloud-based workflows for extracting and updating
text layers in PSD files via ExtendScript and Adobe PS API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 13:46:52 -05:00

586 lines
No EOL
22 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Update PSD Text Layers Using Adobe API
This script uses the Adobe Photoshop API to update text layers in PSD files.
It works with the API-ready JSON files created by extract_and_update_json.py
which contain the correct internal layer IDs needed by Adobe's API.
Usage:
python update_text_with_api.py --json-path /path/to/file-api-ready.json
python update_text_with_api.py --directory /path/to/directory
"""
import os
import sys
import json
import time
import argparse
import glob
from pathlib import Path
# Import local modules
import config
from adobe_token import AdobeTokenManager
from adobe_ps_api import AdobeAPI
from gcs_storage import GCSStorage
def find_api_ready_json_files(directory):
    """
    Find all API-ready JSON files in a directory

    Args:
        directory: The directory to search (its sub-directories are not searched)

    Returns:
        Sorted list of paths of the "*-api-ready.json" files found directly
        inside the directory
    """
    # Escape the directory part so glob metacharacters in a folder name
    # (e.g. "batch[1]") are matched literally instead of acting as wildcards.
    pattern = os.path.join(glob.escape(directory), "*-api-ready.json")
    # glob returns matches in arbitrary order; sort for a reproducible run order.
    return sorted(glob.glob(pattern))
def load_json_file(json_path):
    """
    Load a JSON file

    Args:
        json_path: Path to the JSON file

    Returns:
        The loaded JSON data, or None if the file could not be read or parsed
        (the reason is printed)
    """
    try:
        # "utf-8-sig" reads plain UTF-8 and also strips a leading byte-order
        # mark, which json.load would otherwise reject.
        with open(json_path, 'r', encoding='utf-8-sig') as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        # OSError: missing or unreadable file.
        # ValueError: invalid JSON (json.JSONDecodeError) or bytes that are
        # not valid UTF-8 (UnicodeDecodeError).
        print(f"Error loading JSON file {json_path}: {str(e)}")
        return None
def prepare_text_layer_updates(json_data):
    """
    Prepare text layer updates from JSON data

    A layer produces an update only when its 'updatedText' is non-empty and
    differs from its 'text'. Each update is addressed by the layer's 'name'.
    When the layer's 'styleInfo' carries both a font and a usable numeric
    size, character and paragraph styles are attached as well.

    Args:
        json_data: The loaded JSON data (dict with an optional 'textLayers' list)

    Returns:
        List of layer updates for the API
    """
    text_layer_updates = []
    # "or []" also covers a 'textLayers' key that is present but null.
    for layer in json_data.get('textLayers') or []:
        updated_text = layer.get('updatedText')
        if not updated_text or layer.get('text') == updated_text:
            continue

        # Get font information if available ('styleInfo' may be missing or null)
        style_info = layer.get('styleInfo') or {}
        font_name = style_info.get('font')
        font_size = style_info.get('size')

        # Create text object with content
        text_obj = {"content": updated_text}

        # Add proper characterStyles if available
        if font_name and font_size:
            try:
                # NOTE(review): the stored size is divided by 72 here. The usual
                # pixel-to-point conversion is px * 72 / dpi, so confirm which
                # unit 'styleInfo.size' carries and which unit the API expects.
                font_size_pts = float(font_size) / 72.0
            except (TypeError, ValueError):
                # A malformed size must not abort the whole batch: send the new
                # content without style overrides instead.
                print(f"Skipping font styling for layer '{layer.get('name')}': invalid font size {font_size!r}")
            else:
                text_obj["characterStyles"] = [
                    {
                        "fontPostScriptName": font_name,  # Original font name
                        "size": font_size_pts
                    }
                ]
                # Add paragraph style too
                text_obj["paragraphStyles"] = [
                    {
                        "alignment": style_info.get('alignment', 'left')
                    }
                ]
                print(f"Added font '{font_name}' size {font_size_pts}pts (converted from {font_size}px) for layer '{layer.get('name')}'")

        text_layer_updates.append({
            "name": layer.get('name', ''),
            "text": text_obj
        })
    return text_layer_updates
def _resolve_psd_path(json_path, document_name, psd_path):
    """Work out where the PSD should be on disk (the result may not exist)."""
    if psd_path.startswith("~"):
        return os.path.expanduser(psd_path)
    if os.path.isabs(psd_path):
        return psd_path
    # Relative (or empty) path: look next to the JSON file, first under the
    # recorded name, then under the JSON file's own base name.
    json_dir = os.path.dirname(os.path.abspath(json_path))
    json_base = os.path.basename(json_path).replace('-api-ready.json', '')
    candidates = (
        os.path.join(json_dir, document_name or psd_path),
        os.path.join(json_dir, f"{json_base}.psd"),
    )
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return psd_path


def _collect_font_names(json_data):
    """Return the distinct non-empty layer font names, in first-seen order."""
    names = (
        (layer.get('styleInfo') or {}).get('font')
        for layer in json_data.get('textLayers') or []
    )
    return list(dict.fromkeys(name for name in names if name))


def _upload_matching_fonts(api, fonts_dir, font_names):
    """Upload the font files in fonts_dir that match the given font names."""
    font_files = []
    for font in font_names:
        stems = (font, font.replace('-', ''))
        patterns = [f"{stem}*{ext}" for stem in stems for ext in (".ttf", ".otf", ".ttc")]
        # NOTE(review): Futura collections are matched for every font,
        # whatever its name - confirm this fallback is still wanted.
        patterns += ["*Futura*.ttc", "Futura.ttc"]
        # Several patterns can match the same file; keep each path once.
        candidates = dict.fromkeys(
            path
            for pattern in patterns
            for path in glob.glob(os.path.join(fonts_dir, pattern))
        )
        for font_file in candidates:
            if not os.path.exists(font_file):
                continue
            print(f"Found font file for {font}: {os.path.basename(font_file)}")
            # Upload font file to GCS; a failed upload is reported, not fatal.
            try:
                font_remote_path = f"fonts/{os.path.basename(font_file)}"
                font_upload_result = api.upload_font_to_cloud(font_file, font_remote_path)
                if font_upload_result.get("success"):
                    font_files.append({
                        "href": font_upload_result.get("download_url"),
                        "fontFamily": font
                    })
                    print(f"Uploaded font file for {font}")
            except Exception as font_err:
                print(f"Error uploading font file {font_file}: {str(font_err)}")
    return font_files


def _extract_job_status(status_data):
    """Return (status, error_message) read from a job-status payload."""
    status = status_data.get('status', '')
    if status:
        error = status_data.get('error') or {}
        return status, error.get('message', 'Unknown error')
    # NOTE(review): psdService status payloads appear to report the status per
    # entry of an 'outputs' list rather than at the top level - confirm the
    # field names against the API reference.
    outputs = [o for o in status_data.get('outputs') or [] if isinstance(o, dict)]
    states = [o.get('status', '') for o in outputs]
    if 'failed' in states:
        failed = outputs[states.index('failed')]
        errors = failed.get('errors')
        if not isinstance(errors, dict):
            errors = {}
        message = errors.get('title') or errors.get('message') or 'Unknown error'
        return 'failed', message
    if states and all(state == 'succeeded' for state in states):
        return 'succeeded', 'Unknown error'
    return (states[0] if states else ''), 'Unknown error'


def update_text_with_api(json_path):
    """
    Update text layers using the Adobe API

    Steps: load the JSON, locate the PSD, build the layer updates, obtain an
    access token, upload the PSD, post the request to the text endpoint, poll
    the job status (for 202 responses), then download the processed PSD next
    to the original as "api_updated_<name>".

    Args:
        json_path: Path to the API-ready JSON file

    Returns:
        Dictionary with the result of the operation. Always has 'success' and
        'message'; has 'processed_file' when the output was downloaded. When
        the request was accepted but the output could not be found or
        downloaded, 'success' is still True and 'error' carries the detail.
    """
    print(f"\nProcessing JSON file: {os.path.basename(json_path)}")

    # Load the JSON data
    json_data = load_json_file(json_path)
    if not json_data:
        return {
            "success": False,
            "message": f"Failed to load JSON file: {json_path}"
        }

    # Get document information ("or ''" also covers explicit null values)
    document_name = json_data.get('documentName') or ''
    psd_path = json_data.get('psdPath') or ''
    if not document_name and not psd_path:
        return {
            "success": False,
            "message": "JSON file does not contain required document name or PSD path"
        }

    # Handle the PSD path
    psd_path = _resolve_psd_path(json_path, document_name, psd_path)
    if not os.path.exists(psd_path):
        return {
            "success": False,
            "message": f"PSD file not found: {psd_path}"
        }
    print(f"Using PSD file: {os.path.basename(psd_path)}")

    # Prepare the text layer updates
    text_layer_updates = prepare_text_layer_updates(json_data)
    if not text_layer_updates:
        print("No text updates needed - all layers are unchanged")
        return {
            "success": True,
            "message": "No text updates needed"
        }
    print(f"Prepared {len(text_layer_updates)} text layer updates")

    # Get token for API
    token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)
    try:
        access_token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
    except Exception as e:
        return {
            "success": False,
            "message": f"Error getting access token: {str(e)}"
        }
    if not access_token:
        return {
            "success": False,
            "message": "Error getting access token: no token returned"
        }
    # Deliberately do not echo any part of the token: it is a bearer
    # credential and logs are often shared.
    print("Got access token")

    # Initialize the API client
    api = AdobeAPI(client_secret=config.ADOBE_CLIENT_SECRET)

    # Upload the PSD to GCS
    upload_result = api.upload_psd_to_cloud(psd_path)
    if not upload_result.get("success"):
        return {
            "success": False,
            "message": f"Failed to upload PSD file: {upload_result.get('message')}"
        }

    # Get the input and output URLs
    input_url = upload_result.get("input_url")
    output_url = upload_result.get("output_url")
    output_path = upload_result.get("output_path")

    # The global font is the first font found on any text layer
    font_names = _collect_font_names(json_data)
    global_font = font_names[0] if font_names else None

    # Prepare the API request payload
    payload = {
        "inputs": [
            {
                "href": input_url,
                "storage": "external"
            }
        ],
        "options": {
            "layers": text_layer_updates
        },
        "outputs": [
            {
                "href": output_url,
                "storage": "external",
                "type": "image/vnd.adobe.photoshop"
            }
        ]
    }

    # Add font management options
    if global_font:
        payload["options"]["globalFont"] = global_font
        payload["options"]["manageMissingFonts"] = "useDefault"  # Options supported by API: 'fail' or 'useDefault'
        print(f"Using global font: {global_font}")

    # Upload font files if available in a fonts directory next to this script
    fonts_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "fonts")
    if os.path.exists(fonts_dir):
        font_files = _upload_matching_fonts(api, fonts_dir, font_names)
        # The uploaded fonts are not added to the payload; the request relies
        # on characterStyles with the correct fontPostScriptName instead.
        if font_files:
            print(f"Found {len(font_files)} font files, but cannot attach them to the request")

    # Set the headers
    headers = {
        "x-api-key": config.ADOBE_CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    # Set the endpoint
    endpoint = "https://image.adobe.io/pie/psdService/text"

    # Make the API request
    print(f"Sending API request to: {endpoint}")
    print("Request payload:")
    print(json.dumps(payload.get("options", {}), indent=2))
    try:
        import requests

        response = requests.post(
            endpoint,
            headers=headers,
            json=payload,
            timeout=30
        )
        print(f"Response status: {response.status_code}")

        # Parse the body once; a non-JSON body is shown as text and treated
        # as carrying no fields.
        try:
            resp_json = response.json()
        except ValueError:
            resp_json = None
            print(f"Response: {response.text}")
        else:
            print(f"Response: {json.dumps(resp_json, indent=2)}")
        resp_fields = resp_json if isinstance(resp_json, dict) else {}

        if response.status_code not in (200, 202):
            return {
                "success": False,
                "message": f"API request failed with status {response.status_code}: {resp_fields.get('message', 'Unknown error')}"
            }

        print("API request successful")

        # For 202 responses, check the status URL
        links = resp_fields.get('_links') or {}
        if response.status_code == 202 and 'self' in links:
            status_url = (links.get('self') or {}).get('href')
            print(f"Status URL: {status_url}")

            # Monitor status
            print("Checking processing status...")
            max_checks = 10
            for check_count in range(1, max_checks + 1):
                print(f"Status check {check_count}/{max_checks}...")
                # Wait before checking
                time.sleep(5)
                status_response = requests.get(
                    status_url,
                    headers={
                        "x-api-key": config.ADOBE_CLIENT_ID,
                        "Authorization": f"Bearer {access_token}"
                    },
                    timeout=20
                )
                if status_response.status_code != 200:
                    continue
                status, error_message = _extract_job_status(status_response.json())
                print(f"Processing status: {status}")
                if status == 'succeeded':
                    print("Processing completed successfully!")
                    break
                if status == 'failed':
                    print(f"Processing failed: {error_message}")
                    return {
                        "success": False,
                        "message": f"Processing failed: {error_message}"
                    }
            else:
                # Reached only when no check ended the loop; the output file
                # is still looked for below.
                print("Maximum status checks reached")

        # NOTE: this "processed" folder is created, but nothing in this
        # function writes to it; the download goes next to the original PSD.
        output_dir = os.path.dirname(psd_path)
        os.makedirs(os.path.join(output_dir, "processed"), exist_ok=True)

        # Wait for the output file
        print("Waiting for output file to be available...")
        time.sleep(5)  # Short wait

        # Initialize GCS storage
        gcs_storage = GCSStorage(config.GCS_BUCKET_NAME, key_path=config.GCS_KEY_PATH)

        # Check for the output file
        output_check = gcs_storage.check_output_file(output_path, wait_time=60)
        if not output_check.get("success"):
            print(f"Output file not found: {output_check.get('message')}")
            return {
                "success": True,
                "message": "Text update request accepted but output file not found",
                "error": output_check.get('message')
            }

        # Download the processed file - use the same folder as original
        output_filename = f"api_updated_{os.path.basename(psd_path)}"
        output_file_path = os.path.join(output_dir, output_filename)
        print(f"Downloading processed file to: {output_file_path}")
        download_result = gcs_storage.download_file(output_path, output_file_path, cleanup=True)

        # Also clean up the input file (best effort)
        try:
            input_path = upload_result.get("input_path")
            if input_path:
                gcs_storage.cleanup_files(os.path.dirname(input_path))
        except Exception as clean_err:
            print(f"Warning: Error cleaning up temporary files: {str(clean_err)}")

        if download_result.get("success"):
            print(f"Successfully downloaded processed file: {output_file_path}")
            return {
                "success": True,
                "message": "Text update successful",
                "processed_file": output_file_path
            }

        print(f"Failed to download processed file: {download_result.get('message')}")
        return {
            "success": True,
            "message": "Text update successful but failed to download processed file",
            "error": download_result.get('message')
        }
    except Exception as e:
        # Boundary handler: any network, storage or parsing failure above is
        # reported as a failed result instead of propagating to the caller.
        return {
            "success": False,
            "message": f"Error making API request: {str(e)}"
        }
def process_directory(directory):
    """
    Run the text update for every API-ready JSON file found in a directory

    Args:
        directory: Directory containing API-ready JSON files

    Returns:
        Dictionary mapping each JSON file path to the result dictionary
        returned for it; empty when no API-ready JSON file was found
    """
    print(f"Scanning directory for API-ready JSON files: {directory}")
    json_files = find_api_ready_json_files(directory)
    if not json_files:
        print("No API-ready JSON files found in the directory")
        return {}
    print(f"Found {len(json_files)} API-ready JSON files to process")

    banner = "=" * 70
    rule = "-" * 70
    results = {}
    for json_path in json_files:
        # Banner naming the file about to be processed
        print("\n" + banner)
        print(f"Processing: {os.path.basename(json_path)}")
        print(banner)

        outcome = update_text_with_api(json_path)
        results[json_path] = outcome

        # Per-file outcome, with the optional details when present
        print(rule)
        print(f"Result: {outcome.get('message')}")
        if 'processed_file' in outcome:
            print(f"Processed File: {outcome.get('processed_file')}")
        if 'error' in outcome:
            print(f"Error: {outcome.get('error')}")
        print(rule)
    return results
def _print_summary(results):
    """Print which files of a batch succeeded and which failed."""
    succeeded = {path: r for path, r in results.items() if r.get('success')}
    failed = {path: r for path, r in results.items() if not r.get('success')}
    print(f"\nProcessed {len(results)} files: {len(succeeded)} succeeded, {len(failed)} failed")
    print("\nSuccessful files:")
    for path, result in succeeded.items():
        print(f" - {os.path.basename(path)}")
        if 'processed_file' in result:
            print(f" => {os.path.basename(result.get('processed_file'))}")
    if failed:
        print("\nFailed files:")
        for path, result in failed.items():
            print(f" - {os.path.basename(path)}: {result.get('message')}")


def main():
    """
    Main function

    Processes the file given with --json-path, else the directory given with
    --directory, else the current working directory.

    Returns:
        Process exit code: 0 when everything processed succeeded (or there
        was nothing to process), 1 on a bad argument or any failed file
    """
    parser = argparse.ArgumentParser(
        description="Update PSD text layers using Adobe API with correct internal layer IDs",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Process a specific API-ready JSON file:
python update_text_with_api.py --json-path /path/to/file-api-ready.json
# Process all API-ready JSON files in a directory:
python update_text_with_api.py --directory /path/to/files
# Process files in the current directory:
python update_text_with_api.py
"""
    )
    parser.add_argument('--json-path', help='Path to the API-ready JSON file')
    parser.add_argument('--directory', help='Directory containing API-ready JSON files to process')
    args = parser.parse_args()

    if args.json_path:
        # Process a specific JSON file
        if not os.path.exists(args.json_path):
            print(f"Error: JSON file not found: {args.json_path}")
            return 1
        print(f"Processing single JSON file: {args.json_path}")
        result = update_text_with_api(args.json_path)
        print("\nResult:")
        print(f"Success: {result.get('success')}")
        print(f"Message: {result.get('message')}")
        if 'processed_file' in result:
            print(f"Processed File: {result.get('processed_file')}")
        # Results put their detail in 'message'; 'error' is only present on
        # some of them, so print it when it exists rather than a placeholder.
        if 'error' in result:
            print(f"Error: {result.get('error')}")
        return 0 if result.get('success') else 1

    # Directory mode: an explicit --directory, otherwise the current directory
    if args.directory and not os.path.isdir(args.directory):
        print(f"Error: Directory not found: {args.directory}")
        return 1
    results = process_directory(args.directory or os.getcwd())
    if results:
        _print_summary(results)
        return 0 if all(r.get('success') for r in results.values()) else 1
    if not args.directory:
        print("\nNo API-ready JSON files found in the current directory.")
        print("First run extract_and_update_json.py to create API-ready JSON files.")
    return 0


if __name__ == "__main__":
    # Propagate the outcome to the shell so callers can detect failures.
    sys.exit(main())