programme-pulse-chat/web_app.py
DJP b70d148b94 Productionise Programme Pulse
Backend
- Routes moved under /api/, JWT bearer auth via @before_request
- DEV_AUTH_BYPASS escape hatch for local dev
- In-memory chat history and report state replaced with Postgres tables
  (preferences, chat_messages, reports, feedback_events) keyed on user
- SQLAlchemy 2.x + Alembic migrations run on container start
- Graceful Airtable failure handling — bad creds no longer 500 the API
- Per-user data isolation via g.user_email from validated token

Frontend
- React + Vite + TypeScript SPA at /programme-pulse/
- MSAL.js (PKCE, sessionStorage, ID token to backend)
- VITE_DEV_AUTH_BYPASS mirrors backend bypass for local dev
- Streaming chat via fetch ReadableStream + SSE parsing
- Charts via chart.js, markdown via react-markdown + remark-gfm
- Full UI parity with the original templates/index.html

Deploy (optical-dev split-build pattern)
- Dockerfile + docker-compose.yml (name: programme-pulse pinned;
  app + Postgres; 127.0.0.1 binding only)
- deploy/apache-programme-pulse.conf.tmpl with flushpackets=on for SSE
- deploy/deploy.sh mirrors OSOP — port auto-pick (5051..5099),
  apache conf render, frontend build in throwaway node container,
  rsync to /var/www/html/programme-pulse, /api/health poll

Tests
- 49 passing; new tests for DB-backed preferences and JWT auth helpers
- SQLite-backed test fixture in tests/conftest.py

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-07 11:08:28 -04:00

450 lines
15 KiB
Python

from __future__ import annotations
import json
import os
import threading
from concurrent.futures import ThreadPoolExecutor
from datetime import date, datetime
from pathlib import Path
from dotenv import load_dotenv
from flask import Flask, Response, g, jsonify, request, send_file, stream_with_context
from flask_cors import CORS
from sqlalchemy import select
from src.auth import AuthError, dev_bypass_enabled, dev_user_claims, validate_bearer_token
from src.db import ChatMessage, FeedbackEvent, Preference, Report, db_available, session_scope
# Load variables from a .env file (when one is present) before the
# environment lookups further down this module run.
load_dotenv()
app = Flask(__name__)
# CORS — frontend lives at /programme-pulse/ on optical-dev (same origin in prod via Apache),
# and at http://localhost:5173 in dev. Authorization header must be allowed.
_ALLOWED_ORIGINS = [
    "https://optical-dev.oliver.solutions",
    "http://localhost:5173",
    "http://127.0.0.1:5173",
]
CORS(
    app,
    origins=_ALLOWED_ORIGINS,
    allow_headers=["Authorization", "Content-Type"],
    # Lets browser code read the attachment filename on download responses.
    expose_headers=["Content-Disposition"],
    # Credentialed CORS is disabled; requests authenticate via the
    # Authorization header allowed above.
    supports_credentials=False,
)
# Airtable credentials/identifiers for the task table. Each falls back to ""
# when the environment variable is unset.
PULSE_API_KEY = os.environ.get("PULSE_AIRTABLE_API_KEY", "")
PULSE_BASE_ID = os.environ.get("PULSE_AIRTABLE_BASE_ID", "")
PULSE_TABLE_ID = os.environ.get("PULSE_AIRTABLE_TABLE_ID", "")
# Bookings table settings. The resource API key reuses the task key when it is
# unset or empty.
RESOURCE_API_KEY = os.environ.get("PULSE_RESOURCE_API_KEY") or PULSE_API_KEY
RESOURCE_BASE_ID = os.environ.get("PULSE_RESOURCE_BASE_ID", "")
RESOURCE_TABLE_ID = os.environ.get("PULSE_RESOURCE_TABLE_ID", "")
ANTHROPIC_API_KEY = os.environ.get("ANTHROPIC_API_KEY", "")
# Generated .docx files are written here (relative to the working directory);
# the directory is created at import time if it does not exist yet.
OUTPUT_DIR = Path("reports")
OUTPUT_DIR.mkdir(exist_ok=True)
# Airtable caches — not user data, fine to keep in-process. /api/refresh rebuilds them.
# Each cache list is only rebound while holding its companion lock.
_snapshot_lock = threading.Lock()
_snapshot: list[dict] = []
_resource_lock = threading.Lock()
_resource_snapshot: list[dict] = []
def _load_snapshot():
    """Fetch all tasks from Airtable and publish them as the task cache.

    Does nothing when ``PULSE_API_KEY`` is empty. Any exception raised while
    building the client or fetching is logged as a warning and the existing
    cache is left untouched, so a failed load never raises to the caller.
    """
    global _snapshot
    if PULSE_API_KEY:
        # Imported lazily, only once credentials are known to be configured.
        from src.airtable_client import PulseAirtableClient
        try:
            tasks = PulseAirtableClient(
                PULSE_API_KEY, PULSE_BASE_ID, PULSE_TABLE_ID
            ).fetch_all_tasks()
        except Exception as exc:
            app.logger.warning("Airtable tasks load failed: %s", exc)
        else:
            # Rebind under the lock so readers see either the old or new list.
            with _snapshot_lock:
                _snapshot = tasks
