Change behavior to only log, rather than send an email, when the Box folder is empty.

Rationale:
- An empty folder is normal operation (not an error condition)
- Reduces email noise when the script runs on cron
- The event is still logged for monitoring
- Consistent with other workflows (a1_to_a2, etc.) that don't email when there is no work to do

Changes:
- Removed the notifier.send_email() call for the 'creativex_no_files' template
- Enhanced log message: "No PDF files found in Box folder - this is normal when folder is empty"
- Added log message: "Script completed successfully with no files to process"
- Still returns success=True (not an error)

The 'creativex_no_files' template is retained for potential future use but is no longer called.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
399 lines
14 KiB
Python
Executable file
399 lines
14 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
CreativeX Score Extractor and Storage
|
|
Processes PDFs from Box folder 350605024645, extracts CreativeX scores using LlamaExtract,
|
|
stores results in database, and removes processed files from Box.
|
|
Compatible with Python 3.6+
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Add shared library to path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
from shared.config_loader import load_config
|
|
from shared.box_client import BoxClient
|
|
from shared.database import Database
|
|
from shared.notifier import Notifier
|
|
|
|
# Setup logging with rotation
|
|
from logging.handlers import RotatingFileHandler
|
|
|
|
# The file handler opens its log file on construction, so the directory
# has to exist first.
os.makedirs('logs', exist_ok=True)

# One formatter shared by the file and console handlers.
_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Size-based rotation: roll over at 10MB and keep up to 28 rotated backups.
log_handler = RotatingFileHandler(
    'logs/creativex_scoring.log',
    maxBytes=10 * 1024 * 1024,
    backupCount=28,
)
console_handler = logging.StreamHandler()

for _handler in (log_handler, console_handler):
    _handler.setLevel(logging.INFO)
    _handler.setFormatter(_formatter)

# Attach both handlers to the root logger.
logging.basicConfig(level=logging.INFO, handlers=[log_handler, console_handler])

logger = logging.getLogger('CreativeXScoring')
|
|
|
|
|
|
class CreativeXExtractor:
    """Handles extraction of CreativeX data from PDF files using LlamaExtract."""

    def __init__(self, api_key, agent_name):
        """
        Initialize the Llama Extract client.

        Args:
            api_key: LlamaCloud API key
            agent_name: Agent name in LlamaExtract

        Raises:
            ImportError: if the llama-cloud-services package is not installed
            Exception: any error raised while building the client (logged, then re-raised)
        """
        try:
            # Imported lazily so a missing package is reported with an install hint
            from llama_cloud_services import LlamaExtract
            self.extractor = LlamaExtract(api_key=api_key)
            self.agent_name = agent_name
            logger.info("LlamaExtract client initialized with agent: {}".format(agent_name))
        except ImportError:
            logger.error("llama-cloud-services not installed. Run: pip install llama-cloud-services")
            raise
        except Exception as e:
            logger.error("Failed to initialize LlamaExtract: {}".format(str(e)))
            raise

    def extract_from_file(self, file_path):
        """
        Extract data from a PDF file using Llama Extract.

        Args:
            file_path: Path to the PDF file

        Returns:
            Dictionary with the keys "run_id", "extraction_agent_id", "data" and
            "extraction_metadata", or None if extraction fails (the error is logged)
        """
        try:
            logger.info(" Getting agent: {}".format(self.agent_name))
            agent = self.extractor.get_agent(name=self.agent_name)

            if agent is None:
                raise Exception("Agent '{}' not found".format(self.agent_name))

            logger.info(" Running extraction on: {}".format(os.path.basename(file_path)))
            result = agent.extract(str(file_path))

            # Convert result to dictionary format; attributes missing from the
            # result object fall back to None / empty dict
            return {
                "run_id": getattr(result, "run_id", None),
                "extraction_agent_id": getattr(result, "extraction_agent_id", None),
                "data": getattr(result, "data", {}),
                "extraction_metadata": getattr(result, "extraction_metadata", {}),
            }

        except Exception as e:
            logger.error(" ERROR: Extraction failed - {}".format(str(e)))
            return None

    @staticmethod
    def _to_int_string(raw_value):
        """
        Render an extracted numeric value as an integer string (e.g. 123.0 -> "123").

        Missing values (None, "", blank strings, other empty non-numeric values)
        yield "". A numeric zero is a real value and yields "0". Integers and
        digit-only strings are converted without going through float, so long
        IDs keep every digit.

        Raises:
            ValueError, OverflowError, TypeError: when the value is not numeric
        """
        is_number = isinstance(raw_value, (int, float)) and not isinstance(raw_value, bool)
        if not is_number and not raw_value:
            return ""
        if isinstance(raw_value, (int, float)):
            return str(int(raw_value))

        text = str(raw_value).strip()
        if not text:
            return ""
        try:
            # Exact path for plain integer strings (no float precision loss)
            return str(int(text))
        except ValueError:
            # Decimal strings such as "87.0" are truncated to their integer part
            return str(int(float(text)))

    def parse_csv_fields(self, extraction_data):
        """
        Parse specific fields for database storage.

        Expected fields:
        - filename
        - creativeXId.id
        - creativeXId.url
        - ferreroCreativeQuality.percentage

        Args:
            extraction_data: Full extraction result dictionary

        Returns:
            Dictionary with the keys "filename", "id", "url" and "score" (missing
            values become ""; a missing filename only logs a warning), or None
            if parsing raises, e.g. on a non-numeric id or percentage
        """
        try:
            data = extraction_data.get("data", {})

            # Extract filename
            filename = data.get("filename", "")

            # Extract creativeXId fields (ignored unless the value is a dict)
            creative_x_id_obj = data.get("creativeXId", {})
            if isinstance(creative_x_id_obj, dict):
                creative_x_id_raw = creative_x_id_obj.get("id", "")
                creative_x_url = creative_x_id_obj.get("url", "")
            else:
                creative_x_id_raw = ""
                creative_x_url = ""

            # Extract ferreroCreativeQuality percentage
            ferrero_quality_obj = data.get("ferreroCreativeQuality", {})
            if isinstance(ferrero_quality_obj, dict):
                quality_score_raw = ferrero_quality_obj.get("percentage", "")
            else:
                quality_score_raw = ""

            # Clean up numeric values - remove .0 decimal.
            # Presence is not decided by truthiness, so a score of 0 is kept as "0".
            creative_x_id = self._to_int_string(creative_x_id_raw)
            quality_score = self._to_int_string(quality_score_raw)

            # Validate that we have the critical fields
            if not filename:
                logger.warning(" WARNING: filename field is missing from extraction data")

            return {
                "filename": filename,
                "id": creative_x_id,
                "url": creative_x_url,
                "score": quality_score
            }

        except Exception as e:
            logger.error(" ERROR: Failed to parse CSV fields - {}".format(str(e)))
            return None
