Oliver-ai-bot_2.0/implementation_plan.md
SamoilenkoVadym 67fe791702 Phase 1 Complete: Environment Setup
Infrastructure Setup:
- Initialized Git repository
- Created project directory structure (backend, frontend, infrastructure)
- Configured Docker Compose with 4 services:
  * PostgreSQL 16 (database)
  * Redis 7 (cache & task queue)
  * Qdrant (vector database)
  * Backend (placeholder)

Configuration:
- Created comprehensive .env.example with all required environment variables
- Added .gitignore for Python and Node.js
- Created backend Dockerfile (placeholder for Phase 2)
- Added healthchecks for all services

Documentation:
- Created README.md with quick start guide and project overview
- Documented implementation plan (6 phases)
- Included concept and technical specifications

Verification:
- docker-compose build: Successful
- All services started and passed health checks:
  * PostgreSQL: Accepting connections on port 5432
  * Redis: Responding to ping on port 6379
  * Qdrant: API healthy on port 6333

Next Phase: Backend Core (FastAPI, Entra ID Auth, RBAC, Alembic migrations)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-02-12 17:31:54 +00:00

📋 Enterprise AI Hub "Nexus" - Implementation Plan

🎯 Project Overview

A unified, role-based corporate AI platform with three core modes:

  • Mode A: Corporate RAG Agent (SharePoint knowledge base)
  • Mode B: AI Executive Assistant (Productivity tools: summarization, transcription, translation)
  • Mode C: Notebook Mode (Isolated analysis via NotebookLlama)

🏗️ Technology Stack

Frontend

  • Framework: Next.js 14+ (App Router)
  • Language: TypeScript
  • UI: Shadcn/UI + Tailwind CSS
  • State: Zustand (client state) + TanStack Query (server state)
  • Streaming: Server-Sent Events (EventSource)

Backend

  • Framework: FastAPI (Python 3.11+)
  • Orchestration: LangChain / LangGraph
  • Task Queue: Celery + Redis
  • Auth: Microsoft Entra ID (OAuth2/OIDC) → Backend JWT

Databases

  • Relational: PostgreSQL 15+
  • Vector Store: Qdrant
  • Cache: Redis 7+

LLM Strategy (Per-Mode)

  • RAG Mode: GPT-5.1 (High reasoning for accurate knowledge retrieval)
  • Assistant Mode (Summarization): Gemini 2.5 Flash (Fast + cost-efficient)
  • Assistant Mode (Writing): Claude Sonnet 4.5 (Creative tasks, email drafting)
  • Embeddings: OpenAI text-embedding-3-large (3072 dimensions)

📊 Database Schema

Core Tables

1. Taxonomy (Region → Department Hierarchy)

regions
├─ id (uuid, PK)
├─ code (string, unique) -- 'UK', 'US', 'APAC'
├─ name (string) -- 'United Kingdom', 'United States'
└─ created_at (timestamp)

departments
├─ id (uuid, PK)
├─ region_id (FK → regions.id, ON DELETE CASCADE)
├─ code (string) -- 'HR', 'IT', 'FINANCE'
├─ name (string) -- 'Human Resources'
├─ unique_key (region_id + code) -- Enforces UK/HR, US/HR as separate
└─ created_at (timestamp)

Seed Data Example:

| Region | Department | Full Path  |
|--------|------------|------------|
| UK     | HR         | UK/HR      |
| UK     | IT         | UK/IT      |
| US     | HR         | US/HR      |
| US     | Finance    | US/Finance |
| APAC   | Sales      | APAC/Sales |

2. Users & RBAC

users
├─ id (uuid, PK)
├─ entra_id (string, unique, NOT NULL) -- Azure AD object ID
├─ email (string, unique, NOT NULL)
├─ display_name (string)
├─ role (enum: 'super_admin', 'content_manager', 'user', default='user')
├─ department_id (FK → departments.id, nullable)
├─ is_active (boolean, default=true)
├─ created_at (timestamp)
├─ last_login_at (timestamp)
└─ index: (entra_id), (email), (department_id)

Auto-Provisioning Logic:

  • First-time Entra ID login → Create user with role='user'
  • If Entra ID profile has department attribute → Map to departments table
  • Super Admin can later upgrade roles via UI
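
The department mapping step could look roughly like the sketch below. It assumes the Entra ID `department` attribute holds a path such as "UK/HR" (region code, slash, department code); the plan does not specify the attribute format, so that convention and the helper names `parse_department_path` and `resolve_department_id` are hypothetical. The lookup table stands in for a query against `regions` and `departments`.

```python
from typing import Dict, Optional, Tuple


def parse_department_path(value: Optional[str]) -> Optional[Tuple[str, str]]:
    """Split an assumed 'REGION/DEPT' string into (region_code, department_code)."""
    if not value:
        return None
    parts = [part.strip().upper() for part in value.split("/")]
    if len(parts) != 2 or not all(parts):
        return None  # unexpected format: leave the user without a department
    return parts[0], parts[1]


def resolve_department_id(
    value: Optional[str],
    index: Dict[Tuple[str, str], str],
) -> Optional[str]:
    """Return the departments.id for a profile attribute, or None if unmapped.

    `index` maps (region_code, department_code) to a department UUID and is an
    in-memory stand-in for a database lookup.
    """
    key = parse_department_path(value)
    return index.get(key) if key else None
```

Returning `None` for an unmapped value keeps `department_id` nullable, as in the `users` schema, so a Super Admin can assign the department later.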

3. Conversations & Messages

conversations
├─ id (uuid, PK)
├─ user_id (FK → users.id, ON DELETE CASCADE)
├─ mode (enum: 'rag', 'assistant', 'notebook')
├─ title (string, nullable) -- Auto-generated from first message
├─ created_at (timestamp)
├─ updated_at (timestamp)
└─ index: (user_id, mode, updated_at DESC)

messages
├─ id (uuid, PK)
├─ conversation_id (FK → conversations.id, ON DELETE CASCADE)
├─ role (enum: 'user', 'assistant', 'system')
├─ content (text)
├─ input_tokens (int, nullable)
├─ output_tokens (int, nullable)
├─ llm_provider (string, nullable) -- 'openai', 'google', 'anthropic'
├─ llm_model (string, nullable) -- 'gpt-5.1', 'gemini-2.5-flash', 'claude-sonnet-4.5'
├─ created_at (timestamp)
└─ index: (conversation_id, created_at)

4. Feedback System

feedback
├─ id (uuid, PK)
├─ message_id (FK → messages.id, ON DELETE CASCADE)
├─ conversation_id (FK → conversations.id)
├─ user_id (FK → users.id)
├─ rating (enum: 'positive', 'negative')
├─ user_comment (text, nullable)
├─ retrieved_sources (jsonb, nullable)
   -- Example: [{"qdrant_id": "abc123", "file_name": "Policy.pdf", "url": "...", "score": 0.85}]
├─ status (enum: 'pending', 'reviewed', 'resolved', default='pending')
├─ reviewed_by (FK → users.id, nullable)
├─ reviewed_at (timestamp, nullable)
├─ created_at (timestamp)
└─ index: (status, created_at)

5. SharePoint Integration

sharepoint_libraries
├─ id (uuid, PK)
├─ site_url (string, NOT NULL) -- 'https://company.sharepoint.com/sites/HR'
├─ library_name (string, NOT NULL) -- 'Documents'
├─ department_id (FK → departments.id, nullable)
├─ is_active (boolean, default=true)
├─ delta_token (string, nullable) -- MS Graph delta link
├─ last_sync_at (timestamp, nullable)
├─ last_sync_status (enum: 'idle', 'syncing', 'success', 'error')
├─ error_message (text, nullable)
└─ unique: (site_url, library_name)

sharepoint_documents
├─ id (uuid, PK)
├─ library_id (FK → sharepoint_libraries.id, ON DELETE CASCADE)
├─ sharepoint_id (string, unique, NOT NULL) -- MS Graph item ID
├─ file_name (string, NOT NULL)
├─ file_url (string, NOT NULL) -- Deep link to SharePoint
├─ file_type (string) -- 'pdf', 'docx', 'xlsx', 'pptx'
├─ file_size (bigint) -- bytes
├─ department_id (FK → departments.id, nullable)
├─ last_modified (timestamp) -- From SharePoint metadata
├─ qdrant_point_ids (jsonb) -- Array of vector IDs: ["vec_1", "vec_2", ...]
├─ chunk_count (int, default=0)
├─ ingested_at (timestamp)
├─ is_active (boolean, default=true)
└─ index: (sharepoint_id), (library_id, is_active), (department_id)

Metadata Tagging Strategy:

  • Content Manager maps SharePoint library → Department (e.g., "UK_HR_Docs" library → UK/HR)
  • All documents in that library inherit department_id
  • Qdrant vectors get department_id in payload for filtered search
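
A minimal sketch of the inheritance rule, using plain dictionaries in place of ORM rows. The function name `inherit_department` and the `region_by_department` lookup are assumptions for illustration; the returned dictionary has the keys that the Qdrant payload in this plan expects.

```python
from typing import Dict, Optional


def inherit_department(
    library: Dict,
    document: Dict,
    region_by_department: Dict[str, str],
) -> Dict:
    """Build chunk metadata where the document takes its department from its library.

    `region_by_department` maps a department ID to its region code and stands in
    for a join between departments and regions.
    """
    department_id: Optional[str] = library.get("department_id")
    return {
        **document,
        # The library mapping always wins: documents do not carry their own tag
        "department_id": department_id,
        "region_code": region_by_department.get(department_id),
    }
```

A library with no department mapping yields `department_id=None` and `region_code=None`, so its vectors would never match a department-filtered search.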

6. Notebook Mode (NotebookLlama)

notebook_sessions
├─ id (uuid, PK)
├─ user_id (FK → users.id, ON DELETE CASCADE)
├─ conversation_id (FK → conversations.id, ON DELETE CASCADE)
├─ title (string, nullable)
├─ is_pinned (boolean, default=false)
├─ total_file_size (bigint, default=0) -- Sum of all uploaded files
├─ expires_at (timestamp, nullable) -- NULL if pinned, NOW() + 24h otherwise
├─ created_at (timestamp)
└─ index: (user_id, expires_at)

uploaded_files
├─ id (uuid, PK)
├─ session_id (FK → notebook_sessions.id, ON DELETE CASCADE)
├─ file_name (string, NOT NULL)
├─ file_size (bigint)
├─ file_type (string)
├─ storage_path (string) -- '/uploads/{session_id}/{file_name}'
├─ notebookllama_file_id (string, nullable)
├─ uploaded_at (timestamp)
└─ index: (session_id)

Pin Feature Logic:

  • Default: expires_at = NOW() + INTERVAL '24 hours'
  • User clicks "Pin Session" → Set is_pinned=true, expires_at=NULL
  • Celery cleanup task: Delete sessions where expires_at < NOW() AND is_pinned=false
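
The pin rules above can be expressed as two small pure functions, which makes them easy to unit test separately from Celery and the database. This is a sketch; the names `compute_expiry` and `is_expired` are not part of the plan.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

SESSION_TTL = timedelta(hours=24)  # default lifetime stated in the plan


def compute_expiry(is_pinned: bool, now: datetime) -> Optional[datetime]:
    """Pinned sessions never expire; all others expire 24 hours after `now`."""
    return None if is_pinned else now + SESSION_TTL


def is_expired(expires_at: Optional[datetime], is_pinned: bool, now: datetime) -> bool:
    """Same predicate as the cleanup task: expires_at < NOW() AND is_pinned = false."""
    return (not is_pinned) and expires_at is not None and expires_at < now
```

Passing `now` in explicitly, for example `datetime.now(timezone.utc)`, keeps the functions deterministic under test.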

7. LLM Provider Configuration

llm_configs
├─ id (uuid, PK)
├─ mode (enum: 'rag', 'assistant_summary', 'assistant_writing')
├─ provider (string) -- 'openai', 'google', 'anthropic'
├─ model_name (string) -- 'gpt-5.1', 'gemini-2.5-flash', 'claude-sonnet-4.5'
├─ api_key_env_var (string) -- 'OPENAI_API_KEY', 'GOOGLE_API_KEY'
├─ temperature (float, default=0.7)
├─ max_tokens (int, nullable)
├─ is_active (boolean, default=true)
├─ created_at (timestamp)
└─ unique: (mode) -- One config row per mode; is_active toggles it on or off

Seed Data:

INSERT INTO llm_configs (mode, provider, model_name, api_key_env_var) VALUES
  ('rag', 'openai', 'gpt-5.1', 'OPENAI_API_KEY'),
  ('assistant_summary', 'google', 'gemini-2.5-flash', 'GOOGLE_API_KEY'),
  ('assistant_writing', 'anthropic', 'claude-sonnet-4.5', 'ANTHROPIC_API_KEY');
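
The seed rows drive the mode-to-model routing that `llm_router.py` is meant to provide. A minimal sketch of that lookup follows, with an in-memory dictionary standing in for the `llm_configs` table; `LLMConfig`, `select_llm`, and `resolve_api_key` are hypothetical names, not code from the project.

```python
import os
from dataclasses import dataclass
from typing import Dict, Mapping


@dataclass(frozen=True)
class LLMConfig:
    mode: str
    provider: str
    model_name: str
    api_key_env_var: str
    temperature: float = 0.7  # same default as the llm_configs column


# In-memory copy of the seed data above
SEED_CONFIGS: Dict[str, LLMConfig] = {
    "rag": LLMConfig("rag", "openai", "gpt-5.1", "OPENAI_API_KEY"),
    "assistant_summary": LLMConfig(
        "assistant_summary", "google", "gemini-2.5-flash", "GOOGLE_API_KEY"
    ),
    "assistant_writing": LLMConfig(
        "assistant_writing", "anthropic", "claude-sonnet-4.5", "ANTHROPIC_API_KEY"
    ),
}


def select_llm(mode: str, configs: Mapping[str, LLMConfig] = SEED_CONFIGS) -> LLMConfig:
    """Return the config for a mode, failing loudly on an unknown mode."""
    try:
        return configs[mode]
    except KeyError:
        raise ValueError(f"No LLM configured for mode '{mode}'") from None


def resolve_api_key(config: LLMConfig, environ: Mapping[str, str] = os.environ) -> str:
    """Read the API key from the environment variable named in the config row."""
    key = environ.get(config.api_key_env_var)
    if not key:
        raise RuntimeError(f"{config.api_key_env_var} is not set")
    return key
```

Storing only the variable name in the table, as the schema does, keeps secrets out of PostgreSQL; the key itself is read from the process environment at call time.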

8. System Configuration

system_prompts
├─ id (uuid, PK)
├─ mode (enum: 'rag', 'assistant_summary', 'assistant_writing', 'notebook')
├─ prompt_text (text, NOT NULL)
├─ is_active (boolean, default=true)
├─ created_by (FK → users.id, nullable)
├─ created_at (timestamp)
└─ index: (mode, is_active)

Example RAG System Prompt:

You are a corporate knowledge assistant. Answer questions ONLY based on the provided context.
If the context does not contain the answer, respond: "I don't have enough information to answer this question."
Always cite sources using numbered references [1], [2], etc.
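
For the numbered references to mean anything, the retrieved chunks have to be numbered in the context the model sees. One possible way to assemble the messages is sketched below; it assumes each chunk is a dictionary with `text`, `file_name`, and `file_url` keys, and the function names are illustrative only.

```python
from typing import Dict, List, Tuple


def build_context(chunks: List[Dict]) -> Tuple[str, List[Dict]]:
    """Number retrieved chunks so the model can cite them as [1], [2], ..."""
    blocks = []
    sources = []
    for ref, chunk in enumerate(chunks, start=1):
        blocks.append(f"[{ref}] {chunk['file_name']}\n{chunk['text']}")
        sources.append(
            {"ref": ref, "file_name": chunk["file_name"], "url": chunk["file_url"]}
        )
    return "\n\n".join(blocks), sources


def build_messages(
    system_prompt: str, question: str, chunks: List[Dict]
) -> Tuple[List[Dict], List[Dict]]:
    """Return chat messages plus the source list needed to render citations."""
    context, sources = build_context(chunks)
    user_content = f"Context:\n{context}\n\nQuestion: {question}"
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_content},
    ]
    return messages, sources
```

The returned `sources` list lets the frontend turn a `[2]` in the answer into a link to the matching SharePoint file.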

🗂️ Qdrant Vector Store Design

Collection: sharepoint_docs

Configuration:

{
  "vectors": {
    "size": 3072,  # text-embedding-3-large
    "distance": "Cosine"
  },
  "payload_schema": {
    "sharepoint_id": "keyword",
    "document_id": "keyword",  # UUID from sharepoint_documents table
    "file_name": "text",
    "file_url": "keyword",
    "chunk_index": "integer",
    "total_chunks": "integer",
    "text": "text",
    "department_id": "keyword",  # For filtering
    "region_code": "keyword",    # For filtering (e.g., 'UK', 'US')
    "file_type": "keyword",
    "last_modified": "datetime",
    "is_active": "bool"
  }
}

Indexes (for fast filtering):

  • department_id
  • region_code
  • file_type
  • is_active

Search Query Example:

qdrant_client.search(
    collection_name="sharepoint_docs",
    query_vector=embedding,
    query_filter=models.Filter(
        must=[
            models.FieldCondition(key="is_active", match=models.MatchValue(value=True)),
            models.FieldCondition(key="department_id", match=models.MatchValue(value=user.department_id))
        ]
    ),
    limit=5
)

🚀 Implementation Phases


Phase 1: Environment Setup (Week 1)

Goal

Set up development environment with Docker Compose orchestrating all services.

Tasks

1.1 Project Structure

oliver-ai-hub/
├── backend/
│   ├── app/
│   │   ├── __init__.py
│   │   ├── main.py              # FastAPI app entry point
│   │   ├── config.py            # Settings (env vars, secrets)
│   │   ├── database.py          # SQLAlchemy setup
│   │   ├── models/              # SQLAlchemy ORM models
│   │   ├── schemas/             # Pydantic models (request/response)
│   │   ├── api/
│   │   │   ├── v1/
│   │   │   │   ├── endpoints/
│   │   │   │   │   ├── auth.py
│   │   │   │   │   ├── chat.py
│   │   │   │   │   ├── admin.py
│   │   │   │   │   └── notebook.py
│   │   │   │   └── router.py
│   │   ├── core/
│   │   │   ├── auth.py          # JWT utilities
│   │   │   ├── security.py      # Password hashing (if needed)
│   │   │   └── dependencies.py  # FastAPI dependencies
│   │   ├── services/
│   │   │   ├── llm_router.py    # Mode → LLM selection logic
│   │   │   ├── rag_service.py
│   │   │   ├── notebook_proxy.py
│   │   │   └── sharepoint_sync.py
│   │   ├── tasks/               # Celery tasks
│   │   │   ├── celery_app.py
│   │   │   ├── sync_sharepoint.py
│   │   │   └── cleanup_sessions.py
│   │   └── utils/
│   │       ├── embeddings.py
│   │       └── text_processing.py
│   ├── alembic/                 # Database migrations
│   ├── tests/
│   ├── requirements.txt
│   ├── Dockerfile
│   └── .env.example
├── frontend/
│   ├── app/
│   │   ├── (auth)/
│   │   │   └── login/
│   │   ├── (dashboard)/
│   │   │   ├── chat/
│   │   │   ├── notebook/
│   │   │   └── admin/
│   │   ├── layout.tsx
│   │   └── page.tsx
│   ├── components/
│   │   ├── ui/                  # Shadcn components
│   │   ├── chat/
│   │   │   ├── ChatInterface.tsx
│   │   │   ├── MessageList.tsx
│   │   │   └── StreamingMessage.tsx
│   │   └── notebook/
│   │       ├── FileUploader.tsx
│   │       └── SessionManager.tsx
│   ├── lib/
│   │   ├── api-client.ts
│   │   ├── auth-store.ts        # Zustand
│   │   └── chat-store.ts
│   ├── public/
│   ├── .env.local.example
│   ├── package.json
│   ├── tsconfig.json
│   └── Dockerfile
├── docker-compose.yml
├── .gitignore
└── README.md

1.2 Docker Compose Configuration

File: docker-compose.yml

version: '3.9'

services:
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: nexus_db
      POSTGRES_USER: nexus_user
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
      - "6334:6334"
    volumes:
      - qdrant_data:/qdrant/storage

  backend:
    build: ./backend
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    ports:
      - "8000:8000"
    environment:
      DATABASE_URL: postgresql://nexus_user:${POSTGRES_PASSWORD}@postgres:5432/nexus_db
      REDIS_URL: redis://redis:6379/0
      QDRANT_URL: http://qdrant:6333
      ENTRA_CLIENT_ID: ${ENTRA_CLIENT_ID}
      ENTRA_CLIENT_SECRET: ${ENTRA_CLIENT_SECRET}
      ENTRA_TENANT_ID: ${ENTRA_TENANT_ID}
      ENTRA_REDIRECT_URI: ${ENTRA_REDIRECT_URI}
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      GOOGLE_API_KEY: ${GOOGLE_API_KEY}
      ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}
      JWT_SECRET: ${JWT_SECRET}
    depends_on:
      - postgres
      - redis
      - qdrant
    volumes:
      - ./backend:/app
      - uploads_data:/app/uploads

  celery_worker:
    build: ./backend
    command: celery -A app.tasks.celery_app worker --loglevel=info
    environment:
      DATABASE_URL: postgresql://nexus_user:${POSTGRES_PASSWORD}@postgres:5432/nexus_db
      REDIS_URL: redis://redis:6379/0
      QDRANT_URL: http://qdrant:6333
      OPENAI_API_KEY: ${OPENAI_API_KEY}
    depends_on:
      - postgres
      - redis
      - qdrant
    volumes:
      - ./backend:/app
      - uploads_data:/app/uploads

  celery_beat:
    build: ./backend
    command: celery -A app.tasks.celery_app beat --loglevel=info
    environment:
      DATABASE_URL: postgresql://nexus_user:${POSTGRES_PASSWORD}@postgres:5432/nexus_db
      REDIS_URL: redis://redis:6379/0
    depends_on:
      - redis
    volumes:
      - ./backend:/app

  frontend:
    build: ./frontend
    command: npm run dev
    ports:
      - "3000:3000"
    environment:
      NEXT_PUBLIC_API_URL: http://localhost:8000
    depends_on:
      - backend
    volumes:
      - ./frontend:/app
      - /app/node_modules

volumes:
  postgres_data:
  redis_data:
  qdrant_data:
  uploads_data:

1.3 Environment Variables

File: backend/.env.example

# Database
POSTGRES_PASSWORD=your_secure_password
DATABASE_URL=postgresql://nexus_user:your_secure_password@localhost:5432/nexus_db

# Redis
REDIS_URL=redis://localhost:6379/0

# Qdrant
QDRANT_URL=http://localhost:6333

# Microsoft Entra ID
ENTRA_CLIENT_ID=your_client_id
ENTRA_CLIENT_SECRET=your_client_secret
ENTRA_TENANT_ID=your_tenant_id
ENTRA_REDIRECT_URI=http://localhost:8000/api/v1/auth/callback

# LLM Providers
OPENAI_API_KEY=sk-...
GOOGLE_API_KEY=AIza...
ANTHROPIC_API_KEY=sk-ant-...

# JWT
JWT_SECRET=your_jwt_secret_key_min_32_chars
JWT_ALGORITHM=HS256
JWT_EXPIRATION_MINUTES=15
REFRESH_TOKEN_EXPIRATION_DAYS=7

# NotebookLlama
NOTEBOOKLLAMA_URL=http://internal-notebook-server:8080

# File Upload
MAX_UPLOAD_SIZE_MB=100
UPLOAD_DIR=/app/uploads
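
The backend modules in this plan import `settings` from `app.config`, but that module is not shown. The sketch below is one stdlib-only way to load and validate a subset of these variables; a real project may prefer a settings library, and the `Settings` dataclass and `load_settings` function are assumptions rather than the project's actual code. Entra ID and LLM keys are omitted for brevity.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    DATABASE_URL: str
    REDIS_URL: str
    QDRANT_URL: str
    JWT_SECRET: str
    JWT_ALGORITHM: str = "HS256"
    JWT_EXPIRATION_MINUTES: int = 15
    REFRESH_TOKEN_EXPIRATION_DAYS: int = 7
    MAX_UPLOAD_SIZE_MB: int = 100
    UPLOAD_DIR: str = "/app/uploads"


REQUIRED = ("DATABASE_URL", "REDIS_URL", "QDRANT_URL", "JWT_SECRET")


def load_settings(environ: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from environment variables, failing fast on bad input."""
    missing = [name for name in REQUIRED if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing required environment variables: {', '.join(missing)}")
    if len(environ["JWT_SECRET"]) < 32:
        # Mirrors the "min_32_chars" hint in .env.example
        raise RuntimeError("JWT_SECRET must be at least 32 characters")
    return Settings(
        DATABASE_URL=environ["DATABASE_URL"],
        REDIS_URL=environ["REDIS_URL"],
        QDRANT_URL=environ["QDRANT_URL"],
        JWT_SECRET=environ["JWT_SECRET"],
        JWT_ALGORITHM=environ.get("JWT_ALGORITHM", "HS256"),
        JWT_EXPIRATION_MINUTES=int(environ.get("JWT_EXPIRATION_MINUTES", "15")),
        REFRESH_TOKEN_EXPIRATION_DAYS=int(environ.get("REFRESH_TOKEN_EXPIRATION_DAYS", "7")),
        MAX_UPLOAD_SIZE_MB=int(environ.get("MAX_UPLOAD_SIZE_MB", "100")),
        UPLOAD_DIR=environ.get("UPLOAD_DIR", "/app/uploads"),
    )
```

Failing at startup on a missing or short secret is cheaper than discovering the problem on the first login request.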

1.4 Deliverables

  • Docker Compose successfully starts all services
  • PostgreSQL accepting connections
  • Qdrant Web UI accessible at http://localhost:6333/dashboard
  • FastAPI docs accessible at http://localhost:8000/docs
  • Next.js dev server running at http://localhost:3000
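
A quick way to confirm the deliverables above is a port-level smoke check. The sketch below only verifies that each service accepts TCP connections on the host ports published in docker-compose.yml; it does not test application health. The `SERVICES` table and function names are assumptions for illustration.

```python
import socket
from typing import Dict, Tuple

# Host ports published by docker-compose.yml
SERVICES: Dict[str, Tuple[str, int]] = {
    "postgres": ("localhost", 5432),
    "redis": ("localhost", 6379),
    "qdrant": ("localhost", 6333),
    "backend": ("localhost", 8000),
    "frontend": ("localhost", 3000),
}


def port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def check_services(services: Dict[str, Tuple[str, int]] = SERVICES) -> Dict[str, bool]:
    """Map each service name to whether its port accepts connections."""
    return {name: port_open(host, port) for name, (host, port) in services.items()}
```

Calling `check_services()` after `docker-compose up -d` and printing the result gives a one-glance status for all five services.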

Phase 2: Backend Core (Week 2)

Goal

Implement authentication, user management, and RBAC middleware.

Tasks

2.1 Database Setup

File: backend/app/database.py

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from app.config import settings

engine = create_engine(
    settings.DATABASE_URL,
    pool_pre_ping=True,
    pool_size=10,
    max_overflow=20
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

2.2 Alembic Migrations

# Initialize Alembic
cd backend
alembic init alembic

# Create initial migration
alembic revision --autogenerate -m "Initial schema: users, regions, departments"

# Apply migration
alembic upgrade head

Migration Order:

  1. 001_initial_tables.py: regions, departments, users
  2. 002_conversations.py: conversations, messages
  3. 003_feedback.py: feedback
  4. 004_sharepoint.py: sharepoint_libraries, sharepoint_documents
  5. 005_notebook.py: notebook_sessions, uploaded_files
  6. 006_config.py: llm_configs, system_prompts

2.3 SQLAlchemy Models

File: backend/app/models/user.py

from sqlalchemy import Column, String, Boolean, DateTime, Enum, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.database import Base
import uuid
from datetime import datetime
import enum

class UserRole(enum.Enum):
    SUPER_ADMIN = "super_admin"
    CONTENT_MANAGER = "content_manager"
    USER = "user"

class User(Base):
    __tablename__ = "users"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    entra_id = Column(String, unique=True, nullable=False, index=True)
    email = Column(String, unique=True, nullable=False, index=True)
    display_name = Column(String)
    role = Column(Enum(UserRole), default=UserRole.USER, nullable=False)
    department_id = Column(UUID(as_uuid=True), ForeignKey("departments.id"), nullable=True)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    last_login_at = Column(DateTime, nullable=True)

    # Relationships
    department = relationship("Department", back_populates="users")
    conversations = relationship("Conversation", back_populates="user", cascade="all, delete-orphan")

File: backend/app/models/taxonomy.py

from sqlalchemy import Column, String, DateTime, ForeignKey, UniqueConstraint
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.database import Base
import uuid
from datetime import datetime

class Region(Base):
    __tablename__ = "regions"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    code = Column(String(10), unique=True, nullable=False)  # 'UK', 'US', 'APAC'
    name = Column(String(100), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    departments = relationship("Department", back_populates="region", cascade="all, delete-orphan")

class Department(Base):
    __tablename__ = "departments"
    __table_args__ = (
        UniqueConstraint('region_id', 'code', name='uix_region_dept'),
    )

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    region_id = Column(UUID(as_uuid=True), ForeignKey("regions.id", ondelete="CASCADE"), nullable=False)
    code = Column(String(50), nullable=False)  # 'HR', 'IT', 'FINANCE'
    name = Column(String(100), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    region = relationship("Region", back_populates="departments")
    users = relationship("User", back_populates="department")
    sharepoint_documents = relationship("SharePointDocument", back_populates="department")

2.4 Microsoft Entra ID Authentication

File: backend/app/core/auth.py

from datetime import datetime, timedelta
from typing import Optional
import jwt
from fastapi import HTTPException, status
from app.config import settings

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=settings.JWT_EXPIRATION_MINUTES))
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)

def create_refresh_token(data: dict):
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(days=settings.REFRESH_TOKEN_EXPIRATION_DAYS)
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)

def verify_token(token: str) -> dict:
    try:
        payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired")
    except jwt.InvalidTokenError:  # PyJWT's base class for decode failures
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")

File: backend/app/api/v1/endpoints/auth.py

from datetime import datetime

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from msal import ConfidentialClientApplication
from app.database import get_db
from app.models.user import User, UserRole
from app.core.auth import create_access_token, create_refresh_token
from app.config import settings
import httpx

router = APIRouter()

# Initialize MSAL Client
msal_app = ConfidentialClientApplication(
    settings.ENTRA_CLIENT_ID,
    authority=f"https://login.microsoftonline.com/{settings.ENTRA_TENANT_ID}",
    client_credential=settings.ENTRA_CLIENT_SECRET,
)

@router.post("/login")
async def login(code: str, db: Session = Depends(get_db)):
    """
    Exchange Entra ID authorization code for access token,
    create/update user in database, return JWT tokens.
    """
    # Exchange code for Entra ID token
    result = msal_app.acquire_token_by_authorization_code(
        code,
        scopes=["User.Read"],
        redirect_uri=settings.ENTRA_REDIRECT_URI
    )

    if "error" in result:
        raise HTTPException(status_code=400, detail=result.get("error_description"))

    # Fetch user profile from MS Graph
    graph_token = result["access_token"]
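    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://graph.microsoft.com/v1.0/me",
            headers={"Authorization": f"Bearer {graph_token}"}
        )
        response.raise_for_status()  # surface Graph failures instead of parsing an error body
        profile = response.json()

    entra_id = profile["id"]
    # "mail" can be null or absent for some accounts; fall back to the UPN
    email = profile.get("mail") or profile["userPrincipalName"]
    display_name = profile.get("displayName") or email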

    # Auto-provision user
    user = db.query(User).filter(User.entra_id == entra_id).first()
    if not user:
        user = User(
            entra_id=entra_id,
            email=email,
            display_name=display_name,
            role=UserRole.USER
        )
        db.add(user)

    user.last_login_at = datetime.utcnow()
    db.commit()
    db.refresh(user)

    # Generate backend JWT tokens
    access_token = create_access_token({"sub": str(user.id), "role": user.role.value})
    refresh_token = create_refresh_token({"sub": str(user.id)})

    # Store refresh token in Redis (optional)
    # redis_client.setex(f"refresh:{user.id}", 7*24*60*60, refresh_token)

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
        "user": {
            "id": str(user.id),
            "email": user.email,
            "display_name": user.display_name,
            "role": user.role.value
        }
    }

2.5 RBAC Middleware

File: backend/app/core/dependencies.py

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
from app.database import get_db
from app.models.user import User, UserRole
from app.core.auth import verify_token

security = HTTPBearer()

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db)
) -> User:
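    token = credentials.credentials
    payload = verify_token(token)
    # Refresh tokens are signed with the same key, so reject anything that is not an access token
    if payload.get("type") != "access":
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token type")
    user_id = payload.get("sub")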

    user = db.query(User).filter(User.id == user_id, User.is_active == True).first()
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")

    return user

async def require_admin(current_user: User = Depends(get_current_user)) -> User:
    if current_user.role not in [UserRole.SUPER_ADMIN, UserRole.CONTENT_MANAGER]:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
    return current_user

async def require_super_admin(current_user: User = Depends(get_current_user)) -> User:
    if current_user.role != UserRole.SUPER_ADMIN:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Super admin access required")
    return current_user

2.6 Deliverables

  • Database schema created with Alembic
  • Seed data script for regions/departments
  • /api/v1/auth/login endpoint functional
  • JWT validation middleware working
  • RBAC dependencies (require_admin, require_super_admin) enforced
  • Unit tests for auth flow

Phase 3: SharePoint Ingestion Pipeline (Week 3-4)

Goal

Sync documents from SharePoint using MS Graph API, extract text, chunk, embed, and store in Qdrant.

Tasks

3.1 MS Graph API Integration

File: backend/app/services/sharepoint_sync.py

import httpx
from typing import List, Dict
from sqlalchemy.orm import Session
from app.models.sharepoint import SharePointLibrary, SharePointDocument
from app.config import settings
from msal import ConfidentialClientApplication

class SharePointSyncService:
    def __init__(self):
        self.msal_app = ConfidentialClientApplication(
            settings.ENTRA_CLIENT_ID,
            authority=f"https://login.microsoftonline.com/{settings.ENTRA_TENANT_ID}",
            client_credential=settings.ENTRA_CLIENT_SECRET,
        )

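    def get_access_token(self) -> str:
        """Get Application Permission token (Client Credentials Flow)"""
        result = self.msal_app.acquire_token_for_client(
            scopes=["https://graph.microsoft.com/.default"]
        )
        if "access_token" not in result:
            # MSAL reports failures in the result dict rather than raising
            raise RuntimeError(f"Token acquisition failed: {result.get('error_description')}")
        return result["access_token"]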

    async def fetch_delta_changes(self, library: SharePointLibrary) -> List[Dict]:
        """
        Fetch changed documents using MS Graph Delta API
        https://graph.microsoft.com/v1.0/sites/{site-id}/drives/{drive-id}/root/delta
        """
        token = self.get_access_token()
        headers = {"Authorization": f"Bearer {token}"}

        # Use stored delta_token or initial sync
        url = library.delta_token or self._get_initial_delta_url(library)

        all_changes = []
        async with httpx.AsyncClient() as client:
            while url:
                response = await client.get(url, headers=headers)
                response.raise_for_status()
                data = response.json()

                all_changes.extend(data.get("value", []))

                # The final page carries a deltaLink instead of a nextLink
                if "@odata.deltaLink" in data:
                    # Save for next sync
                    library.delta_token = data["@odata.deltaLink"]
                    break

                url = data.get("@odata.nextLink")

        return all_changes

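    def _get_initial_delta_url(self, library: SharePointLibrary) -> str:
        # Convert site URL to site ID via Graph API
        # Returns: https://graph.microsoft.com/v1.0/sites/{site-id}/drives/{drive-id}/root/delta
        # Raising makes the gap visible; returning None would silently skip the sync
        raise NotImplementedError("Site and drive ID resolution is not implemented yet")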
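
Two pieces the service leaves open are how the initial delta URL is built and how delta results are interpreted. The sketch below covers both with pure functions. It relies on the Graph convention of addressing a site as `/sites/{hostname}:/{server-relative-path}` and on delta items carrying a `deleted`, `file`, or `folder` facet. Resolving the drive ID needs an additional Graph call that is not shown, and all function names here are hypothetical.

```python
from typing import Dict, List, Tuple
from urllib.parse import urlparse

GRAPH_BASE = "https://graph.microsoft.com/v1.0"


def site_lookup_url(site_url: str) -> str:
    """Build the Graph URL that resolves a SharePoint site URL to its site resource."""
    parsed = urlparse(site_url)
    if not parsed.hostname:
        raise ValueError(f"Not an absolute site URL: {site_url!r}")
    path = parsed.path.rstrip("/")
    if not path:
        return f"{GRAPH_BASE}/sites/{parsed.hostname}"  # tenant root site
    return f"{GRAPH_BASE}/sites/{parsed.hostname}:{path}"


def initial_delta_url(site_id: str, drive_id: str) -> str:
    """Build the first delta request URL once site and drive IDs are known."""
    return f"{GRAPH_BASE}/sites/{site_id}/drives/{drive_id}/root/delta"


def split_changes(items: List[Dict]) -> Tuple[List[Dict], List[str]]:
    """Separate files to (re)ingest from the IDs of deleted items."""
    upserts: List[Dict] = []
    deleted_ids: List[str] = []
    for item in items:
        if "deleted" in item:
            deleted_ids.append(item["id"])
        elif "file" in item:
            upserts.append(item)
        # Items with only a "folder" facet hold no content and are skipped
    return upserts, deleted_ids
```

The deleted IDs are the natural input for deactivating vectors in Qdrant, while the upserts feed the extract, chunk, and embed steps.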

3.2 Document Processing

File: backend/app/utils/text_processing.py

from langchain.text_splitter import RecursiveCharacterTextSplitter
from typing import List
import PyPDF2
import docx
import openpyxl

def extract_text_from_file(file_path: str, file_type: str) -> str:
    """Extract text based on file type"""
    if file_type == "pdf":
        return _extract_pdf(file_path)
    elif file_type == "docx":  # python-docx cannot read legacy .doc files
        return _extract_docx(file_path)
    elif file_type == "xlsx":  # openpyxl cannot read legacy .xls files
        return _extract_xlsx(file_path)
    elif file_type == "txt":
        with open(file_path, 'r', encoding='utf-8') as f:
            return f.read()
    else:
        raise ValueError(f"Unsupported file type: {file_type}")

def _extract_pdf(file_path: str) -> str:
    text = []
    with open(file_path, 'rb') as f:
        reader = PyPDF2.PdfReader(f)
        for page in reader.pages:
            text.append(page.extract_text() or "")  # guard against pages with no extractable text
    return "\n\n".join(text)

def _extract_docx(file_path: str) -> str:
    doc = docx.Document(file_path)
    return "\n\n".join([para.text for para in doc.paragraphs])

def _extract_xlsx(file_path: str) -> str:
    wb = openpyxl.load_workbook(file_path, data_only=True)
    text = []
    for sheet in wb.worksheets:
        for row in sheet.iter_rows(values_only=True):
            # Compare against None so that zero values are not dropped
            text.append(" | ".join([str(cell) for cell in row if cell is not None]))
    return "\n".join(text)

def chunk_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Split text into overlapping chunks"""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )
    return splitter.split_text(text)
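
To make the effect of `chunk_size` and `chunk_overlap` concrete, here is a simplified fixed-window splitter using only the standard library. It is not what `RecursiveCharacterTextSplitter` does (that splitter prefers paragraph and sentence boundaries); it only illustrates how overlapping windows advance through the text. The function name is hypothetical.

```python
from typing import List


def sliding_window_chunks(
    text: str, chunk_size: int = 1000, chunk_overlap: int = 200
) -> List[str]:
    """Split text into fixed-size windows where each window repeats the tail of the previous one."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")

    step = chunk_size - chunk_overlap  # how far each window advances
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reached the end of the text
    return chunks
```

With the defaults, each window advances 800 characters, so consecutive chunks share 200 characters of context.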

3.3 Embedding & Qdrant Indexing

File: backend/app/utils/embeddings.py

from openai import AsyncOpenAI
from typing import List
from app.config import settings

client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)

async def generate_embeddings(texts: List[str]) -> List[List[float]]:
    """Generate embeddings using OpenAI text-embedding-3-large"""
    response = await client.embeddings.create(
        model="text-embedding-3-large",
        input=texts,
        dimensions=3072
    )
    return [item.embedding for item in response.data]
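
A large document can produce more chunks than is sensible to send in one embeddings request. The sketch below splits the input into batches and calls an injected embedding coroutine for each one; the batch size of 100 is an arbitrary assumption, not a documented provider limit, and `batched` and `embed_in_batches` are hypothetical helpers. Passing `generate_embeddings` as the `embed` argument would connect it to the function above.

```python
import asyncio
from typing import Awaitable, Callable, Iterator, List, Sequence

EmbedFn = Callable[[List[str]], Awaitable[List[List[float]]]]


def batched(texts: Sequence[str], batch_size: int = 100) -> Iterator[List[str]]:
    """Yield consecutive slices of at most batch_size texts."""
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    for start in range(0, len(texts), batch_size):
        yield list(texts[start:start + batch_size])


async def embed_in_batches(
    texts: Sequence[str], embed: EmbedFn, batch_size: int = 100
) -> List[List[float]]:
    """Embed texts batch by batch, preserving input order in the result."""
    vectors: List[List[float]] = []
    for batch in batched(texts, batch_size):
        vectors.extend(await embed(batch))
    return vectors
```

Injecting the embedding function keeps the batching logic testable without network access or an API key.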

File: backend/app/services/qdrant_service.py

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
from typing import List, Dict
from app.config import settings
import uuid

class QdrantService:
    def __init__(self):
        self.client = QdrantClient(url=settings.QDRANT_URL)
        self.collection_name = "sharepoint_docs"
        self._ensure_collection()

    def _ensure_collection(self):
        collections = self.client.get_collections().collections
        if not any(c.name == self.collection_name for c in collections):
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(size=3072, distance=Distance.COSINE)
            )
            # Create payload indexes for every field used in filters
            for field_name, field_schema in [
                ("department_id", "keyword"),
                ("region_code", "keyword"),
                ("file_type", "keyword"),
                ("is_active", "bool"),
            ]:
                self.client.create_payload_index(
                    collection_name=self.collection_name,
                    field_name=field_name,
                    field_schema=field_schema,
                )

    async def upsert_document_chunks(
        self,
        document_id: str,
        chunks: List[str],
        embeddings: List[List[float]],
        metadata: Dict
    ) -> List[str]:
        """Upsert document chunks to Qdrant"""
        points = []
        point_ids = []

        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            point_id = str(uuid.uuid4())
            point_ids.append(point_id)

            points.append(PointStruct(
                id=point_id,
                vector=embedding,
                payload={
                    "document_id": document_id,
                    "sharepoint_id": metadata["sharepoint_id"],
                    "file_name": metadata["file_name"],
                    "file_url": metadata["file_url"],
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                    "text": chunk,
                    "department_id": metadata.get("department_id"),
                    "region_code": metadata.get("region_code"),
                    "file_type": metadata["file_type"],
                    "last_modified": metadata["last_modified"],
                    "is_active": True
                }
            ))

        self.client.upsert(
            collection_name=self.collection_name,
            points=points
        )

        return point_ids

    async def search(
        self,
        query_vector: List[float],
        department_id: str = None,
        limit: int = 5
    ) -> List[Dict]:
        """Search for relevant chunks with optional department filtering"""
        filter_conditions = [
            FieldCondition(key="is_active", match=MatchValue(value=True))
        ]

        if department_id:
            filter_conditions.append(
                FieldCondition(key="department_id", match=MatchValue(value=department_id))
            )

        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            query_filter=Filter(must=filter_conditions) if filter_conditions else None,
            limit=limit,
            with_payload=True
        )

        return [
            {
                "id": hit.id,
                "score": hit.score,
                "text": hit.payload["text"],
                "file_name": hit.payload["file_name"],
                "file_url": hit.payload["file_url"],
                "chunk_index": hit.payload["chunk_index"]
            }
            for hit in results
        ]

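    async def deactivate_document(self, document_id: str):
        """Mark document vectors as inactive (soft delete).

        Call this before upserting replacement chunks: the filter matches
        every point that carries this document_id.
        """
        # set_payload accepts a filter, so all points of the document can be
        # flagged in one call without looking up point IDs first
        self.client.set_payload(
            collection_name=self.collection_name,
            payload={"is_active": False},
            points=Filter(must=[
                FieldCondition(key="document_id", match=MatchValue(value=document_id))
            ])
        )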

3.4 Celery Sync Task

File: backend/app/tasks/sync_sharepoint.py

from celery import Task
from app.tasks.celery_app import celery_app
from app.database import SessionLocal
from app.models.sharepoint import SharePointLibrary, SharePointDocument
from app.services.sharepoint_sync import SharePointSyncService
from app.utils.text_processing import extract_text_from_file, chunk_text
from app.utils.embeddings import generate_embeddings
from app.services.qdrant_service import QdrantService
from datetime import datetime
import asyncio
import tempfile
import uuid
import httpx

@celery_app.task(bind=True, max_retries=3)
def sync_sharepoint_library(self: Task, library_id: str):
    """
    Celery task to sync a SharePoint library
    - Fetch delta changes from MS Graph
    - Download new/modified documents
    - Extract text, chunk, embed
    - Upsert to Qdrant and PostgreSQL
    """
    db = SessionLocal()
    library = None
    try:
        library = db.query(SharePointLibrary).filter(SharePointLibrary.id == library_id).first()
        if not library or not library.is_active:
            return

        library.last_sync_status = "syncing"
        db.commit()

        # Celery tasks are synchronous, so the async pipeline is driven with asyncio.run
        asyncio.run(_sync_library(db, library))

        library.last_sync_status = "success"
        library.last_sync_at = datetime.utcnow()
        db.commit()

    except Exception as e:
        # Discard any half-finished transaction before recording the failure
        db.rollback()
        if library is not None:
            library.last_sync_status = "error"
            library.error_message = str(e)
            db.commit()
        # Retry with exponential backoff
        raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))

    finally:
        db.close()

async def _sync_library(db, library):
    """Fetch delta changes and process each changed item"""
    sync_service = SharePointSyncService()
    qdrant_service = QdrantService()

    # Fetch delta changes
    changes = await sync_service.fetch_delta_changes(library)

    for item in changes:
        if item.get("deleted"):
            # Handle deletion
            _handle_deleted_document(db, qdrant_service, item)
        elif item.get("file"):
            # Handle new/updated file
            await _process_document(db, sync_service, qdrant_service, library, item)

async def _process_document(db, sync_service, qdrant_service, library, item):
    """Process a single document: download, extract, embed, index"""
    sharepoint_id = item["id"]
    file_name = item["name"]
    file_url = item["webUrl"]
    file_type = file_name.split(".")[-1].lower()
    last_modified = item["lastModifiedDateTime"]

    # Check if already processed
    existing = db.query(SharePointDocument).filter(
        SharePointDocument.sharepoint_id == sharepoint_id
    ).first()

    if existing and existing.last_modified >= last_modified:
        # No changes, skip
        return

    # Download file (the Graph downloadUrl is pre-authenticated, so no bearer token is sent)
    download_url = item["@microsoft.graph.downloadUrl"]

    async with httpx.AsyncClient() as client:
        response = await client.get(download_url)
        response.raise_for_status()
        file_content = response.content

    # Save to a temp file that is removed automatically when the block exits
    with tempfile.NamedTemporaryFile(suffix=f".{file_type}") as tmp:
        tmp.write(file_content)
        tmp.flush()

        # Extract text
        text = extract_text_from_file(tmp.name, file_type)

    # Chunk text
    chunks = chunk_text(text)

    # Generate embeddings
    embeddings = await generate_embeddings(chunks)

    # Upsert to Qdrant
    metadata = {
        "sharepoint_id": sharepoint_id,
        "file_name": file_name,
        "file_url": file_url,
        "file_type": file_type,
        "department_id": str(library.department_id) if library.department_id else None,
        "region_code": library.department.region.code if library.department else None,
        "last_modified": last_modified
    }

    # Use one ID in both stores so Qdrant payloads can be matched to the PostgreSQL row
    document_id = existing.id if existing else uuid.uuid4()

    if existing:
        # Deactivate old vectors before writing replacements that share this document_id
        await qdrant_service.deactivate_document(str(document_id))

    point_ids = await qdrant_service.upsert_document_chunks(
        document_id=str(document_id),
        chunks=chunks,
        embeddings=embeddings,
        metadata=metadata
    )

    # Update PostgreSQL
    if existing:
        existing.qdrant_point_ids = point_ids
        existing.chunk_count = len(chunks)
        existing.last_modified = last_modified
        existing.ingested_at = datetime.utcnow()
    else:
        doc = SharePointDocument(
            id=document_id,
            library_id=library.id,
            sharepoint_id=sharepoint_id,
            file_name=file_name,
            file_url=file_url,
            file_type=file_type,
            file_size=len(file_content),
            department_id=library.department_id,
            last_modified=last_modified,
            qdrant_point_ids=point_ids,
            chunk_count=len(chunks)
        )
        db.add(doc)

    db.commit()
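
The retry expression `60 * (2 ** self.request.retries)` in the task above doubles the wait on every attempt. As a quick worked example, the helper below (a hypothetical name, not part of the plan) lists the delays that formula produces for `max_retries=3`:

```python
from typing import List


def retry_countdowns(max_retries: int = 3, base_seconds: int = 60) -> List[int]:
    """Delays in seconds before each retry, using base * 2**attempt."""
    return [base_seconds * (2 ** attempt) for attempt in range(max_retries)]


if __name__ == "__main__":
    delays = retry_countdowns()
    print(delays)       # [60, 120, 240]
    print(sum(delays))  # 420 seconds of waiting before the task gives up
```

With these settings a persistently failing library is retried after 1, 2, and 4 minutes, well within the hourly schedule.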

3.5 Celery Beat Schedule

File: backend/app/tasks/celery_app.py

from celery import Celery
from celery.schedules import crontab
from app.config import settings

celery_app = Celery(
    "nexus",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    beat_schedule={
        'sync-sharepoint-hourly': {
            'task': 'app.tasks.sync_sharepoint.sync_all_libraries',
            'schedule': crontab(minute=0),  # Every hour
        },
        'cleanup-expired-sessions': {
            'task': 'app.tasks.cleanup_sessions.cleanup_notebook_sessions',
            'schedule': crontab(hour=2, minute=0),  # Daily at 2 AM
        },
    }
)
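
The beat schedule points at `app.tasks.sync_sharepoint.sync_all_libraries`, which the task module above does not define. One plausible shape is a thin fan-out that queues one `sync_sharepoint_library` job per active library. The sketch below keeps that selection logic free of Celery and SQLAlchemy so it runs standalone; `Library` and `enqueue` are stand-ins (assumptions) for the ORM model and `sync_sharepoint_library.delay`.

```python
from dataclasses import dataclass
from typing import Callable, Iterable, List


@dataclass
class Library:
    """Stand-in for SharePointLibrary; only the fields the fan-out needs."""
    id: str
    is_active: bool


def fan_out_sync(libraries: Iterable[Library], enqueue: Callable[[str], None]) -> List[str]:
    """Queue one sync job per active library and return the queued IDs."""
    queued = []
    for library in libraries:
        if library.is_active:
            enqueue(library.id)
            queued.append(library.id)
    return queued


if __name__ == "__main__":
    jobs: List[str] = []
    libs = [Library("hr", True), Library("legal", False), Library("it", True)]
    print(fan_out_sync(libs, jobs.append))  # ['hr', 'it']
```

In the real task, `enqueue` would be `sync_sharepoint_library.delay`, so each library retries independently of the others.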

3.6 Admin UI: Library Management

Endpoint: POST /api/v1/admin/sharepoint/libraries

from pydantic import BaseModel

class LibraryCreate(BaseModel):
    site_url: str
    library_name: str
    department_id: str

@router.post("/libraries", dependencies=[Depends(require_admin)])
async def create_library(
    payload: LibraryCreate,
    db: Session = Depends(get_db)
):
    """Content Manager adds a SharePoint library to sync"""
    # A Pydantic model makes FastAPI read these fields from the JSON body;
    # bare str parameters would be parsed as query parameters instead
    library = SharePointLibrary(
        site_url=payload.site_url,
        library_name=payload.library_name,
        department_id=payload.department_id,
        is_active=True
    )
    db.add(library)
    db.commit()

    # Trigger initial sync
    sync_sharepoint_library.delay(str(library.id))

    return {"id": str(library.id), "status": "sync_initiated"}

3.7 Deliverables

  • MS Graph API Client Credentials flow working
  • Delta sync fetching changed documents
  • Text extraction for PDF, DOCX, XLSX
  • Chunking + OpenAI embedding generation
  • Qdrant collection created with indexes
  • Celery task syncing documents hourly
  • Admin UI to add/remove SharePoint libraries
  • Soft-delete handling (mark vectors as is_active=false)
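
The pipeline calls `chunk_text` from `app.utils.text_processing`, which this plan does not show. A minimal sketch of one common approach, fixed-size character windows with overlap, might look like the following. The `chunk_size` and `overlap` defaults are illustrative assumptions, not values taken from the plan, and a production version would more likely split on token or sentence boundaries.

```python
from typing import List


def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
    """Split text into overlapping character windows (hypothetical defaults)."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")

    text = text.strip()
    step = chunk_size - overlap  # how far each window advances
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


if __name__ == "__main__":
    print(chunk_text("abcdefghij", chunk_size=4, overlap=1))
    # ['abcd', 'defg', 'ghij']
```

The overlap repeats the tail of each chunk at the head of the next, so a sentence cut at a boundary is still retrievable from at least one chunk.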

Phase 4: RAG Logic & LLM Router (Week 5)

Goal

Implement RAG retrieval, LLM routing based on mode, citation formatting, and streaming responses.

Tasks

4.1 LLM Router Service

File: backend/app/services/llm_router.py

from typing import List, Dict, AsyncGenerator
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import google.generativeai as genai
from app.config import settings
from app.models.llm_config import LLMConfig
from sqlalchemy.orm import Session

class LLMRouter:
    def __init__(self, db: Session):
        self.db = db
        self.openai_client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)
        self.anthropic_client = AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)
        genai.configure(api_key=settings.GOOGLE_API_KEY)

    def get_llm_config(self, mode: str) -> LLMConfig:
        """Fetch active LLM config for given mode"""
        config = self.db.query(LLMConfig).filter(
            LLMConfig.mode == mode,
            LLMConfig.is_active == True
        ).first()

        if not config:
            raise ValueError(f"No active LLM config for mode: {mode}")

        return config

    async def stream_completion(
        self,
        mode: str,
        messages: List[Dict[str, str]],
        temperature: float = None
    ) -> AsyncGenerator[str, None]:
        """Route to appropriate LLM provider and stream tokens"""
        config = self.get_llm_config(mode)
        # Compare against None so an explicit temperature of 0.0 is respected
        temp = temperature if temperature is not None else config.temperature

        if config.provider == "openai":
            async for token in self._stream_openai(config.model_name, messages, temp):
                yield token

        elif config.provider == "anthropic":
            async for token in self._stream_anthropic(config.model_name, messages, temp):
                yield token

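        elif config.provider == "google":
            async for token in self._stream_google(config.model_name, messages, temp):
                yield token

        else:
            # Fail loudly instead of silently streaming nothing
            raise ValueError(f"Unsupported LLM provider: {config.provider}")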

    async def _stream_openai(self, model: str, messages: List[Dict], temp: float):
        stream = await self.openai_client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temp,
            stream=True
        )

        async for chunk in stream:
            # Some chunks (e.g. usage-only chunks) arrive with an empty choices list
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content

    async def _stream_anthropic(self, model: str, messages: List[Dict], temp: float):
        # Convert messages format (OpenAI → Anthropic)
        system_msg = next((m["content"] for m in messages if m["role"] == "system"), None)
        user_messages = [m for m in messages if m["role"] != "system"]

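        # Pass `system` only when a system message exists; the SDK expects a string, not None
        extra = {"system": system_msg} if system_msg else {}

        async with self.anthropic_client.messages.stream(
            model=model,
            max_tokens=4096,
            temperature=temp,
            messages=user_messages,
            **extra
        ) as stream:
            async for text in stream.text_stream:
                yield text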

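    async def _stream_google(self, model: str, messages: List[Dict], temp: float):
        # Implement Gemini streaming
        raise NotImplementedError("Gemini streaming is not implemented yet")
        yield  # unreachable; keeps this an async generator so `async for` callers work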
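
The OpenAI to Anthropic conversion in `_stream_anthropic` comes down to pulling system messages out of the list, because the Anthropic Messages API takes the system prompt as a separate argument. A standalone sketch of that step follows; `split_system_messages` is a hypothetical helper name, and joining several system messages with blank lines is an assumption (the router above keeps only the first one).

```python
from typing import Dict, List, Optional, Tuple


def split_system_messages(
    messages: List[Dict[str, str]],
) -> Tuple[Optional[str], List[Dict[str, str]]]:
    """Separate OpenAI-style system messages from the user/assistant turns."""
    system_parts = [m["content"] for m in messages if m["role"] == "system"]
    conversation = [m for m in messages if m["role"] != "system"]
    system = "\n\n".join(system_parts) if system_parts else None
    return system, conversation


if __name__ == "__main__":
    system, turns = split_system_messages([
        {"role": "system", "content": "Answer from context only."},
        {"role": "user", "content": "What is our leave policy?"},
    ])
    print(system)  # Answer from context only.
    print(turns)   # [{'role': 'user', 'content': 'What is our leave policy?'}]
```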

4.2 RAG Service

File: backend/app/services/rag_service.py

from typing import List, Dict, AsyncGenerator
from app.services.qdrant_service import QdrantService
from app.services.llm_router import LLMRouter
from app.utils.embeddings import generate_embeddings
from sqlalchemy.orm import Session

class RAGService:
    def __init__(self, db: Session):
        self.db = db
        self.qdrant = QdrantService()
        self.llm_router = LLMRouter(db)

    async def query(
        self,
        user_query: str,
        department_id: str = None,
        top_k: int = 5
    ) -> AsyncGenerator[str, None]:
        """
        Perform RAG query:
        1. Embed user query
        2. Search Qdrant for relevant chunks
        3. Build context with citations
        4. Stream LLM response
        """
        # Generate query embedding
        query_embedding = (await generate_embeddings([user_query]))[0]

        # Search Qdrant
        search_results = await self.qdrant.search(
            query_vector=query_embedding,
            department_id=department_id,
            limit=top_k
        )

        # Build context with numbered citations
        context_parts = []
        citations = {}

        for i, result in enumerate(search_results, start=1):
            context_parts.append(f"[{i}] {result['text']}")
            citations[i] = {
                "file_name": result["file_name"],
                "file_url": result["file_url"],
                "score": result["score"]
            }

        context = "\n\n".join(context_parts)

        # Build prompt
        system_prompt = """You are a corporate knowledge assistant. Answer questions ONLY based on the provided context.
        If the context does not contain the answer, respond: "I don't have enough information to answer this question."
        Always cite sources using numbered references [1], [2], etc."""

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {user_query}"}
        ]

        # Stream response from GPT-5.1 (configured for RAG mode)
        full_response = ""
        async for token in self.llm_router.stream_completion(mode="rag", messages=messages):
            full_response += token
            yield token

        # Append citations at the end
        citation_text = "\n\nSources:\n"
        for num, cite in citations.items():
            citation_text += f"[{num}] {cite['file_name']} - {cite['file_url']}\n"

        yield citation_text
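
To make the citation format concrete, here is the context-building step from `RAGService.query` pulled out as a pure function and run on sample search hits. The function name `build_context` and the sample data are illustrative assumptions. Unlike the service above, this sketch returns an empty footer when there are no hits rather than a bare "Sources:" heading.

```python
from typing import Dict, List, Tuple


def build_context(results: List[Dict]) -> Tuple[str, str]:
    """Return (numbered context for the prompt, 'Sources' footer for the user)."""
    context_parts = []
    source_lines = []
    for i, result in enumerate(results, start=1):
        context_parts.append(f"[{i}] {result['text']}")
        source_lines.append(f"[{i}] {result['file_name']} - {result['file_url']}")

    context = "\n\n".join(context_parts)
    if not source_lines:
        return context, ""
    footer = "\n\nSources:\n" + "\n".join(source_lines) + "\n"
    return context, footer


if __name__ == "__main__":
    hits = [
        {"text": "Employees accrue 25 days of leave.", "file_name": "leave.pdf",
         "file_url": "https://example.invalid/leave.pdf"},
        {"text": "Leave requests need manager approval.", "file_name": "hr.docx",
         "file_url": "https://example.invalid/hr.docx"},
    ]
    context, footer = build_context(hits)
    print(context)
    print(footer)
```

The numbers in the context and in the footer come from the same enumeration, so a `[2]` in the model's answer always maps to the second source line.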

4.3 Streaming Endpoint

File: backend/app/api/v1/endpoints/chat.py

from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from app.database import get_db
from app.core.dependencies import get_current_user
from app.models.user import User
from app.models.conversation import Conversation, Message, ConversationMode
from app.services.rag_service import RAGService
import json

router = APIRouter()

@router.get("/stream")
async def stream_chat(
    conversation_id: str = Query(...),
    query: str = Query(...),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Stream RAG responses via Server-Sent Events"""
    # Verify conversation ownership
    conversation = db.query(Conversation).filter(
        Conversation.id == conversation_id,
        Conversation.user_id == current_user.id
    ).first()

    if not conversation:
        # Raise a real 404 instead of returning a 200 response with an error body
        raise HTTPException(status_code=404, detail="Conversation not found")

    # Save user message
    user_msg = Message(
        conversation_id=conversation_id,
        role="user",
        content=query
    )
    db.add(user_msg)
    db.commit()

    # Stream RAG response
    rag_service = RAGService(db)

    async def event_generator():
        full_response = ""
        input_tokens = 0
        output_tokens = 0

        try:
            async for token in rag_service.query(
                user_query=query,
                department_id=str(current_user.department_id) if current_user.department_id else None
            ):
                full_response += token
                output_tokens += 1  # Rough estimate
                yield f"data: {json.dumps({'token': token})}\n\n"

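            # Save assistant message, recording the provider/model configured for RAG mode
            llm_config = rag_service.llm_router.get_llm_config("rag")
            assistant_msg = Message(
                conversation_id=conversation_id,
                role="assistant",
                content=full_response,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                llm_provider=llm_config.provider,
                llm_model=llm_config.model_name
            )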
            db.add(assistant_msg)
            db.commit()

            yield f"data: {json.dumps({'done': True, 'message_id': str(assistant_msg.id)})}\n\n"

        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
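
Each `yield` in `event_generator` emits one Server-Sent Events frame: a `data: ` line carrying JSON, terminated by a blank line. The sketch below isolates that framing and adds a tiny parser to show the round trip. Both helper names are hypothetical, and the parser only handles the single-line `data:` frames this endpoint produces, not the full SSE grammar.

```python
import json
from typing import List


def sse_event(payload: dict) -> str:
    """Serialize one payload as an SSE frame ('data: <json>' plus a blank line)."""
    return f"data: {json.dumps(payload)}\n\n"


def parse_sse(stream: str) -> List[dict]:
    """Decode a string of single-line 'data:' frames back into payloads."""
    events = []
    for frame in stream.split("\n\n"):
        if frame.startswith("data: "):
            events.append(json.loads(frame[len("data: "):]))
    return events


if __name__ == "__main__":
    wire = sse_event({"token": "line one\nline two"}) + sse_event({"done": True})
    print(repr(wire))
    print(parse_sse(wire))
```

Because `json.dumps` escapes newlines inside tokens, a token that contains a line break cannot be mistaken for the end of a frame.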

4.4 Deliverables

  • LLM router correctly selects provider per mode
  • RAG service embeds query → searches Qdrant → builds context
  • Citations formatted as [1], [2] with source URLs
  • Streaming endpoint returns SSE stream
  • Token counts tracked per message
  • Department filtering applied to search results

Phase 5: NotebookLlama Integration (Week 6)

Goal

Implement file upload proxy, session management, and chat forwarding to NotebookLlama.

Tasks

5.1 NotebookLlama Proxy Service

File: backend/app/services/notebook_proxy.py

import httpx
from typing import List, Dict, AsyncGenerator
from app.config import settings
from app.models.notebook import NotebookSession, UploadedFile
from sqlalchemy.orm import Session
import os

class NotebookLlamaProxy:
    def __init__(self):
        self.base_url = settings.NOTEBOOKLLAMA_URL

    async def upload_file(self, session_id: str, file_path: str) -> str:
        """Upload file to NotebookLlama, return file_id"""
        async with httpx.AsyncClient() as client:
            with open(file_path, 'rb') as f:
                files = {'file': f}
                response = await client.post(
                    f"{self.base_url}/upload",
                    files=files,
                    data={"session_id": session_id}
                )
                result = response.json()
                return result["file_id"]

    async def chat(
        self,
        session_id: str,
        query: str,
        file_ids: List[str]
    ) -> AsyncGenerator[str, None]:
        """Stream chat response from NotebookLlama"""
        async with httpx.AsyncClient(timeout=60.0) as client:
            async with client.stream(
                "POST",
                f"{self.base_url}/chat",
                json={
                    "session_id": session_id,
                    "query": query,
                    "file_ids": file_ids
                }
            ) as response:
                async for line in response.aiter_lines():
                    if line.startswith("data: "):
                        yield line[6:]  # Remove "data: " prefix

5.2 Upload Endpoint

File: backend/app/api/v1/endpoints/notebook.py

from fastapi import APIRouter, Depends, UploadFile, File, HTTPException
from sqlalchemy.orm import Session
from app.database import get_db
from app.core.dependencies import get_current_user
from app.models.user import User
from app.models.notebook import NotebookSession, UploadedFile
from app.services.notebook_proxy import NotebookLlamaProxy
import os
import uuid
from datetime import datetime, timedelta

router = APIRouter()

MAX_FILE_SIZE = 100 * 1024 * 1024  # 100 MB
UPLOAD_DIR = "/app/uploads"

@router.post("/sessions/{session_id}/upload")
async def upload_file(
    session_id: str,
    file: UploadFile = File(...),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Upload file to NotebookLlama session"""
    # Verify session ownership
    session = db.query(NotebookSession).filter(
        NotebookSession.id == session_id,
        NotebookSession.user_id == current_user.id
    ).first()

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    # Validate file size
    content = await file.read()
    file_size = len(content)

    if file_size > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail="File too large")

    # Check session total size
    if session.total_file_size + file_size > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail="Session storage quota exceeded")

    # Save file locally
    session_dir = os.path.join(UPLOAD_DIR, str(session_id))
    os.makedirs(session_dir, exist_ok=True)

    file_id = str(uuid.uuid4())
    file_ext = os.path.splitext(file.filename)[1]
    storage_path = os.path.join(session_dir, f"{file_id}{file_ext}")

    with open(storage_path, 'wb') as f:
        f.write(content)

    # Upload to NotebookLlama
    proxy = NotebookLlamaProxy()
    notebookllama_file_id = await proxy.upload_file(str(session_id), storage_path)

    # Save to database
    uploaded = UploadedFile(
        session_id=session_id,
        file_name=file.filename,
        file_size=file_size,
        file_type=file_ext[1:] if file_ext else "unknown",
        storage_path=storage_path,
        notebookllama_file_id=notebookllama_file_id
    )
    db.add(uploaded)

    session.total_file_size += file_size
    db.commit()

    return {
        "file_id": str(uploaded.id),
        "file_name": file.filename,
        "size": file_size
    }

@router.post("/sessions/{session_id}/pin")
async def pin_session(
    session_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Pin session for permanent storage"""
    session = db.query(NotebookSession).filter(
        NotebookSession.id == session_id,
        NotebookSession.user_id == current_user.id
    ).first()

    if not session:
        raise HTTPException(status_code=404, detail="Session not found")

    session.is_pinned = True
    session.expires_at = None
    db.commit()

    return {"status": "pinned"}
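
The upload endpoint above reads the whole file into memory before comparing it with `MAX_FILE_SIZE`, so an oversized upload is rejected only after it has been fully buffered. A possible refinement is to read in blocks and stop as soon as the limit is crossed. The sketch below shows the idea on a plain binary stream; `read_with_limit` and `FileTooLarge` are hypothetical names, and in the endpoint the loop would presumably use `await file.read(block_size)` instead.

```python
import io
from typing import BinaryIO


class FileTooLarge(Exception):
    """Raised when a stream exceeds the allowed size."""


def read_with_limit(stream: BinaryIO, max_bytes: int, block_size: int = 64 * 1024) -> bytes:
    """Read a stream block by block, aborting once max_bytes is exceeded."""
    buffer = bytearray()
    while True:
        block = stream.read(block_size)
        if not block:
            break  # end of stream
        buffer.extend(block)
        if len(buffer) > max_bytes:
            raise FileTooLarge(f"upload exceeds {max_bytes} bytes")
    return bytes(buffer)


if __name__ == "__main__":
    print(len(read_with_limit(io.BytesIO(b"x" * 10), max_bytes=10)))  # 10
    try:
        read_with_limit(io.BytesIO(b"x" * 11), max_bytes=10, block_size=4)
    except FileTooLarge as exc:
        print(exc)  # upload exceeds 10 bytes
```

At most one block beyond the limit is held in memory before the request is rejected.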

5.3 Cleanup Task

File: backend/app/tasks/cleanup_sessions.py

from celery import Task
from app.tasks.celery_app import celery_app
from app.database import SessionLocal
from app.models.notebook import NotebookSession, UploadedFile
from datetime import datetime
import os
import shutil

@celery_app.task
def cleanup_notebook_sessions():
    """Delete expired notebook sessions and their files"""
    db = SessionLocal()
    try:
        expired_sessions = db.query(NotebookSession).filter(
            NotebookSession.is_pinned == False,
            NotebookSession.expires_at < datetime.utcnow()
        ).all()

        for session in expired_sessions:
            # Delete files from disk
            session_dir = f"/app/uploads/{session.id}"
            if os.path.exists(session_dir):
                shutil.rmtree(session_dir)

            # Delete from database (cascade will remove uploaded_files)
            db.delete(session)

        db.commit()

    finally:
        db.close()

5.4 Deliverables

  • File upload endpoint with size validation
  • Files proxied to NotebookLlama
  • Session management (pin/unpin)
  • Celery task cleaning up expired sessions
  • Chat endpoint streaming from NotebookLlama

Phase 6: Frontend Development (Week 7-9)

Goal

Build Next.js UI with Shadcn components, implement chat interface, streaming, and admin dashboard.

Tasks

6.1 Project Setup

npx create-next-app@latest frontend --typescript --tailwind --app
cd frontend
npx shadcn@latest init
npx shadcn@latest add button input textarea card dropdown-menu tabs
npm install zustand @tanstack/react-query axios

6.2 API Client

File: frontend/lib/api-client.ts

import axios from 'axios';

const apiClient = axios.create({
  baseURL: process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8000/api/v1',
  headers: {
    'Content-Type': 'application/json',
  },
});

// Request interceptor to add JWT token
apiClient.interceptors.request.use((config) => {
  const token = localStorage.getItem('access_token');
  if (token) {
    config.headers.Authorization = `Bearer ${token}`;
  }
  return config;
});

// Response interceptor for token refresh
apiClient.interceptors.response.use(
  (response) => response,
  async (error) => {
    if (error.response?.status === 401) {
      // Try to refresh token
      const refreshToken = localStorage.getItem('refresh_token');
      if (refreshToken) {
        try {
          const { data } = await axios.post(`${apiClient.defaults.baseURL}/auth/refresh`, {
            refresh_token: refreshToken,
          });
          localStorage.setItem('access_token', data.access_token);
          error.config.headers.Authorization = `Bearer ${data.access_token}`;
          return apiClient.request(error.config);
        } catch {
          // Refresh failed, redirect to login
          window.location.href = '/login';
        }
      }
    }
    return Promise.reject(error);
  }
);

export default apiClient;

6.3 Zustand Store

File: frontend/lib/chat-store.ts

import { create } from 'zustand';

interface Message {
  id: string;
  role: 'user' | 'assistant';
  content: string;
  timestamp: Date;
}

interface Conversation {
  id: string;
  mode: 'rag' | 'assistant' | 'notebook';
  title: string;
  messages: Message[];
}

interface ChatStore {
  conversations: Conversation[];
  activeConversationId: string | null;
  setActiveConversation: (id: string) => void;
  addMessage: (conversationId: string, message: Message) => void;
  createConversation: (mode: 'rag' | 'assistant' | 'notebook') => void;
}

export const useChatStore = create<ChatStore>((set) => ({
  conversations: [],
  activeConversationId: null,

  setActiveConversation: (id) => set({ activeConversationId: id }),

  addMessage: (conversationId, message) => set((state) => ({
    conversations: state.conversations.map((conv) =>
      conv.id === conversationId
        ? { ...conv, messages: [...conv.messages, message] }
        : conv
    ),
  })),

  createConversation: (mode) => set((state) => {
    const newConv: Conversation = {
      id: crypto.randomUUID(),
      mode,
      title: 'New Chat',
      messages: [],
    };
    return {
      conversations: [...state.conversations, newConv],
      activeConversationId: newConv.id,
    };
  }),
}));

6.4 Streaming Chat Component

File: frontend/components/chat/StreamingMessage.tsx

'use client';

import { useEffect, useState } from 'react';
import { Card } from '@/components/ui/card';

interface StreamingMessageProps {
  conversationId: string;
  query: string;
  onComplete: (fullText: string, messageId: string) => void;
}

export function StreamingMessage({ conversationId, query, onComplete }: StreamingMessageProps) {
  const [text, setText] = useState('');
  const [isComplete, setIsComplete] = useState(false);

  useEffect(() => {
    // Accumulate tokens locally: the `text` state captured by this closure stays at its initial value
    let buffer = '';

    const eventSource = new EventSource(
      `${process.env.NEXT_PUBLIC_API_URL}/chat/stream?conversation_id=${conversationId}&query=${encodeURIComponent(query)}`
    );

    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);

      if (data.done) {
        setIsComplete(true);
        onComplete(buffer, data.message_id);
        eventSource.close();
      } else if (data.token) {
        buffer += data.token;
        setText(buffer);
      } else if (data.error) {
        console.error('Streaming error:', data.error);
        eventSource.close();
      }
    };

    eventSource.onerror = () => {
      eventSource.close();
    };

    return () => {
      eventSource.close();
    };
  }, [conversationId, query]);

  return (
    <Card className="p-4">
      <div className="prose dark:prose-invert">
        {text}
        {!isComplete && <span className="animate-pulse"></span>}
      </div>
    </Card>
  );
}

6.5 Chat Interface

File: frontend/app/(dashboard)/chat/page.tsx

'use client';

import { useState } from 'react';
import { useChatStore } from '@/lib/chat-store';
import { StreamingMessage } from '@/components/chat/StreamingMessage';
import { Input } from '@/components/ui/input';
import { Button } from '@/components/ui/button';

export default function ChatPage() {
  const { activeConversationId, conversations, addMessage } = useChatStore();
  const [query, setQuery] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);

  const activeConversation = conversations.find((c) => c.id === activeConversationId);

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    if (!query.trim() || !activeConversationId) return;

    // Add user message
    addMessage(activeConversationId, {
      id: crypto.randomUUID(),
      role: 'user',
      content: query,
      timestamp: new Date(),
    });

    // Keep `query` until the stream finishes: StreamingMessage reads it as a prop
    setIsStreaming(true);
  };

  const handleStreamComplete = (fullText: string, messageId: string) => {
    if (!activeConversationId) return;

    addMessage(activeConversationId, {
      id: messageId,
      role: 'assistant',
      content: fullText,
      timestamp: new Date(),
    });

    setIsStreaming(false);
    setQuery('');
  };

  return (
    <div className="flex h-screen flex-col">
      {/* Messages */}
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {activeConversation?.messages.map((msg) => (
          <div key={msg.id} className={msg.role === 'user' ? 'text-right' : 'text-left'}>
            <div className="inline-block max-w-[80%]">
              <div className="font-semibold text-sm mb-1">
                {msg.role === 'user' ? 'You' : 'Assistant'}
              </div>
              <div className="bg-secondary p-3 rounded-lg">
                {msg.content}
              </div>
            </div>
          </div>
        ))}

        {isStreaming && activeConversationId && (
          <StreamingMessage
            conversationId={activeConversationId}
            query={query}
            onComplete={handleStreamComplete}
          />
        )}
      </div>

      {/* Input */}
      <form onSubmit={handleSubmit} className="border-t p-4 flex gap-2">
        <Input
          value={query}
          onChange={(e) => setQuery(e.target.value)}
          placeholder="Ask a question..."
          disabled={isStreaming}
          className="flex-1"
        />
        <Button type="submit" disabled={isStreaming || !query.trim()}>
          Send
        </Button>
      </form>
    </div>
  );
}

6.6 Admin Dashboard

File: frontend/app/(dashboard)/admin/page.tsx

'use client';

import { useState } from 'react';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import apiClient from '@/lib/api-client';
import { Button } from '@/components/ui/button';
import { Input } from '@/components/ui/input';
import { Card } from '@/components/ui/card';

export default function AdminPage() {
  const queryClient = useQueryClient();
  const [siteUrl, setSiteUrl] = useState('');
  const [libraryName, setLibraryName] = useState('');
  const [departmentId, setDepartmentId] = useState('');

  const { data: libraries } = useQuery({
    queryKey: ['libraries'],
    queryFn: async () => {
      const { data } = await apiClient.get('/admin/sharepoint/libraries');
      return data;
    },
  });

  const createLibrary = useMutation({
    mutationFn: async (data: { site_url: string; library_name: string; department_id: string }) => {
      return apiClient.post('/admin/sharepoint/libraries', data);
    },
    onSuccess: () => {
      // Refetch libraries so the new entry appears in the list
      queryClient.invalidateQueries({ queryKey: ['libraries'] });
      setSiteUrl('');
      setLibraryName('');
      setDepartmentId('');
    },
  });

  return (
    <div className="p-8">
      <h1 className="text-3xl font-bold mb-6">Admin Dashboard</h1>

      <Card className="p-6 mb-6">
        <h2 className="text-xl font-semibold mb-4">Add SharePoint Library</h2>
        <div className="space-y-4">
          <Input
            placeholder="Site URL"
            value={siteUrl}
            onChange={(e) => setSiteUrl(e.target.value)}
          />
          <Input
            placeholder="Library Name"
            value={libraryName}
            onChange={(e) => setLibraryName(e.target.value)}
          />
          <Input
            placeholder="Department ID"
            value={departmentId}
            onChange={(e) => setDepartmentId(e.target.value)}
          />
          <Button
            onClick={() => createLibrary.mutate({ site_url: siteUrl, library_name: libraryName, department_id: departmentId })}
          >
            Add Library
          </Button>
        </div>
      </Card>

      <Card className="p-6">
        <h2 className="text-xl font-semibold mb-4">Configured Libraries</h2>
        {/* Table of libraries */}
      </Card>
    </div>
  );
}

6.7 Deliverables

  • Next.js app with App Router
  • Shadcn/UI components styled with Tailwind
  • Entra ID OAuth login flow
  • Streaming chat interface with SSE
  • Mode switcher (RAG / Assistant / Notebook)
  • Admin dashboard for SharePoint library management
  • Responsive mobile design
  • Dark/Light mode toggle

🔐 Security Checklist

  • Entra ID Client Credentials stored in environment variables only
  • JWT secret key minimum 32 characters
  • HTTPS enforced in production
  • CORS configured to frontend domain only
  • SQL injection protection via SQLAlchemy ORM
  • File upload size limits enforced
  • XSS protection via React's automatic escaping
  • Rate limiting (Phase 2)
  • Audit logging for admin actions
  • Secrets never committed to Git (.env in .gitignore)
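
Several of the checklist items can be verified automatically at startup. The following is a minimal sketch, not the project's actual configuration code: the variable names `JWT_SECRET_KEY`, `CORS_ORIGINS`, and `ENVIRONMENT` are assumptions and should be matched to whatever `.env.example` really defines.

```python
from urllib.parse import urlparse

MIN_JWT_SECRET_LENGTH = 32  # minimum length taken from the checklist


def check_security_settings(env: dict[str, str]) -> list[str]:
    """Return a list of problems; an empty list means every check passed."""
    problems: list[str] = []

    # Hypothetical variable name: the JWT signing secret.
    secret = env.get("JWT_SECRET_KEY", "")
    if len(secret) < MIN_JWT_SECRET_LENGTH:
        problems.append(
            f"JWT_SECRET_KEY must be at least {MIN_JWT_SECRET_LENGTH} characters"
        )

    # Hypothetical variable name: comma-separated list of allowed origins.
    origins = [o.strip() for o in env.get("CORS_ORIGINS", "").split(",") if o.strip()]
    if not origins:
        problems.append("CORS_ORIGINS must list the frontend origin")
    if "*" in origins:
        problems.append("CORS_ORIGINS must not contain the wildcard '*'")

    # In production, every allowed origin should be served over HTTPS.
    if env.get("ENVIRONMENT") == "production":
        for origin in origins:
            if origin != "*" and urlparse(origin).scheme != "https":
                problems.append(f"CORS origin {origin!r} must use https in production")

    return problems


if __name__ == "__main__":
    import os

    for problem in check_security_settings(dict(os.environ)):
        print(f"SECURITY CHECK FAILED: {problem}")
```

Running a check like this before the application starts serving requests turns a misconfigured deployment into an immediate, visible failure rather than a latent weakness.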

📈 Testing Strategy

Unit Tests

  • Backend: pytest for services, utilities
  • Frontend: vitest for components, stores

Integration Tests

  • API endpoints with test database
  • Celery task execution with test Redis

E2E Tests

  • Playwright for critical user flows (login, chat, upload)

🚀 Deployment

Docker Compose (Staging)

docker-compose up -d
alembic upgrade head
python seed_data.py  # Insert regions, departments, LLM configs

Production (Kubernetes)

  • Separate deployments for: FastAPI, Celery Worker, Celery Beat, Frontend
  • Managed PostgreSQL (Azure Database for PostgreSQL)
  • Managed Redis (Azure Cache for Redis)
  • Qdrant Cloud or self-hosted Qdrant cluster
  • Azure App Service for frontend (or Vercel)

📅 Timeline Summary

| Phase | Duration | Key Deliverables |
|-------|----------|------------------|
| 1. Environment Setup | 1 week | Docker Compose running all services |
| 2. Backend Core | 1 week | Auth, RBAC, database migrations |
| 3. SharePoint Pipeline | 2 weeks | Delta sync, embeddings, Qdrant indexing |
| 4. RAG Logic | 1 week | LLM router, streaming, citations |
| 5. NotebookLlama | 1 week | Proxy, uploads, session management |
| 6. Frontend | 3 weeks | Next.js UI, chat, admin dashboard |
| Total | 9 weeks | Production-ready MVP |

🎯 Success Metrics

  • Performance: RAG query response time < 3 seconds
  • Accuracy: >85% positive feedback on RAG answers
  • Availability: 99.5% uptime
  • Cost: <$500/month for 100 users (excluding Azure infrastructure)
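
To make these targets concrete, the short sketch below converts them into numbers that are easier to track. It is illustrative arithmetic only: the 30-day month and the helper names are assumptions, not part of the plan.

```python
HOURS_PER_DAY = 24


def downtime_budget_hours(uptime_percent: float, days: int = 30) -> float:
    """Hours of downtime allowed in a period of `days` days at the given uptime target."""
    return days * HOURS_PER_DAY * (1 - uptime_percent / 100)


def cost_per_user(monthly_budget: float, users: int) -> float:
    """Monthly budget divided evenly across users."""
    return monthly_budget / users


def positive_feedback_rate(positive: int, total: int) -> float:
    """Fraction of rated answers that received positive feedback."""
    if total <= 0:
        raise ValueError("total must be positive")
    return positive / total


if __name__ == "__main__":
    # 99.5% uptime over an assumed 30-day month
    print(round(downtime_budget_hours(99.5), 2))  # 3.6 hours
    # $500/month ceiling spread over 100 users
    print(cost_per_user(500, 100))  # 5.0 dollars per user
    # Example: 86 positive ratings out of 100 clears the 85% target
    print(positive_feedback_rate(86, 100) > 0.85)  # True
```

In other words, the availability target leaves roughly 3.6 hours of downtime per month, and the cost target works out to a ceiling of $5 per user per month.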

📝 Next Steps

  1. Week 1: Clone repo structure, set up Docker Compose, verify all services start
  2. Week 2: Implement Entra ID auth, create database migrations, seed taxonomy data
  3. Weeks 3-4: Build SharePoint sync pipeline, test with sample library
  4. Week 5: Integrate LLM router, test RAG queries with citations
  5. Week 6: Implement NotebookLlama proxy, test file uploads
  6. Weeks 7-9: Build frontend, conduct user testing, deploy to staging

End of Implementation Plan