"""AI assistant service: gap detection, anomaly analysis, tool-use chat.""" from __future__ import annotations import json from collections import defaultdict from datetime import date, datetime, timedelta, timezone from typing import Any, AsyncIterator import structlog from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from src.config import settings from src.models import AiFlag, AssistantMessage, AzureWorkItem, ManualEntry, PlannedBlock, Project, Session, Task, User from src.services.aggregator import _union_hours log = structlog.get_logger() # ── Gap detection constants ─────────────────────────────────────────────────── GAP_THRESHOLD_MINUTES = 30 # gaps between sessions longer than this may be unlogged work LONG_SESSION_HOURS = 8 # single session wall-clock > this is suspicious LONG_DAY_HOURS = 14 # total day wall-clock > this is suspicious RATIO_THRESHOLD = 0.4 # active_hours / wall_clock < 40% → likely thinking time # ── Gap / anomaly detection ─────────────────────────────────────────────────── async def detect_day_anomalies( user: User, check_date: date, db: AsyncSession, ) -> list[dict[str, Any]]: """Analyse a single day and return list of anomaly dicts (not persisted).""" sessions_result = await db.execute( select(Session, Project.display_name) .join(Project, Session.project_id == Project.id) .where(Session.user_id == user.id, Session.date == check_date) .order_by(Session.start_at) ) rows = sessions_result.all() anomalies: list[dict[str, Any]] = [] if not rows: return anomalies intervals = [(s.start_at, s.end_at) for s, _ in rows] total_wall = _union_hours(intervals) # Suspiciously long day if total_wall > LONG_DAY_HOURS: anomalies.append({ "kind": "long_session", "description": f"Day total {total_wall:.1f}h exceeds {LONG_DAY_HOURS}h — verify all entries", "entity_type": "day", "entity_id": check_date.isoformat(), }) # Per-session checks for s, display_name in rows: wall = (s.end_at - s.start_at).total_seconds() / 3600 if wall > LONG_SESSION_HOURS: anomalies.append({ "kind": "long_session", "description": ( f"Session on {display_name} lasted {wall:.1f}h " f"({s.start_at.strftime('%H:%M')}–{s.end_at.strftime('%H:%M')}). " "Consider splitting into coding + thinking/deployment entries." ), "entity_type": "session", "entity_id": s.id, }) if wall > 0 and (s.active_hours or 0) / wall < RATIO_THRESHOLD: anomalies.append({ "kind": "uncategorized", "description": ( f"Session on {display_name}: active_hours ({s.active_hours:.1f}h) is only " f"{s.active_hours / wall * 100:.0f}% of wall-clock ({wall:.1f}h). " "The difference might be thinking/review time — add a manual entry." ), "entity_type": "session", "entity_id": s.id, }) if not s.category: anomalies.append({ "kind": "uncategorized", "description": f"Session on {display_name} ({s.start_at.strftime('%H:%M')}–{s.end_at.strftime('%H:%M')}) has no category.", "entity_type": "session", "entity_id": s.id, }) # Gaps between consecutive sessions sorted_intervals = sorted(intervals, key=lambda x: x[0]) for i in range(1, len(sorted_intervals)): prev_end = sorted_intervals[i - 1][1] curr_start = sorted_intervals[i][0] gap_min = (curr_start - prev_end).total_seconds() / 60 if gap_min >= GAP_THRESHOLD_MINUTES: anomalies.append({ "kind": "gap", "description": ( f"Gap of {gap_min:.0f} min between sessions " f"({prev_end.strftime('%H:%M')}–{curr_start.strftime('%H:%M')}). " "Was this a meeting, call, or thinking time? Consider adding a manual entry." ), "entity_type": "day", "entity_id": check_date.isoformat(), }) return anomalies async def persist_flags( user: User, check_date: date, anomalies: list[dict[str, Any]], db: AsyncSession, ) -> list[AiFlag]: """Upsert-like: clear old unresolved flags for the date then re-insert.""" existing = await db.execute( select(AiFlag).where( AiFlag.user_id == user.id, AiFlag.flag_date == check_date, AiFlag.resolved == False, # noqa: E712 ) ) for flag in existing.scalars().all(): await db.delete(flag) flags = [] for a in anomalies: flag = AiFlag( user_id=user.id, flag_date=check_date, kind=a["kind"], description=a["description"], entity_type=a.get("entity_type"), entity_id=a.get("entity_id"), ) db.add(flag) flags.append(flag) await db.flush() return flags # ── Anthropic tool definitions ──────────────────────────────────────────────── TOOLS: list[dict] = [ { "name": "get_daily_summary", "description": "Get total hours, session count, projects worked, and tasks for a specific date.", "input_schema": { "type": "object", "properties": { "date": {"type": "string", "description": "ISO date YYYY-MM-DD"} }, "required": ["date"], }, }, { "name": "get_sessions", "description": "Get raw session details for a date range including start/end times, projects, summaries, and categories.", "input_schema": { "type": "object", "properties": { "from_date": {"type": "string", "description": "Start date YYYY-MM-DD"}, "to_date": {"type": "string", "description": "End date YYYY-MM-DD"}, }, "required": ["from_date", "to_date"], }, }, { "name": "get_project_stats", "description": "Get total hours logged per project for a date range.", "input_schema": { "type": "object", "properties": { "from_date": {"type": "string", "description": "Start date YYYY-MM-DD"}, "to_date": {"type": "string", "description": "End date YYYY-MM-DD"}, }, "required": ["from_date", "to_date"], }, }, { "name": "detect_anomalies", "description": ( "Analyse a day for time-tracking anomalies: large gaps between sessions, " "sessions without categories, unusually long sessions, low active/wall-clock ratio." ), "input_schema": { "type": "object", "properties": { "date": {"type": "string", "description": "ISO date YYYY-MM-DD"} }, "required": ["date"], }, }, { "name": "create_manual_entry", "description": ( "Create a manual time entry (e.g. for a meeting, deployment, thinking session). " "project_id is optional — pass null if unknown." ), "input_schema": { "type": "object", "properties": { "title": {"type": "string"}, "start_at": {"type": "string", "description": "ISO datetime with TZ, e.g. 2026-05-06T14:00:00+00:00"}, "end_at": {"type": "string", "description": "ISO datetime with TZ"}, "category": { "type": "string", "enum": ["coding", "thinking", "deployment", "meeting", "review", "other"], }, "project_id": {"type": ["string", "null"]}, }, "required": ["title", "start_at", "end_at", "category"], }, }, { "name": "set_session_category", "description": "Set the category of a specific session.", "input_schema": { "type": "object", "properties": { "session_id": {"type": "string"}, "category": { "type": "string", "enum": ["coding", "thinking", "deployment", "meeting", "review", "other"], }, }, "required": ["session_id", "category"], }, }, { "name": "get_unresolved_flags", "description": "Get all unresolved AI-detected anomaly flags for the user.", "input_schema": { "type": "object", "properties": { "days_back": {"type": "integer", "description": "How many days back to look (default 7)"} }, "required": [], }, }, { "name": "list_tasks", "description": "List the user's tasks, optionally filtered by date and/or status.", "input_schema": { "type": "object", "properties": { "date": {"type": "string", "description": "ISO date YYYY-MM-DD (optional)"}, "status": {"type": "string", "enum": ["todo", "doing", "done", "cancelled"], "description": "Optional status filter"}, }, "required": [], }, }, { "name": "create_task", "description": "Create a new task in the planner.", "input_schema": { "type": "object", "properties": { "title": {"type": "string"}, "planned_date": {"type": "string", "description": "ISO date YYYY-MM-DD"}, "estimate_hours": {"type": "number", "description": "Estimated hours (default 1)"}, "priority": {"type": "integer", "description": "1=low, 2=normal, 3=medium, 4=high, 5=critical (default 3)"}, "project_id": {"type": ["string", "null"]}, "notes": {"type": ["string", "null"]}, }, "required": ["title", "planned_date"], }, }, { "name": "update_task", "description": "Update an existing task (title, status, priority, planned_date, notes, estimate_hours).", "input_schema": { "type": "object", "properties": { "task_id": {"type": "string"}, "title": {"type": ["string", "null"]}, "status": {"type": ["string", "null"], "enum": ["todo", "doing", "done", "cancelled"]}, "priority": {"type": ["integer", "null"]}, "planned_date": {"type": ["string", "null"]}, "notes": {"type": ["string", "null"]}, "estimate_hours": {"type": ["number", "null"]}, }, "required": ["task_id"], }, }, { "name": "delete_task", "description": "Delete a task permanently. Use only when user explicitly requests deletion.", "input_schema": { "type": "object", "properties": { "task_id": {"type": "string"}, }, "required": ["task_id"], }, }, { "name": "complete_task", "description": "Mark a task as done and push to Azure DevOps if linked.", "input_schema": { "type": "object", "properties": { "task_id": {"type": "string"}, }, "required": ["task_id"], }, }, { "name": "prioritize_day", "description": "Re-order all tasks for a given day by priority. Updates sort_index so planner shows them in the right order.", "input_schema": { "type": "object", "properties": { "date": {"type": "string", "description": "ISO date YYYY-MM-DD"}, "strategy": { "type": "string", "enum": ["priority_desc", "estimate_asc"], "description": "priority_desc: highest priority first (default). estimate_asc: shortest tasks first.", }, }, "required": ["date"], }, }, { "name": "schedule_task", "description": "Schedule a task at a specific time by creating a planned block.", "input_schema": { "type": "object", "properties": { "task_id": {"type": "string"}, "start_at": {"type": "string", "description": "ISO datetime with TZ"}, "end_at": {"type": "string", "description": "ISO datetime with TZ"}, }, "required": ["task_id", "start_at", "end_at"], }, }, { "name": "auto_schedule_day", "description": "Automatically schedule all unscheduled tasks for a day into working hours, ordered by priority, without overlap.", "input_schema": { "type": "object", "properties": { "date": {"type": "string", "description": "ISO date YYYY-MM-DD"}, "work_start": {"type": "string", "description": "Start of work day HH:MM (default 09:00)"}, "work_end": {"type": "string", "description": "End of work day HH:MM (default 18:00)"}, }, "required": ["date"], }, }, { "name": "list_projects", "description": "List all of the user's projects.", "input_schema": {"type": "object", "properties": {}, "required": []}, }, { "name": "list_manual_entries", "description": "List manual time entries for a specific date.", "input_schema": { "type": "object", "properties": { "date": {"type": "string", "description": "ISO date YYYY-MM-DD"} }, "required": ["date"], }, }, { "name": "delete_manual_entry", "description": "Delete a manual time entry.", "input_schema": { "type": "object", "properties": {"entry_id": {"type": "string"}}, "required": ["entry_id"], }, }, { "name": "generate_report", "description": "Generate a daily or weekly AI report for a specific date.", "input_schema": { "type": "object", "properties": { "type": {"type": "string", "enum": ["daily", "weekly"]}, "date": {"type": "string", "description": "ISO date YYYY-MM-DD"}, }, "required": ["type", "date"], }, }, { "name": "search_sessions", "description": "Search sessions by keyword in work summary.", "input_schema": { "type": "object", "properties": { "query": {"type": "string"}, "from_date": {"type": "string", "description": "Optional ISO date"}, "to_date": {"type": "string", "description": "Optional ISO date"}, }, "required": ["query"], }, }, { "name": "list_work_items", "description": "List Azure DevOps work items synced for the user.", "input_schema": { "type": "object", "properties": { "state": {"type": "string", "description": "Filter by state (optional)"} }, "required": [], }, }, ] # ── Tool execution ──────────────────────────────────────────────────────────── async def execute_tool( tool_name: str, tool_input: dict, user: User, db: AsyncSession, ) -> Any: """Execute an assistant tool call and return a JSON-serialisable result.""" today = datetime.now(timezone.utc).date() if tool_name == "get_daily_summary": d = date.fromisoformat(tool_input["date"]) sessions_result = await db.execute( select(Session, Project.display_name) .join(Project, Session.project_id == Project.id) .where(Session.user_id == user.id, Session.date == d) .order_by(Session.start_at) ) rows = sessions_result.all() intervals = [(s.start_at, s.end_at) for s, _ in rows] total = _union_hours(intervals) + (user.daily_overhead_hours if intervals else 0) tasks_result = await db.execute( select(Task).where(Task.user_id == user.id, Task.planned_date == d) ) tasks = tasks_result.scalars().all() projects: dict[str, float] = {} for s, name in rows: h = (s.end_at - s.start_at).total_seconds() / 3600 projects[name] = projects.get(name, 0) + h return { "date": d.isoformat(), "total_hours": round(total, 2), "session_count": len(rows), "projects": {k: round(v, 2) for k, v in projects.items()}, "tasks_done": sum(1 for t in tasks if t.status == "done"), "tasks_total": len(tasks), } if tool_name == "get_sessions": fd = date.fromisoformat(tool_input["from_date"]) td = date.fromisoformat(tool_input["to_date"]) result = await db.execute( select(Session, Project.display_name, Project.job_number) .join(Project, Session.project_id == Project.id) .where(Session.user_id == user.id, Session.date >= fd, Session.date <= td) .order_by(Session.start_at) ) return [ { "id": s.id, "date": s.date.isoformat(), "project": name, "job_number": job_number, "start": s.start_at.isoformat(), "end": s.end_at.isoformat(), "wall_clock_h": round((s.end_at - s.start_at).total_seconds() / 3600, 2), "active_h": round(s.active_hours, 2), "category": s.category, "summary": s.work_summary[:200] if s.work_summary else "", } for s, name, job_number in result.all() ] if tool_name == "get_project_stats": fd = date.fromisoformat(tool_input["from_date"]) td = date.fromisoformat(tool_input["to_date"]) result = await db.execute( select(Session, Project.display_name, Project.job_number) .join(Project, Session.project_id == Project.id) .where(Session.user_id == user.id, Session.date >= fd, Session.date <= td) ) by_project: dict[str, list] = defaultdict(list) for s, name, job_number in result.all(): label = f"{job_number} {name}".strip() if job_number else name by_project[label].append((s.start_at, s.end_at)) return { label: round(sum((e - st).total_seconds() / 3600 for st, e in intervals), 2) for label, intervals in sorted(by_project.items(), key=lambda x: -sum((e - s).total_seconds() for s, e in x[1])) } if tool_name == "detect_anomalies": d = date.fromisoformat(tool_input["date"]) anomalies = await detect_day_anomalies(user, d, db) return {"date": d.isoformat(), "anomalies": anomalies, "count": len(anomalies)} if tool_name == "create_manual_entry": entry = ManualEntry( user_id=user.id, project_id=tool_input.get("project_id"), start_at=datetime.fromisoformat(tool_input["start_at"]), end_at=datetime.fromisoformat(tool_input["end_at"]), title=tool_input["title"], category=tool_input.get("category"), source="assistant", ) db.add(entry) await db.flush() hours = (entry.end_at - entry.start_at).total_seconds() / 3600 return {"created": entry.id, "hours": round(hours, 2), "title": entry.title} if tool_name == "set_session_category": session = await db.get(Session, tool_input["session_id"]) if not session or session.user_id != user.id: return {"error": "Session not found"} session.category = tool_input["category"] await db.flush() return {"updated": session.id, "category": session.category} if tool_name == "get_unresolved_flags": days_back = int(tool_input.get("days_back", 7)) since = today - timedelta(days=days_back) result = await db.execute( select(AiFlag).where( AiFlag.user_id == user.id, AiFlag.flag_date >= since, AiFlag.resolved == False, # noqa: E712 ).order_by(AiFlag.flag_date.desc()) ) flags = result.scalars().all() return [ { "id": f.id, "date": f.flag_date.isoformat(), "kind": f.kind, "description": f.description, "entity_type": f.entity_type, "entity_id": f.entity_id, } for f in flags ] if tool_name == "list_tasks": query = select(Task).where(Task.user_id == user.id) if tool_input.get("date"): d = date.fromisoformat(tool_input["date"]) query = query.where(Task.planned_date == d) if tool_input.get("status"): query = query.where(Task.status == tool_input["status"]) query = query.order_by(Task.planned_date, Task.sort_index) result = await db.execute(query) tasks = result.scalars().all() return [ { "id": t.id, "title": t.title, "status": t.status, "priority": t.priority, "planned_date": t.planned_date.isoformat(), "estimate_hours": t.estimate_hours, "notes": t.notes or "", } for t in tasks ] if tool_name == "create_task": task = Task( user_id=user.id, title=tool_input["title"], planned_date=date.fromisoformat(tool_input["planned_date"]), estimate_hours=float(tool_input.get("estimate_hours", 1)), priority=int(tool_input.get("priority", 3)), project_id=tool_input.get("project_id"), notes=tool_input.get("notes") or "", status="todo", ) db.add(task) await db.flush() return {"id": task.id, "title": task.title, "planned_date": task.planned_date.isoformat(), "status": task.status, "priority": task.priority} if tool_name == "update_task": task = await db.get(Task, tool_input["task_id"]) if not task or task.user_id != user.id: return {"error": "Task not found"} for field in ("title", "status", "priority", "notes", "estimate_hours"): if tool_input.get(field) is not None: setattr(task, field, tool_input[field]) if tool_input.get("planned_date"): task.planned_date = date.fromisoformat(tool_input["planned_date"]) await db.flush() return {"updated": task.id, "title": task.title, "status": task.status} if tool_name == "delete_task": task = await db.get(Task, tool_input["task_id"]) if not task or task.user_id != user.id: return {"error": "Task not found"} await db.delete(task) await db.flush() return {"deleted": tool_input["task_id"]} if tool_name == "complete_task": task = await db.get(Task, tool_input["task_id"]) if not task or task.user_id != user.id: return {"error": "Task not found"} task.status = "done" task.completed_at = datetime.now(timezone.utc) await db.flush() return {"completed": task.id, "title": task.title} if tool_name == "prioritize_day": d = date.fromisoformat(tool_input["date"]) strategy = tool_input.get("strategy", "priority_desc") result = await db.execute( select(Task).where(Task.user_id == user.id, Task.planned_date == d) ) tasks = list(result.scalars().all()) if strategy == "estimate_asc": tasks.sort(key=lambda t: t.estimate_hours) else: tasks.sort(key=lambda t: -t.priority) for i, task in enumerate(tasks): task.sort_index = i await db.flush() return {"reordered": len(tasks), "order": [t.title for t in tasks]} if tool_name == "schedule_task": task = await db.get(Task, tool_input["task_id"]) if not task or task.user_id != user.id: return {"error": "Task not found"} block = PlannedBlock( user_id=user.id, task_id=task.id, project_id=task.project_id, start_at=datetime.fromisoformat(tool_input["start_at"]), end_at=datetime.fromisoformat(tool_input["end_at"]), ) db.add(block) await db.flush() return {"block_id": block.id, "start_at": block.start_at.isoformat(), "end_at": block.end_at.isoformat()} if tool_name == "auto_schedule_day": from datetime import time as dtime d = date.fromisoformat(tool_input["date"]) work_start_str = tool_input.get("work_start", "09:00") work_end_str = tool_input.get("work_end", "18:00") ws_h, ws_m = map(int, work_start_str.split(":")) we_h, we_m = map(int, work_end_str.split(":")) result = await db.execute( select(Task).where(Task.user_id == user.id, Task.planned_date == d) .order_by(Task.priority.desc(), Task.sort_index) ) tasks = list(result.scalars().all()) # Find which tasks already have blocks for this day existing_blocks_result = await db.execute( select(PlannedBlock).where( PlannedBlock.user_id == user.id, PlannedBlock.start_at >= datetime(d.year, d.month, d.day, tzinfo=timezone.utc), PlannedBlock.start_at < datetime(d.year, d.month, d.day, 23, 59, 59, tzinfo=timezone.utc), ) ) scheduled_task_ids = {b.task_id for b in existing_blocks_result.scalars().all()} current_time = datetime(d.year, d.month, d.day, ws_h, ws_m, tzinfo=timezone.utc) work_end_dt = datetime(d.year, d.month, d.day, we_h, we_m, tzinfo=timezone.utc) scheduled = [] for task in tasks: if task.id in scheduled_task_ids: continue duration_hours = task.estimate_hours if task.estimate_hours > 0 else 1.0 end_time = current_time + timedelta(hours=duration_hours) if end_time > work_end_dt: break block = PlannedBlock( user_id=user.id, task_id=task.id, project_id=task.project_id, start_at=current_time, end_at=end_time, ) db.add(block) scheduled.append({ "title": task.title, "start": current_time.strftime("%H:%M"), "end": end_time.strftime("%H:%M"), }) current_time = end_time await db.flush() return {"scheduled": len(scheduled), "slots": scheduled} if tool_name == "list_projects": result = await db.execute( select(Project).where(Project.user_id == user.id).order_by(Project.display_name) ) projects = result.scalars().all() return [ {"id": p.id, "display_name": p.display_name, "client": p.client or "", "job_number": p.job_number or "", "slug": p.slug} for p in projects ] if tool_name == "list_manual_entries": d = date.fromisoformat(tool_input["date"]) result = await db.execute( select(ManualEntry).where( ManualEntry.user_id == user.id, ManualEntry.start_at >= datetime(d.year, d.month, d.day, tzinfo=timezone.utc), ManualEntry.start_at < datetime(d.year, d.month, d.day, 23, 59, 59, tzinfo=timezone.utc), ) ) entries = result.scalars().all() return [ { "id": e.id, "title": e.title, "start_at": e.start_at.isoformat(), "end_at": e.end_at.isoformat(), "category": e.category, "hours": round((e.end_at - e.start_at).total_seconds() / 3600, 2), } for e in entries ] if tool_name == "delete_manual_entry": entry = await db.get(ManualEntry, tool_input["entry_id"]) if not entry or entry.user_id != user.id: return {"error": "Entry not found"} await db.delete(entry) await db.flush() return {"deleted": tool_input["entry_id"]} if tool_name == "generate_report": from src.services.ai_reports import ( generate_and_send_daily_report, generate_and_send_weekly_report, ) report_type = tool_input["type"] report_date = date.fromisoformat(tool_input["date"]) if report_type == "daily": report = await generate_and_send_daily_report(user, report_date, db) else: report = await generate_and_send_weekly_report(user, report_date, db) if report is None: return {"error": "No data found for the given period"} return { "report_id": report.id, "content_preview": report.content_markdown[:500], } if tool_name == "search_sessions": query_str = tool_input["query"] query = select(Session, Project.display_name).join(Project, Session.project_id == Project.id).where( Session.user_id == user.id, Session.work_summary.ilike(f"%{query_str}%"), ) if tool_input.get("from_date"): query = query.where(Session.date >= date.fromisoformat(tool_input["from_date"])) if tool_input.get("to_date"): query = query.where(Session.date <= date.fromisoformat(tool_input["to_date"])) query = query.order_by(Session.date.desc()).limit(10) result = await db.execute(query) return [ { "id": s.id, "date": s.date.isoformat(), "project": name, "summary_excerpt": (s.work_summary or "")[:200], } for s, name in result.all() ] if tool_name == "list_work_items": query = select(AzureWorkItem).where(AzureWorkItem.user_id == user.id) if tool_input.get("state"): query = query.where(AzureWorkItem.state == tool_input["state"]) result = await db.execute(query.order_by(AzureWorkItem.ado_id)) items = result.scalars().all() return [ { "id": wi.id, "ado_id": wi.ado_id, "title": wi.title, "type": wi.type, "state": wi.state, "url": wi.url, } for wi in items ] return {"error": f"Unknown tool: {tool_name}"} # ── Streaming chat ──────────────────────────────────────────────────────────── SYSTEM_PROMPT = """You are a powerful AI assistant for CC Dashboard — a personal productivity dashboard for a developer/manager at Oliver Agency. You have FULL access to read and modify the user's data: sessions, tasks, projects, manual entries, and DevOps work items. Your capabilities: - Read time-tracking data, sessions, projects, tasks - Detect anomalies and suggest improvements - Create, update, delete, and complete tasks - Schedule tasks into planned time blocks - Auto-arrange the daily plan by priority - Categorize sessions and create manual time entries - Generate AI reports - Search through sessions - List and manage Azure DevOps work items Always be proactive: when asked about a day, also check for anomalies. Before performing any DESTRUCTIVE action (delete, mass-reschedule, complete), state exactly what you're about to do and wait for context that confirms the user wants this. After any modification, briefly summarize what changed. Respond concisely. Use Markdown for structure when helpful. For scheduling tasks, use the user's work hours (default 9-18) unless specified otherwise. Today's date: {today} User's daily overhead: {overhead}h/day """ async def chat_stream( user: User, user_message: str, db: AsyncSession, ) -> AsyncIterator[str]: """ Stream SSE events for the chat response. Yields strings formatted as SSE data lines. Uses Anthropic tool_use loop with up to 5 rounds. Falls back to a static error message if no API key. """ if not settings.ANTHROPIC_API_KEY: yield f"data: {json.dumps({'type': 'text', 'text': 'Anthropic API key not configured.'})}\n\n" yield "data: [DONE]\n\n" return import anthropic # Persist user message user_msg_rec = AssistantMessage( user_id=user.id, role="user", content=user_message, ) db.add(user_msg_rec) await db.flush() # Load recent history (last 20 turns to stay within context) history_result = await db.execute( select(AssistantMessage) .where(AssistantMessage.user_id == user.id) .order_by(AssistantMessage.created_at.desc()) .limit(20) ) history = list(reversed(history_result.scalars().all())) messages: list[dict] = [] for m in history: if m.tool_calls: # assistant message with tool_use messages.append({"role": "assistant", "content": json.loads(m.tool_calls) if isinstance(m.tool_calls, str) else m.tool_calls}) else: messages.append({"role": m.role, "content": m.content}) today = datetime.now(timezone.utc).date() client = anthropic.AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY) system = SYSTEM_PROMPT.format(today=today.isoformat(), overhead=user.daily_overhead_hours) full_response_text = "" tool_calls_log: list[dict] = [] try: # Agentic loop — up to 10 tool-use rounds for _round in range(10): response = await client.messages.create( model="claude-sonnet-4-6", max_tokens=2048, system=system, tools=TOOLS, messages=messages, ) # Collect text from this round; build proper assistant message for next round round_text = "" tool_results: list[dict] = [] # tool_result blocks for the user turn assistant_content: list[dict] = [] # full content for the assistant turn for block in response.content: if block.type == "text": round_text += block.text full_response_text += block.text assistant_content.append({"type": "text", "text": block.text}) yield f"data: {json.dumps({'type': 'text', 'text': block.text})}\n\n" elif block.type == "tool_use": tool_name = block.name tool_input = block.input assistant_content.append({"type": "tool_use", "id": block.id, "name": tool_name, "input": tool_input}) yield f"data: {json.dumps({'type': 'tool_start', 'tool': tool_name})}\n\n" try: result = await execute_tool(tool_name, tool_input, user, db) await db.flush() except Exception as exc: log.warning("assistant.tool_error", tool=tool_name, error=str(exc)) result = {"error": str(exc)} tool_calls_log.append({"tool": tool_name, "input": tool_input, "result": result}) tool_results.append({"type": "tool_result", "tool_use_id": block.id, "content": json.dumps(result)}) yield f"data: {json.dumps({'type': 'tool_result', 'tool': tool_name, 'result': result})}\n\n" if not tool_results: # No tool calls this round — conversation is done break # Add the full assistant turn (text + all tool_use blocks) as one message messages.append({"role": "assistant", "content": assistant_content}) # Collect all tool_results into one user turn (Anthropic requirement) messages.append({"role": "user", "content": tool_results}) # Persist assistant response if full_response_text or tool_calls_log: asst_msg = AssistantMessage( user_id=user.id, role="assistant", content=full_response_text, tool_calls=tool_calls_log if tool_calls_log else None, ) db.add(asst_msg) await db.commit() except Exception as exc: await db.rollback() log.error("assistant.chat_error", user_id=user.id, error=str(exc)) yield f"data: {json.dumps({'type': 'error', 'text': 'Assistant error — please try again.'})}\n\n" finally: yield "data: [DONE]\n\n"