adobe-ps-scripts-loreal/updated_text_payload.py
DJP d7dd117dab Fix Adobe API text updates: correct field names, font handling, and GCS bucket
- Fix API payload: "contents" not "content", "align" not "alignment",
  output type "vnd.adobe.photoshop" not "image/vnd.adobe.photoshop"
- Remove broken font size /72 conversion (values already in points)
- Add automatic font upload from fonts/ directory to GCS
- Add FuturaPT-Demi.otf extracted from Adobe CoreSync
- Update GCS bucket to lor-txt-tmp-bkt-26 (old billing expired)
- Update HOW-IT-WORKS.md with working API docs, font setup guide,
  bug fixes, and verified test results

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 17:03:07 -05:00

323 lines
No EOL
12 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Adobe Photoshop API Text Update - Complete Payload Example
---------------------------------------------------------
This script demonstrates how to structure a complete text update payload
for the Adobe Photoshop API (https://image.adobe.io/pie/psdService/text)
based on the official documentation. Use this to test updates on a single file.
"""
import os
import sys
import json
import time
import logging
import argparse
import requests
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
# Logging: timestamped, level-tagged records; INFO and above are emitted.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Project-local modules: settings, Adobe token manager, and the GCS wrapper.
import config
from adobe_token import AdobeTokenManager
from gcs_storage import GCSStorage

# Shared token manager built from the configured Adobe client credentials.
token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)

# Bucket used for PSD transfer, and the key file expected next to this script.
GCS_BUCKET_NAME = "lor-txt-tmp-bkt-26"
GCS_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json")

# The storage client stays None when the key file is absent or construction
# fails; update_text_with_complete_payload checks for that before uploading.
gcs_storage = None
if os.path.exists(GCS_KEY_PATH):
    try:
        gcs_storage = GCSStorage(GCS_BUCKET_NAME, key_path=GCS_KEY_PATH)
    except Exception as init_err:
        logger.error(f"Error initializing GCS storage: {str(init_err)}")
    else:
        logger.info(f"GCS storage initialized with bucket: {GCS_BUCKET_NAME}")
def _build_layer_updates(text_layers: List[Any]) -> List[Dict[str, Any]]:
    """Return one content-only edit for every layer whose text actually changed."""
    updates = []
    for layer in text_layers:
        if not isinstance(layer, dict):
            logger.warning(f"Skipping malformed text layer entry: {layer!r}")
            continue
        new_text = layer.get('updatedText')
        # Only layers with a non-empty replacement that differs from the current text.
        if not new_text or layer.get('text') == new_text:
            continue
        updates.append({
            # Layers are addressed by name.
            "name": layer.get('name', ''),
            # No style keys are sent so the layer keeps its original formatting.
            # The key is "contents" (not "content"), per this project's
            # verified Adobe API fixes.
            "text": {"contents": new_text},
        })
    return updates


def _extract_job_status(status_data: Any) -> Tuple[str, str]:
    """Return (status, error_message) from a status-poll response body."""
    if not isinstance(status_data, dict):
        return '', 'Unknown error'
    status = status_data.get('status', '')
    error = status_data.get('error') or {}
    if not status:
        # NOTE(review): psdService status bodies appear to report status per
        # entry under "outputs" rather than at the top level - confirm against
        # the Adobe job-status reference. The top-level field still wins.
        outputs = [o for o in (status_data.get('outputs') or []) if isinstance(o, dict)]
        statuses = [o.get('status', '') for o in outputs]
        if 'failed' in statuses:
            status = 'failed'
            failed_output = outputs[statuses.index('failed')]
            error = failed_output.get('errors') or failed_output.get('error') or {}
        elif statuses and all(s == 'succeeded' for s in statuses):
            status = 'succeeded'
        elif statuses:
            status = statuses[0]
    message = 'Unknown error'
    if isinstance(error, dict):
        message = error.get('message') or error.get('title') or 'Unknown error'
    return status, message


def _poll_job_status(status_url: str, headers: Dict[str, str],
                     max_retries: int = 12, interval: float = 5) -> Tuple[str, str]:
    """Poll the job status URL; return ('succeeded' | 'failed' | 'timeout', error_message)."""
    # Defaults give 12 checks at 5-second intervals (about 60 seconds).
    for _ in range(max_retries):
        time.sleep(interval)
        status_response = requests.get(status_url, headers=headers, timeout=30)
        if status_response.status_code != 200:
            logger.warning(f"Status check returned HTTP {status_response.status_code}; retrying")
            continue
        status, error_msg = _extract_job_status(status_response.json())
        logger.info(f"Status: {status}")
        if status == 'succeeded':
            return 'succeeded', ''
        if status == 'failed':
            return 'failed', error_msg
    return 'timeout', ''


def _download_processed_output(output_path: str, psd_path: str) -> Optional[str]:
    """Download the processed PSD into a 'processed' folder beside the source; return its path or None."""
    try:
        logger.info(f"Waiting for output file: {output_path}")
        output_check = gcs_storage.check_output_file(output_path, wait_time=60)
        if not output_check.get("success"):
            logger.warning(f"Output file not found: {output_check.get('message')}")
            return None
        processed_dir = os.path.join(os.path.dirname(psd_path), "processed")
        os.makedirs(processed_dir, exist_ok=True)
        output_file_path = os.path.join(processed_dir, f"processed_{os.path.basename(psd_path)}")
        logger.info(f"Downloading processed file to: {output_file_path}")
        download_result = gcs_storage.download_file(output_path, output_file_path)
        if download_result.get("success"):
            logger.info(f"Successfully downloaded to: {output_file_path}")
            return output_file_path
        logger.warning(f"Download failed: {download_result.get('message')}")
    except Exception as dl_err:
        # Best effort: a download problem must not mask the API outcome.
        logger.error(f"Error downloading output: {str(dl_err)}")
    return None


def update_text_with_complete_payload(json_path: str, psd_path: str) -> Dict[str, Any]:
    """
    Update text in a PSD file through the Adobe Photoshop text endpoint.

    The PSD is uploaded to GCS, a text-edit job is posted to Adobe with signed
    input/output URLs, the job is polled when Adobe answers 202, and the
    processed file is downloaded into a "processed" folder beside the source.

    Args:
        json_path: Path to the JSON file containing text data. A top-level
            "textLayers" list is read; each entry may carry "name", "text"
            and "updatedText".
        psd_path: Path to the PSD file to update.

    Returns:
        Dictionary with "success" (bool) and "message" (str). It may also
        contain "processed_file" (local path of the downloaded result),
        "api_result" (Adobe's response body) or "status_code" (HTTP status
        of a rejected request). Errors are reported through this dictionary
        rather than raised.
    """
    logger.info(f"Updating text in {psd_path} using data from {json_path}")

    # Validate inputs before doing any remote work.
    if not os.path.exists(json_path):
        return {"success": False, "message": f"JSON file not found: {json_path}"}
    if not os.path.exists(psd_path):
        return {"success": False, "message": f"PSD file not found: {psd_path}"}

    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            json_data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both JSON syntax errors and bad UTF-8.
        return {"success": False, "message": f"Error reading JSON file: {str(e)}"}

    text_layers = json_data.get('textLayers', []) if isinstance(json_data, dict) else []
    if not text_layers:
        return {"success": False, "message": "No text layers found in JSON data"}

    # Work out the edits first so nothing is uploaded when there is nothing to change.
    layer_updates = _build_layer_updates(text_layers)
    if not layer_updates:
        return {"success": False, "message": "No text changes needed"}

    if not gcs_storage:
        return {"success": False, "message": "GCS storage not initialized"}

    try:
        # Timestamp-based object names keep runs from overwriting each other.
        timestamp = int(time.time())
        remote_path = f"adobe_ps/{timestamp}_{os.path.basename(psd_path)}"
        output_path = f"adobe_ps/output_{timestamp}_{os.path.basename(psd_path)}"

        upload_result = gcs_storage.upload_file(psd_path, remote_path)
        if not upload_result.get("success"):
            return {"success": False, "message": f"Failed to upload PSD: {upload_result.get('message')}"}

        # Adobe reads the input via a signed download URL and writes the result
        # to a signed upload URL.
        input_url = upload_result.get("download_url")
        output_url = gcs_storage.get_signed_url(
            output_path,
            action="write",
            content_type="image/vnd.adobe.photoshop"
        )
    except Exception as e:
        return {"success": False, "message": f"Error with GCS: {str(e)}"}

    try:
        access_token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
    except Exception as e:
        return {"success": False, "message": f"Failed to get access token: {str(e)}"}

    full_payload = {
        "inputs": [
            {
                "storage": "external",
                "href": input_url
            }
        ],
        "options": {
            "manageMissingFonts": "useDefault",
            "layers": layer_updates
        },
        "outputs": [
            {
                "storage": "external",
                "href": output_url,
                # Adobe's PSD output type has no "image/" prefix, per this
                # project's verified API fixes.
                "type": "vnd.adobe.photoshop",
                "overwrite": True
            }
        ]
    }

    headers = {
        "x-api-key": config.ADOBE_CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    # The payload embeds signed URLs, so it is only dumped at DEBUG (--verbose).
    logger.debug(f"Request payload: {json.dumps(full_payload, indent=2)}")

    try:
        endpoint = "https://image.adobe.io/pie/psdService/text"
        logger.info(f"Sending request to {endpoint}")
        response = requests.post(
            endpoint,
            headers=headers,
            json=full_payload,
            timeout=30
        )

        logger.info(f"Response status: {response.status_code}")
        resp_data = None
        if response.text:
            try:
                resp_data = response.json()
                logger.info(f"Response: {json.dumps(resp_data, indent=2)}")
            except ValueError:
                logger.info(f"Response text: {response.text}")

        if response.status_code not in (200, 202):
            # The request was rejected: surface the most specific message available.
            error_message = "Unknown error"
            try:
                error_data = response.json()
                error_message = error_data.get('message', error_data.get('title', response.text))
            except (ValueError, AttributeError):
                error_message = response.text if response.text else f"HTTP {response.status_code}"
            return {
                "success": False,
                "message": f"Text update failed: {error_message}",
                "status_code": response.status_code
            }

        # A non-JSON success body still raises here and is reported below.
        result = resp_data if resp_data is not None else response.json()

        job_outcome = None
        if response.status_code == 202 and isinstance(result, dict) and '_links' in result:
            # Asynchronous job: follow the status link until it settles.
            status_url = result.get('_links', {}).get('self', {}).get('href')
            logger.info(f"Request accepted. Status URL: {status_url}")
            if status_url:
                job_outcome, error_msg = _poll_job_status(
                    status_url,
                    {"Authorization": f"Bearer {access_token}",
                     "x-api-key": config.ADOBE_CLIENT_ID},
                )
                if job_outcome == 'succeeded':
                    logger.info("Processing completed successfully!")
                elif job_outcome == 'failed':
                    logger.error(f"Processing failed: {error_msg}")
                    return {"success": False, "message": f"API processing failed: {error_msg}"}
                else:
                    logger.warning("Job did not report completion within the polling window")

        processed_file = _download_processed_output(output_path, psd_path)
        if processed_file:
            return {
                "success": True,
                "message": "Text update completed and output file downloaded",
                "processed_file": processed_file
            }

        if job_outcome == 'timeout':
            # Neither a terminal status nor an output file: do not claim success.
            return {
                "success": False,
                "message": "Timed out waiting for Adobe to finish processing",
                "api_result": result
            }

        # The API call itself succeeded even though the download did not.
        return {
            "success": True,
            "message": "Text update request processed successfully",
            "api_result": result
        }
    except Exception as e:
        return {"success": False, "message": f"Request error: {str(e)}"}
def main():
    """Command-line entry point: parse arguments, run the update, report the outcome."""
    cli = argparse.ArgumentParser(
        description='Update text in a PSD file using a complete payload based on the Adobe API documentation'
    )
    cli.add_argument('json_file', help='Path to JSON file with text data')
    cli.add_argument('psd_file', help='Path to PSD file to update')
    cli.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Enable verbose logging',
    )
    options = cli.parse_args()

    # -v lowers this module's logger threshold to DEBUG.
    if options.verbose:
        logger.setLevel(logging.DEBUG)

    outcome = update_text_with_complete_payload(options.json_file, options.psd_file)

    # Failure path first: print the error and exit with status 1.
    if not outcome["success"]:
        print(f"\nError: {outcome['message']}")
        sys.exit(1)

    print(f"\nSuccess: {outcome['message']}")
    if "processed_file" in outcome:
        print(f"Processed file: {outcome['processed_file']}")


if __name__ == "__main__":
    main()