def _get_snapshot() -> list[dict]:
    """Return the cached task list, attempting one load if the cache is empty.

    The result may still be empty when the load is skipped or fails.
    """
    with _snapshot_lock:
        cached = _snapshot
    if not cached:
        # The load runs outside the lock; it takes the lock itself to publish.
        _load_snapshot()
        with _snapshot_lock:
            cached = _snapshot
    return cached
def _load_resource_snapshot():
    """Fetch all bookings from Airtable and publish them as the bookings cache.

    Does nothing unless both ``RESOURCE_API_KEY`` and ``RESOURCE_BASE_ID`` are
    set. Any exception raised while building the client or fetching is logged
    as a warning and the existing cache is left untouched.
    """
    global _resource_snapshot
    if RESOURCE_API_KEY and RESOURCE_BASE_ID:
        # Imported lazily, only once credentials are known to be configured.
        from src.airtable_client import ResourceAirtableClient
        try:
            bookings = ResourceAirtableClient(
                RESOURCE_API_KEY, RESOURCE_BASE_ID, RESOURCE_TABLE_ID
            ).fetch_all_bookings()
        except Exception as exc:
            app.logger.warning("Airtable bookings load failed: %s", exc)
        else:
            # Rebind under the lock so readers see either the old or new list.
            with _resource_lock:
                _resource_snapshot = bookings
def _get_resource_snapshot() -> list[dict]:
    """Return the cached bookings list, attempting one load if it is empty.

    The result may still be empty when the load is skipped or fails.
    """
    with _resource_lock:
        cached = _resource_snapshot
    if not cached:
        # The load runs outside the lock; it takes the lock itself to publish.
        _load_resource_snapshot()
        with _resource_lock:
            cached = _resource_snapshot
    return cached
# Eagerly load both data sources in background on startup.
# Daemon threads, so a slow Airtable fetch never blocks interpreter exit.
for _loader in (_load_snapshot, _load_resource_snapshot):
    threading.Thread(target=_loader, daemon=True).start()
# ---------------------------------------------------------------------------
# Auth gate
# ---------------------------------------------------------------------------
# Exact request paths that skip authentication.
PUBLIC_PATHS = {"/api/health"}
@app.before_request
def _auth_gate():
    """Authenticate every request before it reaches a route.

    Pass-through cases (return ``None`` without setting a user): CORS
    preflight (``OPTIONS``) and any path listed in ``PUBLIC_PATHS``.
    Anything outside ``/api/`` is answered with a JSON 404.

    Otherwise the caller's claims come from ``dev_user_claims()`` when the dev
    bypass is enabled, or from ``validate_bearer_token`` applied to the token
    in the ``Authorization: Bearer <token>`` header. On success
    ``g.user_email`` and ``g.user_name`` are set for the route handlers.

    Returns:
        ``None`` to continue to the route, or a ``(json, status)`` tuple that
        short-circuits the request: 404 for non-API paths, 401 for a missing
        or non-Bearer header, or the status carried by the ``AuthError``.
    """
    if request.method == "OPTIONS" or request.path in PUBLIC_PATHS:
        return None
    if not request.path.startswith("/api/"):
        # We only serve API routes; the SPA is served by Apache from /var/www/html.
        return jsonify({"error": "Not found"}), 404
    if dev_bypass_enabled():
        claims = dev_user_claims()
    else:
        header = request.headers.get("Authorization", "")
        scheme, separator, credentials = header.partition(" ")
        # HTTP auth scheme names are case-insensitive (RFC 9110 §11.1), so
        # accept "bearer"/"BEARER" as well as "Bearer".
        if not separator or scheme.lower() != "bearer":
            return jsonify({"error": "Missing Authorization: Bearer header"}), 401
        token = credentials.strip()
        try:
            claims = validate_bearer_token(token)
        except AuthError as e:
            return jsonify({"error": str(e)}), e.status_code
    g.user_email = claims["_email"]
    # Fall back to the email when the claims carry no display name.
    g.user_name = claims.get("_name") or claims["_email"]
    return None
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.get("/api/health")
def health():
    """Report service status, DB availability and the size of both caches.

    The cache lengths are read without taking the locks.
    """
    payload = {
        "status": "ok",
        "db": db_available(),
        "tasks_loaded": len(_snapshot),
        "bookings_loaded": len(_resource_snapshot),
    }
    return jsonify(payload)
