diff --git a/alembic/versions/0005_session_category.py b/alembic/versions/0005_session_category.py new file mode 100644 index 0000000..ac9b385 --- /dev/null +++ b/alembic/versions/0005_session_category.py @@ -0,0 +1,104 @@ +"""Add session category and time_gaps view for accurate time accounting + +Revision ID: 0005 +Revises: 0004 +Create Date: 2026-05-06 +""" +import sqlalchemy as sa +from alembic import op + +revision = "0005" +down_revision = "0004" +branch_labels = None +depends_on = None + +CATEGORIES = ("coding", "thinking", "deployment", "meeting", "review", "other") + + +def upgrade(): + # Add category to sessions — defaults to NULL (uncategorized) + op.add_column( + "sessions", + sa.Column( + "category", + sa.String(20), + nullable=True, + comment="coding|thinking|deployment|meeting|review|other", + ), + ) + op.create_index("ix_sessions_category", "sessions", ["user_id", "category"]) + + # Add category to manual_entries as well + op.add_column( + "manual_entries", + sa.Column( + "category", + sa.String(20), + nullable=True, + server_default="other", + ), + ) + + # ai_flags: anomalies/suggestions surfaced by the assistant + op.create_table( + "ai_flags", + sa.Column("id", sa.String(36), primary_key=True), + sa.Column( + "user_id", + sa.String(36), + sa.ForeignKey("users.id", ondelete="CASCADE"), + nullable=False, + index=True, + ), + sa.Column("flag_date", sa.Date, nullable=False), + sa.Column( + "kind", + sa.String(30), + nullable=False, + comment="gap|long_session|uncategorized|missing_overhead", + ), + sa.Column("description", sa.Text, nullable=False, server_default=""), + sa.Column("entity_type", sa.String(20), nullable=True), # session|manual_entry|day + sa.Column("entity_id", sa.String(36), nullable=True), + sa.Column("resolved", sa.Boolean, nullable=False, server_default="false"), + sa.Column("resolved_at", sa.DateTime(timezone=True), nullable=True), + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now() + ), + ) + op.create_index("ix_ai_flags_user_date", "ai_flags", ["user_id", "flag_date"]) + op.create_index("ix_ai_flags_resolved", "ai_flags", ["user_id", "resolved"]) + + # assistant_messages: persisted chat history per user + op.create_table( + "assistant_messages", + sa.Column("id", sa.String(36), primary_key=True), + sa.Column( + "user_id", + sa.String(36), + sa.ForeignKey("users.id", ondelete="CASCADE"), + nullable=False, + index=True, + ), + sa.Column("role", sa.String(10), nullable=False), # user|assistant + sa.Column("content", sa.Text, nullable=False), + sa.Column("tool_calls", sa.JSON, nullable=True), # tool use metadata + sa.Column( + "created_at", sa.DateTime(timezone=True), server_default=sa.func.now() + ), + ) + op.create_index("ix_assistant_messages_user", "assistant_messages", ["user_id", "created_at"]) + + +def downgrade(): + op.drop_index("ix_assistant_messages_user", "assistant_messages") + op.drop_table("assistant_messages") + + op.drop_index("ix_ai_flags_resolved", "ai_flags") + op.drop_index("ix_ai_flags_user_date", "ai_flags") + op.drop_table("ai_flags") + + op.drop_column("manual_entries", "category") + + op.drop_index("ix_sessions_category", "sessions") + op.drop_column("sessions", "category") diff --git a/src/main.py b/src/main.py index e71ad36..ec87069 100644 --- a/src/main.py +++ b/src/main.py @@ -23,7 +23,7 @@ def _ensure_static_dir() -> None: ) from src.middleware.logging import LoggingMiddleware from src.routers import admin, auth, dashboard, events, ingest, keys, projects -from src.routers import calendar, tasks, manual_entries, budgets, tags, devops, exports, reports +from src.routers import calendar, tasks, manual_entries, budgets, tags, devops, exports, reports, assistant from src.services.scheduler import scheduler, setup_scheduler BASE = settings.BASE_PATH @@ -92,6 +92,7 @@ for router in [ devops.router, exports.router, reports.router, + assistant.router, ]: app.include_router(router) diff --git a/src/models.py b/src/models.py index 52fac28..af16d18 100644 --- a/src/models.py +++ b/src/models.py @@ -97,6 +97,7 @@ class Session(Base): files_changed: Mapped[list] = mapped_column(JSONB, default=list) raw_stats: Mapped[dict] = mapped_column(JSONB, default=dict) task_id: Mapped[str | None] = mapped_column(UUID(as_uuid=False), ForeignKey("tasks.id", ondelete="SET NULL"), nullable=True, index=True) + category: Mapped[str | None] = mapped_column(String(20), nullable=True) created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) user: Mapped["User"] = relationship(back_populates="sessions") @@ -176,6 +177,7 @@ class ManualEntry(Base): title: Mapped[str] = mapped_column(String(500), nullable=False) notes: Mapped[str] = mapped_column(Text, default="") source: Mapped[str] = mapped_column(String(20), default="manual") + category: Mapped[str | None] = mapped_column(String(20), nullable=True) created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) project: Mapped["Project | None"] = relationship(foreign_keys=[project_id]) @@ -268,3 +270,35 @@ class AuditLog(Base): entity_id: Mapped[str] = mapped_column(String(64), default="") payload_json: Mapped[dict] = mapped_column(JSONB, default=dict) created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), index=True) + + +class AiFlag(Base): + """Anomaly/suggestion surfaced by the AI assistant.""" + __tablename__ = "ai_flags" + + id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=new_uuid) + user_id: Mapped[str] = mapped_column(UUID(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True) + flag_date: Mapped[date] = mapped_column(Date, nullable=False) + kind: Mapped[str] = mapped_column(String(30), nullable=False) # gap|long_session|uncategorized|missing_overhead + description: Mapped[str] = mapped_column(Text, default="") + entity_type: Mapped[str | None] = mapped_column(String(20), nullable=True) # session|manual_entry|day + entity_id: Mapped[str | None] = mapped_column(String(36), nullable=True) + resolved: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False) + resolved_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True), nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + + user: Mapped["User"] = relationship(foreign_keys=[user_id]) + + +class AssistantMessage(Base): + """Persisted chat history for the AI assistant per user.""" + __tablename__ = "assistant_messages" + + id: Mapped[str] = mapped_column(UUID(as_uuid=False), primary_key=True, default=new_uuid) + user_id: Mapped[str] = mapped_column(UUID(as_uuid=False), ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True) + role: Mapped[str] = mapped_column(String(10), nullable=False) # user|assistant + content: Mapped[str] = mapped_column(Text, nullable=False) + tool_calls: Mapped[dict | None] = mapped_column(JSONB, nullable=True) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + + user: Mapped["User"] = relationship(foreign_keys=[user_id]) diff --git a/src/routers/assistant.py b/src/routers/assistant.py new file mode 100644 index 0000000..333edb7 --- /dev/null +++ b/src/routers/assistant.py @@ -0,0 +1,139 @@ +"""AI assistant router — streaming chat + flag management + session categorization.""" +import logging +from datetime import date, datetime, timezone + +from fastapi import APIRouter, Depends, HTTPException, Response +from fastapi.responses import StreamingResponse +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from src.auth import CurrentUser +from src.database import get_db +from src.models import AiFlag, AssistantMessage, Session +from src.schemas import ( + AiFlagOut, + AssistantChatIn, + AssistantMessageOut, + SessionCategoryIn, +) +from src.services.assistant import chat_stream, detect_day_anomalies, persist_flags + +log = logging.getLogger(__name__) + +router = APIRouter(prefix="/api/assistant", tags=["assistant"]) + + +@router.post("/chat") +async def chat( + body: AssistantChatIn, + user: CurrentUser, + db: AsyncSession = Depends(get_db), +) -> StreamingResponse: + """Stream chat response as SSE. Each event is a JSON object on a `data:` line.""" + return StreamingResponse( + chat_stream(user, body.message, db), + media_type="text/event-stream", + headers={ + "Cache-Control": "no-cache", + "X-Accel-Buffering": "no", + }, + ) + + +@router.get("/history", response_model=list[AssistantMessageOut]) +async def get_history( + user: CurrentUser, + limit: int = 50, + db: AsyncSession = Depends(get_db), +) -> list[AssistantMessage]: + result = await db.execute( + select(AssistantMessage) + .where(AssistantMessage.user_id == user.id) + .order_by(AssistantMessage.created_at.desc()) + .limit(limit) + ) + return list(reversed(result.scalars().all())) + + +@router.delete("/history", status_code=204) +async def clear_history( + user: CurrentUser, + db: AsyncSession = Depends(get_db), +) -> Response: + result = await db.execute( + select(AssistantMessage).where(AssistantMessage.user_id == user.id) + ) + for msg in result.scalars().all(): + await db.delete(msg) + await db.commit() + return Response(status_code=204) + + +# ── Flags ───────────────────────────────────────────────────────────────────── + +@router.get("/flags", response_model=list[AiFlagOut]) +async def list_flags( + user: CurrentUser, + days_back: int = 14, + resolved: bool = False, + db: AsyncSession = Depends(get_db), +) -> list[AiFlag]: + from datetime import timedelta + since = datetime.now(timezone.utc).date() - timedelta(days=days_back) + q = ( + select(AiFlag) + .where( + AiFlag.user_id == user.id, + AiFlag.flag_date >= since, + AiFlag.resolved == resolved, + ) + .order_by(AiFlag.flag_date.desc(), AiFlag.created_at.desc()) + ) + result = await db.execute(q) + return result.scalars().all() + + +@router.post("/flags/scan", response_model=list[AiFlagOut]) +async def scan_day( + check_date: date, + user: CurrentUser, + db: AsyncSession = Depends(get_db), +) -> list[AiFlag]: + """Run anomaly detection for a specific date and persist results.""" + anomalies = await detect_day_anomalies(user, check_date, db) + flags = await persist_flags(user, check_date, anomalies, db) + await db.commit() + return flags + + +@router.patch("/flags/{flag_id}/resolve", response_model=AiFlagOut) +async def resolve_flag( + flag_id: str, + user: CurrentUser, + db: AsyncSession = Depends(get_db), +) -> AiFlag: + flag = await db.get(AiFlag, flag_id) + if not flag or flag.user_id != user.id: + raise HTTPException(status_code=404, detail="Flag not found") + flag.resolved = True + flag.resolved_at = datetime.now(timezone.utc) + await db.commit() + await db.refresh(flag) + return flag + + +# ── Session categorization ──────────────────────────────────────────────────── + +@router.patch("/sessions/{session_id}/category", response_model=dict) +async def set_session_category( + session_id: str, + body: SessionCategoryIn, + user: CurrentUser, + db: AsyncSession = Depends(get_db), +) -> dict: + session = await db.get(Session, session_id) + if not session or session.user_id != user.id: + raise HTTPException(status_code=404, detail="Session not found") + session.category = body.category + await db.commit() + return {"id": session.id, "category": session.category} diff --git a/src/schemas.py b/src/schemas.py index 9ef7607..1449522 100644 --- a/src/schemas.py +++ b/src/schemas.py @@ -405,3 +405,36 @@ class AiReportOut(BaseModel): class GenerateReportIn(BaseModel): type: str = Field(pattern="^(daily|weekly)$") date: date + + +# ── AI Assistant ────────────────────────────────────────────────────────────── + +class AiFlagOut(BaseModel): + id: str + flag_date: date + kind: str + description: str + entity_type: str | None + entity_id: str | None + resolved: bool + resolved_at: datetime | None + created_at: datetime + + model_config = {"from_attributes": True} + + +class AssistantMessageOut(BaseModel): + id: str + role: str + content: str + created_at: datetime + + model_config = {"from_attributes": True} + + +class AssistantChatIn(BaseModel): + message: str = Field(min_length=1, max_length=4000) + + +class SessionCategoryIn(BaseModel): + category: str | None = Field(default=None, pattern="^(coding|thinking|deployment|meeting|review|other)?$") diff --git a/src/services/assistant.py b/src/services/assistant.py new file mode 100644 index 0000000..3777ab2 --- /dev/null +++ b/src/services/assistant.py @@ -0,0 +1,534 @@ +"""AI assistant service: gap detection, anomaly analysis, tool-use chat.""" +from __future__ import annotations + +import json +import logging +from collections import defaultdict +from datetime import date, datetime, timedelta, timezone +from typing import Any, AsyncIterator + +from sqlalchemy import select +from sqlalchemy.ext.asyncio import AsyncSession + +from src.config import settings +from src.models import AiFlag, AssistantMessage, ManualEntry, Project, Session, Task, User +from src.services.aggregator import _union_hours + +log = logging.getLogger(__name__) + +# ── Gap detection constants ─────────────────────────────────────────────────── +GAP_THRESHOLD_MINUTES = 30 # gaps between sessions longer than this may be unlogged work +LONG_SESSION_HOURS = 8 # single session wall-clock > this is suspicious +LONG_DAY_HOURS = 14 # total day wall-clock > this is suspicious +RATIO_THRESHOLD = 0.4 # active_hours / wall_clock < 40% → likely thinking time + + +# ── Gap / anomaly detection ─────────────────────────────────────────────────── + +async def detect_day_anomalies( + user: User, + check_date: date, + db: AsyncSession, +) -> list[dict[str, Any]]: + """Analyse a single day and return list of anomaly dicts (not persisted).""" + sessions_result = await db.execute( + select(Session, Project.display_name) + .join(Project, Session.project_id == Project.id) + .where(Session.user_id == user.id, Session.date == check_date) + .order_by(Session.start_at) + ) + rows = sessions_result.all() + + anomalies: list[dict[str, Any]] = [] + + if not rows: + return anomalies + + intervals = [(s.start_at, s.end_at) for s, _ in rows] + total_wall = _union_hours(intervals) + + # Suspiciously long day + if total_wall > LONG_DAY_HOURS: + anomalies.append({ + "kind": "long_session", + "description": f"Day total {total_wall:.1f}h exceeds {LONG_DAY_HOURS}h — verify all entries", + "entity_type": "day", + "entity_id": check_date.isoformat(), + }) + + # Per-session checks + for s, display_name in rows: + wall = (s.end_at - s.start_at).total_seconds() / 3600 + + if wall > LONG_SESSION_HOURS: + anomalies.append({ + "kind": "long_session", + "description": ( + f"Session on {display_name} lasted {wall:.1f}h " + f"({s.start_at.strftime('%H:%M')}–{s.end_at.strftime('%H:%M')}). " + "Consider splitting into coding + thinking/deployment entries." + ), + "entity_type": "session", + "entity_id": s.id, + }) + + if wall > 0 and s.active_hours / wall < RATIO_THRESHOLD: + anomalies.append({ + "kind": "uncategorized", + "description": ( + f"Session on {display_name}: active_hours ({s.active_hours:.1f}h) is only " + f"{s.active_hours / wall * 100:.0f}% of wall-clock ({wall:.1f}h). " + "The difference might be thinking/review time — add a manual entry." + ), + "entity_type": "session", + "entity_id": s.id, + }) + + if not s.category: + anomalies.append({ + "kind": "uncategorized", + "description": f"Session on {display_name} ({s.start_at.strftime('%H:%M')}–{s.end_at.strftime('%H:%M')}) has no category.", + "entity_type": "session", + "entity_id": s.id, + }) + + # Gaps between consecutive sessions + sorted_intervals = sorted(intervals, key=lambda x: x[0]) + for i in range(1, len(sorted_intervals)): + prev_end = sorted_intervals[i - 1][1] + curr_start = sorted_intervals[i][0] + gap_min = (curr_start - prev_end).total_seconds() / 60 + if gap_min >= GAP_THRESHOLD_MINUTES: + anomalies.append({ + "kind": "gap", + "description": ( + f"Gap of {gap_min:.0f} min between sessions " + f"({prev_end.strftime('%H:%M')}–{curr_start.strftime('%H:%M')}). " + "Was this a meeting, call, or thinking time? Consider adding a manual entry." + ), + "entity_type": "day", + "entity_id": check_date.isoformat(), + }) + + return anomalies + + +async def persist_flags( + user: User, + check_date: date, + anomalies: list[dict[str, Any]], + db: AsyncSession, +) -> list[AiFlag]: + """Upsert-like: clear old unresolved flags for the date then re-insert.""" + existing = await db.execute( + select(AiFlag).where( + AiFlag.user_id == user.id, + AiFlag.flag_date == check_date, + AiFlag.resolved == False, # noqa: E712 + ) + ) + for flag in existing.scalars().all(): + await db.delete(flag) + + flags = [] + for a in anomalies: + flag = AiFlag( + user_id=user.id, + flag_date=check_date, + kind=a["kind"], + description=a["description"], + entity_type=a.get("entity_type"), + entity_id=a.get("entity_id"), + ) + db.add(flag) + flags.append(flag) + + await db.flush() + return flags + + +# ── Anthropic tool definitions ──────────────────────────────────────────────── + +TOOLS: list[dict] = [ + { + "name": "get_daily_summary", + "description": "Get total hours, session count, projects worked, and tasks for a specific date.", + "input_schema": { + "type": "object", + "properties": { + "date": {"type": "string", "description": "ISO date YYYY-MM-DD"} + }, + "required": ["date"], + }, + }, + { + "name": "get_sessions", + "description": "Get raw session details for a date range including start/end times, projects, summaries, and categories.", + "input_schema": { + "type": "object", + "properties": { + "from_date": {"type": "string", "description": "Start date YYYY-MM-DD"}, + "to_date": {"type": "string", "description": "End date YYYY-MM-DD"}, + }, + "required": ["from_date", "to_date"], + }, + }, + { + "name": "get_project_stats", + "description": "Get total hours logged per project for a date range.", + "input_schema": { + "type": "object", + "properties": { + "from_date": {"type": "string", "description": "Start date YYYY-MM-DD"}, + "to_date": {"type": "string", "description": "End date YYYY-MM-DD"}, + }, + "required": ["from_date", "to_date"], + }, + }, + { + "name": "detect_anomalies", + "description": ( + "Analyse a day for time-tracking anomalies: large gaps between sessions, " + "sessions without categories, unusually long sessions, low active/wall-clock ratio." + ), + "input_schema": { + "type": "object", + "properties": { + "date": {"type": "string", "description": "ISO date YYYY-MM-DD"} + }, + "required": ["date"], + }, + }, + { + "name": "create_manual_entry", + "description": ( + "Create a manual time entry (e.g. for a meeting, deployment, thinking session). " + "project_id is optional — pass null if unknown." + ), + "input_schema": { + "type": "object", + "properties": { + "title": {"type": "string"}, + "start_at": {"type": "string", "description": "ISO datetime with TZ, e.g. 2026-05-06T14:00:00+00:00"}, + "end_at": {"type": "string", "description": "ISO datetime with TZ"}, + "category": { + "type": "string", + "enum": ["coding", "thinking", "deployment", "meeting", "review", "other"], + }, + "project_id": {"type": ["string", "null"]}, + }, + "required": ["title", "start_at", "end_at", "category"], + }, + }, + { + "name": "set_session_category", + "description": "Set the category of a specific session.", + "input_schema": { + "type": "object", + "properties": { + "session_id": {"type": "string"}, + "category": { + "type": "string", + "enum": ["coding", "thinking", "deployment", "meeting", "review", "other"], + }, + }, + "required": ["session_id", "category"], + }, + }, + { + "name": "get_unresolved_flags", + "description": "Get all unresolved AI-detected anomaly flags for the user.", + "input_schema": { + "type": "object", + "properties": { + "days_back": {"type": "integer", "description": "How many days back to look (default 7)"} + }, + "required": [], + }, + }, +] + + +# ── Tool execution ──────────────────────────────────────────────────────────── + +async def execute_tool( + tool_name: str, + tool_input: dict, + user: User, + db: AsyncSession, +) -> Any: + """Execute an assistant tool call and return a JSON-serialisable result.""" + today = datetime.now(timezone.utc).date() + + if tool_name == "get_daily_summary": + d = date.fromisoformat(tool_input["date"]) + sessions_result = await db.execute( + select(Session, Project.display_name) + .join(Project, Session.project_id == Project.id) + .where(Session.user_id == user.id, Session.date == d) + .order_by(Session.start_at) + ) + rows = sessions_result.all() + intervals = [(s.start_at, s.end_at) for s, _ in rows] + total = _union_hours(intervals) + (user.daily_overhead_hours if intervals else 0) + tasks_result = await db.execute( + select(Task).where(Task.user_id == user.id, Task.planned_date == d) + ) + tasks = tasks_result.scalars().all() + projects: dict[str, float] = {} + for s, name in rows: + h = (s.end_at - s.start_at).total_seconds() / 3600 + projects[name] = projects.get(name, 0) + h + return { + "date": d.isoformat(), + "total_hours": round(total, 2), + "session_count": len(rows), + "projects": {k: round(v, 2) for k, v in projects.items()}, + "tasks_done": sum(1 for t in tasks if t.status == "done"), + "tasks_total": len(tasks), + } + + if tool_name == "get_sessions": + fd = date.fromisoformat(tool_input["from_date"]) + td = date.fromisoformat(tool_input["to_date"]) + result = await db.execute( + select(Session, Project.display_name, Project.job_number) + .join(Project, Session.project_id == Project.id) + .where(Session.user_id == user.id, Session.date >= fd, Session.date <= td) + .order_by(Session.start_at) + ) + return [ + { + "id": s.id, + "date": s.date.isoformat(), + "project": name, + "job_number": job_number, + "start": s.start_at.isoformat(), + "end": s.end_at.isoformat(), + "wall_clock_h": round((s.end_at - s.start_at).total_seconds() / 3600, 2), + "active_h": round(s.active_hours, 2), + "category": s.category, + "summary": s.work_summary[:200] if s.work_summary else "", + } + for s, name, job_number in result.all() + ] + + if tool_name == "get_project_stats": + fd = date.fromisoformat(tool_input["from_date"]) + td = date.fromisoformat(tool_input["to_date"]) + result = await db.execute( + select(Session, Project.display_name, Project.job_number) + .join(Project, Session.project_id == Project.id) + .where(Session.user_id == user.id, Session.date >= fd, Session.date <= td) + ) + by_project: dict[str, list] = defaultdict(list) + for s, name, job_number in result.all(): + label = f"{job_number} {name}".strip() if job_number else name + by_project[label].append((s.start_at, s.end_at)) + return { + label: round(sum((e - st).total_seconds() / 3600 for st, e in intervals), 2) + for label, intervals in sorted(by_project.items(), key=lambda x: -sum((e - s).total_seconds() for s, e in x[1])) + } + + if tool_name == "detect_anomalies": + d = date.fromisoformat(tool_input["date"]) + anomalies = await detect_day_anomalies(user, d, db) + return {"date": d.isoformat(), "anomalies": anomalies, "count": len(anomalies)} + + if tool_name == "create_manual_entry": + from src.models import ManualEntry + entry = ManualEntry( + user_id=user.id, + project_id=tool_input.get("project_id"), + start_at=datetime.fromisoformat(tool_input["start_at"]), + end_at=datetime.fromisoformat(tool_input["end_at"]), + title=tool_input["title"], + category=tool_input.get("category"), + source="assistant", + ) + db.add(entry) + await db.flush() + hours = (entry.end_at - entry.start_at).total_seconds() / 3600 + return {"created": entry.id, "hours": round(hours, 2), "title": entry.title} + + if tool_name == "set_session_category": + session = await db.get(Session, tool_input["session_id"]) + if not session or session.user_id != user.id: + return {"error": "Session not found"} + session.category = tool_input["category"] + await db.flush() + return {"updated": session.id, "category": session.category} + + if tool_name == "get_unresolved_flags": + days_back = int(tool_input.get("days_back", 7)) + since = today - timedelta(days=days_back) + result = await db.execute( + select(AiFlag).where( + AiFlag.user_id == user.id, + AiFlag.flag_date >= since, + AiFlag.resolved == False, # noqa: E712 + ).order_by(AiFlag.flag_date.desc()) + ) + flags = result.scalars().all() + return [ + { + "id": f.id, + "date": f.flag_date.isoformat(), + "kind": f.kind, + "description": f.description, + "entity_type": f.entity_type, + "entity_id": f.entity_id, + } + for f in flags + ] + + return {"error": f"Unknown tool: {tool_name}"} + + +# ── Streaming chat ──────────────────────────────────────────────────────────── + +SYSTEM_PROMPT = """You are a time-tracking assistant for CC Dashboard — a productivity app for a developer/manager at Oliver Agency. + +Your job is to: +1. Help the user understand how they spent their time +2. Detect and flag inaccuracies in time logs: gaps between sessions, uncategorized sessions, missing thinking/deployment/meeting time +3. Suggest and create manual entries for unlogged work +4. Set categories on sessions when appropriate +5. Answer questions about projects, hours, and task completion + +Always be proactive: when asked about a day, automatically check for anomalies and mention them. +When you detect gaps or uncategorized sessions, suggest specific actions (create manual entry, set category). +Respond concisely. Use Markdown for structure when helpful. + +Today's date: {today} +User's daily overhead: {overhead}h/day +""" + + +async def chat_stream( + user: User, + user_message: str, + db: AsyncSession, +) -> AsyncIterator[str]: + """ + Stream SSE events for the chat response. + Yields strings formatted as SSE data lines. + Uses Anthropic tool_use loop with up to 5 rounds. + Falls back to a static error message if no API key. + """ + if not settings.ANTHROPIC_API_KEY: + yield f"data: {json.dumps({'type': 'text', 'text': 'Anthropic API key not configured.'})}\n\n" + yield "data: [DONE]\n\n" + return + + import anthropic + + # Persist user message + user_msg_rec = AssistantMessage( + user_id=user.id, + role="user", + content=user_message, + ) + db.add(user_msg_rec) + await db.flush() + + # Load recent history (last 20 turns to stay within context) + history_result = await db.execute( + select(AssistantMessage) + .where(AssistantMessage.user_id == user.id) + .order_by(AssistantMessage.created_at.desc()) + .limit(20) + ) + history = list(reversed(history_result.scalars().all())) + + messages: list[dict] = [] + for m in history: + if m.tool_calls: + # assistant message with tool_use + messages.append({"role": "assistant", "content": json.loads(m.tool_calls) if isinstance(m.tool_calls, str) else m.tool_calls}) + else: + messages.append({"role": m.role, "content": m.content}) + + today = datetime.now(timezone.utc).date() + client = anthropic.AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY) + + system = SYSTEM_PROMPT.format(today=today.isoformat(), overhead=user.daily_overhead_hours) + + full_response_text = "" + tool_calls_log: list[dict] = [] + + try: + # Agentic loop — up to 5 tool-use rounds + for _round in range(5): + response = await client.messages.create( + model="claude-sonnet-4-6", + max_tokens=2048, + system=system, + tools=TOOLS, + messages=messages, + ) + + # Collect text from this round + round_text = "" + has_tool_use = False + + for block in response.content: + if block.type == "text": + round_text += block.text + full_response_text += block.text + # Stream text chunk + yield f"data: {json.dumps({'type': 'text', 'text': block.text})}\n\n" + + elif block.type == "tool_use": + has_tool_use = True + tool_name = block.name + tool_input = block.input + + yield f"data: {json.dumps({'type': 'tool_start', 'tool': tool_name})}\n\n" + + try: + result = await execute_tool(tool_name, tool_input, user, db) + await db.flush() + except Exception as exc: + log.warning("assistant.tool_error", extra={"tool": tool_name, "error": str(exc)}) + result = {"error": str(exc)} + + tool_calls_log.append({"tool": tool_name, "input": tool_input, "result": result}) + + yield f"data: {json.dumps({'type': 'tool_result', 'tool': tool_name, 'result': result})}\n\n" + + # Add assistant tool_use + tool_result to messages for next round + messages.append({ + "role": "assistant", + "content": [{"type": "tool_use", "id": block.id, "name": tool_name, "input": tool_input}], + }) + messages.append({ + "role": "user", + "content": [{"type": "tool_result", "tool_use_id": block.id, "content": json.dumps(result)}], + }) + + if not has_tool_use: + # No more tool calls — done + break + + if round_text: + # Between-round text already streamed, add to messages + messages.append({"role": "assistant", "content": round_text}) + + # Persist assistant response + if full_response_text or tool_calls_log: + asst_msg = AssistantMessage( + user_id=user.id, + role="assistant", + content=full_response_text, + tool_calls=tool_calls_log if tool_calls_log else None, + ) + db.add(asst_msg) + await db.commit() + + except Exception as exc: + await db.rollback() + log.error("assistant.chat_error", extra={"user_id": user.id, "error": str(exc)}) + yield f"data: {json.dumps({'type': 'error', 'text': 'Assistant error — please try again.'})}\n\n" + finally: + yield "data: [DONE]\n\n" diff --git a/src/services/scheduler.py b/src/services/scheduler.py index 41872e9..eaf4f16 100644 --- a/src/services/scheduler.py +++ b/src/services/scheduler.py @@ -37,6 +37,29 @@ async def _weekly_report_job() -> None: log.info("weekly_reports.completed") +async def _anomaly_scan_job() -> None: + """Scan yesterday + today for all users and persist AI flags.""" + from datetime import date, timedelta, datetime, timezone + + from sqlalchemy import select + + from src.database import AsyncSessionLocal + from src.models import User + from src.services.assistant import detect_day_anomalies, persist_flags + + today = datetime.now(timezone.utc).date() + yesterday = today - timedelta(days=1) + + async with AsyncSessionLocal() as db: + result = await db.execute(select(User).where(User.is_active == True)) # noqa: E712 + for user in result.scalars().all(): + for d in (yesterday, today): + anomalies = await detect_day_anomalies(user, d, db) + await persist_flags(user, d, anomalies, db) + await db.commit() + log.info("anomaly_scan.completed") + + def setup_scheduler() -> None: if settings.ADO_PAT: scheduler.add_job( @@ -64,3 +87,12 @@ def setup_scheduler() -> None: id="weekly_report", replace_existing=True, ) + + # Scan for time-tracking anomalies every hour + scheduler.add_job( + _anomaly_scan_job, + "interval", + hours=1, + id="anomaly_scan", + replace_existing=True, + ) diff --git a/web/src/App.vue b/web/src/App.vue index f285369..48e6498 100644 --- a/web/src/App.vue +++ b/web/src/App.vue @@ -1,9 +1,16 @@