Local and cloud-based workflows for extracting and updating text layers in PSD files via ExtendScript and Adobe PS API. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
351 lines
No EOL
13 KiB
Python
Executable file
351 lines
No EOL
13 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Simplified Adobe Photoshop API Text Update
|
|
------------------------------------------
|
|
|
|
A stripped-down script that uses the minimal required payload structure
to update text layers via the Adobe Photoshop API.
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import logging
|
|
import argparse
|
|
import requests
|
|
from pathlib import Path
|
|
|
|
# Logging setup: INFO-and-above records, timestamped as
# "YYYY-MM-DD HH:MM:SS - LEVEL - message".
_LOG_OPTIONS = {
    "level": logging.INFO,
    "format": '%(asctime)s - %(levelname)s - %(message)s',
    "datefmt": '%Y-%m-%d %H:%M:%S',
}
logging.basicConfig(**_LOG_OPTIONS)

# Module-level logger used throughout this script.
logger = logging.getLogger(__name__)
|
|
|
|
# Import local config and dependencies
|
|
import config
|
|
from adobe_token import AdobeTokenManager
|
|
from gcs_storage import GCSStorage
|
|
|
|
# Initialize token manager from the Adobe client credentials in config.
token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)

# GCS bucket configuration
GCS_BUCKET_NAME = "lor-txt-tmp-bkt"
# The key file is looked up next to this script.
GCS_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json")

# Initialize GCS storage.
# ``gcs_storage`` stays None when the key file is missing or the client cannot
# be constructed, so any code that uses it has to cope with None.
gcs_storage = None
if os.path.exists(GCS_KEY_PATH):
    try:
        gcs_storage = GCSStorage(GCS_BUCKET_NAME, key_path=GCS_KEY_PATH)
        logger.info(f"GCS storage initialized with bucket: {GCS_BUCKET_NAME}")
    except Exception as e:
        # Best effort: a broken GCS setup must not prevent the module from loading.
        logger.error(f"Error initializing GCS storage: {str(e)}")
else:
    # Previously this case was silent and only surfaced later as an
    # AttributeError on None; say so up front.
    logger.warning(f"GCS key file not found at {GCS_KEY_PATH}; GCS storage is disabled")
|
|
|
|
def _build_text_updates(json_data):
    """Build the ``options.layers`` entries for every layer whose text changed.

    A layer is included when it has a truthy ``updatedText`` that differs from
    its ``text``. When the layer's ``styleInfo`` carries both ``font`` and
    ``size``, character and paragraph styles are attached as well.

    Args:
        json_data: Parsed JSON object; only its ``textLayers`` list is read.

    Returns:
        A list of ``{"name": ..., "text": {...}}`` dicts (possibly empty).
    """
    text_updates = []
    for layer in json_data.get('textLayers') or []:
        if not isinstance(layer, dict):
            continue

        updated_text = layer.get('updatedText')
        if not updated_text or layer.get('text') == updated_text:
            continue

        style_info = layer.get('styleInfo')
        if not isinstance(style_info, dict):
            style_info = {}
        font_name = style_info.get('font')
        font_size = style_info.get('size')

        text_obj = {"content": updated_text}

        if font_name and font_size:
            # NOTE(review): the division by 72 is kept exactly as in the
            # original code. A pixel-to-point conversion normally depends on
            # the document resolution -- confirm the unit of styleInfo.size.
            try:
                font_size_pts = float(font_size) / 72.0
            except (TypeError, ValueError):
                logger.warning(
                    f"Ignoring non-numeric font size {font_size!r} for layer '{layer.get('name')}'"
                )
                font_size_pts = None

            if font_size_pts is not None:
                text_obj["characterStyles"] = [
                    {
                        "fontPostScriptName": font_name,
                        "size": font_size_pts
                    }
                ]
                text_obj["paragraphStyles"] = [
                    {
                        "alignment": style_info.get('alignment', 'left')
                    }
                ]
                logger.info(f"Added font '{font_name}' size {font_size_pts}pts (converted from {font_size}px) for layer '{layer.get('name')}'")

        text_updates.append({"name": layer.get('name', ''), "text": text_obj})

    return text_updates


