Key Changes: - Updated metadata_extractor_mvp.py to use SIMPLE structure for all tabular fields - All tabular fields now use direct value objects (no MetadataTableFieldRow wrapper) - MAIN_LANGUAGES, ASSETCOMPLIANCE, MARKETING_TAG, CREATIVEX all use SIMPLE structure - Master Asset ID field updated to SIMPLE structure - Date fields now use type 'string' instead of 'long' - Matches DAM reference structure from asset_representation.json Added Files: - metadata_extractor_mvp_PROD.py: PROD-specific version with same SIMPLE structure - Backup files for safety - Analysis and comparison documentation Environment: - Tested and working in PPR environment (ppr.dam.ferrero.com) - All tabular fields match DAM-supplied reference structure - Successful uploads confirmed Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
516 lines
19 KiB
Python
Executable file
516 lines
19 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
"""
|
|
CreativeX Score Extractor and Storage
|
|
Processes PDFs from Box folder 350605024645, extracts CreativeX scores using LlamaExtract,
|
|
stores results in database, and removes processed files from Box.
|
|
Compatible with Python 3.6+
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
# Add shared library to path
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
|
|
|
|
from shared.config_loader import load_config
|
|
from shared.box_client import BoxClient
|
|
from shared.database import Database
|
|
from shared.notifier import Notifier
|
|
|
|
# Setup logging with rotation
|
|
from logging.handlers import RotatingFileHandler
|
|
|
|
# The rotating file handler needs its target directory to exist up front.
os.makedirs('logs', exist_ok=True)

# One formatter instance is shared by the file and console handlers.
_log_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# Size-based rotation: roll over at 10MB and keep up to 28 rotated files.
log_handler = RotatingFileHandler(
    'logs/creativex_scoring.log',
    maxBytes=10 * 1024 * 1024,
    backupCount=28,
)
console_handler = logging.StreamHandler()

for _handler in (log_handler, console_handler):
    _handler.setLevel(logging.INFO)
    _handler.setFormatter(_log_formatter)

# Attach both handlers to the root logger at INFO level.
logging.basicConfig(level=logging.INFO, handlers=[log_handler, console_handler])

logger = logging.getLogger('CreativeXScoring')
|
|
|
|
|
|
class CreativeXExtractor:
    """Handles extraction of CreativeX data from PDF files using LlamaExtract."""

    def __init__(self, api_key, agent_name, config=None):
        """
        Initialize the Llama Extract client and load the platform mappings.

        Args:
            api_key: LlamaCloud API key
            agent_name: Agent name in LlamaExtract
            config: Configuration dict (optional; accepted for compatibility,
                currently not read by this class)

        Raises:
            ImportError: If llama-cloud-services is not installed.
            Exception: Any other client initialization error is logged and re-raised.
        """
        try:
            from llama_cloud_services import LlamaExtract

            self.extractor = LlamaExtract(api_key=api_key)
            self.agent_name = agent_name
            logger.info("LlamaExtract client initialized with agent: {}".format(agent_name))

            # Load mappings
            self.platform_mappings = self._load_mappings()

        except ImportError:
            logger.error("llama-cloud-services not installed. Run: pip install llama-cloud-services")
            raise
        except Exception as e:
            logger.error("Failed to initialize LlamaExtract: {}".format(str(e)))
            raise

    def _load_mappings(self):
        """
        Load Channel/Placement -> Platform mappings from CSV.

        Returns:
            Dict keyed by (channel, placement), both lower-cased, whose value is
            the DAM platform string. Empty dict if the file is missing or
            cannot be read (the problem is logged, never raised).
        """
        # abspath: __file__ may be relative when the script is run directly,
        # in which case a double dirname() would not reach the parent directory.
        base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        csv_path = os.path.join(base_dir, 'DAM- CX mappings.csv')

        try:
            import csv

            if not os.path.exists(csv_path):
                logger.warning("Mapping file not found: {}".format(csv_path))
                return {}

            mappings = {}
            with open(csv_path, 'r', encoding='utf-8-sig', newline='') as f:
                reader = csv.DictReader(f)
                fieldnames = reader.fieldnames or []
                # 2nd column is Placement (looked up by position, not by header text)
                placement_column = fieldnames[1] if len(fieldnames) > 1 else None

                for row in reader:
                    # DictReader fills missing cells of short rows with None,
                    # so normalise to "" before stripping.
                    channel = (row.get('Value Lists on Creative X') or '').strip()
                    placement = ''
                    if placement_column is not None:
                        placement = (row.get(placement_column) or '').strip()
                    platform = (row.get('Value List on DAM') or '').strip()

                    # Key is (Channel, Placement); an empty placement stays "".
                    if channel:
                        mappings[(channel.lower(), placement.lower())] = platform

            logger.info("Loaded {} platform mappings".format(len(mappings)))
            return mappings
        except Exception as e:
            logger.error("Failed to load mappings: {}".format(str(e)))
            return {}

    def get_mapped_platforms(self, channel, placements):
        """
        Map Channel + Placements to list of DAM Platforms.

        Lookup order: every (channel, placement) pair; then (channel, "");
        then the first mapping found for the channel regardless of placement.

        Args:
            channel: Channel string
            placements: List of placement strings or None (non-string entries
                are ignored)

        Returns:
            List of unique mapped platform strings in first-seen order
            (empty if the channel is empty or nothing matches)
        """
        if not channel:
            return []

        channel_key = channel.strip().lower()
        # A list (not a set) keeps the result order deterministic between runs.
        platforms = []

        def remember(platform):
            if platform not in platforms:
                platforms.append(platform)

        # If placements exist, try to match each (Channel, Placement)
        if placements and isinstance(placements, list):
            for placement in placements:
                if not isinstance(placement, str):
                    continue
                key = (channel_key, placement.strip().lower())
                if key in self.platform_mappings:
                    remember(self.platform_mappings[key])

        # If no specific placements matched, try generic channel match
        if not platforms:
            key_empty = (channel_key, "")
            if key_empty in self.platform_mappings:
                remember(self.platform_mappings[key_empty])

        # Fallback: take the first mapping registered for this channel only.
        if not platforms:
            for (mapped_channel, _), platform in self.platform_mappings.items():
                if mapped_channel == channel_key:
                    remember(platform)
                    break

        return platforms

    def extract_from_file(self, file_path):
        """
        Extract data from a PDF file using Llama Extract.

        Args:
            file_path: Path to the PDF file

        Returns:
            Dictionary with keys run_id, extraction_agent_id, data and
            extraction_metadata, or None if extraction fails (the error is logged)
        """
        try:
            logger.info(" Getting agent: {}".format(self.agent_name))
            agent = self.extractor.get_agent(name=self.agent_name)

            if agent is None:
                raise Exception("Agent '{}' not found".format(self.agent_name))

            logger.info(" Running extraction on: {}".format(os.path.basename(file_path)))
            result = agent.extract(str(file_path))

            # Convert result to dictionary format
            return {
                "run_id": getattr(result, "run_id", None),
                "extraction_agent_id": getattr(result, "extraction_agent_id", None),
                "data": getattr(result, "data", {}),
                "extraction_metadata": getattr(result, "extraction_metadata", {}),
            }

        except Exception as e:
            logger.error(" ERROR: Extraction failed - {}".format(str(e)))
            return None

    @staticmethod
    def _clean_integer(raw_value):
        """
        Render an extracted numeric value as an integer string ("85.0" -> "85").

        Numeric zero is preserved as "0"; None, empty and blank values become "".
        Integer-looking text is parsed as int first so long IDs do not lose
        precision through a float round-trip.

        Raises:
            ValueError: If the value is not numeric.
            OverflowError: If the value is an infinite float.
        """
        if isinstance(raw_value, (int, float)):
            return str(int(raw_value))
        if not raw_value:
            return ""

        text = str(raw_value).strip()
        if not text:
            return ""
        try:
            return str(int(text))
        except ValueError:
            # Not a plain integer literal, e.g. "85.0": truncate via float.
            return str(int(float(text)))

    def parse_csv_fields(self, extraction_data):
        """
        Parse specific fields for database storage.

        Expected fields from schema:
        - filename
        - creativeXId.id
        - creativeXId.url
        - ferreroCreativeQuality.percentage
        - channel
        - placements (array)

        Args:
            extraction_data: Full extraction result dictionary

        Returns:
            Dictionary with keys filename, id, url, score, platforms, channel
            and placements, or None if parsing fails (for example when the id
            or score is not numeric). A missing filename is only logged.
        """
        try:
            data = extraction_data.get("data", {})

            # Extract filename
            filename = data.get("filename", "")

            # Extract creativeXId fields (anything but a dict counts as absent)
            creative_x_id_obj = data.get("creativeXId", {})
            if not isinstance(creative_x_id_obj, dict):
                creative_x_id_obj = {}
            creative_x_id_raw = creative_x_id_obj.get("id", "")
            creative_x_url = creative_x_id_obj.get("url", "")

            # Extract ferreroCreativeQuality percentage
            ferrero_quality_obj = data.get("ferreroCreativeQuality", {})
            if not isinstance(ferrero_quality_obj, dict):
                ferrero_quality_obj = {}
            quality_score_raw = ferrero_quality_obj.get("percentage", "")

            # Extract Channel and Placements
            channel = data.get("channel", "")
            placements = data.get("placements", [])

            # Map to Platforms (List)
            platforms = self.get_mapped_platforms(channel, placements)

            if platforms:
                logger.info(" Mapped Platforms: {} (from Channel: {}, Placements: {})".format(
                    platforms, channel, placements
                ))
            else:
                logger.warning(" Could not map Platform for Channel: {}, Placements: {}".format(
                    channel, placements
                ))
                # Fallback to Channel name if no mapping found
                platforms = [channel] if channel else []

            # Clean up numeric values - remove .0 decimal, keep a genuine 0
            creative_x_id = self._clean_integer(creative_x_id_raw)
            quality_score = self._clean_integer(quality_score_raw)

            # Validate that we have the critical fields
            if not filename:
                logger.warning(" WARNING: filename field is missing from extraction data")

            return {
                "filename": filename,
                "id": creative_x_id,
                "url": creative_x_url,
                "score": quality_score,
                "platforms": platforms,
                "channel": channel,
                "placements": placements,
            }

        except Exception as e:
            logger.error(" ERROR: Failed to parse CSV fields - {}".format(str(e)))
            return None
|
|
|
|
|
|
def _pdf_name_to_jpg_name(filename):
    """Swap a trailing .pdf extension (any letter case) for .jpg; other names pass through."""
    if filename.lower().endswith('.pdf'):
        return filename[:-len('.pdf')] + '.jpg'
    return filename


def _remove_temp_file(temp_file_path):
    """Delete a downloaded temp file if it exists; a failure is logged, never raised."""
    try:
        if temp_file_path.exists():
            os.remove(str(temp_file_path))
    except OSError as e:
        logger.warning(" Could not delete temp file: {}".format(str(e)))


def _send_summary_email(notifier, config, total_files, processed_files, failed_files):
    """Send the completion or partial-failure email; a notifier error is logged, never raised."""
    try:
        recipients = config['notifications']['recipients']
        if not failed_files:
            # All successful
            notifier.send_email(
                template_name='creativex_complete',
                recipients=recipients['success'],
                data={
                    'file_count': total_files,
                    'success_count': len(processed_files),
                    'processed_files': processed_files,
                },
            )
        else:
            # Partial success
            notifier.send_email(
                template_name='creativex_partial',
                recipients=recipients['errors'],
                data={
                    'file_count': total_files,
                    'success_count': len(processed_files),
                    'failed_count': len(failed_files),
                    'processed_files': processed_files,
                    'failed_files': failed_files,
                },
            )
    except Exception as e:
        # Files are already stored and removed from Box at this point, so a
        # mail problem must not turn the run into a fatal error.
        logger.error("Failed to send notification email: {}".format(str(e)))


def process_pdfs(box_client, db, extractor, notifier, config):
    """
    Process all PDFs in the CreativeX Box folder.

    For each PDF: download it, run the extraction, parse the fields, store the
    score in the database under the Box filename with a .jpg extension, delete
    the PDF from Box, and remove the local temp copy. A failure on one file is
    recorded and the remaining files are still processed.

    Args:
        box_client: BoxClient instance
        db: Database instance
        extractor: CreativeXExtractor instance
        notifier: Notifier instance
        config: Configuration dict

    Returns:
        dict with processing results: 'success', 'file_count', 'processed' and
        'failed'; or {'success': False, 'error': ...} when listing the folder
        or another step outside the per-file handling fails
    """
    creativex_folder_id = config['creativex']['box_folder_id']

    logger.info("=" * 60)
    logger.info("CreativeX Score Extraction")
    logger.info("=" * 60)
    logger.info("Box Folder ID: {}".format(creativex_folder_id))
    logger.info("")

    try:
        # List all PDF files in Box folder
        files = box_client.list_folder_files(creativex_folder_id)
        pdf_files = [f for f in files if f['name'].lower().endswith('.pdf')]

        if not pdf_files:
            logger.info("No PDF files found in Box folder - this is normal when folder is empty")
            logger.info("Script completed successfully with no files to process")

            # No email sent when no files found (normal operation)
            return {'success': True, 'file_count': 0, 'processed': 0, 'failed': 0}

        logger.info("Found {} PDF file(s) to process".format(len(pdf_files)))
        logger.info("")

        # Create temp directory
        temp_dir = Path('temp/creativex')
        temp_dir.mkdir(parents=True, exist_ok=True)

        # Track results
        processed_files = []
        failed_files = []

        # Process each PDF
        for idx, file_info in enumerate(pdf_files, 1):
            file_id = file_info['id']
            filename = file_info['name']
            temp_file_path = temp_dir / filename

            logger.info("[{}/{}] Processing: {}".format(idx, len(pdf_files), filename))

            try:
                # 1. Download PDF from Box
                box_client.download_file(file_id, str(temp_file_path))

                # 2. Extract data using LlamaExtract
                extraction_data = extractor.extract_from_file(str(temp_file_path))

                if extraction_data is None:
                    raise Exception("Extraction returned None")

                # 3. Parse fields
                parsed_fields = extractor.parse_csv_fields(extraction_data)

                if not parsed_fields:
                    raise Exception("Failed to parse extraction fields")

                # Use Box PDF filename (convert .pdf to .jpg) instead of extracted filename
                # This ensures the filename matches the actual JPG file that will be uploaded
                box_based_filename = _pdf_name_to_jpg_name(filename)

                # Log if there's a mismatch between Box filename and extracted filename
                if parsed_fields['filename'] and parsed_fields['filename'] != box_based_filename:
                    logger.warning(" Filename mismatch detected:")
                    logger.warning(" Box PDF filename: {}".format(box_based_filename))
                    logger.warning(" Extracted from PDF: {}".format(parsed_fields['filename']))
                    logger.warning(" Using Box filename to ensure database lookup works")

                # Inject mapped platforms into extraction_data for storage
                if 'data' in extraction_data:
                    extraction_data['data']['ferrero_mapped_platforms'] = parsed_fields.get('platforms', [])

                # 4. Store in database with full JSON using Box-based filename
                db_result = db.store_creativex_score(
                    filename=box_based_filename,
                    creativex_id=parsed_fields['id'],
                    creativex_url=parsed_fields['url'],
                    quality_score=parsed_fields['score'],
                    box_file_id=file_id,
                    full_extraction_data=extraction_data
                )

                if not db_result['success']:
                    raise Exception("Database storage failed: {}".format(db_result.get('error', 'Unknown')))

                # 5. Delete file from Box (only after successful storage)
                try:
                    box_file = box_client.client.file(file_id)
                    box_file.delete()
                    logger.info(" Deleted from Box: {}".format(filename))
                except Exception as e:
                    # Don't fail the whole process if delete fails
                    logger.warning(" Could not delete file from Box: {}".format(str(e)))

                # Track success with version info
                processed_files.append({
                    'filename': box_based_filename,
                    'creativex_id': parsed_fields['id'],
                    'creativex_url': parsed_fields['url'],
                    'quality_score': parsed_fields['score'],
                    'box_file_id': file_id,
                    'version_number': db_result.get('version_number', 1),
                    'is_update': db_result.get('is_update', False)
                })

                logger.info(" ✓ Success: Score {} extracted and stored (Version {})".format(
                    parsed_fields['score'],
                    db_result.get('version_number', 1)
                ))
                logger.info("")

            except Exception as e:
                logger.error(" ✗ Failed: {}".format(str(e)))
                logger.info("")

                failed_files.append({
                    'filename': filename,
                    'box_file_id': file_id,
                    'error': str(e)
                })

            finally:
                # 6. Clean up local temp file on both the success and failure paths
                _remove_temp_file(temp_file_path)

        # Summary
        total_files = len(pdf_files)
        success_count = len(processed_files)
        failed_count = len(failed_files)

        logger.info("=" * 60)
        logger.info("Processing Complete")
        logger.info("=" * 60)
        logger.info("Total Files: {}".format(total_files))
        logger.info("Successful: {}".format(success_count))
        logger.info("Failed: {}".format(failed_count))
        logger.info("")

        # Send email notification
        _send_summary_email(notifier, config, total_files, processed_files, failed_files)

        return {
            'success': success_count > 0,
            'file_count': total_files,
            'processed': success_count,
            'failed': failed_count
        }

    except Exception as e:
        logger.error("FATAL ERROR: {}".format(str(e)))
        return {'success': False, 'error': str(e)}
|
|
|
|
|
|
def main():
    """
    Entry point.

    Loads config/config.yaml, builds the Box, database, notifier and extractor
    clients, runs process_pdfs, and exits with status 0 when the result reports
    success and 1 otherwise (including on interruption or an unexpected error).
    The database is closed on the way out if it was created.
    """
    # Bound before the try so the finally clause can tell whether it was created.
    db = None

    try:
        logger.info("Starting CreativeX Score Extraction")
        logger.info("")

        # Load configuration
        config = load_config('config/config.yaml')

        # Initialize clients
        logger.info("Initializing clients...")

        # Box client for CreativeX folder
        box = BoxClient(config, root_folder_id=config['creativex']['box_folder_id'])

        # Database
        db = Database(config)

        # Notifier
        notifier = Notifier(config)

        # CreativeX Extractor
        extractor = CreativeXExtractor(
            api_key=config['creativex']['llama_api_key'],
            agent_name=config['creativex']['agent_name']
        )

        logger.info("Clients initialized successfully")
        logger.info("")

        # Process PDFs
        result = process_pdfs(box, db, extractor, notifier, config)

        if result['success']:
            logger.info("✓ CreativeX extraction completed successfully")
            sys.exit(0)
        else:
            logger.error("✗ CreativeX extraction failed")
            sys.exit(1)

    except KeyboardInterrupt:
        logger.info("\n\nProcess interrupted by user.")
        sys.exit(1)
    except Exception as e:
        # logger.exception appends the traceback, so it reaches the log file
        # as well as the console handler.
        logger.exception("\nFATAL ERROR: {}".format(str(e)))
        sys.exit(1)
    finally:
        # Close database connections (best effort; runs on sys.exit as well)
        if db is not None:
            try:
                db.close()
            except Exception as e:
                logger.warning("Could not close database connection: {}".format(str(e)))


if __name__ == "__main__":
    main()
|