|
|
|
|
|
|
def _remove_temp_file(temp_file_path):
    """Best-effort removal of a downloaded PDF; a failure is logged, never raised."""
    try:
        if temp_file_path.exists():
            os.remove(str(temp_file_path))
    except OSError as e:
        logger.warning(" Could not delete temp file: {}".format(str(e)))


def _process_single_pdf(box_client, db, extractor, temp_dir, file_info):
    """
    Download one PDF from Box, extract and store its score, then delete it from Box.

    Args:
        box_client: BoxClient instance
        db: Database instance
        extractor: CreativeXExtractor instance
        temp_dir: Path of the local directory used for the download
        file_info: dict with the Box file 'id' and 'name'

    Returns:
        dict describing the stored record (parsed fields, Box file id, version info)

    Raises:
        Exception: when download, extraction, parsing or database storage fails
    """
    file_id = file_info['id']
    filename = file_info['name']
    # Use only the final path component so a remote name can never resolve
    # outside temp_dir
    temp_file_path = temp_dir / Path(filename).name

    try:
        # 1. Download PDF from Box
        box_client.download_file(file_id, str(temp_file_path))

        # 2. Extract data using LlamaExtract
        extraction_data = extractor.extract_from_file(str(temp_file_path))
        if extraction_data is None:
            raise Exception("Extraction returned None")

        # 3. Parse fields
        parsed_fields = extractor.parse_csv_fields(extraction_data)
        if not parsed_fields:
            raise Exception("Failed to parse extraction fields")

        # 4. Store in database with full JSON
        db_result = db.store_creativex_score(
            filename=parsed_fields['filename'],
            creativex_id=parsed_fields['id'],
            creativex_url=parsed_fields['url'],
            quality_score=parsed_fields['score'],
            box_file_id=file_id,
            full_extraction_data=extraction_data
        )
        if not db_result['success']:
            raise Exception("Database storage failed: {}".format(db_result.get('error', 'Unknown')))

        # 5. Delete file from Box (only after successful storage).
        # A failed delete does not fail the file: the score is already stored.
        try:
            box_client.client.file(file_id).delete()
            logger.info(" Deleted from Box: {}".format(filename))
        except Exception as e:
            logger.warning(" Could not delete file from Box: {}".format(str(e)))

        return {
            'filename': parsed_fields['filename'],
            'creativex_id': parsed_fields['id'],
            'creativex_url': parsed_fields['url'],
            'quality_score': parsed_fields['score'],
            'box_file_id': file_id,
            'version_number': db_result.get('version_number', 1),
            'is_update': db_result.get('is_update', False)
        }
    finally:
        # 6. Clean up the local temp file on success, failure and interruption
        _remove_temp_file(temp_file_path)


