# hm_ai_qc_report_tool/box_client.py
# 2025-12-30 16:47:56 +02:00
#
# 258 lines
# 8.3 KiB
# Python

"""Box API client for retrieving QC reports."""
import logging
import os
from typing import List, Dict, Optional
from boxsdk import JWTAuth, Client
from boxsdk.exception import BoxAPIException
logger = logging.getLogger(__name__)


class BoxReportClient:
    """Client for interacting with the Box API to retrieve QC reports.

    Reports are looked up under ``CAMPAIGNS/{JobNumber}/QC/*.html``, where
    CAMPAIGNS is the folder identified by ``report_folder_id``. When that
    layout is missing, HTML files directly inside the report folder whose
    name contains the job number are used instead.
    """

    # Fields requested when listing files. Asking for them explicitly avoids
    # getting back "minimal" item objects that lack size / timestamps.
    _ITEM_FIELDS = ('id', 'name', 'type', 'size', 'created_at', 'modified_at')

    def __init__(self, config_path: str, report_folder_id: str):
        """
        Initialize Box client.

        Authentication happens immediately, so constructing the client
        performs network I/O and may raise.

        Args:
            config_path: Path to Box JWT config JSON file
            report_folder_id: Box folder ID containing reports
        """
        self.report_folder_id = report_folder_id
        self.client = self._authenticate(config_path)

    def _authenticate(self, config_path: str) -> "Client":
        """
        Authenticate with Box using JWT.

        Args:
            config_path: Path to Box JWT config JSON file

        Returns:
            An authenticated Box ``Client``

        Raises:
            FileNotFoundError: If ``config_path`` does not exist
            Exception: Any error raised by the Box SDK is logged and re-raised
        """
        try:
            if not os.path.exists(config_path):
                raise FileNotFoundError(
                    f"Box config file not found at {config_path}. "
                    f"Please add your box_config.json file to the config directory."
                )
            auth = JWTAuth.from_settings_file(config_path)
            client = Client(auth)
            # Test connection: fetching the current user fails fast on bad
            # credentials instead of on the first real request.
            user = client.user().get()
            logger.info(f"Authenticated as Box user: {user.name}")
            return client
        except Exception as e:
            logger.error(f"Failed to authenticate with Box: {e}")
            raise

    def search_by_job_number(self, job_number: str) -> List[Dict]:
        """
        Search for reports by job number.

        This searches for:
        1. A folder named with the job number in CAMPAIGNS
        2. A QC subfolder within the job folder
        3. HTML files in the QC subfolder
        4. Falls back to searching files by name if folder structure not found

        Folder structure: CAMPAIGNS/{JobNumber}/QC/*.html

        Note: the lookup helpers log and swallow their own errors, so a Box
        failure during listing normally surfaces here as an empty (or
        partial) list rather than as an exception.

        Args:
            job_number: Job/reference number to search for. Surrounding
                whitespace is ignored; a blank or ``None`` value yields
                an empty list.

        Returns:
            List of dictionaries with report information
        """
        job_number = "" if job_number is None else str(job_number).strip()
        if not job_number:
            # An empty string is a substring of every filename, so the
            # filename fallback would otherwise return every HTML report.
            logger.warning("Empty job number supplied; returning no reports")
            return []

        try:
            # First, try to find a folder with the job number
            folder = self._find_job_folder(job_number)
            if folder:
                logger.info(f"Found job folder: {folder['name']} (ID: {folder['id']})")
                reports = self._get_html_reports_from_folder(folder['id'])
            else:
                # No dedicated folder, search files in main report folder
                logger.info(f"No job folder found, searching files for '{job_number}'")
                reports = self._search_files_by_job_number(job_number)
            logger.info(f"Found {len(reports)} reports for job number '{job_number}'")
            return reports
        except BoxAPIException as e:
            logger.error(f"Box API error while searching for job {job_number}: {e}")
            raise
        except Exception as e:
            logger.error(f"Error searching for job {job_number}: {e}")
            raise

    def _find_child_folder_id(self, parent_id: str, name: str, page_size: int) -> Optional[str]:
        """Return the ID of the direct subfolder of ``parent_id`` named exactly ``name``, or None."""
        # ``limit`` is the page size; iterating the returned collection
        # lets the SDK fetch further pages as needed.
        for item in self.client.folder(parent_id).get_items(limit=page_size):
            if item.type == 'folder' and item.name == name:
                return item.id
        return None

    def _find_job_folder(self, job_number: str) -> Optional[Dict]:
        """
        Look for a subfolder matching the job number, then navigate to QC subfolder.

        New structure: CAMPAIGNS/{JobNumber}/QC/

        Any error while listing folders is logged as a warning and treated
        as "not found" so the caller can fall back to a filename search.

        Args:
            job_number: Job number to find (exact, case-sensitive folder name)

        Returns:
            Dictionary with QC folder info (``id`` and ``name``) or None if not found
        """
        try:
            # Step 1: Find job number folder in CAMPAIGNS
            job_folder_id = self._find_child_folder_id(self.report_folder_id, job_number, 1000)
            if not job_folder_id:
                logger.info(f"Job folder '{job_number}' not found in CAMPAIGNS")
                return None
            logger.info(f"Found job folder '{job_number}' (ID: {job_folder_id})")

            # Step 2: Look for QC subfolder inside job folder
            qc_folder_id = self._find_child_folder_id(job_folder_id, 'QC', 100)
            if not qc_folder_id:
                # Job folder exists but no QC subfolder
                logger.warning(f"Job folder '{job_number}' found but no QC subfolder exists")
                return None
            logger.info(f"Found QC subfolder (ID: {qc_folder_id})")
            return {
                'id': qc_folder_id,
                'name': f"{job_number}/QC"
            }
        except Exception as e:
            logger.warning(f"Error finding job folder: {e}")
            return None

    def _iter_html_files(self, folder_id: str):
        """Yield the file items directly inside ``folder_id`` whose name ends in ``.html`` (case-insensitive)."""
        items = self.client.folder(folder_id).get_items(
            limit=1000,
            fields=list(self._ITEM_FIELDS)
        )
        for item in items:
            if item.type == 'file' and item.name.lower().endswith('.html'):
                yield item

    def _get_html_reports_from_folder(self, folder_id: str) -> List[Dict]:
        """
        Get all HTML reports from a specific folder.

        Errors are logged and swallowed; whatever was collected before the
        failure is still returned.

        Args:
            folder_id: Box folder ID

        Returns:
            List of report dictionaries
        """
        reports = []
        try:
            # Explicit loop (not a comprehension) so partial results survive
            # an error raised mid-iteration.
            for item in self._iter_html_files(folder_id):
                reports.append(self._get_file_info(item))
        except Exception as e:
            logger.error(f"Error getting reports from folder {folder_id}: {e}")
        return reports

    def _search_files_by_job_number(self, job_number: str) -> List[Dict]:
        """
        Search for HTML files containing the job number in the main report folder.

        Errors are logged and swallowed; whatever was collected before the
        failure is still returned.

        Args:
            job_number: Job number to search for (case-sensitive substring
                of the filename); an empty value matches nothing

        Returns:
            List of report dictionaries
        """
        reports = []
        if not job_number:
            return reports
        try:
            for item in self._iter_html_files(self.report_folder_id):
                # Check if job number is in filename
                if job_number in item.name:
                    reports.append(self._get_file_info(item))
        except Exception as e:
            logger.error(f"Error searching files: {e}")
        return reports

    def _get_file_info(self, file_item) -> Dict:
        """
        Extract file information.

        Args:
            file_item: Box file object exposing ``id``, ``name``, ``size``,
                ``created_at`` and ``modified_at``

        Returns:
            Dictionary with file information under those same five keys
        """
        return {
            'id': file_item.id,
            'name': file_item.name,
            'size': file_item.size,
            'created_at': file_item.created_at,
            'modified_at': file_item.modified_at
        }

    def download_file(self, file_id: str) -> bytes:
        """
        Download a file from Box.

        Args:
            file_id: Box file ID

        Returns:
            File content as bytes

        Raises:
            BoxAPIException: Box API failures are logged and re-raised
            Exception: Any other failure is logged and re-raised
        """
        try:
            # .get() loads the file's metadata so its name can be logged.
            file_obj = self.client.file(file_id).get()
            content = file_obj.content()
            logger.info(f"Downloaded file: {file_obj.name}")
            return content
        except BoxAPIException as e:
            logger.error(f"Box API error downloading file {file_id}: {e}")
            raise
        except Exception as e:
            logger.error(f"Error downloading file {file_id}: {e}")
            raise

    def get_download_url(self, file_id: str) -> str:
        """
        Get a download URL for a file.

        Args:
            file_id: Box file ID

        Returns:
            Download URL string as returned by the Box SDK

        Raises:
            Exception: Any failure is logged and re-raised
        """
        try:
            file_obj = self.client.file(file_id).get()
            download_url = file_obj.get_download_url()
            return download_url
        except Exception as e:
            logger.error(f"Error getting download URL for file {file_id}: {e}")
            raise