ferrero-opentext/Python-Version/scripts/creativex_scoring_storing.py
nickviljoen f83b4fae3e PPR Environment: Use SIMPLE metadata structure for tabular fields
Key Changes:
- Updated metadata_extractor_mvp.py to use SIMPLE structure for all tabular fields
- All tabular fields now use direct value objects (no MetadataTableFieldRow wrapper)
- MAIN_LANGUAGES, ASSETCOMPLIANCE, MARKETING_TAG, CREATIVEX all use SIMPLE structure
- Master Asset ID field updated to SIMPLE structure
- Date fields now use type 'string' instead of 'long'
- Matches DAM reference structure from asset_representation.json

Added Files:
- metadata_extractor_mvp_PROD.py: PROD-specific version with same SIMPLE structure
- Backup files for safety
- Analysis and comparison documentation

Environment:
- Tested and working in PPR environment (ppr.dam.ferrero.com)
- All tabular fields match DAM-supplied reference structure
- Successful uploads confirmed

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-23 16:52:50 +02:00

516 lines
19 KiB
Python
Executable file

#!/usr/bin/env python3
"""
CreativeX Score Extractor and Storage
Processes PDFs from Box folder 350605024645, extracts CreativeX scores using LlamaExtract,
stores results in database, and removes processed files from Box.
Compatible with Python 3.6+
"""
import sys
import os
import logging
from datetime import datetime
from pathlib import Path
# Add shared library to path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from shared.config_loader import load_config
from shared.box_client import BoxClient
from shared.database import Database
from shared.notifier import Notifier
# Setup logging with rotation
from logging.handlers import RotatingFileHandler
# Create logs directory if it doesn't exist
os.makedirs('logs', exist_ok=True)
# Configure logging with rotation
log_handler = RotatingFileHandler(
'logs/creativex_scoring.log',
maxBytes=10*1024*1024, # 10MB per file
backupCount=28 # Keep 28 rotated files (approximately 1 month)
)
log_handler.setLevel(logging.INFO)
log_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
logging.basicConfig(
level=logging.INFO,
handlers=[log_handler, console_handler]
)
logger = logging.getLogger('CreativeXScoring')
class CreativeXExtractor:
"""Handles extraction of CreativeX data from PDF files using LlamaExtract."""
def __init__(self, api_key, agent_name, config=None):
"""
Initialize the Llama Extract client.
Args:
api_key: LlamaCloud API key
agent_name: Agent name in LlamaExtract
config: Configuration dict (optional, for loading mappings)
"""
try:
from llama_cloud_services import LlamaExtract
self.extractor = LlamaExtract(api_key=api_key)
self.agent_name = agent_name
logger.info("LlamaExtract client initialized with agent: {}".format(agent_name))
# Load mappings
self.platform_mappings = self._load_mappings()
except ImportError:
logger.error("llama-cloud-services not installed. Run: pip install llama-cloud-services")
raise
except Exception as e:
logger.error("Failed to initialize LlamaExtract: {}".format(str(e)))
raise
def _load_mappings(self):
"""Load Channel/Placement -> Platform mappings from CSV"""
mappings = {}
csv_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'DAM- CX mappings.csv')
try:
import csv
if not os.path.exists(csv_path):
logger.warning("Mapping file not found: {}".format(csv_path))
return {}
with open(csv_path, 'r', encoding='utf-8-sig') as f:
reader = csv.DictReader(f)
for row in reader:
# Key is (Channel, Placement)
# Handle empty placement as empty string
channel = row.get('Value Lists on Creative X', '').strip()
placement = row.get(reader.fieldnames[1], '').strip() # 2nd column is Placement
platform = row.get('Value List on DAM', '').strip()
if channel:
key = (channel.lower(), placement.lower())
mappings[key] = platform
logger.info("Loaded {} platform mappings".format(len(mappings)))
return mappings
except Exception as e:
logger.error("Failed to load mappings: {}".format(str(e)))
return {}
def get_mapped_platforms(self, channel, placements):
"""
Map Channel + Placements to list of DAM Platforms
Args:
channel: Channel string
placements: List of placement strings or None
Returns:
List of unique Mapped Platform strings
"""
platforms = set()
if not channel:
return []
channel_key = channel.strip().lower()
# If placements exist, try to match each (Channel, Placement)
if placements and isinstance(placements, list):
for placement in placements:
placement_key = placement.strip().lower()
key = (channel_key, placement_key)
if key in self.platform_mappings:
platforms.add(self.platform_mappings[key])
# If no specific placements matched, try generic channel match
if not platforms:
# Try exact match with empty placement (Channel, "")
key_empty = (channel_key, "")
if key_empty in self.platform_mappings:
platforms.add(self.platform_mappings[key_empty])
# Fallback: Try to find any match for this channel
if not platforms:
for (c, p), platform in self.platform_mappings.items():
if c == channel_key:
platforms.add(platform)
# If we find one generic match, maybe that's enough?
# Or should we collect all? Usually generic match implies one platform.
break
return list(platforms)
def extract_from_file(self, file_path):
"""
Extract data from a PDF file using Llama Extract.
Args:
file_path: Path to the PDF file
Returns:
Dictionary containing the extraction result, or None if extraction fails
"""
try:
logger.info(" Getting agent: {}".format(self.agent_name))
agent = self.extractor.get_agent(name=self.agent_name)
if agent is None:
raise Exception("Agent '{}' not found".format(self.agent_name))
logger.info(" Running extraction on: {}".format(os.path.basename(file_path)))
result = agent.extract(str(file_path))
# Convert result to dictionary format
extraction_data = {
"run_id": getattr(result, "run_id", None),
"extraction_agent_id": getattr(result, "extraction_agent_id", None),
"data": result.data if hasattr(result, "data") else {},
"extraction_metadata": getattr(result, "extraction_metadata", {})
}
return extraction_data
except Exception as e:
logger.error(" ERROR: Extraction failed - {}".format(str(e)))
return None
def parse_csv_fields(self, extraction_data):
"""
Parse specific fields for database storage.
Expected fields from schema:
- filename
- creativeXId.id
- creativeXId.url
- ferreroCreativeQuality.percentage
- channel
- placements (array)
Args:
extraction_data: Full extraction result dictionary
Returns:
Dictionary with parsed fields, or None if required fields are missing
"""
try:
data = extraction_data.get("data", {})
# Extract filename
filename = data.get("filename", "")
# Extract creativeXId fields
creative_x_id_obj = data.get("creativeXId", {})
creative_x_id_raw = creative_x_id_obj.get("id", "") if isinstance(creative_x_id_obj, dict) else ""
creative_x_url = creative_x_id_obj.get("url", "") if isinstance(creative_x_id_obj, dict) else ""
# Extract ferreroCreativeQuality percentage
ferrero_quality_obj = data.get("ferreroCreativeQuality", {})
quality_score_raw = ferrero_quality_obj.get("percentage", "") if isinstance(ferrero_quality_obj, dict) else ""
# Extract Channel and Placements
channel = data.get("channel", "")
placements = data.get("placements", [])
# Map to Platforms (List)
platforms = self.get_mapped_platforms(channel, placements)
if platforms:
logger.info(" Mapped Platforms: {} (from Channel: {}, Placements: {})".format(
platforms, channel, placements
))
else:
logger.warning(" Could not map Platform for Channel: {}, Placements: {}".format(
channel, placements
))
# Fallback to Channel name if no mapping found
platforms = [channel] if channel else []
# Clean up numeric values - remove .0 decimal
# Convert to string and remove .0 if present
creative_x_id = str(int(float(creative_x_id_raw))) if creative_x_id_raw else ""
quality_score = str(int(float(quality_score_raw))) if quality_score_raw else ""
# Validate that we have the critical fields
if not filename:
logger.warning(" WARNING: filename field is missing from extraction data")
return {
"filename": filename,
"id": creative_x_id,
"url": creative_x_url,
"score": quality_score,
"platforms": platforms,
"channel": channel,
"placements": placements
}
except Exception as e:
logger.error(" ERROR: Failed to parse CSV fields - {}".format(str(e)))
return None
def process_pdfs(box_client, db, extractor, notifier, config):
"""
Process all PDFs in the CreativeX Box folder.
Args:
box_client: BoxClient instance
db: Database instance
extractor: CreativeXExtractor instance
notifier: Notifier instance
config: Configuration dict
Returns:
dict with processing results
"""
creativex_folder_id = config['creativex']['box_folder_id']
logger.info("=" * 60)
logger.info("CreativeX Score Extraction")
logger.info("=" * 60)
logger.info("Box Folder ID: {}".format(creativex_folder_id))
logger.info("")
try:
# List all PDF files in Box folder
files = box_client.list_folder_files(creativex_folder_id)
pdf_files = [f for f in files if f['name'].lower().endswith('.pdf')]
if not pdf_files:
logger.info("No PDF files found in Box folder - this is normal when folder is empty")
logger.info("Script completed successfully with no files to process")
# No email sent when no files found (normal operation)
return {'success': True, 'file_count': 0, 'processed': 0, 'failed': 0}
logger.info("Found {} PDF file(s) to process".format(len(pdf_files)))
logger.info("")
# Create temp directory
temp_dir = Path('temp/creativex')
temp_dir.mkdir(parents=True, exist_ok=True)
# Track results
processed_files = []
failed_files = []
# Process each PDF
for idx, file_info in enumerate(pdf_files, 1):
file_id = file_info['id']
filename = file_info['name']
logger.info("[{}/{}] Processing: {}".format(idx, len(pdf_files), filename))
try:
# 1. Download PDF from Box
temp_file_path = temp_dir / filename
box_client.download_file(file_id, str(temp_file_path))
# 2. Extract data using LlamaExtract
extraction_data = extractor.extract_from_file(str(temp_file_path))
if extraction_data is None:
raise Exception("Extraction returned None")
# 3. Parse fields
parsed_fields = extractor.parse_csv_fields(extraction_data)
if not parsed_fields:
raise Exception("Failed to parse extraction fields")
# Use Box PDF filename (convert .pdf to .jpg) instead of extracted filename
# This ensures the filename matches the actual JPG file that will be uploaded
box_based_filename = filename.replace('.pdf', '.jpg') if filename.endswith('.pdf') else filename
# Log if there's a mismatch between Box filename and extracted filename
if parsed_fields['filename'] and parsed_fields['filename'] != box_based_filename:
logger.warning(" Filename mismatch detected:")
logger.warning(" Box PDF filename: {}".format(box_based_filename))
logger.warning(" Extracted from PDF: {}".format(parsed_fields['filename']))
logger.warning(" Using Box filename to ensure database lookup works")
# Inject mapped platforms into extraction_data for storage
if 'data' in extraction_data:
extraction_data['data']['ferrero_mapped_platforms'] = parsed_fields.get('platforms', [])
# 4. Store in database with full JSON using Box-based filename
db_result = db.store_creativex_score(
filename=box_based_filename,
creativex_id=parsed_fields['id'],
creativex_url=parsed_fields['url'],
quality_score=parsed_fields['score'],
box_file_id=file_id,
full_extraction_data=extraction_data
)
if not db_result['success']:
raise Exception("Database storage failed: {}".format(db_result.get('error', 'Unknown')))
# 5. Delete file from Box (only after successful storage)
try:
box_file = box_client.client.file(file_id)
box_file.delete()
logger.info(" Deleted from Box: {}".format(filename))
except Exception as e:
logger.warning(" Could not delete file from Box: {}".format(str(e)))
# Don't fail the whole process if delete fails
# 6. Clean up local temp file
try:
os.remove(str(temp_file_path))
except Exception as e:
logger.warning(" Could not delete temp file: {}".format(str(e)))
# Track success with version info
processed_files.append({
'filename': box_based_filename,
'creativex_id': parsed_fields['id'],
'creativex_url': parsed_fields['url'],
'quality_score': parsed_fields['score'],
'box_file_id': file_id,
'version_number': db_result.get('version_number', 1),
'is_update': db_result.get('is_update', False)
})
logger.info(" ✓ Success: Score {} extracted and stored (Version {})".format(
parsed_fields['score'],
db_result.get('version_number', 1)
))
logger.info("")
except Exception as e:
logger.error(" ✗ Failed: {}".format(str(e)))
logger.info("")
failed_files.append({
'filename': filename,
'box_file_id': file_id,
'error': str(e)
})
# Clean up temp file if it exists
try:
temp_file_path = temp_dir / filename
if temp_file_path.exists():
os.remove(str(temp_file_path))
except:
pass
# Summary
total_files = len(pdf_files)
success_count = len(processed_files)
failed_count = len(failed_files)
logger.info("=" * 60)
logger.info("Processing Complete")
logger.info("=" * 60)
logger.info("Total Files: {}".format(total_files))
logger.info("Successful: {}".format(success_count))
logger.info("Failed: {}".format(failed_count))
logger.info("")
# Send email notification
if failed_count == 0:
# All successful
notifier.send_email(
template_name='creativex_complete',
recipients=config['notifications']['recipients']['success'],
data={
'file_count': total_files,
'success_count': success_count,
'processed_files': processed_files
}
)
else:
# Partial success
notifier.send_email(
template_name='creativex_partial',
recipients=config['notifications']['recipients']['errors'],
data={
'file_count': total_files,
'success_count': success_count,
'failed_count': failed_count,
'processed_files': processed_files,
'failed_files': failed_files
}
)
return {
'success': success_count > 0,
'file_count': total_files,
'processed': success_count,
'failed': failed_count
}
except Exception as e:
logger.error("FATAL ERROR: {}".format(str(e)))
return {'success': False, 'error': str(e)}
def main():
"""Entry point."""
try:
logger.info("Starting CreativeX Score Extraction")
logger.info("")
# Load configuration
config = load_config('config/config.yaml')
# Initialize clients
logger.info("Initializing clients...")
# Box client for CreativeX folder
box = BoxClient(config, root_folder_id=config['creativex']['box_folder_id'])
# Database
db = Database(config)
# Notifier
notifier = Notifier(config)
# CreativeX Extractor
extractor = CreativeXExtractor(
api_key=config['creativex']['llama_api_key'],
agent_name=config['creativex']['agent_name']
)
logger.info("Clients initialized successfully")
logger.info("")
# Process PDFs
result = process_pdfs(box, db, extractor, notifier, config)
if result['success']:
logger.info("✓ CreativeX extraction completed successfully")
sys.exit(0)
else:
logger.error("✗ CreativeX extraction failed")
sys.exit(1)
except KeyboardInterrupt:
logger.info("\n\nProcess interrupted by user.")
sys.exit(1)
except Exception as e:
logger.error("\nFATAL ERROR: {}".format(str(e)))
import traceback
traceback.print_exc()
sys.exit(1)
finally:
# Close database connections
try:
db.close()
except:
pass
if __name__ == "__main__":
main()