def _find_global_font(json_data):
    """Return the first non-empty ``styleInfo.font`` among all text layers, or None."""
    for layer in json_data.get('textLayers') or []:
        if not isinstance(layer, dict):
            continue
        style_info = layer.get('styleInfo')
        if isinstance(style_info, dict) and style_info.get('font'):
            return style_info.get('font')
    return None


def _job_status(status_data):
    """Extract a job status string from a status-endpoint response body.

    A top-level ``status`` wins when present. Otherwise the per-output
    ``outputs[].status`` values are folded into one: ``'failed'`` if any output
    failed, ``'succeeded'`` if all succeeded, else the first non-empty status.
    Returns ``''`` when no status can be found.
    """
    if not isinstance(status_data, dict):
        return ''

    status = status_data.get('status')
    if status:
        return status

    # NOTE(review): assumes the service reports per-output status under
    # "outputs" -- confirm against the Photoshop API job status schema.
    outputs = status_data.get('outputs')
    if not isinstance(outputs, list):
        return ''
    statuses = [item.get('status', '') for item in outputs if isinstance(item, dict)]
    if 'failed' in statuses:
        return 'failed'
    if statuses and all(item == 'succeeded' for item in statuses):
        return 'succeeded'
    return next((item for item in statuses if item), '')


def _poll_job_status(status_url, access_token, max_retries=12, interval=5):
    """Poll ``status_url`` until the job succeeds, fails, or retries run out.

    Returns:
        ``'succeeded'`` or ``'failed'`` when the job reached that state, or
        ``''`` when ``max_retries`` polls elapsed without a final state.
    """
    poll_headers = {
        "Authorization": f"Bearer {access_token}",
        "x-api-key": config.ADOBE_CLIENT_ID
    }

    for _ in range(max_retries):
        time.sleep(interval)

        status_response = requests.get(status_url, headers=poll_headers, timeout=30)
        if status_response.status_code != 200:
            logger.warning(f"Status check returned HTTP {status_response.status_code}")
            continue

        status_data = status_response.json()
        status = _job_status(status_data)
        logger.info(f"Status: {status}")

        if status == 'succeeded':
            logger.info("Processing succeeded!")
            return status
        if status == 'failed':
            logger.error(f"Processing failed: {status_data}")
            return status

    logger.warning(f"Job did not reach a final state after {max_retries} status checks")
    return ''


def _download_output(psd_path, remote_path, output_path):
    """Fetch the processed PSD from GCS next to the source PSD.

    Returns:
        The success result dict (with ``processed_file``) when the file was
        downloaded, otherwise None.
    """
    logger.info("Waiting for output file...")
    time.sleep(10)  # Wait a bit for file to be written

    output_check = gcs_storage.check_output_file(output_path, wait_time=60)
    if not output_check.get("success"):
        logger.warning(f"Output file not found: {output_check.get('message')}")
        return None

    # Keep the processed file in the same directory as the input PSD.
    output_file_path = os.path.join(
        os.path.dirname(psd_path),
        f"api_updated_{os.path.basename(psd_path)}"
    )

    logger.info(f"Downloading processed file to: {output_file_path}")
    # cleanup=True is passed through to the storage helper, as in the original call.
    download_result = gcs_storage.download_file(output_path, output_file_path, cleanup=True)

    # Also clean up the input file.
    # NOTE(review): this passes the whole "adobe_ps" prefix, not just this
    # run's object -- confirm cleanup_files cannot remove other runs' files.
    try:
        gcs_storage.cleanup_files(os.path.dirname(remote_path))
    except Exception as clean_err:
        logger.warning(f"Error cleaning up temporary files: {clean_err}")

    if not download_result.get("success"):
        logger.warning(f"Download failed: {download_result.get('message')}")
        return None

    logger.info(f"Successfully downloaded to: {output_file_path}")
    return {
        "success": True,
        "message": "Text update completed and output file downloaded",
        "processed_file": output_file_path
    }


