#!/usr/bin/env python3 """ CreativeX Score Extractor and Storage Processes PDFs from Box folder 350605024645, extracts CreativeX scores using LlamaExtract, stores results in database, and removes processed files from Box. Compatible with Python 3.6+ """ import sys import os import logging from datetime import datetime from pathlib import Path # Add shared library to path sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) from shared.config_loader import load_config from shared.box_client import BoxClient from shared.database import Database from shared.notifier import Notifier # Setup logging with rotation from logging.handlers import RotatingFileHandler # Create logs directory if it doesn't exist os.makedirs('logs', exist_ok=True) # Configure logging with rotation log_handler = RotatingFileHandler( 'logs/creativex_scoring.log', maxBytes=10*1024*1024, # 10MB per file backupCount=28 # Keep 28 rotated files (approximately 1 month) ) log_handler.setLevel(logging.INFO) log_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logging.basicConfig( level=logging.INFO, handlers=[log_handler, console_handler] ) logger = logging.getLogger('CreativeXScoring') class CreativeXExtractor: """Handles extraction of CreativeX data from PDF files using LlamaExtract.""" def __init__(self, api_key, agent_name, config=None): """ Initialize the Llama Extract client. Args: api_key: LlamaCloud API key agent_name: Agent name in LlamaExtract config: Configuration dict (optional, for loading mappings) """ try: from llama_cloud_services import LlamaExtract self.extractor = LlamaExtract(api_key=api_key) self.agent_name = agent_name logger.info("LlamaExtract client initialized with agent: {}".format(agent_name)) # Load mappings self.platform_mappings = self._load_mappings() except ImportError: logger.error("llama-cloud-services not installed. Run: pip install llama-cloud-services") raise except Exception as e: logger.error("Failed to initialize LlamaExtract: {}".format(str(e))) raise def _load_mappings(self): """Load Channel/Placement -> Platform mappings from CSV""" mappings = {} csv_path = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'DAM- CX mappings.csv') try: import csv if not os.path.exists(csv_path): logger.warning("Mapping file not found: {}".format(csv_path)) return {} with open(csv_path, 'r', encoding='utf-8-sig') as f: reader = csv.DictReader(f) for row in reader: # Key is (Channel, Placement) # Handle empty placement as empty string channel = row.get('Value Lists on Creative X', '').strip() placement = row.get(reader.fieldnames[1], '').strip() # 2nd column is Placement platform = row.get('Value List on DAM', '').strip() if channel: key = (channel.lower(), placement.lower()) mappings[key] = platform logger.info("Loaded {} platform mappings".format(len(mappings))) return mappings except Exception as e: logger.error("Failed to load mappings: {}".format(str(e))) return {} def get_mapped_platforms(self, channel, placements): """ Map Channel + Placements to list of DAM Platforms Args: channel: Channel string placements: List of placement strings or None Returns: List of unique Mapped Platform strings """ platforms = set() if not channel: return [] channel_key = channel.strip().lower() # If placements exist, try to match each (Channel, Placement) if placements and isinstance(placements, list): for placement in placements: placement_key = placement.strip().lower() key = (channel_key, placement_key) if key in self.platform_mappings: platforms.add(self.platform_mappings[key]) # If no specific placements matched, try generic channel match if not platforms: # Try exact match with empty placement (Channel, "") key_empty = (channel_key, "") if key_empty in self.platform_mappings: platforms.add(self.platform_mappings[key_empty]) # Fallback: Try to find any match for this channel if not platforms: for (c, p), platform in self.platform_mappings.items(): if c == channel_key: platforms.add(platform) # If we find one generic match, maybe that's enough? # Or should we collect all? Usually generic match implies one platform. break return list(platforms) def extract_from_file(self, file_path): """ Extract data from a PDF file using Llama Extract. Args: file_path: Path to the PDF file Returns: Dictionary containing the extraction result, or None if extraction fails """ try: logger.info(" Getting agent: {}".format(self.agent_name)) agent = self.extractor.get_agent(name=self.agent_name) if agent is None: raise Exception("Agent '{}' not found".format(self.agent_name)) logger.info(" Running extraction on: {}".format(os.path.basename(file_path))) result = agent.extract(str(file_path)) # Convert result to dictionary format extraction_data = { "run_id": getattr(result, "run_id", None), "extraction_agent_id": getattr(result, "extraction_agent_id", None), "data": result.data if hasattr(result, "data") else {}, "extraction_metadata": getattr(result, "extraction_metadata", {}) } return extraction_data except Exception as e: logger.error(" ERROR: Extraction failed - {}".format(str(e))) return None def parse_csv_fields(self, extraction_data): """ Parse specific fields for database storage. Expected fields from schema: - filename - creativeXId.id - creativeXId.url - ferreroCreativeQuality.percentage - channel - placements (array) Args: extraction_data: Full extraction result dictionary Returns: Dictionary with parsed fields, or None if required fields are missing """ try: data = extraction_data.get("data", {}) # Extract filename filename = data.get("filename", "") # Extract creativeXId fields creative_x_id_obj = data.get("creativeXId", {}) creative_x_id_raw = creative_x_id_obj.get("id", "") if isinstance(creative_x_id_obj, dict) else "" creative_x_url = creative_x_id_obj.get("url", "") if isinstance(creative_x_id_obj, dict) else "" # Extract ferreroCreativeQuality percentage ferrero_quality_obj = data.get("ferreroCreativeQuality", {}) quality_score_raw = ferrero_quality_obj.get("percentage", "") if isinstance(ferrero_quality_obj, dict) else "" # Extract Channel and Placements channel = data.get("channel", "") placements = data.get("placements", []) # Map to Platforms (List) platforms = self.get_mapped_platforms(channel, placements) if platforms: logger.info(" Mapped Platforms: {} (from Channel: {}, Placements: {})".format( platforms, channel, placements )) else: logger.warning(" Could not map Platform for Channel: {}, Placements: {}".format( channel, placements )) # Fallback to Channel name if no mapping found platforms = [channel] if channel else [] # Clean up numeric values - remove .0 decimal # Convert to string and remove .0 if present creative_x_id = str(int(float(creative_x_id_raw))) if creative_x_id_raw else "" quality_score = str(int(float(quality_score_raw))) if quality_score_raw else "" # Validate that we have the critical fields if not filename: logger.warning(" WARNING: filename field is missing from extraction data") return { "filename": filename, "id": creative_x_id, "url": creative_x_url, "score": quality_score, "platforms": platforms, "channel": channel, "placements": placements } except Exception as e: logger.error(" ERROR: Failed to parse CSV fields - {}".format(str(e))) return None def process_pdfs(box_client, db, extractor, notifier, config): """ Process all PDFs in the CreativeX Box folder. Args: box_client: BoxClient instance db: Database instance extractor: CreativeXExtractor instance notifier: Notifier instance config: Configuration dict Returns: dict with processing results """ creativex_folder_id = config['creativex']['box_folder_id'] logger.info("=" * 60) logger.info("CreativeX Score Extraction") logger.info("=" * 60) logger.info("Box Folder ID: {}".format(creativex_folder_id)) logger.info("") try: # List all PDF files in Box folder files = box_client.list_folder_files(creativex_folder_id) pdf_files = [f for f in files if f['name'].lower().endswith('.pdf')] if not pdf_files: logger.info("No PDF files found in Box folder - this is normal when folder is empty") logger.info("Script completed successfully with no files to process") # No email sent when no files found (normal operation) return {'success': True, 'file_count': 0, 'processed': 0, 'failed': 0} logger.info("Found {} PDF file(s) to process".format(len(pdf_files))) logger.info("") # Create temp directory temp_dir = Path('temp/creativex') temp_dir.mkdir(parents=True, exist_ok=True) # Track results processed_files = [] failed_files = [] # Process each PDF for idx, file_info in enumerate(pdf_files, 1): file_id = file_info['id'] filename = file_info['name'] logger.info("[{}/{}] Processing: {}".format(idx, len(pdf_files), filename)) try: # 1. Download PDF from Box temp_file_path = temp_dir / filename box_client.download_file(file_id, str(temp_file_path)) # 2. Extract data using LlamaExtract extraction_data = extractor.extract_from_file(str(temp_file_path)) if extraction_data is None: raise Exception("Extraction returned None") # 3. Parse fields parsed_fields = extractor.parse_csv_fields(extraction_data) if not parsed_fields: raise Exception("Failed to parse extraction fields") # Use Box PDF filename (convert .pdf to .jpg) instead of extracted filename # This ensures the filename matches the actual JPG file that will be uploaded box_based_filename = filename.replace('.pdf', '.jpg') if filename.endswith('.pdf') else filename # Log if there's a mismatch between Box filename and extracted filename if parsed_fields['filename'] and parsed_fields['filename'] != box_based_filename: logger.warning(" Filename mismatch detected:") logger.warning(" Box PDF filename: {}".format(box_based_filename)) logger.warning(" Extracted from PDF: {}".format(parsed_fields['filename'])) logger.warning(" Using Box filename to ensure database lookup works") # Inject mapped platforms into extraction_data for storage if 'data' in extraction_data: extraction_data['data']['ferrero_mapped_platforms'] = parsed_fields.get('platforms', []) # 4. Store in database with full JSON using Box-based filename db_result = db.store_creativex_score( filename=box_based_filename, creativex_id=parsed_fields['id'], creativex_url=parsed_fields['url'], quality_score=parsed_fields['score'], box_file_id=file_id, full_extraction_data=extraction_data ) if not db_result['success']: raise Exception("Database storage failed: {}".format(db_result.get('error', 'Unknown'))) # 5. Delete file from Box (only after successful storage) try: box_file = box_client.client.file(file_id) box_file.delete() logger.info(" Deleted from Box: {}".format(filename)) except Exception as e: logger.warning(" Could not delete file from Box: {}".format(str(e))) # Don't fail the whole process if delete fails # 6. Clean up local temp file try: os.remove(str(temp_file_path)) except Exception as e: logger.warning(" Could not delete temp file: {}".format(str(e))) # Track success with version info processed_files.append({ 'filename': box_based_filename, 'creativex_id': parsed_fields['id'], 'creativex_url': parsed_fields['url'], 'quality_score': parsed_fields['score'], 'box_file_id': file_id, 'version_number': db_result.get('version_number', 1), 'is_update': db_result.get('is_update', False) }) logger.info(" ✓ Success: Score {} extracted and stored (Version {})".format( parsed_fields['score'], db_result.get('version_number', 1) )) logger.info("") except Exception as e: logger.error(" ✗ Failed: {}".format(str(e))) logger.info("") failed_files.append({ 'filename': filename, 'box_file_id': file_id, 'error': str(e) }) # Clean up temp file if it exists try: temp_file_path = temp_dir / filename if temp_file_path.exists(): os.remove(str(temp_file_path)) except: pass # Summary total_files = len(pdf_files) success_count = len(processed_files) failed_count = len(failed_files) logger.info("=" * 60) logger.info("Processing Complete") logger.info("=" * 60) logger.info("Total Files: {}".format(total_files)) logger.info("Successful: {}".format(success_count)) logger.info("Failed: {}".format(failed_count)) logger.info("") # Send email notification if failed_count == 0: # All successful notifier.send_email( template_name='creativex_complete', recipients=config['notifications']['recipients']['success'], data={ 'file_count': total_files, 'success_count': success_count, 'processed_files': processed_files } ) else: # Partial success notifier.send_email( template_name='creativex_partial', recipients=config['notifications']['recipients']['errors'], data={ 'file_count': total_files, 'success_count': success_count, 'failed_count': failed_count, 'processed_files': processed_files, 'failed_files': failed_files } ) return { 'success': success_count > 0, 'file_count': total_files, 'processed': success_count, 'failed': failed_count } except Exception as e: logger.error("FATAL ERROR: {}".format(str(e))) return {'success': False, 'error': str(e)} def main(): """Entry point.""" try: logger.info("Starting CreativeX Score Extraction") logger.info("") # Load configuration config = load_config('config/config.yaml') # Initialize clients logger.info("Initializing clients...") # Box client for CreativeX folder box = BoxClient(config, root_folder_id=config['creativex']['box_folder_id']) # Database db = Database(config) # Notifier notifier = Notifier(config) # CreativeX Extractor extractor = CreativeXExtractor( api_key=config['creativex']['llama_api_key'], agent_name=config['creativex']['agent_name'] ) logger.info("Clients initialized successfully") logger.info("") # Process PDFs result = process_pdfs(box, db, extractor, notifier, config) if result['success']: logger.info("✓ CreativeX extraction completed successfully") sys.exit(0) else: logger.error("✗ CreativeX extraction failed") sys.exit(1) except KeyboardInterrupt: logger.info("\n\nProcess interrupted by user.") sys.exit(1) except Exception as e: logger.error("\nFATAL ERROR: {}".format(str(e))) import traceback traceback.print_exc() sys.exit(1) finally: # Close database connections try: db.close() except: pass if __name__ == "__main__": main()