Solves two problems at once:
1. Folder cleanliness — INCOMING accumulates indefinitely otherwise.
2. Duplicate-upload re-trigger — Box V2's FILE.UPLOADED trigger doesn't
fire when the same filename is "uploaded as new version" of an
existing file. By moving the source out of INCOMING after success,
re-uploading the same filename becomes a genuinely-new file event
again and the webhook fires normally.
After the report uploads successfully to the REPORTS folder, the worker:
1. find_or_create_subfolder(<INCOMING>, '_PROCESSED') — idempotent
2. move_file(file_id, <_PROCESSED>, new_name=f'{session_id}_{filename}')
The session_id prefix gives the archived file a sortable timestamp and
ties it back to the matching QC_Report_<session_id>_*.html in REPORTS.
Defensive: the move only runs if the report upload to Box succeeded.
If Box delivery failed, the source stays in INCOMING so a retry just
means re-uploading. Move failures are non-fatal — logged + recorded
in result_data['box_source_move_error'], analysis still marked
complete.
Adds four helpers to box_jwt_client.py:
- find_subfolder_by_name(parent, name) → Optional[str]
- create_subfolder(parent, name) → str
- find_or_create_subfolder(parent, name) → str (idempotent)
- move_file(file_id, target_folder, new_name=None) → Dict
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
343 lines
12 KiB
Python
343 lines
12 KiB
Python
"""
|
|
Box JWT service-account client.
|
|
|
|
Authenticates as a Box Custom App with Server Authentication (JWT) — i.e. the
|
|
app has its own Box identity rather than acting on behalf of a logged-in user.
|
|
Used for webhook-driven, unattended workflows where the QC pipeline needs to
|
|
read files from Box and write reports back without a human in the loop.
|
|
|
|
The service account must be invited as a collaborator on each client folder
|
|
before any read/write succeeds. Folder IDs live in `client_config.py`.
|
|
|
|
Co-exists with the older per-user OAuth flow in `box_client.py` — different
|
|
auth model, different use cases.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import os
|
|
import secrets
|
|
import threading
|
|
import time
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import jwt
|
|
import requests
|
|
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
|
|
|
# Environment variable that, when set to a non-empty value, overrides the
# location of the Box JWT config JSON.
_CONFIG_PATH_ENV = 'BOX_JWT_CONFIG_PATH'

# Fallback config location: <directory of this module>/config/box_jwt_config.json
_DEFAULT_CONFIG_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    'config',
    'box_jwt_config.json',
)

# Box endpoints: OAuth token exchange, the main REST API, and the separate
# upload host used for file content uploads.
_TOKEN_URL = 'https://api.box.com/oauth2/token'
_API_BASE = 'https://api.box.com/2.0'
_UPLOAD_BASE = 'https://upload.box.com/api/2.0'

# Process-wide access-token cache. It is guarded by _token_lock so concurrent
# callers share one token and only one of them refreshes it at a time.
_token_lock = threading.Lock()
_cached_token: Optional[Dict[str, Any]] = None  # {'access_token': str, 'expires_at': float}
|
|
|
|
|
|
class BoxJWTError(RuntimeError):
    """Raised when an operation through the Box JWT service account fails.

    Covers a missing config file, a failed token exchange, and any Box API call
    that answers with an unexpected HTTP status. It derives from RuntimeError,
    so generic ``except RuntimeError`` handlers also catch it.
    """
|
|
|
|
|
|
def _config_path() -> str:
    """Resolve where the JWT config JSON should be read from.

    Returns the value of the override environment variable when it is set to a
    non-empty string; otherwise the module's bundled default path.
    """
    override = os.environ.get(_CONFIG_PATH_ENV)
    if override:
        return override
    return _DEFAULT_CONFIG_PATH
|
|
|
|
|
|
def is_configured() -> bool:
    """Report whether a JWT config file is present at the resolved config path.

    This is only an existence check. The JSON is not opened, parsed or validated.
    """
    config_file = _config_path()
    return os.path.exists(config_file)
|
|
|
|
|
|
def _load_config() -> Dict[str, Any]:
    """Read and parse the Box JWT config JSON from the resolved config path.

    Returns:
        The parsed config dict. Callers in this module read `boxAppSettings`
        and `enterpriseID` from it.

    Raises:
        BoxJWTError: if no file exists at the resolved path.
    """
    path = _config_path()
    if not os.path.exists(path):
        raise BoxJWTError(
            f'Box JWT config not found at {path}. '
            f'Drop the JSON Box gave you for the Custom App at that path, or set {_CONFIG_PATH_ENV}.'
        )
    # JSON is UTF-8 by spec, so don't depend on the platform's locale encoding.
    # 'utf-8-sig' also tolerates a BOM, which some editors add and which
    # json.load otherwise rejects.
    with open(path, 'r', encoding='utf-8-sig') as f:
        return json.load(f)
|
|
|
|
|
|
def _build_assertion(config: Dict[str, Any]) -> str:
    """Build the signed JWT assertion that is exchanged for a service-account token.

    Args:
        config: Parsed JWT config (see `_load_config`). Reads
            `boxAppSettings.clientID`, `boxAppSettings.appAuth.{privateKey,
            passphrase, publicKeyID}` and `enterpriseID`.

    Returns:
        An RS256-signed JWT string whose header carries the key ID (`kid`).
    """
    box_app = config['boxAppSettings']
    app_auth = box_app['appAuth']
    # An empty or missing passphrase means the PEM is not encrypted.
    # load_pem_private_key raises TypeError if given any password (even b'')
    # for an unencrypted key, so pass None in that case.
    passphrase = app_auth.get('passphrase')
    private_key = load_pem_private_key(
        app_auth['privateKey'].encode('utf-8'),
        password=passphrase.encode('utf-8') if passphrase else None,
    )
    claims = {
        'iss': box_app['clientID'],
        'sub': config['enterpriseID'],
        'box_sub_type': 'enterprise',
        'aud': _TOKEN_URL,
        # Random unique ID per assertion.
        'jti': secrets.token_urlsafe(16),
        'exp': int(time.time()) + 45,  # Box caps assertion lifetime at 60s
    }
    return jwt.encode(
        claims, private_key, algorithm='RS256', headers={'kid': app_auth['publicKeyID']}
    )
|
|
|
|
|
|
def _fetch_new_token() -> Dict[str, Any]:
    """Exchange a freshly signed JWT assertion for a new Box access token.

    Returns:
        ``{'access_token': str, 'expires_at': float}``. `expires_at` is an epoch
        time placed 5 minutes before the expiry Box reports, so the cached token
        is refreshed early.

    Raises:
        BoxJWTError: if the token endpoint answers with anything but HTTP 200.
    """
    config = _load_config()
    signed_assertion = _build_assertion(config)
    app_settings = config['boxAppSettings']
    form_fields = {
        'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer',
        'assertion': signed_assertion,
        'client_id': app_settings['clientID'],
        'client_secret': app_settings['clientSecret'],
    }
    response = requests.post(_TOKEN_URL, data=form_fields, timeout=30)
    if response.status_code != 200:
        raise BoxJWTError(
            f'Box token exchange failed: HTTP {response.status_code} — {response.text[:300]}'
        )
    token_payload = response.json()
    access_token = token_payload['access_token']
    # 300 s safety margin: absorbs clock skew and network latency before real expiry.
    refresh_deadline = time.time() + token_payload['expires_in'] - 300
    return {'access_token': access_token, 'expires_at': refresh_deadline}
|
|
|
|
|
|
def get_service_account_token() -> str:
    """Return a valid service-account access token, fetching a new one when needed.

    The module-level cache is read and replaced under `_token_lock`, so
    concurrent callers trigger at most one refresh at a time.
    """
    global _cached_token
    with _token_lock:
        current = _cached_token
        if not current or time.time() >= current['expires_at']:
            current = _fetch_new_token()
            _cached_token = current
        return current['access_token']
|
|
|
|
|
|
def _auth_headers() -> Dict[str, str]:
    """Build a fresh Bearer `Authorization` header dict for a Box API request."""
    token = get_service_account_token()
    return {'Authorization': f'Bearer {token}'}
|
|
|
|
|
|
# ---------- File / folder operations ----------
|
|
|
|
def list_folder_items(folder_id: str, fields: Optional[List[str]] = None, limit: int = 1000) -> List[Dict[str, Any]]:
    """List every item in a Box folder.

    Box returns folder items one page at a time. This function follows Box's
    marker-based pagination (`usemarker` / `next_marker`) until no marker is
    returned, so folders larger than one page are not silently truncated.

    Args:
        folder_id: Box folder ID.
        fields: Optional field names to request (sent as Box's `fields` param).
        limit: Page size per request. Box documents a maximum of 1000.

    Returns:
        The concatenated `entries` arrays from all pages.

    Raises:
        BoxJWTError: if any page request answers with anything but HTTP 200.
    """
    params: Dict[str, Any] = {'limit': limit, 'usemarker': 'true'}
    if fields:
        params['fields'] = ','.join(fields)
    items: List[Dict[str, Any]] = []
    while True:
        response = requests.get(
            f'{_API_BASE}/folders/{folder_id}/items',
            headers=_auth_headers(),  # re-evaluated per page so the token can refresh
            params=params,
            timeout=30,
        )
        if response.status_code != 200:
            raise BoxJWTError(
                f'list_folder_items({folder_id}) failed: HTTP {response.status_code} — {response.text[:300]}'
            )
        page = response.json()
        entries = page.get('entries', [])
        items.extend(entries)
        next_marker = page.get('next_marker')
        # Stop on a missing or empty marker, and also on an empty page as a
        # guard against looping forever.
        if not next_marker or not entries:
            return items
        params['marker'] = next_marker
|
|
|
|
|
|
def get_file_metadata(file_id: str) -> Dict[str, Any]:
    """Fetch the Box file object for `file_id` (name, size, parent, etc.).

    Raises:
        BoxJWTError: if Box answers with anything other than HTTP 200.
    """
    url = f'{_API_BASE}/files/{file_id}'
    response = requests.get(url, headers=_auth_headers(), timeout=30)
    if response.status_code == 200:
        return response.json()
    raise BoxJWTError(
        f'get_file_metadata({file_id}) failed: HTTP {response.status_code} — {response.text[:300]}'
    )
|
|
|
|
|
|
def download_file(file_id: str, dest_path: str) -> str:
    """Stream a Box file's content to `dest_path` in 64 KiB chunks.

    Missing parent directories of `dest_path` are created. If the transfer fails
    after the local file was opened, the partially written file is removed, so a
    truncated download is never left behind.

    Returns:
        `dest_path` on success.

    Raises:
        BoxJWTError: if Box answers with anything other than HTTP 200.
        Any exception raised while writing or streaming is re-raised after cleanup.
    """
    parent_dir = os.path.dirname(dest_path)
    # A bare filename has dirname '' and os.makedirs('') raises
    # FileNotFoundError, so only create directories when there is one.
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)
    with requests.get(
        f'{_API_BASE}/files/{file_id}/content',
        headers=_auth_headers(),
        stream=True,
        timeout=300,
    ) as response:
        if response.status_code != 200:
            raise BoxJWTError(
                f'download_file({file_id}) failed: HTTP {response.status_code} — {response.text[:300]}'
            )
        with open(dest_path, 'wb') as f:
            try:
                for chunk in response.iter_content(chunk_size=64 * 1024):
                    if chunk:  # skip keep-alive empty chunks
                        f.write(chunk)
            except BaseException:
                # Close first (required before unlinking on Windows), then remove
                # the truncated file so it can't be mistaken for a full download.
                f.close()
                try:
                    os.remove(dest_path)
                except OSError:
                    pass
                raise
    return dest_path
|
|
|
|
|
|
def upload_file(local_path: str, parent_folder_id: str, name: Optional[str] = None) -> Dict[str, Any]:
    """Upload the local file at `local_path` into Box folder `parent_folder_id`.

    Args:
        local_path: Path of the file to upload.
        parent_folder_id: Destination Box folder ID.
        name: Box filename to use. When falsy, the local file's basename is used.

    Returns:
        The first entry of Box's upload response (the new file's metadata), or
        an empty dict if the response has no entries.

    Raises:
        BoxJWTError: if `local_path` does not exist, or Box answers with a
            status other than 200/201.
    """
    if not os.path.exists(local_path):
        raise BoxJWTError(f'upload_file: local file not found: {local_path}')
    upload_name = name if name else os.path.basename(local_path)
    with open(local_path, 'rb') as payload:
        response = requests.post(
            f'{_UPLOAD_BASE}/files/content',
            headers=_auth_headers(),
            data={'attributes': json.dumps({'name': upload_name, 'parent': {'id': parent_folder_id}})},
            files={'file': (upload_name, payload)},
            timeout=300,
        )
    if response.status_code in (200, 201):
        uploaded = response.json().get('entries', [])
        return uploaded[0] if uploaded else {}
    raise BoxJWTError(
        f'upload_file({upload_name} → {parent_folder_id}) failed: HTTP {response.status_code} — {response.text[:300]}'
    )
|
|
|
|
|
|
def find_subfolder_by_name(parent_folder_id: str, name: str) -> Optional[str]:
    """Return the ID of the child folder of `parent_folder_id` named exactly `name`.

    Only entries whose `type` is 'folder' are considered, so a file with the same
    name is ignored. Matching is an exact, case-sensitive string comparison.

    Box does not allow two items with the same name in one folder; creating one
    fails with HTTP 409 `item_name_in_use`. At most one folder should therefore
    match. If several entries did match, the first one in listing order wins.
    NOTE(review): Box's name-conflict check may be case-insensitive. Confirm
    whether a differently cased folder should also count as a match.

    Returns:
        The matching folder's ID as a string, or None if no child folder matches.
    """
    children = list_folder_items(parent_folder_id, fields=['id', 'name', 'type'])
    for child in children:
        if child.get('type') == 'folder' and child.get('name') == name:
            return str(child['id'])
    return None
|
|
|
|
|
|
def create_subfolder(parent_folder_id: str, name: str) -> str:
    """Create a folder named `name` inside `parent_folder_id` and return its ID.

    This is not get-or-create: any non-200/201 response is raised, including
    Box's conflict response when the name is already in use. Use
    find_or_create_subfolder for idempotent behaviour.

    Raises:
        BoxJWTError: if Box answers with a status other than 200/201.
    """
    headers = _auth_headers()
    headers['Content-Type'] = 'application/json'
    request_body = json.dumps({'name': name, 'parent': {'id': parent_folder_id}})
    response = requests.post(
        f'{_API_BASE}/folders', headers=headers, data=request_body, timeout=30
    )
    if response.status_code in (200, 201):
        return str(response.json()['id'])
    raise BoxJWTError(
        f'create_subfolder({name} under {parent_folder_id}) failed: HTTP {response.status_code} — {response.text[:300]}'
    )
|
|
|
|
|
|
def find_or_create_subfolder(parent_folder_id: str, name: str) -> str:
    """Return the ID of child folder `name` under `parent_folder_id`, creating it if absent.

    Idempotent, including across concurrent workers. Two workers can both miss on
    the first lookup and both try to create the folder; the loser's create call
    fails (Box answers a name clash with a 409 conflict). So when creation fails,
    the folder is looked up once more and that ID is returned. If it still cannot
    be found, the original create error is re-raised.

    Raises:
        BoxJWTError: if the folder can neither be found nor created.
    """
    existing = find_subfolder_by_name(parent_folder_id, name)
    if existing:
        return existing
    try:
        return create_subfolder(parent_folder_id, name)
    except BoxJWTError:
        # A re-check tells "someone else created it first" apart from a real
        # failure. A bare `raise` re-raises the original create error.
        created_elsewhere = find_subfolder_by_name(parent_folder_id, name)
        if created_elsewhere:
            return created_elsewhere
        raise
|
|
|
|
|
|
def move_file(file_id: str, target_folder_id: str, new_name: Optional[str] = None) -> Dict[str, Any]:
    """Move a Box file into `target_folder_id`, optionally renaming it in the same call.

    A truthy `new_name` is sent as the new filename. This helps avoid collisions
    when the target folder may already hold a file with the original name.

    Returns:
        The updated file metadata returned by Box.

    Raises:
        BoxJWTError: if Box answers with a status other than 200/201.
    """
    update: Dict[str, Any] = {'parent': {'id': str(target_folder_id)}}
    if new_name:
        update['name'] = new_name
    headers = _auth_headers()
    headers['Content-Type'] = 'application/json'
    response = requests.put(
        f'{_API_BASE}/files/{file_id}', headers=headers, data=json.dumps(update), timeout=30
    )
    if response.status_code in (200, 201):
        return response.json()
    raise BoxJWTError(
        f'move_file({file_id} → {target_folder_id}) failed: HTTP {response.status_code} — {response.text[:300]}'
    )
|
|
|
|
|
|
# ---------- Webhook (V2) management ----------
|
|
|
|
def create_webhook(target_type: str, target_id: str, address: str, triggers: List[str]) -> Dict[str, Any]:
    """Register a Box V2 webhook on a file or folder.

    Args:
        target_type: 'file' or 'folder'.
        target_id: Box ID of the file or folder to watch.
        address: URL that Box will POST deliveries to.
        triggers: Box trigger names, e.g. 'FILE.UPLOADED'.

    Returns:
        The webhook object Box creates (its `id`, `target`, `address`,
        `triggers`, ...). Per Box's API reference, this response does NOT include
        the signature keys. The primary/secondary signing keys are generated and
        rotated in the Box Developer Console, and must be supplied to
        verify_webhook_signature separately.

    Raises:
        BoxJWTError: if `target_type` is invalid, or Box answers with a status
            other than 200/201.
    """
    if target_type not in ('file', 'folder'):
        raise BoxJWTError(f'target_type must be "file" or "folder", got {target_type!r}')
    payload = {
        'target': {'type': target_type, 'id': target_id},
        'address': address,
        'triggers': triggers,
    }
    response = requests.post(
        f'{_API_BASE}/webhooks',
        headers={**_auth_headers(), 'Content-Type': 'application/json'},
        data=json.dumps(payload),
        timeout=30,
    )
    if response.status_code not in (200, 201):
        raise BoxJWTError(
            f'create_webhook({target_type}/{target_id}) failed: HTTP {response.status_code} — {response.text[:300]}'
        )
    return response.json()
|
|
|
|
|
|
def list_webhooks() -> List[Dict[str, Any]]:
    """List all V2 webhooks visible to the service account.

    GET /webhooks is paginated with markers: each response carries one page of
    `entries` plus a `next_marker` when more remain. This follows the marker
    until it is exhausted, instead of returning only the first page.

    Raises:
        BoxJWTError: if any page request answers with anything but HTTP 200.
    """
    webhooks: List[Dict[str, Any]] = []
    params: Dict[str, Any] = {}
    while True:
        response = requests.get(
            f'{_API_BASE}/webhooks', headers=_auth_headers(), params=params, timeout=30
        )
        if response.status_code != 200:
            raise BoxJWTError(
                f'list_webhooks failed: HTTP {response.status_code} — {response.text[:300]}'
            )
        page = response.json()
        entries = page.get('entries', [])
        webhooks.extend(entries)
        next_marker = page.get('next_marker')
        # Stop on a missing or empty marker; an empty page also ends the loop
        # as a guard against looping forever.
        if not next_marker or not entries:
            return webhooks
        params['marker'] = next_marker
|
|
|
|
|
|
def delete_webhook(webhook_id: str) -> None:
    """Remove the V2 webhook identified by `webhook_id`.

    Raises:
        BoxJWTError: if Box answers with a status other than 200/204.
    """
    response = requests.delete(
        f'{_API_BASE}/webhooks/{webhook_id}', headers=_auth_headers(), timeout=30
    )
    if response.status_code in (200, 204):
        return
    raise BoxJWTError(
        f'delete_webhook({webhook_id}) failed: HTTP {response.status_code} — {response.text[:300]}'
    )
|
|
|
|
|
|
# ---------- Webhook payload verification ----------
|
|
|
|
def _delivery_timestamp_is_fresh(timestamp: str, max_age_seconds: float) -> bool:
    """Return True if the RFC 3339 delivery `timestamp` is at most `max_age_seconds` old.

    Unparseable timestamps count as stale. Timestamps slightly in the future
    (sender clock ahead of ours) are accepted.
    """
    from datetime import datetime, timezone

    text = timestamp.strip()
    # datetime.fromisoformat() only understands a trailing 'Z' from Python 3.11 on.
    if text[-1:] in ('Z', 'z'):
        text = text[:-1] + '+00:00'
    try:
        sent_at = datetime.fromisoformat(text)
    except ValueError:
        return False
    if sent_at.tzinfo is None:
        # NOTE(review): assumes an offset-less timestamp is UTC — confirm.
        sent_at = sent_at.replace(tzinfo=timezone.utc)
    age_seconds = (datetime.now(timezone.utc) - sent_at).total_seconds()
    return age_seconds <= max_age_seconds


def verify_webhook_signature(
    body: bytes,
    headers: Dict[str, str],
    primary_key: Optional[str],
    secondary_key: Optional[str],
    *,
    max_age_seconds: Optional[float] = 600,
) -> bool:
    """Verify the signature on an incoming Box webhook payload.

    Box V2 webhooks sign `body + delivery_timestamp` with HMAC-SHA256 using two
    rotating keys (primary + secondary). Either key matching means the signature
    is valid. Pass `body` as the raw request bytes: re-serializing parsed JSON
    can change the bytes and break verification.

    Header names are matched case-insensitively. Both a framework header object
    and a plain dict with `BOX-SIGNATURE-PRIMARY`-style keys therefore work.

    Replay protection: a delivery whose `box-delivery-timestamp` is older than
    `max_age_seconds` (default 600 s = 10 minutes) is rejected even if correctly
    signed. Pass `max_age_seconds=None` to disable this check.

    Returns:
        True only if all of the following hold:
        - the signature version is '1' and the algorithm is 'HmacSHA256';
        - a delivery timestamp is present (and fresh, unless disabled);
        - at least one provided key reproduces its matching signature header.
        Malformed signature header values yield False rather than raising.
    """
    # Normalize once so lookups don't depend on the caller's header-name casing.
    normalized = {str(key).lower(): value for key, value in headers.items()}
    if normalized.get('box-signature-version') != '1':
        return False
    if normalized.get('box-signature-algorithm') != 'HmacSHA256':
        return False
    timestamp = normalized.get('box-delivery-timestamp', '')
    if not timestamp:
        return False
    if max_age_seconds is not None and not _delivery_timestamp_is_fresh(timestamp, max_age_seconds):
        return False

    message = body + timestamp.encode('utf-8')

    def _matches(key: Optional[str], expected: Optional[str]) -> bool:
        if not key or not expected:
            return False
        computed = base64.b64encode(
            hmac.new(key.encode('utf-8'), message, hashlib.sha256).digest()
        )
        # Compare as bytes. compare_digest raises TypeError for non-ASCII str
        # arguments, and these header values are attacker-controlled.
        return hmac.compare_digest(computed, expected.encode('utf-8', 'replace'))

    return _matches(primary_key, normalized.get('box-signature-primary')) or _matches(
        secondary_key, normalized.get('box-signature-secondary')
    )
|