ai_qc/backend/box_jwt_client.py
nickviljoen b7e9c483de feat(box-jwt): move source file to _PROCESSED after successful run
Solves two problems at once:

1. Folder cleanliness — INCOMING accumulates indefinitely otherwise.
2. Duplicate-upload re-trigger — Box V2's FILE.UPLOADED trigger doesn't
   fire when the same filename is "uploaded as new version" of an
   existing file. By moving the source out of INCOMING after success,
   re-uploading the same filename becomes a genuinely-new file event
   again and the webhook fires normally.

After report uploads successfully to the REPORTS folder, the worker:
1. find_or_create_subfolder(<INCOMING>, '_PROCESSED') — idempotent
2. move_file(file_id, <_PROCESSED>, new_name=f'{session_id}_{filename}')

The session_id prefix gives the archived file a sortable timestamp and
ties it back to the matching QC_Report_<session_id>_*.html in REPORTS.

Defensive: the move only runs if the report upload to Box succeeded.
If Box delivery failed, the source stays in INCOMING so a retry just
means re-uploading. Move failures are non-fatal — logged + recorded
in result_data['box_source_move_error'], analysis still marked
complete.

Adds four helpers to box_jwt_client.py:
- find_subfolder_by_name(parent, name) → Optional[str]
- create_subfolder(parent, name) → str
- find_or_create_subfolder(parent, name) → str  (idempotent)
- move_file(file_id, target_folder, new_name=None) → Dict

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 13:29:45 +02:00

343 lines
12 KiB
Python

