creative-x-ferrero/scripts/check_status.py
DJP b20119b383 Add complete mapping system and automated Box.com monitoring service
Major Features:
- Complete Ferrero ↔ CreativeX mapping system with 93 brands
- Automated Box.com folder monitoring service
- Email notifications with score breakdowns
- Database integration for result storage

Mapping System (v2.0.0):
- mappings.json: 93 brand mappings, 44+ channel mappings
- core/mapping_resolver.py: Translates Ferrero codes to CreativeX format
- scripts/validate_mappings.py: Validation tool for brand/channel support
- scripts/generate_brand_mappings.py: Auto-mapping tool
- scripts/download_reports.py: Scorecard PDF download tool
- Updated scripts/upload.py: Integrated mapping validation
- Updated scripts/check_status.py: Added detailed score display with guidelines

Documentation:
- Updated README.md: Complete user guide with mapping system
- Updated STATUS.md: Production-ready status with test results
- MAPPINGS_GUIDE.md: Complete mapping documentation
- MAPPING_IMPLEMENTATION.md: Implementation summary
- BRAND_MAPPINGS_REVIEW.md: Brand mapping validation guide
- PRODUCTION_BRANDS_SUMMARY.md: Production brand catalog
- PRODUCTION_MAPPING_COMPLETE.md: Mapping completion summary

Automation Service (New):
- creativex-automation/: Complete automated Box monitoring service
- Monitors Box Ferrero-In folder (363284027140) for new files
- Automatically uploads to CreativeX
- Polls for completion (30 min intervals)
- Extracts scores and stores in PostgreSQL creativex_scores table
- Sends formatted emails to file uploader + daveporter@oliver.agency
- Moves processed files to Processed subfolder

Service Components:
- automation/box_monitor.py: Box folder monitoring with uploader detection
- automation/upload_processor.py: CreativeX upload integration
- automation/status_poller.py: CreativeX status polling
- automation/result_handler.py: Score extraction and email sending
- automation/orchestrator.py: Service coordination
- automation/processing_queue.py: JSON-based processing queue
- service.py: Main service entry point
- config.py: Service configuration loader
- requirements.txt: All dependencies
- deployment/systemd/: Systemd service unit file
- Updated shared/notifier.py: Added creativex_upload_complete and creativex_upload_failed templates

Testing:
- Supports --dry-run mode for configuration testing
- Supports --scan-once mode for Box folder testing
- Manual run mode for development/testing
- Comprehensive logging with rotation (10MB, 28 backups)

Database Integration:
- Uses existing creativex_scores table (no migrations needed)
- Compatible with existing Ferrero-Opentext workflows
- Stores full CreativeX API responses in JSONB