@app.get("/api/me")
def me():
    """Return the email and display name stored on ``g`` for this request."""
    identity = {"email": g.user_email, "name": g.user_name}
    return jsonify(identity)
@app.post("/api/chat")
def chat():
    """Answer one chat message in a single (non-streamed) JSON response.

    Request body: JSON object with a string ``message``.

    Returns:
        ``{"response": <text>}`` on success (including the "No data loaded"
        notice when the task cache is empty), or ``{"error": "Empty message"}``
        with 400 when ``message`` is missing, blank, or not a string.

    Side effects:
        The user message and the assistant reply are stored via
        ``_save_message``.
    """
    payload = request.json
    # A JSON null / non-string "message", or a non-object body, used to raise
    # AttributeError (HTTP 500); treat all of those as an empty message.
    raw_message = payload.get("message") if isinstance(payload, dict) else None
    message = raw_message.strip() if isinstance(raw_message, str) else ""
    if not message:
        return jsonify({"error": "Empty message"}), 400
    tasks = _get_snapshot()
    if not tasks:
        return jsonify({"response": "No data loaded. Check Airtable credentials."})
    from src.claude_client import ClaudeClient
    from src.prompts import build_chat_system_prompt
    # Prior turns are read before the new message is stored, then the new
    # message is appended in memory so it is not duplicated.
    history = _load_history(g.user_email)
    history.append({"role": "user", "content": message})
    _save_message(g.user_email, "user", message)
    system_prompt = build_chat_system_prompt(tasks, _get_resource_snapshot(), user_email=g.user_email)
    claude = ClaudeClient(ANTHROPIC_API_KEY)
    # Only the 20 most recent turns are sent to the model.
    response_text = claude.chat(messages=history[-20:], system_prompt=system_prompt)
    _save_message(g.user_email, "assistant", response_text)
    return jsonify({"response": response_text})
