Local and cloud-based workflows for extracting and updating text layers in PSD files via ExtendScript and Adobe PS API. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
586 lines
No EOL
22 KiB
Python
Executable file
586 lines
No EOL
22 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Update PSD Text Layers Using Adobe API
|
|
|
|
This script uses the Adobe Photoshop API to update text layers in PSD files.
|
|
It works with the API-ready JSON files created by extract_and_update_json.py
|
|
which contain the correct internal layer IDs needed by Adobe's API.
|
|
|
|
Usage:
|
|
python update_text_with_api.py --json-path /path/to/file-api-ready.json
|
|
python update_text_with_api.py --directory /path/to/directory
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import argparse
|
|
import glob
|
|
from pathlib import Path
|
|
|
|
# Import local modules
|
|
import config
|
|
from adobe_token import AdobeTokenManager
|
|
from adobe_ps_api import AdobeAPI
|
|
from gcs_storage import GCSStorage
|
|
|
|
def find_api_ready_json_files(directory):
    """
    Find all API-ready JSON files in a directory

    Args:
        directory: The directory to search (sub-directories are not searched)

    Returns:
        Sorted list of paths matching ``*-api-ready.json`` directly inside
        ``directory``; an empty list when nothing matches
    """
    # Escape the directory part so glob metacharacters in its name
    # ("[", "]", "*", "?") are matched literally rather than as a pattern.
    pattern = os.path.join(glob.escape(directory), "*-api-ready.json")

    # glob returns entries in filesystem order; sort so batch runs are
    # processed (and reported) in a reproducible order.
    return sorted(glob.glob(pattern))
|
|
|
|
def load_json_file(json_path):
    """
    Load a JSON file

    Args:
        json_path: Path to the JSON file (read as UTF-8)

    Returns:
        The loaded JSON data, or None when the file cannot be read or does
        not contain valid JSON (the error is printed)
    """
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            return json.load(f)
    # OSError: missing/unreadable file. ValueError: invalid JSON
    # (json.JSONDecodeError) or invalid UTF-8 (UnicodeDecodeError).
    # Anything else is a programming error and is allowed to propagate.
    except (OSError, ValueError) as e:
        print(f"Error loading JSON file {json_path}: {e}")
        return None
|
|
|
|
def prepare_text_layer_updates(json_data):
    """
    Prepare text layer updates from JSON data

    A layer produces an update only when it has a non-empty ``updatedText``
    that differs from its ``text``. Each update carries the layer ``name``
    and a ``text`` object with the new ``content``; when the layer's
    ``styleInfo`` has both ``font`` and a numeric ``size``, ``characterStyles``
    and ``paragraphStyles`` are added as well.

    Args:
        json_data: The loaded JSON data (a dict with an optional
            ``textLayers`` list)

    Returns:
        List of layer updates for the API
    """
    text_layer_updates = []

    # "textLayers" may be absent or explicitly null in the JSON.
    for layer in json_data.get('textLayers') or []:
        updated_text = layer.get('updatedText')
        if not updated_text or layer.get('text') == updated_text:
            continue

        # "styleInfo" may be absent or null; treat both as "no style".
        style_info = layer.get('styleInfo') or {}
        font_name = style_info.get('font')
        font_size = style_info.get('size')

        text_obj = {"content": updated_text}

        if font_name and font_size:
            try:
                # NOTE(review): the size is divided by 72 before being sent.
                # A pixel-to-point conversion would normally depend on the
                # document resolution - confirm this matches what the API
                # expects for "size".
                font_size_pts = float(font_size) / 72.0
            except (TypeError, ValueError):
                # A malformed size must not abort the whole file: send the
                # new content and leave the layer's existing styling alone.
                font_size_pts = None
                print(f"Warning: ignoring non-numeric font size {font_size!r} for layer '{layer.get('name')}'")

            if font_size_pts is not None:
                text_obj["characterStyles"] = [
                    {
                        "fontPostScriptName": font_name,
                        "size": font_size_pts,
                    }
                ]
                text_obj["paragraphStyles"] = [
                    {
                        "alignment": style_info.get('alignment', 'left'),
                    }
                ]
                print(f"Added font '{font_name}' size {font_size_pts}pts (converted from {font_size}px) for layer '{layer.get('name')}'")

        text_layer_updates.append({
            "name": layer.get('name', ''),
            "text": text_obj,
        })

    return text_layer_updates