Email Templates:
- Matches Ferrero-Opentext styling (#9c27b0 purple for CreativeX)
- Includes score, tier, guidelines breakdown, scorecard URL
- Recipients: Box uploader + CC to daveporter@oliver.agency

Deployment:
- Runs locally for dev/testing
- Systemd service for production
- Auto-restart on failure
- Complete documentation in creativex-automation/README.md

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2026-01-29 09:51:16 -05:00

392 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Check status of CreativeX preflight requests
Usage:
python check_status.py
python check_status.py --file file.mp4
python check_status.py --poll --interval 30
python check_status.py --summary
"""
import argparse
import sys
import time
from pathlib import Path
from datetime import datetime
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from config import load_config
from core.api_client import CreativeXAPIClient, APIError
from utils.state_manager import StateManager
from utils.logger import setup_logger
class StatusChecker:
"""Monitor preflight status and retrieve results"""
def __init__(self, config):
"""Initialize status checker"""
self.config = config
# Setup logger
self.logger = setup_logger(
__name__,
log_dir=config.logs_dir,
log_level=config.log_level
)
# Initialize components
self.api_client = CreativeXAPIClient(
config.api_base_url,
config.access_token,
config.api_max_retries,
config.api_timeout
)
self.state_manager = StateManager(config.state_file)
def check_status(self, filename: str) -> dict:
"""
Check status of single upload
Args:
filename: Filename to check
Returns:
dict: Current status data
"""
upload = self.state_manager.get_upload(filename)
if not upload:
raise ValueError(f"Upload not found: {filename}")
request_id = upload.get('request_id')
if not request_id:
raise ValueError(f"No request_id for {filename}")
self.logger.info(f"Checking status: {filename}")
try:
response = self.api_client.get_preflight_status(request_id)
# Update state
status = response.get('status', 'unknown')
if status == 'completed':
self.state_manager.update_status(
filename,
StateManager.STATUS_COMPLETED,
results=response,
completed_at=datetime.now().isoformat()
)
self.logger.info(f" ✓ Completed")
elif status in ['processing', 'pending']:
self.state_manager.update_status(
filename,
StateManager.STATUS_PROCESSING,
results=response
)
self.logger.info(f" ⟳ Processing")
elif status == 'error':
# Extract error details
error_msg = response.get('error', response.get('errors', 'Unknown error'))
self.logger.error(f" ✗ Error: {error_msg}")
self.state_manager.mark_failed(filename, str(error_msg))
# Print full response for debugging
print(f"\nFull error response:")
import json
print(json.dumps(response, indent=2))
else:
self.logger.info(f" Status: {status}")
return response
except APIError as e:
self.logger.error(f" ✗ Error: {e}")
return {'error': str(e)}
def check_all_processing(self) -> dict:
"""
Check status of all uploads with status: processing or preflight_created
Returns:
dict: Summary of status changes
"""
processing = self.state_manager.get_uploads_by_status(StateManager.STATUS_PROCESSING)
preflight_created = self.state_manager.get_uploads_by_status(StateManager.STATUS_PREFLIGHT_CREATED)
all_uploads = processing + preflight_created
if not all_uploads:
self.logger.info("No uploads in processing state")
return {'checked': 0, 'completed': 0, 'processing': 0}
self.logger.info(f"\nChecking {len(all_uploads)} uploads...")
self.logger.info("-" * 60)
summary = {'checked': 0, 'completed': 0, 'processing': 0, 'errors': 0}
for upload in all_uploads:
filename = upload['filename']
summary['checked'] += 1
try:
result = self.check_status(filename)
if result.get('status') == 'completed':
summary['completed'] += 1
elif result.get('status') in ['processing', 'pending']:
summary['processing'] += 1
elif 'error' in result:
summary['errors'] += 1
except Exception as e:
self.logger.error(f"Error checking {filename}: {e}")
summary['errors'] += 1
return summary
def print_detailed_results(self, upload: dict) -> None:
"""
Print detailed score breakdown for an upload
Args:
upload: Upload dict with results
"""
filename = upload['filename']
results = upload.get('results', {})
print("\n" + "=" * 70)
print(f"📊 {filename}")
print("=" * 70)
if not results or 'creatives' not in results or not results['creatives']:
print("No results available")
return
creative = results['creatives'][0]
# Basic info
print(f"\n📍 Basic Info:")
print(f" Request ID: {results.get('request_id', 'N/A')}")
print(f" Brand: {results.get('brand', {}).get('name', 'N/A')}")
print(f" Market: {results.get('market', {}).get('name', 'N/A')}")
print(f" Channel: {results.get('channel', 'N/A')}")
print(f" Placement: {results.get('placement', 'N/A')}")
print(f" Completed: {results.get('completed_at', 'N/A')}")
# Scorecard URL
scorecard_url = creative.get('scorecard_url', '')
if scorecard_url:
print(f"\n🔗 Scorecard: {scorecard_url}")
# Scores
scores = creative.get('scores', [])
if scores:
print(f"\n📈 Scores:")
for score_obj in scores:
name = score_obj.get('name', 'Unknown')
value = score_obj.get('value', 0)
percentage = value * 100
tier = score_obj.get('creative_quality_tier', 'N/A')
is_default = score_obj.get('default', False)
default_marker = "" if is_default else ""
print(f"\n {name}{default_marker}")
print(f" Score: {percentage:.0f}% | Tier: {tier}")
# Guidelines
guidelines = score_obj.get('guidelines', [])
if guidelines:
print(f" Guidelines:")
for guideline in guidelines:
g_name = guideline.get('name', 'Unknown')
weight = guideline.get('weight', 'N/A')
passed = guideline.get('passed', False)
status_icon = "" if passed else ""
print(f" {status_icon} {g_name} ({weight})")
print("=" * 70)
def print_summary(self) -> None:
"""
Print formatted summary of all uploads
"""
summary = self.state_manager.get_summary()
print("\n" + "=" * 60)
print("PREFLIGHT STATUS SUMMARY")
print("=" * 60)
print(f"Total Uploads: {summary['total']}")
print(f" ✓ Completed: {summary['completed']}")
print(f" ⟳ Processing: {summary['processing']}")
print(f" ⊙ Preflight Created: {summary['preflight_created']}")
print(f" ⏳ Uploading: {summary['uploading']}")
print(f" ⊙ Pending: {summary['pending']}")
print(f" ✗ Failed: {summary['failed']}")
print("=" * 60)
# Show processing uploads
processing = self.state_manager.get_uploads_by_status(StateManager.STATUS_PROCESSING)
if processing:
print("\nProcessing:")
for upload in processing:
filename = upload['filename']
created_at = upload.get('preflight_created_at', '')
print(f"{filename}")
if created_at:
print(f" Created: {created_at}")
# Show completed uploads
completed = self.state_manager.get_uploads_by_status(StateManager.STATUS_COMPLETED)
if completed:
print("\nRecently Completed:")
for upload in completed[:5]: # Show last 5
filename = upload['filename']
results = upload.get('results', {})
# Extract score from nested structure
score = 'N/A'
tier = ''
if results and 'creatives' in results and len(results['creatives']) > 0:
creative = results['creatives'][0]
if 'scores' in creative and len(creative['scores']) > 0:
# Get the default score
for score_obj in creative['scores']:
if score_obj.get('default', False):
score_value = score_obj.get('value', 0)
score = f"{score_value * 100:.0f}%"
tier = score_obj.get('creative_quality_tier', '')
break
# If no default found, use first score
if score == 'N/A' and len(creative['scores']) > 0:
score_value = creative['scores'][0].get('value', 0)
score = f"{score_value * 100:.0f}%"
tier = creative['scores'][0].get('creative_quality_tier', '')
tier_str = f" ({tier})" if tier else ""
print(f"{filename} - Score: {score}{tier_str}")
# Show failed uploads
failed = self.state_manager.get_uploads_by_status(StateManager.STATUS_FAILED)
if failed:
print("\nFailed:")
for upload in failed:
filename = upload['filename']
errors = upload.get('errors', [])
error_msg = errors[-1] if errors else 'Unknown error'
print(f"{filename}")
print(f" Error: {error_msg}")
print("=" * 60 + "\n")
def poll_until_complete(self, interval_minutes: int = 30, max_hours: int = 48) -> None:
"""
Continuously poll for status updates
Args:
interval_minutes: Time between checks
max_hours: Maximum time to wait before giving up
"""
start_time = datetime.now()
max_seconds = max_hours * 3600
interval_seconds = interval_minutes * 60
self.logger.info(f"Starting polling (interval: {interval_minutes}min, max: {max_hours}h)")
try:
while True:
# Check elapsed time
elapsed = (datetime.now() - start_time).total_seconds()
if elapsed > max_seconds:
self.logger.info(f"Max wait time ({max_hours}h) exceeded")
break
# Check all processing uploads
summary = self.check_all_processing()
# Print update
self.logger.info("\n" + "-" * 60)
self.logger.info(f"Poll summary: {summary['completed']} completed, "
f"{summary['processing']} still processing")
self.logger.info(f"Elapsed: {elapsed/3600:.1f}h / {max_hours}h")
self.logger.info("-" * 60)
# If nothing processing, we're done
if summary['processing'] == 0:
self.logger.info("All uploads complete!")
break
# Wait for next check
self.logger.info(f"Next check in {interval_minutes} minutes...")
time.sleep(interval_seconds)
except KeyboardInterrupt:
self.logger.info("\nPolling interrupted by user")
self.logger.info("State has been saved. You can resume polling later.")
def main():
"""CLI entry point"""
parser = argparse.ArgumentParser(
description='Check preflight status and retrieve results'
)
parser.add_argument('--file', help='Check specific file')
parser.add_argument('--poll', action='store_true',
help='Continuously poll until complete')
parser.add_argument('--interval', type=int, default=30,
help='Polling interval in minutes (default: 30)')
parser.add_argument('--summary', action='store_true',
help='Show summary only')
parser.add_argument('--detailed', action='store_true',
help='Show detailed scores with guideline breakdown')
args = parser.parse_args()
# Load configuration
try:
config = load_config()
except Exception as e:
print(f"Error loading configuration: {e}")
sys.exit(1)
# Initialize checker
checker = StatusChecker(config)
# Execute based on arguments
if args.detailed:
# Show detailed breakdown for all completed uploads
completed = checker.state_manager.get_uploads_by_status(StateManager.STATUS_COMPLETED)
if completed:
for upload in completed:
checker.print_detailed_results(upload)
else:
print("\nNo completed uploads to show")
elif args.summary:
checker.print_summary()
elif args.file:
try:
result = checker.check_status(args.file)
print(f"\nStatus: {result.get('status', 'unknown')}")
if result.get('status') == 'completed':
print(f"Score: {result.get('score', 'N/A')}")
print(f"Scorecard URL: {result.get('scorecard_url', 'N/A')}")
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
elif args.poll:
checker.poll_until_complete(interval_minutes=args.interval)
else:
# Default: check all and show summary
checker.check_all_processing()
checker.print_summary()
if __name__ == '__main__':
main()