""" CreativeX API Client with retry logic and error handling """ import requests import logging from urllib.parse import urlparse, parse_qs from typing import Optional, Dict from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry class APIError(Exception): """Custom exception for API errors""" pass class CreativeXAPIClient: """ Robust API client for CreativeX Content API v3 """ def __init__( self, base_url: str, access_token: str, max_retries: int = 3, timeout: int = 30 ): """ Initialize API client Args: base_url: API base URL access_token: Access token for authentication max_retries: Maximum number of retries timeout: Request timeout in seconds """ self.base_url = base_url.rstrip('/') self.access_token = access_token self.max_retries = max_retries self.timeout = timeout self.session = self._create_session() self.logger = logging.getLogger(__name__) def _create_session(self) -> requests.Session: """Create session with retry strategy""" session = requests.Session() # Configure retry strategy retry_strategy = Retry( total=self.max_retries, backoff_factor=1, # 1s, 2s, 4s delays status_forcelist=[429, 500, 502, 503, 504], allowed_methods=["GET", "POST", "PUT"] ) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount("http://", adapter) session.mount("https://", adapter) return session def _make_request( self, method: str, endpoint: str, **kwargs ) -> Dict: """ Centralized request handler with error handling and logging Args: method: HTTP method (GET, POST, PUT) endpoint: API endpoint **kwargs: Additional arguments for requests Returns: dict: Response JSON Raises: APIError: If request fails after retries """ url = f"{self.base_url}/{endpoint.lstrip('/')}" # Add access token to params params = kwargs.get('params', {}) params['access_token'] = self.access_token kwargs['params'] = params # Set timeout if not provided if 'timeout' not in kwargs: kwargs['timeout'] = self.timeout self.logger.debug(f"{method} {url}") try: response = self.session.request(method, url, **kwargs) response.raise_for_status() # Try to parse JSON try: return response.json() except ValueError: # Not JSON response, return text directly if it looks like a URL text = response.text.strip() if text.startswith('http'): return text else: return {'text': text, 'status_code': response.status_code} except requests.exceptions.HTTPError as e: error_msg = f"HTTP {e.response.status_code}: {e.response.text}" self.logger.error(error_msg) raise APIError(error_msg) from e except requests.exceptions.Timeout as e: error_msg = f"Request timeout after {self.timeout}s" self.logger.error(error_msg) raise APIError(error_msg) from e except requests.exceptions.ConnectionError as e: error_msg = f"Connection error: {str(e)}" self.logger.error(error_msg) raise APIError(error_msg) from e except Exception as e: error_msg = f"Unexpected error: {str(e)}" self.logger.error(error_msg) raise APIError(error_msg) from e def get_presigned_url(self, filename: str) -> str: """ Get presigned S3 URL for file upload Args: filename: Name of file to upload Returns: str: Presigned URL for uploading Raises: APIError: If request fails """ self.logger.info(f"Getting presigned URL for: {filename}") # Pass filename as parameter response = self._make_request('GET', '/presigned_url', params={'filename': filename}) # Handle different response formats if isinstance(response, str): # Direct URL string (common format) if response.startswith('http'): self.logger.info(f"Got presigned URL: {response[:80]}...") return response else: raise APIError(f"Invalid URL format: {response}") elif isinstance(response, dict): # URL wrapped in JSON object presigned_url = ( response.get('url') or response.get('presigned_url') or response.get('upload_url') or response.get('text') # Sometimes in 'text' field ) if presigned_url and isinstance(presigned_url, str) and presigned_url.startswith('http'): self.logger.info(f"Got presigned URL: {presigned_url[:80]}...") return presigned_url raise APIError(f"Unexpected response format: {response}") def upload_file_to_presigned_url( self, presigned_url: str, file_path: str, content_type: str ) -> None: """ Upload file to S3 using presigned URL Args: presigned_url: Full S3 presigned URL with query params file_path: Local path to file content_type: MIME type (video/mp4, image/jpeg, etc.) Raises: APIError: If upload fails """ self.logger.info(f"Uploading file: {file_path}") try: with open(file_path, 'rb') as f: headers = {'Content-Type': content_type} # Use PUT request to upload to S3 response = requests.put( presigned_url, data=f, headers=headers, timeout=self.timeout * 10 # Longer timeout for uploads ) response.raise_for_status() self.logger.info("File uploaded successfully") except requests.exceptions.HTTPError as e: error_msg = f"Upload failed: HTTP {e.response.status_code}" self.logger.error(error_msg) raise APIError(error_msg) from e except Exception as e: error_msg = f"Upload failed: {str(e)}" self.logger.error(error_msg) raise APIError(error_msg) from e def create_preflight(self, metadata: dict) -> dict: """ Create preflight request Args: metadata: dict with keys: - name: Preflight name (filename) - brand_name: Brand name (e.g., "NUTELLA") - market_name: Market name (e.g., "Germany") - channel: Channel name (e.g., "FB - Stories") - creatives: List of creative objects with: - source_url: Clean URL (no query params) Returns: dict: Response with request_id, status, etc. Raises: APIError: If request fails """ # Ensure required fields required = ['name', 'brand_name', 'market_name', 'channel', 'creatives'] missing = [f for f in required if f not in metadata] if missing: raise ValueError(f"Missing required metadata fields: {', '.join(missing)}") if not metadata['creatives']: raise ValueError("creatives array cannot be empty") self.logger.info(f"Creating preflight for: {metadata['name']}") response = self._make_request('POST', '/preflights', json=metadata) self.logger.info(f"Preflight created: {response.get('request_id', 'unknown')}") return response def get_preflight_status(self, request_id: str) -> dict: """ Get preflight status and results Args: request_id: Preflight request ID Returns: dict: Status, scores, guidelines, scorecard_url, etc. Raises: APIError: If request fails """ self.logger.debug(f"Checking status for: {request_id}") response = self._make_request('GET', f'/preflights/{request_id}') return response def strip_query_params(self, url: str) -> str: """ Remove query parameters from URL Args: url: URL with query params Returns: str: Clean URL without query params """ parsed = urlparse(url) clean_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}" return clean_url def test_connection(self) -> bool: """ Test API connection Returns: bool: True if connection successful """ try: # Try to get presigned URL as a connection test self.get_presigned_url("test.mp4") return True except Exception as e: self.logger.error(f"Connection test failed: {e}") return False