- Fix API payload: "contents" not "content", "align" not "alignment", output type "vnd.adobe.photoshop" not "image/vnd.adobe.photoshop" - Remove broken font size /72 conversion (values already in points) - Add automatic font upload from fonts/ directory to GCS - Add FuturaPT-Demi.otf extracted from Adobe CoreSync - Update GCS bucket to lor-txt-tmp-bkt-26 (old billing expired) - Update HOW-IT-WORKS.md with working API docs, font setup guide, bug fixes, and verified test results Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1299 lines
No EOL
59 KiB
Python
Executable file
1299 lines
No EOL
59 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
Adobe Photoshop API Integration Script
|
|
--------------------------------------
|
|
|
|
This script provides functionality to interact with the Adobe Photoshop API
|
|
to update text layers in PSD files from JSON data.
|
|
|
|
It uses the credentials provided to authenticate with the Adobe API and
|
|
perform operations on Photoshop documents using external storage (Google Cloud Storage).
|
|
|
|
The workflow is:
|
|
1. Upload PSD file to Google Cloud Storage (GCS)
|
|
2. Generate signed URLs for input and output
|
|
3. Send API request to Adobe Photoshop API with these URLs
|
|
4. Download processed file from GCS
|
|
|
|
Requirements:
|
|
- Python 3.6+
|
|
- requests library (pip install requests)
|
|
- google-cloud-storage library (pip install google-cloud-storage)
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import time
|
|
import logging
|
|
import argparse
|
|
import requests
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
|
|
# Configure logging: timestamped, level-tagged messages at INFO and above.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Project-local modules: credentials/config, the Adobe token manager, and the GCS wrapper.
import config
from adobe_token import AdobeTokenManager
from gcs_storage import GCSStorage

# One shared token manager, built from the configured client ID and secret.
token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)

# Temporary-storage bucket and the key file expected next to this script.
GCS_BUCKET_NAME = "lor-txt-tmp-bkt-26"  # The bucket to use for temporary storage
GCS_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json")

# gcs_storage stays None when the key file is missing or initialization fails;
# the upload methods below check for that before using it.
gcs_storage = None
if not os.path.exists(GCS_KEY_PATH):
    logger.warning(f"GCS key file not found at {GCS_KEY_PATH} - some features may not work")
else:
    try:
        gcs_storage = GCSStorage(GCS_BUCKET_NAME, key_path=GCS_KEY_PATH)
        logger.info(f"GCS storage initialized with bucket: {GCS_BUCKET_NAME}")
    except Exception as init_error:
        logger.error(f"Error initializing GCS storage: {str(init_error)}")
        logger.warning("Continuing without GCS storage - some features may not work")

# Defaults consumed by AdobeAPI; the token is filled in lazily via token_manager.
API_KEY = config.ADOBE_CLIENT_ID
ACCESS_TOKEN = None  # Will be fetched using token_manager when needed
|
|
|
|
class AdobeAPI:
|
|
"""Diagnostic class for Adobe API connectivity"""
|
|
|
|
def __init__(self, api_key: Optional[str] = None, access_token: Optional[str] = None,
             client_secret: Optional[str] = None, force_new_token: bool = True):
    """Initialize the Adobe API client and resolve the access token to use.

    Args:
        api_key: Adobe client ID; defaults to the module-level API_KEY.
        access_token: Optional pre-obtained token. It is only used when no
            token is fetched from the token manager, or as a fallback when
            fetching fails.
        client_secret: Client secret; defaults to config.ADOBE_CLIENT_SECRET.
        force_new_token: When True (and a client secret is available), the
            token manager's cache is cleared and a new token is requested.

    Side effects:
        Updates the module-level ACCESS_TOKEN when a token is fetched, and
        runs _validate_credentials() at the end.
    """
    global ACCESS_TOKEN  # a fetched token is also published at module level

    self.api_key = api_key or API_KEY
    self.client_secret = client_secret or config.ADOBE_CLIENT_SECRET

    force_refresh = bool(self.client_secret and force_new_token)
    fetch_token = force_refresh or bool(self.client_secret and not access_token)

    if fetch_token:
        try:
            if force_refresh:
                # Force token generation even if a cached one exists
                logger.info("Forcing generation of a fresh Adobe API token")
                token_manager.token_cache = {}
            fetched_token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
            if not fetched_token:
                raise ValueError("token manager returned an empty token")
            ACCESS_TOKEN = fetched_token
            self.access_token = fetched_token
            # Never write any part of the bearer token to the log.
            logger.info("Obtained Adobe API access token (value not logged)")
        except Exception as e:
            kind = "fresh access token" if force_refresh else "access token"
            logger.error(f"Failed to obtain {kind}: {str(e)}")
            # Fall back to the provided static token if available
            self.access_token = access_token or ACCESS_TOKEN
    else:
        self.access_token = access_token or ACCESS_TOKEN

    # Headers shared by every request this client makes.
    self.headers = {
        "x-api-key": self.api_key,
        "Authorization": f"Bearer {self.access_token}",
        "Content-Type": "application/json"
    }

    if not self.access_token:
        logger.warning("No Adobe access token is available; API requests are expected to be rejected")

    # Validate the credentials (best-effort; never raises)
    self._validate_credentials()