@app.post("/api/chat/stream")
def chat_stream():
    """Answer one chat message as a server-sent-event stream.

    Request body: JSON object with a string ``message``.

    Each event is ``data: {"chunk": <text>}``; a failure while streaming is
    sent as ``data: {"error": <message>}``. The stream always ends with
    ``data: [DONE]``.

    Returns:
        A ``text/event-stream`` response, or ``{"error": "Empty message"}``
        with 400 when ``message`` is missing, blank, or not a string.

    Side effects:
        The user message is stored before streaming starts; whatever assistant
        text was produced (possibly partial) is stored when the stream ends.
    """
    # Captured up front so the generator does not need to touch ``g`` later.
    user_email = g.user_email
    payload = request.json
    # A JSON null / non-string "message", or a non-object body, used to raise
    # AttributeError (HTTP 500); treat all of those as an empty message.
    raw_message = payload.get("message") if isinstance(payload, dict) else None
    message = raw_message.strip() if isinstance(raw_message, str) else ""
    if not message:
        return jsonify({"error": "Empty message"}), 400
    tasks = _get_snapshot()
    if not tasks:
        def _no_data():
            yield "data: " + json.dumps({"chunk": "No data loaded. Check Airtable credentials."}) + "\n\n"
            yield "data: [DONE]\n\n"
        return Response(stream_with_context(_no_data()), mimetype="text/event-stream")
    from src.claude_client import ClaudeClient
    from src.prompts import build_chat_system_prompt
    history = _load_history(user_email)
    history.append({"role": "user", "content": message})
    _save_message(user_email, "user", message)
    system_prompt = build_chat_system_prompt(tasks, _get_resource_snapshot(), user_email=user_email)
    claude = ClaudeClient(ANTHROPIC_API_KEY)

    def _sse_events():
        """Yield SSE frames for the model reply and persist the collected text."""
        collected: list[str] = []
        try:
            # Only the 20 most recent turns are sent to the model.
            with claude.chat_stream(messages=history[-20:], system_prompt=system_prompt) as stream:
                for text in stream.text_stream:
                    collected.append(text)
                    yield "data: " + json.dumps({"chunk": text}) + "\n\n"
            yield "data: [DONE]\n\n"
        except Exception as e:
            # Report the failure in-band; the HTTP status has already been sent.
            yield "data: " + json.dumps({"error": str(e)}) + "\n\n"
            yield "data: [DONE]\n\n"
        finally:
            # Runs on success, error, or early generator close, so partial
            # replies are stored too.
            response_text = "".join(collected)
            if response_text:
                _save_message(user_email, "assistant", response_text)

    return Response(
        stream_with_context(_sse_events()),
        mimetype="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
@app.post("/api/generate")
def generate():
    """Generate the manager summary and full report, and record them.

    The two Claude report generations run concurrently in a two-worker thread
    pool. Both results are rendered to .docx files in ``OUTPUT_DIR`` and a
    ``Report`` row owned by ``g.user_email`` is stored with the summary
    markdown and both file paths.

    Returns:
        ``{"status": "ok", "report_id": <id>}`` on success, or
        ``{"error": "No data loaded"}`` with 500 when the task cache is empty.
    """
    tasks = _get_snapshot()
    if not tasks:
        return jsonify({"error": "No data loaded"}), 500
    from uuid import uuid4
    from src.analyzer import analyze
    from src.claude_client import ClaudeClient
    from src.prompts import PULSE_SYSTEM_PROMPT, build_full_report_prompt, build_manager_summary_prompt
    from src.reporter import build_full_report_docx, build_manager_summary_docx, manager_summary_to_markdown
    analysis = analyze(tasks)
    ts = datetime.now().strftime("%Y%m%d-%H%M%S")
    # The timestamp only has one-second resolution, so two reports generated in
    # the same second would share filenames and overwrite each other's
    # documents. A random suffix keeps every run's files distinct.
    run_id = f"{ts}-{uuid4().hex[:12]}"

    def _gen_summary():
        return ClaudeClient(ANTHROPIC_API_KEY).generate_report(
            PULSE_SYSTEM_PROMPT, build_manager_summary_prompt(analysis)
        )

    def _gen_full():
        return ClaudeClient(ANTHROPIC_API_KEY).generate_report(
            PULSE_SYSTEM_PROMPT, build_full_report_prompt(analysis)
        )

    with ThreadPoolExecutor(max_workers=2) as pool:
        f_summary = pool.submit(_gen_summary)
        f_full = pool.submit(_gen_full)
        # .result() re-raises any exception from the worker thread.
        summary_text = f_summary.result()
        full_text = f_full.result()
    summary_md = manager_summary_to_markdown(analysis, summary_text)
    summary_doc = build_manager_summary_docx(analysis, summary_text, OUTPUT_DIR, f"summary-{run_id}.docx")
    full_doc = build_full_report_docx(analysis, full_text, OUTPUT_DIR, f"full-{run_id}.docx")
    with session_scope() as s:
        report = Report(
            user_email=g.user_email,
            summary_md=summary_md,
            summary_doc_path=str(summary_doc),
            full_doc_path=str(full_doc),
        )
        s.add(report)
        # Flush so the id is assigned while the session is still open.
        s.flush()
        report_id = report.id
    return jsonify({"status": "ok", "report_id": report_id})
@app.get("/api/copy/summary")
def copy_summary():
    """Return the markdown of the caller's most recent report, or a JSON 404."""
    report = _latest_report(g.user_email)
    if report:
        return jsonify({"markdown": report.summary_md})
    return jsonify({"error": "No report generated yet"}), 404
@app.get("/api/download/summary")
def download_summary():
    """Send the summary .docx of the caller's most recent report.

    Responds with a JSON 404 when the caller has no report. The download name
    carries today's date rather than the report's generation time.
    """
    report = _latest_report(g.user_email)
    if report:
        attachment_name = f"programme-pulse-summary-{date.today()}.docx"
        return _send_report_file(report.summary_doc_path, attachment_name)
    return jsonify({"error": "No report generated yet"}), 404
@app.get("/api/download/full")
def download_full():
    """Send the full-report .docx of the caller's most recent report.

    Responds with a JSON 404 when the caller has no report. The download name
    carries today's date rather than the report's generation time.
    """
    report = _latest_report(g.user_email)
    if report:
        attachment_name = f"programme-pulse-full-{date.today()}.docx"
        return _send_report_file(report.full_doc_path, attachment_name)
    return jsonify({"error": "No report generated yet"}), 404
@app.get("/api/download/history/<int:report_id>/<which>")
def download_history(report_id: int, which: str):
    """Send one document of a specific past report owned by the caller.

    Args:
        report_id: Primary key of the report.
        which: ``"summary"`` or ``"full"``; anything else is a JSON 400.

    A report that does not exist or belongs to another user is reported as the
    same JSON 404.
    """
    if which not in ("summary", "full"):
        return jsonify({"error": "Invalid type"}), 400
    with session_scope() as s:
        report = s.get(Report, report_id)
        if report is None or report.user_email != g.user_email:
            return jsonify({"error": "Not found"}), 404
        # Read everything needed from the row while the session is open.
        path = report.full_doc_path if which == "full" else report.summary_doc_path
        stamp = report.generated_at.strftime('%Y%m%d-%H%M%S')
        download_name = f"programme-pulse-{which}-{stamp}.docx"
    return _send_report_file(path, download_name)
@app.get("/api/history")
def history():
    """List the caller's ten most recent reports, newest first.

    Returns:
        ``{"runs": [{"id", "label", "ts"}, ...]}`` where ``label`` is a
        human-readable time such as ``7 May 2026, 11:08`` and ``ts`` is the
        compact ``YYYYmmdd-HHMMSS`` form.
    """
    with session_scope() as s:
        rows = s.scalars(
            select(Report)
            .where(Report.user_email == g.user_email)
            .order_by(Report.generated_at.desc())
            .limit(10)
        ).all()
        # Built while the session is open so row attributes are loadable.
        runs = [
            {
                "id": r.id,
                # The unpadded day comes from ``.day`` rather than "%-d":
                # "%-d" is a platform-specific strftime extension and is not
                # accepted everywhere (e.g. Windows). Output is unchanged.
                "label": f"{r.generated_at.day} {r.generated_at:%b %Y, %H:%M}",
                "ts": r.generated_at.strftime("%Y%m%d-%H%M%S"),
            }
            for r in rows
        ]
    return jsonify({"runs": runs})
@app.get("/api/preferences")
def get_preferences():
    """Return the caller's stored preferences as ``{"preferences": ...}``."""
    from src.preferences import list_preferences
    prefs = list_preferences(g.user_email)
    return jsonify({"preferences": prefs})
@app.delete("/api/preferences/<int:pref_id>")
def remove_preference(pref_id: int):
    """Delete one of the caller's preferences.

    Returns ``{"status": "ok"}``, or a JSON 404 when ``delete_preference``
    reports that nothing was deleted.
    """
    from src.preferences import delete_preference
    if delete_preference(g.user_email, pref_id):
        return jsonify({"status": "ok"})
    return jsonify({"error": "Not found"}), 404
@app.post("/api/feedback")
def feedback():
    """Turn a thumbs up/down on an assistant reply into a stored preference.

    Request body: JSON with ``rating`` (``"up"`` or ``"down"``) and ``message``
    (the assistant text being rated). Claude is asked to condense the feedback
    into a one-sentence insight, which is saved through ``append_preference``
    and logged as a ``FeedbackEvent``.

    Returns:
        ``{"status": "ok", "insight": <sentence>}``, or
        ``{"error": "Invalid feedback"}`` with 400 for a blank message or an
        unknown rating.
    """
    body = request.json or {}
    rating = body.get("rating")
    message_text = (body.get("message") or "").strip()
    if rating not in ("up", "down") or not message_text:
        return jsonify({"error": "Invalid feedback"}), 400
    from src.claude_client import ClaudeClient
    from src.preferences import append_preference
    direction = {"up": "liked", "down": "disliked"}[rating]
    extraction_prompt = f"""The user {direction} this assistant response. Extract one concise preference insight that should guide future responses.
Response the user {direction}:
\"\"\"{message_text}\"\"\"
Write a single plain sentence starting with an action word, e.g. "Prefer...", "Avoid...", "Always...", "Lead with...". No preamble. Max 20 words."""
    extractor = ClaudeClient(ANTHROPIC_API_KEY)
    insight = extractor.chat(
        messages=[{"role": "user", "content": extraction_prompt}],
        system_prompt="You extract concise preference insights from user feedback. One sentence only.",
    ).strip()
    pref_id = append_preference(g.user_email, insight)
    # Keep the raw feedback alongside the preference it produced.
    event = FeedbackEvent(
        user_email=g.user_email,
        rating=rating,
        message_text=message_text,
        extracted_insight=insight,
        preference_id=pref_id,
    )
    with session_scope() as s:
        s.add(event)
    return jsonify({"status": "ok", "insight": insight})
@app.post("/api/refresh")
def refresh():
    """Drop both Airtable caches, reload them, and report the task count.

    Returns ``{"status": "ok", "tasks": <count>}``; the count is 0 when the
    task reload was skipped or failed, because the cache was emptied first.
    """
    global _snapshot, _resource_snapshot
    # NOTE(review): the caches are cleared before a reload that can fail, so a
    # failed refresh leaves them empty — confirm this is intended.
    with _snapshot_lock:
        _snapshot = []
    with _resource_lock:
        _resource_snapshot = []
    for reload_cache in (_load_snapshot, _load_resource_snapshot):
        reload_cache()
    with _snapshot_lock:
        task_total = len(_snapshot)
    return jsonify({"status": "ok", "tasks": task_total})
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _load_history(user_email: str) -> list[dict]:
    """Last 20 chat turns for this user, oldest-first, in Anthropic message format.

    Returns an empty list when the database is unavailable.
    """
    if not db_available():
        return []
    # Newest-first with a limit selects the most recent 20 rows ...
    newest_first = (
        select(ChatMessage)
        .where(ChatMessage.user_email == user_email)
        .order_by(ChatMessage.created_at.desc())
        .limit(20)
    )
    with session_scope() as s:
        rows = s.scalars(newest_first).all()
        # ... which are then flipped back into chronological order.
        return [{"role": row.role, "content": row.content} for row in rows[::-1]]
def _save_message(user_email: str, role: str, content: str) -> None:
    """Store one chat turn for ``user_email``; a no-op when the DB is unavailable."""
    if db_available():
        turn = ChatMessage(user_email=user_email, role=role, content=content)
        with session_scope() as s:
            s.add(turn)
def _latest_report(user_email: str) -> Report | None:
    """Return the user's most recently generated report, if any.

    Returns ``None`` when the database is unavailable or the user has no
    reports.
    """
    if not db_available():
        return None
    newest = (
        select(Report)
        .where(Report.user_email == user_email)
        .order_by(Report.generated_at.desc())
        .limit(1)
    )
    # NOTE(review): the Report is handed back after its session scope exits and
    # callers then read attributes on it; whether that is safe depends on how
    # session_scope handles expiry/detachment — confirm.
    with session_scope() as s:
        return s.scalars(newest).first()
def _send_report_file(path: str, download_name: str):
    """Send the file at ``path`` as an attachment named ``download_name``.

    Responds with a JSON 410 when nothing exists at ``path``.
    """
    target = Path(path)
    if target.exists():
        return send_file(str(target), as_attachment=True, download_name=download_name)
    return jsonify({"error": "File missing on disk"}), 410
if __name__ == "__main__":
    # Direct-run entry point: listen on all interfaces, on $PORT (default
    # 5051), with the Flask debugger off.
    port = int(os.environ.get("PORT", "5051"))
    app.run(host="0.0.0.0", port=port, debug=False)