ferrero-opentext/Python-Version/scripts/creativex_scoring_storing.py
DJP 6fee0cc725 Add version tracking and remove .0 decimals from CreativeX scores
Implements version counter for re-scored files and cleans up numeric formatting.

Decimal Removal:
- Strip .0 suffix from creativex_id (6864255.0 → 6864255)
- Strip .0 suffix from quality_score (80.0 → 80)
- Converts float → int → string before storing
- Cleaner data for display and DAM integration

Version Tracking:
- Counts total versions per filename (active + superseded)
- Returns version_number in database result
- Logs show version: "Score 80 extracted (Version 3)"
- Email templates display version badges for updates

Email Template Updates:
- Complete template: Shows "Version 3 (Updated)" badge in header
- Includes note: "This is version 3 of this file"
- Partial template: Shows "(Version 3)" inline
- Only displays version info if > 1

Database Changes:
- Query counts ALL versions before insert
- Returns version_number in result dict
- Logs include version in success/update messages

Benefits:
- Clean numeric values without unnecessary decimals
- Users can see if file was re-scored
- Version history visible in emails
- Still preserves all history in database
- A2→A3 integration unaffected (always gets latest active)

Example progression:
Upload 1: Score 80 (no version shown - it's the first)
Upload 2: Score 85 (Version 2 badge shown)
Upload 3: Score 90 (Version 3 badge shown)

Documentation: CREATIVEX_VERSION_UPDATES.md

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-11 16:55:07 -05:00

406 lines
14 KiB
Python
Executable file

#!/usr/bin/env python3
"""
CreativeX Score Extractor and Storage
Processes PDFs from Box folder 350605024645, extracts CreativeX scores using LlamaExtract,
stores results in database, and removes processed files from Box.
Compatible with Python 3.6+
"""
import sys
import os
import logging
from datetime import datetime
from pathlib import Path
# Add shared library to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from shared.config_loader import load_config
from shared.box_client import BoxClient
from shared.database import Database
from shared.notifier import Notifier
# Setup logging with rotation
from logging.handlers import RotatingFileHandler
# Log files are written under ./logs (relative to the current working
# directory); make sure the directory is there before opening the handler.
os.makedirs('logs', exist_ok=True)

# Both handlers emit the same line layout.
_log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'

# Size-based rotation: roll over at 10MB and keep up to 28 rotated backups.
log_handler = RotatingFileHandler(
    'logs/creativex_scoring.log',
    maxBytes=10 * 1024 * 1024,
    backupCount=28,
)
# Mirror everything to the console as well.
console_handler = logging.StreamHandler()

for _handler in (log_handler, console_handler):
    _handler.setLevel(logging.INFO)
    _handler.setFormatter(logging.Formatter(_log_format))

logging.basicConfig(
    level=logging.INFO,
    handlers=[log_handler, console_handler],
)

# Logger used by everything in this script.
logger = logging.getLogger('CreativeXScoring')
class CreativeXExtractor:
    """Handles extraction of CreativeX data from PDF files using LlamaExtract."""

    def __init__(self, api_key, agent_name):
        """
        Initialize the Llama Extract client.

        Args:
            api_key: LlamaCloud API key
            agent_name: Agent name in LlamaExtract

        Raises:
            ImportError: If the llama-cloud-services package is not installed.
            Exception: Any other error raised while creating the client
                (logged, then re-raised unchanged).
        """
        try:
            # Imported lazily so a missing package is reported with an
            # install hint instead of failing at module import time.
            from llama_cloud_services import LlamaExtract
            self.extractor = LlamaExtract(api_key=api_key)
            self.agent_name = agent_name
            logger.info("LlamaExtract client initialized with agent: {}".format(agent_name))
        except ImportError:
            logger.error("llama-cloud-services not installed. Run: pip install llama-cloud-services")
            raise
        except Exception as e:
            logger.error("Failed to initialize LlamaExtract: {}".format(str(e)))
            raise

    def extract_from_file(self, file_path):
        """
        Extract data from a PDF file using Llama Extract.

        Args:
            file_path: Path to the PDF file

        Returns:
            Dictionary with the keys run_id, extraction_agent_id, data and
            extraction_metadata, or None if extraction fails (the error is
            logged, not raised).
        """
        try:
            logger.info(" Getting agent: {}".format(self.agent_name))
            agent = self.extractor.get_agent(name=self.agent_name)
            if agent is None:
                raise Exception("Agent '{}' not found".format(self.agent_name))
            logger.info(" Running extraction on: {}".format(os.path.basename(file_path)))
            result = agent.extract(str(file_path))
            # Convert result to dictionary format; attributes the result
            # object does not expose fall back to None / empty dict.
            return {
                "run_id": getattr(result, "run_id", None),
                "extraction_agent_id": getattr(result, "extraction_agent_id", None),
                "data": getattr(result, "data", {}),
                "extraction_metadata": getattr(result, "extraction_metadata", {}),
            }
        except Exception as e:
            logger.error(" ERROR: Extraction failed - {}".format(str(e)))
            return None

    @staticmethod
    def _clean_numeric(raw):
        """
        Render a numeric value as a string without a redundant ".0" suffix.

        Args:
            raw: Value taken from the extraction result (int, float, numeric
                string, or a "missing" marker such as None / "").

        Returns:
            "" when the value is missing; otherwise the number as a string.
            Whole numbers lose their decimal part (80.0 -> "80"); values with
            a real fractional part are kept (80.5 -> "80.5") rather than
            being silently truncated.

        Raises:
            ValueError / TypeError: If the value is present but not numeric,
                or is NaN / infinite.
        """
        is_number = isinstance(raw, (int, float)) and not isinstance(raw, bool)
        # Only non-numeric falsy values mean "missing"; a real 0 must survive.
        if not raw and not is_number:
            return ""
        if isinstance(raw, int):
            # Integers are formatted directly; going through float would
            # corrupt values above 2**53.
            return str(int(raw))
        if isinstance(raw, str):
            text = raw.strip()
            if not text:
                return ""
            try:
                # Integer strings keep full precision.
                return str(int(text))
            except ValueError:
                number = float(text)
        else:
            number = float(raw)
        if number != number or number in (float("inf"), float("-inf")):
            raise ValueError("non-finite numeric value: {!r}".format(raw))
        if number.is_integer():
            return str(int(number))
        return str(number)

    def parse_csv_fields(self, extraction_data):
        """
        Parse specific fields for database storage.

        Expected fields inside extraction_data["data"]:
        - filename
        - creativeXId.id
        - creativeXId.url
        - ferreroCreativeQuality.percentage

        Args:
            extraction_data: Full extraction result dictionary

        Returns:
            Dictionary with the keys filename, id, url and score. id and
            score are strings with any ".0" suffix removed ("" when the
            field is absent). A missing filename is only logged as a
            warning; the dictionary is still returned. Returns None if the
            data cannot be parsed (for example a non-numeric id or score).
        """
        try:
            data = extraction_data.get("data", {})
            # Extract filename
            filename = data.get("filename", "")
            # Extract creativeXId fields (ignored unless it is a dict)
            creative_x_id_obj = data.get("creativeXId", {})
            if isinstance(creative_x_id_obj, dict):
                creative_x_id_raw = creative_x_id_obj.get("id", "")
                creative_x_url = creative_x_id_obj.get("url", "")
            else:
                creative_x_id_raw = ""
                creative_x_url = ""
            # Extract ferreroCreativeQuality percentage
            ferrero_quality_obj = data.get("ferreroCreativeQuality", {})
            if isinstance(ferrero_quality_obj, dict):
                quality_score_raw = ferrero_quality_obj.get("percentage", "")
            else:
                quality_score_raw = ""
            # Clean up numeric values - remove .0 decimal while keeping a
            # legitimate 0 (a plain truthiness test would drop it).
            creative_x_id = self._clean_numeric(creative_x_id_raw)
            quality_score = self._clean_numeric(quality_score_raw)
            # Validate that we have the critical fields
            if not filename:
                logger.warning(" WARNING: filename field is missing from extraction data")
            return {
                "filename": filename,
                "id": creative_x_id,
                "url": creative_x_url,
                "score": quality_score,
            }
        except Exception as e:
            logger.error(" ERROR: Failed to parse CSV fields - {}".format(str(e)))
            return None
def _remove_temp_file(temp_file_path):
    """Best-effort removal of a downloaded temp file; problems are logged, never raised."""
    try:
        if temp_file_path.exists():
            os.remove(str(temp_file_path))
    except OSError as e:
        logger.warning(" Could not delete temp file: {}".format(str(e)))


def _process_single_pdf(box_client, db, extractor, file_id, filename, temp_dir):
    """
    Download, extract, store and remove one PDF.

    Returns the processed-file record (dict) on success and raises an
    Exception describing the failing step otherwise. The local temp copy is
    removed in both cases.
    """
    temp_file_path = temp_dir / filename
    try:
        # 1. Download PDF from Box
        box_client.download_file(file_id, str(temp_file_path))
        # 2. Extract data using LlamaExtract
        extraction_data = extractor.extract_from_file(str(temp_file_path))
        if extraction_data is None:
            raise Exception("Extraction returned None")
        # 3. Parse fields
        parsed_fields = extractor.parse_csv_fields(extraction_data)
        if not parsed_fields:
            raise Exception("Failed to parse extraction fields")
        # 4. Store in database with full JSON
        db_result = db.store_creativex_score(
            filename=parsed_fields['filename'],
            creativex_id=parsed_fields['id'],
            creativex_url=parsed_fields['url'],
            quality_score=parsed_fields['score'],
            box_file_id=file_id,
            full_extraction_data=extraction_data
        )
        if not db_result['success']:
            raise Exception("Database storage failed: {}".format(db_result.get('error', 'Unknown')))
        # 5. Delete file from Box (only after successful storage).
        # A failed delete is logged but does not fail the file.
        try:
            box_client.client.file(file_id).delete()
            logger.info(" Deleted from Box: {}".format(filename))
        except Exception as e:
            logger.warning(" Could not delete file from Box: {}".format(str(e)))
        # Success record, including version info reported by the database
        return {
            'filename': parsed_fields['filename'],
            'creativex_id': parsed_fields['id'],
            'creativex_url': parsed_fields['url'],
            'quality_score': parsed_fields['score'],
            'box_file_id': file_id,
            'version_number': db_result.get('version_number', 1),
            'is_update': db_result.get('is_update', False)
        }
    finally:
        # 6. Clean up local temp file on success and on failure alike
        _remove_temp_file(temp_file_path)


def process_pdfs(box_client, db, extractor, notifier, config):
    """
    Process all PDFs in the CreativeX Box folder.

    Each PDF is downloaded, run through the extractor, stored in the
    database and then deleted from Box. A summary email is sent at the end
    (or a "no files" email when the folder holds no PDFs).

    Args:
        box_client: BoxClient instance
        db: Database instance
        extractor: CreativeXExtractor instance
        notifier: Notifier instance
        config: Configuration dict

    Returns:
        dict with processing results: success, file_count, processed and
        failed; on an unexpected error, {'success': False, 'error': <message>}.
        success is True when at least one file was stored, or when there
        was nothing to process.
    """
    creativex_folder_id = config['creativex']['box_folder_id']
    logger.info("=" * 60)
    logger.info("CreativeX Score Extraction")
    logger.info("=" * 60)
    logger.info("Box Folder ID: {}".format(creativex_folder_id))
    logger.info("")
    try:
        # List all PDF files in Box folder
        files = box_client.list_folder_files(creativex_folder_id)
        pdf_files = [f for f in files if f['name'].lower().endswith('.pdf')]
        if not pdf_files:
            logger.info("No PDF files found in Box folder")
            # Send email notification
            notifier.send_email(
                template_name='creativex_no_files',
                recipients=config['notifications']['recipients']['success'],
                data={
                    'timestamp': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                }
            )
            return {'success': True, 'file_count': 0, 'processed': 0, 'failed': 0}
        logger.info("Found {} PDF file(s) to process".format(len(pdf_files)))
        logger.info("")
        # Create temp directory
        temp_dir = Path('temp/creativex')
        temp_dir.mkdir(parents=True, exist_ok=True)
        # Track results
        processed_files = []
        failed_files = []
        # Process each PDF; one file failing must not stop the others
        for idx, file_info in enumerate(pdf_files, 1):
            file_id = file_info['id']
            filename = file_info['name']
            logger.info("[{}/{}] Processing: {}".format(idx, len(pdf_files), filename))
            try:
                record = _process_single_pdf(
                    box_client, db, extractor, file_id, filename, temp_dir
                )
                processed_files.append(record)
                logger.info(" ✓ Success: Score {} extracted and stored (Version {})".format(
                    record['quality_score'],
                    record['version_number']
                ))
                logger.info("")
            except Exception as e:
                logger.error(" ✗ Failed: {}".format(str(e)))
                logger.info("")
                failed_files.append({
                    'filename': filename,
                    'box_file_id': file_id,
                    'error': str(e)
                })
        # Summary
        total_files = len(pdf_files)
        success_count = len(processed_files)
        failed_count = len(failed_files)
        logger.info("=" * 60)
        logger.info("Processing Complete")
        logger.info("=" * 60)
        logger.info("Total Files: {}".format(total_files))
        logger.info("Successful: {}".format(success_count))
        logger.info("Failed: {}".format(failed_count))
        logger.info("")
        # Send email notification
        if failed_count == 0:
            # All successful
            notifier.send_email(
                template_name='creativex_complete',
                recipients=config['notifications']['recipients']['success'],
                data={
                    'file_count': total_files,
                    'success_count': success_count,
                    'processed_files': processed_files
                }
            )
        else:
            # Partial success (also used when every file failed)
            notifier.send_email(
                template_name='creativex_partial',
                recipients=config['notifications']['recipients']['errors'],
                data={
                    'file_count': total_files,
                    'success_count': success_count,
                    'failed_count': failed_count,
                    'processed_files': processed_files,
                    'failed_files': failed_files
                }
            )
        return {
            'success': success_count > 0,
            'file_count': total_files,
            'processed': success_count,
            'failed': failed_count
        }
    except Exception as e:
        logger.error("FATAL ERROR: {}".format(str(e)))
        return {'success': False, 'error': str(e)}
def main():
    """
    Entry point.

    Loads config/config.yaml, builds the Box, database, notifier and
    extractor clients, runs process_pdfs and exits with status 0 when it
    reports success and 1 otherwise (including on interruption or any
    unexpected error). The database connection is closed on the way out if
    it was opened.
    """
    # Defined up front so the cleanup in `finally` can tell whether the
    # database was ever created.
    db = None
    try:
        logger.info("Starting CreativeX Score Extraction")
        logger.info("")
        # Load configuration
        config = load_config('config/config.yaml')
        # Initialize clients
        logger.info("Initializing clients...")
        # Box client for CreativeX folder
        box = BoxClient(config, root_folder_id=config['creativex']['box_folder_id'])
        # Database
        db = Database(config)
        # Notifier
        notifier = Notifier(config)
        # CreativeX Extractor
        extractor = CreativeXExtractor(
            api_key=config['creativex']['llama_api_key'],
            agent_name=config['creativex']['agent_name']
        )
        logger.info("Clients initialized successfully")
        logger.info("")
        # Process PDFs
        result = process_pdfs(box, db, extractor, notifier, config)
        if result['success']:
            logger.info("✓ CreativeX extraction completed successfully")
            sys.exit(0)
        else:
            logger.error("✗ CreativeX extraction failed")
            sys.exit(1)
    except KeyboardInterrupt:
        logger.info("\n\nProcess interrupted by user.")
        sys.exit(1)
    except Exception as e:
        # logger.exception records the traceback through the configured
        # handlers, so it reaches the log file as well as the console.
        logger.exception("\nFATAL ERROR: {}".format(str(e)))
        sys.exit(1)
    finally:
        # Close database connections; a failure here must not mask the
        # exit status chosen above.
        if db is not None:
            try:
                db.close()
            except Exception as e:
                logger.warning("Could not close database connection: {}".format(str(e)))


if __name__ == "__main__":
    main()