|
|
|
|
def _resolve_psd_path(json_path, document_name, psd_path):
    """Return the best local candidate for the PSD named in the JSON file.

    A path starting with ``~`` is expanded and an absolute path is returned
    as-is. Any other path is looked up next to the JSON file: first as
    ``document_name`` (or the relative path itself), then as
    ``<json base name>.psd``. When no candidate exists the input path is
    returned unchanged so the caller can report it as missing.
    """
    if psd_path.startswith("~"):
        return os.path.expanduser(psd_path)
    if os.path.isabs(psd_path):
        return psd_path

    json_dir = os.path.dirname(os.path.abspath(json_path))
    json_base = os.path.basename(json_path).replace('-api-ready.json', '')
    candidates = (
        os.path.join(json_dir, document_name or psd_path),
        os.path.join(json_dir, f"{json_base}.psd"),
    )
    for candidate in candidates:
        if os.path.exists(candidate):
            return candidate
    return psd_path


def _first_layer_font(json_data):
    """Return the font of the first text layer that declares one, else None."""
    for layer in json_data.get('textLayers') or []:
        style_info = layer.get('styleInfo', {})
        if style_info and style_info.get('font'):
            return style_info.get('font')
    return None


def _upload_matching_fonts(api, fonts_dir, font_names):
    """Upload local font files whose names match ``font_names``.

    Each distinct font name is searched once and each matching file is
    uploaded at most once, even when several layers share a font or several
    patterns match the same file. Upload errors are printed and skipped.

    Returns:
        List of ``{"href", "fontFamily"}`` dicts for the successful uploads.
    """
    escaped_dir = glob.escape(fonts_dir)
    font_files = []
    handled = set()

    # dict.fromkeys de-duplicates while keeping first-seen order.
    for font in dict.fromkeys(name for name in font_names if name):
        stems = dict.fromkeys((font, font.replace('-', '')))
        patterns = [
            f"{glob.escape(stem)}*.{extension}"
            for stem in stems
            for extension in ("ttf", "otf", "ttc")
        ]
        # Hard-coded Futura fallback; this pattern also matches "Futura.ttc".
        patterns.append("*Futura*.ttc")

        for pattern in patterns:
            for font_file in glob.glob(os.path.join(escaped_dir, pattern)):
                if font_file in handled or not os.path.exists(font_file):
                    continue
                handled.add(font_file)

                print(f"Found font file for {font}: {os.path.basename(font_file)}")

                try:
                    font_remote_path = f"fonts/{os.path.basename(font_file)}"
                    font_upload_result = api.upload_font_to_cloud(font_file, font_remote_path)

                    if font_upload_result.get("success"):
                        font_files.append({
                            "href": font_upload_result.get("download_url"),
                            "fontFamily": font,
                        })
                        print(f"Uploaded font file for {font}")
                except Exception as font_err:
                    # Fonts are best-effort: report and carry on.
                    print(f"Error uploading font file {font_file}: {str(font_err)}")

    return font_files


def _job_state(status_data):
    """Extract ``(status, error_message)`` from a job-status payload.

    A top-level ``status`` is used when present. Otherwise the per-output
    statuses under ``outputs`` are consulted: any ``failed`` output makes the
    job ``failed``; the job is ``succeeded`` only when every output is.
    """
    status = status_data.get('status', '')
    if status:
        return status, status_data.get('error', {}).get('message', 'Unknown error')

    # NOTE(review): Photoshop API status payloads appear to report status per
    # entry under "outputs" (with failure details under "errors") rather than
    # at the top level - confirm against the API reference.
    outputs = [o for o in (status_data.get('outputs') or []) if isinstance(o, dict)]
    for output in outputs:
        if output.get('status') == 'failed':
            errors = output.get('errors')
            if isinstance(errors, dict):
                return 'failed', errors.get('title') or errors.get('message') or 'Unknown error'
            return 'failed', 'Unknown error'

    statuses = [output.get('status', '') for output in outputs]
    if statuses and all(s == 'succeeded' for s in statuses):
        return 'succeeded', 'Unknown error'
    return (statuses[0] if statuses else ''), 'Unknown error'


