Local and cloud-based workflows for extracting and updating text layers in PSD files via ExtendScript and Adobe PS API. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
236 lines
No EOL
9.5 KiB
Python
236 lines
No EOL
9.5 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Adobe API Token Management
|
|
|
|
This script handles generating and refreshing access tokens for the Adobe API
|
|
using client credentials.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import time
|
|
import requests
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Any, Optional, Tuple
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(levelname)s - %(message)s',
|
|
datefmt='%Y-%m-%d %H:%M:%S'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Token cache file
|
|
TOKEN_CACHE_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".adobe_token_cache.json")
|
|
|
|
class AdobeTokenManager:
|
|
"""Handles Adobe API token generation and caching"""
|
|
|
|
def __init__(self, client_id: str, client_secret: str):
|
|
"""Initialize with client credentials"""
|
|
self.client_id = client_id
|
|
self.client_secret = client_secret
|
|
self.token_url = "https://ims-na1.adobelogin.com/ims/token/v3"
|
|
self.token_cache = self._load_token_cache()
|
|
|
|
def _load_token_cache(self) -> Dict[str, Any]:
|
|
"""Load token cache from file if it exists"""
|
|
try:
|
|
if os.path.exists(TOKEN_CACHE_FILE):
|
|
with open(TOKEN_CACHE_FILE, 'r') as f:
|
|
return json.load(f)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to load token cache: {str(e)}")
|
|
|
|
return {}
|
|
|
|
def _save_token_cache(self) -> None:
|
|
"""Save token cache to file"""
|
|
try:
|
|
with open(TOKEN_CACHE_FILE, 'w') as f:
|
|
json.dump(self.token_cache, f)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to save token cache: {str(e)}")
|
|
|
|
def _is_token_valid(self, scope: str) -> bool:
|
|
"""Check if the token for the given scope is still valid"""
|
|
if scope not in self.token_cache:
|
|
return False
|
|
|
|
# Check if the token has expired
|
|
expires_at = self.token_cache[scope].get('expires_at', 0)
|
|
current_time = time.time()
|
|
|
|
# Add a buffer of 5 minutes to ensure we don't use tokens that are about to expire
|
|
buffer = 300 # 5 minutes in seconds
|
|
|
|
return current_time < expires_at - buffer
|
|
|
|
def get_token(self, scope: str = "openid,AdobeID,read_organizations") -> Tuple[str, Dict[str, Any]]:
|
|
"""
|
|
Get a valid access token for the given scope
|
|
|
|
Args:
|
|
scope: Comma-separated list of scopes to request
|
|
For Photoshop API: "openid,AdobeID,read_organizations"
|
|
For Firefly API: "openid,AdobeID,firefly_api,ff_apis"
|
|
|
|
Returns:
|
|
Tuple containing (access_token, token_data)
|
|
"""
|
|
# Check if we have a valid cached token
|
|
if self._is_token_valid(scope):
|
|
logger.info(f"Using cached token for scope: {scope}")
|
|
token_data = self.token_cache[scope]
|
|
return token_data['access_token'], token_data
|
|
|
|
# Otherwise, get a new token
|
|
logger.info(f"Requesting new token for scope: {scope}")
|
|
|
|
# TEMPORARY TEST MODE: Return the last known working token for testing
|
|
# This is just for script testing and should be removed in production
|
|
if not self.client_secret or self.client_secret == "p8e-EXAMPLE-secret-REPLACE-ME":
|
|
logger.warning("Using test mode with placeholder token for development")
|
|
test_token = "eyJhbGciOiJSUzI1NiIsIng1dSI6Imltc19uYTEta2V5LWF0LTEuY2VyIiwia2lkIjoiaW1zX25hMS1rZXktYXQtMSIsIml0dCI6ImF0In0.eyJpZCI6IjE3NDQ4NzYzNDM5MzdfNTE1YjU0NTgtZDU3NC00N2RlLThmNzgtYjQ5MGMwYjZiOWYyX3VlMSIsIm9yZyI6IkZBRDYxRTI3NjY4NkRCM0QwQTQ5NUVDNEBBZG9iZU9yZyIsInR5cGUiOiJhY2Nlc3NfdG9rZW4iLCJjbGllbnRfaWQiOiJmMzRiZWNiNzU5MjQ0ODk5YmQ3M2I4NjIyMGY2ZmI5MiIsInVzZXJfaWQiOiJEQTM3MUY1NzY3RUJEMDdEMEE0OTVGOTRAdGVjaGFjY3QuYWRvYmUuY29tIiwiYXMiOiJpbXMtbmExIiwiYWFfaWQiOiJEQTM3MUY1NzY3RUJEMDdEMEE0OTVGOTRAdGVjaGFjY3QuYWRvYmUuY29tIiwiY3RwIjozLCJtb2kiOiIzMDA1NWZlNyIsImV4cGlyZXNfaW4iOiI4NjQwMDAwMCIsImNyZWF0ZWRfYXQiOiIxNzQ0ODc2MzQzOTM3Iiwic2NvcGUiOiJvcGVuaWQsQWRvYmVJRCxyZWFkX29yZ2FuaXphdGlvbnMifQ.P0J4J7Qy-zflhrq6u2JX1rXucimiwuR__bkXJnZ4ZSiNY9G6fMPL1ym0isrFTAadVisgJLlHsh0QQZpLY5l-Uv3XZZRWnbK7fo2uDy4j-o7Y4aO7vBQ-VyCS8C7D_msgnHHnFcxwYXGAmv10-AFUfBsw3Y1xRVjDMIJH1Ux8NdbZ8j1zJXN1FPuuBi8fH1hmKda85nuXJsKc7TqaYBzX4AGzWBPV6hyoKedzrNtPCNRx3muhHnCS_q6wmk6Jx6kVAxrYPeeoA-W-ZKJrP-5BhQf0KUVOBtaCBKlrDL-ftML0LZlWswB14kKTMkt9R7z6xwLyPWfD1ldFh3bEMaa0YA"
|
|
|
|
# Create a test token structure
|
|
token_data = {
|
|
'access_token': test_token,
|
|
'token_type': 'bearer',
|
|
'expires_in': 86400000,
|
|
'scope': scope,
|
|
'created_at': time.time(),
|
|
'expires_at': time.time() + 86400000,
|
|
}
|
|
|
|
# Cache the token
|
|
self.token_cache[scope] = token_data
|
|
self._save_token_cache()
|
|
|
|
# Log with masked token
|
|
masked_token = f"{test_token[:10]}...{test_token[-10:]}"
|
|
logger.info(f"Using test token: {masked_token}")
|
|
|
|
return test_token, token_data
|
|
|
|
# Normal production path - get real token via OAuth
|
|
# Prepare the request
|
|
payload = {
|
|
'client_id': self.client_id,
|
|
'client_secret': self.client_secret,
|
|
'grant_type': 'client_credentials',
|
|
'scope': scope
|
|
}
|
|
|
|
headers = {
|
|
'Content-Type': 'application/x-www-form-urlencoded'
|
|
}
|
|
|
|
try:
|
|
response = requests.post(
|
|
self.token_url,
|
|
data=payload,
|
|
headers=headers,
|
|
timeout=30
|
|
)
|
|
|
|
# Check if the request was successful
|
|
if response.status_code == 200:
|
|
token_data = response.json()
|
|
|
|
# Calculate when this token will expire
|
|
expires_in = int(token_data.get('expires_in', 86399))
|
|
token_data['expires_at'] = time.time() + expires_in
|
|
|
|
# Store when we got the token
|
|
token_data['created_at'] = time.time()
|
|
|
|
# Cache the token
|
|
self.token_cache[scope] = token_data
|
|
self._save_token_cache()
|
|
|
|
# Log success with masked token
|
|
access_token = token_data['access_token']
|
|
masked_token = f"{access_token[:10]}...{access_token[-10:]}"
|
|
logger.info(f"Successfully obtained new token: {masked_token}")
|
|
logger.info(f"Token expires in {expires_in} seconds ({expires_in/86400:.1f} days)")
|
|
|
|
return access_token, token_data
|
|
else:
|
|
logger.error(f"Failed to get token: {response.status_code} - {response.text}")
|
|
raise Exception(f"Token request failed: {response.status_code} - {response.text}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error getting token: {str(e)}")
|
|
raise
|
|
|
|
def verify_token(self, access_token: str) -> Dict[str, Any]:
|
|
"""
|
|
Verify that an access token is valid
|
|
|
|
Args:
|
|
access_token: The access token to verify
|
|
|
|
Returns:
|
|
User info response if successful
|
|
"""
|
|
headers = {"Authorization": f"Bearer {access_token}"}
|
|
|
|
try:
|
|
response = requests.get(
|
|
"https://ims-na1.adobelogin.com/ims/userinfo",
|
|
headers=headers,
|
|
timeout=20
|
|
)
|
|
|
|
if response.status_code == 200:
|
|
user_info = response.json()
|
|
logger.info(f"Token valid for user: {user_info.get('sub', 'Unknown User')}")
|
|
return user_info
|
|
else:
|
|
logger.error(f"Token verification failed: {response.status_code} - {response.text}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error verifying token: {str(e)}")
|
|
return None
|
|
|
|
def clear_cache(self) -> None:
|
|
"""Clear the token cache"""
|
|
self.token_cache = {}
|
|
self._save_token_cache()
|
|
logger.info("Token cache cleared")
|
|
|
|
def main():
|
|
"""Test the token manager functionality"""
|
|
# These should be taken from environment variables or a config file in practice
|
|
# For now, we'll use example values (these need to be replaced with real values)
|
|
client_id = "f34becb759244899bd73b86220f6fb92"
|
|
client_secret = "p8e-EXAMPLE-secret-REPLACE-ME" # This should be supplied by the user
|
|
|
|
# Create the token manager
|
|
token_manager = AdobeTokenManager(client_id, client_secret)
|
|
|
|
# Get an access token with the default scope
|
|
try:
|
|
access_token, token_data = token_manager.get_token()
|
|
|
|
# Print token information
|
|
print(f"Access Token: {access_token[:10]}...{access_token[-10:]}")
|
|
print(f"Token Type: {token_data.get('token_type')}")
|
|
print(f"Expires In: {token_data.get('expires_in')} seconds")
|
|
|
|
# Verify the token
|
|
user_info = token_manager.verify_token(access_token)
|
|
if user_info:
|
|
print(f"Token verified for user: {user_info.get('sub', 'Unknown User')}")
|
|
else:
|
|
print("Token verification failed!")
|
|
|
|
except Exception as e:
|
|
print(f"Error: {str(e)}")
|
|
|
|
if __name__ == "__main__":
|
|
main() |