- Fix API payload: "contents" not "content", "align" not "alignment", output type "vnd.adobe.photoshop" not "image/vnd.adobe.photoshop" - Remove broken font size /72 conversion (values already in points) - Add automatic font upload from fonts/ directory to GCS - Add FuturaPT-Demi.otf extracted from Adobe CoreSync - Update GCS bucket to lor-txt-tmp-bkt-26 (old billing expired) - Update HOW-IT-WORKS.md with working API docs, font setup guide, bug fixes, and verified test results Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
504 lines
No EOL
18 KiB
Python
504 lines
No EOL
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Debug Text Layer Update with Adobe Photoshop API
|
|
|
|
This script attempts to update a specific text layer in a PSD file
|
|
using different layer identification techniques to determine what
|
|
the Adobe Photoshop API requires for successful text updates.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
import requests
|
|
import logging
|
|
from pathlib import Path
|
|
|
|
# Import local modules
|
|
import config
|
|
from adobe_token import AdobeTokenManager
|
|
from gcs_storage import GCSStorage
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Initialize token manager
|
|
token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)
|
|
|
|
# GCS bucket configuration
|
|
GCS_BUCKET_NAME = "lor-txt-tmp-bkt-26" # The bucket to use for temporary storage
|
|
GCS_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json")
|
|
|
|
# Initialize GCS storage if the key exists
|
|
gcs_storage = None
|
|
if os.path.exists(GCS_KEY_PATH):
|
|
try:
|
|
gcs_storage = GCSStorage(GCS_BUCKET_NAME, key_path=GCS_KEY_PATH)
|
|
logger.info(f"GCS storage initialized with bucket: {GCS_BUCKET_NAME}")
|
|
except Exception as e:
|
|
logger.error(f"Error initializing GCS storage: {str(e)}")
|
|
logger.warning("Continuing without GCS storage - some features may not work")
|
|
else:
|
|
logger.warning(f"GCS key file not found at {GCS_KEY_PATH} - some features may not work")
|
|
|
|
def load_json_file(json_path: str) -> dict:
|
|
"""Load and parse a JSON file"""
|
|
try:
|
|
with open(json_path, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
logger.error(f"Error loading JSON file {json_path}: {str(e)}")
|
|
return {}
|
|
|
|
def upload_psd_to_gcs(psd_path: str) -> dict:
|
|
"""Upload a PSD file to GCS and return signed URLs"""
|
|
logger.info(f"Uploading PSD file to GCS: {psd_path}")
|
|
|
|
# Check if file exists
|
|
if not os.path.exists(psd_path):
|
|
logger.error(f"PSD file not found: {psd_path}")
|
|
return {"success": False, "message": f"PSD file not found: {psd_path}"}
|
|
|
|
# Check if GCS storage is initialized
|
|
if not gcs_storage:
|
|
logger.error("GCS storage not initialized - cannot upload file")
|
|
return {
|
|
"success": False,
|
|
"message": "GCS storage not initialized - check GCS key file and bucket configuration"
|
|
}
|
|
|
|
try:
|
|
# Generate a timestamp-based unique path to avoid collisions
|
|
file_name = os.path.basename(psd_path)
|
|
timestamp = int(time.time())
|
|
remote_path = f"adobe_ps/{timestamp}_{file_name}"
|
|
output_path = f"adobe_ps/output_{timestamp}_{file_name}"
|
|
|
|
# Upload the file to GCS
|
|
logger.info(f"Uploading file to GCS: {remote_path}")
|
|
upload_result = gcs_storage.upload_file(psd_path, remote_path)
|
|
|
|
if not upload_result.get("success"):
|
|
logger.error(f"Failed to upload file to GCS: {upload_result.get('message')}")
|
|
return upload_result
|
|
|
|
# Generate signed URLs for input and output
|
|
input_url = upload_result.get("download_url")
|
|
|
|
# Generate a specific output URL
|
|
try:
|
|
output_url = gcs_storage.get_signed_url(
|
|
output_path,
|
|
action="write",
|
|
content_type="image/vnd.adobe.photoshop"
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error generating output signed URL: {str(e)}")
|
|
return {
|
|
"success": False,
|
|
"message": f"Error generating output signed URL: {str(e)}"
|
|
}
|
|
|
|
logger.info(f"File uploaded successfully: {file_name}")
|
|
logger.info(f"Input URL (read): {input_url[:60]}...{input_url[-20:]}")
|
|
logger.info(f"Output URL (write): {output_url[:60]}...{output_url[-20:]}")
|
|
|
|
return {
|
|
"success": True,
|
|
"message": f"PSD file uploaded successfully: {file_name}",
|
|
"file_name": file_name,
|
|
"bucket": GCS_BUCKET_NAME,
|
|
"remote_path": remote_path,
|
|
"output_path": output_path,
|
|
"input_url": input_url,
|
|
"output_url": output_url
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error uploading PSD file: {str(e)}")
|
|
return {
|
|
"success": False,
|
|
"message": f"Error uploading PSD file: {str(e)}"
|
|
}
|
|
|
|
def update_text_with_api(input_url: str, output_url: str, layer_updates: list, api_key: str, access_token: str) -> dict:
|
|
"""
|
|
Send a text update request to the Adobe Photoshop API
|
|
|
|
Args:
|
|
input_url: Signed URL for the input PSD file
|
|
output_url: Signed URL for the output PSD file
|
|
layer_updates: List of layer updates to apply
|
|
api_key: Adobe API key
|
|
access_token: Adobe access token
|
|
|
|
Returns:
|
|
Dictionary with the API response details
|
|
"""
|
|
# Endpoint for text editing
|
|
endpoint = "https://image.adobe.io/pie/psdService/text"
|
|
|
|
# Set headers
|
|
headers = {
|
|
"x-api-key": api_key,
|
|
"Authorization": f"Bearer {access_token}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
# Create the payload
|
|
payload = {
|
|
"inputs": [
|
|
{
|
|
"href": input_url,
|
|
"storage": "external"
|
|
}
|
|
],
|
|
"options": {
|
|
"layers": layer_updates
|
|
},
|
|
"outputs": [
|
|
{
|
|
"href": output_url,
|
|
"storage": "external",
|
|
"type": "image/vnd.adobe.photoshop"
|
|
}
|
|
]
|
|
}
|
|
|
|
# Log the request details
|
|
logger.info(f"Sending text update request to: {endpoint}")
|
|
logger.info(f"Request payload: {json.dumps(payload, indent=2)}")
|
|
|
|
try:
|
|
# Make the API request
|
|
response = requests.post(
|
|
endpoint,
|
|
headers=headers,
|
|
json=payload,
|
|
timeout=30
|
|
)
|
|
|
|
# Log response
|
|
logger.info(f"Response status: {response.status_code}")
|
|
|
|
if response.text:
|
|
try:
|
|
resp_json = response.json()
|
|
logger.info(f"Response content: {json.dumps(resp_json, indent=2)}")
|
|
|
|
# Check for status URL for async operations
|
|
if response.status_code == 202 and '_links' in resp_json and 'self' in resp_json.get('_links', {}):
|
|
status_url = resp_json.get('_links', {}).get('self', {}).get('href')
|
|
logger.info(f"Status URL: {status_url}")
|
|
|
|
# Poll the status URL
|
|
status_result = poll_status_url(status_url, access_token, api_key)
|
|
|
|
# Return combined result
|
|
return {
|
|
"success": response.status_code == 202 or response.status_code == 200,
|
|
"status_code": response.status_code,
|
|
"message": f"API request {'accepted' if response.status_code == 202 else 'successful'}",
|
|
"response": resp_json,
|
|
"status_polling": status_result
|
|
}
|
|
except:
|
|
logger.info(f"Response content: {response.text}")
|
|
|
|
return {
|
|
"success": response.status_code == 202 or response.status_code == 200,
|
|
"status_code": response.status_code,
|
|
"message": f"API request {'accepted' if response.status_code == 202 else 'successful'}",
|
|
"response": response.text
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error making API request: {str(e)}")
|
|
return {
|
|
"success": False,
|
|
"status_code": 0,
|
|
"message": f"Error making API request: {str(e)}"
|
|
}
|
|
|
|
def poll_status_url(status_url: str, access_token: str, api_key: str, max_retries: int = 10, retry_interval: int = 5) -> dict:
|
|
"""
|
|
Poll the status URL to check processing status
|
|
|
|
Args:
|
|
status_url: URL to check status
|
|
access_token: Adobe access token
|
|
api_key: Adobe API key
|
|
max_retries: Maximum number of retries
|
|
retry_interval: Seconds between retries
|
|
|
|
Returns:
|
|
Dictionary with status details
|
|
"""
|
|
headers = {
|
|
"x-api-key": api_key,
|
|
"Authorization": f"Bearer {access_token}"
|
|
}
|
|
|
|
retry_count = 0
|
|
while retry_count < max_retries:
|
|
try:
|
|
logger.info(f"Checking status URL: Attempt {retry_count + 1}")
|
|
|
|
# Wait before checking
|
|
time.sleep(retry_interval)
|
|
|
|
# Make request
|
|
response = requests.get(status_url, headers=headers, timeout=20)
|
|
|
|
if response.status_code == 200:
|
|
try:
|
|
status_data = response.json()
|
|
status = status_data.get('status', '')
|
|
|
|
logger.info(f"Status: {status}")
|
|
logger.info(f"Status data: {json.dumps(status_data, indent=2)}")
|
|
|
|
if status == 'succeeded':
|
|
return {
|
|
"success": True,
|
|
"status": status,
|
|
"message": "Processing completed successfully",
|
|
"data": status_data
|
|
}
|
|
elif status == 'failed':
|
|
error_message = status_data.get('error', {}).get('message', 'Unknown error')
|
|
return {
|
|
"success": False,
|
|
"status": status,
|
|
"message": f"Processing failed: {error_message}",
|
|
"data": status_data
|
|
}
|
|
elif status == 'processing' or status == 'pending':
|
|
logger.info(f"Still processing... waiting for completion")
|
|
except Exception as e:
|
|
logger.error(f"Error parsing status response: {str(e)}")
|
|
else:
|
|
logger.warning(f"Status check returned code: {response.status_code}")
|
|
|
|
retry_count += 1
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error checking status: {str(e)}")
|
|
retry_count += 1
|
|
|
|
return {
|
|
"success": False,
|
|
"status": "unknown",
|
|
"message": f"Failed to get status after {max_retries} attempts"
|
|
}
|
|
|
|
def download_processed_file(output_path: str, local_directory: str) -> dict:
|
|
"""
|
|
Download a processed file from GCS
|
|
|
|
Args:
|
|
output_path: Remote path in GCS
|
|
local_directory: Local directory to save the file
|
|
|
|
Returns:
|
|
Dictionary with download result
|
|
"""
|
|
if not gcs_storage:
|
|
return {
|
|
"success": False,
|
|
"message": "GCS storage not initialized"
|
|
}
|
|
|
|
try:
|
|
# Check if the file exists and wait for it
|
|
logger.info(f"Checking for processed file: {output_path}")
|
|
output_check = gcs_storage.check_output_file(output_path, wait_time=60)
|
|
|
|
if not output_check.get("success"):
|
|
return output_check
|
|
|
|
# Create processed directory if needed
|
|
processed_dir = os.path.join(local_directory, "processed")
|
|
os.makedirs(processed_dir, exist_ok=True)
|
|
|
|
# Set output filename
|
|
output_filename = os.path.basename(output_path)
|
|
if output_filename.startswith("output_"):
|
|
output_filename = output_filename[7:] # Remove "output_" prefix
|
|
|
|
local_path = os.path.join(processed_dir, output_filename)
|
|
|
|
# Download the file
|
|
logger.info(f"Downloading file to: {local_path}")
|
|
download_result = gcs_storage.download_file(output_path, local_path)
|
|
|
|
return download_result
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error downloading processed file: {str(e)}")
|
|
return {
|
|
"success": False,
|
|
"message": f"Error downloading processed file: {str(e)}"
|
|
}
|
|
|
|
def main():
|
|
"""Main function to test various layer identification methods"""
|
|
# Target files
|
|
psd_path = "/Users/daveporter/Desktop/CODING-2024/Adobe-API-PS-scripts/TESTFILES/BATCH/Vichy-Product-Skincare-Liftactiv -Collagen 16 Bonding Serum -30ml-3337875912600-Safety.psd"
|
|
json_path = "/Users/daveporter/Desktop/CODING-2024/Adobe-API-PS-scripts/TESTFILES/BATCH/Vichy-Product-Skincare-Liftactiv -Collagen 16 Bonding Serum -30ml-3337875912600-Safety-textonly-updated.json"
|
|
|
|
# Load the JSON data
|
|
json_data = load_json_file(json_path)
|
|
|
|
if not json_data or "textLayers" not in json_data:
|
|
logger.error("Invalid JSON data or no text layers found")
|
|
return
|
|
|
|
text_layers = json_data.get("textLayers", [])
|
|
|
|
# Get access token
|
|
try:
|
|
access_token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
|
|
logger.info(f"Got access token: {access_token[:15]}...{access_token[-15:]}")
|
|
except Exception as e:
|
|
logger.error(f"Error getting access token: {str(e)}")
|
|
return
|
|
|
|
# Upload PSD to GCS
|
|
upload_result = upload_psd_to_gcs(psd_path)
|
|
|
|
if not upload_result.get("success"):
|
|
logger.error(f"Failed to upload PSD file: {upload_result.get('message')}")
|
|
return
|
|
|
|
input_url = upload_result.get("input_url")
|
|
output_url = upload_result.get("output_url")
|
|
output_path = upload_result.get("output_path")
|
|
|
|
# Make an initial check of the layer data
|
|
logger.info(f"Found {len(text_layers)} text layers in JSON data:")
|
|
|
|
for idx, layer in enumerate(text_layers):
|
|
layer_id = layer.get("id")
|
|
layer_name = layer.get("name", "")
|
|
original_text = layer.get("text", "")
|
|
updated_text = layer.get("updatedText", "")
|
|
|
|
logger.info(f"Layer {idx+1}: ID={layer_id}, Name=\"{layer_name}\"")
|
|
logger.info(f" Original: \"{original_text}\"")
|
|
logger.info(f" Updated: \"{updated_text}\"")
|
|
|
|
# Test different layer identification methods
|
|
test_methods = [
|
|
{
|
|
"name": "Standard Method (ID as Integer)",
|
|
"description": "Using the layer ID as an integer",
|
|
"layer_updates": [
|
|
{
|
|
"id": 1, # ID from JSON file for first layer (converted to integer)
|
|
"text": {
|
|
"content": text_layers[1].get("updatedText", "") # Use updated text from second layer (HYPOALLERGENIC)
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
"name": "ID from JSON Method",
|
|
"description": "Using the exact ID from the JSON file",
|
|
"layer_updates": [
|
|
{
|
|
"id": 2, # ID from JSON file for second layer (converted to integer)
|
|
"text": {
|
|
"content": text_layers[0].get("updatedText", "") # Use updated text from first layer (DESIGNED FOR)
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
"name": "Using Name Method",
|
|
"description": "Identifying layers by name instead of ID",
|
|
"layer_updates": [
|
|
{
|
|
"name": "HYPOALLERGENIC FORMULA", # Exact layer name from JSON
|
|
"text": {
|
|
"content": text_layers[1].get("updatedText", "") # Use updated text from this layer
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
"name": "Path Method",
|
|
"description": "Using the layer path from JSON",
|
|
"layer_updates": [
|
|
{
|
|
"path": "13. Safety awards/seals/Groupe 17/DESIGNED FOR SENSITIVE SKIN", # Layer path from JSON
|
|
"text": {
|
|
"content": text_layers[0].get("updatedText", "") # Use updated text from this layer
|
|
}
|
|
}
|
|
]
|
|
},
|
|
{
|
|
"name": "Multiple Updates Method",
|
|
"description": "Updating multiple layers at once",
|
|
"layer_updates": [
|
|
{
|
|
"id": 1, # First layer
|
|
"text": {
|
|
"content": text_layers[1].get("updatedText", "") # HYPOALLERGENIC layer text
|
|
}
|
|
},
|
|
{
|
|
"id": 2, # Second layer
|
|
"text": {
|
|
"content": text_layers[0].get("updatedText", "") # DESIGNED FOR layer text
|
|
}
|
|
}
|
|
]
|
|
}
|
|
]
|
|
|
|
# Run tests
|
|
for test in test_methods:
|
|
logger.info("\n" + "="*50)
|
|
logger.info(f"TESTING: {test['name']}")
|
|
logger.info(f"Description: {test['description']}")
|
|
logger.info("="*50)
|
|
|
|
# Send API request
|
|
result = update_text_with_api(
|
|
input_url,
|
|
output_url,
|
|
test["layer_updates"],
|
|
config.ADOBE_CLIENT_ID,
|
|
access_token
|
|
)
|
|
|
|
# Check result
|
|
if result.get("success"):
|
|
logger.info(f"API request successful with status {result.get('status_code')}")
|
|
|
|
# If operation was accepted (202), download the processed file
|
|
if result.get("status_code") == 202:
|
|
logger.info("Waiting for processing to complete...")
|
|
time.sleep(10) # Give Adobe API time to process
|
|
|
|
# Download the processed file
|
|
output_dir = os.path.dirname(psd_path)
|
|
download_result = download_processed_file(output_path, output_dir)
|
|
|
|
if download_result.get("success"):
|
|
logger.info(f"Downloaded processed file to: {download_result.get('file_path')}")
|
|
else:
|
|
logger.error(f"Failed to download processed file: {download_result.get('message')}")
|
|
else:
|
|
logger.error(f"API request failed with status {result.get('status_code')}: {result.get('message')}")
|
|
|
|
logger.info("\nAll tests completed")
|
|
|
|
if __name__ == "__main__":
|
|
main() |