def _poll_job_status(status_url, access_token, max_checks=10, delay_seconds=5):
    """Poll the job status URL until the job succeeds, fails or checks run out.

    Returns:
        None when the job succeeded or the checks were exhausted, or the
        error message string when the job reported ``failed``.
    """
    import requests

    print("Checking processing status...")

    for check_count in range(1, max_checks + 1):
        print(f"Status check {check_count}/{max_checks}...")

        # Wait before checking
        time.sleep(delay_seconds)

        status_response = requests.get(
            status_url,
            headers={
                "x-api-key": config.ADOBE_CLIENT_ID,
                "Authorization": f"Bearer {access_token}",
            },
            timeout=20,
        )

        # Non-200 answers are treated as "not ready yet" and retried.
        if status_response.status_code != 200:
            continue

        status, error_message = _job_state(status_response.json())
        print(f"Processing status: {status}")

        if status == 'succeeded':
            print("Processing completed successfully!")
            return None
        if status == 'failed':
            return error_message

    print("Maximum status checks reached")
    return None


def _fetch_processed_file(psd_path, output_path, upload_result):
    """Download the processed PSD next to the original and clean up the bucket.

    Returns:
        The result dictionary for ``update_text_with_api``.
    """
    output_dir = os.path.dirname(psd_path)

    # NOTE(review): this directory is created but nothing below writes to it;
    # kept so the on-disk side effects stay as they were.
    processed_dir = os.path.join(output_dir, "processed")
    os.makedirs(processed_dir, exist_ok=True)

    print("Waiting for output file to be available...")
    time.sleep(5)  # Short wait

    gcs_storage = GCSStorage(config.GCS_BUCKET_NAME, key_path=config.GCS_KEY_PATH)
    output_check = gcs_storage.check_output_file(output_path, wait_time=60)

    if not output_check.get("success"):
        print(f"Output file not found: {output_check.get('message')}")
        # NOTE(review): reported as success although no output exists.
        return {
            "success": True,
            "message": "Text update request accepted but output file not found",
            "error": output_check.get('message')
        }

    # Download into the same folder as the original PSD
    output_filename = f"api_updated_{os.path.basename(psd_path)}"
    output_file_path = os.path.join(output_dir, output_filename)

    print(f"Downloading processed file to: {output_file_path}")
    download_result = gcs_storage.download_file(output_path, output_file_path, cleanup=True)

    # Also clean up the uploaded input file; failures here are only warnings.
    try:
        input_path = upload_result.get("input_path")
        if input_path:
            gcs_storage.cleanup_files(os.path.dirname(input_path))
    except Exception as clean_err:
        print(f"Warning: Error cleaning up temporary files: {str(clean_err)}")

    if download_result.get("success"):
        print(f"Successfully downloaded processed file: {output_file_path}")
        return {
            "success": True,
            "message": "Text update successful",
            "processed_file": output_file_path
        }

    print(f"Failed to download processed file: {download_result.get('message')}")
    # NOTE(review): reported as success although the download failed.
    return {
        "success": True,
        "message": "Text update successful but failed to download processed file",
        "error": download_result.get('message')
    }