|
|
|
|
def _validate_credentials(self) -> bool:
    """Check the access token against the Adobe IMS userinfo endpoint.

    This is a best-effort diagnostic: every outcome (missing token, request
    failure, non-200 status) is logged and the method still returns True so
    that construction of the client is never blocked.

    Returns:
        Always True.
    """
    try:
        if not self.access_token:
            logger.error("No access token available for validation")
            # Continue anyway for testing purposes
            logger.warning("Continuing without token for testing purposes")
            return True

        # The token itself is deliberately not logged.
        logger.info("Testing authentication against the Adobe IMS userinfo endpoint")

        try:
            response = requests.get(
                "https://ims-na1.adobelogin.com/ims/userinfo",
                headers={"Authorization": f"Bearer {self.access_token}"},
                timeout=20  # Increase timeout for potentially slow connections
            )
            logger.info(f"Authentication response status: {response.status_code}")

            # Parse the body once; it is logged at DEBUG only because it can
            # contain account details.
            user_info = None
            if response.text:
                try:
                    user_info = response.json()
                except ValueError:
                    logger.debug(f"Authentication response (not JSON): {response.text[:500]}")
                else:
                    logger.debug(f"Authentication response: {json.dumps(user_info, indent=2)}")

            if response.status_code == 200:
                display_name = 'Unknown User'
                if isinstance(user_info, dict):
                    display_name = user_info.get('name', 'Unknown User')
                logger.info(f"Successfully authenticated with Adobe API as: {display_name}")
            else:
                logger.warning(f"Authentication check returned: {response.status_code}")
                # Continue anyway for testing
                logger.warning("Continuing despite authentication response for testing")
            return True
        except Exception as req_err:
            logger.error(f"Error making authentication request: {str(req_err)}")
            # Continue anyway for testing purposes
            logger.warning("Continuing despite authentication request error for testing")
            return True

    except Exception as e:
        logger.error(f"Error in validation process: {str(e)}")
        # Continue anyway for testing purposes
        logger.warning("Continuing despite validation error for testing")
        return True
|
|
|
|
def list_documents(self) -> List[Dict[str, Any]]:
    """Probe a fixed list of Adobe endpoints and report which ones respond.

    Despite its name, this method does not list documents: it is a
    connectivity diagnostic. Each endpoint receives a GET with the client's
    headers, and any HTTP response at all counts as "accessible".

    Returns:
        One dict per endpoint with "endpoint", "status" (0 when the request
        raised), "accessible", and "error" when the request raised.
    """
    probe_urls = [
        "https://developer.adobe.com/apis",  # Public Adobe developer site
        "https://ims-na1.adobelogin.com/ims/userinfo",  # Auth API - known to work

        # Photoshop REST API endpoints (correct per Adobe documentation)
        "https://image.adobe.io/pie/psdService",  # Base endpoint
        "https://image.adobe.io/pie/psdService/actionJSON",  # Text editing via actionJSON
        "https://image.adobe.io/pie/psdService/productCrop",  # Product crop endpoint

        # Other potential endpoints to test (may not all be accessible)
        "https://firefly-api.adobe.io/v2",  # Firefly API base endpoint
        "https://firefly-api.adobe.io/v2/upload",  # Firefly Upload endpoint

        # Incorrect endpoints that should be avoided based on Adobe's clarification
        "https://photoshop.adobe.io/api",  # Not used for Photoshop REST API
        "https://photoshop-services.adobe.io/api",  # Not used for Photoshop REST API
        "https://firefly-api.adobe.io/v2/photoshop/editText"  # Not the correct endpoint structure
    ]

    logger.info("Testing connectivity to Adobe API endpoints...")

    # Basic reachability check first; a failure here does not stop the probes.
    try:
        homepage = requests.get("https://www.adobe.com", timeout=15)
        logger.info(f"Adobe.com connectivity test: {homepage.status_code}")
    except Exception as conn_err:
        logger.error(f"Network connectivity issue: Cannot connect to Adobe.com - {str(conn_err)}")
        logger.error("Please check your internet connection and DNS configuration.")
        logger.info("Continuing with other endpoint tests despite connectivity issue...")

    results = []
    for url in probe_urls:
        try:
            logger.info(f"Testing endpoint: {url}")
            reply = requests.get(url, headers=self.headers, timeout=10)

            code = reply.status_code
            logger.info(f" Response status: {code}")

            # Short bodies are logged verbatim, long ones only by size.
            body = reply.text
            if body:
                if len(body) < 500:
                    logger.info(f" Response content: {body}")
                else:
                    logger.info(f" Response content length: {len(body)} bytes")

            outcome = {
                "endpoint": url,
                "status": code,
                "accessible": code != 0  # Any response is better than nothing
            }
        except requests.exceptions.ConnectionError as conn_failure:
            logger.info(f" Connection error: {str(conn_failure)}")
            outcome = {
                "endpoint": url,
                "status": 0,
                "accessible": False,
                "error": "Connection error"
            }
        except Exception as failure:
            logger.info(f" Error: {str(failure)}")
            outcome = {
                "endpoint": url,
                "status": 0,
                "accessible": False,
                "error": str(failure)
            }
        results.append(outcome)

    # Summary
    logger.info("\nEndpoint Accessibility Summary:")
    for entry in results:
        if entry["accessible"]:
            verdict = "✓ Available"
        else:
            verdict = "✗ Not available"
        logger.info(f"{verdict}: {entry['endpoint']} (Status: {entry['status']})")

    return results