"""
Box JWT service-account client.
Authenticates as a Box Custom App with Server Authentication (JWT) — i.e. the
app has its own Box identity rather than acting on behalf of a logged-in user.
Used for webhook-driven, unattended workflows where the QC pipeline needs to
read files from Box and write reports back without a human in the loop.
The service account must be invited as a collaborator on each client folder
before any read/write succeeds. Folder IDs live in `client_config.py`.
Co-exists with the older per-user OAuth flow in `box_client.py` — different
auth model, different use cases.
"""
from __future__ import annotations
import base64
import hashlib
import hmac
import json
import os
import secrets
import threading
import time
from typing import Any, Dict, List, Optional
import jwt
import requests
from cryptography.hazmat.primitives.serialization import load_pem_private_key
_CONFIG_PATH_ENV = 'BOX_JWT_CONFIG_PATH'
_DEFAULT_CONFIG_PATH = os.path.join(
os.path.dirname(os.path.abspath(__file__)), 'config', 'box_jwt_config.json'
)
_TOKEN_URL = 'https://api.box.com/oauth2/token'
_API_BASE = 'https://api.box.com/2.0'
_UPLOAD_BASE = 'https://upload.box.com/api/2.0'
_token_lock = threading.Lock()
_cached_token: Optional[Dict[str, Any]] = None # {'access_token': str, 'expires_at': float}
class BoxJWTError(RuntimeError):
"""Any failure while talking to Box via the JWT service account."""
def _config_path() -> str:
return os.environ.get(_CONFIG_PATH_ENV) or _DEFAULT_CONFIG_PATH
def is_configured() -> bool:
"""True iff the JWT config JSON exists at the expected path."""
return os.path.exists(_config_path())
def _load_config() -> Dict[str, Any]:
path = _config_path()
if not os.path.exists(path):
raise BoxJWTError(
f'Box JWT config not found at {path}. '
f'Drop the JSON Box gave you for the Custom App at that path, or set {_CONFIG_PATH_ENV}.'
)
with open(path, 'r') as f:
return json.load(f)
def _build_assertion(config: Dict[str, Any]) -> str:
box_app = config['boxAppSettings']
app_auth = box_app['appAuth']
private_key = load_pem_private_key(
app_auth['privateKey'].encode('utf-8'),
password=app_auth['passphrase'].encode('utf-8'),
)
claims = {
'iss': box_app['clientID'],
'sub': config['enterpriseID'],
'box_sub_type': 'enterprise',
'aud': _TOKEN_URL,
'jti': secrets.token_urlsafe(16),
'exp': int(time.time()) + 45, # Box caps assertion lifetime at 60s
}
return jwt.encode(
claims, private_key, algorithm='RS256', headers={'kid': app_auth['publicKeyID']}
)
def _fetch_new_token() -> Dict[str, Any]:
config = _load_config()
assertion = _build_assertion(config)
response = requests.post(
_TOKEN_URL,
data={
'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
'assertion': assertion,
'client_id': config['boxAppSettings']['clientID'],
'client_secret': config['boxAppSettings']['clientSecret'],
},
timeout=30,
)
if response.status_code != 200:
raise BoxJWTError(
f'Box token exchange failed: HTTP {response.status_code}{response.text[:300]}'
)
data = response.json()
# Refresh 5 minutes before expiry to absorb clock skew + network latency.
return {
'access_token': data['access_token'],
'expires_at': time.time() + data['expires_in'] - 300,
}
def get_service_account_token() -> str:
"""Return a currently-valid service-account access token, refreshing if needed."""
global _cached_token
with _token_lock:
if _cached_token and time.time() < _cached_token['expires_at']:
return _cached_token['access_token']
_cached_token = _fetch_new_token()
return _cached_token['access_token']
def _auth_headers() -> Dict[str, str]:
return {'Authorization': f'Bearer {get_service_account_token()}'}
# ---------- File / folder operations ----------
def list_folder_items(folder_id: str, fields: Optional[List[str]] = None, limit: int = 1000) -> List[Dict[str, Any]]:
"""List items in a folder. Returns the `entries` array from the Box API."""
params = {'limit': limit}
if fields:
params['fields'] = ','.join(fields)
response = requests.get(
f'{_API_BASE}/folders/{folder_id}/items',
headers=_auth_headers(),
params=params,
timeout=30,
)
if response.status_code != 200:
raise BoxJWTError(
f'list_folder_items({folder_id}) failed: HTTP {response.status_code}{response.text[:300]}'
)
return response.json().get('entries', [])
def get_file_metadata(file_id: str) -> Dict[str, Any]:
"""Return Box file metadata (name, size, parent, etc.)."""
response = requests.get(
f'{_API_BASE}/files/{file_id}',
headers=_auth_headers(),
timeout=30,
)
if response.status_code != 200:
raise BoxJWTError(
f'get_file_metadata({file_id}) failed: HTTP {response.status_code}{response.text[:300]}'
)
return response.json()
def download_file(file_id: str, dest_path: str) -> str:
"""Stream a Box file to dest_path. Returns dest_path on success."""
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
with requests.get(
f'{_API_BASE}/files/{file_id}/content',
headers=_auth_headers(),
stream=True,
timeout=300,
) as response:
if response.status_code != 200:
raise BoxJWTError(
f'download_file({file_id}) failed: HTTP {response.status_code}{response.text[:300]}'
)
with open(dest_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=64 * 1024):
if chunk:
f.write(chunk)
return dest_path
def upload_file(local_path: str, parent_folder_id: str, name: Optional[str] = None) -> Dict[str, Any]:
"""Upload a local file into a Box folder. Returns the new file's metadata."""
if not os.path.exists(local_path):
raise BoxJWTError(f'upload_file: local file not found: {local_path}')
upload_name = name or os.path.basename(local_path)
attributes = {'name': upload_name, 'parent': {'id': parent_folder_id}}
with open(local_path, 'rb') as f:
response = requests.post(
f'{_UPLOAD_BASE}/files/content',
headers=_auth_headers(),
data={'attributes': json.dumps(attributes)},
files={'file': (upload_name, f)},
timeout=300,
)
if response.status_code not in (200, 201):
raise BoxJWTError(
f'upload_file({upload_name}{parent_folder_id}) failed: HTTP {response.status_code}{response.text[:300]}'
)
entries = response.json().get('entries', [])
return entries[0] if entries else {}
def find_subfolder_by_name(parent_folder_id: str, name: str) -> Optional[str]:
"""Return the Box folder ID of a child folder named `name`, or None if not found.
Box allows duplicate folder names within a parent; if multiple match, returns
the first one encountered.
"""
for item in list_folder_items(parent_folder_id, fields=['id', 'name', 'type']):
if item.get('type') == 'folder' and item.get('name') == name:
return str(item['id'])
return None
def create_subfolder(parent_folder_id: str, name: str) -> str:
"""Create a new folder named `name` under `parent_folder_id`. Returns its ID."""
payload = {'name': name, 'parent': {'id': parent_folder_id}}
response = requests.post(
f'{_API_BASE}/folders',
headers={**_auth_headers(), 'Content-Type': 'application/json'},
data=json.dumps(payload),
timeout=30,
)
if response.status_code not in (200, 201):
raise BoxJWTError(
f'create_subfolder({name} under {parent_folder_id}) failed: HTTP {response.status_code}{response.text[:300]}'
)
return str(response.json()['id'])
def find_or_create_subfolder(parent_folder_id: str, name: str) -> str:
"""Idempotent: return existing subfolder ID, or create + return new one."""
existing = find_subfolder_by_name(parent_folder_id, name)
if existing:
return existing
return create_subfolder(parent_folder_id, name)
def move_file(file_id: str, target_folder_id: str, new_name: Optional[str] = None) -> Dict[str, Any]:
"""Move (and optionally rename) a Box file. Returns the updated file metadata.
Pass `new_name` to also rename in-flight — useful for collision-avoidance when
moving into a folder that may already contain a file with the same name.
"""
payload: Dict[str, Any] = {'parent': {'id': str(target_folder_id)}}
if new_name:
payload['name'] = new_name
response = requests.put(
f'{_API_BASE}/files/{file_id}',
headers={**_auth_headers(), 'Content-Type': 'application/json'},
data=json.dumps(payload),
timeout=30,
)
if response.status_code not in (200, 201):
raise BoxJWTError(
f'move_file({file_id}{target_folder_id}) failed: HTTP {response.status_code}{response.text[:300]}'
)
return response.json()
# ---------- Webhook (V2) management ----------
def create_webhook(target_type: str, target_id: str, address: str, triggers: List[str]) -> Dict[str, Any]:
"""Register a V2 webhook on a file or folder. Returns the webhook record (id + signing keys)."""
if target_type not in ('file', 'folder'):
raise BoxJWTError(f'target_type must be "file" or "folder", got {target_type!r}')
payload = {
'target': {'type': target_type, 'id': target_id},
'address': address,
'triggers': triggers,
}
response = requests.post(
f'{_API_BASE}/webhooks',
headers={**_auth_headers(), 'Content-Type': 'application/json'},
data=json.dumps(payload),
timeout=30,
)
if response.status_code not in (200, 201):
raise BoxJWTError(
f'create_webhook({target_type}/{target_id}) failed: HTTP {response.status_code}{response.text[:300]}'
)
return response.json()
def list_webhooks() -> List[Dict[str, Any]]:
"""List V2 webhooks visible to the service account."""
response = requests.get(
f'{_API_BASE}/webhooks', headers=_auth_headers(), timeout=30
)
if response.status_code != 200:
raise BoxJWTError(
f'list_webhooks failed: HTTP {response.status_code}{response.text[:300]}'
)
return response.json().get('entries', [])
def delete_webhook(webhook_id: str) -> None:
"""Delete a V2 webhook by ID."""
response = requests.delete(
f'{_API_BASE}/webhooks/{webhook_id}', headers=_auth_headers(), timeout=30
)
if response.status_code not in (200, 204):
raise BoxJWTError(
f'delete_webhook({webhook_id}) failed: HTTP {response.status_code}{response.text[:300]}'
)
# ---------- Webhook payload verification ----------
def verify_webhook_signature(
body: bytes,
headers: Dict[str, str],
primary_key: Optional[str],
secondary_key: Optional[str],
) -> bool:
"""Verify the signature on an incoming Box webhook payload.
Box V2 webhooks sign `body + delivery_timestamp` with HMAC-SHA256 using two
rotating keys (primary + secondary). Either key matching = valid signature.
Pass `body` as raw request bytes — JSON-serializing first will reorder keys
and break verification.
"""
if headers.get('box-signature-version') != '1':
return False
if headers.get('box-signature-algorithm') != 'HmacSHA256':
return False
timestamp = headers.get('box-delivery-timestamp', '')
if not timestamp:
return False
expected_primary = headers.get('box-signature-primary')
expected_secondary = headers.get('box-signature-secondary')
message = body + timestamp.encode('utf-8')
def _matches(key: Optional[str], expected: Optional[str]) -> bool:
if not key or not expected:
return False
computed = base64.b64encode(
hmac.new(key.encode('utf-8'), message, hashlib.sha256).digest()
).decode('utf-8')
return hmac.compare_digest(computed, expected)
return _matches(primary_key, expected_primary) or _matches(secondary_key, expected_secondary)