"""Box API client for retrieving QC reports."""

import logging
import os
from typing import List, Dict, Optional

from boxsdk import JWTAuth, Client
from boxsdk.exception import BoxAPIException

logger = logging.getLogger(__name__)


class BoxReportClient:
    """Client for interacting with Box API to retrieve QC reports.

    Reports are looked up under ``CAMPAIGNS/{JobNumber}/QC/*.html``, where
    ``CAMPAIGNS`` is the Box folder identified by ``report_folder_id``.
    """

    # Fields requested explicitly when listing folder items, to avoid the
    # "minimal object" issue: _get_file_info reads size/created_at/modified_at
    # from each listed item.
    _ITEM_FIELDS = ('id', 'name', 'type', 'size', 'created_at', 'modified_at')

    def __init__(self, config_path: str, report_folder_id: str):
        """
        Initialize Box client.

        Authentication happens immediately, so construction performs network
        I/O and raises if the config file is missing or Box rejects the
        credentials.

        Args:
            config_path: Path to Box JWT config JSON file
            report_folder_id: Box folder ID containing reports
        """
        self.report_folder_id = report_folder_id
        self.client = self._authenticate(config_path)

    def _authenticate(self, config_path: str) -> Client:
        """
        Authenticate with Box using JWT.

        Args:
            config_path: Path to Box JWT config JSON file

        Returns:
            An authenticated Box ``Client``

        Raises:
            FileNotFoundError: If ``config_path`` does not exist
            Exception: Any error raised while authenticating or fetching the
                current user is logged and re-raised unchanged
        """
        try:
            if not os.path.exists(config_path):
                raise FileNotFoundError(
                    f"Box config file not found at {config_path}. "
                    f"Please add your box_config.json file to the config directory."
                )

            auth = JWTAuth.from_settings_file(config_path)
            client = Client(auth)

            # Test connection: fetching the current user fails fast on bad
            # credentials instead of on the first real request.
            user = client.user().get()
            logger.info(f"Authenticated as Box user: {user.name}")

            return client

        except Exception as e:
            logger.error(f"Failed to authenticate with Box: {e}")
            raise

    def search_by_job_number(self, job_number: str) -> List[Dict]:
        """
        Search for reports by job number.

        This searches for:
        1. A folder named with the job number in CAMPAIGNS
        2. A QC subfolder within the job folder
        3. HTML files in the QC subfolder
        4. Falls back to searching files by name if folder structure not found

        Folder structure: CAMPAIGNS/{JobNumber}/QC/*.html

        Note that the lookup helpers log and swallow their own errors, so a
        Box failure during listing can surface here as an empty or partial
        result rather than as an exception.

        Args:
            job_number: Job/reference number to search for

        Returns:
            List of dictionaries with report information (keys: ``id``,
            ``name``, ``size``, ``created_at``, ``modified_at``). Empty if
            ``job_number`` is empty, blank or not a string.

        Raises:
            BoxAPIException: Box API errors that reach this method are
                logged and re-raised
            Exception: Any other error that reaches this method is logged
                and re-raised
        """
        # An empty string is a substring of every filename, so the fallback
        # search would otherwise return every HTML file in the report folder.
        if not isinstance(job_number, str) or not job_number.strip():
            logger.warning(
                f"Invalid or empty job number {job_number!r}; returning no reports"
            )
            return []

        try:
            # First, try to find a folder with the job number
            folder = self._find_job_folder(job_number)

            if folder:
                logger.info(f"Found job folder: {folder['name']} (ID: {folder['id']})")
                reports = self._get_html_reports_from_folder(folder['id'])
            else:
                # No dedicated folder, search files in main report folder
                logger.info(f"No job folder found, searching files for '{job_number}'")
                reports = self._search_files_by_job_number(job_number)

            logger.info(f"Found {len(reports)} reports for job number '{job_number}'")
            return reports

        except BoxAPIException as e:
            logger.error(f"Box API error while searching for job {job_number}: {e}")
            raise
        except Exception as e:
            logger.error(f"Error searching for job {job_number}: {e}")
            raise

    def _find_job_folder(self, job_number: str) -> Optional[Dict]:
        """
        Look for a subfolder matching the job number, then navigate to QC subfolder.

        New structure: CAMPAIGNS/{JobNumber}/QC/

        Folder names are compared exactly (case-sensitive). Any error during
        the lookup is logged as a warning and treated as "not found".

        Args:
            job_number: Job number to find

        Returns:
            Dictionary with QC folder info (``id`` and ``name``) or None if
            the job folder or its QC subfolder is not found
        """
        try:
            # Step 1: Find job number folder in CAMPAIGNS
            # Note: Box SDK automatically handles pagination when iterating
            campaigns_folder = self.client.folder(self.report_folder_id)

            job_folder_id = None
            for item in campaigns_folder.get_items(limit=1000):
                if item.type == 'folder' and item.name == job_number:
                    job_folder_id = item.id
                    logger.info(f"Found job folder '{job_number}' (ID: {job_folder_id})")
                    break

            if not job_folder_id:
                logger.info(f"Job folder '{job_number}' not found in CAMPAIGNS")
                return None

            # Step 2: Look for QC subfolder inside job folder. Items are
            # listed straight from the folder reference, as in step 1; no
            # separate fetch of the folder object is needed first.
            job_folder = self.client.folder(job_folder_id)
            for item in job_folder.get_items(limit=100):
                if item.type == 'folder' and item.name == 'QC':
                    logger.info(f"Found QC subfolder (ID: {item.id})")
                    return {
                        'id': item.id,
                        'name': f"{job_number}/QC"
                    }

            # Job folder exists but no QC subfolder
            logger.warning(f"Job folder '{job_number}' found but no QC subfolder exists")
            return None

        except Exception as e:
            logger.warning(f"Error finding job folder: {e}")
            return None

    def _iter_html_files(self, folder_id: str):
        """
        Yield the HTML file items directly inside a folder.

        Items are listed lazily, so Box errors may be raised while the
        caller iterates rather than when this method is called.

        Args:
            folder_id: Box folder ID

        Yields:
            Box file items whose name ends with ``.html`` (case-insensitive),
            fetched with the fields in ``_ITEM_FIELDS``
        """
        items = self.client.folder(folder_id).get_items(
            limit=1000,
            fields=list(self._ITEM_FIELDS)
        )
        for item in items:
            if item.type == 'file' and item.name.lower().endswith('.html'):
                yield item

    def _get_html_reports_from_folder(self, folder_id: str) -> List[Dict]:
        """
        Get all HTML reports from a specific folder.

        Errors are logged and swallowed; whatever was collected before the
        error is returned.

        Args:
            folder_id: Box folder ID

        Returns:
            List of report dictionaries
        """
        reports = []

        try:
            # Append one at a time (rather than building the list in one
            # expression) so a mid-listing failure keeps the partial results.
            for item in self._iter_html_files(folder_id):
                reports.append(self._get_file_info(item))

        except Exception as e:
            logger.error(f"Error getting reports from folder {folder_id}: {e}")

        return reports

    def _search_files_by_job_number(self, job_number: str) -> List[Dict]:
        """
        Search for HTML files containing the job number in the main report folder.

        The match is a case-sensitive substring test on the filename. Errors
        are logged and swallowed; whatever was collected before the error is
        returned.

        Args:
            job_number: Job number to search for

        Returns:
            List of report dictionaries
        """
        reports = []

        try:
            for item in self._iter_html_files(self.report_folder_id):
                # Check if job number is in filename
                if job_number in item.name:
                    reports.append(self._get_file_info(item))

        except Exception as e:
            logger.error(f"Error searching files: {e}")

        return reports

    def _get_file_info(self, file_item) -> Dict:
        """
        Extract file information.

        Args:
            file_item: Box file object exposing ``id``, ``name``, ``size``,
                ``created_at`` and ``modified_at``

        Returns:
            Dictionary with file information
        """
        return {
            'id': file_item.id,
            'name': file_item.name,
            'size': file_item.size,
            'created_at': file_item.created_at,
            'modified_at': file_item.modified_at
        }

    def download_file(self, file_id: str) -> bytes:
        """
        Download a file from Box.

        Args:
            file_id: Box file ID

        Returns:
            File content as bytes

        Raises:
            BoxAPIException: Box API errors are logged and re-raised
            Exception: Any other error is logged and re-raised
        """
        try:
            # The file object is fetched first because its name is logged.
            file_obj = self.client.file(file_id).get()
            content = file_obj.content()
            logger.info(f"Downloaded file: {file_obj.name}")
            return content

        except BoxAPIException as e:
            logger.error(f"Box API error downloading file {file_id}: {e}")
            raise
        except Exception as e:
            logger.error(f"Error downloading file {file_id}: {e}")
            raise

    def get_download_url(self, file_id: str) -> str:
        """
        Get a temporary download URL for a file.

        Args:
            file_id: Box file ID

        Returns:
            Download URL string

        Raises:
            Exception: Any error is logged and re-raised
        """
        try:
            # The URL is requested straight from the file reference; no
            # separate fetch of the file's metadata is made first.
            return self.client.file(file_id).get_download_url()

        except Exception as e:
            logger.error(f"Error getting download URL for file {file_id}: {e}")
            raise