The API expects:
- name, brand_name, market_name, channel at top level
- creatives array with only source_url

Changes:
- Moved name and channel to top level
- Changed asset_url to source_url in creatives
- Updated API client validation to match

This fixes the 'channel is missing, name is missing, source_url is missing' error.
300 lines
9.1 KiB
Python
300 lines
9.1 KiB
Python
"""
CreativeX API Client with retry logic and error handling
"""
|
|
|
|
import logging
from typing import Dict, Optional, Union
from urllib.parse import parse_qs, quote_plus, urlparse

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
|
|
|
|
|
|
class APIError(Exception):
    """Custom exception for API errors.

    Raised by ``CreativeXAPIClient`` when an HTTP request, a file upload,
    or the interpretation of an API response fails. The message carries a
    human-readable description of the failure; the original exception, when
    there is one, is attached as ``__cause__``.
    """
|
|
|
|
|
|
class CreativeXAPIClient:
    """
    Robust API client for CreativeX Content API v3

    Wraps a ``requests.Session`` configured with automatic retries and
    converts transport/HTTP failures into ``APIError``. Every API call is
    authenticated by sending the access token as the ``access_token`` query
    parameter.
    """

    def __init__(
        self,
        base_url: str,
        access_token: str,
        max_retries: int = 3,
        timeout: int = 30
    ):
        """
        Initialize API client

        Args:
            base_url: API base URL (a trailing slash is stripped)
            access_token: Access token for authentication
            max_retries: Maximum number of retries
            timeout: Request timeout in seconds
        """
        self.base_url = base_url.rstrip('/')
        self.access_token = access_token
        self.max_retries = max_retries
        self.timeout = timeout
        self.session = self._create_session()
        self.logger = logging.getLogger(__name__)

    def _create_session(self) -> "requests.Session":
        """Create session with retry strategy"""
        session = requests.Session()

        # Retry transient failures with exponential backoff. The exact sleep
        # schedule derived from backoff_factor depends on the installed
        # urllib3 version, so it is deliberately not spelled out here.
        retry_strategy = Retry(
            total=self.max_retries,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["GET", "POST", "PUT"]
        )

        adapter = HTTPAdapter(max_retries=retry_strategy)
        session.mount("http://", adapter)
        session.mount("https://", adapter)

        return session

    def _redact(self, text: str) -> str:
        """Mask the access token (raw and URL-encoded) inside *text*.

        Exception messages produced by the HTTP stack can embed the request
        URL including its query string, which carries the access token.
        """
        token = self.access_token
        if not token:
            return text
        for secret in (token, quote_plus(token)):
            text = text.replace(secret, '***')
        return text

    def _make_request(
        self,
        method: str,
        endpoint: str,
        **kwargs
    ) -> Union[Dict, str]:
        """
        Centralized request handler with error handling and logging

        Args:
            method: HTTP method (GET, POST, PUT)
            endpoint: API endpoint, joined onto the base URL
            **kwargs: Additional arguments for requests. ``params`` is copied
                (never mutated) before the access token is added; ``timeout``
                defaults to the client timeout.

        Returns:
            The decoded JSON body when the response is JSON. For a non-JSON
            body: the stripped text itself if it starts with ``http``,
            otherwise ``{'text': <body>, 'status_code': <status>}``.

        Raises:
            APIError: If request fails after retries
        """
        url = f"{self.base_url}/{endpoint.lstrip('/')}"

        # Work on a copy so the caller's dict is left untouched, and accept
        # an explicit params=None.
        params = dict(kwargs.get('params') or {})
        params['access_token'] = self.access_token
        kwargs['params'] = params

        # Set timeout if not provided
        kwargs.setdefault('timeout', self.timeout)

        # Only the bare URL is logged; the query string holds the token.
        self.logger.debug("%s %s", method, url)

        try:
            response = self.session.request(method, url, **kwargs)
            response.raise_for_status()

            # Try to parse JSON
            try:
                return response.json()
            except ValueError:
                # Not JSON response, return text directly if it looks like a URL
                text = response.text.strip()
                if text.startswith('http'):
                    return text
                return {'text': text, 'status_code': response.status_code}

        except requests.exceptions.HTTPError as e:
            error_msg = f"HTTP {e.response.status_code}: {e.response.text}"
            self.logger.error(error_msg)
            raise APIError(error_msg) from e

        except requests.exceptions.Timeout as e:
            # Report the timeout that was actually in effect for this call.
            error_msg = f"Request timeout after {kwargs['timeout']}s"
            self.logger.error(error_msg)
            raise APIError(error_msg) from e

        except requests.exceptions.ConnectionError as e:
            error_msg = self._redact(f"Connection error: {str(e)}")
            self.logger.error(error_msg)
            raise APIError(error_msg) from e

        except Exception as e:
            error_msg = self._redact(f"Unexpected error: {str(e)}")
            self.logger.error(error_msg)
            raise APIError(error_msg) from e

    def get_presigned_url(self, filename: str) -> str:
        """
        Get presigned S3 URL for file upload

        Args:
            filename: Name of file to upload

        Returns:
            str: Presigned URL for uploading

        Raises:
            APIError: If request fails or no URL can be found in the response
        """
        self.logger.info("Getting presigned URL for: %s", filename)

        # Pass filename as parameter
        response = self._make_request('GET', '/presigned_url', params={'filename': filename})

        # Handle different response formats
        if isinstance(response, str):
            # Direct URL string (common format)
            if response.startswith('http'):
                self.logger.info("Got presigned URL: %s...", response[:80])
                return response
            raise APIError(f"Invalid URL format: {response}")

        if isinstance(response, dict):
            # URL wrapped in JSON object
            presigned_url = (
                response.get('url') or
                response.get('presigned_url') or
                response.get('upload_url') or
                response.get('text')  # Sometimes in 'text' field
            )

            if presigned_url and isinstance(presigned_url, str) and presigned_url.startswith('http'):
                self.logger.info("Got presigned URL: %s...", presigned_url[:80])
                return presigned_url

        raise APIError(f"Unexpected response format: {response}")

    def upload_file_to_presigned_url(
        self,
        presigned_url: str,
        file_path: str,
        content_type: str
    ) -> None:
        """
        Upload file to S3 using presigned URL

        The upload is sent with a plain ``requests.put`` (not the retrying
        session) and a timeout ten times the client timeout.

        Args:
            presigned_url: Full S3 presigned URL with query params
            file_path: Local path to file
            content_type: MIME type (video/mp4, image/jpeg, etc.)

        Raises:
            APIError: If upload fails (including when the file cannot be read)
        """
        self.logger.info("Uploading file: %s", file_path)

        try:
            with open(file_path, 'rb') as f:
                headers = {'Content-Type': content_type}

                # Use PUT request to upload to S3
                response = requests.put(
                    presigned_url,
                    data=f,
                    headers=headers,
                    timeout=self.timeout * 10  # Longer timeout for uploads
                )

            response.raise_for_status()
            self.logger.info("File uploaded successfully")

        except requests.exceptions.HTTPError as e:
            error_msg = f"Upload failed: HTTP {e.response.status_code}"
            self.logger.error(error_msg)
            raise APIError(error_msg) from e

        except Exception as e:
            error_msg = f"Upload failed: {str(e)}"
            self.logger.error(error_msg)
            raise APIError(error_msg) from e

    def create_preflight(self, metadata: dict) -> dict:
        """
        Create preflight request

        Args:
            metadata: dict with keys:
                - name: Preflight name (filename)
                - brand_name: Brand name (e.g., "NUTELLA")
                - market_name: Market name (e.g., "Germany")
                - channel: Channel name (e.g., "FB - Stories")
                - creatives: List of creative objects with:
                    - source_url: Clean URL (no query params)

        Returns:
            dict: Response with request_id, status, etc.

        Raises:
            ValueError: If a required key is absent or ``creatives`` is empty
            APIError: If request fails or the response is not a JSON object
        """
        # Ensure required fields
        required = ['name', 'brand_name', 'market_name', 'channel', 'creatives']
        missing = [f for f in required if f not in metadata]
        if missing:
            raise ValueError(f"Missing required metadata fields: {', '.join(missing)}")

        if not metadata['creatives']:
            raise ValueError("creatives array cannot be empty")

        self.logger.info("Creating preflight for: %s", metadata['name'])

        response = self._make_request('POST', '/preflights', json=metadata)

        # _make_request may hand back a bare string (or a JSON list); fail
        # with a clear APIError instead of an AttributeError on .get().
        if not isinstance(response, dict):
            raise APIError(f"Unexpected response format: {response}")

        self.logger.info("Preflight created: %s", response.get('request_id', 'unknown'))

        return response

    def get_preflight_status(self, request_id: str) -> dict:
        """
        Get preflight status and results

        Args:
            request_id: Preflight request ID

        Returns:
            dict: Status, scores, guidelines, scorecard_url, etc.

        Raises:
            APIError: If request fails or the response is not a JSON object
        """
        self.logger.debug("Checking status for: %s", request_id)

        response = self._make_request('GET', f'/preflights/{request_id}')

        # Keep the documented dict contract even for odd response bodies.
        if not isinstance(response, dict):
            raise APIError(f"Unexpected response format: {response}")

        return response

    def strip_query_params(self, url: str) -> str:
        """
        Remove query parameters from URL

        Only scheme, host and path are kept; the query string and fragment
        are dropped.

        Args:
            url: URL with query params

        Returns:
            str: Clean URL without query params
        """
        parsed = urlparse(url)
        clean_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
        return clean_url

    def test_connection(self) -> bool:
        """
        Test API connection

        Performs a real presigned-URL request for the name "test.mp4" and
        reports whether it succeeded; failures are logged, not raised.

        Returns:
            bool: True if connection successful
        """
        try:
            # Try to get presigned URL as a connection test
            self.get_presigned_url("test.mp4")
            return True
        except Exception as e:
            self.logger.error(f"Connection test failed: {e}")
            return False
|