creative-x-ferrero/creativex-automation/core/api_client.py
DJP b20119b383 Add complete mapping system and automated Box.com monitoring service
Major Features:
- Complete Ferrero ↔ CreativeX mapping system with 93 brands
- Automated Box.com folder monitoring service
- Email notifications with score breakdowns
- Database integration for result storage

Mapping System (v2.0.0):
- mappings.json: 93 brand mappings, 44+ channel mappings
- core/mapping_resolver.py: Translates Ferrero codes to CreativeX format
- scripts/validate_mappings.py: Validation tool for brand/channel support
- scripts/generate_brand_mappings.py: Auto-mapping tool
- scripts/download_reports.py: Scorecard PDF download tool
- Updated scripts/upload.py: Integrated mapping validation
- Updated scripts/check_status.py: Added detailed score display with guidelines
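
The notes say core/mapping_resolver.py translates Ferrero codes into CreativeX format, but not how mappings.json is laid out. A minimal sketch, assuming a layout of `{"brands": {code: name}, "channels": {code: name}}`: `MappingResolverSketch`, `MappingError`, and the codes `NUT` and `FB_STORIES` are hypothetical, while `NUTELLA` and `FB - Stories` are the example values from the client's `create_preflight` docstring.

```python
import json
from typing import Dict


class MappingError(LookupError):
    """Hypothetical error for a Ferrero code with no CreativeX mapping."""


class MappingResolverSketch:
    """Hypothetical resolver. Assumes mappings.json is shaped like
    {"brands": {ferrero_code: creativex_brand}, "channels": {ferrero_code: creativex_channel}}."""

    def __init__(self, mappings: Dict[str, Dict[str, str]]):
        self.brands = mappings.get("brands", {})
        self.channels = mappings.get("channels", {})

    @classmethod
    def from_json(cls, text: str) -> "MappingResolverSketch":
        return cls(json.loads(text))

    def resolve(self, brand_code: str, channel_code: str) -> Dict[str, str]:
        # Check both codes up front so an unsupported brand or channel is
        # reported before anything is uploaded
        missing = []
        if brand_code not in self.brands:
            missing.append(f"brand {brand_code!r}")
        if channel_code not in self.channels:
            missing.append(f"channel {channel_code!r}")
        if missing:
            raise MappingError("No CreativeX mapping for " + ", ".join(missing))
        return {"brand_name": self.brands[brand_code], "channel": self.channels[channel_code]}


resolver = MappingResolverSketch.from_json(
    '{"brands": {"NUT": "NUTELLA"}, "channels": {"FB_STORIES": "FB - Stories"}}'
)
print(resolver.resolve("NUT", "FB_STORIES"))
# {'brand_name': 'NUTELLA', 'channel': 'FB - Stories'}
```

The returned dict uses the same `brand_name` and `channel` keys that `create_preflight` requires, so it could be merged straight into the preflight metadata.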

Documentation:
- Updated README.md: Complete user guide with mapping system
- Updated STATUS.md: Production-ready status with test results
- MAPPINGS_GUIDE.md: Complete mapping documentation
- MAPPING_IMPLEMENTATION.md: Implementation summary
- BRAND_MAPPINGS_REVIEW.md: Brand mapping validation guide
- PRODUCTION_BRANDS_SUMMARY.md: Production brand catalog
- PRODUCTION_MAPPING_COMPLETE.md: Mapping completion summary

Automation Service (New):
- creativex-automation/: Complete automated Box monitoring service
- Monitors Box Ferrero-In folder (363284027140) for new files
- Automatically uploads to CreativeX
- Polls for completion (30 min intervals)
- Extracts scores and stores in PostgreSQL creativex_scores table
- Sends formatted emails to file uploader + daveporter@oliver.agency
- Moves processed files to Processed subfolder
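
The "polls for completion" step can be sketched as a loop around a status call such as the client's `get_preflight_status`. This is a blocking sketch under assumptions: `poll_until_done` is a hypothetical name, the `status` key's terminal values (`complete`, `failed`) are guesses, and the 48-check cap is arbitrary. The actual automation/status_poller.py may schedule checks through the processing queue instead of sleeping.

```python
import time
from typing import Callable, Dict, Iterable


def poll_until_done(
    get_status: Callable[[str], Dict],
    request_id: str,
    interval_s: float = 30 * 60,  # 30-minute interval, as in the service notes
    max_attempts: int = 48,  # hypothetical cap (about 24 hours at 30 minutes)
    done_states: Iterable[str] = ("complete", "failed"),  # assumed status values
    sleep: Callable[[float], None] = time.sleep,
) -> Dict:
    """Call get_status(request_id) until its 'status' is terminal or attempts run out."""
    done = {s.lower() for s in done_states}
    for attempt in range(1, max_attempts + 1):
        result = get_status(request_id)
        if str(result.get("status", "")).lower() in done:
            return result
        if attempt < max_attempts:
            # Only sleep if another check will follow
            sleep(interval_s)
    raise TimeoutError(f"{request_id} not finished after {max_attempts} checks")
```

With the real client this would be called as `poll_until_done(client.get_preflight_status, request_id)`. Injecting `sleep` keeps the loop testable without waiting half an hour.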

Service Components:
- automation/box_monitor.py: Box folder monitoring with uploader detection
- automation/upload_processor.py: CreativeX upload integration
- automation/status_poller.py: CreativeX status polling
- automation/result_handler.py: Score extraction and email sending
- automation/orchestrator.py: Service coordination
- automation/processing_queue.py: JSON-based processing queue
- service.py: Main service entry point
- config.py: Service configuration loader
- requirements.txt: All dependencies
- deployment/systemd/: Systemd service unit file
- Updated shared/notifier.py: Added creativex_upload_complete and creativex_upload_failed templates
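
A JSON-based processing queue can be as simple as a list of job records in one file, rewritten atomically on every change. A minimal sketch, assuming job dicts with hypothetical `file_id`, `request_id`, and `state` fields; `JsonQueueSketch` is a hypothetical name, not the contents of automation/processing_queue.py.

```python
import json
import os
import tempfile
from typing import Any, Dict, List


class JsonQueueSketch:
    """Hypothetical file-backed queue: a JSON list of job dicts."""

    def __init__(self, path: str):
        self.path = path

    def load(self) -> List[Dict]:
        if not os.path.exists(self.path):
            return []
        with open(self.path, "r", encoding="utf-8") as f:
            return json.load(f)

    def save(self, items: List[Dict]) -> None:
        # Write a temp file in the same directory, then os.replace() it over the
        # original, so a crash mid-write never leaves a half-written queue file
        directory = os.path.dirname(os.path.abspath(self.path))
        fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
        try:
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                json.dump(items, f, indent=2)
            os.replace(tmp, self.path)
        except BaseException:
            os.unlink(tmp)
            raise

    def enqueue(self, item: Dict) -> None:
        items = self.load()
        items.append(item)
        self.save(items)

    def pending(self, state: str = "uploaded") -> List[Dict]:
        return [i for i in self.load() if i.get("state") == state]

    def update(self, key: str, value: Any, **changes: Any) -> bool:
        # Apply changes to the first job whose `key` equals `value`
        items = self.load()
        for item in items:
            if item.get(key) == value:
                item.update(changes)
                self.save(items)
                return True
        return False
```

This assumes a single service process owns the file; concurrent writers would need a lock around each load/save cycle.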

Testing:
- Supports --dry-run mode for configuration testing
- Supports --scan-once mode for Box folder testing
- Manual run mode for development/testing
- Comprehensive logging with rotation (10MB, 28 backups)
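
The two test flags and the rotation policy map onto the standard library directly. A sketch, assuming argparse for the CLI and `logging.handlers.RotatingFileHandler` for rotation; the flag names and the 10 MB / 28-backup numbers come from the notes, while the help text, logger name, and function names are hypothetical.

```python
import argparse
import logging
from logging.handlers import RotatingFileHandler


def build_arg_parser() -> argparse.ArgumentParser:
    # Flag names from the commit notes; descriptions are assumptions
    parser = argparse.ArgumentParser(description="CreativeX Box automation service (sketch)")
    parser.add_argument("--dry-run", action="store_true",
                        help="load and check configuration, then exit")
    parser.add_argument("--scan-once", action="store_true",
                        help="scan the Box folder a single time, then exit")
    return parser


def configure_logging(log_path: str) -> logging.Logger:
    # Roll over at 10 MB and keep 28 old files (<log_path>.1 through <log_path>.28)
    handler = RotatingFileHandler(
        log_path, maxBytes=10 * 1024 * 1024, backupCount=28, encoding="utf-8"
    )
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s"))
    logger = logging.getLogger("creativex_automation")  # hypothetical logger name
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    return logger
```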

Database Integration:
- Uses existing creativex_scores table (no migrations needed)
- Compatible with existing Ferrero-Opentext workflows
- Stores full CreativeX API responses in JSONB

Email Templates:
- Matches Ferrero-Opentext styling (#9c27b0 purple for CreativeX)
- Includes score, tier, guidelines breakdown, scorecard URL
- Recipients: Box uploader + CC to daveporter@oliver.agency
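
The email contents and recipients listed above can be assembled with the standard library's `EmailMessage`. A sketch, assuming each guideline is a `{"name": str, "passed": bool}` dict and using a placeholder sender; `build_score_email` is a hypothetical name and this is not the shared/notifier.py template.

```python
from email.message import EmailMessage
from html import escape
from typing import Dict, List

CREATIVEX_PURPLE = "#9c27b0"  # accent colour named in the commit notes
CC_ADDRESS = "daveporter@oliver.agency"


def build_score_email(uploader: str, filename: str, score: float, tier: str,
                      guidelines: List[Dict], scorecard_url: str,
                      sender: str = "noreply@example.com") -> EmailMessage:
    """Hypothetical builder; assumes guidelines like [{"name": str, "passed": bool}]."""
    rows = "".join(
        f"<tr><td>{escape(g['name'])}</td><td>{'Pass' if g['passed'] else 'Fail'}</td></tr>"
        for g in guidelines
    )
    html = (
        f"<h2 style='color:{CREATIVEX_PURPLE}'>CreativeX results: {escape(filename)}</h2>"
        f"<p>Score: <b>{score}</b> &middot; Tier: <b>{escape(tier)}</b></p>"
        f"<table>{rows}</table>"
        f"<p><a href='{escape(scorecard_url)}'>View scorecard</a></p>"
    )
    msg = EmailMessage()
    msg["Subject"] = f"CreativeX score for {filename}: {score} ({tier})"
    msg["From"] = sender  # placeholder address
    msg["To"] = uploader
    msg["Cc"] = CC_ADDRESS
    # Plain-text part first, then the HTML alternative (makes it multipart/alternative)
    msg.set_content(f"Score: {score} ({tier})\nScorecard: {scorecard_url}")
    msg.add_alternative(html, subtype="html")
    return msg
```

`html.escape` also escapes single quotes by default, which matters because the attributes above are single-quoted.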

Deployment:
- Runs locally for dev/testing
- Systemd service for production
- Auto-restart on failure
- Complete documentation in creativex-automation/README.md

Co-Authored-By: Claude Sonnet 4.5 (1M context) <noreply@anthropic.com>
2026-01-29 09:51:16 -05:00


"""
CreativeX API Client with retry logic and error handling
"""
import logging
from typing import Dict, Union
from urllib.parse import urlparse

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


class APIError(Exception):
    """Custom exception for API errors"""
    pass


class CreativeXAPIClient:
    """
    Robust API client for CreativeX Content API v3
    """

    def __init__(
        self,
        base_url: str,
        access_token: str,
        max_retries: int = 3,
        timeout: int = 30
    ):
        """
        Initialize API client

        Args:
            base_url: API base URL
            access_token: Access token for authentication
            max_retries: Maximum number of retries
            timeout: Request timeout in seconds
        """
        self.base_url = base_url.rstrip('/')
        self.access_token = access_token
        self.max_retries = max_retries
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)
        self.session = self._create_session()

    def _create_session(self) -> requests.Session:
        """Create session with retry strategy"""
        session = requests.Session()

        # Configure retry strategy. With backoff_factor=1, urllib3 does not sleep
        # before the first retry, then waits 2s, 4s, ...; a Retry-After header on
        # 429/503 responses takes precedence. POST is retried as well, so a 5xx
        # on /preflights may cause the same preflight to be submitted twice.
        retry_strategy = Retry(
            total=self.max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST", "PUT"],
            # Hand back the last response instead of raising RetryError, so
            # raise_for_status() below reports the real status code and body
            raise_on_status=False
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)
        return session

    def _redact(self, text: str) -> str:
        """Mask the access token, which requests may echo back in URLs inside error messages"""
        if not self.access_token:
            return text
        return text.replace(self.access_token, '***')

    def _make_request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> Union[Dict, str]:
        """
        Centralized request handler with error handling and logging

        Args:
            method: HTTP method (GET, POST, PUT)
            endpoint: API endpoint
            **kwargs: Additional arguments for requests

        Returns:
            dict: Parsed JSON, or {'text': ..., 'status_code': ...} for a
                non-JSON body
            str: Raw body when the response is a bare URL

        Raises:
            APIError: If request fails after retries
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        # Add access token to params (on a copy, so the caller's dict is untouched)
        params = dict(kwargs.get('params') or {})
        params['access_token'] = self.access_token
        kwargs['params'] = params

        # Set timeout if not provided
        kwargs.setdefault('timeout', self.timeout)

        self.logger.debug(f"{method} {url}")

        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()

            # Try to parse JSON
            try:
                return response.json()
            except ValueError:
                # Not JSON: return the text directly if it looks like a URL
                text = response.text.strip()
                if text.startswith('http'):
                    return text
                return {'text': text, 'status_code': response.status_code}

        except requests.exceptions.HTTPError as e:
            error_msg = self._redact(f"HTTP {e.response.status_code}: {e.response.text}")
            self.logger.error(error_msg)
            raise APIError(error_msg) from e

        except requests.exceptions.Timeout as e:
            error_msg = f"Request timeout after {kwargs['timeout']}s"
            self.logger.error(error_msg)
            raise APIError(error_msg) from e

        except requests.exceptions.ConnectionError as e:
            error_msg = self._redact(f"Connection error: {e}")
            self.logger.error(error_msg)
            raise APIError(error_msg) from e

        except Exception as e:
            error_msg = self._redact(f"Unexpected error: {e}")
            self.logger.error(error_msg)
            raise APIError(error_msg) from e

    def get_presigned_url(self, filename: str) -> str:
        """
        Get presigned S3 URL for file upload

        Args:
            filename: Name of file to upload

        Returns:
            str: Presigned URL for uploading

        Raises:
            APIError: If request fails
        """
        self.logger.info(f"Getting presigned URL for: {filename}")

        # Pass filename as parameter
        response = self._make_request('GET', '/presigned_url', params={'filename': filename})

        # Handle different response formats
        if isinstance(response, str):
            # Direct URL string (common format)
            if response.startswith('http'):
                self.logger.info(f"Got presigned URL: {response[:80]}...")
                return response
            else:
                raise APIError(f"Invalid URL format: {response}")

        elif isinstance(response, dict):
            # URL wrapped in JSON object
            presigned_url = (
                response.get('url') or
                response.get('presigned_url') or
                response.get('upload_url') or
                response.get('text')  # Sometimes in 'text' field
            )

            if isinstance(presigned_url, str) and presigned_url.startswith('http'):
                self.logger.info(f"Got presigned URL: {presigned_url[:80]}...")
                return presigned_url

        raise APIError(f"Unexpected response format: {response}")

    def upload_file_to_presigned_url(
        self,
        presigned_url: str,
        file_path: str,
        content_type: str
    ) -> None:
        """
        Upload file to S3 using presigned URL

        Args:
            presigned_url: Full S3 presigned URL with query params
            file_path: Local path to file
            content_type: MIME type (video/mp4, image/jpeg, etc.)

        Raises:
            APIError: If upload fails
        """
        self.logger.info(f"Uploading file: {file_path}")

        try:
            with open(file_path, 'rb') as f:
                headers = {'Content-Type': content_type}

                # Plain PUT rather than self.session: the signed URL carries its own
                # auth, so no access_token is added and no automatic retries apply
                response = requests.put(
                    presigned_url,
                    data=f,
                    headers=headers,
                    timeout=self.timeout * 10  # Longer timeout for uploads
                )
                response.raise_for_status()

            self.logger.info("File uploaded successfully")

        except requests.exceptions.HTTPError as e:
            error_msg = f"Upload failed: HTTP {e.response.status_code}"
            self.logger.error(error_msg)
            raise APIError(error_msg) from e

        except Exception as e:
            error_msg = f"Upload failed: {e}"
            self.logger.error(error_msg)
            raise APIError(error_msg) from e

    def create_preflight(self, metadata: dict) -> dict:
        """
        Create preflight request

        Args:
            metadata: dict with keys:
                - name: Preflight name (filename)
                - brand_name: Brand name (e.g., "NUTELLA")
                - market_name: Market name (e.g., "Germany")
                - channel: Channel name (e.g., "FB - Stories")
                - creatives: List of creative objects with:
                    - source_url: Clean URL (no query params)

        Returns:
            dict: Response with request_id, status, etc.

        Raises:
            ValueError: If required metadata is missing or creatives is empty
            APIError: If request fails
        """
        # Ensure required fields
        required = ['name', 'brand_name', 'market_name', 'channel', 'creatives']
        missing = [f for f in required if f not in metadata]
        if missing:
            raise ValueError(f"Missing required metadata fields: {', '.join(missing)}")

        if not metadata['creatives']:
            raise ValueError("creatives array cannot be empty")

        self.logger.info(f"Creating preflight for: {metadata['name']}")

        response = self._make_request('POST', '/preflights', json=metadata)

        # _make_request may return a bare string for non-JSON bodies, which has no .get()
        request_id = response.get('request_id', 'unknown') if isinstance(response, dict) else 'unknown'
        self.logger.info(f"Preflight created: {request_id}")
        return response

    def get_preflight_status(self, request_id: str) -> dict:
        """
        Get preflight status and results

        Args:
            request_id: Preflight request ID

        Returns:
            dict: Status, scores, guidelines, scorecard_url, etc.

        Raises:
            APIError: If request fails
        """
        self.logger.debug(f"Checking status for: {request_id}")

        response = self._make_request('GET', f'/preflights/{request_id}')
        return response

    def strip_query_params(self, url: str) -> str:
        """
        Remove query parameters from URL

        Args:
            url: URL with query params

        Returns:
            str: Clean URL without query params
        """
        parsed = urlparse(url)
        clean_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        return clean_url

    def test_connection(self) -> bool:
        """
        Test API connection

        Returns:
            bool: True if connection successful
        """
        try:
            # Try to get presigned URL as a connection test
            self.get_presigned_url("test.mp4")
            return True
        except Exception as e:
            self.logger.error(f"Connection test failed: {e}")
            return False
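
The module never shows how its methods chain together. The order below is inferred from the docstrings (`create_preflight` expects `source_url` to be a clean URL with no query params, and `strip_query_params` produces exactly that), so treat it as a sketch: `submit_creative` and `PreflightClient` are hypothetical names, and the Protocol only describes the subset of `CreativeXAPIClient` the flow uses.

```python
from typing import Dict, Protocol


class PreflightClient(Protocol):
    # The CreativeXAPIClient methods this flow relies on
    def get_presigned_url(self, filename: str) -> str: ...
    def upload_file_to_presigned_url(self, presigned_url: str, file_path: str,
                                     content_type: str) -> None: ...
    def strip_query_params(self, url: str) -> str: ...
    def create_preflight(self, metadata: dict) -> dict: ...


def submit_creative(client: PreflightClient, file_path: str, filename: str,
                    content_type: str, brand_name: str, market_name: str,
                    channel: str) -> Dict:
    """Presign, upload, then create a preflight that points at the uploaded object."""
    presigned_url = client.get_presigned_url(filename)
    client.upload_file_to_presigned_url(presigned_url, file_path, content_type)
    # Drop the signature query string; the preflight references the bare object URL
    source_url = client.strip_query_params(presigned_url)
    return client.create_preflight({
        "name": filename,
        "brand_name": brand_name,
        "market_name": market_name,
        "channel": channel,
        "creatives": [{"source_url": source_url}],
    })
```

Typing against a Protocol rather than the concrete class lets the flow be exercised with a stub client, without network access.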