|
|
|
|
def test_text_edit(self) -> Dict[str, Any]:
    """Test the Adobe Photoshop Text Edit API functionality

    Sends a GET and then a series of POST payload variants to each candidate
    text-editing endpoint. The payloads reference fake storage URLs, so this
    only reveals how the endpoints react to each payload shape.

    Returns:
        A list of result dicts (one per endpoint/payload attempt, or one
        error entry per endpoint whose GET request raised), each carrying
        "endpoint", "format", "get_status", "post_status" and "successful".
    """
    logger.info("Testing Adobe Photoshop Text Edit API functionality...")

    edit_endpoints = [
        "https://image.adobe.io/pie/psdService/actionJSON",  # Primary endpoint for text editing (actionJSON)
        "https://image.adobe.io/pie/psdService/text"  # Alternative endpoint (may be deprecated)
    ]

    # Example test URLs (these are fake but follow the required pattern)
    test_s3_url = "https://my-test-bucket.s3.amazonaws.com/test-document.psd"
    test_output_url = "https://my-test-bucket.s3.amazonaws.com/output-test-document.psd"
    test_azure_url = "https://mystorageaccount.blob.core.windows.net/mycontainer/test-document.psd?sig=signature"
    test_azure_output = "https://mystorageaccount.blob.core.windows.net/mycontainer/output-test-document.psd?sig=signature"

    def build_payload(input_href, output_href, storage="external", layers_key="layers"):
        # NOTE(review): the field name "content" and the output type
        # "image/vnd.adobe.photoshop" are kept exactly as before; confirm
        # both against the current Adobe /text API reference.
        return {
            "inputs": [{"href": input_href, "storage": storage}],
            "options": {
                layers_key: [
                    {"id": 1, "text": {"content": "Updated text from API test"}}
                ]
            },
            "outputs": [
                {"href": output_href, "storage": storage, "type": "image/vnd.adobe.photoshop"}
            ]
        }

    payloads = [
        # Format 1 - S3 style with external storage
        build_payload(test_s3_url, test_output_url),
        # Format 2 - Azure style with external storage
        build_payload(test_azure_url, test_azure_output),
        # Format 3 - Older "textLayers" field name with external storage
        build_payload(test_s3_url, test_output_url, layers_key="textLayers"),
        # Format 4 - Adobe storage (original approach) - likely to fail
        build_payload("/temp/test-document-id.psd", "/temp/output-test-document-id.psd", storage="adobe"),
    ]

    results = []
    for endpoint in edit_endpoints:
        try:
            logger.info(f"Testing text edit at endpoint: {endpoint}")

            # First try a GET request to see if endpoint exists
            get_response = requests.get(endpoint, headers=self.headers, timeout=10)
            logger.info(f" GET response status: {get_response.status_code}")

            for format_no, payload in enumerate(payloads, start=1):
                try:
                    logger.info(f" Testing payload format {format_no}...")

                    request_headers = self.headers.copy()
                    request_headers["Content-Type"] = "application/json"

                    post_response = requests.post(
                        endpoint,
                        headers=request_headers,
                        json=payload,
                        timeout=10
                    )

                    status = post_response.status_code
                    logger.info(f" POST response status for format {format_no}: {status}")

                    body = post_response.text
                    if body:
                        if len(body) < 500:
                            logger.info(f" POST response content for format {format_no}: {body}")
                        else:
                            logger.info(f" POST response content length for format {format_no}: {len(body)} bytes")

                        # Only a JSON-decoding failure means "not JSON";
                        # anything else should not be silently swallowed.
                        try:
                            resp_json = post_response.json()
                        except ValueError:
                            logger.info(f" Response excerpt: {body[:200]}...")
                        else:
                            if isinstance(resp_json, dict):
                                if "error" in resp_json:
                                    logger.info(f" Error details: {resp_json['error']}")
                                elif "errors" in resp_json:
                                    logger.info(f" Error details: {resp_json['errors']}")

                    succeeded = 200 <= status < 300
                    results.append({
                        "endpoint": endpoint,
                        "format": format_no,
                        "get_status": get_response.status_code,
                        "post_status": status,
                        "successful": succeeded
                    })

                    # If successful, no need to try more payloads
                    if succeeded:
                        logger.info(f" Format {format_no} successful, stopping tests for this endpoint")
                        break

                except Exception as payload_error:
                    logger.info(f" Error testing format {format_no}: {str(payload_error)}")
                    continue

        except requests.exceptions.ConnectionError as e:
            logger.info(f" Connection error: {str(e)}")
            results.append({
                "endpoint": endpoint,
                "format": "N/A",
                "get_status": 0,
                "post_status": 0,
                "successful": False,
                "error": "Connection error"
            })
        except Exception as e:
            logger.info(f" Error: {str(e)}")
            results.append({
                "endpoint": endpoint,
                "format": "N/A",
                "get_status": 0,
                "post_status": 0,
                "successful": False,
                "error": str(e)
            })

    # Summary
    logger.info("\nText Edit API Test Summary:")

    # Group results by endpoint for a cleaner summary
    by_endpoint = {}
    for result in results:
        by_endpoint.setdefault(result["endpoint"], []).append(result)

    for endpoint, endpoint_results in by_endpoint.items():
        logger.info(f"Endpoint: {endpoint}")

        # Show GET status only once per endpoint
        get_status = endpoint_results[0]["get_status"] if endpoint_results else 0
        logger.info(f" GET Status: {get_status}")

        for result in endpoint_results:
            if result.get("format") == "N/A":
                status = "✗ Error"
                logger.info(f" {status}: {result.get('error', 'Unknown error')}")
            else:
                status = "✓ Successful" if result.get("successful") else "✗ Failed"
                logger.info(f" Format {result.get('format')}: {status} (POST: {result['post_status']})")

    return results
|
|
|
|
def upload_font_to_cloud(self, font_file_path: str, remote_path: Optional[str] = None) -> Dict[str, Any]:
    """
    Uploads a font file to Google Cloud Storage and returns its read URL

    Args:
        font_file_path: Path to the font file to upload
        remote_path: Optional remote path to use (defaults to fonts/filename)

    Returns:
        Dictionary with "success" and "message"; on success also "file_name",
        "bucket", "remote_path" and "download_url". When the storage layer
        reports a failed upload, its result dict is returned unchanged.
    """
    logger.info(f"Uploading font file to Google Cloud Storage: {font_file_path}")

    if not os.path.exists(font_file_path):
        logger.error(f"Font file not found: {font_file_path}")
        return {
            "success": False,
            "message": f"Font file not found: {font_file_path}"
        }

    if not gcs_storage:
        logger.error("GCS storage not initialized - cannot upload file")
        return {
            "success": False,
            "message": "GCS storage not initialized - check GCS key file and bucket configuration"
        }

    file_name = os.path.basename(font_file_path)

    try:
        # Use provided remote path or generate one
        if not remote_path:
            remote_path = f"fonts/{file_name}"

        logger.info(f"Uploading font file to GCS: {remote_path}")
        upload_result = gcs_storage.upload_file(font_file_path, remote_path)

        if not upload_result.get("success"):
            logger.error(f"Failed to upload font file to GCS: {upload_result.get('message')}")
            return upload_result

        download_url = upload_result.get("download_url")
        if not download_url:
            # Without this guard the slicing below raises on None and the
            # caller only sees "'NoneType' object is not subscriptable".
            message = f"Font file uploaded but no download URL was returned: {file_name}"
            logger.error(message)
            return {
                "success": False,
                "message": message
            }

        logger.info(f"Font file uploaded successfully: {file_name}")
        logger.info(f"Font URL (read): {download_url[:60]}...{download_url[-20:]}")

        return {
            "success": True,
            "message": f"Font file uploaded successfully: {file_name}",
            "file_name": file_name,
            "bucket": GCS_BUCKET_NAME,
            "remote_path": remote_path,
            "download_url": download_url
        }

    except Exception as e:
        logger.error(f"Error uploading font file: {str(e)}")
        return {
            "success": False,
            "message": f"Error uploading font file: {str(e)}"
        }
