adobe-ps-scripts-loreal/ARCHIVE/debug_text_layer.py
DJP d7dd117dab Fix Adobe API text updates: correct field names, font handling, and GCS bucket
- Fix API payload: "contents" not "content", "align" not "alignment",
  output type "vnd.adobe.photoshop" not "image/vnd.adobe.photoshop"
- Remove broken font size /72 conversion (values already in points)
- Add automatic font upload from fonts/ directory to GCS
- Add FuturaPT-Demi.otf extracted from Adobe CoreSync
- Update GCS bucket to lor-txt-tmp-bkt-26 (old billing expired)
- Update HOW-IT-WORKS.md with working API docs, font setup guide,
  bug fixes, and verified test results

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 17:03:07 -05:00

504 lines
No EOL
18 KiB
Python

#!/usr/bin/env python3
"""
Debug Text Layer Update with Adobe Photoshop API
This script attempts to update a specific text layer in a PSD file
using different layer identification techniques to determine what
the Adobe Photoshop API requires for successful text updates.
"""
import os
import json
import time
import requests
import logging
from pathlib import Path
# Import local modules
import config
from adobe_token import AdobeTokenManager
from gcs_storage import GCSStorage
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# Initialize token manager
token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)
# GCS bucket configuration
GCS_BUCKET_NAME = "lor-txt-tmp-bkt-26" # The bucket to use for temporary storage
GCS_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json")
# Initialize GCS storage if the key exists
gcs_storage = None
if os.path.exists(GCS_KEY_PATH):
try:
gcs_storage = GCSStorage(GCS_BUCKET_NAME, key_path=GCS_KEY_PATH)
logger.info(f"GCS storage initialized with bucket: {GCS_BUCKET_NAME}")
except Exception as e:
logger.error(f"Error initializing GCS storage: {str(e)}")
logger.warning("Continuing without GCS storage - some features may not work")
else:
logger.warning(f"GCS key file not found at {GCS_KEY_PATH} - some features may not work")
def load_json_file(json_path: str) -> dict:
"""Load and parse a JSON file"""
try:
with open(json_path, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
logger.error(f"Error loading JSON file {json_path}: {str(e)}")
return {}
def upload_psd_to_gcs(psd_path: str) -> dict:
"""Upload a PSD file to GCS and return signed URLs"""
logger.info(f"Uploading PSD file to GCS: {psd_path}")
# Check if file exists
if not os.path.exists(psd_path):
logger.error(f"PSD file not found: {psd_path}")
return {"success": False, "message": f"PSD file not found: {psd_path}"}
# Check if GCS storage is initialized
if not gcs_storage:
logger.error("GCS storage not initialized - cannot upload file")
return {
"success": False,
"message": "GCS storage not initialized - check GCS key file and bucket configuration"
}
try:
# Generate a timestamp-based unique path to avoid collisions
file_name = os.path.basename(psd_path)
timestamp = int(time.time())
remote_path = f"adobe_ps/{timestamp}_{file_name}"
output_path = f"adobe_ps/output_{timestamp}_{file_name}"
# Upload the file to GCS
logger.info(f"Uploading file to GCS: {remote_path}")
upload_result = gcs_storage.upload_file(psd_path, remote_path)
if not upload_result.get("success"):
logger.error(f"Failed to upload file to GCS: {upload_result.get('message')}")
return upload_result
# Generate signed URLs for input and output
input_url = upload_result.get("download_url")
# Generate a specific output URL
try:
output_url = gcs_storage.get_signed_url(
output_path,
action="write",
content_type="image/vnd.adobe.photoshop"
)
except Exception as e:
logger.error(f"Error generating output signed URL: {str(e)}")
return {
"success": False,
"message": f"Error generating output signed URL: {str(e)}"
}
logger.info(f"File uploaded successfully: {file_name}")
logger.info(f"Input URL (read): {input_url[:60]}...{input_url[-20:]}")
logger.info(f"Output URL (write): {output_url[:60]}...{output_url[-20:]}")
return {
"success": True,
"message": f"PSD file uploaded successfully: {file_name}",
"file_name": file_name,
"bucket": GCS_BUCKET_NAME,
"remote_path": remote_path,
"output_path": output_path,
"input_url": input_url,
"output_url": output_url
}
except Exception as e:
logger.error(f"Error uploading PSD file: {str(e)}")
return {
"success": False,
"message": f"Error uploading PSD file: {str(e)}"
}
def update_text_with_api(input_url: str, output_url: str, layer_updates: list, api_key: str, access_token: str) -> dict:
"""
Send a text update request to the Adobe Photoshop API
Args:
input_url: Signed URL for the input PSD file
output_url: Signed URL for the output PSD file
layer_updates: List of layer updates to apply
api_key: Adobe API key
access_token: Adobe access token
Returns:
Dictionary with the API response details
"""
# Endpoint for text editing
endpoint = "https://image.adobe.io/pie/psdService/text"
# Set headers
headers = {
"x-api-key": api_key,
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
# Create the payload
payload = {
"inputs": [
{
"href": input_url,
"storage": "external"
}
],
"options": {
"layers": layer_updates
},
"outputs": [
{
"href": output_url,
"storage": "external",
"type": "image/vnd.adobe.photoshop"
}
]
}
# Log the request details
logger.info(f"Sending text update request to: {endpoint}")
logger.info(f"Request payload: {json.dumps(payload, indent=2)}")
try:
# Make the API request
response = requests.post(
endpoint,
headers=headers,
json=payload,
timeout=30
)
# Log response
logger.info(f"Response status: {response.status_code}")
if response.text:
try:
resp_json = response.json()
logger.info(f"Response content: {json.dumps(resp_json, indent=2)}")
# Check for status URL for async operations
if response.status_code == 202 and '_links' in resp_json and 'self' in resp_json.get('_links', {}):
status_url = resp_json.get('_links', {}).get('self', {}).get('href')
logger.info(f"Status URL: {status_url}")
# Poll the status URL
status_result = poll_status_url(status_url, access_token, api_key)
# Return combined result
return {
"success": response.status_code == 202 or response.status_code == 200,
"status_code": response.status_code,
"message": f"API request {'accepted' if response.status_code == 202 else 'successful'}",
"response": resp_json,
"status_polling": status_result
}
except:
logger.info(f"Response content: {response.text}")
return {
"success": response.status_code == 202 or response.status_code == 200,
"status_code": response.status_code,
"message": f"API request {'accepted' if response.status_code == 202 else 'successful'}",
"response": response.text
}
except Exception as e:
logger.error(f"Error making API request: {str(e)}")
return {
"success": False,
"status_code": 0,
"message": f"Error making API request: {str(e)}"
}
def poll_status_url(status_url: str, access_token: str, api_key: str, max_retries: int = 10, retry_interval: int = 5) -> dict:
"""
Poll the status URL to check processing status
Args:
status_url: URL to check status
access_token: Adobe access token
api_key: Adobe API key
max_retries: Maximum number of retries
retry_interval: Seconds between retries
Returns:
Dictionary with status details
"""
headers = {
"x-api-key": api_key,
"Authorization": f"Bearer {access_token}"
}
retry_count = 0
while retry_count < max_retries:
try:
logger.info(f"Checking status URL: Attempt {retry_count + 1}")
# Wait before checking
time.sleep(retry_interval)
# Make request
response = requests.get(status_url, headers=headers, timeout=20)
if response.status_code == 200:
try:
status_data = response.json()
status = status_data.get('status', '')
logger.info(f"Status: {status}")
logger.info(f"Status data: {json.dumps(status_data, indent=2)}")
if status == 'succeeded':
return {
"success": True,
"status": status,
"message": "Processing completed successfully",
"data": status_data
}
elif status == 'failed':
error_message = status_data.get('error', {}).get('message', 'Unknown error')
return {
"success": False,
"status": status,
"message": f"Processing failed: {error_message}",
"data": status_data
}
elif status == 'processing' or status == 'pending':
logger.info(f"Still processing... waiting for completion")
except Exception as e:
logger.error(f"Error parsing status response: {str(e)}")
else:
logger.warning(f"Status check returned code: {response.status_code}")
retry_count += 1
except Exception as e:
logger.error(f"Error checking status: {str(e)}")
retry_count += 1
return {
"success": False,
"status": "unknown",
"message": f"Failed to get status after {max_retries} attempts"
}
def download_processed_file(output_path: str, local_directory: str) -> dict:
"""
Download a processed file from GCS
Args:
output_path: Remote path in GCS
local_directory: Local directory to save the file
Returns:
Dictionary with download result
"""
if not gcs_storage:
return {
"success": False,
"message": "GCS storage not initialized"
}
try:
# Check if the file exists and wait for it
logger.info(f"Checking for processed file: {output_path}")
output_check = gcs_storage.check_output_file(output_path, wait_time=60)
if not output_check.get("success"):
return output_check
# Create processed directory if needed
processed_dir = os.path.join(local_directory, "processed")
os.makedirs(processed_dir, exist_ok=True)
# Set output filename
output_filename = os.path.basename(output_path)
if output_filename.startswith("output_"):
output_filename = output_filename[7:] # Remove "output_" prefix
local_path = os.path.join(processed_dir, output_filename)
# Download the file
logger.info(f"Downloading file to: {local_path}")
download_result = gcs_storage.download_file(output_path, local_path)
return download_result
except Exception as e:
logger.error(f"Error downloading processed file: {str(e)}")
return {
"success": False,
"message": f"Error downloading processed file: {str(e)}"
}
def main():
"""Main function to test various layer identification methods"""
# Target files
psd_path = "/Users/daveporter/Desktop/CODING-2024/Adobe-API-PS-scripts/TESTFILES/BATCH/Vichy-Product-Skincare-Liftactiv -Collagen 16 Bonding Serum -30ml-3337875912600-Safety.psd"
json_path = "/Users/daveporter/Desktop/CODING-2024/Adobe-API-PS-scripts/TESTFILES/BATCH/Vichy-Product-Skincare-Liftactiv -Collagen 16 Bonding Serum -30ml-3337875912600-Safety-textonly-updated.json"
# Load the JSON data
json_data = load_json_file(json_path)
if not json_data or "textLayers" not in json_data:
logger.error("Invalid JSON data or no text layers found")
return
text_layers = json_data.get("textLayers", [])
# Get access token
try:
access_token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
logger.info(f"Got access token: {access_token[:15]}...{access_token[-15:]}")
except Exception as e:
logger.error(f"Error getting access token: {str(e)}")
return
# Upload PSD to GCS
upload_result = upload_psd_to_gcs(psd_path)
if not upload_result.get("success"):
logger.error(f"Failed to upload PSD file: {upload_result.get('message')}")
return
input_url = upload_result.get("input_url")
output_url = upload_result.get("output_url")
output_path = upload_result.get("output_path")
# Make an initial check of the layer data
logger.info(f"Found {len(text_layers)} text layers in JSON data:")
for idx, layer in enumerate(text_layers):
layer_id = layer.get("id")
layer_name = layer.get("name", "")
original_text = layer.get("text", "")
updated_text = layer.get("updatedText", "")
logger.info(f"Layer {idx+1}: ID={layer_id}, Name=\"{layer_name}\"")
logger.info(f" Original: \"{original_text}\"")
logger.info(f" Updated: \"{updated_text}\"")
# Test different layer identification methods
test_methods = [
{
"name": "Standard Method (ID as Integer)",
"description": "Using the layer ID as an integer",
"layer_updates": [
{
"id": 1, # ID from JSON file for first layer (converted to integer)
"text": {
"content": text_layers[1].get("updatedText", "") # Use updated text from second layer (HYPOALLERGENIC)
}
}
]
},
{
"name": "ID from JSON Method",
"description": "Using the exact ID from the JSON file",
"layer_updates": [
{
"id": 2, # ID from JSON file for second layer (converted to integer)
"text": {
"content": text_layers[0].get("updatedText", "") # Use updated text from first layer (DESIGNED FOR)
}
}
]
},
{
"name": "Using Name Method",
"description": "Identifying layers by name instead of ID",
"layer_updates": [
{
"name": "HYPOALLERGENIC FORMULA", # Exact layer name from JSON
"text": {
"content": text_layers[1].get("updatedText", "") # Use updated text from this layer
}
}
]
},
{
"name": "Path Method",
"description": "Using the layer path from JSON",
"layer_updates": [
{
"path": "13. Safety awards/seals/Groupe 17/DESIGNED FOR SENSITIVE SKIN", # Layer path from JSON
"text": {
"content": text_layers[0].get("updatedText", "") # Use updated text from this layer
}
}
]
},
{
"name": "Multiple Updates Method",
"description": "Updating multiple layers at once",
"layer_updates": [
{
"id": 1, # First layer
"text": {
"content": text_layers[1].get("updatedText", "") # HYPOALLERGENIC layer text
}
},
{
"id": 2, # Second layer
"text": {
"content": text_layers[0].get("updatedText", "") # DESIGNED FOR layer text
}
}
]
}
]
# Run tests
for test in test_methods:
logger.info("\n" + "="*50)
logger.info(f"TESTING: {test['name']}")
logger.info(f"Description: {test['description']}")
logger.info("="*50)
# Send API request
result = update_text_with_api(
input_url,
output_url,
test["layer_updates"],
config.ADOBE_CLIENT_ID,
access_token
)
# Check result
if result.get("success"):
logger.info(f"API request successful with status {result.get('status_code')}")
# If operation was accepted (202), download the processed file
if result.get("status_code") == 202:
logger.info("Waiting for processing to complete...")
time.sleep(10) # Give Adobe API time to process
# Download the processed file
output_dir = os.path.dirname(psd_path)
download_result = download_processed_file(output_path, output_dir)
if download_result.get("success"):
logger.info(f"Downloaded processed file to: {download_result.get('file_path')}")
else:
logger.error(f"Failed to download processed file: {download_result.get('message')}")
else:
logger.error(f"API request failed with status {result.get('status_code')}: {result.get('message')}")
logger.info("\nAll tests completed")
if __name__ == "__main__":
main()