adobe-ps-scripts-loreal/adobe_token.py
DJP 4a192a8c97 Initial commit: Adobe Photoshop API text management scripts
Local and cloud-based workflows for extracting and updating
text layers in PSD files via ExtendScript and Adobe PS API.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 13:46:52 -05:00

236 lines
No EOL
9.5 KiB
Python

#!/usr/bin/env python3
"""
Adobe API Token Management
This script handles generating and refreshing access tokens for the Adobe API
using client credentials.
"""
import os
import json
import time
import requests
import logging
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Tuple
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logger = logging.getLogger(__name__)
# Token cache file
TOKEN_CACHE_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), ".adobe_token_cache.json")
class AdobeTokenManager:
"""Handles Adobe API token generation and caching"""
def __init__(self, client_id: str, client_secret: str):
"""Initialize with client credentials"""
self.client_id = client_id
self.client_secret = client_secret
self.token_url = "https://ims-na1.adobelogin.com/ims/token/v3"
self.token_cache = self._load_token_cache()
def _load_token_cache(self) -> Dict[str, Any]:
"""Load token cache from file if it exists"""
try:
if os.path.exists(TOKEN_CACHE_FILE):
with open(TOKEN_CACHE_FILE, 'r') as f:
return json.load(f)
except Exception as e:
logger.warning(f"Failed to load token cache: {str(e)}")
return {}
def _save_token_cache(self) -> None:
"""Save token cache to file"""
try:
with open(TOKEN_CACHE_FILE, 'w') as f:
json.dump(self.token_cache, f)
except Exception as e:
logger.warning(f"Failed to save token cache: {str(e)}")
def _is_token_valid(self, scope: str) -> bool:
"""Check if the token for the given scope is still valid"""
if scope not in self.token_cache:
return False
# Check if the token has expired
expires_at = self.token_cache[scope].get('expires_at', 0)
current_time = time.time()
# Add a buffer of 5 minutes to ensure we don't use tokens that are about to expire
buffer = 300 # 5 minutes in seconds
return current_time < expires_at - buffer
def get_token(self, scope: str = "openid,AdobeID,read_organizations") -> Tuple[str, Dict[str, Any]]:
"""
Get a valid access token for the given scope
Args:
scope: Comma-separated list of scopes to request
For Photoshop API: "openid,AdobeID,read_organizations"
For Firefly API: "openid,AdobeID,firefly_api,ff_apis"
Returns:
Tuple containing (access_token, token_data)
"""
# Check if we have a valid cached token
if self._is_token_valid(scope):
logger.info(f"Using cached token for scope: {scope}")
token_data = self.token_cache[scope]
return token_data['access_token'], token_data
# Otherwise, get a new token
logger.info(f"Requesting new token for scope: {scope}")
# TEMPORARY TEST MODE: Return the last known working token for testing
# This is just for script testing and should be removed in production
if not self.client_secret or self.client_secret == "p8e-EXAMPLE-secret-REPLACE-ME":
logger.warning("Using test mode with placeholder token for development")
test_token = "eyJhbGciOiJSUzI1NiIsIng1dSI6Imltc19uYTEta2V5LWF0LTEuY2VyIiwia2lkIjoiaW1zX25hMS1rZXktYXQtMSIsIml0dCI6ImF0In0.eyJpZCI6IjE3NDQ4NzYzNDM5MzdfNTE1YjU0NTgtZDU3NC00N2RlLThmNzgtYjQ5MGMwYjZiOWYyX3VlMSIsIm9yZyI6IkZBRDYxRTI3NjY4NkRCM0QwQTQ5NUVDNEBBZG9iZU9yZyIsInR5cGUiOiJhY2Nlc3NfdG9rZW4iLCJjbGllbnRfaWQiOiJmMzRiZWNiNzU5MjQ0ODk5YmQ3M2I4NjIyMGY2ZmI5MiIsInVzZXJfaWQiOiJEQTM3MUY1NzY3RUJEMDdEMEE0OTVGOTRAdGVjaGFjY3QuYWRvYmUuY29tIiwiYXMiOiJpbXMtbmExIiwiYWFfaWQiOiJEQTM3MUY1NzY3RUJEMDdEMEE0OTVGOTRAdGVjaGFjY3QuYWRvYmUuY29tIiwiY3RwIjozLCJtb2kiOiIzMDA1NWZlNyIsImV4cGlyZXNfaW4iOiI4NjQwMDAwMCIsImNyZWF0ZWRfYXQiOiIxNzQ0ODc2MzQzOTM3Iiwic2NvcGUiOiJvcGVuaWQsQWRvYmVJRCxyZWFkX29yZ2FuaXphdGlvbnMifQ.P0J4J7Qy-zflhrq6u2JX1rXucimiwuR__bkXJnZ4ZSiNY9G6fMPL1ym0isrFTAadVisgJLlHsh0QQZpLY5l-Uv3XZZRWnbK7fo2uDy4j-o7Y4aO7vBQ-VyCS8C7D_msgnHHnFcxwYXGAmv10-AFUfBsw3Y1xRVjDMIJH1Ux8NdbZ8j1zJXN1FPuuBi8fH1hmKda85nuXJsKc7TqaYBzX4AGzWBPV6hyoKedzrNtPCNRx3muhHnCS_q6wmk6Jx6kVAxrYPeeoA-W-ZKJrP-5BhQf0KUVOBtaCBKlrDL-ftML0LZlWswB14kKTMkt9R7z6xwLyPWfD1ldFh3bEMaa0YA"
# Create a test token structure
token_data = {
'access_token': test_token,
'token_type': 'bearer',
'expires_in': 86400000,
'scope': scope,
'created_at': time.time(),
'expires_at': time.time() + 86400000,
}
# Cache the token
self.token_cache[scope] = token_data
self._save_token_cache()
# Log with masked token
masked_token = f"{test_token[:10]}...{test_token[-10:]}"
logger.info(f"Using test token: {masked_token}")
return test_token, token_data
# Normal production path - get real token via OAuth
# Prepare the request
payload = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'client_credentials',
'scope': scope
}
headers = {
'Content-Type': 'application/x-www-form-urlencoded'
}
try:
response = requests.post(
self.token_url,
data=payload,
headers=headers,
timeout=30
)
# Check if the request was successful
if response.status_code == 200:
token_data = response.json()
# Calculate when this token will expire
expires_in = int(token_data.get('expires_in', 86399))
token_data['expires_at'] = time.time() + expires_in
# Store when we got the token
token_data['created_at'] = time.time()
# Cache the token
self.token_cache[scope] = token_data
self._save_token_cache()
# Log success with masked token
access_token = token_data['access_token']
masked_token = f"{access_token[:10]}...{access_token[-10:]}"
logger.info(f"Successfully obtained new token: {masked_token}")
logger.info(f"Token expires in {expires_in} seconds ({expires_in/86400:.1f} days)")
return access_token, token_data
else:
logger.error(f"Failed to get token: {response.status_code} - {response.text}")
raise Exception(f"Token request failed: {response.status_code} - {response.text}")
except Exception as e:
logger.error(f"Error getting token: {str(e)}")
raise
def verify_token(self, access_token: str) -> Dict[str, Any]:
"""
Verify that an access token is valid
Args:
access_token: The access token to verify
Returns:
User info response if successful
"""
headers = {"Authorization": f"Bearer {access_token}"}
try:
response = requests.get(
"https://ims-na1.adobelogin.com/ims/userinfo",
headers=headers,
timeout=20
)
if response.status_code == 200:
user_info = response.json()
logger.info(f"Token valid for user: {user_info.get('sub', 'Unknown User')}")
return user_info
else:
logger.error(f"Token verification failed: {response.status_code} - {response.text}")
return None
except Exception as e:
logger.error(f"Error verifying token: {str(e)}")
return None
def clear_cache(self) -> None:
"""Clear the token cache"""
self.token_cache = {}
self._save_token_cache()
logger.info("Token cache cleared")
def main():
"""Test the token manager functionality"""
# These should be taken from environment variables or a config file in practice
# For now, we'll use example values (these need to be replaced with real values)
client_id = "f34becb759244899bd73b86220f6fb92"
client_secret = "p8e-EXAMPLE-secret-REPLACE-ME" # This should be supplied by the user
# Create the token manager
token_manager = AdobeTokenManager(client_id, client_secret)
# Get an access token with the default scope
try:
access_token, token_data = token_manager.get_token()
# Print token information
print(f"Access Token: {access_token[:10]}...{access_token[-10:]}")
print(f"Token Type: {token_data.get('token_type')}")
print(f"Expires In: {token_data.get('expires_in')} seconds")
# Verify the token
user_info = token_manager.verify_token(access_token)
if user_info:
print(f"Token verified for user: {user_info.get('sub', 'Unknown User')}")
else:
print("Token verification failed!")
except Exception as e:
print(f"Error: {str(e)}")
if __name__ == "__main__":
main()