- Fix API payload: "contents" not "content", "align" not "alignment", output type "vnd.adobe.photoshop" not "image/vnd.adobe.photoshop" - Remove broken font size /72 conversion (values already in points) - Add automatic font upload from fonts/ directory to GCS - Add FuturaPT-Demi.otf extracted from Adobe CoreSync - Update GCS bucket to lor-txt-tmp-bkt-26 (old billing expired) - Update HOW-IT-WORKS.md with working API docs, font setup guide, bug fixes, and verified test results Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
337 lines
No EOL
13 KiB
Python
Executable file
337 lines
No EOL
13 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Simplified Adobe Photoshop API Text Update
|
|
------------------------------------------
|
|
|
|
A stripped down script that uses the minimal required payload structure
|
|
to update text layers via the Adobe Photoshop API.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import logging
|
|
import argparse
|
|
import requests
|
|
from pathlib import Path
|
|
|
|
# Configure logging: timestamped INFO-level output for the whole process.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)

# Import local config and dependencies (kept after the logging setup, as in
# the original file layout).
import config
from adobe_token import AdobeTokenManager
from gcs_storage import GCSStorage

# Initialize token manager with the Adobe client credentials from config.
token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)

# GCS bucket configuration. The service-account key is expected next to this
# script.
GCS_BUCKET_NAME = "lor-txt-tmp-bkt-26"
GCS_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json")

# Initialize GCS storage. `gcs_storage` stays None when the key file is
# missing or the client cannot be constructed; both cases are logged so a
# later failure is not a mystery.
gcs_storage = None
if os.path.exists(GCS_KEY_PATH):
    try:
        gcs_storage = GCSStorage(GCS_BUCKET_NAME, key_path=GCS_KEY_PATH)
        logger.info(f"GCS storage initialized with bucket: {GCS_BUCKET_NAME}")
    except Exception as e:
        # Best-effort initialization: keep the module importable and report.
        logger.error(f"Error initializing GCS storage: {str(e)}")
else:
    logger.warning(f"GCS key file not found at {GCS_KEY_PATH}; GCS storage is disabled")
|
|
|
|
# Font file extensions picked up from the local fonts/ directory.
_FONT_EXTENSIONS = ('.ttf', '.otf', '.ttc', '.woff', '.woff2')

# Adobe Photoshop API endpoint used for text-layer edits.
_TEXT_ENDPOINT = "https://image.adobe.io/pie/psdService/text"


def _preview(value, limit=40):
    """Return the first `limit` characters of `value` for logging (None -> '')."""
    return str(value if value is not None else '')[:limit]


def _collect_text_updates(json_data):
    """Build the `options.layers` entries for every layer whose text changed.

    A layer is included when it has a truthy 'updatedText' that differs from
    its 'text'. When the layer's 'styleInfo' carries a 'font', it is sent as
    the only character style so the font is requested explicitly.
    """
    text_updates = []
    for layer in json_data.get('textLayers') or []:
        if not isinstance(layer, dict):
            logger.warning(f"Skipping malformed text layer entry: {layer!r}")
            continue

        new_text = layer.get('updatedText')
        old_text = layer.get('text')
        if not new_text or old_text == new_text:
            continue

        text_obj = {"contents": new_text}

        # Include characterStyles with only fontPostScriptName to preserve the font
        style_info = layer.get('styleInfo', {})
        font_name = style_info.get('font') if isinstance(style_info, dict) else None
        if font_name:
            text_obj["characterStyles"] = [{"fontPostScriptName": font_name}]
            logger.info(f"Requesting font '{font_name}' for layer '{layer.get('name')}'")

        # _preview() tolerates a missing or non-string 'text'; slicing the raw
        # value would raise TypeError for None.
        logger.info(
            f"Updating layer '{layer.get('name')}': "
            f"'{_preview(old_text)}...' -> '{_preview(new_text)}...'"
        )
        text_updates.append({"name": layer.get('name', ''), "text": text_obj})

    return text_updates