def update_text_with_api(json_path):
    """
    Update text layers using the Adobe API

    Loads the API-ready JSON, locates the PSD it refers to, uploads the PSD,
    posts the text-layer updates to the text endpoint, polls the job status
    when a status link is returned, and downloads the processed file as
    ``api_updated_<original name>`` next to the original PSD.

    Args:
        json_path: Path to the API-ready JSON file

    Returns:
        Dictionary with the result of the operation. It always has
        ``success`` (bool) and ``message`` (str); ``processed_file`` is set
        when the output was downloaded and ``error`` when the output could
        not be found or downloaded. Exceptions raised while talking to the
        API or the bucket are reported as a failed result, not raised.
    """
    print(f"\nProcessing JSON file: {os.path.basename(json_path)}")

    # Load the JSON data
    json_data = load_json_file(json_path)

    if not json_data:
        return {
            "success": False,
            "message": f"Failed to load JSON file: {json_path}"
        }

    # Either field may be missing or null in the JSON; normalise to "".
    document_name = json_data.get('documentName') or ''
    psd_path = json_data.get('psdPath') or ''

    if not document_name and not psd_path:
        return {
            "success": False,
            "message": "JSON file does not contain required document name or PSD path"
        }

    psd_path = _resolve_psd_path(json_path, document_name, psd_path)

    if not os.path.exists(psd_path):
        return {
            "success": False,
            "message": f"PSD file not found: {psd_path}"
        }

    print(f"Using PSD file: {os.path.basename(psd_path)}")

    # Prepare the text layer updates
    text_layer_updates = prepare_text_layer_updates(json_data)

    if not text_layer_updates:
        print("No text updates needed - all layers are unchanged")
        return {
            "success": True,
            "message": "No text updates needed"
        }

    print(f"Prepared {len(text_layer_updates)} text layer updates")

    # Get token for API
    token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)

    try:
        access_token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
        print(f"Got access token: {access_token[:15]}...{access_token[-15:]}")
    except Exception as e:
        return {
            "success": False,
            "message": f"Error getting access token: {str(e)}"
        }

    # Initialize the API client and upload the PSD
    api = AdobeAPI(client_secret=config.ADOBE_CLIENT_SECRET)
    upload_result = api.upload_psd_to_cloud(psd_path)

    if not upload_result.get("success"):
        return {
            "success": False,
            "message": f"Failed to upload PSD file: {upload_result.get('message')}"
        }

    input_url = upload_result.get("input_url")
    output_url = upload_result.get("output_url")
    output_path = upload_result.get("output_path")

    # Prepare the API request payload
    payload = {
        "inputs": [
            {
                "href": input_url,
                "storage": "external"
            }
        ],
        "options": {
            "layers": text_layer_updates
        },
        "outputs": [
            {
                "href": output_url,
                "storage": "external",
                "type": "image/vnd.adobe.photoshop"
            }
        ]
    }

    # Add font management options, using the first layer font as global font
    global_font = _first_layer_font(json_data)
    if global_font:
        payload["options"]["globalFont"] = global_font
        payload["options"]["manageMissingFonts"] = "useDefault"  # Options supported by API: 'fail' or 'useDefault'
        print(f"Using global font: {global_font}")

        # Upload font files if a "fonts" directory sits next to this script
        fonts_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "fonts")
        if os.path.exists(fonts_dir):
            layer_fonts = [
                layer.get('styleInfo', {}).get('font')
                for layer in json_data.get('textLayers') or []
                if layer.get('styleInfo')
            ]
            font_files = _upload_matching_fonts(api, fonts_dir, [global_font] + layer_fonts)

            # The uploaded fonts are not attached to the request; the payload
            # relies on characterStyles with the fontPostScriptName instead.
            if font_files:
                print(f"Found {len(font_files)} font files, but cannot attach them to the request")

    headers = {
        "x-api-key": config.ADOBE_CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    endpoint = "https://image.adobe.io/pie/psdService/text"

    print(f"Sending API request to: {endpoint}")
    print("Request payload:")
    print(json.dumps(payload.get("options", {}), indent=2))

    try:
        import requests

        response = requests.post(
            endpoint,
            headers=headers,
            json=payload,
            timeout=30
        )

        print(f"Response status: {response.status_code}")

        # Parse the body once; a non-JSON body is shown as raw text.
        try:
            resp_json = response.json()
            print(f"Response: {json.dumps(resp_json, indent=2)}")
        except ValueError:
            resp_json = None
            print(f"Response: {response.text}")

        if response.status_code not in (200, 202):
            error_message = "Unknown error"
            if isinstance(resp_json, dict):
                error_message = resp_json.get('message', error_message)
            return {
                "success": False,
                "message": f"API request failed with status {response.status_code}: {error_message}"
            }

        print("API request successful")

        # For 202 responses, follow the status link when one is provided
        links = resp_json.get('_links') if isinstance(resp_json, dict) else None
        if response.status_code == 202 and isinstance(links, dict) and 'self' in links:
            status_url = (links.get('self') or {}).get('href')
            print(f"Status URL: {status_url}")

            if status_url:
                failure = _poll_job_status(status_url, access_token)
                if failure is not None:
                    print(f"Processing failed: {failure}")
                    return {
                        "success": False,
                        "message": f"Processing failed: {failure}"
                    }

        return _fetch_processed_file(psd_path, output_path, upload_result)

    except Exception as e:
        # Boundary handler: any network, API or storage error becomes a
        # failed result so batch processing can continue with the next file.
        return {
            "success": False,
            "message": f"Error making API request: {str(e)}"
        }
|
|
|
|
def process_directory(directory):
    """
    Run the text update for every API-ready JSON file in a directory

    Args:
        directory: Directory containing API-ready JSON files

    Returns:
        Dictionary mapping each JSON file path to its result dictionary;
        empty when the directory holds no API-ready JSON files
    """
    print(f"Scanning directory for API-ready JSON files: {directory}")

    json_files = find_api_ready_json_files(directory)
    if not json_files:
        print("No API-ready JSON files found in the directory")
        return {}

    print(f"Found {len(json_files)} API-ready JSON files to process")

    heavy_rule = "=" * 70
    light_rule = "-" * 70
    # Optional result fields and the label each one is reported under.
    optional_fields = (
        ('processed_file', "Processed File"),
        ('error', "Error"),
    )

    results = {}
    for json_path in json_files:
        print("\n" + heavy_rule)
        print(f"Processing: {os.path.basename(json_path)}")
        print(heavy_rule)

        outcome = update_text_with_api(json_path)
        results[json_path] = outcome

        print(light_rule)
        print(f"Result: {outcome.get('message')}")
        for field, label in optional_fields:
            if field in outcome:
                print(f"{label}: {outcome.get(field)}")
        print(light_rule)

    return results
|
|
|
|
def _print_batch_summary(results):
    """Print the per-file summary of a batch run and return the failure count."""
    success_count = sum(1 for r in results.values() if r.get('success'))
    failed_count = len(results) - success_count
    print(f"\nProcessed {len(results)} files: {success_count} succeeded, {failed_count} failed")

    print("\nSuccessful files:")
    for path, result in results.items():
        if result.get('success'):
            print(f" - {os.path.basename(path)}")
            if 'processed_file' in result:
                print(f" => {os.path.basename(result.get('processed_file'))}")

    if failed_count > 0:
        print("\nFailed files:")
        for path, result in results.items():
            if not result.get('success'):
                print(f" - {os.path.basename(path)}: {result.get('message')}")

    return failed_count


def main(argv=None):
    """Main function

    Args:
        argv: Command-line arguments without the program name; defaults to
            ``sys.argv[1:]`` when None

    Returns:
        Process exit status: 0 when everything that was processed succeeded
        (or there was nothing to process), 1 when an input path is missing
        or at least one file failed
    """
    parser = argparse.ArgumentParser(
        description="Update PSD text layers using Adobe API with correct internal layer IDs",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Process a specific API-ready JSON file:
  python update_text_with_api.py --json-path /path/to/file-api-ready.json

  # Process all API-ready JSON files in a directory:
  python update_text_with_api.py --directory /path/to/files

  # Process files in the current directory:
  python update_text_with_api.py
"""
    )

    parser.add_argument('--json-path', help='Path to the API-ready JSON file')
    parser.add_argument('--directory', help='Directory containing API-ready JSON files to process')

    args = parser.parse_args(argv)

    if args.json_path:
        # Process a specific JSON file
        if not os.path.exists(args.json_path):
            print(f"Error: JSON file not found: {args.json_path}")
            return 1

        print(f"Processing single JSON file: {args.json_path}")
        result = update_text_with_api(args.json_path)

        print("\nResult:")
        print(f"Success: {result.get('success')}")
        print(f"Message: {result.get('message')}")

        if 'processed_file' in result:
            print(f"Processed File: {result.get('processed_file')}")

        # Results only carry "error" for output/download problems; plain
        # failures are fully described by "message" printed above.
        if 'error' in result:
            print(f"Error: {result.get('error')}")

        return 0 if result.get('success') else 1

    if args.directory:
        # Process all JSON files in a directory
        if not os.path.isdir(args.directory):
            print(f"Error: Directory not found: {args.directory}")
            return 1

        results = process_directory(args.directory)
        if results:
            return 1 if _print_batch_summary(results) else 0
        return 0

    # No arguments: process the current directory
    results = process_directory(os.getcwd())
    if results:
        return 1 if _print_batch_summary(results) else 0

    print("\nNo API-ready JSON files found in the current directory.")
    print("First run extract_and_update_json.py to create API-ready JSON files.")
    return 0


if __name__ == "__main__":
    sys.exit(main())