def update_text_with_simplified_payload(json_path: str, psd_path: str) -> dict:
    """
    Updates text in a PSD file using the minimal required payload structure.

    The JSON file is read first and the list of changed text layers is built
    before anything is uploaded, so a run with nothing to change touches
    neither GCS nor the Adobe API. Otherwise the PSD is uploaded to GCS, the
    text endpoint is called, an accepted (202) job is polled, and the result
    is downloaded next to the input PSD as ``api_updated_<name>``.

    Args:
        json_path: Path to the JSON file with a ``textLayers`` list.
        psd_path: Path to the PSD file to update.

    Returns:
        A dict with ``success`` (bool) and ``message`` (str). It may also hold
        ``processed_file`` (downloaded output path), ``api_result`` (API
        response body when the download did not happen) or ``status_code``
        (HTTP status of a rejected request). Failures are reported through
        the dict rather than raised.
    """
    logger.info(f"Updating text in {psd_path} using data from {json_path}")

    # Check if files exist
    json_exists = os.path.exists(json_path)
    psd_exists = os.path.exists(psd_path)
    if not (json_exists and psd_exists):
        logger.error(f"File not found: JSON={json_exists}, PSD={psd_exists}")
        return {"success": False, "message": "File not found"}

    # Load JSON data (JSON and Unicode decode errors are both ValueError)
    try:
        with open(json_path, 'r', encoding='utf-8') as f:
            json_data = json.load(f)
    except (OSError, ValueError) as e:
        logger.error(f"Error reading JSON: {e}")
        return {"success": False, "message": f"Error reading JSON: {e}"}

    if not isinstance(json_data, dict):
        message = "Error reading JSON: top-level value is not an object"
        logger.error(message)
        return {"success": False, "message": message}

    # Find text layers that need updating *before* any remote work, so the
    # no-op case does not leave an uploaded PSD behind in the bucket.
    text_updates = _build_text_updates(json_data)
    if not text_updates:
        logger.info("No text changes needed")
        return {"success": True, "message": "No text changes needed"}

    if gcs_storage is None:
        message = "Error with GCS: storage is not initialized"
        logger.error(message)
        return {"success": False, "message": message}

    # Upload PSD file to GCS
    try:
        # Timestamp-based object names keep runs apart
        timestamp = int(time.time())
        psd_name = os.path.basename(psd_path)
        remote_path = f"adobe_ps/{timestamp}_{psd_name}"
        output_path = f"adobe_ps/output_{timestamp}_{psd_name}"

        upload_result = gcs_storage.upload_file(psd_path, remote_path)
        if not upload_result.get("success"):
            logger.error(f"Upload failed: {upload_result.get('message')}")
            return {"success": False, "message": f"Upload failed: {upload_result.get('message')}"}

        # Get signed URLs
        input_url = upload_result.get("download_url")
        output_url = gcs_storage.get_signed_url(
            output_path,
            action="write",
            content_type="image/vnd.adobe.photoshop"
        )
        logger.info("File uploaded to GCS and URLs generated")
    except Exception as e:
        # Boundary to the project storage helper: report, never raise.
        logger.error(f"Error with GCS: {e}")
        return {"success": False, "message": f"Error with GCS: {e}"}

    if not input_url or not output_url:
        message = "Error with GCS: signed URL could not be generated"
        logger.error(message)
        return {"success": False, "message": message}

    # Get authentication token
    try:
        access_token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
    except Exception as e:
        logger.error(f"Error getting token: {e}")
        return {"success": False, "message": f"Error getting token: {e}"}

    # Create simplified payload
    payload = {
        "inputs": [
            {
                "storage": "external",
                "href": input_url
            }
        ],
        "options": {
            "layers": text_updates
        },
        "outputs": [
            {
                "storage": "external",
                "href": output_url,
                "type": "image/vnd.adobe.photoshop"
            }
        ]
    }

    # Add font management options, using the first font found in any layer
    global_font = _find_global_font(json_data)
    if global_font:
        payload["options"]["globalFont"] = global_font
        payload["options"]["manageMissingFonts"] = "useDefault"
        logger.info(f"Using global font: {global_font}")

    # Set up request headers
    headers = {
        "x-api-key": config.ADOBE_CLIENT_ID,
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json"
    }

    # Log the payload
    # NOTE(review): the payload contains the signed GCS URLs; consider
    # logging it at DEBUG instead of INFO.
    logger.info(f"Request payload: {json.dumps(payload, indent=2)}")

    # Make the API request
    try:
        endpoint = "https://image.adobe.io/pie/psdService/text"
        logger.info(f"Sending request to {endpoint}")

        response = requests.post(
            endpoint,
            headers=headers,
            json=payload,
            timeout=30
        )

        # Log response; the body is parsed once and reused below
        logger.info(f"Response status: {response.status_code}")
        result = None
        if response.text:
            try:
                result = response.json()
                logger.info(f"Response: {json.dumps(result, indent=2)}")
            except ValueError:
                logger.info(f"Response text: {response.text}")

        if response.status_code not in (200, 202):
            # API call failed
            if isinstance(result, dict):
                error_message = result.get('message', result.get('title', response.text))
            else:
                error_message = response.text if response.text else f"HTTP {response.status_code}"

            return {
                "success": False,
                "message": f"Text update failed: {error_message}",
                "status_code": response.status_code
            }

        if not isinstance(result, dict):
            message = "Request error: response body is not a JSON object"
            logger.error(message)
            return {"success": False, "message": message}

        # For async processing (202), follow the status URL
        if response.status_code == 202 and '_links' in result:
            status_url = result.get('_links', {}).get('self', {}).get('href')
            logger.info(f"Request accepted. Status URL: {status_url}")

            if status_url and _poll_job_status(status_url, access_token) == 'failed':
                return {"success": False, "message": "API processing failed"}

        # Try to download output file; a download problem is logged but does
        # not turn the accepted API call into a failure.
        try:
            downloaded = _download_output(psd_path, remote_path, output_path)
        except Exception as dl_err:
            logger.error(f"Error downloading output: {dl_err}")
            downloaded = None

        if downloaded:
            return downloaded

        # Even if download fails, return success for the API call itself
        return {
            "success": True,
            "message": "Text update request processed successfully",
            "api_result": result
        }

    except Exception as e:
        # Top-level boundary: network and parsing errors become a result dict.
        logger.error(f"Request error: {e}")
        return {"success": False, "message": f"Request error: {e}"}
|
|
|
|
def main():
    """Command-line entry point.

    Reads the JSON and PSD paths (plus an optional ``--verbose``/``-v`` flag)
    from the command line, runs the text update, prints the outcome, and
    exits with status 1 when the update reports failure.
    """
    cli = argparse.ArgumentParser(
        description='Update text in a PSD file using simplified Adobe API payload'
    )
    cli.add_argument('json_file', help='Path to JSON file with text data')
    cli.add_argument('psd_file', help='Path to PSD file to update')
    cli.add_argument(
        '--verbose', '-v',
        action='store_true',
        help='Enable verbose logging',
    )
    options = cli.parse_args()

    # --verbose lowers this module's logger threshold to DEBUG.
    if options.verbose:
        logger.setLevel(logging.DEBUG)

    outcome = update_text_with_simplified_payload(options.json_file, options.psd_file)

    # Failure path first: report and stop with a non-zero exit code.
    if not outcome.get("success"):
        print(f"\nError: {outcome.get('message')}")
        sys.exit(1)

    print(f"\nSuccess: {outcome.get('message')}")
    if "processed_file" in outcome:
        print(f"Processed file: {outcome.get('processed_file')}")


# Run the CLI only when this file is executed as a script.
if __name__ == "__main__":
    main()