|
|
|
|
def upload_psd_to_cloud(self, psd_file_path: str) -> Dict[str, Any]:
    """
    Uploads a PSD file to Google Cloud Storage and generates signed URLs

    The file is stored under a timestamp-prefixed path in "adobe_ps/", and a
    write URL is generated for a matching "output_" path.

    Args:
        psd_file_path: Path to the PSD file to upload

    Returns:
        Dictionary with "success" and "message"; on success also "file_name",
        "bucket", "remote_path", "input_path", "output_path", "input_url" and
        "output_url". When the storage layer reports a failed upload, its
        result dict is returned unchanged.
    """
    logger.info(f"Uploading PSD file to Google Cloud Storage: {psd_file_path}")

    if not os.path.exists(psd_file_path):
        logger.error(f"PSD file not found: {psd_file_path}")
        return {
            "success": False,
            "message": f"PSD file not found: {psd_file_path}"
        }

    if not gcs_storage:
        logger.error("GCS storage not initialized - cannot upload file")
        return {
            "success": False,
            "message": "GCS storage not initialized - check GCS key file and bucket configuration"
        }

    file_name = os.path.basename(psd_file_path)

    try:
        # Timestamp prefix keeps repeated uploads of the same file apart
        # (one-second resolution).
        timestamp = int(time.time())
        remote_path = f"adobe_ps/{timestamp}_{file_name}"
        output_path = f"adobe_ps/output_{timestamp}_{file_name}"

        logger.info(f"Uploading file to GCS: {remote_path}")
        upload_result = gcs_storage.upload_file(psd_file_path, remote_path)

        if not upload_result.get("success"):
            logger.error(f"Failed to upload file to GCS: {upload_result.get('message')}")
            return upload_result

        input_url = upload_result.get("download_url")

        # Signed URL the API will write the processed file to
        try:
            output_url = gcs_storage.get_signed_url(
                output_path,
                action="write",
                content_type="image/vnd.adobe.photoshop"
            )
        except Exception as e:
            logger.error(f"Error generating output signed URL: {str(e)}")
            return {
                "success": False,
                "message": f"Error generating output signed URL: {str(e)}"
            }

        if not input_url or not output_url:
            # Both URLs are required downstream; fail with a clear message
            # rather than crashing on None while logging them below.
            missing = "input" if not input_url else "output"
            message = f"PSD file uploaded but no {missing} signed URL was returned: {file_name}"
            logger.error(message)
            return {
                "success": False,
                "message": message
            }

        logger.info(f"File uploaded successfully: {file_name}")
        logger.info(f"Input URL (read): {input_url[:60]}...{input_url[-20:]}")
        logger.info(f"Output URL (write): {output_url[:60]}...{output_url[-20:]}")

        return {
            "success": True,
            "message": f"PSD file uploaded successfully: {file_name}",
            "file_name": file_name,
            "bucket": GCS_BUCKET_NAME,
            "remote_path": remote_path,
            "input_path": remote_path,  # Add input_path for proper cleanup
            "output_path": output_path,
            "input_url": input_url,
            "output_url": output_url
        }

    except Exception as e:
        logger.error(f"Error uploading PSD file: {str(e)}")
        return {
            "success": False,
            "message": f"Error uploading PSD file: {str(e)}"
        }
