""" Box JWT service-account client. Authenticates as a Box Custom App with Server Authentication (JWT) — i.e. the app has its own Box identity rather than acting on behalf of a logged-in user. Used for webhook-driven, unattended workflows where the QC pipeline needs to read files from Box and write reports back without a human in the loop. The service account must be invited as a collaborator on each client folder before any read/write succeeds. Folder IDs live in `client_config.py`. Co-exists with the older per-user OAuth flow in `box_client.py` — different auth model, different use cases. """ from __future__ import annotations import base64 import hashlib import hmac import json import os import secrets import threading import time from typing import Any, Dict, List, Optional import jwt import requests from cryptography.hazmat.primitives.serialization import load_pem_private_key _CONFIG_PATH_ENV = 'BOX_JWT_CONFIG_PATH' _DEFAULT_CONFIG_PATH = os.path.join( os.path.dirname(os.path.abspath(__file__)), 'config', 'box_jwt_config.json' ) _TOKEN_URL = 'https://api.box.com/oauth2/token' _API_BASE = 'https://api.box.com/2.0' _UPLOAD_BASE = 'https://upload.box.com/api/2.0' _token_lock = threading.Lock() _cached_token: Optional[Dict[str, Any]] = None # {'access_token': str, 'expires_at': float} class BoxJWTError(RuntimeError): """Any failure while talking to Box via the JWT service account.""" def _config_path() -> str: return os.environ.get(_CONFIG_PATH_ENV) or _DEFAULT_CONFIG_PATH def is_configured() -> bool: """True iff the JWT config JSON exists at the expected path.""" return os.path.exists(_config_path()) def _load_config() -> Dict[str, Any]: path = _config_path() if not os.path.exists(path): raise BoxJWTError( f'Box JWT config not found at {path}. ' f'Drop the JSON Box gave you for the Custom App at that path, or set {_CONFIG_PATH_ENV}.' ) with open(path, 'r') as f: return json.load(f) def _build_assertion(config: Dict[str, Any]) -> str: box_app = config['boxAppSettings'] app_auth = box_app['appAuth'] private_key = load_pem_private_key( app_auth['privateKey'].encode('utf-8'), password=app_auth['passphrase'].encode('utf-8'), ) claims = { 'iss': box_app['clientID'], 'sub': config['enterpriseID'], 'box_sub_type': 'enterprise', 'aud': _TOKEN_URL, 'jti': secrets.token_urlsafe(16), 'exp': int(time.time()) + 45, # Box caps assertion lifetime at 60s } return jwt.encode( claims, private_key, algorithm='RS256', headers={'kid': app_auth['publicKeyID']} ) def _fetch_new_token() -> Dict[str, Any]: config = _load_config() assertion = _build_assertion(config) response = requests.post( _TOKEN_URL, data={ 'grant_type': 'urn:ietf:params:oauth:grant-type:jwt-bearer', 'assertion': assertion, 'client_id': config['boxAppSettings']['clientID'], 'client_secret': config['boxAppSettings']['clientSecret'], }, timeout=30, ) if response.status_code != 200: raise BoxJWTError( f'Box token exchange failed: HTTP {response.status_code} — {response.text[:300]}' ) data = response.json() # Refresh 5 minutes before expiry to absorb clock skew + network latency. return { 'access_token': data['access_token'], 'expires_at': time.time() + data['expires_in'] - 300, } def get_service_account_token() -> str: """Return a currently-valid service-account access token, refreshing if needed.""" global _cached_token with _token_lock: if _cached_token and time.time() < _cached_token['expires_at']: return _cached_token['access_token'] _cached_token = _fetch_new_token() return _cached_token['access_token'] def _auth_headers() -> Dict[str, str]: return {'Authorization': f'Bearer {get_service_account_token()}'} # ---------- File / folder operations ---------- def list_folder_items(folder_id: str, fields: Optional[List[str]] = None, limit: int = 1000) -> List[Dict[str, Any]]: """List items in a folder. Returns the `entries` array from the Box API.""" params = {'limit': limit} if fields: params['fields'] = ','.join(fields) response = requests.get( f'{_API_BASE}/folders/{folder_id}/items', headers=_auth_headers(), params=params, timeout=30, ) if response.status_code != 200: raise BoxJWTError( f'list_folder_items({folder_id}) failed: HTTP {response.status_code} — {response.text[:300]}' ) return response.json().get('entries', []) def get_file_metadata(file_id: str) -> Dict[str, Any]: """Return Box file metadata (name, size, parent, etc.).""" response = requests.get( f'{_API_BASE}/files/{file_id}', headers=_auth_headers(), timeout=30, ) if response.status_code != 200: raise BoxJWTError( f'get_file_metadata({file_id}) failed: HTTP {response.status_code} — {response.text[:300]}' ) return response.json() def download_file(file_id: str, dest_path: str) -> str: """Stream a Box file to dest_path. Returns dest_path on success.""" os.makedirs(os.path.dirname(dest_path), exist_ok=True) with requests.get( f'{_API_BASE}/files/{file_id}/content', headers=_auth_headers(), stream=True, timeout=300, ) as response: if response.status_code != 200: raise BoxJWTError( f'download_file({file_id}) failed: HTTP {response.status_code} — {response.text[:300]}' ) with open(dest_path, 'wb') as f: for chunk in response.iter_content(chunk_size=64 * 1024): if chunk: f.write(chunk) return dest_path def upload_file(local_path: str, parent_folder_id: str, name: Optional[str] = None) -> Dict[str, Any]: """Upload a local file into a Box folder. Returns the new file's metadata.""" if not os.path.exists(local_path): raise BoxJWTError(f'upload_file: local file not found: {local_path}') upload_name = name or os.path.basename(local_path) attributes = {'name': upload_name, 'parent': {'id': parent_folder_id}} with open(local_path, 'rb') as f: response = requests.post( f'{_UPLOAD_BASE}/files/content', headers=_auth_headers(), data={'attributes': json.dumps(attributes)}, files={'file': (upload_name, f)}, timeout=300, ) if response.status_code not in (200, 201): raise BoxJWTError( f'upload_file({upload_name} → {parent_folder_id}) failed: HTTP {response.status_code} — {response.text[:300]}' ) entries = response.json().get('entries', []) return entries[0] if entries else {} def find_subfolder_by_name(parent_folder_id: str, name: str) -> Optional[str]: """Return the Box folder ID of a child folder named `name`, or None if not found. Box allows duplicate folder names within a parent; if multiple match, returns the first one encountered. """ for item in list_folder_items(parent_folder_id, fields=['id', 'name', 'type']): if item.get('type') == 'folder' and item.get('name') == name: return str(item['id']) return None def create_subfolder(parent_folder_id: str, name: str) -> str: """Create a new folder named `name` under `parent_folder_id`. Returns its ID.""" payload = {'name': name, 'parent': {'id': parent_folder_id}} response = requests.post( f'{_API_BASE}/folders', headers={**_auth_headers(), 'Content-Type': 'application/json'}, data=json.dumps(payload), timeout=30, ) if response.status_code not in (200, 201): raise BoxJWTError( f'create_subfolder({name} under {parent_folder_id}) failed: HTTP {response.status_code} — {response.text[:300]}' ) return str(response.json()['id']) def find_or_create_subfolder(parent_folder_id: str, name: str) -> str: """Idempotent: return existing subfolder ID, or create + return new one.""" existing = find_subfolder_by_name(parent_folder_id, name) if existing: return existing return create_subfolder(parent_folder_id, name) def move_file(file_id: str, target_folder_id: str, new_name: Optional[str] = None) -> Dict[str, Any]: """Move (and optionally rename) a Box file. Returns the updated file metadata. Pass `new_name` to also rename in-flight — useful for collision-avoidance when moving into a folder that may already contain a file with the same name. """ payload: Dict[str, Any] = {'parent': {'id': str(target_folder_id)}} if new_name: payload['name'] = new_name response = requests.put( f'{_API_BASE}/files/{file_id}', headers={**_auth_headers(), 'Content-Type': 'application/json'}, data=json.dumps(payload), timeout=30, ) if response.status_code not in (200, 201): raise BoxJWTError( f'move_file({file_id} → {target_folder_id}) failed: HTTP {response.status_code} — {response.text[:300]}' ) return response.json() # ---------- Webhook (V2) management ---------- def create_webhook(target_type: str, target_id: str, address: str, triggers: List[str]) -> Dict[str, Any]: """Register a V2 webhook on a file or folder. Returns the webhook record (id + signing keys).""" if target_type not in ('file', 'folder'): raise BoxJWTError(f'target_type must be "file" or "folder", got {target_type!r}') payload = { 'target': {'type': target_type, 'id': target_id}, 'address': address, 'triggers': triggers, } response = requests.post( f'{_API_BASE}/webhooks', headers={**_auth_headers(), 'Content-Type': 'application/json'}, data=json.dumps(payload), timeout=30, ) if response.status_code not in (200, 201): raise BoxJWTError( f'create_webhook({target_type}/{target_id}) failed: HTTP {response.status_code} — {response.text[:300]}' ) return response.json() def list_webhooks() -> List[Dict[str, Any]]: """List V2 webhooks visible to the service account.""" response = requests.get( f'{_API_BASE}/webhooks', headers=_auth_headers(), timeout=30 ) if response.status_code != 200: raise BoxJWTError( f'list_webhooks failed: HTTP {response.status_code} — {response.text[:300]}' ) return response.json().get('entries', []) def delete_webhook(webhook_id: str) -> None: """Delete a V2 webhook by ID.""" response = requests.delete( f'{_API_BASE}/webhooks/{webhook_id}', headers=_auth_headers(), timeout=30 ) if response.status_code not in (200, 204): raise BoxJWTError( f'delete_webhook({webhook_id}) failed: HTTP {response.status_code} — {response.text[:300]}' ) # ---------- Webhook payload verification ---------- def verify_webhook_signature( body: bytes, headers: Dict[str, str], primary_key: Optional[str], secondary_key: Optional[str], ) -> bool: """Verify the signature on an incoming Box webhook payload. Box V2 webhooks sign `body + delivery_timestamp` with HMAC-SHA256 using two rotating keys (primary + secondary). Either key matching = valid signature. Pass `body` as raw request bytes — JSON-serializing first will reorder keys and break verification. """ if headers.get('box-signature-version') != '1': return False if headers.get('box-signature-algorithm') != 'HmacSHA256': return False timestamp = headers.get('box-delivery-timestamp', '') if not timestamp: return False expected_primary = headers.get('box-signature-primary') expected_secondary = headers.get('box-signature-secondary') message = body + timestamp.encode('utf-8') def _matches(key: Optional[str], expected: Optional[str]) -> bool: if not key or not expected: return False computed = base64.b64encode( hmac.new(key.encode('utf-8'), message, hashlib.sha256).digest() ).decode('utf-8') return hmac.compare_digest(computed, expected) return _matches(primary_key, expected_primary) or _matches(secondary_key, expected_secondary)