creative-x-ferrero/core/api_client.py
DJP 3f8351dff6 Fix: Correct preflight payload structure
The API expects:
- name, brand_name, market_name, channel at top level
- creatives array with only source_url

Changes:
- Moved name and channel to top level
- Changed asset_url to source_url in creatives
- Updated API client validation to match

This fixes the 'channel is missing, name is missing, source_url is missing' error.
2026-01-09 14:46:41 -05:00

300 lines
9.1 KiB
Python

"""
CreativeX API Client with retry logic and error handling
"""
import logging
from typing import Dict, Optional, Union
from urllib.parse import parse_qs, quote_plus, urlparse

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class APIError(Exception):
    """Custom exception for API errors.

    Raised by the API client when a request fails (HTTP error status,
    timeout, connection problem, or an unexpected response format), so
    callers can catch a single exception type.
    """
class CreativeXAPIClient:
"""
Robust API client for CreativeX Content API v3
"""
def __init__(
self,
base_url: str,
access_token: str,
max_retries: int = 3,
timeout: int = 30
):
"""
Initialize API client
Args:
base_url: API base URL
access_token: Access token for authentication
max_retries: Maximum number of retries
timeout: Request timeout in seconds
"""
self.base_url = base_url.rstrip('/')
self.access_token = access_token
self.max_retries = max_retries
self.timeout = timeout
self.session = self._create_session()
self.logger = logging.getLogger(__name__)
def _create_session(self) -> requests.Session:
"""Create session with retry strategy"""
session = requests.Session()
# Configure retry strategy
retry_strategy = Retry(
total=self.max_retries,
backoff_factor=1, # 1s, 2s, 4s delays
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["GET", "POST", "PUT"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
def _make_request(
self,
method: str,
endpoint: str,
**kwargs
) -> Dict:
"""
Centralized request handler with error handling and logging
Args:
method: HTTP method (GET, POST, PUT)
endpoint: API endpoint
**kwargs: Additional arguments for requests
Returns:
dict: Response JSON
Raises:
APIError: If request fails after retries
"""
url = f"{self.base_url}/{endpoint.lstrip('/')}"
# Add access token to params
params = kwargs.get('params', {})
params['access_token'] = self.access_token
kwargs['params'] = params
# Set timeout if not provided
if 'timeout' not in kwargs:
kwargs['timeout'] = self.timeout
self.logger.debug(f"{method} {url}")
try:
response = self.session.request(method, url, **kwargs)
response.raise_for_status()
# Try to parse JSON
try:
return response.json()
except ValueError:
# Not JSON response, return text directly if it looks like a URL
text = response.text.strip()
if text.startswith('http'):
return text
else:
return {'text': text, 'status_code': response.status_code}
except requests.exceptions.HTTPError as e:
error_msg = f"HTTP {e.response.status_code}: {e.response.text}"
self.logger.error(error_msg)
raise APIError(error_msg) from e
except requests.exceptions.Timeout as e:
error_msg = f"Request timeout after {self.timeout}s"
self.logger.error(error_msg)
raise APIError(error_msg) from e
except requests.exceptions.ConnectionError as e:
error_msg = f"Connection error: {str(e)}"
self.logger.error(error_msg)
raise APIError(error_msg) from e
except Exception as e:
error_msg = f"Unexpected error: {str(e)}"
self.logger.error(error_msg)
raise APIError(error_msg) from e
def get_presigned_url(self, filename: str) -> str:
"""
Get presigned S3 URL for file upload
Args:
filename: Name of file to upload
Returns:
str: Presigned URL for uploading
Raises:
APIError: If request fails
"""
self.logger.info(f"Getting presigned URL for: {filename}")
# Pass filename as parameter
response = self._make_request('GET', '/presigned_url', params={'filename': filename})
# Handle different response formats
if isinstance(response, str):
# Direct URL string (common format)
if response.startswith('http'):
self.logger.info(f"Got presigned URL: {response[:80]}...")
return response
else:
raise APIError(f"Invalid URL format: {response}")
elif isinstance(response, dict):
# URL wrapped in JSON object
presigned_url = (
response.get('url') or
response.get('presigned_url') or
response.get('upload_url') or
response.get('text') # Sometimes in 'text' field
)
if presigned_url and isinstance(presigned_url, str) and presigned_url.startswith('http'):
self.logger.info(f"Got presigned URL: {presigned_url[:80]}...")
return presigned_url
raise APIError(f"Unexpected response format: {response}")
def upload_file_to_presigned_url(
self,
presigned_url: str,
file_path: str,
content_type: str
) -> None:
"""
Upload file to S3 using presigned URL
Args:
presigned_url: Full S3 presigned URL with query params
file_path: Local path to file
content_type: MIME type (video/mp4, image/jpeg, etc.)
Raises:
APIError: If upload fails
"""
self.logger.info(f"Uploading file: {file_path}")
try:
with open(file_path, 'rb') as f:
headers = {'Content-Type': content_type}
# Use PUT request to upload to S3
response = requests.put(
presigned_url,
data=f,
headers=headers,
timeout=self.timeout * 10 # Longer timeout for uploads
)
response.raise_for_status()
self.logger.info("File uploaded successfully")
except requests.exceptions.HTTPError as e:
error_msg = f"Upload failed: HTTP {e.response.status_code}"
self.logger.error(error_msg)
raise APIError(error_msg) from e
except Exception as e:
error_msg = f"Upload failed: {str(e)}"
self.logger.error(error_msg)
raise APIError(error_msg) from e
def create_preflight(self, metadata: dict) -> dict:
"""
Create preflight request
Args:
metadata: dict with keys:
- name: Preflight name (filename)
- brand_name: Brand name (e.g., "NUTELLA")
- market_name: Market name (e.g., "Germany")
- channel: Channel name (e.g., "FB - Stories")
- creatives: List of creative objects with:
- source_url: Clean URL (no query params)
Returns:
dict: Response with request_id, status, etc.
Raises:
APIError: If request fails
"""
# Ensure required fields
required = ['name', 'brand_name', 'market_name', 'channel', 'creatives']
missing = [f for f in required if f not in metadata]
if missing:
raise ValueError(f"Missing required metadata fields: {', '.join(missing)}")
if not metadata['creatives']:
raise ValueError("creatives array cannot be empty")
self.logger.info(f"Creating preflight for: {metadata['name']}")
response = self._make_request('POST', '/preflights', json=metadata)
self.logger.info(f"Preflight created: {response.get('request_id', 'unknown')}")
return response
def get_preflight_status(self, request_id: str) -> dict:
"""
Get preflight status and results
Args:
request_id: Preflight request ID
Returns:
dict: Status, scores, guidelines, scorecard_url, etc.
Raises:
APIError: If request fails
"""
self.logger.debug(f"Checking status for: {request_id}")
response = self._make_request('GET', f'/preflights/{request_id}')
return response
def strip_query_params(self, url: str) -> str:
"""
Remove query parameters from URL
Args:
url: URL with query params
Returns:
str: Clean URL without query params
"""
parsed = urlparse(url)
clean_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
return clean_url
def test_connection(self) -> bool:
"""
Test API connection
Returns:
bool: True if connection successful
"""
try:
# Try to get presigned URL as a connection test
self.get_presigned_url("test.mp4")
return True
except Exception as e:
self.logger.error(f"Connection test failed: {e}")
return False