def _upload_fonts(fonts_dir, timestamp):
    """Upload every font file in `fonts_dir` to GCS and return their URLs.

    Failures are logged and skipped: a missing font must not abort the job.
    """
    font_urls = []
    if not os.path.isdir(fonts_dir):
        return font_urls

    # sorted() keeps the upload order (and the payload) deterministic.
    for font_file in sorted(os.listdir(fonts_dir)):
        if not font_file.lower().endswith(_FONT_EXTENSIONS):
            continue
        font_path = os.path.join(fonts_dir, font_file)
        font_remote = f"adobe_ps/fonts/{timestamp}_{font_file}"
        try:
            font_upload = gcs_storage.upload_file(font_path, font_remote)
        except Exception as fe:
            logger.warning(f"Error uploading font '{font_file}': {fe}")
            continue

        font_url = font_upload.get("download_url")
        if font_upload.get("success") and font_url:
            font_urls.append(font_url)
            logger.info(f"Uploaded font '{font_file}' to GCS")
        else:
            logger.warning(f"Failed to upload font '{font_file}': {font_upload.get('message')}")

    return font_urls


def _job_status(status_data):
    """Extract the job status from a status-endpoint response body.

    Accepts a top-level 'status' field as well as per-output statuses under
    'outputs' (NOTE(review): the psdService status document is believed to
    report status per output -- confirm against the Adobe API reference).
    Returns '' when no status can be found.
    """
    if not isinstance(status_data, dict):
        return ''

    top_level = status_data.get('status')
    if top_level:
        return top_level

    statuses = [
        entry.get('status')
        for entry in status_data.get('outputs') or []
        if isinstance(entry, dict) and entry.get('status')
    ]
    if not statuses:
        return ''
    if 'failed' in statuses:
        return 'failed'
    unfinished = [s for s in statuses if s != 'succeeded']
    return unfinished[0] if unfinished else 'succeeded'


def _poll_job(status_url, access_token, max_retries=12, interval=5):
    """Poll `status_url` until the job succeeds, fails, or retries run out.

    Returns 'succeeded', 'failed', or None when no terminal status was seen
    within `max_retries` polls spaced `interval` seconds apart.
    """
    poll_headers = {
        "Authorization": f"Bearer {access_token}",
        "x-api-key": config.ADOBE_CLIENT_ID,
    }
    for _ in range(max_retries):
        time.sleep(interval)
        status_response = requests.get(status_url, headers=poll_headers, timeout=30)
        if status_response.status_code != 200:
            continue

        status_data = status_response.json()
        status = _job_status(status_data)
        logger.info(f"Status: {status}")

        if status == 'succeeded':
            logger.info("Processing succeeded!")
            return 'succeeded'
        if status == 'failed':
            logger.error(f"Processing failed: {status_data}")
            return 'failed'

    logger.warning(f"No terminal job status after {max_retries} polls; checking for output anyway")
    return None


def _download_output(psd_path, remote_path, output_path):
    """Fetch the processed PSD from GCS next to the input file.

    Returns the local path of the downloaded file, or None when the output
    never appeared or the download failed.
    """
    logger.info("Waiting for output file...")
    time.sleep(10)  # Wait a bit for file to be written

    output_check = gcs_storage.check_output_file(output_path, wait_time=60)
    if not output_check.get("success"):
        logger.warning(f"Output file not found: {output_check.get('message')}")
        return None

    # Keep the processed file in the same directory as the input PSD.
    output_file_path = os.path.join(
        os.path.dirname(psd_path), f"api_updated_{os.path.basename(psd_path)}"
    )

    logger.info(f"Downloading processed file to: {output_file_path}")
    # cleanup=True asks the storage helper to delete the remote output
    # after download.
    download_result = gcs_storage.download_file(output_path, output_file_path, cleanup=True)

    # Also clean up the input file.
    # NOTE(review): this passes the whole "adobe_ps" prefix; presumably that
    # removes every temporary object under it -- confirm in gcs_storage.
    try:
        gcs_storage.cleanup_files(os.path.dirname(remote_path))
    except Exception as clean_err:
        logger.warning(f"Error cleaning up temporary files: {clean_err}")

    if download_result.get("success"):
        logger.info(f"Successfully downloaded to: {output_file_path}")
        return output_file_path

    logger.warning(f"Download failed: {download_result.get('message')}")
    return None