def process_pdfs(box_client, db, extractor, notifier, config):
    """
    Process all PDFs in the CreativeX Box folder.

    Each PDF is handled independently: a failure on one file is recorded and
    the remaining files are still processed. When at least one PDF was found,
    a summary email is sent ('creativex_complete' if nothing failed, otherwise
    'creativex_partial'). No email is sent when the folder has no PDFs.

    Args:
        box_client: BoxClient instance
        db: Database instance
        extractor: CreativeXExtractor instance
        notifier: Notifier instance
        config: Configuration dict

    Returns:
        dict with processing results: 'success', 'file_count', 'processed' and
        'failed'; or {'success': False, 'error': ...} if an unexpected error
        aborts the run. 'success' is True when the folder was empty or at
        least one file was processed.
    """
    creativex_folder_id = config['creativex']['box_folder_id']

    logger.info("=" * 60)
    logger.info("CreativeX Score Extraction")
    logger.info("=" * 60)
    logger.info("Box Folder ID: {}".format(creativex_folder_id))
    logger.info("")

    try:
        # List all PDF files in Box folder
        files = box_client.list_folder_files(creativex_folder_id)
        pdf_files = [f for f in files if f['name'].lower().endswith('.pdf')]

        if not pdf_files:
            logger.info("No PDF files found in Box folder - this is normal when folder is empty")
            logger.info("Script completed successfully with no files to process")

            # No email sent when no files found (normal operation)
            return {'success': True, 'file_count': 0, 'processed': 0, 'failed': 0}

        logger.info("Found {} PDF file(s) to process".format(len(pdf_files)))
        logger.info("")

        # Create temp directory
        temp_dir = Path('temp/creativex')
        temp_dir.mkdir(parents=True, exist_ok=True)

        # Track results
        processed_files = []
        failed_files = []

        # Process each PDF
        for idx, file_info in enumerate(pdf_files, 1):
            logger.info("[{}/{}] Processing: {}".format(idx, len(pdf_files), file_info['name']))

            try:
                record = _process_single_pdf(box_client, db, extractor, temp_dir, file_info)
            except Exception as e:
                logger.error(" ✗ Failed: {}".format(str(e)))
                logger.info("")

                failed_files.append({
                    'filename': file_info['name'],
                    'box_file_id': file_info['id'],
                    'error': str(e)
                })
            else:
                # Track success with version info
                processed_files.append(record)

                logger.info(" ✓ Success: Score {} extracted and stored (Version {})".format(
                    record['quality_score'],
                    record['version_number']
                ))
                logger.info("")

        # Summary
        total_files = len(pdf_files)
        success_count = len(processed_files)
        failed_count = len(failed_files)

        logger.info("=" * 60)
        logger.info("Processing Complete")
        logger.info("=" * 60)
        logger.info("Total Files: {}".format(total_files))
        logger.info("Successful: {}".format(success_count))
        logger.info("Failed: {}".format(failed_count))
        logger.info("")

        # Send email notification
        if failed_count == 0:
            # All successful
            notifier.send_email(
                template_name='creativex_complete',
                recipients=config['notifications']['recipients']['success'],
                data={
                    'file_count': total_files,
                    'success_count': success_count,
                    'processed_files': processed_files
                }
            )
        else:
            # At least one failure (this also covers the case where every file failed)
            notifier.send_email(
                template_name='creativex_partial',
                recipients=config['notifications']['recipients']['errors'],
                data={
                    'file_count': total_files,
                    'success_count': success_count,
                    'failed_count': failed_count,
                    'processed_files': processed_files,
                    'failed_files': failed_files
                }
            )

        return {
            'success': success_count > 0,
            'file_count': total_files,
            'processed': success_count,
            'failed': failed_count
        }

    except Exception as e:
        logger.error("FATAL ERROR: {}".format(str(e)))
        return {'success': False, 'error': str(e)}
|
|
|
|
|
|
def main():
    """
    Entry point.

    Loads config/config.yaml, builds the Box, database, notifier and extractor
    clients, runs process_pdfs, and exits with status 0 when the result reports
    success and 1 otherwise (including on interruption or an unexpected error).
    """
    # Bound before the try block so the finally clause can tell whether a
    # database object was ever created
    db = None

    try:
        logger.info("Starting CreativeX Score Extraction")
        logger.info("")

        # Load configuration
        config = load_config('config/config.yaml')

        # Initialize clients
        logger.info("Initializing clients...")

        # Box client for CreativeX folder
        box = BoxClient(config, root_folder_id=config['creativex']['box_folder_id'])

        # Database
        db = Database(config)

        # Notifier
        notifier = Notifier(config)

        # CreativeX Extractor
        extractor = CreativeXExtractor(
            api_key=config['creativex']['llama_api_key'],
            agent_name=config['creativex']['agent_name']
        )

        logger.info("Clients initialized successfully")
        logger.info("")

        # Process PDFs
        result = process_pdfs(box, db, extractor, notifier, config)

        if result['success']:
            logger.info("✓ CreativeX extraction completed successfully")
            sys.exit(0)
        else:
            logger.error("✗ CreativeX extraction failed")
            sys.exit(1)

    except KeyboardInterrupt:
        logger.info("\n\nProcess interrupted by user.")
        sys.exit(1)
    except Exception as e:
        # exc_info routes the traceback through the logging handlers, so it
        # reaches the log file as well as the console
        logger.error("\nFATAL ERROR: {}".format(str(e)), exc_info=True)
        sys.exit(1)
    finally:
        # Close database connections (best effort: a close failure is logged
        # and does not change the exit status)
        if db is not None:
            try:
                db.close()
            except Exception as e:
                logger.warning("Could not close database connection: {}".format(str(e)))
|
|
|
|
|
|
# Run the extraction only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|