|
|
|
|
def update_text_from_json(self, json_file_path: str, upload_psd: bool = True) -> Dict[str, Any]:
    """
    Updates text layers of a PSD through the Adobe Photoshop API using a JSON file

    The JSON file must provide "documentName" and "textLayers"; "psdPath" is
    optional. Only layers whose "updatedText" is non-empty and differs from
    "text" are sent. The PSD is uploaded to GCS (when upload_psd is True and
    the file is found), the request is posted to the /text endpoint (with the
    actionJSON endpoint tried on a 404), an asynchronous job is polled, and
    the processed file is downloaded into a "processed" directory next to
    the PSD.

    Args:
        json_file_path: Path to the JSON file containing text layer data
        upload_psd: Whether to upload the PSD file to GCS first. Without an
            upload, placeholder URLs are sent and the API is expected to
            reject the request.

    Returns:
        Dictionary with at least "success" and "message". Any exception is
        caught and reported as {"success": False, "message": ...}.
    """
    logger.info(f"Updating text from JSON file: {json_file_path}")

    endpoint = "https://image.adobe.io/pie/psdService/text"
    # The actionJSON endpoint expects a different payload format; it is only
    # tried when the primary endpoint answers 404.
    fallback_endpoint = "https://image.adobe.io/pie/psdService/actionJSON"

    try:
        with open(json_file_path, 'r', encoding='utf-8') as f:
            json_data = json.load(f)

        document_name = json_data.get('documentName', '')
        psd_path = json_data.get('psdPath') or ''
        text_layers = json_data.get('textLayers', [])

        if not document_name or not text_layers:
            message = "JSON file does not contain required document name or text layers"
            logger.error(message)
            return {"success": False, "message": message}

        logger.info(f"Document name: {document_name}")
        logger.info(f"Found {len(text_layers)} text layers in JSON")

        # Work out the updates before uploading, so nothing is uploaded when
        # there is nothing to change.
        text_layer_updates = self._build_text_layer_updates(text_layers)
        if not text_layer_updates:
            logger.info("No text updates needed - all layers are unchanged")
            return {"success": True, "message": "No text updates needed"}
        logger.info(f"Prepared {len(text_layer_updates)} text layer updates")

        psd_path = self._resolve_psd_path(json_file_path, psd_path, document_name)

        # Must be bound even when no upload happens; the code below tests it.
        upload_result = None
        if upload_psd and os.path.exists(psd_path):
            logger.info(f"Found PSD file: {psd_path}")
            upload_result = self.upload_psd_to_cloud(psd_path)
            if not upload_result.get("success"):
                logger.error(f"Failed to upload PSD file: {upload_result.get('message')}")
                return upload_result
            logger.info("File successfully uploaded to GCS")
        else:
            logger.warning("No upload performed - API will likely return an error without valid URLs")

        uploaded = bool(upload_result and upload_result.get("success"))

        if uploaded:
            input_href = upload_result.get("input_url")
            output_href = upload_result.get("output_url")
            logger.info("Using GCS signed URLs for Adobe Photoshop API request")
        else:
            logger.warning("No GCS signed URLs available - using placeholder URLs for testing")
            logger.warning("This request will fail with the Adobe API - for testing only")
            fake_gcs_url = f"https://storage.googleapis.com/{GCS_BUCKET_NAME}"
            input_href = f"{fake_gcs_url}/placeholder/{document_name or 'test-document.psd'}"
            output_href = f"{fake_gcs_url}/placeholder/output-{document_name or 'test-document.psd'}"

        # NOTE(review): the field name "content" and the output type
        # "image/vnd.adobe.photoshop" are kept exactly as before; confirm
        # both against the current Adobe /text API reference.
        payload = {
            "inputs": [
                {"href": input_href, "storage": "external"}
            ],
            "options": {
                "layers": text_layer_updates
            },
            "outputs": [
                {"href": output_href, "storage": "external", "type": "image/vnd.adobe.photoshop"}
            ]
        }

        headers = self.headers.copy()
        headers["Content-Type"] = "application/json"

        # The bearer token is redacted, and the payload (which carries signed
        # URLs) is logged at DEBUG only, to keep credentials out of INFO logs.
        loggable_headers = {
            key: ("<redacted>" if key.lower() == "authorization" else value)
            for key, value in headers.items()
        }
        logger.info(f"Sending text update request to primary endpoint: {endpoint}")
        logger.info(f"Request headers: {json.dumps(loggable_headers, indent=2)}")
        logger.debug(f"Request payload: {json.dumps(payload, indent=2)}")

        response = requests.post(endpoint, headers=headers, json=payload, timeout=30)
        self._log_api_response(response, "Primary endpoint response status", "Response content")

        if response.status_code == 404:
            logger.info(f"Primary endpoint returned 404, trying fallback endpoint: {fallback_endpoint}")
            response = requests.post(fallback_endpoint, headers=headers, json=payload, timeout=30)
            self._log_api_response(response, "Fallback endpoint response status", "Fallback response content")

        if response.status_code not in (200, 202):
            logger.error(f"Text update failed with status {response.status_code}")
            error_message = response.text
            try:
                error_data = response.json()
            except ValueError:
                error_data = None
            if isinstance(error_data, dict):
                error_message = error_data.get('message', error_data.get('title', response.text))

            return {
                "success": False,
                "status_code": response.status_code,
                "message": f"Text update failed: {error_message}",
                "response": response.text,
                "upload_result": upload_result if uploaded else None
            }

        logger.info("Text update API request successful")

        job_status = ''
        job_error = None
        try:
            result = response.json()
        except ValueError as e:
            logger.error(f"Error parsing API response: {str(e)}")
            api_result = {
                "success": True,
                "message": "Text update request accepted",
                "response": response.text,
                "status_code": response.status_code
            }
        else:
            # A 202 carries a link to poll for the asynchronous job's status.
            status_url = None
            if response.status_code == 202 and isinstance(result, dict):
                self_link = (result.get('_links') or {}).get('self') or {}
                status_url = self_link.get('href')
            if status_url:
                logger.info(f"Request accepted for processing. Status URL: {status_url}")
                logger.info("Checking processing status...")
                job_status, job_error = self._wait_for_job(status_url)

            api_result = {
                "success": True,
                "message": "Text update request accepted and processing",
                "result": result,
                "status_code": response.status_code
            }

        if job_status == 'failed':
            # Report a failed job as a failure and skip the output wait.
            api_result["success"] = False
            api_result["message"] = f"Text update processing failed: {job_error}"
            return api_result

        if uploaded and gcs_storage:
            self._download_processed_output(upload_result, psd_path, api_result)

        return api_result

    except Exception as e:
        logger.error(f"Error updating text: {str(e)}")
        return {
            "success": False,
            "message": f"Error updating text: {str(e)}"
        }

def _resolve_psd_path(self, json_file_path: str, psd_path: str, document_name: str) -> str:
    """Return the PSD path to use: expand "~", else look next to the JSON file."""
    json_dir = os.path.dirname(os.path.abspath(json_file_path))

    if psd_path.startswith("~"):
        psd_path = os.path.expanduser(psd_path)
    elif not os.path.isabs(psd_path):
        # Relative or empty path: look for the document next to the JSON file
        psd_path = os.path.join(json_dir, document_name)

    if not os.path.exists(psd_path) and document_name:
        candidate = os.path.join(json_dir, document_name)
        if os.path.exists(candidate):
            psd_path = candidate

    return psd_path

