- Fix API payload: "contents" not "content", "align" not "alignment", output type "vnd.adobe.photoshop" not "image/vnd.adobe.photoshop" - Remove broken font size /72 conversion (values already in points) - Add automatic font upload from fonts/ directory to GCS - Add FuturaPT-Demi.otf extracted from Adobe CoreSync - Update GCS bucket to lor-txt-tmp-bkt-26 (old billing expired) - Update HOW-IT-WORKS.md with working API docs, font setup guide, bug fixes, and verified test results Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
323 lines
No EOL
12 KiB
Python
Executable file
323 lines
No EOL
12 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Adobe Photoshop API Text Update - Complete Payload Example
|
|
---------------------------------------------------------
|
|
|
|
This script demonstrates how to structure a complete text update payload
|
|
for the Adobe Photoshop API (https://image.adobe.io/pie/psdService/text)
|
|
based on the official documentation. Use this to test updates on a single file.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import logging
|
|
import argparse
|
|
import requests
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Import local config, token manager, and GCS storage
|
|
import config
|
|
from adobe_token import AdobeTokenManager
|
|
from gcs_storage import GCSStorage
|
|
|
|
# Initialize token manager
|
|
token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)
|
|
|
|
# GCS bucket configuration
|
|
GCS_BUCKET_NAME = "lor-txt-tmp-bkt-26"
|
|
GCS_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json")
|
|
|
|
# Initialize GCS storage
|
|
gcs_storage = None
|
|
if os.path.exists(GCS_KEY_PATH):
|
|
try:
|
|
gcs_storage = GCSStorage(GCS_BUCKET_NAME, key_path=GCS_KEY_PATH)
|
|
logger.info(f"GCS storage initialized with bucket: {GCS_BUCKET_NAME}")
|
|
except Exception as e:
|
|
logger.error(f"Error initializing GCS storage: {str(e)}")
|
|
|
|
def update_text_with_complete_payload(json_path: str, psd_path: str) -> Dict[str, Any]:
|
|
"""
|
|
Update text in a PSD file using the complete payload structure
|
|
from Adobe documentation.
|
|
|
|
Args:
|
|
json_path: Path to the JSON file containing text data
|
|
psd_path: Path to the PSD file to update
|
|
|
|
Returns:
|
|
Dictionary with result info
|
|
"""
|
|
logger.info(f"Updating text in {psd_path} using data from {json_path}")
|
|
|
|
# First check if files exist
|
|
if not os.path.exists(json_path):
|
|
return {"success": False, "message": f"JSON file not found: {json_path}"}
|
|
|
|
if not os.path.exists(psd_path):
|
|
return {"success": False, "message": f"PSD file not found: {psd_path}"}
|
|
|
|
# Load JSON data
|
|
try:
|
|
with open(json_path, 'r', encoding='utf-8') as f:
|
|
json_data = json.load(f)
|
|
except Exception as e:
|
|
return {"success": False, "message": f"Error reading JSON file: {str(e)}"}
|
|
|
|
# Get text layers from JSON
|
|
text_layers = json_data.get('textLayers', [])
|
|
|
|
if not text_layers:
|
|
return {"success": False, "message": "No text layers found in JSON data"}
|
|
|
|
# Upload PSD file to GCS
|
|
if not gcs_storage:
|
|
return {"success": False, "message": "GCS storage not initialized"}
|
|
|
|
try:
|
|
# Generate timestamp-based paths
|
|
timestamp = int(time.time())
|
|
remote_path = f"adobe_ps/{timestamp}_{os.path.basename(psd_path)}"
|
|
output_path = f"adobe_ps/output_{timestamp}_{os.path.basename(psd_path)}"
|
|
|
|
# Upload file
|
|
upload_result = gcs_storage.upload_file(psd_path, remote_path)
|
|
|
|
if not upload_result.get("success"):
|
|
return {"success": False, "message": f"Failed to upload PSD: {upload_result.get('message')}"}
|
|
|
|
# Get signed URLs
|
|
input_url = upload_result.get("download_url")
|
|
output_url = gcs_storage.get_signed_url(
|
|
output_path,
|
|
action="write",
|
|
content_type="image/vnd.adobe.photoshop"
|
|
)
|
|
except Exception as e:
|
|
return {"success": False, "message": f"Error with GCS: {str(e)}"}
|
|
|
|
# Get authentication token
|
|
try:
|
|
access_token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
|
|
except Exception as e:
|
|
return {"success": False, "message": f"Failed to get access token: {str(e)}"}
|
|
|
|
# Prepare text layer updates using complete payload structure
|
|
layer_updates = []
|
|
for layer in text_layers:
|
|
# Only process layers with changed text
|
|
if layer.get('updatedText') and layer.get('text') != layer.get('updatedText'):
|
|
# Create a layer update using name as identifier
|
|
layer_update = {
|
|
"name": layer.get('name', '')
|
|
}
|
|
|
|
# Add the text content but keep original formatting
|
|
text_obj = {}
|
|
|
|
# Just update the content without specifying styles at all
|
|
# This allows Adobe API to maintain original formatting
|
|
text_obj["content"] = layer.get('updatedText')
|
|
|
|
# Add the text object to layer update
|
|
layer_update["text"] = text_obj
|
|
|
|
layer_updates.append(layer_update)
|
|
|
|
if not layer_updates:
|
|
return {"success": False, "message": "No text changes needed"}
|
|
|
|
# Build the complete payload according to Adobe documentation
|
|
full_payload = {
|
|
"inputs": [
|
|
{
|
|
"storage": "external",
|
|
"href": input_url
|
|
}
|
|
],
|
|
"options": {
|
|
"manageMissingFonts": "useDefault",
|
|
"layers": layer_updates
|
|
},
|
|
"outputs": [
|
|
{
|
|
"storage": "external",
|
|
"href": output_url,
|
|
"type": "image/vnd.adobe.photoshop",
|
|
"overwrite": True
|
|
}
|
|
]
|
|
}
|
|
|
|
# Setup request headers
|
|
headers = {
|
|
"x-api-key": config.ADOBE_CLIENT_ID,
|
|
"Authorization": f"Bearer {access_token}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
# Log the payload for debugging
|
|
logger.info(f"Request payload: {json.dumps(full_payload, indent=2)}")
|
|
|
|
# Make the API request to Adobe
|
|
try:
|
|
endpoint = "https://image.adobe.io/pie/psdService/text"
|
|
|
|
logger.info(f"Sending request to {endpoint}")
|
|
response = requests.post(
|
|
endpoint,
|
|
headers=headers,
|
|
json=full_payload,
|
|
timeout=30
|
|
)
|
|
|
|
# Log response
|
|
logger.info(f"Response status: {response.status_code}")
|
|
if response.text:
|
|
try:
|
|
resp_data = response.json()
|
|
logger.info(f"Response: {json.dumps(resp_data, indent=2)}")
|
|
except:
|
|
logger.info(f"Response text: {response.text}")
|
|
|
|
# Process response
|
|
if response.status_code == 200 or response.status_code == 202:
|
|
result = response.json()
|
|
|
|
# For async processing, monitor status
|
|
if response.status_code == 202 and '_links' in result:
|
|
status_url = result.get('_links', {}).get('self', {}).get('href')
|
|
logger.info(f"Request accepted. Status URL: {status_url}")
|
|
|
|
# Poll status URL
|
|
if status_url:
|
|
max_retries = 12 # 60 seconds (5 sec intervals)
|
|
retry_count = 0
|
|
|
|
while retry_count < max_retries:
|
|
time.sleep(5) # Wait 5 seconds between checks
|
|
|
|
status_response = requests.get(
|
|
status_url,
|
|
headers={"Authorization": f"Bearer {access_token}",
|
|
"x-api-key": config.ADOBE_CLIENT_ID},
|
|
timeout=30
|
|
)
|
|
|
|
if status_response.status_code == 200:
|
|
status_data = status_response.json()
|
|
status = status_data.get('status', '')
|
|
|
|
logger.info(f"Status: {status}")
|
|
|
|
if status == 'succeeded':
|
|
logger.info("Processing completed successfully!")
|
|
break
|
|
elif status == 'failed':
|
|
error_msg = status_data.get('error', {}).get('message', 'Unknown error')
|
|
logger.error(f"Processing failed: {error_msg}")
|
|
return {"success": False, "message": f"API processing failed: {error_msg}"}
|
|
|
|
retry_count += 1
|
|
|
|
# Try to download the processed file
|
|
try:
|
|
logger.info(f"Waiting for output file: {output_path}")
|
|
output_check = gcs_storage.check_output_file(output_path, wait_time=60)
|
|
|
|
if output_check.get("success"):
|
|
output_dir = os.path.dirname(psd_path)
|
|
processed_dir = os.path.join(output_dir, "processed")
|
|
|
|
# Create processed directory if needed
|
|
if not os.path.exists(processed_dir):
|
|
os.makedirs(processed_dir, exist_ok=True)
|
|
|
|
output_filename = f"processed_{os.path.basename(psd_path)}"
|
|
output_file_path = os.path.join(processed_dir, output_filename)
|
|
|
|
logger.info(f"Downloading processed file to: {output_file_path}")
|
|
download_result = gcs_storage.download_file(output_path, output_file_path)
|
|
|
|
if download_result.get("success"):
|
|
logger.info(f"Successfully downloaded to: {output_file_path}")
|
|
return {
|
|
"success": True,
|
|
"message": "Text update completed and output file downloaded",
|
|
"processed_file": output_file_path
|
|
}
|
|
else:
|
|
logger.warning(f"Download failed: {download_result.get('message')}")
|
|
|
|
else:
|
|
logger.warning(f"Output file not found: {output_check.get('message')}")
|
|
|
|
except Exception as dl_err:
|
|
logger.error(f"Error downloading output: {str(dl_err)}")
|
|
|
|
# Even if download fails, return success for the API call itself
|
|
return {
|
|
"success": True,
|
|
"message": "Text update request processed successfully",
|
|
"api_result": result
|
|
}
|
|
else:
|
|
# API call failed
|
|
error_message = "Unknown error"
|
|
try:
|
|
error_data = response.json()
|
|
error_message = error_data.get('message', error_data.get('title', response.text))
|
|
except:
|
|
error_message = response.text if response.text else f"HTTP {response.status_code}"
|
|
|
|
return {
|
|
"success": False,
|
|
"message": f"Text update failed: {error_message}",
|
|
"status_code": response.status_code
|
|
}
|
|
|
|
except Exception as e:
|
|
return {"success": False, "message": f"Request error: {str(e)}"}
|
|
|
|
def main():
|
|
"""Parse arguments and run the script"""
|
|
parser = argparse.ArgumentParser(
|
|
description='Update text in a PSD file using a complete payload based on the Adobe API documentation'
|
|
)
|
|
|
|
parser.add_argument('json_file', help='Path to JSON file with text data')
|
|
parser.add_argument('psd_file', help='Path to PSD file to update')
|
|
parser.add_argument('--verbose', '-v', action='store_true',
|
|
help='Enable verbose logging')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Set verbose logging if requested
|
|
if args.verbose:
|
|
logger.setLevel(logging.DEBUG)
|
|
|
|
# Run the update
|
|
result = update_text_with_complete_payload(args.json_file, args.psd_file)
|
|
|
|
# Print results
|
|
if result["success"]:
|
|
print(f"\nSuccess: {result['message']}")
|
|
if "processed_file" in result:
|
|
print(f"Processed file: {result['processed_file']}")
|
|
else:
|
|
print(f"\nError: {result['message']}")
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |