adobe-ps-scripts-loreal/adobe_ps_api.py
DJP d7dd117dab Fix Adobe API text updates: correct field names, font handling, and GCS bucket
- Fix API payload: "contents" not "content", "align" not "alignment",
  output type "vnd.adobe.photoshop" not "image/vnd.adobe.photoshop"
- Remove broken font size /72 conversion (values already in points)
- Add automatic font upload from fonts/ directory to GCS
- Add FuturaPT-Demi.otf extracted from Adobe CoreSync
- Update GCS bucket to lor-txt-tmp-bkt-26 (old billing expired)
- Update HOW-IT-WORKS.md with working API docs, font setup guide,
  bug fixes, and verified test results

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 17:03:07 -05:00

1299 lines
No EOL
59 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Adobe Photoshop API Integration Script
--------------------------------------
This script provides functionality to interact with the Adobe Photoshop API
to update text layers in PSD files from JSON data.
It uses the credentials provided to authenticate with the Adobe API and
perform operations on Photoshop documents using external storage (Google Cloud Storage).
The workflow is:
1. Upload PSD file to Google Cloud Storage (GCS)
2. Generate signed URLs for input and output
3. Send API request to Adobe Photoshop API with these URLs
4. Download processed file from GCS
Requirements:
- Python 3.6+
- requests library (pip install requests)
- google-cloud-storage library (pip install google-cloud-storage)
"""
import os
import sys
import json
import time
import logging
import argparse
import requests
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
# Logging setup: timestamped INFO-level records for the whole script.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
_LOG_DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, datefmt=_LOG_DATE_FORMAT)

# Module-level logger used by every function and method in this file.
logger = logging.getLogger(__name__)
# Import local config, token manager, and GCS storage
import config
from adobe_token import AdobeTokenManager
from gcs_storage import GCSStorage
# Shared token manager, built from the client credentials in config.
token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)

# GCS settings: bucket used for temporary storage and the key file that is
# expected to sit next to this script.
GCS_BUCKET_NAME = "lor-txt-tmp-bkt-26"
GCS_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json")

# gcs_storage stays None when the key file is missing or the client cannot be
# created; the upload methods check for that before using it.
gcs_storage = None
if not os.path.exists(GCS_KEY_PATH):
    logger.warning(f"GCS key file not found at {GCS_KEY_PATH} - some features may not work")
else:
    try:
        gcs_storage = GCSStorage(GCS_BUCKET_NAME, key_path=GCS_KEY_PATH)
        logger.info(f"GCS storage initialized with bucket: {GCS_BUCKET_NAME}")
    except Exception as init_error:
        logger.error(f"Error initializing GCS storage: {str(init_error)}")
        logger.warning("Continuing without GCS storage - some features may not work")

# Credentials resolved lazily: the API key is the client id, the access token
# is fetched through token_manager the first time an AdobeAPI is created.
API_KEY = config.ADOBE_CLIENT_ID
ACCESS_TOKEN = None
class AdobeAPI:
    """Client for the Adobe Photoshop REST API backed by Google Cloud Storage.

    The class offers two diagnostic probes (``list_documents`` and
    ``test_text_edit``), upload helpers for fonts and PSD files, and
    ``update_text_from_json``, which drives the full text-update workflow.

    It relies on module-level state defined elsewhere in this file:
    ``logger``, ``config``, ``token_manager``, ``gcs_storage``,
    ``GCS_BUCKET_NAME``, ``API_KEY`` and ``ACCESS_TOKEN``.
    """

    _TEXT_ENDPOINT = "https://image.adobe.io/pie/psdService/text"
    _ACTION_JSON_ENDPOINT = "https://image.adobe.io/pie/psdService/actionJSON"
    _PSD_OUTPUT_TYPE = "image/vnd.adobe.photoshop"
    _REDACTED = "***REDACTED***"

    def __init__(self, api_key: Optional[str] = None, access_token: Optional[str] = None,
                 client_secret: Optional[str] = None, force_new_token: bool = True):
        """Resolve credentials, build the request headers and run a validation call.

        Args:
            api_key: Adobe client id; defaults to the module-level ``API_KEY``.
            access_token: Static token used when no token can be generated.
            client_secret: Client secret; defaults to ``config.ADOBE_CLIENT_SECRET``.
            force_new_token: When true (and a secret is available) the token
                manager cache is cleared and a fresh token is requested.
        """
        global ACCESS_TOKEN

        self.api_key = api_key or API_KEY
        self.client_secret = client_secret or config.ADOBE_CLIENT_SECRET

        if self.client_secret and force_new_token:
            try:
                logger.info("Forcing generation of a fresh Adobe API token")
                token_manager.token_cache = {}  # Clear the cache to force new token
                ACCESS_TOKEN, _ = token_manager.get_token(config.DEFAULT_SCOPES)
                # The token value itself is never written to the log.
                logger.info("Successfully generated fresh token for Adobe API")
                self.access_token = ACCESS_TOKEN
            except Exception as e:
                logger.error(f"Failed to obtain fresh access token: {str(e)}")
                # Fall back to the provided static token if available
                self.access_token = access_token or ACCESS_TOKEN
        elif not access_token and self.client_secret:
            try:
                ACCESS_TOKEN, _ = token_manager.get_token(config.DEFAULT_SCOPES)
                self.access_token = ACCESS_TOKEN
            except Exception as e:
                logger.error(f"Failed to obtain access token: {str(e)}")
                self.access_token = access_token or ACCESS_TOKEN
        else:
            self.access_token = access_token or ACCESS_TOKEN

        self.headers = {
            "x-api-key": self.api_key,
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }

        if self.access_token:
            logger.info("Using Adobe access token (value redacted)")

        self._validate_credentials()

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _abbreviate_url(url: str) -> str:
        """Return the first 60 and last 20 characters of *url* for logging."""
        return f"{url[:60]}...{url[-20:]}"

    @staticmethod
    def _redact_headers(headers: Dict[str, Any]) -> Dict[str, Any]:
        """Return a copy of *headers* that is safe to log (credentials masked)."""
        secret_names = {"authorization", "x-api-key"}
        return {
            name: (AdobeAPI._REDACTED if name.lower() in secret_names else value)
            for name, value in headers.items()
        }

    @staticmethod
    def _build_text_payload(input_href: Optional[str], output_href: Optional[str],
                            layers: List[Dict[str, Any]], storage: str = "external",
                            layers_key: str = "layers") -> Dict[str, Any]:
        """Build the inputs/options/outputs request body for a text edit."""
        return {
            "inputs": [
                {"href": input_href, "storage": storage}
            ],
            "options": {
                layers_key: layers
            },
            "outputs": [
                {"href": output_href, "storage": storage, "type": AdobeAPI._PSD_OUTPUT_TYPE}
            ]
        }

    @staticmethod
    def _resolve_layer_id(layer: Dict[str, Any], position: int) -> int:
        """Pick the integer id sent to the API for one text layer.

        *position* is the 1-based index of the layer in the JSON ``textLayers``
        list and is used when the layer carries no id and no known name.
        """
        import zlib  # local import: only needed for the name-derived fallback

        raw_id = layer.get('id')
        layer_name = str(layer.get('name') or '')
        if raw_id is not None and raw_id != '':
            try:
                return int(raw_id)
            except (ValueError, TypeError):
                # crc32 is stable across runs; the built-in hash() of a str is
                # randomised per process and would yield a different id each time.
                return zlib.crc32(layer_name.encode('utf-8')) % 1000
        # For specific named layers, assign known IDs
        if "HYPOALLERGENIC" in layer_name:
            return 10
        if "DESIGNED FOR" in layer_name:
            return 20
        return position

    @staticmethod
    def _build_layer_updates(text_layers: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return API layer entries for layers whose ``updatedText`` differs from ``text``."""
        updates = []
        for position, layer in enumerate(text_layers, start=1):
            new_text = layer.get('updatedText')
            if not new_text or layer.get('text') == new_text:
                continue
            updates.append({
                "id": AdobeAPI._resolve_layer_id(layer, position),
                "text": {
                    "content": new_text
                }
            })
        return updates

    @staticmethod
    def _extract_job_status(status_data: Dict[str, Any]) -> Tuple[str, str]:
        """Return ``(status, error_message)`` from a job-status response body.

        A top-level ``status`` wins. When it is absent the per-output entries
        under ``outputs`` are consulted.
        NOTE(review): the per-output layout follows the Photoshop API status
        documentation as I understand it - confirm against a live response.
        """
        status = status_data.get('status', '')
        error = status_data.get('error', {})
        if not status:
            outputs = [o for o in (status_data.get('outputs') or []) if isinstance(o, dict)]
            failed = [o for o in outputs if o.get('status') == 'failed']
            if failed:
                status = 'failed'
                error = failed[0].get('errors') or failed[0].get('error') or {}
            elif outputs and all(o.get('status') == 'succeeded' for o in outputs):
                status = 'succeeded'
            elif outputs:
                status = outputs[0].get('status', '')
        message = 'Unknown error'
        if isinstance(error, dict):
            message = error.get('message') or error.get('title') or message
        return status, message

    @staticmethod
    def _resolve_psd_path(json_file_path: str, psd_path: str, document_name: str) -> str:
        """Locate the PSD: expand ``~``, else fall back to a file next to the JSON."""
        json_dir = os.path.dirname(os.path.abspath(json_file_path))
        sibling_path = os.path.join(json_dir, document_name)
        if psd_path.startswith("~"):
            psd_path = os.path.expanduser(psd_path)
        elif not os.path.isabs(psd_path):
            psd_path = sibling_path
        if not os.path.exists(psd_path) and os.path.exists(sibling_path):
            psd_path = sibling_path
        return psd_path

    # ------------------------------------------------------------------
    # Diagnostics
    # ------------------------------------------------------------------
    def _validate_credentials(self) -> bool:
        """Call the IMS userinfo endpoint and log the outcome.

        Always returns True: failures are logged but deliberately do not stop
        the caller ("continue for testing").
        """
        try:
            if not self.access_token:
                logger.error("No access token available for validation")
                logger.warning("Continuing without token for testing purposes")
                return True

            logger.info("Testing authentication with the current access token")
            try:
                response = requests.get(
                    "https://ims-na1.adobelogin.com/ims/userinfo",
                    headers={"Authorization": f"Bearer {self.access_token}"},
                    timeout=20  # Increase timeout for potentially slow connections
                )
                logger.info(f"Authentication response status: {response.status_code}")

                user_info = None
                if response.text:
                    try:
                        user_info = response.json()
                        logger.info(f"Authentication response: {json.dumps(user_info, indent=2)}")
                    except ValueError:
                        # Not JSON, log as text
                        logger.info(f"Authentication response: {response.text}")

                if response.status_code == 200:
                    user_name = 'Unknown User'
                    if isinstance(user_info, dict):
                        user_name = user_info.get('name', 'Unknown User')
                    logger.info(f"Successfully authenticated with Adobe API as: {user_name}")
                    return True

                logger.warning(f"Authentication check returned: {response.status_code}")
                logger.warning("Continuing despite authentication response for testing")
                return True
            except Exception as req_err:
                logger.error(f"Error making authentication request: {str(req_err)}")
                logger.warning("Continuing despite authentication request error for testing")
                return True
        except Exception as e:
            logger.error(f"Error in validation process: {str(e)}")
            logger.warning("Continuing despite validation error for testing")
            return True

    def _probe_endpoint(self, endpoint: str) -> Dict[str, Any]:
        """Send one GET to *endpoint* and describe the outcome as a result dict."""
        try:
            logger.info(f"Testing endpoint: {endpoint}")
            response = requests.get(endpoint, headers=self.headers, timeout=10)
            status = response.status_code
            logger.info(f" Response status: {status}")
            if response.text and len(response.text) < 500:
                logger.info(f" Response content: {response.text}")
            elif response.text:
                logger.info(f" Response content length: {len(response.text)} bytes")
            return {
                "endpoint": endpoint,
                "status": status,
                "accessible": status != 0  # Any response is better than nothing
            }
        except requests.exceptions.ConnectionError as e:
            logger.info(f" Connection error: {str(e)}")
            return {
                "endpoint": endpoint,
                "status": 0,
                "accessible": False,
                "error": "Connection error"
            }
        except Exception as e:
            logger.info(f" Error: {str(e)}")
            return {
                "endpoint": endpoint,
                "status": 0,
                "accessible": False,
                "error": str(e)
            }

    def list_documents(self) -> List[Dict[str, Any]]:
        """Probe a fixed list of Adobe endpoints and report which ones respond.

        Despite its name this method does not list documents; it is a
        connectivity diagnostic.

        Returns:
            One dict per endpoint with ``endpoint``, ``status``, ``accessible``
            and, on failure, ``error``.
        """
        endpoints = [
            "https://developer.adobe.com/apis",  # Public Adobe developer site
            "https://ims-na1.adobelogin.com/ims/userinfo",  # Auth API - known to work
            # Photoshop REST API endpoints (correct per Adobe documentation)
            "https://image.adobe.io/pie/psdService",  # Base endpoint
            "https://image.adobe.io/pie/psdService/actionJSON",  # Text editing via actionJSON
            "https://image.adobe.io/pie/psdService/productCrop",  # Product crop endpoint
            # Other potential endpoints to test (may not all be accessible)
            "https://firefly-api.adobe.io/v2",  # Firefly API base endpoint
            "https://firefly-api.adobe.io/v2/upload",  # Firefly Upload endpoint
            # Incorrect endpoints that should be avoided based on Adobe's clarification
            "https://photoshop.adobe.io/api",  # Not used for Photoshop REST API
            "https://photoshop-services.adobe.io/api",  # Not used for Photoshop REST API
            "https://firefly-api.adobe.io/v2/photoshop/editText"  # Not the correct endpoint structure
        ]
        logger.info("Testing connectivity to Adobe API endpoints...")

        # General reachability check; a failure here does not stop the probes.
        try:
            test_response = requests.get("https://www.adobe.com", timeout=15)
            logger.info(f"Adobe.com connectivity test: {test_response.status_code}")
        except Exception as conn_err:
            logger.error(f"Network connectivity issue: Cannot connect to Adobe.com - {str(conn_err)}")
            logger.error("Please check your internet connection and DNS configuration.")
            logger.info("Continuing with other endpoint tests despite connectivity issue...")

        results = [self._probe_endpoint(endpoint) for endpoint in endpoints]

        logger.info("\nEndpoint Accessibility Summary:")
        for result in results:
            status = "✓ Available" if result["accessible"] else "✗ Not available"
            logger.info(f"{status}: {result['endpoint']} (Status: {result['status']})")
        return results

    def test_text_edit(self) -> List[Dict[str, Any]]:
        """Exercise the text-edit endpoints with several sample payload shapes.

        The sample URLs are fake, so the requests are expected to be rejected;
        the point is to see how each endpoint reacts to each payload format.

        Returns:
            One dict per attempted (endpoint, payload format) pair, or one
            ``format: "N/A"`` entry when the endpoint could not be reached.
        """
        logger.info("Testing Adobe Photoshop Text Edit API functionality...")
        edit_endpoints = [
            self._ACTION_JSON_ENDPOINT,  # Primary endpoint for text editing (actionJSON)
            self._TEXT_ENDPOINT  # Alternative endpoint (may be deprecated)
        ]

        # Example test URLs (these are fake but follow the required pattern)
        test_s3_url = "https://my-test-bucket.s3.amazonaws.com/test-document.psd"
        test_output_url = "https://my-test-bucket.s3.amazonaws.com/output-test-document.psd"
        test_azure_url = "https://mystorageaccount.blob.core.windows.net/mycontainer/test-document.psd?sig=signature"
        test_azure_output = "https://mystorageaccount.blob.core.windows.net/mycontainer/output-test-document.psd?sig=signature"

        def sample_layers() -> List[Dict[str, Any]]:
            return [{"id": 1, "text": {"content": "Updated text from API test"}}]

        payloads = [
            # Format 1 - S3 style with external storage
            self._build_text_payload(test_s3_url, test_output_url, sample_layers()),
            # Format 2 - Azure style with external storage
            self._build_text_payload(test_azure_url, test_azure_output, sample_layers()),
            # Format 3 - Older "textLayers" field name with external storage
            self._build_text_payload(test_s3_url, test_output_url, sample_layers(),
                                     layers_key="textLayers"),
            # Format 4 - adobe storage (original approach) - likely to fail
            self._build_text_payload("/temp/test-document-id.psd",
                                     "/temp/output-test-document-id.psd",
                                     sample_layers(), storage="adobe"),
        ]

        results = []
        for endpoint in edit_endpoints:
            try:
                logger.info(f"Testing text edit at endpoint: {endpoint}")
                # First try a GET request to see if endpoint exists
                get_response = requests.get(endpoint, headers=self.headers, timeout=10)
                logger.info(f" GET response status: {get_response.status_code}")

                for format_number, payload in enumerate(payloads, start=1):
                    try:
                        logger.info(f" Testing payload format {format_number}...")
                        request_headers = self.headers.copy()
                        request_headers["Content-Type"] = "application/json"
                        post_response = requests.post(
                            endpoint,
                            headers=request_headers,
                            json=payload,
                            timeout=10
                        )
                        status = post_response.status_code
                        logger.info(f" POST response status for format {format_number}: {status}")

                        body = post_response.text
                        if body:
                            if len(body) < 500:
                                logger.info(f" POST response content for format {format_number}: {body}")
                            else:
                                logger.info(f" POST response content length for format {format_number}: {len(body)} bytes")
                            # Try to surface structured error details
                            try:
                                resp_json = post_response.json()
                            except ValueError:
                                # If it's not JSON, extract a sample
                                logger.info(f" Response excerpt: {body[:200]}...")
                            else:
                                if isinstance(resp_json, dict):
                                    if "error" in resp_json:
                                        logger.info(f" Error details: {resp_json['error']}")
                                    elif "errors" in resp_json:
                                        logger.info(f" Error details: {resp_json['errors']}")

                        succeeded = 200 <= status < 300
                        results.append({
                            "endpoint": endpoint,
                            "format": format_number,
                            "get_status": get_response.status_code,
                            "post_status": status,
                            "successful": succeeded
                        })
                        if succeeded:
                            logger.info(f" Format {format_number} successful, stopping tests for this endpoint")
                            break
                    except Exception as payload_error:
                        logger.info(f" Error testing format {format_number}: {str(payload_error)}")
                        continue
            except requests.exceptions.ConnectionError as e:
                logger.info(f" Connection error: {str(e)}")
                results.append({
                    "endpoint": endpoint,
                    "format": "N/A",
                    "get_status": 0,
                    "post_status": 0,
                    "successful": False,
                    "error": "Connection error"
                })
            except Exception as e:
                logger.info(f" Error: {str(e)}")
                results.append({
                    "endpoint": endpoint,
                    "format": "N/A",
                    "get_status": 0,
                    "post_status": 0,
                    "successful": False,
                    "error": str(e)
                })

        logger.info("\nText Edit API Test Summary:")
        # Group results by endpoint for a cleaner summary
        by_endpoint: Dict[str, List[Dict[str, Any]]] = {}
        for result in results:
            by_endpoint.setdefault(result["endpoint"], []).append(result)

        for endpoint, endpoint_results in by_endpoint.items():
            logger.info(f"Endpoint: {endpoint}")
            get_status = endpoint_results[0]["get_status"] if endpoint_results else 0
            logger.info(f" GET Status: {get_status}")
            for result in endpoint_results:
                if result.get("format") == "N/A":
                    status = "✗ Error"
                    logger.info(f" {status}: {result.get('error', 'Unknown error')}")
                else:
                    status = "✓ Successful" if result.get("successful") else "✗ Failed"
                    logger.info(f" Format {result.get('format')}: {status} (POST: {result['post_status']})")
        return results

    # ------------------------------------------------------------------
    # Uploads
    # ------------------------------------------------------------------
    def _upload_precondition_error(self, file_path: str, label: str) -> Optional[Dict[str, Any]]:
        """Return a failure dict when *file_path* is missing or GCS is unavailable, else None."""
        if not os.path.exists(file_path):
            logger.error(f"{label} file not found: {file_path}")
            return {
                "success": False,
                "message": f"{label} file not found: {file_path}"
            }
        if not gcs_storage:
            logger.error("GCS storage not initialized - cannot upload file")
            return {
                "success": False,
                "message": "GCS storage not initialized - check GCS key file and bucket configuration"
            }
        return None

    def upload_font_to_cloud(self, font_file_path: str, remote_path: Optional[str] = None) -> Dict[str, Any]:
        """Upload a font file to Google Cloud Storage.

        Args:
            font_file_path: Path to the font file to upload.
            remote_path: Optional object path (defaults to ``fonts/<filename>``).

        Returns:
            Dict with ``success`` and ``message``; on success also ``file_name``,
            ``bucket``, ``remote_path`` and ``download_url``.
        """
        logger.info(f"Uploading font file to Google Cloud Storage: {font_file_path}")
        precondition_error = self._upload_precondition_error(font_file_path, "Font")
        if precondition_error:
            return precondition_error

        file_name = os.path.basename(font_file_path)
        try:
            if not remote_path:
                remote_path = f"fonts/{file_name}"
            logger.info(f"Uploading font file to GCS: {remote_path}")
            upload_result = gcs_storage.upload_file(font_file_path, remote_path)
            if not upload_result.get("success"):
                logger.error(f"Failed to upload font file to GCS: {upload_result.get('message')}")
                return upload_result

            download_url = upload_result.get("download_url")
            if not download_url:
                # Report the real problem instead of failing on the log slice below.
                logger.error("Font upload did not return a download URL")
                return {
                    "success": False,
                    "message": "Font upload did not return a download URL"
                }
            logger.info(f"Font file uploaded successfully: {file_name}")
            logger.info(f"Font URL (read): {self._abbreviate_url(download_url)}")
            return {
                "success": True,
                "message": f"Font file uploaded successfully: {file_name}",
                "file_name": file_name,
                "bucket": GCS_BUCKET_NAME,
                "remote_path": remote_path,
                "download_url": download_url
            }
        except Exception as e:
            logger.error(f"Error uploading font file: {str(e)}")
            return {
                "success": False,
                "message": f"Error uploading font file: {str(e)}"
            }

    def upload_psd_to_cloud(self, psd_file_path: str) -> Dict[str, Any]:
        """Upload a PSD to Google Cloud Storage and prepare input/output signed URLs.

        Args:
            psd_file_path: Path to the PSD file to upload.

        Returns:
            Dict with ``success`` and ``message``; on success also ``file_name``,
            ``bucket``, ``remote_path``, ``input_path``, ``output_path``,
            ``input_url`` and ``output_url``.
        """
        logger.info(f"Uploading PSD file to Google Cloud Storage: {psd_file_path}")
        precondition_error = self._upload_precondition_error(psd_file_path, "PSD")
        if precondition_error:
            return precondition_error

        file_name = os.path.basename(psd_file_path)
        try:
            # Timestamp prefix keeps successive uploads of the same file apart
            timestamp = int(time.time())
            remote_path = f"adobe_ps/{timestamp}_{file_name}"
            output_path = f"adobe_ps/output_{timestamp}_{file_name}"

            logger.info(f"Uploading file to GCS: {remote_path}")
            upload_result = gcs_storage.upload_file(psd_file_path, remote_path)
            if not upload_result.get("success"):
                logger.error(f"Failed to upload file to GCS: {upload_result.get('message')}")
                return upload_result

            input_url = upload_result.get("download_url")
            if not input_url:
                # Without a readable input URL the API request cannot work.
                logger.error("PSD upload did not return a download URL")
                return {
                    "success": False,
                    "message": "PSD upload did not return a download URL"
                }

            try:
                output_url = gcs_storage.get_signed_url(
                    output_path,
                    action="write",
                    content_type="image/vnd.adobe.photoshop"
                )
            except Exception as e:
                logger.error(f"Error generating output signed URL: {str(e)}")
                return {
                    "success": False,
                    "message": f"Error generating output signed URL: {str(e)}"
                }

            logger.info(f"File uploaded successfully: {file_name}")
            logger.info(f"Input URL (read): {self._abbreviate_url(input_url)}")
            logger.info(f"Output URL (write): {self._abbreviate_url(output_url)}")
            return {
                "success": True,
                "message": f"PSD file uploaded successfully: {file_name}",
                "file_name": file_name,
                "bucket": GCS_BUCKET_NAME,
                "remote_path": remote_path,
                "input_path": remote_path,  # Add input_path for proper cleanup
                "output_path": output_path,
                "input_url": input_url,
                "output_url": output_url
            }
        except Exception as e:
            logger.error(f"Error uploading PSD file: {str(e)}")
            return {
                "success": False,
                "message": f"Error uploading PSD file: {str(e)}"
            }

    # ------------------------------------------------------------------
    # Text update workflow
    # ------------------------------------------------------------------
    @staticmethod
    def _log_response(status_label: str, content_label: str, response: Any) -> None:
        """Log the status code and (JSON or truncated text) body of *response*."""
        logger.info(f"{status_label} response status: {response.status_code}")
        if response.text:
            try:
                logger.info(f"{content_label}: {json.dumps(response.json(), indent=2)}")
            except ValueError:
                # Not JSON, log as text (first 1000 chars)
                logger.info(f"{content_label}: {response.text[:1000]}")

    def _poll_job_status(self, status_url: str, max_retries: int = 10,
                         delay_seconds: int = 5) -> Tuple[str, Optional[str]]:
        """Poll *status_url* until the job succeeds, fails or retries run out.

        Returns:
            ``(status, error_message)``; status is ``''`` when no terminal
            state was observed within *max_retries* attempts.
        """
        for _ in range(max_retries):
            try:
                # Wait to avoid overwhelming the API
                time.sleep(delay_seconds)
                status_response = requests.get(status_url, headers=self.headers, timeout=30)
                if status_response.status_code != 200:
                    continue
                status, error_message = self._extract_job_status(status_response.json())
                logger.info(f"Processing status: {status}")
                if status == 'succeeded':
                    logger.info("Processing completed successfully!")
                    return status, None
                if status == 'failed':
                    logger.error(f"Processing failed: {error_message}")
                    return status, error_message
            except Exception as e:
                logger.error(f"Error checking status: {str(e)}")
        return '', None

    def _collect_processed_output(self, upload_result: Dict[str, Any], psd_path: str,
                                  api_result: Dict[str, Any]) -> None:
        """Wait for the output object in GCS and download it next to the PSD.

        The outcome is recorded in *api_result* (``processed_file`` on success,
        ``output_download_failed`` / ``output_check_failed`` otherwise).
        """
        output_path = upload_result.get("output_path")
        if not output_path:
            return
        logger.info(f"Checking for processed output file: {output_path}")
        # Wait for the output file to be available (up to 5 minutes)
        output_check = gcs_storage.check_output_file(output_path, wait_time=300)
        if not output_check.get("success"):
            logger.warning(f"Output file not found: {output_check.get('message')}")
            api_result["output_check_failed"] = output_check.get("message")
            return

        processed_dir = os.path.join(os.path.dirname(psd_path), "processed")
        os.makedirs(processed_dir, exist_ok=True)
        output_file_path = os.path.join(processed_dir, f"processed_{os.path.basename(psd_path)}")
        logger.info(f"Downloading processed file to: {output_file_path}")
        download_result = gcs_storage.download_file(output_path, output_file_path)
        if download_result.get("success"):
            logger.info(f"Successfully downloaded processed file: {output_file_path}")
            api_result["processed_file"] = output_file_path
            api_result["message"] += f" and downloaded to {output_file_path}"
        else:
            logger.warning(f"Failed to download processed file: {download_result.get('message')}")
            api_result["output_download_failed"] = download_result.get("message")

    def update_text_from_json(self, json_file_path: str, upload_psd: bool = True) -> Dict[str, Any]:
        """Update text layers of a PSD through the Photoshop API using a JSON description.

        The JSON file must provide ``documentName`` and ``textLayers``; an
        optional ``psdPath`` points at the PSD. Only layers whose
        ``updatedText`` differs from ``text`` are sent.

        Args:
            json_file_path: Path to the JSON file containing text layer data.
            upload_psd: Whether to upload the PSD to GCS first. Without an
                upload, placeholder URLs are sent (for testing only).

        Returns:
            Dict with ``success`` and ``message`` plus details of the API
            response; ``processed_file`` is set when the output was downloaded.
        """
        logger.info(f"Updating text from JSON file: {json_file_path}")
        endpoint = self._TEXT_ENDPOINT
        # The actionJSON endpoint is only tried when the text endpoint returns 404
        fallback_endpoint = self._ACTION_JSON_ENDPOINT
        try:
            with open(json_file_path, 'r', encoding='utf-8') as f:
                json_data = json.load(f)

            document_name = json_data.get('documentName', '')
            psd_path = json_data.get('psdPath') or ''
            text_layers = json_data.get('textLayers', [])
            if not document_name or not text_layers:
                logger.error("JSON file does not contain required document name or text layers")
                return {
                    "success": False,
                    "message": "JSON file does not contain required document name or text layers"
                }
            logger.info(f"Document name: {document_name}")
            logger.info(f"Found {len(text_layers)} text layers in JSON")

            psd_path = self._resolve_psd_path(json_file_path, psd_path, document_name)

            # Stays None when nothing is uploaded, so the checks below select
            # the placeholder branch instead of hitting an unbound local.
            upload_result = None
            if upload_psd and os.path.exists(psd_path):
                logger.info(f"Found PSD file: {psd_path}")
                upload_result = self.upload_psd_to_cloud(psd_path)
                if not upload_result.get("success"):
                    logger.error(f"Failed to upload PSD file: {upload_result.get('message')}")
                    return upload_result
                logger.info("File successfully uploaded to GCS")
            else:
                logger.warning("No upload performed - API will likely return an error without valid URLs")

            text_layer_updates = self._build_layer_updates(text_layers)
            if not text_layer_updates:
                logger.info("No text updates needed - all layers are unchanged")
                return {
                    "success": True,
                    "message": "No text updates needed"
                }
            logger.info(f"Prepared {len(text_layer_updates)} text layer updates")

            uploaded = bool(upload_result and upload_result.get("success"))
            if uploaded:
                payload = self._build_text_payload(
                    upload_result.get("input_url"),   # GCS signed URL for input file
                    upload_result.get("output_url"),  # GCS signed URL for output file
                    text_layer_updates
                )
                logger.info("Using GCS signed URLs for Adobe Photoshop API request")
            else:
                logger.warning("No GCS signed URLs available - using placeholder URLs for testing")
                logger.warning("This request will fail with the Adobe API - for testing only")
                fake_gcs_url = f"https://storage.googleapis.com/{GCS_BUCKET_NAME}"
                input_file = f"placeholder/{document_name or 'test-document.psd'}"
                output_file = f"placeholder/output-{document_name or 'test-document.psd'}"
                payload = self._build_text_payload(
                    f"{fake_gcs_url}/{input_file}",
                    f"{fake_gcs_url}/{output_file}",
                    text_layer_updates
                )

            headers = self.headers.copy()
            headers["Content-Type"] = "application/json"

            # The payload carries signed URLs, so it is only logged at DEBUG;
            # headers are logged with the credentials masked.
            logger.debug(f"Request payload: {json.dumps(payload, indent=2)}")
            logger.info(f"Sending text update request to primary endpoint: {endpoint}")
            logger.info(f"Request headers: {json.dumps(self._redact_headers(headers), indent=2)}")

            response = requests.post(endpoint, headers=headers, json=payload, timeout=30)
            self._log_response("Primary endpoint", "Response content", response)

            if response.status_code == 404:
                logger.info(f"Primary endpoint returned 404, trying fallback endpoint: {fallback_endpoint}")
                response = requests.post(fallback_endpoint, headers=headers, json=payload, timeout=30)
                self._log_response("Fallback endpoint", "Fallback response content", response)

            if response.status_code not in (200, 202):
                logger.error(f"Text update failed with status {response.status_code}")
                error_message = response.text
                try:
                    error_data = response.json()
                    error_message = error_data.get('message', error_data.get('title', response.text))
                except (ValueError, AttributeError):
                    # Body is not JSON (or not a JSON object): keep the raw text
                    pass
                return {
                    "success": False,
                    "status_code": response.status_code,
                    "message": f"Text update failed: {error_message}",
                    "response": response.text,
                    "upload_result": upload_result if uploaded else None
                }

            logger.info("Text update API request successful")
            final_status, failure_reason = '', None
            try:
                result = response.json()
                links = result.get('_links', {}) if isinstance(result, dict) else {}
                # For 202 Accepted, follow the status link until the job settles
                if response.status_code == 202 and 'self' in links:
                    status_url = links.get('self', {}).get('href')
                    logger.info(f"Request accepted for processing. Status URL: {status_url}")
                    if status_url:
                        logger.info("Checking processing status...")
                        final_status, failure_reason = self._poll_job_status(status_url)
                api_result = {
                    "success": True,
                    "message": "Text update request accepted and processing",
                    "result": result,
                    "status_code": response.status_code
                }
            except Exception as e:
                logger.error(f"Error parsing API response: {str(e)}")
                api_result = {
                    "success": True,
                    "message": "Text update request accepted",
                    "response": response.text,
                    "status_code": response.status_code
                }

            if final_status == 'failed':
                # A failed job produces no output file: report the failure
                # instead of claiming success and waiting for the download.
                api_result["success"] = False
                api_result["message"] = f"Processing failed: {failure_reason}"
                api_result["upload_result"] = upload_result if uploaded else None
                return api_result

            if uploaded and gcs_storage:
                self._collect_processed_output(upload_result, psd_path, api_result)
            return api_result
        except Exception as e:
            logger.error(f"Error updating text: {str(e)}")
            return {
                "success": False,
                "message": f"Error updating text: {str(e)}"
            }
def parse_arguments():
    """Parse command line arguments.

    Builds the top-level parser plus one sub-parser per command
    (``test-api``, ``test-text-edit``, ``test-gcs``, ``update-text`` and
    ``generate-token``) and parses ``sys.argv``.

    ``--verbose`` and ``--client-secret`` are accepted both before and after
    the command name.  The per-command copies use ``argparse.SUPPRESS`` as
    their default: argparse copies every attribute produced by a sub-parser
    onto the main namespace, so an ordinary default on the sub-parser would
    overwrite a value that was given before the command name (for example
    ``-v test-api`` would end up with ``verbose=False``).

    Returns:
        argparse.Namespace: always carries ``command`` (``None`` when no
        command was given), ``verbose`` and ``client_secret``, plus the
        arguments of the selected command.
    """
    parser = argparse.ArgumentParser(
        description='Adobe Photoshop API Script for text extraction and updating',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
# Test connectivity to Adobe API endpoints
python adobe_ps_api.py test-api
# Test text editing API functionality
python adobe_ps_api.py test-text-edit
# Test Google Cloud Storage integration
python adobe_ps_api.py test-gcs /path/to/file.psd
# Update text in a PSD file using a JSON file
python adobe_ps_api.py update-text /path/to/extracted_text.json
# Generate a new token and store it in the cache
python adobe_ps_api.py generate-token --client-secret your_client_secret
Adobe Photoshop API Workflow:
1. PSD files are uploaded to Google Cloud Storage (GCS)
2. Signed URLs are generated for both input and output files
3. These URLs are provided to the Adobe Photoshop API
4. The API processes the input file and writes the output file to GCS
5. The script downloads the processed output file when available
"""
    )

    def add_verbose_flag(target, **overrides):
        # One definition of the flag shared by the main parser and sub-parsers.
        target.add_argument('--verbose', '-v', action='store_true',
                            help='Enable verbose logging', **overrides)

    def add_client_secret_flag(target, **overrides):
        # One definition of the flag shared by the main parser and sub-parsers.
        target.add_argument('--client-secret', '-s',
                            help='Client secret for Adobe API authentication',
                            **overrides)

    # Sub-parser copies of the shared flags must not emit a default, otherwise
    # they clobber the value parsed by the top-level parser.
    inherit = {'default': argparse.SUPPRESS}

    # Common options (these supply the real defaults: False / None)
    add_verbose_flag(parser)
    add_client_secret_flag(parser)

    subparsers = parser.add_subparsers(dest='command', help='Command to run')

    # Test API connectivity command
    test_parser = subparsers.add_parser(
        'test-api', help='Test connectivity to Adobe API endpoints')
    add_verbose_flag(test_parser, **inherit)
    add_client_secret_flag(test_parser, **inherit)

    # Test text editing API command
    text_edit_parser = subparsers.add_parser(
        'test-text-edit', help='Test Photoshop text editing API functionality')
    add_verbose_flag(text_edit_parser, **inherit)
    add_client_secret_flag(text_edit_parser, **inherit)

    # Test GCS upload command
    gcs_test_parser = subparsers.add_parser(
        'test-gcs', help='Test Google Cloud Storage integration')
    gcs_test_parser.add_argument('psd_file', help='Path to PSD file to upload')
    add_verbose_flag(gcs_test_parser, **inherit)

    # Update text command
    update_parser = subparsers.add_parser(
        'update-text', help='Update text in a PSD file using a JSON file')
    update_parser.add_argument('json_file', help='Path to JSON file with text layer data')
    update_parser.add_argument('--dry-run', '-d', action='store_true',
                               help='Preview text updates without making API calls')
    update_parser.add_argument('--no-upload', '-n', action='store_true',
                               help='Skip uploading the PSD file (uses placeholder URLs)')
    add_verbose_flag(update_parser, **inherit)
    add_client_secret_flag(update_parser, **inherit)
    update_parser.add_argument('--download-output', '-o', action='store_true',
                               help='Download the processed output file if available')

    # Generate token command (the secret is mandatory after the command name)
    token_parser = subparsers.add_parser(
        'generate-token', help='Generate a new Adobe API token')
    add_client_secret_flag(token_parser, required=True)
    token_parser.add_argument('--scopes',
                              default=config.DEFAULT_SCOPES,
                              help=f'Comma-separated list of scopes (default: {config.DEFAULT_SCOPES})')
    add_verbose_flag(token_parser, **inherit)

    return parser.parse_args()
def _clip_for_preview(text, limit=50):
    """Return *text* cut to *limit* characters, with '...' appended when cut."""
    return f"{text[:limit]}{'...' if len(text) > limit else ''}"


def _print_available_commands():
    """Print the usage hint shown when the script is run without a command."""
    print("\nNo command specified. Use --help to see available commands.")
    print("\nAvailable commands:")
    print(" test-api - Test connectivity to Adobe API endpoints")
    print(" test-text-edit - Test the text editing API functionality")
    print(" test-gcs - Test Google Cloud Storage upload and URL generation")
    print(" update-text - Update text in a PSD file using a JSON file")
    print(" generate-token - Generate a new Adobe API token using client credentials")
    print("\nNote: The API endpoints used in this script may not be accessible from all networks.")
    print("This could be due to DNS or network restrictions. If you are unable to connect to")
    print("the Adobe API endpoints, please check with your network administrator or try from a different network.")


def _run_generate_token(scopes):
    """Request a new token for *scopes*, print a summary and verify it.

    Exits the process with status 1 if any step raises.
    """
    try:
        # Initialize token manager with client credentials
        token_mgr = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)
        access_token, token_data = token_mgr.get_token(scopes)

        # Only the first and last 15 characters of the token are shown.
        print("\nToken Generated Successfully")
        print("-" * 40)
        print(f"Access Token: {access_token[:15]}...{access_token[-15:]}")
        print(f"Token Type: {token_data.get('token_type', 'bearer')}")
        # `or 0` keeps the summary printable when expires_in is missing or null.
        lifetime_days = int(token_data.get('expires_in') or 0) / 86400
        print(f"Expires In: {token_data.get('expires_in')} seconds ({lifetime_days:.1f} days)")
        print(f"Scope: {token_data.get('scope', scopes)}")

        # Verify the token
        user_info = token_mgr.verify_token(access_token)
        if user_info:
            print(f"Token verified for user/account: {user_info.get('sub', 'Unknown')}")
            print("\nToken has been saved in cache and will be used for future API calls.")
        else:
            print("\nWarning: Token verification failed. The token may not be valid.")
    except Exception as e:
        # CLI boundary: report any failure and signal it through the exit code.
        print(f"Error generating token: {str(e)}")
        sys.exit(1)


def _run_test_gcs(api, psd_file):
    """Upload *psd_file* through *api* and print the resulting GCS details.

    Exits with status 1 when the file is missing, GCS storage is not
    initialized, or the upload reports failure.
    """
    if not os.path.exists(psd_file):
        print(f"Error: PSD file '{psd_file}' does not exist.")
        sys.exit(1)
    print(f"\nTesting GCS upload with file: {psd_file}")

    # Check if GCS storage is initialized
    if not gcs_storage:
        print("Error: GCS storage not initialized. Check that gcs_key.json exists.")
        sys.exit(1)

    result = api.upload_psd_to_cloud(psd_file)
    if not result['success']:
        print(f"\nError uploading file: {result['message']}")
        sys.exit(1)

    print("\nFile uploaded successfully to Google Cloud Storage")
    print(f" Bucket: {result['bucket']}")
    print(f" Remote path: {result['remote_path']}")
    print("\nGenerated signed URLs for Adobe Photoshop API:")
    print(f" Input URL: {result['input_url'][:60]}...{result['input_url'][-20:]}")
    print(f" Output URL: {result['output_url'][:60]}...{result['output_url'][-20:]}")
    print("\nThese URLs are valid for approximately 1 hour.")


def _preview_text_updates(json_file):
    """Dry run: list the layers in *json_file* whose text would change.

    A layer counts as changed when it has a non-empty ``updatedText`` that
    differs from ``text``.  No API call is made.  Exits with status 1 if the
    file cannot be read or does not have the expected structure.
    """
    print(f"\nDRY RUN: Would update text using {json_file}")
    try:
        with open(json_file, 'r', encoding='utf-8') as f:
            json_data = json.load(f)
        document_name = json_data.get('documentName', 'Unknown document')
        # A JSON null for textLayers is treated like an empty list.
        text_layers = json_data.get('textLayers') or []
        print(f"\nDocument: {document_name}")
        print(f"Found {len(text_layers)} text layers in JSON file")

        updates_needed = 0
        for position, layer in enumerate(text_layers, start=1):
            updated = layer.get('updatedText')
            if not updated or layer.get('text') == updated:
                continue
            updates_needed += 1
            # A JSON null for the original text is previewed as an empty string
            # instead of raising while slicing.
            original = layer.get('text') or ''
            print(f"\nLayer {position}: {layer.get('name', 'Unnamed')}")
            print(f" Original: {_clip_for_preview(original)}")
            print(f" Updated: {_clip_for_preview(updated)}")

        if updates_needed == 0:
            print("\nNo text updates needed - all layers are unchanged")
        else:
            print(f"\nWould update {updates_needed} text layers")
    except Exception as e:
        print(f"Error reading JSON file: {str(e)}")
        sys.exit(1)


def _run_update_text(api, json_file, upload_psd):
    """Call ``api.update_text_from_json`` and report the outcome.

    Exits with status 1 when the returned result is not successful.
    """
    if upload_psd:
        if not gcs_storage:
            print("\nWarning: GCS storage not initialized. Using placeholder URLs.")
            print("Adobe API calls will likely fail without valid pre-signed URLs.")
            print("Check that gcs_key.json exists and is valid.")
        print(f"\nUploading PSD and updating text using API with {json_file}")
    else:
        print(f"\nUpdating text using API with {json_file} (without PSD upload)")
        print("Note: This mode uses placeholder URLs and will likely fail with Adobe API")

    result = api.update_text_from_json(json_file, upload_psd=upload_psd)

    if not result['success']:
        print(f"Error: {result['message']}")
        if 'status_code' in result:
            print(f"Status code: {result['status_code']}")
        # Check if we have upload details despite API failure
        if result.get('upload_result'):
            print("\nNote: File was successfully uploaded to GCS despite API error")
            print(f" Remote path: {result['upload_result'].get('remote_path')}")
        sys.exit(1)

    print(f"Success: {result['message']}")
    if result.get('status_code') == 202:
        print("The request has been accepted and is being processed asynchronously.")
        # Show the status URL when the response carries one.
        job = result.get('result')
        if isinstance(job, dict):
            links = job.get('_links') or {}
            status_url = (links.get('self') or {}).get('href')
            if status_url:
                print(f"Status URL: {status_url}")
                print("You can check the status at this URL for updates.")

    # Check if processed file was downloaded
    if 'processed_file' in result:
        print(f"Processed file downloaded to: {result['processed_file']}")


def main():
    """Command-line entry point.

    Parses the arguments, applies ``--verbose`` and ``--client-secret``, then
    runs the selected command.  When no command is given, only the usage hint
    is printed: no token is requested and no API client is created.
    """
    args = parse_arguments()

    # Set logging level based on verbose flag
    if args.verbose:
        logger.setLevel(logging.DEBUG)
        detailed = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s',
            '%Y-%m-%d %H:%M:%S'
        )
        for handler in logger.handlers:
            handler.setFormatter(detailed)

    # Update config with client secret if provided
    if getattr(args, 'client_secret', None):
        config.ADOBE_CLIENT_SECRET = args.client_secret

    command = args.command
    if command is None:
        _print_available_commands()
        return

    # Token generation uses its own token manager and needs no API client.
    if command == 'generate-token':
        _run_generate_token(args.scopes)
        return

    # Best effort: a token failure is reported but does not stop the command.
    try:
        token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
        print(f"\nUsing token: {token[:20]}...{token[-20:]}")
    except Exception as e:
        print(f"Error getting token: {str(e)}")

    api = AdobeAPI(client_secret=config.ADOBE_CLIENT_SECRET)

    # Execute command
    if command == 'test-api':
        api.list_documents()
    elif command == 'test-text-edit':
        api.test_text_edit()
    elif command == 'test-gcs':
        _run_test_gcs(api, args.psd_file)
    elif command == 'update-text':
        json_file = args.json_file
        if not os.path.exists(json_file):
            print(f"Error: JSON file '{json_file}' does not exist.")
            sys.exit(1)
        if args.dry_run:
            _preview_text_updates(json_file)
        else:
            # NOTE(review): args.download_output is parsed but not consulted
            # anywhere in this function - confirm whether it should be.
            _run_update_text(api, json_file, upload_psd=not args.no_upload)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()