#!/usr/bin/env python3
"""
Per-user Box.com OAuth token storage.

Box rotates refresh tokens on every refresh — every successful refresh returns
a NEW refresh token and invalidates the previous one. Always write back the
new pair via save_tokens() after a refresh, or the user will be silently
disconnected the next time around.

File: backend/box_tokens.json (gitignored — holds long-lived refresh tokens
that grant access to the user's Box account).
"""

import json
import os
from datetime import datetime, timedelta, timezone
from threading import Lock

_TOKENS_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'box_tokens.json')

# Guards every load / load-modify-save of the token file between threads of
# this process.
_lock = Lock()


def _default():
    """Return a fresh, empty token store."""
    return {'version': 1, 'users': {}}


def _utc_iso(moment):
    """Format an aware UTC datetime as ISO-8601 with a trailing 'Z'."""
    return moment.isoformat().replace('+00:00', 'Z')


def _load():
    """Read the token store from disk.

    Returns the empty default store when the file is missing, unreadable,
    not valid JSON, or not shaped like a store (a JSON object whose 'users'
    entry is itself an object).
    """
    try:
        with open(_TOKENS_PATH, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (ValueError, OSError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError;
        # OSError covers a missing or unreadable file.
        return _default()
    # Valid JSON is not necessarily a dict (e.g. `null` or a list); callers
    # rely on data['users'] being a dict.
    if not isinstance(data, dict) or not isinstance(data.get('users'), dict):
        return _default()
    return data


def _save(data):
    """Write `data` to the token file via a temp file + os.replace().

    The store is first written to a sibling '.tmp' file and then moved over
    the real file, so the real file is never left half-written. The temp
    file (and therefore the final file) is restricted to owner read/write
    because it holds refresh tokens.

    Raises:
        OSError: if the file cannot be written or replaced.
    """
    tmp = _TOKENS_PATH + '.tmp'
    fd = os.open(tmp, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    try:
        with os.fdopen(fd, 'w', encoding='utf-8') as f:
            # The mode passed to os.open() only applies to a newly created
            # file; also tighten a stale temp file from an earlier crash.
            os.chmod(tmp, 0o600)
            json.dump(data, f, indent=2)
        os.replace(tmp, _TOKENS_PATH)
    except BaseException:
        # Don't leave a partially written temp file with tokens behind.
        try:
            os.unlink(tmp)
        except OSError:
            pass
        raise


def _normalize(email):
    """Return `email` stripped and lower-cased ('' for None or empty)."""
    return (email or '').strip().lower()


def get_tokens(email):
    """Return the stored token record for `email`, or None."""
    email = _normalize(email)
    if not email:
        return None
    with _lock:
        return _load().get('users', {}).get(email)


def save_tokens(email, token_response, box_user=None):
    """
    Persist a Box token response for `email`.

    Args:
        email: Oliver-side user email (the QC tool identity, not the Box login)
        token_response: dict with at least access_token, refresh_token,
            expires_in (seconds). A missing or null expires_in is treated
            as 3600.
        box_user: optional dict with Box user info {id, login, name}. When
            omitted, Box user fields from the previous record are kept.

    Returns:
        The record that was written.

    Raises:
        ValueError: if `email` is empty or either token is missing.
    """
    email = _normalize(email)
    if not email:
        raise ValueError('email is required')
    if not token_response.get('access_token') or not token_response.get('refresh_token'):
        raise ValueError('token_response must include access_token and refresh_token')

    raw_expires_in = token_response.get('expires_in')
    expires_in = 3600 if raw_expires_in is None else int(raw_expires_in)

    # One clock reading so every timestamp in the record is consistent.
    now = datetime.now(timezone.utc)
    now_str = _utc_iso(now)
    # Subtract a small skew so we refresh slightly before actual expiry.
    expires_at = now + timedelta(seconds=max(expires_in - 60, 60))

    with _lock:
        data = _load()
        prior = data.get('users', {}).get(email, {})
        if not isinstance(prior, dict):
            prior = {}

        record = {
            'access_token': token_response['access_token'],
            'refresh_token': token_response['refresh_token'],
            'access_token_expires_at': _utc_iso(expires_at),
            # connected_at survives token rotation; only set on first save.
            'connected_at': prior.get('connected_at') or now_str,
            'updated_at': now_str,
        }
        if box_user:
            record['box_user_id'] = box_user.get('id') or prior.get('box_user_id')
            record['box_user_login'] = box_user.get('login') or prior.get('box_user_login')
            record['box_user_name'] = box_user.get('name') or prior.get('box_user_name')
        else:
            for k in ('box_user_id', 'box_user_login', 'box_user_name'):
                if prior.get(k):
                    record[k] = prior[k]

        data.setdefault('users', {})[email] = record
        _save(data)
        return record


def delete_tokens(email):
    """Forget the Box connection for `email` (user clicks Disconnect).

    Returns True if a record was removed, False if none existed.
    """
    email = _normalize(email)
    with _lock:
        data = _load()
        if email in data.get('users', {}):
            del data['users'][email]
            _save(data)
            return True
        return False


def is_connected(email):
    """Return True if a token record is stored for `email`."""
    return get_tokens(email) is not None


def access_token_is_expired(record):
    """True if the stored access token is past its expiry.

    A missing record, a missing or unparseable expiry, or a non-string
    expiry value all count as expired. An expiry without a UTC offset is
    interpreted as UTC.
    """
    expires_at_str = (record or {}).get('access_token_expires_at')
    if not expires_at_str:
        return True
    try:
        expires_at = datetime.fromisoformat(expires_at_str.replace('Z', '+00:00'))
    except (ValueError, TypeError, AttributeError):
        return True
    if expires_at.tzinfo is None:
        # Comparing naive and aware datetimes raises TypeError.
        expires_at = expires_at.replace(tzinfo=timezone.utc)
    return datetime.now(timezone.utc) >= expires_at