Infrastructure Setup:
- Initialized Git repository
- Created project directory structure (backend, frontend, infrastructure)
- Configured Docker Compose with 4 services:
* PostgreSQL 16 (database)
* Redis 7 (cache & task queue)
* Qdrant (vector database)
* Backend (placeholder)
Configuration:
- Created comprehensive .env.example with all required environment variables
- Added .gitignore for Python and Node.js
- Created backend Dockerfile (placeholder for Phase 2)
- Added healthchecks for all services
Documentation:
- Created README.md with quick start guide and project overview
- Documented implementation plan (6 phases)
- Included concept and technical specifications
Verification:
- docker-compose build: ✅ Successful
- All services started and passed health checks:
* PostgreSQL: Accepting connections on port 5432
* Redis: Responding to ping on port 6379
* Qdrant: API healthy on port 6333
Next Phase: Backend Core (FastAPI, Entra ID Auth, RBAC, Alembic migrations)
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
📋 Enterprise AI Hub "Nexus" - Implementation Plan
🎯 Project Overview
A unified, role-based corporate AI platform with three core modes:
- Mode A: Corporate RAG Agent (SharePoint knowledge base)
- Mode B: AI Executive Assistant (Productivity tools: summarization, transcription, translation)
- Mode C: Notebook Mode (Isolated analysis via NotebookLlama)
🏗️ Technology Stack
Frontend
- Framework: Next.js 14+ (App Router)
- Language: TypeScript
- UI: Shadcn/UI + Tailwind CSS
- State: Zustand (client state) + TanStack Query (server state)
- Streaming: Server-Sent Events (EventSource)
Backend
- Framework: FastAPI (Python 3.11+)
- Orchestration: LangChain / LangGraph
- Task Queue: Celery + Redis
- Auth: Microsoft Entra ID (OAuth2/OIDC) → Backend JWT
Databases
- Relational: PostgreSQL 15+
- Vector Store: Qdrant
- Cache: Redis 7+
LLM Strategy (Per-Mode)
- RAG Mode: GPT-5.1 (High reasoning for accurate knowledge retrieval)
- Assistant Mode (Summarization): Gemini 2.5 Flash (Fast + cost-efficient)
- Assistant Mode (Writing): Claude Sonnet 4.5 (Creative tasks, email drafting)
- Embeddings: OpenAI text-embedding-3-large (3072 dimensions)
📊 Database Schema
Core Tables
1. Taxonomy (Region → Department Hierarchy)
regions
├─ id (uuid, PK)
├─ code (string, unique) -- 'UK', 'US', 'APAC'
├─ name (string) -- 'United Kingdom', 'United States'
└─ created_at (timestamp)
departments
├─ id (uuid, PK)
├─ region_id (FK → regions.id, ON DELETE CASCADE)
├─ code (string) -- 'HR', 'IT', 'FINANCE'
├─ name (string) -- 'Human Resources'
├─ unique: (region_id, code) -- Enforces UK/HR and US/HR as separate departments
└─ created_at (timestamp)
Seed Data Example:
| Region | Department | Full Path |
|---|---|---|
| UK | HR | UK/HR |
| UK | IT | UK/IT |
| US | HR | US/HR |
| US | Finance | US/Finance |
| APAC | Sales | APAC/Sales |
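A seed script for this taxonomy could start from plain rows and reject duplicates the same way the (region_id, code) unique constraint does. The sketch below is a minimal, database-free version under stated assumptions: `SEED_ROWS`, `DepartmentSeed` and `build_taxonomy` are hypothetical names, department codes are upper-cased (FINANCE, SALES) to follow the schema's 'HR', 'IT', 'FINANCE' examples, so full paths read US/FINANCE rather than US/Finance, and names not given above (e.g. 'Asia-Pacific', 'Information Technology') are placeholders.

```python
from dataclasses import dataclass

# (region_code, region_name, dept_code, dept_name); some names are placeholders
SEED_ROWS = [
    ("UK", "United Kingdom", "HR", "Human Resources"),
    ("UK", "United Kingdom", "IT", "Information Technology"),
    ("US", "United States", "HR", "Human Resources"),
    ("US", "United States", "FINANCE", "Finance"),
    ("APAC", "Asia-Pacific", "SALES", "Sales"),
]


@dataclass(frozen=True)
class DepartmentSeed:
    region_code: str
    code: str
    name: str

    @property
    def full_path(self) -> str:
        return f"{self.region_code}/{self.code}"


def build_taxonomy(rows):
    """Group seed rows by region, rejecting duplicate (region, code) pairs
    the same way the uix_region_dept unique constraint would."""
    regions: dict[str, str] = {}
    departments: dict[tuple[str, str], DepartmentSeed] = {}
    for region_code, region_name, dept_code, dept_name in rows:
        regions.setdefault(region_code, region_name)
        key = (region_code, dept_code)
        if key in departments:
            raise ValueError(f"duplicate department {region_code}/{dept_code}")
        departments[key] = DepartmentSeed(region_code, dept_code, dept_name)
    return regions, list(departments.values())
```

The returned regions would be inserted first, then each department with the matching `region_id`.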
2. Users & RBAC
users
├─ id (uuid, PK)
├─ entra_id (string, unique, NOT NULL) -- Azure AD object ID
├─ email (string, unique, NOT NULL)
├─ display_name (string)
├─ role (enum: 'super_admin', 'content_manager', 'user', default='user')
├─ department_id (FK → departments.id, nullable)
├─ is_active (boolean, default=true)
├─ created_at (timestamp)
├─ last_login_at (timestamp)
└─ index: (entra_id), (email), (department_id)
Auto-Provisioning Logic:
- First-time Entra ID login → Create user with role='user'
- If the Entra ID profile has a department attribute → Map it to the departments table
- Super Admin can later upgrade roles via UI
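The provisioning rules above can be sketched as a pure function over the Graph profile. This is an assumption-laden sketch: `provision_from_profile` and the `department_index` lookup (normalised department name → departments.id) are hypothetical, and because the default /me response does not include `department`, the Graph request would need `$select=department` (plus the other fields used here) for the mapping to work.

```python
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass
class ProvisionedUser:
    entra_id: str
    email: str
    display_name: str
    role: str = "user"  # every first-time login starts as a plain user
    department_id: Optional[str] = None


def provision_from_profile(
    profile: Mapping[str, object],
    department_index: Mapping[str, str],
) -> ProvisionedUser:
    """Build a new user record from an MS Graph /me profile.

    department_index is a hypothetical lookup from a normalised department
    name (e.g. "hr") to a departments.id value.
    """
    # "mail" can be null, so fall back to the UPN
    email = profile.get("mail") or profile.get("userPrincipalName")
    if not email:
        raise ValueError("profile has neither mail nor userPrincipalName")
    raw_dept = profile.get("department")
    dept_id = None
    if isinstance(raw_dept, str) and raw_dept.strip():
        dept_id = department_index.get(raw_dept.strip().lower())
    return ProvisionedUser(
        entra_id=str(profile["id"]),
        email=str(email),
        display_name=str(profile.get("displayName") or email),
        department_id=dept_id,
    )
```

An unknown department simply leaves `department_id` empty, which matches the nullable column; a Super Admin can assign it later.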
3. Conversations & Messages
conversations
├─ id (uuid, PK)
├─ user_id (FK → users.id, ON DELETE CASCADE)
├─ mode (enum: 'rag', 'assistant', 'notebook')
├─ title (string, nullable) -- Auto-generated from first message
├─ created_at (timestamp)
├─ updated_at (timestamp)
└─ index: (user_id, mode, updated_at DESC)
messages
├─ id (uuid, PK)
├─ conversation_id (FK → conversations.id, ON DELETE CASCADE)
├─ role (enum: 'user', 'assistant', 'system')
├─ content (text)
├─ input_tokens (int, nullable)
├─ output_tokens (int, nullable)
├─ llm_provider (string, nullable) -- 'openai', 'google', 'anthropic'
├─ llm_model (string, nullable) -- 'gpt-5.1', 'gemini-2.5-flash', 'claude-sonnet-4.5'
├─ created_at (timestamp)
└─ index: (conversation_id, created_at)
4. Feedback System
feedback
├─ id (uuid, PK)
├─ message_id (FK → messages.id, ON DELETE CASCADE)
├─ conversation_id (FK → conversations.id)
├─ user_id (FK → users.id)
├─ rating (enum: 'positive', 'negative')
├─ user_comment (text, nullable)
├─ retrieved_sources (jsonb, nullable)
│ -- Example: [{"qdrant_id": "abc123", "file_name": "Policy.pdf", "url": "...", "score": 0.85}]
├─ status (enum: 'pending', 'reviewed', 'resolved', default='pending')
├─ reviewed_by (FK → users.id, nullable)
├─ reviewed_at (timestamp, nullable)
├─ created_at (timestamp)
└─ index: (status, created_at)
5. SharePoint Integration
sharepoint_libraries
├─ id (uuid, PK)
├─ site_url (string, NOT NULL) -- 'https://company.sharepoint.com/sites/HR'
├─ library_name (string, NOT NULL) -- 'Documents'
├─ department_id (FK → departments.id, nullable)
├─ is_active (boolean, default=true)
├─ delta_token (string, nullable) -- MS Graph delta link
├─ last_sync_at (timestamp, nullable)
├─ last_sync_status (enum: 'idle', 'syncing', 'success', 'error')
├─ error_message (text, nullable)
└─ unique: (site_url, library_name)
sharepoint_documents
├─ id (uuid, PK)
├─ library_id (FK → sharepoint_libraries.id, ON DELETE CASCADE)
├─ sharepoint_id (string, unique, NOT NULL) -- MS Graph item ID
├─ file_name (string, NOT NULL)
├─ file_url (string, NOT NULL) -- Deep link to SharePoint
├─ file_type (string) -- 'pdf', 'docx', 'xlsx', 'pptx'
├─ file_size (bigint) -- bytes
├─ department_id (FK → departments.id, nullable)
├─ last_modified (timestamp) -- From SharePoint metadata
├─ qdrant_point_ids (jsonb) -- Array of vector IDs: ["vec_1", "vec_2", ...]
├─ chunk_count (int, default=0)
├─ ingested_at (timestamp)
├─ is_active (boolean, default=true)
└─ index: (sharepoint_id), (library_id, is_active), (department_id)
Metadata Tagging Strategy:
- Content Manager maps SharePoint library → Department (e.g., "UK_HR_Docs" library → UK/HR)
- All documents in that library inherit its department_id
- Qdrant vectors get department_id in their payload for filtered search
6. Notebook Mode (NotebookLlama)
notebook_sessions
├─ id (uuid, PK)
├─ user_id (FK → users.id, ON DELETE CASCADE)
├─ conversation_id (FK → conversations.id, ON DELETE CASCADE)
├─ title (string, nullable)
├─ is_pinned (boolean, default=false)
├─ total_file_size (bigint, default=0) -- Sum of all uploaded files
├─ expires_at (timestamp, nullable) -- NULL if pinned, NOW() + 24h otherwise
├─ created_at (timestamp)
└─ index: (user_id, expires_at)
uploaded_files
├─ id (uuid, PK)
├─ session_id (FK → notebook_sessions.id, ON DELETE CASCADE)
├─ file_name (string, NOT NULL)
├─ file_size (bigint)
├─ file_type (string)
├─ storage_path (string) -- '/uploads/{session_id}/{file_name}'
├─ notebookllama_file_id (string, nullable)
├─ uploaded_at (timestamp)
└─ index: (session_id)
Pin Feature Logic:
- Default: expires_at = NOW() + INTERVAL '24 hours'
- User clicks "Pin Session" → Set is_pinned=true, expires_at=NULL
- Celery cleanup task: Delete sessions where expires_at < NOW() AND is_pinned=false
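The pin and expiry rules can be expressed as a small in-memory model, which is handy for unit-testing the cleanup predicate before wiring it to SQLAlchemy. `NotebookSession`, `sessions_to_delete` and the use of timezone-aware UTC timestamps are assumptions of this sketch, not part of the plan.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Optional

SESSION_TTL = timedelta(hours=24)


def utcnow() -> datetime:
    return datetime.now(timezone.utc)


@dataclass
class NotebookSession:
    created_at: datetime = field(default_factory=utcnow)
    is_pinned: bool = False
    expires_at: Optional[datetime] = None

    def __post_init__(self) -> None:
        # Default: unpinned sessions expire 24 hours after creation
        if not self.is_pinned and self.expires_at is None:
            self.expires_at = self.created_at + SESSION_TTL

    def pin(self) -> None:
        # Pinned sessions never expire
        self.is_pinned = True
        self.expires_at = None

    def is_expired(self, now: datetime) -> bool:
        # Mirrors the cleanup predicate: expires_at < NOW() AND is_pinned = false
        return (not self.is_pinned
                and self.expires_at is not None
                and self.expires_at < now)


def sessions_to_delete(sessions, now: datetime):
    return [s for s in sessions if s.is_expired(now)]
```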
7. LLM Provider Configuration
llm_configs
├─ id (uuid, PK)
├─ mode (enum: 'rag', 'assistant_summary', 'assistant_writing')
├─ provider (string) -- 'openai', 'google', 'anthropic'
├─ model_name (string) -- 'gpt-5.1', 'gemini-2.5-flash', 'claude-sonnet-4.5'
├─ api_key_env_var (string) -- 'OPENAI_API_KEY', 'GOOGLE_API_KEY'
├─ temperature (float, default=0.7)
├─ max_tokens (int, nullable)
├─ is_active (boolean, default=true)
├─ created_at (timestamp)
└─ unique: (mode) WHERE is_active -- Partial unique index: only one active config per mode
Seed Data:
INSERT INTO llm_configs (mode, provider, model_name, api_key_env_var) VALUES
('rag', 'openai', 'gpt-5.1', 'OPENAI_API_KEY'),
('assistant_summary', 'google', 'gemini-2.5-flash', 'GOOGLE_API_KEY'),
('assistant_writing', 'anthropic', 'claude-sonnet-4.5', 'ANTHROPIC_API_KEY');
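A possible shape for services/llm_router.py: load the active llm_configs rows and resolve each mode to exactly one of them, failing loudly if the "one active config per mode" rule is violated. The `LLMConfig` dataclass and `LLMRouter` class are hypothetical; they only mirror the table's columns and do not call any provider SDK.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class LLMConfig:
    mode: str
    provider: str
    model_name: str
    api_key_env_var: str
    temperature: float = 0.7
    max_tokens: Optional[int] = None
    is_active: bool = True


class LLMRouter:
    """Resolve a mode to its single active llm_configs row."""

    def __init__(self, configs: Iterable[LLMConfig]):
        self._by_mode: dict[str, LLMConfig] = {}
        for cfg in configs:
            if not cfg.is_active:
                continue  # inactive rows are kept for history but never routed to
            if cfg.mode in self._by_mode:
                raise ValueError(f"more than one active config for mode {cfg.mode!r}")
            self._by_mode[cfg.mode] = cfg

    def resolve(self, mode: str) -> LLMConfig:
        try:
            return self._by_mode[mode]
        except KeyError:
            raise LookupError(f"no active LLM config for mode {mode!r}") from None
```

The returned `api_key_env_var` tells the caller which environment variable to read, so keys never live in the database.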
8. System Configuration
system_prompts
├─ id (uuid, PK)
├─ mode (enum: 'rag', 'assistant_summary', 'assistant_writing', 'notebook')
├─ prompt_text (text, NOT NULL)
├─ is_active (boolean, default=true)
├─ created_by (FK → users.id, nullable)
├─ created_at (timestamp)
└─ index: (mode, is_active)
Example RAG System Prompt:
You are a corporate knowledge assistant. Answer questions ONLY based on the provided context.
If the context does not contain the answer, respond: "I don't have enough information to answer this question."
Always cite sources using numbered references [1], [2], etc.
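For the numbered-citation instruction to work, the retrieved chunks need matching numbers in the context. A minimal sketch, assuming a chat-style list of {"role", "content"} messages and chunks shaped like the dicts returned by QdrantService.search; `build_rag_messages` is a hypothetical helper, not a LangChain API.

```python
from typing import Sequence, TypedDict


class Chunk(TypedDict):
    text: str
    file_name: str
    file_url: str


def build_rag_messages(system_prompt: str, question: str,
                       chunks: Sequence[Chunk]) -> list[dict[str, str]]:
    """Number retrieved chunks so the model can cite them as [1], [2], ..."""
    context_blocks = [
        f"[{i}] {c['file_name']} ({c['file_url']})\n{c['text']}"
        for i, c in enumerate(chunks, start=1)
    ]
    context = "\n\n".join(context_blocks) if context_blocks else "(no context retrieved)"
    return [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {question}"},
    ]
```

Because the numbers follow retrieval order, `[n]` in the answer maps back to `chunks[n - 1]`, whose `file_url` can be rendered as the source link.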
🗂️ Qdrant Vector Store Design
Collection: sharepoint_docs
Configuration (conceptual: vector size and distance are fixed when the collection is created; Qdrant payloads are schemaless, so the field types below only take effect for fields that are given a payload index):
{
  "vectors": {
    "size": 3072,  # text-embedding-3-large
    "distance": "Cosine"
  },
  "payload_schema": {
    "sharepoint_id": "keyword",
    "document_id": "keyword",  # UUID from sharepoint_documents table
    "file_name": "text",
    "file_url": "keyword",
    "chunk_index": "integer",
    "total_chunks": "integer",
    "text": "text",
    "department_id": "keyword",  # For filtering
    "region_code": "keyword",  # For filtering (e.g., 'UK', 'US')
    "file_type": "keyword",
    "last_modified": "datetime",
    "is_active": "bool"
  }
}
Indexes (for fast filtering):
- department_id
- region_code
- file_type
- is_active
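Search Query Example:
from qdrant_client import QdrantClient, models

client = QdrantClient(url="http://localhost:6333")
client.search(
    collection_name="sharepoint_docs",
    query_vector=embedding,  # 3072-dim query embedding
    query_filter=models.Filter(
        must=[
            models.FieldCondition(key="is_active", match=models.MatchValue(value=True)),
            # department_id is stored as a string in the payload, so convert the UUID
            models.FieldCondition(key="department_id", match=models.MatchValue(value=str(user.department_id))),
        ]
    ),
    limit=5,
)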
🚀 Implementation Phases
Phase 1: Environment Setup (Week 1)
Goal
Set up development environment with Docker Compose orchestrating all services.
Tasks
1.1 Project Structure
oliver-ai-hub/
├── backend/
│ ├── app/
│ │ ├── __init__.py
│ │ ├── main.py # FastAPI app entry point
│ │ ├── config.py # Settings (env vars, secrets)
│ │ ├── database.py # SQLAlchemy setup
│ │ ├── models/ # SQLAlchemy ORM models
│ │ ├── schemas/ # Pydantic models (request/response)
│ │ ├── api/
│ │ │ ├── v1/
│ │ │ │ ├── endpoints/
│ │ │ │ │ ├── auth.py
│ │ │ │ │ ├── chat.py
│ │ │ │ │ ├── admin.py
│ │ │ │ │ └── notebook.py
│ │ │ │ └── router.py
│ │ ├── core/
│ │ │ ├── auth.py # JWT utilities
│ │ │ ├── security.py # Password hashing (if needed)
│ │ │ └── dependencies.py # FastAPI dependencies
│ │ ├── services/
│ │ │ ├── llm_router.py # Mode → LLM selection logic
│ │ │ ├── rag_service.py
│ │ │ ├── notebook_proxy.py
│ │ │ └── sharepoint_sync.py
│ │ ├── tasks/ # Celery tasks
│ │ │ ├── celery_app.py
│ │ │ ├── sync_sharepoint.py
│ │ │ └── cleanup_sessions.py
│ │ └── utils/
│ │ ├── embeddings.py
│ │ └── text_processing.py
│ ├── alembic/ # Database migrations
│ ├── tests/
│ ├── requirements.txt
│ ├── Dockerfile
│ └── .env.example
├── frontend/
│ ├── app/
│ │ ├── (auth)/
│ │ │ └── login/
│ │ ├── (dashboard)/
│ │ │ ├── chat/
│ │ │ ├── notebook/
│ │ │ └── admin/
│ │ ├── layout.tsx
│ │ └── page.tsx
│ ├── components/
│ │ ├── ui/ # Shadcn components
│ │ ├── chat/
│ │ │ ├── ChatInterface.tsx
│ │ │ ├── MessageList.tsx
│ │ │ └── StreamingMessage.tsx
│ │ └── notebook/
│ │ ├── FileUploader.tsx
│ │ └── SessionManager.tsx
│ ├── lib/
│ │ ├── api-client.ts
│ │ ├── auth-store.ts # Zustand
│ │ └── chat-store.ts
│ ├── public/
│ ├── .env.local.example
│ ├── package.json
│ ├── tsconfig.json
│ └── Dockerfile
├── docker-compose.yml
├── .gitignore
└── README.md
1.2 Docker Compose Configuration
File: docker-compose.yml
version: '3.9'  # optional: Docker Compose V2 ignores this key
services:
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: nexus_db
      POSTGRES_USER: nexus_user
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U nexus_user -d nexus_db"]
      interval: 5s
      timeout: 5s
      retries: 10
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 10
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"  # REST API + dashboard
      - "6334:6334"  # gRPC
    volumes:
      - qdrant_data:/qdrant/storage
  backend:
    build: ./backend
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    ports:
      - "8000:8000"
    environment:
      DATABASE_URL: postgresql://nexus_user:${POSTGRES_PASSWORD}@postgres:5432/nexus_db
      REDIS_URL: redis://redis:6379/0
      QDRANT_URL: http://qdrant:6333
      ENTRA_CLIENT_ID: ${ENTRA_CLIENT_ID}
      ENTRA_CLIENT_SECRET: ${ENTRA_CLIENT_SECRET}
      ENTRA_TENANT_ID: ${ENTRA_TENANT_ID}
      ENTRA_REDIRECT_URI: ${ENTRA_REDIRECT_URI}
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      GOOGLE_API_KEY: ${GOOGLE_API_KEY}
      ANTHROPIC_API_KEY: ${ANTHROPIC_API_KEY}
      JWT_SECRET: ${JWT_SECRET}
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
      qdrant:
        condition: service_started
    volumes:
      - ./backend:/app
      - uploads_data:/app/uploads
  celery_worker:
    build: ./backend
    command: celery -A app.tasks.celery_app worker --loglevel=info
    environment:
      DATABASE_URL: postgresql://nexus_user:${POSTGRES_PASSWORD}@postgres:5432/nexus_db
      REDIS_URL: redis://redis:6379/0
      QDRANT_URL: http://qdrant:6333
      OPENAI_API_KEY: ${OPENAI_API_KEY}
      # The SharePoint sync uses client-credentials auth, so the worker needs Entra ID settings too
      ENTRA_CLIENT_ID: ${ENTRA_CLIENT_ID}
      ENTRA_CLIENT_SECRET: ${ENTRA_CLIENT_SECRET}
      ENTRA_TENANT_ID: ${ENTRA_TENANT_ID}
    depends_on:
      postgres:
        condition: service_healthy
      redis:
        condition: service_healthy
      qdrant:
        condition: service_started
    volumes:
      - ./backend:/app
      - uploads_data:/app/uploads
  celery_beat:
    build: ./backend
    command: celery -A app.tasks.celery_app beat --loglevel=info
    environment:
      DATABASE_URL: postgresql://nexus_user:${POSTGRES_PASSWORD}@postgres:5432/nexus_db
      REDIS_URL: redis://redis:6379/0
    depends_on:
      redis:
        condition: service_healthy
    volumes:
      - ./backend:/app
  frontend:
    build: ./frontend
    command: npm run dev
    ports:
      - "3000:3000"
    environment:
      NEXT_PUBLIC_API_URL: http://localhost:8000  # resolved by the browser, hence localhost
    depends_on:
      - backend
    volumes:
      - ./frontend:/app
      - /app/node_modules  # keep the image's node_modules instead of the host's
volumes:
  postgres_data:
  redis_data:
  qdrant_data:
  uploads_data:
1.3 Environment Variables
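File: backend/.env.example (Docker Compose substitutes the ${...} variables in docker-compose.yml from a .env file in the project root, so the same values must also be available there)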
# Database
POSTGRES_PASSWORD=your_secure_password
DATABASE_URL=postgresql://nexus_user:your_secure_password@localhost:5432/nexus_db
# Redis
REDIS_URL=redis://localhost:6379/0
# Qdrant
QDRANT_URL=http://localhost:6333
# Microsoft Entra ID
ENTRA_CLIENT_ID=your_client_id
ENTRA_CLIENT_SECRET=your_client_secret
ENTRA_TENANT_ID=your_tenant_id
ENTRA_REDIRECT_URI=http://localhost:8000/api/v1/auth/callback
# LLM Providers
OPENAI_API_KEY=sk-...
GOOGLE_API_KEY=AIza...
ANTHROPIC_API_KEY=sk-ant-...
# JWT
JWT_SECRET=your_jwt_secret_key_min_32_chars
JWT_ALGORITHM=HS256
JWT_EXPIRATION_MINUTES=15
REFRESH_TOKEN_EXPIRATION_DAYS=7
# NotebookLlama
NOTEBOOKLLAMA_URL=http://internal-notebook-server:8080
# File Upload
MAX_UPLOAD_SIZE_MB=100
UPLOAD_DIR=/app/uploads
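backend/app/config.py is imported throughout as `settings`, but its contents are not shown. One dependency-free way to load and validate a subset of these variables is sketched below. The `Settings` dataclass and its `from_env` constructor are hypothetical (pydantic-settings' `BaseSettings` is a common alternative), and the 32-character check only enforces the hint in the JWT_SECRET placeholder. The remaining variables (Entra ID, LLM keys, NotebookLlama) would follow the same pattern.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

REQUIRED = ("DATABASE_URL", "REDIS_URL", "QDRANT_URL", "JWT_SECRET")


@dataclass(frozen=True)
class Settings:
    DATABASE_URL: str
    REDIS_URL: str
    QDRANT_URL: str
    JWT_SECRET: str
    JWT_ALGORITHM: str = "HS256"
    JWT_EXPIRATION_MINUTES: int = 15
    REFRESH_TOKEN_EXPIRATION_DAYS: int = 7
    MAX_UPLOAD_SIZE_MB: int = 100

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "Settings":
        env = os.environ if env is None else env
        missing = [key for key in REQUIRED if not env.get(key)]
        if missing:
            raise RuntimeError(f"missing required settings: {', '.join(missing)}")
        if len(env["JWT_SECRET"]) < 32:
            raise RuntimeError("JWT_SECRET must be at least 32 characters")
        return cls(
            DATABASE_URL=env["DATABASE_URL"],
            REDIS_URL=env["REDIS_URL"],
            QDRANT_URL=env["QDRANT_URL"],
            JWT_SECRET=env["JWT_SECRET"],
            JWT_ALGORITHM=env.get("JWT_ALGORITHM", "HS256"),
            # Environment values are strings, so numeric settings are cast explicitly
            JWT_EXPIRATION_MINUTES=int(env.get("JWT_EXPIRATION_MINUTES", "15")),
            REFRESH_TOKEN_EXPIRATION_DAYS=int(env.get("REFRESH_TOKEN_EXPIRATION_DAYS", "7")),
            MAX_UPLOAD_SIZE_MB=int(env.get("MAX_UPLOAD_SIZE_MB", "100")),
        )
```

Failing at import time on a missing or weak secret surfaces configuration mistakes when the container starts rather than on the first request.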
1.4 Deliverables
- ✅ Docker Compose successfully starts all services
- ✅ PostgreSQL accepting connections
- ✅ Qdrant Web UI accessible at http://localhost:6333/dashboard
- ✅ FastAPI docs accessible at http://localhost:8000/docs
- ✅ Next.js dev server running at http://localhost:3000
Phase 2: Backend Core (Week 2)
Goal
Implement authentication, user management, and RBAC middleware.
Tasks
2.1 Database Setup
File: backend/app/database.py
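from typing import Iterator

from sqlalchemy import create_engine
# declarative_base lives in sqlalchemy.orm since SQLAlchemy 1.4;
# the sqlalchemy.ext.declarative import is deprecated
from sqlalchemy.orm import Session, declarative_base, sessionmaker

from app.config import settings

# pool_pre_ping checks pooled connections before use (avoids stale-connection errors)
engine = create_engine(
    settings.DATABASE_URL,
    pool_pre_ping=True,
    pool_size=10,
    max_overflow=20,
)

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

Base = declarative_base()


def get_db() -> Iterator[Session]:
    """FastAPI dependency: one session per request, always closed afterwards."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()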
2.2 Alembic Migrations
# Initialize Alembic
cd backend
alembic init alembic
# Create initial migration
alembic revision --autogenerate -m "Initial schema: users, regions, departments"
# Apply migration
alembic upgrade head
Migration Order:
- 001_initial_tables.py: regions, departments, users
- 002_conversations.py: conversations, messages
- 003_feedback.py: feedback
- 004_sharepoint.py: sharepoint_libraries, sharepoint_documents
- 005_notebook.py: notebook_sessions, uploaded_files
- 006_config.py: llm_configs, system_prompts
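This order only works if every table is created after the tables its foreign keys point to. A small stdlib check can verify the plan and derive an alternative valid order with `graphlib.TopologicalSorter`. The dependency map is transcribed by hand from the schema above (an assumption that has to be kept in sync with the models), and `check_migration_order` is a hypothetical helper.

```python
from graphlib import TopologicalSorter

# table -> tables it references via foreign keys (from the schema above)
FK_DEPENDENCIES = {
    "regions": set(),
    "departments": {"regions"},
    "users": {"departments"},
    "conversations": {"users"},
    "messages": {"conversations"},
    "feedback": {"messages", "conversations", "users"},
    "sharepoint_libraries": {"departments"},
    "sharepoint_documents": {"sharepoint_libraries", "departments"},
    "notebook_sessions": {"users", "conversations"},
    "uploaded_files": {"notebook_sessions"},
    "llm_configs": set(),
    "system_prompts": {"users"},
}

MIGRATIONS = [
    ("001_initial_tables", ["regions", "departments", "users"]),
    ("002_conversations", ["conversations", "messages"]),
    ("003_feedback", ["feedback"]),
    ("004_sharepoint", ["sharepoint_libraries", "sharepoint_documents"]),
    ("005_notebook", ["notebook_sessions", "uploaded_files"]),
    ("006_config", ["llm_configs", "system_prompts"]),
]


def check_migration_order(migrations, deps) -> bool:
    """Raise if any table is created before a table it references."""
    created: set[str] = set()
    for name, tables in migrations:
        for table in tables:
            missing = deps[table] - created
            if missing:
                raise ValueError(f"{name}: {table} references {sorted(missing)} before creation")
            created.add(table)
    return True


def suggested_order(deps) -> list[str]:
    # TopologicalSorter takes node -> predecessors and yields predecessors first
    return list(TopologicalSorter(deps).static_order())
```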
2.3 SQLAlchemy Models
File: backend/app/models/user.py
from sqlalchemy import Column, String, Boolean, DateTime, Enum, ForeignKey
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.database import Base
import uuid
from datetime import datetime
import enum


class UserRole(enum.Enum):
    SUPER_ADMIN = "super_admin"
    CONTENT_MANAGER = "content_manager"
    USER = "user"


class User(Base):
    __tablename__ = "users"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    entra_id = Column(String, unique=True, nullable=False, index=True)
    email = Column(String, unique=True, nullable=False, index=True)
    display_name = Column(String)
    # values_callable stores the enum *values* ('super_admin', ...) as in the schema;
    # by default SQLAlchemy persists the member names ('SUPER_ADMIN') instead
    role = Column(
        Enum(UserRole, name="user_role", values_callable=lambda e: [m.value for m in e]),
        default=UserRole.USER,
        nullable=False,
    )
    department_id = Column(UUID(as_uuid=True), ForeignKey("departments.id"), nullable=True, index=True)
    is_active = Column(Boolean, default=True, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    last_login_at = Column(DateTime, nullable=True)

    # Relationships
    department = relationship("Department", back_populates="users")
    conversations = relationship("Conversation", back_populates="user", cascade="all, delete-orphan")
File: backend/app/models/taxonomy.py
from sqlalchemy import Column, String, DateTime, ForeignKey, UniqueConstraint
from sqlalchemy.dialects.postgresql import UUID
from sqlalchemy.orm import relationship
from app.database import Base
import uuid
from datetime import datetime


class Region(Base):
    __tablename__ = "regions"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    code = Column(String(10), unique=True, nullable=False)  # 'UK', 'US', 'APAC'
    name = Column(String(100), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    departments = relationship("Department", back_populates="region", cascade="all, delete-orphan")


class Department(Base):
    __tablename__ = "departments"
    __table_args__ = (
        UniqueConstraint('region_id', 'code', name='uix_region_dept'),
    )

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    region_id = Column(UUID(as_uuid=True), ForeignKey("regions.id", ondelete="CASCADE"), nullable=False)
    code = Column(String(50), nullable=False)  # 'HR', 'IT', 'FINANCE'
    name = Column(String(100), nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    region = relationship("Region", back_populates="departments")
    users = relationship("User", back_populates="department")
    sharepoint_documents = relationship("SharePointDocument", back_populates="department")
2.4 Microsoft Entra ID Authentication
File: backend/app/core/auth.py
from datetime import datetime, timedelta, timezone
from typing import Optional

import jwt  # PyJWT
from fastapi import HTTPException, status

from app.config import settings


def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=settings.JWT_EXPIRATION_MINUTES))
    to_encode.update({"exp": expire, "type": "access"})
    return jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)


def create_refresh_token(data: dict) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(days=settings.REFRESH_TOKEN_EXPIRATION_DAYS)
    to_encode.update({"exp": expire, "type": "refresh"})
    return jwt.encode(to_encode, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)


def verify_token(token: str, expected_type: str = "access") -> dict:
    try:
        payload = jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Token expired")
    except jwt.InvalidTokenError:  # PyJWT's base error class (it has no JWTError)
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
    # Reject refresh tokens presented as access tokens (and vice versa)
    if payload.get("type") != expected_type:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Wrong token type")
    return payload
File: backend/app/api/v1/endpoints/auth.py
from datetime import datetime

import httpx
from fastapi import APIRouter, Depends, HTTPException, status
from msal import ConfidentialClientApplication
from sqlalchemy.orm import Session

from app.config import settings
from app.core.auth import create_access_token, create_refresh_token
from app.database import get_db
from app.models.user import User, UserRole

router = APIRouter()

# Initialize MSAL client (confidential client: the backend holds the secret)
msal_app = ConfidentialClientApplication(
    settings.ENTRA_CLIENT_ID,
    authority=f"https://login.microsoftonline.com/{settings.ENTRA_TENANT_ID}",
    client_credential=settings.ENTRA_CLIENT_SECRET,
)


@router.post("/login")
async def login(code: str, db: Session = Depends(get_db)):
    """
    Exchange an Entra ID authorization code for tokens,
    create/update the user in the database, and return backend JWT tokens.
    """
    # Exchange code for Entra ID token (MSAL is synchronous, so this call blocks briefly)
    result = msal_app.acquire_token_by_authorization_code(
        code,
        scopes=["User.Read"],
        redirect_uri=settings.ENTRA_REDIRECT_URI,  # must match the URI used in the authorize request
    )
    if "error" in result:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=result.get("error_description"))

    # Fetch user profile from MS Graph
    graph_token = result["access_token"]
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "https://graph.microsoft.com/v1.0/me",
            headers={"Authorization": f"Bearer {graph_token}"},
        )
    if response.status_code != 200:
        raise HTTPException(status_code=status.HTTP_502_BAD_GATEWAY, detail="Failed to fetch Graph profile")
    profile = response.json()

    entra_id = profile["id"]
    # "mail" can be null for accounts without a mailbox
    email = profile.get("mail") or profile["userPrincipalName"]
    display_name = profile.get("displayName") or email

    # Auto-provision user on first login
    user = db.query(User).filter(User.entra_id == entra_id).first()
    if not user:
        user = User(
            entra_id=entra_id,
            email=email,
            display_name=display_name,
            role=UserRole.USER,
        )
        db.add(user)
    elif not user.is_active:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="User is deactivated")

    user.last_login_at = datetime.utcnow()
    db.commit()
    db.refresh(user)

    # Generate backend JWT tokens
    access_token = create_access_token({"sub": str(user.id), "role": user.role.value})
    refresh_token = create_refresh_token({"sub": str(user.id)})

    # Store refresh token in Redis (optional)
    # redis_client.setex(f"refresh:{user.id}", 7*24*60*60, refresh_token)

    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
        "user": {
            "id": str(user.id),
            "email": user.email,
            "display_name": user.display_name,
            "role": user.role.value,
        },
    }
2.5 RBAC Middleware
File: backend/app/core/dependencies.py
import uuid

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.orm import Session

from app.database import get_db
from app.models.user import User, UserRole
from app.core.auth import verify_token

security = HTTPBearer()


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db),
) -> User:
    payload = verify_token(credentials.credentials)
    try:
        # "sub" holds the user's UUID as a string; convert it before querying the UUID column
        user_id = uuid.UUID(payload.get("sub", ""))
    except (TypeError, ValueError):
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token subject")
    user = db.query(User).filter(User.id == user_id, User.is_active.is_(True)).first()
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
    return user


async def require_admin(current_user: User = Depends(get_current_user)) -> User:
    # "Admin" covers both super admins and content managers
    if current_user.role not in (UserRole.SUPER_ADMIN, UserRole.CONTENT_MANAGER):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
    return current_user


async def require_super_admin(current_user: User = Depends(get_current_user)) -> User:
    if current_user.role != UserRole.SUPER_ADMIN:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Super admin access required")
    return current_user
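Both RBAC dependencies encode an implicit privilege order (super_admin > content_manager > user). Making that order explicit keeps future checks consistent. This sketch uses a standalone copy of UserRole so it runs on its own; `ROLE_RANK` and `has_at_least` are hypothetical names, and the ranking is an assumption inferred from the two dependencies above.

```python
from enum import Enum


class UserRole(str, Enum):
    SUPER_ADMIN = "super_admin"
    CONTENT_MANAGER = "content_manager"
    USER = "user"


# Higher rank = more privileges
ROLE_RANK = {
    UserRole.USER: 0,
    UserRole.CONTENT_MANAGER: 1,
    UserRole.SUPER_ADMIN: 2,
}


def has_at_least(role: UserRole, minimum: UserRole) -> bool:
    """True if `role` grants everything `minimum` grants."""
    return ROLE_RANK[role] >= ROLE_RANK[minimum]
```

With this helper, require_admin is equivalent to `has_at_least(role, UserRole.CONTENT_MANAGER)` and require_super_admin to `has_at_least(role, UserRole.SUPER_ADMIN)`; a FastAPI dependency factory could wrap it and raise 403 when it returns False.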
2.6 Deliverables
- ✅ Database schema created with Alembic
- ✅ Seed data script for regions/departments
- ✅ /api/v1/auth/login endpoint functional
- ✅ JWT validation middleware working
- ✅ RBAC dependencies (require_admin, require_super_admin) enforced
- ✅ Unit tests for auth flow
Phase 3: SharePoint Ingestion Pipeline (Week 3-4)
Goal
Sync documents from SharePoint using MS Graph API, extract text, chunk, embed, and store in Qdrant.
Tasks
3.1 MS Graph API Integration
File: backend/app/services/sharepoint_sync.py
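from typing import Dict, List
from urllib.parse import quote, urlparse

import httpx
from msal import ConfidentialClientApplication

from app.config import settings
from app.models.sharepoint import SharePointLibrary

GRAPH_BASE = "https://graph.microsoft.com/v1.0"


class SharePointSyncService:
    def __init__(self):
        self.msal_app = ConfidentialClientApplication(
            settings.ENTRA_CLIENT_ID,
            authority=f"https://login.microsoftonline.com/{settings.ENTRA_TENANT_ID}",
            client_credential=settings.ENTRA_CLIENT_SECRET,
        )

    def get_access_token(self) -> str:
        """Get an application-permission token (client credentials flow)."""
        result = self.msal_app.acquire_token_for_client(
            scopes=["https://graph.microsoft.com/.default"]
        )
        if "access_token" not in result:
            raise RuntimeError(f"Token acquisition failed: {result.get('error_description')}")
        return result["access_token"]

    async def fetch_delta_changes(self, library: SharePointLibrary) -> List[Dict]:
        """
        Fetch changed documents using the MS Graph delta API:
        https://graph.microsoft.com/v1.0/sites/{site-id}/drives/{drive-id}/root/delta
        On return, library.delta_token holds the new deltaLink (the caller commits it).
        """
        token = self.get_access_token()
        headers = {"Authorization": f"Bearer {token}"}
        all_changes: List[Dict] = []

        async with httpx.AsyncClient(timeout=60) as client:
            # Resume from the stored deltaLink, or start an initial full sync
            url = library.delta_token or await self._get_initial_delta_url(client, headers, library)
            while url:
                response = await client.get(url, headers=headers)
                response.raise_for_status()
                data = response.json()
                all_changes.extend(data.get("value", []))

                if "@odata.deltaLink" in data:
                    # Last page: save the deltaLink for the next sync
                    library.delta_token = data["@odata.deltaLink"]
                    break
                # More pages of the current change set
                url = data.get("@odata.nextLink")

        return all_changes

    async def _get_initial_delta_url(
        self, client: httpx.AsyncClient, headers: Dict[str, str], library: SharePointLibrary
    ) -> str:
        """Resolve site URL + library name to /sites/{site-id}/drives/{drive-id}/root/delta."""
        # GET /sites/{hostname}:/{server-relative-path} returns the site, including its ID
        parsed = urlparse(library.site_url)
        site_path = quote(parsed.path.rstrip("/"))
        site_resp = await client.get(f"{GRAPH_BASE}/sites/{parsed.hostname}:{site_path}", headers=headers)
        site_resp.raise_for_status()
        site_id = site_resp.json()["id"]

        # GET /sites/{site-id}/drives lists the document libraries; match by name
        drives_resp = await client.get(f"{GRAPH_BASE}/sites/{site_id}/drives", headers=headers)
        drives_resp.raise_for_status()
        drive_id = next(
            (d["id"] for d in drives_resp.json().get("value", []) if d.get("name") == library.library_name),
            None,
        )
        if drive_id is None:
            raise ValueError(f"Library {library.library_name!r} not found on {library.site_url}")
        return f"{GRAPH_BASE}/sites/{site_id}/drives/{drive_id}/root/delta"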
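The delta paging rule used above (follow @odata.nextLink until a page carries @odata.deltaLink) can be isolated from HTTP so it is unit-testable. In this sketch `follow_delta` and the injected `fetch_page` callable are hypothetical; returning the deltaLink instead of writing it to the library row would let the caller persist it only after every item has been processed.

```python
from typing import Callable, Dict, List, Optional, Tuple


def follow_delta(
    start_url: str,
    fetch_page: Callable[[str], Dict],
) -> Tuple[List[Dict], Optional[str]]:
    """Collect items across delta pages.

    Returns (all items, deltaLink to store for the next sync). The deltaLink is
    None only if the pages ran out without one, which a caller should treat as
    an incomplete sync.
    """
    items: List[Dict] = []
    url: Optional[str] = start_url
    while url:
        page = fetch_page(url)
        items.extend(page.get("value", []))
        if "@odata.deltaLink" in page:
            return items, page["@odata.deltaLink"]
        url = page.get("@odata.nextLink")
    return items, None
```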
3.2 Document Processing
File: backend/app/utils/text_processing.py
from typing import List

import docx  # python-docx: reads .docx only (legacy .doc is not supported)
import openpyxl  # reads .xlsx only (legacy .xls is not supported)
import PyPDF2  # deprecated upstream; pypdf is its maintained successor
from langchain_text_splitters import RecursiveCharacterTextSplitter


def extract_text_from_file(file_path: str, file_type: str) -> str:
    """Extract plain text based on file type."""
    if file_type == "pdf":
        return _extract_pdf(file_path)
    elif file_type == "docx":
        return _extract_docx(file_path)
    elif file_type == "xlsx":
        return _extract_xlsx(file_path)
    elif file_type == "txt":
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            return f.read()
    else:
        raise ValueError(f"Unsupported file type: {file_type}")


def _extract_pdf(file_path: str) -> str:
    text = []
    with open(file_path, "rb") as f:
        reader = PyPDF2.PdfReader(f)
        for page in reader.pages:
            # Guard against empty/None results, e.g. scanned image-only pages
            text.append(page.extract_text() or "")
    return "\n\n".join(text)


def _extract_docx(file_path: str) -> str:
    doc = docx.Document(file_path)
    return "\n\n".join(para.text for para in doc.paragraphs if para.text)


def _extract_xlsx(file_path: str) -> str:
    # data_only=True returns cached formula results instead of formula strings
    wb = openpyxl.load_workbook(file_path, read_only=True, data_only=True)
    text = []
    try:
        for sheet in wb.worksheets:
            for row in sheet.iter_rows(values_only=True):
                # Keep falsy-but-real values such as 0; skip only empty cells
                cells = [str(cell) for cell in row if cell is not None]
                if cells:
                    text.append(" | ".join(cells))
    finally:
        wb.close()  # read-only workbooks keep the file handle open until closed
    return "\n".join(text)


def chunk_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Split text into overlapping chunks (sizes measured in characters)."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )
    return splitter.split_text(text)
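RecursiveCharacterTextSplitter prefers to break on paragraph, line and word separators, so its chunk boundaries differ from a fixed window. The size/overlap arithmetic is the same idea, though: with the defaults, each new chunk starts roughly chunk_size - chunk_overlap = 800 characters after the previous one. The simplified, separator-free sketch below (a hypothetical `sliding_window_chunks`, not LangChain's algorithm) illustrates just that arithmetic.

```python
def sliding_window_chunks(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[str]:
    """Fixed-width character windows; consecutive chunks share chunk_overlap chars."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap  # stride between chunk starts
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end of the text
    return chunks
```

For example, a 2,500-character document yields three chunks of 1,000, 1,000 and 900 characters.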
3.3 Embedding & Qdrant Indexing
File: backend/app/utils/embeddings.py
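from typing import List

from openai import AsyncOpenAI

from app.config import settings

client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)

# The embeddings endpoint limits the number of inputs (2048) and total tokens per
# request, so long documents are sent in batches
EMBEDDING_BATCH_SIZE = 100


async def generate_embeddings(texts: List[str]) -> List[List[float]]:
    """Generate embeddings using OpenAI text-embedding-3-large (3072 dimensions)."""
    embeddings: List[List[float]] = []
    for start in range(0, len(texts), EMBEDDING_BATCH_SIZE):
        batch = texts[start:start + EMBEDDING_BATCH_SIZE]
        response = await client.embeddings.create(
            model="text-embedding-3-large",
            input=batch,
            dimensions=3072,
        )
        # response.data is in the same order as the batch inputs
        embeddings.extend(item.embedding for item in response.data)
    return embeddings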
File: backend/app/services/qdrant_service.py
import uuid
from typing import Dict, List, Optional

from qdrant_client import QdrantClient
from qdrant_client.models import (
    Distance,
    FieldCondition,
    Filter,
    MatchValue,
    PayloadSchemaType,
    PointStruct,
    VectorParams,
)

from app.config import settings

# Indexed payload fields; document_id is indexed for deactivate_document's filter
PAYLOAD_INDEXES = {
    "department_id": PayloadSchemaType.KEYWORD,
    "region_code": PayloadSchemaType.KEYWORD,
    "file_type": PayloadSchemaType.KEYWORD,
    "is_active": PayloadSchemaType.BOOL,
    "document_id": PayloadSchemaType.KEYWORD,
}


class QdrantService:
    # Uses the synchronous client inside async methods; AsyncQdrantClient is the
    # non-blocking alternative
    def __init__(self):
        self.client = QdrantClient(url=settings.QDRANT_URL)
        self.collection_name = "sharepoint_docs"
        self._ensure_collection()

    def _ensure_collection(self):
        collections = self.client.get_collections().collections
        if any(c.name == self.collection_name for c in collections):
            return
        self.client.create_collection(
            collection_name=self.collection_name,
            vectors_config=VectorParams(size=3072, distance=Distance.COSINE),
        )
        # Create payload indexes for filtered search
        for field_name, schema in PAYLOAD_INDEXES.items():
            self.client.create_payload_index(
                collection_name=self.collection_name,
                field_name=field_name,
                field_schema=schema,
            )

    async def upsert_document_chunks(
        self,
        document_id: str,
        chunks: List[str],
        embeddings: List[List[float]],
        metadata: Dict,
    ) -> List[str]:
        """Upsert document chunks to Qdrant and return the new point IDs."""
        if len(chunks) != len(embeddings):
            raise ValueError("chunks and embeddings must have the same length")
        points = []
        point_ids = []
        for i, (chunk, embedding) in enumerate(zip(chunks, embeddings)):
            point_id = str(uuid.uuid4())
            point_ids.append(point_id)
            points.append(PointStruct(
                id=point_id,
                vector=embedding,
                payload={
                    "document_id": document_id,
                    "sharepoint_id": metadata["sharepoint_id"],
                    "file_name": metadata["file_name"],
                    "file_url": metadata["file_url"],
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                    "text": chunk,
                    "department_id": metadata.get("department_id"),
                    "region_code": metadata.get("region_code"),
                    "file_type": metadata["file_type"],
                    "last_modified": metadata["last_modified"],
                    "is_active": True,
                },
            ))
        self.client.upsert(collection_name=self.collection_name, points=points)
        return point_ids

    async def search(
        self,
        query_vector: List[float],
        department_id: Optional[str] = None,
        limit: int = 5,
    ) -> List[Dict]:
        """Search for relevant chunks with optional department filtering."""
        filter_conditions = [
            FieldCondition(key="is_active", match=MatchValue(value=True))
        ]
        if department_id:
            filter_conditions.append(
                FieldCondition(key="department_id", match=MatchValue(value=department_id))
            )
        # Newer qdrant-client releases recommend query_points() over search()
        results = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            query_filter=Filter(must=filter_conditions),
            limit=limit,
            with_payload=True,
        )
        return [
            {
                "id": hit.id,
                "score": hit.score,
                "text": hit.payload["text"],
                "file_name": hit.payload["file_name"],
                "file_url": hit.payload["file_url"],
                "chunk_index": hit.payload["chunk_index"],
            }
            for hit in results
        ]

    async def deactivate_document(self, document_id: str):
        """Mark all vectors of a document as inactive (soft delete)."""
        # set_payload accepts a filter as the point selector, so no prior search is needed
        self.client.set_payload(
            collection_name=self.collection_name,
            payload={"is_active": False},
            points=Filter(
                must=[FieldCondition(key="document_id", match=MatchValue(value=document_id))]
            ),
        )
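Conceptually, search() keeps only the points whose payload satisfies every `must` condition and ranks them by cosine similarity to the query vector. The brute-force sketch below shows those semantics on in-memory points; Qdrant itself answers the query with an approximate (HNSW) index rather than a linear scan, and the `{"id", "vector", "payload"}` point layout and function names here are assumptions for illustration.

```python
import math
from typing import Optional, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def filtered_search(points, query, department_id: Optional[str] = None, limit: int = 5):
    """Linear-scan equivalent of the must-filter plus cosine ranking."""
    candidates = [
        p for p in points
        if p["payload"].get("is_active") is True
        and (department_id is None or p["payload"].get("department_id") == department_id)
    ]
    scored = [(cosine_similarity(query, p["vector"]), p["id"]) for p in candidates]
    scored.sort(key=lambda s: s[0], reverse=True)  # highest similarity first
    return scored[:limit]
```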
3.4 Celery Sync Task
File: backend/app/tasks/sync_sharepoint.py
from celery import Task
from app.tasks.celery_app import celery_app
from app.database import SessionLocal
from app.models.sharepoint import SharePointLibrary, SharePointDocument
from app.services.sharepoint_sync import SharePointSyncService
from app.utils.text_processing import extract_text_from_file, chunk_text
from app.utils.embeddings import generate_embeddings
from app.services.qdrant_service import QdrantService
from datetime import datetime, timezone
import asyncio
import os
import tempfile
import uuid
import httpx
@celery_app.task
def sync_all_libraries():
    """Hourly Beat entry point (see beat_schedule): queue one sync task per active library"""
    db = SessionLocal()
    try:
        rows = db.query(SharePointLibrary.id).filter(SharePointLibrary.is_active.is_(True)).all()
    finally:
        db.close()
    for (library_id,) in rows:
        sync_sharepoint_library.delay(str(library_id))
@celery_app.task(bind=True, max_retries=3)
def sync_sharepoint_library(self: Task, library_id: str):
    """
    Celery task to sync a SharePoint library
    - Fetch delta changes from MS Graph
    - Download new/modified documents
    - Extract text, chunk, embed
    - Upsert to Qdrant and PostgreSQL
    """
    db = SessionLocal()
    library = None  # defined up front so the except block can't hit a NameError
    try:
        library = db.query(SharePointLibrary).filter(SharePointLibrary.id == library_id).first()
        if not library or not library.is_active:
            return
        library.last_sync_status = "syncing"
        db.commit()
        # Celery task functions are synchronous; run the async pipeline in its own event loop
        asyncio.run(_sync_library(db, library))
        library.last_sync_status = "success"
        library.last_sync_at = datetime.utcnow()
        db.commit()
    except Exception as e:
        db.rollback()  # discard half-applied changes before recording the error
        if library is not None:
            library.last_sync_status = "error"
            library.error_message = str(e)
            db.commit()
        # Retry with exponential backoff: 60 s, 120 s, 240 s
        raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
    finally:
        db.close()
async def _sync_library(db, library):
    sync_service = SharePointSyncService()
    qdrant_service = QdrantService()
    # Fetch delta changes
    changes = await sync_service.fetch_delta_changes(library)
    for item in changes:
        if item.get("deleted"):
            # Handle deletion (soft-delete the document's vectors)
            await _handle_deleted_document(db, qdrant_service, item)
        elif item.get("file"):
            # Handle new/updated file
            await _process_document(db, qdrant_service, library, item)
async def _handle_deleted_document(db, qdrant_service, item):
    """Mark the vectors of a document deleted in SharePoint as is_active=false"""
    existing = db.query(SharePointDocument).filter(
        SharePointDocument.sharepoint_id == item["id"]
    ).first()
    if existing:
        await qdrant_service.deactivate_document(str(existing.id))
async def _process_document(db, qdrant_service, library, item):
    """Process a single document: download, extract, embed, index"""
    sharepoint_id = item["id"]
    file_name = item["name"]
    file_url = item["webUrl"]
    file_type = file_name.rsplit(".", 1)[-1].lower() if "." in file_name else ""
    last_modified_raw = item["lastModifiedDateTime"]  # ISO 8601 string, e.g. "2024-05-01T12:00:00Z"
    # Parse to naive UTC so it compares with values written via datetime.utcnow()
    last_modified = (
        datetime.fromisoformat(last_modified_raw.replace("Z", "+00:00"))
        .astimezone(timezone.utc)
        .replace(tzinfo=None)
    )
    # Check if already processed
    existing = db.query(SharePointDocument).filter(
        SharePointDocument.sharepoint_id == sharepoint_id
    ).first()
    if existing and existing.last_modified and existing.last_modified >= last_modified:
        # No changes, skip
        return
    # Download file (downloadUrl is pre-authenticated and short-lived, so no bearer token is needed)
    download_url = item["@microsoft.graph.downloadUrl"]
    async with httpx.AsyncClient(timeout=120.0, follow_redirects=True) as client:
        response = await client.get(download_url)
        response.raise_for_status()
        file_content = response.content
    # Save to a temp file for the extractors and always remove it afterwards
    with tempfile.NamedTemporaryFile(delete=False, suffix=f".{file_type}") as tmp:
        tmp.write(file_content)
        tmp_path = tmp.name
    try:
        text = extract_text_from_file(tmp_path, file_type)
    finally:
        os.remove(tmp_path)
    # Chunk text and generate embeddings
    chunks = chunk_text(text)
    embeddings = await generate_embeddings(chunks)
    # One ID shared by the Qdrant payloads and the PostgreSQL row
    document_id = existing.id if existing else uuid.uuid4()
    if existing:
        # Deactivate old vectors BEFORE upserting, so the new vectors stay active
        await qdrant_service.deactivate_document(str(document_id))
    # Upsert to Qdrant
    metadata = {
        "sharepoint_id": sharepoint_id,
        "file_name": file_name,
        "file_url": file_url,
        "file_type": file_type,
        "department_id": str(library.department_id) if library.department_id else None,
        "region_code": library.department.region.code if library.department else None,
        "last_modified": last_modified_raw  # JSON-serializable string for the payload
    }
    point_ids = await qdrant_service.upsert_document_chunks(
        document_id=str(document_id),
        chunks=chunks,
        embeddings=embeddings,
        metadata=metadata
    )
    # Update PostgreSQL
    if existing:
        existing.qdrant_point_ids = point_ids
        existing.chunk_count = len(chunks)
        existing.file_size = len(file_content)
        existing.last_modified = last_modified
        existing.ingested_at = datetime.utcnow()
    else:
        doc = SharePointDocument(
            id=document_id,
            library_id=library.id,
            sharepoint_id=sharepoint_id,
            file_name=file_name,
            file_url=file_url,
            file_type=file_type,
            file_size=len(file_content),
            department_id=library.department_id,
            last_modified=last_modified,
            qdrant_point_ids=point_ids,
            chunk_count=len(chunks)
        )
        db.add(doc)
    db.commit()
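`SharePointSyncService.fetch_delta_changes` is not shown in this plan. Microsoft Graph delta queries on a drive (for example `GET /drives/{drive-id}/root/delta`) return results in pages. Each page lists changed items under `value` and ends with either `@odata.nextLink` (fetch the next page) or `@odata.deltaLink` (the last page: store it and request it on the next sync to receive only newer changes). The sketch below shows a minimal version of that paging loop. It assumes a hypothetical `get_page(url)` callable that performs the authenticated GET and returns the decoded JSON.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Page = Dict[str, Any]


def collect_delta_changes(
    start_url: str, get_page: Callable[[str], Page]
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Follow @odata.nextLink until a page carries @odata.deltaLink.

    Returns every changed item plus the deltaLink to persist for the next run.
    `get_page` is a hypothetical stand-in for an authenticated Graph GET.
    """
    items: List[Dict[str, Any]] = []
    url: Optional[str] = start_url
    while url:
        page = get_page(url)
        items.extend(page.get("value", []))
        if "@odata.deltaLink" in page:
            # Final page of this round: the next sync starts from this URL
            return items, page["@odata.deltaLink"]
        url = page.get("@odata.nextLink")
    # Neither link present: treat as incomplete and fall back to a full resync
    return items, None
```

Storing the returned deltaLink on the library record (a hypothetical column here) is what makes the hourly run incremental. Graph also documents a `410 Gone` (resyncRequired) response for stale delta links, after which the client should restart from a fresh delta request.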
3.5 Celery Beat Schedule
File: backend/app/tasks/celery_app.py
from celery import Celery
from celery.schedules import crontab
from app.config import settings
celery_app = Celery(
    "nexus",
    broker=settings.REDIS_URL,
    backend=settings.REDIS_URL,
    # Register task modules when the worker starts (string paths avoid circular imports)
    include=["app.tasks.sync_sharepoint", "app.tasks.cleanup_sessions"],
)
celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='UTC',
    enable_utc=True,
    beat_schedule={
        'sync-sharepoint-hourly': {
            'task': 'app.tasks.sync_sharepoint.sync_all_libraries',
            'schedule': crontab(minute=0),  # Every hour, on the hour
        },
        'cleanup-expired-sessions': {
            'task': 'app.tasks.cleanup_sessions.cleanup_notebook_sessions',
            'schedule': crontab(hour=2, minute=0),  # Daily at 02:00 UTC
        },
    },
)
3.6 Admin UI: Library Management
Endpoint: POST /api/v1/admin/sharepoint/libraries
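from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.database import get_db
# require_admin is the Phase 2 RBAC dependency; import path assumed to match get_current_user
from app.core.dependencies import require_admin
from app.models.sharepoint import SharePointLibrary
from app.tasks.sync_sharepoint import sync_sharepoint_library
router = APIRouter()
class LibraryCreate(BaseModel):
    site_url: str
    library_name: str
    department_id: str
@router.post("/libraries", dependencies=[Depends(require_admin)])
async def create_library(
    payload: LibraryCreate,  # JSON body, matching what the admin UI sends
    db: Session = Depends(get_db)
):
    """Content Manager adds a SharePoint library to sync"""
    library = SharePointLibrary(
        site_url=payload.site_url,
        library_name=payload.library_name,
        department_id=payload.department_id,
        is_active=True
    )
    db.add(library)
    db.commit()
    db.refresh(library)
    # Trigger initial sync in the background
    sync_sharepoint_library.delay(str(library.id))
    return {"id": str(library.id), "status": "sync_initiated"}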
3.7 Deliverables
- ✅ MS Graph API Client Credentials flow working
- ✅ Delta sync fetching changed documents
- ✅ Text extraction for PDF, DOCX, XLSX
- ✅ Chunking + OpenAI embedding generation
- ✅ Qdrant collection created with indexes
- ✅ Celery task syncing documents hourly
- ✅ Admin UI to add/remove SharePoint libraries
- ✅ Soft-delete handling (mark vectors as is_active=false)
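This plan references `chunk_text` from `app.utils.text_processing` but does not define it. One common approach is a fixed-size sliding window with overlap, so that a sentence cut at one boundary still appears whole in the neighboring chunk. Below is a minimal character-based sketch. The function name and the 1000/200 defaults are illustrative assumptions, not values from the plan.

```python
from typing import List


def sliding_window_chunks(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
    """Split text into windows of `chunk_size` characters, each overlapping the previous by `overlap`."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    text = text.strip()
    if not text:
        return []
    step = chunk_size - overlap
    chunks: List[str] = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # this window already reaches the end; avoid a tiny trailing chunk
    return chunks
```

Counting characters keeps the sketch dependency-free. A token-based window that uses the embedding model's tokenizer would track the embedding input limit more precisely.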
Phase 4: RAG Logic & LLM Router (Week 5)
Goal
Implement RAG retrieval, LLM routing based on mode, citation formatting, and streaming responses.
Tasks
4.1 LLM Router Service
File: backend/app/services/llm_router.py
from typing import AsyncGenerator, Dict, List, Optional
from openai import AsyncOpenAI
from anthropic import AsyncAnthropic
import google.generativeai as genai
from app.config import settings
from app.models.llm_config import LLMConfig
from sqlalchemy.orm import Session
class LLMRouter:
    def __init__(self, db: Session):
        self.db = db
        self.openai_client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)
        self.anthropic_client = AsyncAnthropic(api_key=settings.ANTHROPIC_API_KEY)
        genai.configure(api_key=settings.GOOGLE_API_KEY)
    def get_llm_config(self, mode: str) -> LLMConfig:
        """Fetch active LLM config for given mode"""
        config = self.db.query(LLMConfig).filter(
            LLMConfig.mode == mode,
            LLMConfig.is_active.is_(True)
        ).first()
        if not config:
            raise ValueError(f"No active LLM config for mode: {mode}")
        return config
    async def stream_completion(
        self,
        mode: str,
        messages: List[Dict[str, str]],
        temperature: Optional[float] = None
    ) -> AsyncGenerator[str, None]:
        """Route to appropriate LLM provider and stream tokens"""
        config = self.get_llm_config(mode)
        # Explicit None check so that temperature=0.0 is honored
        temp = temperature if temperature is not None else config.temperature
        if config.provider == "openai":
            stream = self._stream_openai(config.model_name, messages, temp)
        elif config.provider == "anthropic":
            stream = self._stream_anthropic(config.model_name, messages, temp)
        elif config.provider == "google":
            stream = self._stream_google(config.model_name, messages, temp)
        else:
            raise ValueError(f"Unsupported LLM provider: {config.provider}")
        async for token in stream:
            yield token
    async def _stream_openai(self, model: str, messages: List[Dict], temp: float):
        stream = await self.openai_client.chat.completions.create(
            model=model,
            messages=messages,
            temperature=temp,
            stream=True
        )
        async for chunk in stream:
            # Some chunks (e.g. a final usage chunk) carry no choices
            if chunk.choices and chunk.choices[0].delta.content:
                yield chunk.choices[0].delta.content
    async def _stream_anthropic(self, model: str, messages: List[Dict], temp: float):
        # Convert messages format (OpenAI → Anthropic): the system prompt is a separate parameter
        system_msg = next((m["content"] for m in messages if m["role"] == "system"), None)
        chat_messages = [m for m in messages if m["role"] != "system"]
        extra = {"system": system_msg} if system_msg else {}  # omit rather than send system=None
        async with self.anthropic_client.messages.stream(
            model=model,
            max_tokens=4096,
            temperature=temp,
            messages=chat_messages,
            **extra
        ) as stream:
            async for text in stream.text_stream:
                yield text
    async def _stream_google(self, model: str, messages: List[Dict], temp: float):
        # Convert messages format (OpenAI → Gemini): system → system_instruction, assistant → "model"
        system_msg = next((m["content"] for m in messages if m["role"] == "system"), None)
        contents = [
            {"role": "model" if m["role"] == "assistant" else "user", "parts": [m["content"]]}
            for m in messages if m["role"] != "system"
        ]
        gemini = genai.GenerativeModel(model, system_instruction=system_msg)
        response = await gemini.generate_content_async(
            contents,
            generation_config={"temperature": temp},
            stream=True
        )
        async for chunk in response:
            if chunk.parts:  # skip chunks without text parts (chunk.text would raise)
                yield chunk.text
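The three providers expect different message shapes. Anthropic takes the system prompt as a separate `system` parameter, and Gemini takes it as `system_instruction` and names the assistant role `model`. Keeping the conversion in pure functions lets it be unit-tested without API keys. The sketch below uses hypothetical helper names. Unlike `_stream_anthropic`, which keeps only the first system message, this version joins all of them; that is a design choice, not a provider requirement.

```python
from typing import Dict, List, Optional, Tuple

Message = Dict[str, str]


def split_system(messages: List[Message]) -> Tuple[Optional[str], List[Message]]:
    """Join every system message into one prompt; return it with the remaining turns."""
    system_parts = [m["content"] for m in messages if m["role"] == "system"]
    rest = [m for m in messages if m["role"] != "system"]
    return ("\n\n".join(system_parts) or None), rest


def to_gemini_contents(messages: List[Message]) -> Tuple[Optional[str], List[dict]]:
    """OpenAI-style messages -> (system_instruction, Gemini `contents` list)."""
    system, rest = split_system(messages)
    contents = [
        {"role": "model" if m["role"] == "assistant" else "user", "parts": [m["content"]]}
        for m in rest
    ]
    return system, contents
```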
4.2 RAG Service
File: backend/app/services/rag_service.py
from typing import AsyncGenerator, Optional
from app.services.qdrant_service import QdrantService
from app.services.llm_router import LLMRouter
from app.utils.embeddings import generate_embeddings
from sqlalchemy.orm import Session
class RAGService:
    def __init__(self, db: Session):
        self.db = db
        self.qdrant = QdrantService()
        self.llm_router = LLMRouter(db)
    async def query(
        self,
        user_query: str,
        department_id: Optional[str] = None,
        top_k: int = 5
    ) -> AsyncGenerator[str, None]:
        """
        Perform RAG query:
        1. Embed user query
        2. Search Qdrant for relevant chunks
        3. Build context with citations
        4. Stream LLM response
        """
        # Generate query embedding
        query_embedding = (await generate_embeddings([user_query]))[0]
        # Search Qdrant
        search_results = await self.qdrant.search(
            query_vector=query_embedding,
            department_id=department_id,
            limit=top_k
        )
        # Build context with numbered citations
        context_parts = []
        citations = {}
        for i, result in enumerate(search_results, start=1):
            context_parts.append(f"[{i}] {result['text']}")
            citations[i] = {
                "file_name": result["file_name"],
                "file_url": result["file_url"],
                "score": result["score"]
            }
        context = "\n\n".join(context_parts)
        # Build prompt
        system_prompt = (
            "You are a corporate knowledge assistant. Answer questions ONLY based on the provided context.\n"
            "If the context does not contain the answer, respond: \"I don't have enough information to answer this question.\"\n"
            "Always cite sources using numbered references [1], [2], etc."
        )
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {user_query}"}
        ]
        # Stream from the model configured for mode "rag" (GPT-5.1 per the LLM strategy)
        async for token in self.llm_router.stream_completion(mode="rag", messages=messages):
            yield token
        # Append citations at the end, only if any documents were retrieved
        if citations:
            citation_text = "\n\nSources:\n"
            for num, cite in citations.items():
                citation_text += f"[{num}] {cite['file_name']} - {cite['file_url']}\n"
            yield citation_text
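`RAGService.query` gives every retrieved chunk its own number. As a result, two chunks from the same file are listed twice under Sources, and low-scoring chunks still reach the prompt. The sketch below is an alternative context builder that shares one citation number per file and drops chunks below a score cut-off. The result keys (`text`, `file_name`, `file_url`, `score`) are the ones used above. The helper name is hypothetical, and a useful `min_score` depends on the collection's distance metric, so it should be tuned on real queries.

```python
from typing import Dict, List, Tuple


def build_context(results: List[Dict], min_score: float = 0.0) -> Tuple[str, Dict[int, Dict]]:
    """Number chunks [1], [2], ... with one number per source file.

    Chunks scoring below `min_score` are skipped; chunks from a file that
    already has a number reuse it, so the Sources list has no duplicates.
    """
    parts: List[str] = []
    citations: Dict[int, Dict] = {}
    number_by_url: Dict[str, int] = {}
    for r in results:
        if r["score"] < min_score:
            continue
        num = number_by_url.get(r["file_url"])
        if num is None:
            num = len(number_by_url) + 1
            number_by_url[r["file_url"]] = num
            citations[num] = {"file_name": r["file_name"], "file_url": r["file_url"], "score": r["score"]}
        parts.append(f"[{num}] {r['text']}")
    return "\n\n".join(parts), citations
```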
4.3 Streaming Endpoint
File: backend/app/api/v1/endpoints/chat.py
from fastapi import APIRouter, Depends, HTTPException, Query
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session
from app.database import get_db
from app.core.dependencies import get_current_user
from app.models.user import User
from app.models.conversation import Conversation, Message
from app.services.rag_service import RAGService
import json
router = APIRouter()
@router.get("/stream")
async def stream_chat(
    conversation_id: str = Query(...),
    query: str = Query(...),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Stream RAG responses via Server-Sent Events"""
    # Verify conversation ownership
    conversation = db.query(Conversation).filter(
        Conversation.id == conversation_id,
        Conversation.user_id == current_user.id
    ).first()
    if not conversation:
        raise HTTPException(status_code=404, detail="Conversation not found")
    # Save user message
    user_msg = Message(
        conversation_id=conversation_id,
        role="user",
        content=query
    )
    db.add(user_msg)
    db.commit()
    # Stream RAG response
    rag_service = RAGService(db)
    # Record the provider/model actually configured for RAG mode instead of hard-coding them
    llm_config = rag_service.llm_router.get_llm_config("rag")
    department_id = str(current_user.department_id) if current_user.department_id else None
    async def event_generator():
        full_response = ""
        output_chunks = 0
        try:
            async for token in rag_service.query(user_query=query, department_id=department_id):
                full_response += token
                output_chunks += 1  # counts streamed chunks: only a rough proxy for output tokens
                yield f"data: {json.dumps({'token': token})}\n\n"
            # Save assistant message
            assistant_msg = Message(
                conversation_id=conversation_id,
                role="assistant",
                content=full_response,
                input_tokens=0,  # not measured yet
                output_tokens=output_chunks,
                llm_provider=llm_config.provider,
                llm_model=llm_config.model_name
            )
            db.add(assistant_msg)
            db.commit()
            yield f"data: {json.dumps({'done': True, 'message_id': str(assistant_msg.id)})}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"
    return StreamingResponse(event_generator(), media_type="text/event-stream")
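Each SSE message the endpoint emits is a `data:` line followed by a blank line. Because `json.dumps` escapes newlines inside strings, a token that contains `\n` still fits on a single `data:` line. That is why the payload is JSON rather than raw text. The sketch below is a small framing helper (the helper and constant names are hypothetical).

```python
import json
from typing import Any, Dict

# A line starting with ":" is an SSE comment that clients ignore; sending one
# periodically is a common way to keep idle proxies from closing a slow stream.
SSE_KEEPALIVE = ": keep-alive\n\n"


def sse_event(payload: Dict[str, Any]) -> str:
    """Frame one JSON payload as a Server-Sent Events message."""
    # json.dumps turns a real newline into the two characters \n, so the
    # event stays on one data line and the blank line still ends it
    return f"data: {json.dumps(payload)}\n\n"
```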
4.4 Deliverables
- ✅ LLM router correctly selects provider per mode
- ✅ RAG service embeds query → searches Qdrant → builds context
- ✅ Citations formatted as [1], [2] with source URLs
- ✅ Streaming endpoint returns SSE stream
- ✅ Token counts tracked per message
- ✅ Department filtering applied to search results
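Department filtering and soft-deletes both come down to the payload filter passed to Qdrant. The sketch below builds that filter in Qdrant's JSON filter syntax (`must` conditions using `match`). It assumes each point's payload carries `department_id` and an `is_active` flag. The helper name is hypothetical, and `QdrantService.search` is not shown in this plan.

```python
from typing import Any, Dict, List, Optional


def build_search_filter(department_id: Optional[str]) -> Dict[str, Any]:
    """Qdrant filter (JSON form): active chunks only, optionally restricted to one department."""
    must: List[Dict[str, Any]] = [{"key": "is_active", "match": {"value": True}}]
    if department_id is not None:
        must.append({"key": "department_id", "match": {"value": department_id}})
    return {"must": must}
```

With `department_id=None` the filter has no department condition, so the search spans every department. Whether users without a department should see everything is a policy question to confirm. Creating payload indexes on both keys lets Qdrant apply these conditions efficiently.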
Phase 5: NotebookLlama Integration (Week 6)
Goal
Implement file upload proxy, session management, and chat forwarding to NotebookLlama.
Tasks
5.1 NotebookLlama Proxy Service
File: backend/app/services/notebook_proxy.py
import httpx
import os
from typing import AsyncGenerator, List
from app.config import settings
class NotebookLlamaProxy:
    def __init__(self):
        self.base_url = settings.NOTEBOOKLLAMA_URL
    async def upload_file(self, session_id: str, file_path: str) -> str:
        """Upload file to NotebookLlama, return file_id"""
        # Longer than httpx's 5 s default timeout: files can be up to 100 MB
        async with httpx.AsyncClient(timeout=300.0) as client:
            with open(file_path, 'rb') as f:
                files = {'file': (os.path.basename(file_path), f)}
                response = await client.post(
                    f"{self.base_url}/upload",
                    files=files,
                    data={"session_id": session_id}
                )
            response.raise_for_status()  # fail loudly instead of parsing an error body
            return response.json()["file_id"]
    async def chat(
        self,
        session_id: str,
        query: str,
        file_ids: List[str]
    ) -> AsyncGenerator[str, None]:
        """Stream chat response from NotebookLlama"""
        async with httpx.AsyncClient(timeout=60.0) as client:
            async with client.stream(
                "POST",
                f"{self.base_url}/chat",
                json={
                    "session_id": session_id,
                    "query": query,
                    "file_ids": file_ids
                }
            ) as response:
                response.raise_for_status()
                async for line in response.aiter_lines():
                    if line.startswith("data: "):
                        yield line[6:]  # Remove "data: " prefix
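`NotebookLlamaProxy.chat` keeps only lines that begin with `data: `. The SSE format is broader: it allows `data:` without the space, lets one event's data span several `data:` lines (joined with newlines), treats lines starting with `:` as comments, and ends each event with a blank line. The sketch below is a parser that follows those rules. It takes lines with their line endings already removed, and the function name is hypothetical.

```python
from typing import Iterable, Iterator, List


def iter_sse_data(lines: Iterable[str]) -> Iterator[str]:
    """Yield the data of each complete SSE event from lines without line endings."""
    buffer: List[str] = []
    for line in lines:
        if line == "":
            # A blank line dispatches the pending event
            if buffer:
                yield "\n".join(buffer)
                buffer = []
            continue
        if line.startswith(":"):
            continue  # comment / keep-alive line
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # the spec strips exactly one leading space
        if field == "data":
            buffer.append(value)
        # other fields (event, id, retry) are ignored in this sketch
    # Per the spec, an event not terminated by a blank line is discarded
```

An async variant that iterates `response.aiter_lines()` would have the same body.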
5.2 Upload Endpoint
File: backend/app/api/v1/endpoints/notebook.py
from fastapi import APIRouter, Depends, UploadFile, File, HTTPException
from sqlalchemy.orm import Session
from app.database import get_db
from app.core.dependencies import get_current_user
from app.models.user import User
from app.models.notebook import NotebookSession, UploadedFile
from app.services.notebook_proxy import NotebookLlamaProxy
import os
import uuid
router = APIRouter()
MAX_FILE_SIZE = 100 * 1024 * 1024  # 100 MB per file
MAX_SESSION_SIZE = 100 * 1024 * 1024  # 100 MB total per session
UPLOAD_DIR = "/app/uploads"
@router.post("/sessions/{session_id}/upload")
async def upload_file(
    session_id: str,
    file: UploadFile = File(...),
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Upload file to NotebookLlama session"""
    # Verify session ownership
    session = db.query(NotebookSession).filter(
        NotebookSession.id == session_id,
        NotebookSession.user_id == current_user.id
    ).first()
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")
    # Validate file size
    content = await file.read()
    file_size = len(content)
    if file_size > MAX_FILE_SIZE:
        raise HTTPException(status_code=413, detail="File too large")
    # Check session total size
    if session.total_file_size + file_size > MAX_SESSION_SIZE:
        raise HTTPException(status_code=413, detail="Session storage quota exceeded")
    # Save file locally under a generated name (the client filename is never used as a path)
    session_dir = os.path.join(UPLOAD_DIR, str(session_id))
    os.makedirs(session_dir, exist_ok=True)
    file_id = str(uuid.uuid4())
    file_ext = os.path.splitext(file.filename or "")[1].lower()
    storage_path = os.path.join(session_dir, f"{file_id}{file_ext}")
    with open(storage_path, 'wb') as f:
        f.write(content)
    # Upload to NotebookLlama; remove the local copy if the proxy call fails
    proxy = NotebookLlamaProxy()
    try:
        notebookllama_file_id = await proxy.upload_file(str(session_id), storage_path)
    except Exception as exc:
        os.remove(storage_path)
        raise HTTPException(status_code=502, detail="Upload to NotebookLlama failed") from exc
    # Save to database
    uploaded = UploadedFile(
        session_id=session_id,
        file_name=file.filename,
        file_size=file_size,
        file_type=file_ext[1:] if file_ext else "unknown",
        storage_path=storage_path,
        notebookllama_file_id=notebookllama_file_id
    )
    db.add(uploaded)
    session.total_file_size += file_size
    db.commit()
    return {
        "file_id": str(uploaded.id),
        "file_name": file.filename,
        "size": file_size
    }
@router.post("/sessions/{session_id}/pin")
async def pin_session(
    session_id: str,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db)
):
    """Pin session for permanent storage"""
    session = db.query(NotebookSession).filter(
        NotebookSession.id == session_id,
        NotebookSession.user_id == current_user.id
    ).first()
    if not session:
        raise HTTPException(status_code=404, detail="Session not found")
    session.is_pinned = True
    session.expires_at = None
    db.commit()
    return {"status": "pinned"}
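The upload endpoint reads the whole file with `await file.read()` before comparing it with `MAX_FILE_SIZE`, so a 100 MB upload is held in memory in full. The sketch below copies the upload in chunks and stops as soon as the limit is passed. In FastAPI, `UploadFile.read(size)` is async and accepts a byte count, so `file.read` and an open file's `write` could be passed in directly. The helper and exception names are hypothetical.

```python
from typing import Awaitable, Callable


class UploadTooLarge(Exception):
    """Raised when an upload exceeds the allowed size."""


async def copy_limited(
    read: Callable[[int], Awaitable[bytes]],
    write: Callable[[bytes], object],
    max_bytes: int,
    chunk_size: int = 1024 * 1024,
) -> int:
    """Copy from an async reader to a writer in chunks; return the number of bytes copied.

    Raises UploadTooLarge as soon as the running total passes max_bytes,
    so an oversized file is never held in memory in full.
    """
    total = 0
    while True:
        chunk = await read(chunk_size)
        if not chunk:
            return total
        total += len(chunk)
        if total > max_bytes:
            raise UploadTooLarge(f"upload exceeds {max_bytes} bytes")
        write(chunk)
```

When the endpoint runs, Starlette has already received and spooled the multipart body. This approach therefore bounds memory use, not upload bandwidth; a request-size limit at the reverse proxy covers that. On `UploadTooLarge`, the caller should delete the partially written file.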
5.3 Cleanup Task
File: backend/app/tasks/cleanup_sessions.py
from app.tasks.celery_app import celery_app
from app.database import SessionLocal
from app.models.notebook import NotebookSession
from datetime import datetime
import os
import shutil
UPLOAD_DIR = "/app/uploads"  # must match UPLOAD_DIR in app/api/v1/endpoints/notebook.py
@celery_app.task
def cleanup_notebook_sessions():
    """Delete expired notebook sessions and their files"""
    db = SessionLocal()
    try:
        expired_sessions = db.query(NotebookSession).filter(
            NotebookSession.is_pinned.is_(False),
            NotebookSession.expires_at < datetime.utcnow()
        ).all()
        session_dirs = [os.path.join(UPLOAD_DIR, str(s.id)) for s in expired_sessions]
        for session in expired_sessions:
            # Delete from database (cascade will remove uploaded_files)
            db.delete(session)
        db.commit()
        # Delete files from disk only after the database commit succeeded
        for session_dir in session_dirs:
            shutil.rmtree(session_dir, ignore_errors=True)
    finally:
        db.close()
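The retention rule behind `cleanup_notebook_sessions` can be written as a pure predicate, which is easy to unit-test. Doing so also exposes one consequence. `pin_session` sets `expires_at = None`, and in SQL a NULL `expires_at` never satisfies `expires_at < now`. Unpinning a session must therefore assign a fresh expiry, or the session is kept forever. The sketch below uses a hypothetical function name.

```python
from datetime import datetime
from typing import Optional


def is_session_expired(is_pinned: bool, expires_at: Optional[datetime], now: datetime) -> bool:
    """Mirror of the cleanup query: unpinned sessions whose expiry lies in the past.

    A missing expiry (SQL NULL) never matches `expires_at < now`, so such
    sessions are kept, exactly like pinned ones.
    """
    if is_pinned or expires_at is None:
        return False
    return expires_at < now
```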
5.4 Deliverables
- ✅ File upload endpoint with size validation
- ✅ Files proxied to NotebookLlama
- ✅ Session management (pin/unpin)
- ✅ Celery task cleaning up expired sessions
- ✅ Chat endpoint streaming from NotebookLlama
Phase 6: Frontend Development (Week 7-9)
Goal
Build Next.js UI with Shadcn components, implement chat interface, streaming, and admin dashboard.
Tasks
6.1 Project Setup
npx create-next-app@latest frontend --typescript --tailwind --app
cd frontend
npx shadcn@latest init
npx shadcn@latest add button input textarea card dropdown-menu tabs
npm install zustand @tanstack/react-query axios
6.2 API Client
File: frontend/lib/api-client.ts
import axios, { AxiosError, InternalAxiosRequestConfig } from 'axios';
const apiClient = axios.create({
  baseURL: process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8000/api/v1',
  headers: {
    'Content-Type': 'application/json',
  },
});
// Request interceptor to add JWT token
apiClient.interceptors.request.use((config) => {
  const token = localStorage.getItem('access_token');
  if (token) {
    config.headers.Authorization = `Bearer ${token}`;
  }
  return config;
});
// Response interceptor for token refresh; each request is retried at most once
apiClient.interceptors.response.use(
  (response) => response,
  async (error: AxiosError) => {
    const original = error.config as (InternalAxiosRequestConfig & { _retry?: boolean }) | undefined;
    if (error.response?.status === 401 && original && !original._retry) {
      original._retry = true; // a second 401 is rejected instead of looping
      const refreshToken = localStorage.getItem('refresh_token');
      if (refreshToken) {
        try {
          // Plain axios (not apiClient) so the refresh call bypasses these interceptors
          const { data } = await axios.post(`${apiClient.defaults.baseURL}/auth/refresh`, {
            refresh_token: refreshToken,
          });
          localStorage.setItem('access_token', data.access_token);
          original.headers.Authorization = `Bearer ${data.access_token}`;
          return apiClient.request(original);
        } catch {
          // Refresh failed, redirect to login
          window.location.href = '/login';
        }
      }
    }
    return Promise.reject(error);
  }
);
export default apiClient;
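The sketch below reduces the interceptor's refresh flow to its control logic. It is written in Python, the language of this plan's backend, so the policy can be tested without a browser. The key property is that a request is retried at most once: without such a guard, an interceptor whose retried request also receives a 401 re-enters itself. `send`, `refresh`, and the token dictionary are hypothetical stand-ins for the HTTP call, the `/auth/refresh` request, and `localStorage`.

```python
from typing import Callable, Dict, Optional, Tuple

Response = Tuple[int, str]  # (status code, body)


def request_with_refresh(
    send: Callable[[str], Response],
    tokens: Dict[str, Optional[str]],
    refresh: Callable[[str], Optional[str]],
) -> Response:
    """Send once; on 401, refresh the access token and retry exactly once."""
    status, body = send(tokens["access"] or "")
    if status != 401 or not tokens.get("refresh"):
        return status, body
    new_access = refresh(tokens["refresh"])
    if new_access is None:
        return status, body  # refresh failed: the caller redirects to /login
    tokens["access"] = new_access
    return send(new_access)  # a second 401 is returned as-is, never retried again
```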
6.3 Zustand Store
File: frontend/lib/chat-store.ts
import { create } from 'zustand';
interface Message {
  id: string;
  role: 'user' | 'assistant';
  content: string;
  timestamp: Date;
}
interface Conversation {
  id: string;
  mode: 'rag' | 'assistant' | 'notebook';
  title: string;
  messages: Message[];
}
interface ChatStore {
  conversations: Conversation[];
  activeConversationId: string | null;
  setActiveConversation: (id: string) => void;
  addMessage: (conversationId: string, message: Message) => void;
  createConversation: (mode: 'rag' | 'assistant' | 'notebook') => void;
}
export const useChatStore = create<ChatStore>((set) => ({
  conversations: [],
  activeConversationId: null,
  setActiveConversation: (id) => set({ activeConversationId: id }),
  addMessage: (conversationId, message) => set((state) => ({
    conversations: state.conversations.map((conv) =>
      conv.id === conversationId
        ? { ...conv, messages: [...conv.messages, message] }
        : conv
    ),
  })),
  createConversation: (mode) => set((state) => {
    const newConv: Conversation = {
      id: crypto.randomUUID(),
      mode,
      title: 'New Chat',
      messages: [],
    };
    return {
      conversations: [...state.conversations, newConv],
      activeConversationId: newConv.id,
    };
  }),
}));
6.4 Streaming Chat Component
File: frontend/components/chat/StreamingMessage.tsx
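'use client';
import { useEffect, useRef, useState } from 'react';
import { Card } from '@/components/ui/card';
interface StreamingMessageProps {
  conversationId: string;
  query: string;
  onComplete: (fullText: string, messageId: string) => void;
}
const API_URL = process.env.NEXT_PUBLIC_API_URL || 'http://localhost:8000/api/v1';
export function StreamingMessage({ conversationId, query, onComplete }: StreamingMessageProps) {
  const [text, setText] = useState('');
  const [isComplete, setIsComplete] = useState(false);
  // Latest callback, read inside the effect without restarting the stream on re-render
  const onCompleteRef = useRef(onComplete);
  onCompleteRef.current = onComplete;
  useEffect(() => {
    // Native EventSource cannot send an Authorization header, so the SSE stream is read with fetch
    const controller = new AbortController();
    // Local accumulator: the `text` state captured by this closure would always be ''
    let fullText = '';
    const run = async () => {
      const url = `${API_URL}/chat/stream?conversation_id=${encodeURIComponent(conversationId)}&query=${encodeURIComponent(query)}`;
      const response = await fetch(url, {
        headers: { Authorization: `Bearer ${localStorage.getItem('access_token') ?? ''}` },
        signal: controller.signal,
      });
      if (!response.ok || !response.body) throw new Error(`HTTP ${response.status}`);
      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      let buffer = '';
      while (true) {
        const { done, value } = await reader.read();
        if (done) break;
        buffer += decoder.decode(value, { stream: true });
        // Each SSE event ends with a blank line; keep any incomplete tail in the buffer
        const events = buffer.split('\n\n');
        buffer = events.pop() ?? '';
        for (const event of events) {
          if (!event.startsWith('data: ')) continue;
          const data = JSON.parse(event.slice('data: '.length));
          if (data.done) {
            setIsComplete(true);
            onCompleteRef.current(fullText, data.message_id);
          } else if (data.token) {
            fullText += data.token;
            setText(fullText);
          } else if (data.error) {
            console.error('Streaming error:', data.error);
          }
        }
      }
    };
    run().catch((err) => {
      if (!controller.signal.aborted) console.error('Streaming error:', err);
    });
    return () => controller.abort();
  }, [conversationId, query]);
  return (
    <Card className="p-4">
      <div className="prose dark:prose-invert">
        {text}
        {!isComplete && <span className="animate-pulse">▊</span>}
      </div>
    </Card>
  );
}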
6.5 Chat Interface
File: frontend/app/(dashboard)/chat/page.tsx
'use client';
import { useState } from 'react';
import { useChatStore } from '@/lib/chat-store';
import { StreamingMessage } from '@/components/chat/StreamingMessage';
import { Input } from '@/components/ui/input';
import { Button } from '@/components/ui/button';
export default function ChatPage() {
  const { activeConversationId, conversations, addMessage } = useChatStore();
  const [query, setQuery] = useState('');
  // The question being streamed; kept separately because `query` is cleared on submit
  const [pendingQuery, setPendingQuery] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);
  const activeConversation = conversations.find((c) => c.id === activeConversationId);
  const handleSubmit = (e: React.FormEvent) => {
    e.preventDefault();
    const text = query.trim();
    if (!text || !activeConversationId) return;
    // Add user message
    addMessage(activeConversationId, {
      id: crypto.randomUUID(),
      role: 'user',
      content: text,
      timestamp: new Date(),
    });
    setPendingQuery(text);
    setIsStreaming(true);
    setQuery('');
  };
  const handleStreamComplete = (fullText: string, messageId: string) => {
    if (!activeConversationId) return;
    addMessage(activeConversationId, {
      id: messageId,
      role: 'assistant',
      content: fullText,
      timestamp: new Date(),
    });
    setIsStreaming(false);
  };
  return (
    <div className="flex h-screen flex-col">
      {/* Messages */}
      <div className="flex-1 overflow-y-auto p-4 space-y-4">
        {activeConversation?.messages.map((msg) => (
          <div key={msg.id} className={msg.role === 'user' ? 'text-right' : 'text-left'}>
            <div className="inline-block max-w-[80%]">
              <div className="font-semibold text-sm mb-1">
                {msg.role === 'user' ? 'You' : 'Assistant'}
              </div>
              <div className="bg-secondary p-3 rounded-lg">
                {msg.content}
              </div>
            </div>
          </div>
        ))}
        {isStreaming && activeConversationId && (
          <StreamingMessage
            conversationId={activeConversationId}
            query={pendingQuery}
            onComplete={handleStreamComplete}
          />
        )}
      </div>
      {/* Input */}
      <form onSubmit={handleSubmit} className="border-t p-4 flex gap-2">
        <Input
          value={query}
          onChange={(e) => setQuery(e.target.value)}
          placeholder="Ask a question..."
          disabled={isStreaming}
          className="flex-1"
        />
        <Button type="submit" disabled={isStreaming || !query.trim()}>
          Send
        </Button>
      </form>
    </div>
  );
}
6.6 Admin Dashboard
File: frontend/app/(dashboard)/admin/page.tsx
'use client';
import { useState } from 'react';
import { useQuery, useMutation, useQueryClient } from '@tanstack/react-query';
import apiClient from '@/lib/api-client';
import { Button } from '@/components/ui/button';
import { Input } from '@/components/ui/input';
import { Card } from '@/components/ui/card';
interface NewLibrary {
  site_url: string;
  library_name: string;
  department_id: string;
}
export default function AdminPage() {
  const queryClient = useQueryClient();
  const [siteUrl, setSiteUrl] = useState('');
  const [libraryName, setLibraryName] = useState('');
  const [departmentId, setDepartmentId] = useState('');
  const { data: libraries } = useQuery({
    queryKey: ['libraries'],
    queryFn: async () => {
      const { data } = await apiClient.get('/admin/sharepoint/libraries');
      return data;
    },
  });
  const createLibrary = useMutation({
    mutationFn: async (data: NewLibrary) => {
      return apiClient.post('/admin/sharepoint/libraries', data);
    },
    onSuccess: () => {
      // Refetch libraries, then reset the form
      queryClient.invalidateQueries({ queryKey: ['libraries'] });
      setSiteUrl('');
      setLibraryName('');
      setDepartmentId('');
    },
  });
  return (
    <div className="p-8">
      <h1 className="text-3xl font-bold mb-6">Admin Dashboard</h1>
      <Card className="p-6 mb-6">
        <h2 className="text-xl font-semibold mb-4">Add SharePoint Library</h2>
        <div className="space-y-4">
          <Input
            placeholder="Site URL"
            value={siteUrl}
            onChange={(e) => setSiteUrl(e.target.value)}
          />
          <Input
            placeholder="Library Name"
            value={libraryName}
            onChange={(e) => setLibraryName(e.target.value)}
          />
          <Input
            placeholder="Department ID"
            value={departmentId}
            onChange={(e) => setDepartmentId(e.target.value)}
          />
          <Button
            onClick={() => createLibrary.mutate({ site_url: siteUrl, library_name: libraryName, department_id: departmentId })}
          >
            Add Library
          </Button>
        </div>
      </Card>
      <Card className="p-6">
        <h2 className="text-xl font-semibold mb-4">Configured Libraries</h2>
        {/* Table of libraries */}
      </Card>
    </div>
  );
}
6.7 Deliverables
- ✅ Next.js app with App Router
- ✅ Shadcn/UI components styled with Tailwind
- ✅ Entra ID OAuth login flow
- ✅ Streaming chat interface with SSE
- ✅ Mode switcher (RAG / Assistant / Notebook)
- ✅ Admin dashboard for SharePoint library management
- ✅ Responsive mobile design
- ✅ Dark/Light mode toggle
🔐 Security Checklist
- Entra ID Client Credentials stored in environment variables only
- JWT secret key minimum 32 characters
- HTTPS enforced in production
- CORS configured to frontend domain only
- SQL injection protection via SQLAlchemy ORM
- File upload size limits enforced
- XSS protection via React's automatic escaping
- Rate limiting (Phase 2)
- Audit logging for admin actions
- Secrets never committed to Git (.env in .gitignore)
📈 Testing Strategy
Unit Tests
- Backend: pytest for services, utilities
- Frontend: vitest for components, stores
Integration Tests
- API endpoints with test database
- Celery task execution with test Redis
E2E Tests
- Playwright for critical user flows (login, chat, upload)
🚀 Deployment
Docker Compose (Staging)
docker-compose up -d
alembic upgrade head
python seed_data.py # Insert regions, departments, LLM configs
Production (Kubernetes)
- Separate deployments for: FastAPI, Celery Worker, Celery Beat, Frontend
- Managed PostgreSQL (Azure Database for PostgreSQL)
- Managed Redis (Azure Cache for Redis)
- Qdrant Cloud or self-hosted Qdrant cluster
- Azure App Service for frontend (or Vercel)
📅 Timeline Summary
| Phase | Duration | Key Deliverables |
|---|---|---|
| 1. Environment Setup | 1 week | Docker Compose running all services |
| 2. Backend Core | 1 week | Auth, RBAC, database migrations |
| 3. SharePoint Pipeline | 2 weeks | Delta sync, embeddings, Qdrant indexing |
| 4. RAG Logic | 1 week | LLM router, streaming, citations |
| 5. NotebookLlama | 1 week | Proxy, uploads, session management |
| 6. Frontend | 3 weeks | Next.js UI, chat, admin dashboard |
| Total | 9 weeks | Production-ready MVP |
🎯 Success Metrics
- Performance: RAG query response time < 3 seconds
- Accuracy: >85% positive feedback on RAG answers
- Availability: 99.5% uptime
- Cost: <$500/month for 100 users (excluding Azure infrastructure)
📝 Next Steps
- Week 1: Clone repo structure, set up Docker Compose, verify all services start
- Week 2: Implement Entra ID auth, create database migrations, seed taxonomy data
- Week 3-4: Build SharePoint sync pipeline, test with sample library
- Week 5: Integrate LLM router, test RAG queries with citations
- Week 6: Implement NotebookLlama proxy, test file uploads
- Week 7-9: Build frontend, conduct user testing, deploy to staging
End of Implementation Plan