adobe-ps-scripts-loreal/simplified_payload.py
DJP d7dd117dab Fix Adobe API text updates: correct field names, font handling, and GCS bucket
- Fix API payload: "contents" not "content", "align" not "alignment",
  output type "vnd.adobe.photoshop" not "image/vnd.adobe.photoshop"
- Remove broken font size /72 conversion (values already in points)
- Add automatic font upload from fonts/ directory to GCS
- Add FuturaPT-Demi.otf extracted from Adobe CoreSync
- Update GCS bucket to lor-txt-tmp-bkt-26 (old billing expired)
- Update HOW-IT-WORKS.md with working API docs, font setup guide,
  bug fixes, and verified test results

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 17:03:07 -05:00

337 lines
No EOL
13 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Simplified Adobe Photoshop API Text Update
------------------------------------------
A stripped-down script that uses the minimal required payload structure
to update text layers via the Adobe Photoshop API.
"""
import os
import sys
import json
import time
import logging
import argparse
import requests
from pathlib import Path
# Logging setup: INFO-level records formatted as "timestamp - LEVEL - message"
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logging.INFO,
)

# Module-level logger named after this module
logger = logging.getLogger(__name__)
# Import local config and dependencies
import config
from adobe_token import AdobeTokenManager
from gcs_storage import GCSStorage
# Initialize token manager
token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)

# GCS bucket configuration; the service-account key is expected next to this script
GCS_BUCKET_NAME = "lor-txt-tmp-bkt-26"
GCS_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json")

# Initialize GCS storage. gcs_storage stays None when the key file is missing
# or the client cannot be constructed; both cases are logged so that a later
# storage failure can be traced back to its cause.
gcs_storage = None
if os.path.exists(GCS_KEY_PATH):
    try:
        gcs_storage = GCSStorage(GCS_BUCKET_NAME, key_path=GCS_KEY_PATH)
        logger.info(f"GCS storage initialized with bucket: {GCS_BUCKET_NAME}")
    except Exception as e:
        # Best-effort at import time: record the failure instead of aborting the import
        logger.error(f"Error initializing GCS storage: {str(e)}")
else:
    logger.warning(f"GCS key file not found at {GCS_KEY_PATH}; GCS storage is not available")
def _collect_text_updates(json_data):
    """Return the ``options.layers`` entries for every layer whose text changed."""
    text_updates = []
    # `or []` also covers an explicit `"textLayers": null` in the JSON file
    for layer in json_data.get('textLayers') or []:
        if not isinstance(layer, dict):
            logger.warning(f"Skipping malformed text layer entry: {layer!r}")
            continue

        old_text = layer.get('text')
        new_text = layer.get('updatedText')
        if not new_text or old_text == new_text:
            continue

        text_obj = {"contents": new_text}

        # Include characterStyles with only fontPostScriptName to preserve the font
        style_info = layer.get('styleInfo', {})
        font_name = style_info.get('font') if style_info else None
        if font_name:
            text_obj["characterStyles"] = [{"fontPostScriptName": font_name}]
            logger.info(f"Requesting font '{font_name}' for layer '{layer.get('name')}'")

        # 'text' may be missing (None) even though 'updatedText' is set, so
        # normalise both values before slicing them for the log preview.
        old_preview = str(old_text or '')[:40]
        new_preview = str(new_text)[:40]
        logger.info(f"Updating layer '{layer.get('name')}': '{old_preview}...' -> '{new_preview}...'")

        text_updates.append({
            "name": layer.get('name', ''),
            "text": text_obj,
        })
    return text_updates


def _upload_fonts(timestamp):
    """Upload every font file in the local fonts/ directory and return their URLs."""
    font_urls = []
    fonts_dir = os.path.join(os.path.dirname(__file__), "fonts")
    if not os.path.isdir(fonts_dir):
        return font_urls

    # Sorted so the order of fonts in the payload is reproducible between runs
    for font_file in sorted(os.listdir(fonts_dir)):
        if not font_file.lower().endswith(('.ttf', '.otf', '.ttc', '.woff', '.woff2')):
            continue
        font_path = os.path.join(fonts_dir, font_file)
        font_remote = f"adobe_ps/fonts/{timestamp}_{font_file}"
        try:
            font_upload = gcs_storage.upload_file(font_path, font_remote)
            if font_upload.get("success"):
                font_urls.append(font_upload.get("download_url"))
                logger.info(f"Uploaded font '{font_file}' to GCS")
            else:
                logger.warning(f"Failed to upload font '{font_file}': {font_upload.get('message')}")
        except Exception as fe:
            # A font that fails to upload is skipped; the text update still proceeds
            logger.warning(f"Error uploading font '{font_file}': {fe}")
    return font_urls


def _build_payload(input_url, output_url, text_updates, font_urls):
    """Assemble the request body for the text endpoint."""
    payload = {
        "inputs": [
            {
                "storage": "external",
                "href": input_url
            }
        ],
        "options": {
            "layers": text_updates
        },
        "outputs": [
            {
                "storage": "external",
                "href": output_url,
                "type": "vnd.adobe.photoshop"
            }
        ]
    }
    # Add custom fonts if any were uploaded
    if font_urls:
        payload["options"]["fonts"] = [
            {"storage": "external", "href": url} for url in font_urls
        ]
        logger.info(f"Added {len(font_urls)} custom font(s) to payload")
    return payload


def _extract_job_status(status_data):
    """Return the job status string found in a status response body."""
    status = status_data.get('status', '')
    if status:
        return status

    # NOTE(review): Photoshop job-status responses appear to report the status
    # per entry of "outputs" rather than at the top level -- confirm against
    # the Adobe job status reference. The top-level field above still wins.
    output_states = [
        out.get('status', '')
        for out in status_data.get('outputs') or []
        if isinstance(out, dict)
    ]
    if 'failed' in output_states:
        return 'failed'
    if output_states and all(state == 'succeeded' for state in output_states):
        return 'succeeded'
    return output_states[0] if output_states else ''


def _poll_job_status(status_url, access_token, max_retries=12, delay_seconds=5):
    """Poll ``status_url`` until the job succeeds or fails.

    Returns 'succeeded' or 'failed', or None when no terminal status was seen
    within ``max_retries`` polls (``delay_seconds`` apart).
    """
    poll_headers = {
        "Authorization": f"Bearer {access_token}",
        "x-api-key": config.ADOBE_CLIENT_ID,
    }
    for _ in range(max_retries):
        time.sleep(delay_seconds)
        status_response = requests.get(status_url, headers=poll_headers, timeout=30)
        if status_response.status_code != 200:
            logger.warning(f"Status check returned HTTP {status_response.status_code}")
            continue

        status_data = status_response.json()
        status = _extract_job_status(status_data)
        logger.info(f"Status: {status}")
        if status == 'succeeded':
            logger.info("Processing succeeded!")
            return status
        if status == 'failed':
            logger.error(f"Processing failed: {status_data}")
            return status

    logger.warning(f"No final status after {max_retries} checks; continuing to look for the output file")
    return None


def _download_processed_file(psd_path, remote_path, output_path):
    """Fetch the processed PSD from GCS next to the source file.

    Returns the success result dict when the file was downloaded, otherwise
    None (every failure is logged, never raised).
    """
    try:
        logger.info(f"Waiting for output file...")
        time.sleep(10)  # Wait a bit for file to be written
        output_check = gcs_storage.check_output_file(output_path, wait_time=60)
        if not output_check.get("success"):
            logger.warning(f"Output file not found: {output_check.get('message')}")
            return None

        # Keep the processed file in the same directory as the source PSD,
        # prefixed so it never overwrites the original.
        output_file_path = os.path.join(
            os.path.dirname(psd_path),
            f"api_updated_{os.path.basename(psd_path)}",
        )
        logger.info(f"Downloading processed file to: {output_file_path}")

        # cleanup=True is passed so the remote output is removed after download
        download_result = gcs_storage.download_file(output_path, output_file_path, cleanup=True)

        # Also clean up the input file
        try:
            gcs_storage.cleanup_files(os.path.dirname(remote_path))
        except Exception as clean_err:
            logger.warning(f"Error cleaning up temporary files: {clean_err}")

        if download_result.get("success"):
            logger.info(f"Successfully downloaded to: {output_file_path}")
            return {
                "success": True,
                "message": "Text update completed and output file downloaded",
                "processed_file": output_file_path
            }
        logger.warning(f"Download of processed file failed: {download_result.get('message')}")
    except Exception as dl_err:
        logger.error(f"Error downloading output: {dl_err}")
    return None


def update_text_with_simplified_payload(json_path: str, psd_path: str) -> dict:
    """
    Updates text in a PSD file using the minimal required payload structure.

    The layer data is read from ``json_path``; every entry of ``textLayers``
    whose ``updatedText`` is set and differs from ``text`` becomes one layer
    update. Only when at least one layer changed are the PSD and any local
    fonts uploaded to GCS, a token requested and the request sent to the
    Adobe text endpoint. The job is then polled and the processed file is
    downloaded next to ``psd_path`` as ``api_updated_<name>``.

    Args:
        json_path: Path to the JSON file describing the text layers.
        psd_path: Path to the PSD file to update.

    Returns:
        A dict with ``success`` (bool) and ``message`` (str). It also carries
        ``processed_file`` when the output was downloaded, ``api_result`` when
        the API accepted the request but no file was downloaded, and
        ``status_code`` when the API answered with an HTTP error. Failures
        are reported through this dict rather than raised.
    """
    logger.info(f"Updating text in {psd_path} using data from {json_path}")

    # Check if files exist
    if not os.path.exists(json_path) or not os.path.exists(psd_path):
        logger.error(f"File not found: JSON={os.path.exists(json_path)}, PSD={os.path.exists(psd_path)}")
        return {"success": False, "message": "File not found"}

    # Load JSON data
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            json_data = json.load(f)
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes
        logger.error(f"Error reading JSON: {e}")
        return {"success": False, "message": f"Error reading JSON: {e}"}

    if not isinstance(json_data, dict):
        logger.error("Error reading JSON: top-level value is not an object")
        return {"success": False, "message": "Error reading JSON: top-level value is not an object"}

    # Find text layers that need updating. Done before any upload so that a
    # no-op run touches neither GCS nor the Adobe API.
    text_updates = _collect_text_updates(json_data)
    if not text_updates:
        logger.info("No text changes needed")
        return {"success": True, "message": "No text changes needed"}

    # gcs_storage is None when the key file was missing or the client failed to start
    if gcs_storage is None:
        logger.error("Error with GCS: storage is not initialized")
        return {"success": False, "message": "Error with GCS: storage is not initialized"}

    # Upload PSD file to GCS
    try:
        # Timestamp-based names keep runs from overwriting each other
        timestamp = int(time.time())
        remote_path = f"adobe_ps/{timestamp}_{os.path.basename(psd_path)}"
        output_path = f"adobe_ps/output_{timestamp}_{os.path.basename(psd_path)}"

        upload_result = gcs_storage.upload_file(psd_path, remote_path)
        if not upload_result.get("success"):
            logger.error(f"Upload failed: {upload_result.get('message')}")
            return {"success": False, "message": f"Upload failed: {upload_result.get('message')}"}

        # URL the API reads the input from, and a write URL for its output
        input_url = upload_result.get("download_url")
        output_url = gcs_storage.get_signed_url(
            output_path,
            action="write",
            content_type="image/vnd.adobe.photoshop"
        )
        logger.info(f"File uploaded to GCS and URLs generated")
    except Exception as e:
        logger.error(f"Error with GCS: {e}")
        return {"success": False, "message": f"Error with GCS: {e}"}

    # Upload any custom fonts found in the local fonts/ directory
    font_urls = _upload_fonts(timestamp)

    # Get authentication token
    try:
        access_token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
    except Exception as e:
        logger.error(f"Error getting token: {e}")
        return {"success": False, "message": f"Error getting token: {e}"}

    # Create payload per Adobe API documentation
    payload = _build_payload(input_url, output_url, text_updates, font_urls)

    # Set up request headers
    headers = {
        "x-api-key": config.ADOBE_CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    # NOTE(review): the payload includes the input/output URLs; if those are
    # signed URLs, consider logging this at DEBUG instead of INFO.
    logger.info(f"Request payload: {json.dumps(payload, indent=2)}")

    # Make the API request
    try:
        endpoint = "https://image.adobe.io/pie/psdService/text"
        logger.info(f"Sending request to {endpoint}")
        response = requests.post(
            endpoint,
            headers=headers,
            json=payload,
            timeout=30
        )

        # Log response
        logger.info(f"Response status: {response.status_code}")
        if response.text:
            try:
                resp_data = response.json()
                logger.info(f"Response: {json.dumps(resp_data, indent=2)}")
            except ValueError:
                # Body is not JSON; log it verbatim
                logger.info(f"Response text: {response.text}")

        if response.status_code not in (200, 202):
            # API call failed
            error_message = "Unknown error"
            try:
                error_data = response.json()
                error_message = error_data.get('message', error_data.get('title', response.text))
            except (ValueError, AttributeError):
                # Non-JSON body, or JSON that is not an object
                error_message = response.text if response.text else f"HTTP {response.status_code}"
            return {
                "success": False,
                "message": f"Text update failed: {error_message}",
                "status_code": response.status_code
            }

        result = response.json()

        # For async processing (202), get status URL and poll it
        if response.status_code == 202 and '_links' in result:
            status_url = result.get('_links', {}).get('self', {}).get('href')
            logger.info(f"Request accepted. Status URL: {status_url}")
            if status_url and _poll_job_status(status_url, access_token) == 'failed':
                return {"success": False, "message": "API processing failed"}

        # Try to download output file
        downloaded = _download_processed_file(psd_path, remote_path, output_path)
        if downloaded:
            return downloaded

        # Even if download fails, return success for the API call itself
        return {
            "success": True,
            "message": "Text update request processed successfully",
            "api_result": result
        }
    except Exception as e:
        # Boundary handler: network errors and malformed responses become a result dict
        logger.error(f"Request error: {e}")
        return {"success": False, "message": f"Request error: {e}"}
def main():
    """Command-line entry point: read the two file paths and run one text update.

    Prints the outcome to stdout and exits with status 1 when the update
    reports a failure.
    """
    cli = argparse.ArgumentParser(
        description='Update text in a PSD file using simplified Adobe API payload'
    )
    cli.add_argument('json_file', help='Path to JSON file with text data')
    cli.add_argument('psd_file', help='Path to PSD file to update')
    cli.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Enable verbose logging',
    )
    options = cli.parse_args()

    # -v lowers this module's logger threshold to DEBUG
    if options.verbose:
        logger.setLevel(logging.DEBUG)

    outcome = update_text_with_simplified_payload(options.json_file, options.psd_file)

    # Failure first: report and leave with a non-zero exit status
    if not outcome.get("success"):
        print(f"\nError: {outcome.get('message')}")
        sys.exit(1)

    print(f"\nSuccess: {outcome.get('message')}")
    if "processed_file" in outcome:
        print(f"Processed file: {outcome.get('processed_file')}")


if __name__ == "__main__":
    main()