def update_text_with_simplified_payload(json_path: str, psd_path: str) -> dict:
    """
    Updates text in a PSD file using the minimal required payload structure.

    Reads the layer description from `json_path`, uploads `psd_path` (and any
    fonts in the local fonts/ directory) to GCS, submits a text-edit job to
    the Adobe Photoshop API, waits for it, and downloads the result as
    ``api_updated_<name>`` next to the input PSD.

    Args:
        json_path: Path to a JSON file whose 'textLayers' entries carry
            'name', 'text', 'updatedText' and optionally 'styleInfo.font'.
        psd_path: Path to the PSD file to update.

    Returns:
        A dict with 'success' (bool) and 'message' (str). On a completed
        download it also has 'processed_file'; when the API accepted the job
        but no file was downloaded it has 'api_result'; on an HTTP error it
        has 'status_code'. Errors are reported through the dict, not raised.
    """
    logger.info(f"Updating text in {psd_path} using data from {json_path}")

    # Check if files exist
    json_exists = os.path.exists(json_path)
    psd_exists = os.path.exists(psd_path)
    if not json_exists or not psd_exists:
        logger.error(f"File not found: JSON={json_exists}, PSD={psd_exists}")
        return {"success": False, "message": "File not found"}

    # Load JSON data
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            json_data = json.load(f)
    except (OSError, ValueError) as e:
        logger.error(f"Error reading JSON: {e}")
        return {"success": False, "message": f"Error reading JSON: {e}"}

    if not isinstance(json_data, dict):
        logger.error("Error reading JSON: top-level value must be an object")
        return {"success": False, "message": "Error reading JSON: top-level value must be an object"}

    # Find text layers that need updating *before* touching GCS or the token
    # service, so a no-op run neither uploads nor leaves temporary files.
    text_updates = _collect_text_updates(json_data)
    if not text_updates:
        logger.info("No text changes needed")
        return {"success": True, "message": "No text changes needed"}

    if gcs_storage is None:
        logger.error("Error with GCS: storage is not initialized")
        return {"success": False, "message": "Error with GCS: storage is not initialized"}

    # Upload PSD file to GCS
    try:
        # Timestamp-based paths keep successive runs from colliding.
        timestamp = int(time.time())
        psd_name = os.path.basename(psd_path)
        remote_path = f"adobe_ps/{timestamp}_{psd_name}"
        output_path = f"adobe_ps/output_{timestamp}_{psd_name}"

        upload_result = gcs_storage.upload_file(psd_path, remote_path)
        if not upload_result.get("success"):
            logger.error(f"Upload failed: {upload_result.get('message')}")
            return {"success": False, "message": f"Upload failed: {upload_result.get('message')}"}

        # Get signed URLs: read access to the input, write access for the output.
        input_url = upload_result.get("download_url")
        output_url = gcs_storage.get_signed_url(
            output_path,
            action="write",
            content_type="image/vnd.adobe.photoshop"
        )
        if not input_url or not output_url:
            logger.error("Error with GCS: signed URL could not be generated")
            return {"success": False, "message": "Error with GCS: signed URL could not be generated"}

        logger.info("File uploaded to GCS and URLs generated")
    except Exception as e:
        logger.error(f"Error with GCS: {e}")
        return {"success": False, "message": f"Error with GCS: {e}"}

    # Upload any custom fonts found in the local fonts/ directory
    fonts_dir = os.path.join(os.path.dirname(__file__), "fonts")
    font_urls = _upload_fonts(fonts_dir, timestamp)

    # Get authentication token
    try:
        access_token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
    except Exception as e:
        logger.error(f"Error getting token: {e}")
        return {"success": False, "message": f"Error getting token: {e}"}

    # Create payload per Adobe API documentation
    payload = {
        "inputs": [
            {"storage": "external", "href": input_url}
        ],
        "options": {
            "layers": text_updates
        },
        "outputs": [
            {"storage": "external", "href": output_url, "type": "vnd.adobe.photoshop"}
        ]
    }

    # Add custom fonts if any were uploaded
    if font_urls:
        payload["options"]["fonts"] = [
            {"storage": "external", "href": url} for url in font_urls
        ]
        logger.info(f"Added {len(font_urls)} custom font(s) to payload")

    # Set up request headers
    headers = {
        "x-api-key": config.ADOBE_CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    # Log the payload.
    # NOTE(review): the payload contains signed URLs; consider logging it at
    # DEBUG level instead.
    logger.info(f"Request payload: {json.dumps(payload, indent=2)}")

    # Make the API request
    try:
        logger.info(f"Sending request to {_TEXT_ENDPOINT}")
        response = requests.post(
            _TEXT_ENDPOINT,
            headers=headers,
            json=payload,
            timeout=30
        )

        # Log response
        logger.info(f"Response status: {response.status_code}")
        if response.text:
            try:
                logger.info(f"Response: {json.dumps(response.json(), indent=2)}")
            except ValueError:
                # Body is not JSON; log it verbatim.
                logger.info(f"Response text: {response.text}")

        if response.status_code not in (200, 202):
            # API call failed
            try:
                error_data = response.json()
                error_message = error_data.get('message', error_data.get('title', response.text))
            except (ValueError, AttributeError):
                # Non-JSON body, or JSON that is not an object.
                error_message = response.text if response.text else f"HTTP {response.status_code}"

            return {
                "success": False,
                "message": f"Text update failed: {error_message}",
                "status_code": response.status_code
            }

        result = response.json()

        # For async processing (202), poll the status URL until done.
        if response.status_code == 202 and isinstance(result, dict) and '_links' in result:
            status_url = result.get('_links', {}).get('self', {}).get('href')
            logger.info(f"Request accepted. Status URL: {status_url}")

            if status_url and _poll_job(status_url, access_token) == 'failed':
                return {"success": False, "message": "API processing failed"}

        # Try to download output file
        try:
            output_file_path = _download_output(psd_path, remote_path, output_path)
            if output_file_path:
                return {
                    "success": True,
                    "message": "Text update completed and output file downloaded",
                    "processed_file": output_file_path
                }
        except Exception as dl_err:
            logger.error(f"Error downloading output: {dl_err}")

        # Even if download fails, return success for the API call itself
        return {
            "success": True,
            "message": "Text update request processed successfully",
            "api_result": result
        }

    except Exception as e:
        # Boundary handler: network errors and malformed responses are
        # reported to the caller rather than raised.
        logger.error(f"Request error: {e}")
        return {"success": False, "message": f"Request error: {e}"}
|
|
|
|
def main():
    """Command-line entry point: parse arguments, run the update, report.

    Prints the outcome message (and the processed file path when one is
    returned) and exits with status 1 when the update reports failure.
    """
    cli = argparse.ArgumentParser(
        description='Update text in a PSD file using simplified Adobe API payload'
    )
    cli.add_argument('json_file', help='Path to JSON file with text data')
    cli.add_argument('psd_file', help='Path to PSD file to update')
    cli.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Enable verbose logging'
    )
    options = cli.parse_args()

    # Raise this module's logger to DEBUG when verbose output is requested.
    if options.verbose:
        logger.setLevel(logging.DEBUG)

    outcome = update_text_with_simplified_payload(options.json_file, options.psd_file)

    # Failure path first: report and exit non-zero.
    if not outcome.get("success"):
        print(f"\nError: {outcome.get('message')}")
        sys.exit(1)

    print(f"\nSuccess: {outcome.get('message')}")
    if "processed_file" in outcome:
        print(f"Processed file: {outcome.get('processed_file')}")


if __name__ == "__main__":
    main()