#!/usr/bin/env python3 """ Simplified Adobe Photoshop API Text Update ------------------------------------------ A stripped down script that uses the minimal required payload structure to update text layers via the Adobe Photoshop API. """ import os import sys import json import time import logging import argparse import requests from pathlib import Path # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S' ) logger = logging.getLogger(__name__) # Import local config and dependencies import config from adobe_token import AdobeTokenManager from gcs_storage import GCSStorage # Initialize token manager token_manager = AdobeTokenManager(config.ADOBE_CLIENT_ID, config.ADOBE_CLIENT_SECRET) # GCS bucket configuration GCS_BUCKET_NAME = "lor-txt-tmp-bkt-26" GCS_KEY_PATH = os.path.join(os.path.dirname(__file__), "gcs_key.json") # Initialize GCS storage gcs_storage = None if os.path.exists(GCS_KEY_PATH): try: gcs_storage = GCSStorage(GCS_BUCKET_NAME, key_path=GCS_KEY_PATH) logger.info(f"GCS storage initialized with bucket: {GCS_BUCKET_NAME}") except Exception as e: logger.error(f"Error initializing GCS storage: {str(e)}") def update_text_with_simplified_payload(json_path: str, psd_path: str): """ Updates text in a PSD file using the minimal required payload structure. """ logger.info(f"Updating text in {psd_path} using data from {json_path}") # Check if files exist if not os.path.exists(json_path) or not os.path.exists(psd_path): logger.error(f"File not found: JSON={os.path.exists(json_path)}, PSD={os.path.exists(psd_path)}") return {"success": False, "message": "File not found"} # Load JSON data try: with open(json_path, 'r', encoding='utf-8') as f: json_data = json.load(f) except Exception as e: logger.error(f"Error reading JSON: {e}") return {"success": False, "message": f"Error reading JSON: {e}"} # Upload PSD file to GCS try: # Generate timestamp-based path timestamp = int(time.time()) remote_path = f"adobe_ps/{timestamp}_{os.path.basename(psd_path)}" output_path = f"adobe_ps/output_{timestamp}_{os.path.basename(psd_path)}" # Upload file upload_result = gcs_storage.upload_file(psd_path, remote_path) if not upload_result.get("success"): logger.error(f"Upload failed: {upload_result.get('message')}") return {"success": False, "message": f"Upload failed: {upload_result.get('message')}"} # Get signed URLs input_url = upload_result.get("download_url") output_url = gcs_storage.get_signed_url( output_path, action="write", content_type="image/vnd.adobe.photoshop" ) logger.info(f"File uploaded to GCS and URLs generated") except Exception as e: logger.error(f"Error with GCS: {e}") return {"success": False, "message": f"Error with GCS: {e}"} # Upload any custom fonts found in the local fonts/ directory font_urls = [] fonts_dir = os.path.join(os.path.dirname(__file__), "fonts") if os.path.isdir(fonts_dir): for font_file in os.listdir(fonts_dir): if font_file.lower().endswith(('.ttf', '.otf', '.ttc', '.woff', '.woff2')): font_path = os.path.join(fonts_dir, font_file) font_remote = f"adobe_ps/fonts/{timestamp}_{font_file}" try: font_upload = gcs_storage.upload_file(font_path, font_remote) if font_upload.get("success"): font_url = font_upload.get("download_url") font_urls.append(font_url) logger.info(f"Uploaded font '{font_file}' to GCS") else: logger.warning(f"Failed to upload font '{font_file}': {font_upload.get('message')}") except Exception as fe: logger.warning(f"Error uploading font '{font_file}': {fe}") # Get authentication token try: access_token, _ = token_manager.get_token(config.DEFAULT_SCOPES) except Exception as e: logger.error(f"Error getting token: {e}") return {"success": False, "message": f"Error getting token: {e}"} # Find text layers that need updating text_updates = [] for layer in json_data.get('textLayers', []): if layer.get('updatedText') and layer.get('text') != layer.get('updatedText'): text_obj = { "contents": layer.get('updatedText') } # Include characterStyles with only fontPostScriptName to preserve the font style_info = layer.get('styleInfo', {}) font_name = style_info.get('font') if style_info else None if font_name: text_obj["characterStyles"] = [{"fontPostScriptName": font_name}] logger.info(f"Requesting font '{font_name}' for layer '{layer.get('name')}'") layer_update = { "name": layer.get('name', ''), "text": text_obj } logger.info(f"Updating layer '{layer.get('name')}': '{layer.get('text')[:40]}...' -> '{layer.get('updatedText')[:40]}...'") text_updates.append(layer_update) if not text_updates: logger.info("No text changes needed") return {"success": True, "message": "No text changes needed"} # Create payload per Adobe API documentation payload = { "inputs": [ { "storage": "external", "href": input_url } ], "options": { "layers": text_updates }, "outputs": [ { "storage": "external", "href": output_url, "type": "vnd.adobe.photoshop" } ] } # Add custom fonts if any were uploaded if font_urls: payload["options"]["fonts"] = [ {"storage": "external", "href": url} for url in font_urls ] logger.info(f"Added {len(font_urls)} custom font(s) to payload") # Set up request headers headers = { "x-api-key": config.ADOBE_CLIENT_ID, "Authorization": f"Bearer {access_token}", "Content-Type": "application/json" } # Log the payload logger.info(f"Request payload: {json.dumps(payload, indent=2)}") # Make the API request try: endpoint = "https://image.adobe.io/pie/psdService/text" logger.info(f"Sending request to {endpoint}") response = requests.post( endpoint, headers=headers, json=payload, timeout=30 ) # Log response logger.info(f"Response status: {response.status_code}") if response.text: try: resp_data = response.json() logger.info(f"Response: {json.dumps(resp_data, indent=2)}") except: logger.info(f"Response text: {response.text}") # Process response if response.status_code == 200 or response.status_code == 202: result = response.json() # For async processing (202), get status URL if response.status_code == 202 and '_links' in result: status_url = result.get('_links', {}).get('self', {}).get('href') logger.info(f"Request accepted. Status URL: {status_url}") # Poll status URL if status_url: max_retries = 12 retry_count = 0 while retry_count < max_retries: time.sleep(5) status_response = requests.get( status_url, headers={"Authorization": f"Bearer {access_token}", "x-api-key": config.ADOBE_CLIENT_ID}, timeout=30 ) if status_response.status_code == 200: status_data = status_response.json() status = status_data.get('status', '') logger.info(f"Status: {status}") if status == 'succeeded': logger.info("Processing succeeded!") break elif status == 'failed': logger.error(f"Processing failed: {status_data}") return {"success": False, "message": "API processing failed"} retry_count += 1 # Try to download output file try: logger.info(f"Waiting for output file...") time.sleep(10) # Wait a bit for file to be written output_check = gcs_storage.check_output_file(output_path, wait_time=60) if output_check.get("success"): # Use a better directory structure - keep the processed file in the same directory output_dir = os.path.dirname(psd_path) # Generate a better output filename to avoid nested processed folders base_name = os.path.basename(psd_path) output_filename = f"api_updated_{base_name}" output_file_path = os.path.join(output_dir, output_filename) logger.info(f"Downloading processed file to: {output_file_path}") # Add cleanup=True to delete the remote file after download download_result = gcs_storage.download_file(output_path, output_file_path, cleanup=True) # Also clean up the input file try: gcs_storage.cleanup_files(os.path.dirname(remote_path)) except Exception as clean_err: logger.warning(f"Error cleaning up temporary files: {clean_err}") if download_result.get("success"): logger.info(f"Successfully downloaded to: {output_file_path}") return { "success": True, "message": "Text update completed and output file downloaded", "processed_file": output_file_path } else: logger.warning(f"Output file not found: {output_check.get('message')}") except Exception as dl_err: logger.error(f"Error downloading output: {dl_err}") # Even if download fails, return success for the API call itself return { "success": True, "message": "Text update request processed successfully", "api_result": result } else: # API call failed error_message = "Unknown error" try: error_data = response.json() error_message = error_data.get('message', error_data.get('title', response.text)) except: error_message = response.text if response.text else f"HTTP {response.status_code}" return { "success": False, "message": f"Text update failed: {error_message}", "status_code": response.status_code } except Exception as e: logger.error(f"Request error: {e}") return {"success": False, "message": f"Request error: {e}"} def main(): """Parse arguments and run the script""" parser = argparse.ArgumentParser( description='Update text in a PSD file using simplified Adobe API payload' ) parser.add_argument('json_file', help='Path to JSON file with text data') parser.add_argument('psd_file', help='Path to PSD file to update') parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging') args = parser.parse_args() # Set verbose logging if requested if args.verbose: logger.setLevel(logging.DEBUG) # Run the update result = update_text_with_simplified_payload(args.json_file, args.psd_file) # Print results if result.get("success"): print(f"\nSuccess: {result.get('message')}") if "processed_file" in result: print(f"Processed file: {result.get('processed_file')}") else: print(f"\nError: {result.get('message')}") sys.exit(1) if __name__ == "__main__": main()