def _build_text_layer_updates(self, text_layers: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Build the API "layers" entries for every layer whose text changed."""
    import zlib  # local import: used only for the stable name-derived fallback ID

    updates = []
    for position, layer in enumerate(text_layers, start=1):
        new_text = layer.get('updatedText')
        if not new_text or layer.get('text') == new_text:
            continue

        layer_name = layer.get('name') or ''
        layer_id = layer.get('id')

        if layer_id is not None and layer_id != '':
            try:
                # The API requires an integer ID
                layer_id = int(layer_id)
            except (ValueError, TypeError):
                # Derive an ID from the name. crc32 is stable across runs,
                # unlike the built-in hash(), which is salted per process.
                layer_id = zlib.crc32(layer_name.encode('utf-8')) % 1000
        elif "HYPOALLERGENIC" in layer_name:
            layer_id = 10  # Try with a specific ID for this layer
        elif "DESIGNED FOR" in layer_name:
            layer_id = 20  # Try with a specific ID for this layer
        else:
            # Default fallback: the layer's 1-based position in the list
            layer_id = position

        updates.append({
            "id": layer_id,
            "text": {
                "content": new_text
            }
        })
    return updates

def _log_api_response(self, response, status_label: str, content_label: str) -> None:
    """Log a response's status code and its body (pretty JSON, else first 1000 chars)."""
    logger.info(f"{status_label}: {response.status_code}")
    if not response.text:
        return
    try:
        logger.info(f"{content_label}: {json.dumps(response.json(), indent=2)}")
    except ValueError:
        logger.info(f"{content_label}: {response.text[:1000]}")

def _extract_job_status(self, status_data: Dict[str, Any]) -> Tuple[str, str]:
    """Return (status, error message) from a job-status document."""
    status = status_data.get('status', '')
    error_info = status_data.get('error') or {}

    if not status:
        # NOTE(review): Adobe's job-status documents appear to report the
        # status per entry under "outputs" rather than at the top level;
        # confirm against the API reference.
        outputs = [o for o in (status_data.get('outputs') or []) if isinstance(o, dict)]
        statuses = [o.get('status', '') for o in outputs]
        if 'failed' in statuses:
            status = 'failed'
            failed_output = outputs[statuses.index('failed')]
            error_info = failed_output.get('errors') or failed_output.get('error') or {}
        elif statuses and all(s == 'succeeded' for s in statuses):
            status = 'succeeded'
        else:
            status = next((s for s in statuses if s), '')

    message = 'Unknown error'
    if isinstance(error_info, dict):
        message = error_info.get('message') or error_info.get('title') or 'Unknown error'
    return status, message

def _wait_for_job(self, status_url: str, max_retries: int = 10, delay_seconds: int = 5) -> Tuple[str, Optional[str]]:
    """Poll status_url until the job succeeds, fails, or the retries run out.

    Returns (last seen status, error message when the job failed else None).
    """
    last_status = ''
    for _ in range(max_retries):
        try:
            # Wait to avoid overwhelming the API
            time.sleep(delay_seconds)

            status_response = requests.get(status_url, headers=self.headers, timeout=30)
            if status_response.status_code != 200:
                continue

            last_status, error_message = self._extract_job_status(status_response.json())
            logger.info(f"Processing status: {last_status}")

            if last_status == 'succeeded':
                logger.info("Processing completed successfully!")
                return last_status, None
            if last_status == 'failed':
                logger.error(f"Processing failed: {error_message}")
                return last_status, error_message
        except Exception as e:
            logger.error(f"Error checking status: {str(e)}")
    return last_status, None

def _download_processed_output(self, upload_result: Dict[str, Any], psd_path: str, api_result: Dict[str, Any]) -> None:
    """Wait for the output object in GCS and download it; records the outcome in api_result."""
    output_path = upload_result.get("output_path")
    if not output_path:
        return

    logger.info(f"Checking for processed output file: {output_path}")

    # Wait for the output file to be available (up to 5 minutes)
    output_check = gcs_storage.check_output_file(output_path, wait_time=300)
    if not output_check.get("success"):
        logger.warning(f"Output file not found: {output_check.get('message')}")
        api_result["output_check_failed"] = output_check.get("message")
        return

    # Processed files go into a "processed" directory next to the PSD
    processed_dir = os.path.join(os.path.dirname(psd_path), "processed")
    os.makedirs(processed_dir, exist_ok=True)
    output_file_path = os.path.join(processed_dir, f"processed_{os.path.basename(psd_path)}")

    logger.info(f"Downloading processed file to: {output_file_path}")
    download_result = gcs_storage.download_file(output_path, output_file_path)

    if download_result.get("success"):
        logger.info(f"Successfully downloaded processed file: {output_file_path}")
        api_result["processed_file"] = output_file_path
        api_result["message"] += f" and downloaded to {output_file_path}"
    else:
        logger.warning(f"Failed to download processed file: {download_result.get('message')}")
        api_result["output_download_failed"] = download_result.get("message")
|
|
|
|
def parse_arguments(argv: Optional[List[str]] = None):
    """Build the command-line parser and parse the arguments.

    ``--verbose`` and ``--client-secret`` are accepted both before and after
    the sub-command. The sub-command copies of these options use
    ``argparse.SUPPRESS`` as their default: argparse copies every attribute
    of the sub-parser namespace over the top-level namespace, so an ordinary
    default (``False`` / ``None``) on the sub-parser would silently discard
    a value given before the sub-command (e.g. ``-v update-text data.json``).

    Args:
        argv: Argument list to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        The ``argparse.Namespace`` with ``command`` (``None`` when no
        sub-command was given), ``verbose``, ``client_secret`` and the
        options of the selected sub-command.
    """
    parser = argparse.ArgumentParser(
        description='Adobe Photoshop API Script for text extraction and updating',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Test connectivity to Adobe API endpoints
  python adobe_ps_api.py test-api

  # Test text editing API functionality
  python adobe_ps_api.py test-text-edit

  # Test Google Cloud Storage integration
  python adobe_ps_api.py test-gcs /path/to/file.psd

  # Update text in a PSD file using a JSON file
  python adobe_ps_api.py update-text /path/to/extracted_text.json

  # Generate a new token and store it in the cache
  python adobe_ps_api.py generate-token --client-secret your_client_secret

Adobe Photoshop API Workflow:
  1. PSD files are uploaded to Google Cloud Storage (GCS)
  2. Signed URLs are generated for both input and output files
  3. These URLs are provided to the Adobe Photoshop API
  4. The API processes the input file and writes the output file to GCS
  5. The script downloads the processed output file when available
"""
    )

    # Common options (the top-level parser owns the real defaults)
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Enable verbose logging')
    parser.add_argument('--client-secret', '-s',
                        help='Client secret for Adobe API authentication')

    def add_verbose(sub_parser):
        # SUPPRESS: leave the attribute unset unless the flag is given here,
        # so a "-v" placed before the sub-command is not overwritten.
        sub_parser.add_argument('--verbose', '-v', action='store_true',
                                default=argparse.SUPPRESS,
                                help='Enable verbose logging')

    def add_client_secret(sub_parser):
        # SUPPRESS for the same reason as --verbose above.
        sub_parser.add_argument('--client-secret', '-s',
                                default=argparse.SUPPRESS,
                                help='Client secret for Adobe API authentication')

    subparsers = parser.add_subparsers(dest='command', help='Command to run')

    # Test API connectivity command
    test_parser = subparsers.add_parser(
        'test-api', help='Test connectivity to Adobe API endpoints')
    add_verbose(test_parser)
    add_client_secret(test_parser)

    # Test text editing API command
    text_edit_parser = subparsers.add_parser(
        'test-text-edit', help='Test Photoshop text editing API functionality')
    add_verbose(text_edit_parser)
    add_client_secret(text_edit_parser)

    # Test GCS upload command
    gcs_test_parser = subparsers.add_parser(
        'test-gcs', help='Test Google Cloud Storage integration')
    gcs_test_parser.add_argument('psd_file', help='Path to PSD file to upload')
    add_verbose(gcs_test_parser)

    # Update text command
    update_parser = subparsers.add_parser(
        'update-text', help='Update text in a PSD file using a JSON file')
    update_parser.add_argument('json_file', help='Path to JSON file with text layer data')
    update_parser.add_argument('--dry-run', '-d', action='store_true',
                               help='Preview text updates without making API calls')
    update_parser.add_argument('--no-upload', '-n', action='store_true',
                               help='Skip uploading the PSD file (uses placeholder URLs)')
    add_verbose(update_parser)
    add_client_secret(update_parser)
    update_parser.add_argument('--download-output', '-o', action='store_true',
                               help='Download the processed output file if available')

    # Generate token command (the secret stays mandatory on this sub-command)
    token_parser = subparsers.add_parser(
        'generate-token', help='Generate a new Adobe API token')
    token_parser.add_argument('--client-secret', '-s', required=True,
                              help='Client secret for Adobe API authentication')
    token_parser.add_argument('--scopes',
                              default=config.DEFAULT_SCOPES,
                              help=f'Comma-separated list of scopes (default: {config.DEFAULT_SCOPES})')
    add_verbose(token_parser)

    return parser.parse_args(argv)
|
|
|
|
def _print_available_commands():
    """Print the fallback help shown when no sub-command was selected."""
    print("\nNo command specified. Use --help to see available commands.")
    print("\nAvailable commands:")
    print("  test-api        - Test connectivity to Adobe API endpoints")
    print("  test-text-edit  - Test the text editing API functionality")
    print("  test-gcs        - Test Google Cloud Storage upload and URL generation")
    print("  update-text     - Update text in a PSD file using a JSON file")
    print("  generate-token  - Generate a new Adobe API token using client credentials")
    print("\nNote: The API endpoints used in this script may not be accessible from all networks.")
    print("This could be due to DNS or network restrictions. If you are unable to connect to")
    print("the Adobe API endpoints, please check with your network administrator or try from a different network.")


def _run_generate_token(scopes):
    """Request a new token for *scopes*, print a summary and verify it.

    Exits the process with status 1 when any step raises.
    """
    try:
        # Initialize token manager with client credentials
        token_mgr = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET)

        # Get a new token with specified scopes
        access_token, token_data = token_mgr.get_token(scopes)

        # "or 0" guards against an 'expires_in' key that is present but None,
        # which would make int() raise.
        expires_in = token_data.get('expires_in')
        lifetime_days = int(expires_in or 0) / 86400

        # Display token information (only the ends of the token are shown)
        print("\nToken Generated Successfully")
        print("-" * 40)
        print(f"Access Token: {access_token[:15]}...{access_token[-15:]}")
        print(f"Token Type: {token_data.get('token_type', 'bearer')}")
        print(f"Expires In: {expires_in} seconds ({lifetime_days:.1f} days)")
        print(f"Scope: {token_data.get('scope', scopes)}")

        # Verify the token
        user_info = token_mgr.verify_token(access_token)
        if user_info:
            print(f"Token verified for user/account: {user_info.get('sub', 'Unknown')}")
            print("\nToken has been saved in cache and will be used for future API calls.")
        else:
            print("\nWarning: Token verification failed. The token may not be valid.")

    except Exception as e:
        print(f"Error generating token: {str(e)}")
        sys.exit(1)


def _require_existing_file(path, label):
    """Exit with status 1 and an error message when *path* does not exist."""
    if not os.path.exists(path):
        print(f"Error: {label} file '{path}' does not exist.")
        sys.exit(1)


def _preview_text_updates(json_file):
    """Print which text layers of *json_file* would change, without any API call.

    A layer counts as changed when it has a non-empty ``updatedText`` that
    differs from ``text``. Exits with status 1 when the file cannot be read
    or does not have the expected structure.
    """
    print(f"\nDRY RUN: Would update text using {json_file}")
    try:
        with open(json_file, 'r', encoding='utf-8') as f:
            json_data = json.load(f)

        document_name = json_data.get('documentName', 'Unknown document')
        text_layers = json_data.get('textLayers', [])

        print(f"\nDocument: {document_name}")
        print(f"Found {len(text_layers)} text layers in JSON file")

        updates_needed = 0
        for index, layer in enumerate(text_layers, start=1):
            updated = layer.get('updatedText')
            if not updated or layer.get('text') == updated:
                continue
            updates_needed += 1
            # "or ''" keeps the preview working when 'text' is null in the JSON.
            original = layer.get('text') or ''
            print(f"\nLayer {index}: {layer.get('name', 'Unnamed')}")
            print(f"  Original: {original[:50]}{'...' if len(original) > 50 else ''}")
            print(f"  Updated:  {updated[:50]}{'...' if len(updated) > 50 else ''}")

        if updates_needed == 0:
            print("\nNo text updates needed - all layers are unchanged")
        else:
            print(f"\nWould update {updates_needed} text layers")

    except Exception as e:
        print(f"Error reading JSON file: {str(e)}")
        sys.exit(1)


def _run_test_gcs(api, psd_file):
    """Upload *psd_file* through *api* and print the resulting signed URLs.

    Exits with status 1 when the file is missing, GCS storage is not
    initialized, or the upload reports failure.
    """
    _require_existing_file(psd_file, "PSD")

    print(f"\nTesting GCS upload with file: {psd_file}")

    # Check if GCS storage is initialized
    if not gcs_storage:
        print("Error: GCS storage not initialized. Check that gcs_key.json exists.")
        sys.exit(1)

    # Upload the file
    result = api.upload_psd_to_cloud(psd_file)

    if not result.get('success'):
        print(f"\nError uploading file: {result.get('message')}")
        sys.exit(1)

    print("\nFile uploaded successfully to Google Cloud Storage")
    print(f"  Bucket: {result['bucket']}")
    print(f"  Remote path: {result['remote_path']}")
    print("\nGenerated signed URLs for Adobe Photoshop API:")
    print(f"  Input URL: {result['input_url'][:60]}...{result['input_url'][-20:]}")
    print(f"  Output URL: {result['output_url'][:60]}...{result['output_url'][-20:]}")
    print("\nThese URLs are valid for approximately 1 hour.")


def _run_update_text(api, json_file, upload_psd):
    """Run the text update through *api* and report the outcome.

    Exits with status 1 when the API result reports failure.
    """
    if upload_psd:
        if not gcs_storage:
            print("\nWarning: GCS storage not initialized. Using placeholder URLs.")
            print("Adobe API calls will likely fail without valid pre-signed URLs.")
            print("Check that gcs_key.json exists and is valid.")

        print(f"\nUploading PSD and updating text using API with {json_file}")
    else:
        print(f"\nUpdating text using API with {json_file} (without PSD upload)")
        print("Note: This mode uses placeholder URLs and will likely fail with Adobe API")

    result = api.update_text_from_json(json_file, upload_psd=upload_psd)

    if not result.get('success'):
        print(f"Error: {result.get('message')}")
        if 'status_code' in result:
            print(f"Status code: {result['status_code']}")

        # Check if we have upload details despite API failure
        if result.get('upload_result'):
            print("\nNote: File was successfully uploaded to GCS despite API error")
            print(f"  Remote path: {result['upload_result'].get('remote_path')}")

        sys.exit(1)

    print(f"Success: {result['message']}")
    if result.get('status_code') == 202:
        print("The request has been accepted and is being processed asynchronously.")

        # If we have status URL information (the 'result' entry may be
        # missing or not a dict, so guard before looking inside it)
        api_response = result.get('result')
        if isinstance(api_response, dict) and '_links' in api_response:
            status_url = api_response.get('_links', {}).get('self', {}).get('href')
            if status_url:
                print(f"Status URL: {status_url}")
                print("You can check the status at this URL for updates.")

    # Check if processed file was downloaded
    if 'processed_file' in result:
        print(f"Processed file downloaded to: {result['processed_file']}")


def main():
    """Command-line entry point: parse arguments and run the selected command.

    Commands that need no Adobe credentials (no command at all, and
    ``update-text --dry-run``) are handled before any token is requested or
    API client is built, so a dry run really performs no API calls.
    Failures are reported on stdout and terminate the process with
    ``sys.exit(1)``.
    """
    args = parse_arguments()

    # Set logging level based on verbose flag
    if args.verbose:
        logger.setLevel(logging.DEBUG)
        verbose_formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s',
            '%Y-%m-%d %H:%M:%S'
        )
        for handler in logger.handlers:
            handler.setFormatter(verbose_formatter)

    # Update config with client secret if provided
    if getattr(args, 'client_secret', None):
        config.ADOBE_CLIENT_SECRET = args.client_secret

    command = args.command

    if not command:
        _print_available_commands()
        return

    # Handle token generation separately
    if command == 'generate-token':
        _run_generate_token(args.scopes)
        return

    # A dry run only reads the JSON file; it must not touch the network.
    if command == 'update-text' and args.dry_run:
        _require_existing_file(args.json_file, "JSON")
        _preview_text_updates(args.json_file)
        return

    # First try to get a valid token; a failure here is reported but not
    # fatal, the API client is still constructed below.
    try:
        token, _ = token_manager.get_token(config.DEFAULT_SCOPES)
        print(f"\nUsing token: {token[:20]}...{token[-20:]}")
    except Exception as e:
        print(f"Error getting token: {str(e)}")

    api = AdobeAPI(client_secret=config.ADOBE_CLIENT_SECRET)

    # Execute command
    if command == 'test-api':
        api.list_documents()
    elif command == 'test-text-edit':
        api.test_text_edit()
    elif command == 'test-gcs':
        _run_test_gcs(api, args.psd_file)
    elif command == 'update-text':
        _require_existing_file(args.json_file, "JSON")
        # NOTE(review): the parser's --download-output flag is not read
        # anywhere in this function; confirm whether it should be forwarded.
        _run_update_text(api, args.json_file, upload_psd=not args.no_upload)
    else:
        _print_available_commands()
|
|
|
|
# Run the command-line interface only when this file is executed as a
# script, not when it is imported as a module.
if __name__ == "__main__":
    main()