from __future__ import annotations import json import os import threading from concurrent.futures import ThreadPoolExecutor from datetime import date, datetime from pathlib import Path from dotenv import load_dotenv from flask import Flask, Response, g, jsonify, request, send_file, stream_with_context from flask_cors import CORS from sqlalchemy import select from src.auth import AuthError, dev_bypass_enabled, dev_user_claims, validate_bearer_token from src.db import ChatMessage, FeedbackEvent, Preference, Report, db_available, session_scope load_dotenv() app = Flask(__name__) # CORS — frontend lives at /programme-pulse/ on optical-dev (same origin in prod via Apache), # and at http://localhost:5173 in dev. Authorization header must be allowed. CORS( app, origins=[ "https://optical-dev.oliver.solutions", "http://localhost:5173", "http://127.0.0.1:5173", ], allow_headers=["Authorization", "Content-Type"], expose_headers=["Content-Disposition"], supports_credentials=False, ) PULSE_API_KEY = os.getenv("PULSE_AIRTABLE_API_KEY", "") PULSE_BASE_ID = os.getenv("PULSE_AIRTABLE_BASE_ID", "") PULSE_TABLE_ID = os.getenv("PULSE_AIRTABLE_TABLE_ID", "") RESOURCE_API_KEY = os.getenv("PULSE_RESOURCE_API_KEY", "") or PULSE_API_KEY RESOURCE_BASE_ID = os.getenv("PULSE_RESOURCE_BASE_ID", "") RESOURCE_TABLE_ID = os.getenv("PULSE_RESOURCE_TABLE_ID", "") ANTHROPIC_API_KEY = os.getenv("ANTHROPIC_API_KEY", "") OUTPUT_DIR = Path("reports") OUTPUT_DIR.mkdir(exist_ok=True) # Airtable caches — not user data, fine to keep in-process. /api/refresh rebuilds them. _snapshot: list[dict] = [] _snapshot_lock = threading.Lock() _resource_snapshot: list[dict] = [] _resource_lock = threading.Lock() def _load_snapshot(): global _snapshot if not PULSE_API_KEY: return from src.airtable_client import PulseAirtableClient try: client = PulseAirtableClient(PULSE_API_KEY, PULSE_BASE_ID, PULSE_TABLE_ID) data = client.fetch_all_tasks() except Exception as e: app.logger.warning("Airtable tasks load failed: %s", e) return with _snapshot_lock: _snapshot = data def _get_snapshot() -> list[dict]: with _snapshot_lock: if _snapshot: return _snapshot _load_snapshot() with _snapshot_lock: return _snapshot def _load_resource_snapshot(): global _resource_snapshot if not RESOURCE_API_KEY or not RESOURCE_BASE_ID: return from src.airtable_client import ResourceAirtableClient try: client = ResourceAirtableClient(RESOURCE_API_KEY, RESOURCE_BASE_ID, RESOURCE_TABLE_ID) data = client.fetch_all_bookings() except Exception as e: app.logger.warning("Airtable bookings load failed: %s", e) return with _resource_lock: _resource_snapshot = data def _get_resource_snapshot() -> list[dict]: with _resource_lock: if _resource_snapshot: return _resource_snapshot _load_resource_snapshot() with _resource_lock: return _resource_snapshot # Eagerly load both data sources in background on startup threading.Thread(target=_load_snapshot, daemon=True).start() threading.Thread(target=_load_resource_snapshot, daemon=True).start() # --------------------------------------------------------------------------- # Auth gate # --------------------------------------------------------------------------- PUBLIC_PATHS = {"/api/health"} @app.before_request def _auth_gate(): if request.method == "OPTIONS": return None if request.path in PUBLIC_PATHS: return None if not request.path.startswith("/api/"): # We only serve API routes; the SPA is served by Apache from /var/www/html. return jsonify({"error": "Not found"}), 404 if dev_bypass_enabled(): claims = dev_user_claims() else: header = request.headers.get("Authorization", "") if not header.startswith("Bearer "): return jsonify({"error": "Missing Authorization: Bearer header"}), 401 token = header[7:].strip() try: claims = validate_bearer_token(token) except AuthError as e: return jsonify({"error": str(e)}), e.status_code g.user_email = claims["_email"] g.user_name = claims.get("_name") or claims["_email"] return None # --------------------------------------------------------------------------- # Routes # --------------------------------------------------------------------------- @app.get("/api/health") def health(): return jsonify({ "status": "ok", "db": db_available(), "tasks_loaded": len(_snapshot), "bookings_loaded": len(_resource_snapshot), }) @app.get("/api/me") def me(): return jsonify({"email": g.user_email, "name": g.user_name}) @app.post("/api/chat") def chat(): message = (request.json or {}).get("message", "").strip() if not message: return jsonify({"error": "Empty message"}), 400 tasks = _get_snapshot() if not tasks: return jsonify({"response": "No data loaded. Check Airtable credentials."}) from src.claude_client import ClaudeClient from src.prompts import build_chat_system_prompt history = _load_history(g.user_email) history.append({"role": "user", "content": message}) _save_message(g.user_email, "user", message) system_prompt = build_chat_system_prompt(tasks, _get_resource_snapshot(), user_email=g.user_email) claude = ClaudeClient(ANTHROPIC_API_KEY) response_text = claude.chat(messages=history[-20:], system_prompt=system_prompt) _save_message(g.user_email, "assistant", response_text) return jsonify({"response": response_text}) @app.post("/api/chat/stream") def chat_stream(): user_email = g.user_email message = (request.json or {}).get("message", "").strip() if not message: return jsonify({"error": "Empty message"}), 400 tasks = _get_snapshot() if not tasks: def _no_data(): yield "data: " + json.dumps({"chunk": "No data loaded. Check Airtable credentials."}) + "\n\n" yield "data: [DONE]\n\n" return Response(stream_with_context(_no_data()), mimetype="text/event-stream") from src.claude_client import ClaudeClient from src.prompts import build_chat_system_prompt history = _load_history(user_email) history.append({"role": "user", "content": message}) _save_message(user_email, "user", message) system_prompt = build_chat_system_prompt(tasks, _get_resource_snapshot(), user_email=user_email) claude = ClaudeClient(ANTHROPIC_API_KEY) def generate(): full_response: list[str] = [] try: with claude.chat_stream(messages=history[-20:], system_prompt=system_prompt) as stream: for text in stream.text_stream: full_response.append(text) yield "data: " + json.dumps({"chunk": text}) + "\n\n" yield "data: [DONE]\n\n" except Exception as e: yield "data: " + json.dumps({"error": str(e)}) + "\n\n" yield "data: [DONE]\n\n" finally: response_text = "".join(full_response) if response_text: _save_message(user_email, "assistant", response_text) return Response( stream_with_context(generate()), mimetype="text/event-stream", headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}, ) @app.post("/api/generate") def generate(): tasks = _get_snapshot() if not tasks: return jsonify({"error": "No data loaded"}), 500 from src.analyzer import analyze from src.claude_client import ClaudeClient from src.prompts import PULSE_SYSTEM_PROMPT, build_full_report_prompt, build_manager_summary_prompt from src.reporter import build_full_report_docx, build_manager_summary_docx, manager_summary_to_markdown analysis = analyze(tasks) ts = datetime.now().strftime("%Y%m%d-%H%M%S") def _gen_summary(): return ClaudeClient(ANTHROPIC_API_KEY).generate_report( PULSE_SYSTEM_PROMPT, build_manager_summary_prompt(analysis) ) def _gen_full(): return ClaudeClient(ANTHROPIC_API_KEY).generate_report( PULSE_SYSTEM_PROMPT, build_full_report_prompt(analysis) ) with ThreadPoolExecutor(max_workers=2) as pool: f_summary = pool.submit(_gen_summary) f_full = pool.submit(_gen_full) summary_text = f_summary.result() full_text = f_full.result() summary_md = manager_summary_to_markdown(analysis, summary_text) summary_doc = build_manager_summary_docx(analysis, summary_text, OUTPUT_DIR, f"summary-{ts}.docx") full_doc = build_full_report_docx(analysis, full_text, OUTPUT_DIR, f"full-{ts}.docx") with session_scope() as s: report = Report( user_email=g.user_email, summary_md=summary_md, summary_doc_path=str(summary_doc), full_doc_path=str(full_doc), ) s.add(report) s.flush() report_id = report.id return jsonify({"status": "ok", "report_id": report_id}) @app.get("/api/copy/summary") def copy_summary(): report = _latest_report(g.user_email) if not report: return jsonify({"error": "No report generated yet"}), 404 return jsonify({"markdown": report.summary_md}) @app.get("/api/download/summary") def download_summary(): report = _latest_report(g.user_email) if not report: return jsonify({"error": "No report generated yet"}), 404 return _send_report_file(report.summary_doc_path, f"programme-pulse-summary-{date.today()}.docx") @app.get("/api/download/full") def download_full(): report = _latest_report(g.user_email) if not report: return jsonify({"error": "No report generated yet"}), 404 return _send_report_file(report.full_doc_path, f"programme-pulse-full-{date.today()}.docx") @app.get("/api/download/history//") def download_history(report_id: int, which: str): if which not in ("summary", "full"): return jsonify({"error": "Invalid type"}), 400 with session_scope() as s: report = s.get(Report, report_id) if report is None or report.user_email != g.user_email: return jsonify({"error": "Not found"}), 404 path = report.summary_doc_path if which == "summary" else report.full_doc_path download_name = f"programme-pulse-{which}-{report.generated_at.strftime('%Y%m%d-%H%M%S')}.docx" return _send_report_file(path, download_name) @app.get("/api/history") def history(): with session_scope() as s: rows = s.scalars( select(Report) .where(Report.user_email == g.user_email) .order_by(Report.generated_at.desc()) .limit(10) ).all() runs = [ { "id": r.id, "label": r.generated_at.strftime("%-d %b %Y, %H:%M"), "ts": r.generated_at.strftime("%Y%m%d-%H%M%S"), } for r in rows ] return jsonify({"runs": runs}) @app.get("/api/preferences") def get_preferences(): from src.preferences import list_preferences return jsonify({"preferences": list_preferences(g.user_email)}) @app.delete("/api/preferences/") def remove_preference(pref_id: int): from src.preferences import delete_preference ok = delete_preference(g.user_email, pref_id) if not ok: return jsonify({"error": "Not found"}), 404 return jsonify({"status": "ok"}) @app.post("/api/feedback") def feedback(): body = request.json or {} rating = body.get("rating") message_text = (body.get("message") or "").strip() if not message_text or rating not in ("up", "down"): return jsonify({"error": "Invalid feedback"}), 400 from src.claude_client import ClaudeClient from src.preferences import append_preference direction = "liked" if rating == "up" else "disliked" extraction_prompt = f"""The user {direction} this assistant response. Extract one concise preference insight that should guide future responses. Response the user {direction}: \"\"\"{message_text}\"\"\" Write a single plain sentence starting with an action word, e.g. "Prefer...", "Avoid...", "Always...", "Lead with...". No preamble. Max 20 words.""" claude = ClaudeClient(ANTHROPIC_API_KEY) insight = claude.chat( messages=[{"role": "user", "content": extraction_prompt}], system_prompt="You extract concise preference insights from user feedback. One sentence only.", ) insight = insight.strip() pref_id = append_preference(g.user_email, insight) with session_scope() as s: s.add(FeedbackEvent( user_email=g.user_email, rating=rating, message_text=message_text, extracted_insight=insight, preference_id=pref_id, )) return jsonify({"status": "ok", "insight": insight}) @app.post("/api/refresh") def refresh(): global _snapshot, _resource_snapshot with _snapshot_lock: _snapshot = [] with _resource_lock: _resource_snapshot = [] _load_snapshot() _load_resource_snapshot() with _snapshot_lock: count = len(_snapshot) return jsonify({"status": "ok", "tasks": count}) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _load_history(user_email: str) -> list[dict]: """Last 20 chat turns for this user, oldest-first, in Anthropic message format.""" if not db_available(): return [] with session_scope() as s: rows = s.scalars( select(ChatMessage) .where(ChatMessage.user_email == user_email) .order_by(ChatMessage.created_at.desc()) .limit(20) ).all() return [{"role": r.role, "content": r.content} for r in reversed(rows)] def _save_message(user_email: str, role: str, content: str) -> None: if not db_available(): return with session_scope() as s: s.add(ChatMessage(user_email=user_email, role=role, content=content)) def _latest_report(user_email: str) -> Report | None: if not db_available(): return None with session_scope() as s: return s.scalars( select(Report) .where(Report.user_email == user_email) .order_by(Report.generated_at.desc()) .limit(1) ).first() def _send_report_file(path: str, download_name: str): p = Path(path) if not p.exists(): return jsonify({"error": "File missing on disk"}), 410 return send_file(str(p), as_attachment=True, download_name=download_name) if __name__ == "__main__": port = int(os.getenv("PORT", "5051")) app.run(host="0.0.0.0", port=port, debug=False)