Full-stack app that turns HP customer briefs (master asset + regional supporting docs) into a set of branded Word deliverables via a RAG + agent pipeline. Stack - FastAPI + SQLAlchemy + pgvector + RQ (backend, Python 3.12) - React + Vite + TypeScript + Tailwind + TanStack Query (frontend) - Claude Opus 4.7 (generation) + Haiku 4.5 (translation/OCR) - Voyage voyage-3 or OpenAI text-embedding-3-small (embeddings) - python-docx (branded Word output, Montserrat + HP blue) - Docker Compose (5 services) Features - 6 built-in deliverable types (leadership themes, regional enrichment, LinkedIn posts, webinar spec, infographic specs, ABM enablement) - Data-driven deliverable types: admins add new types at runtime via prompt + JSON schema + template_json — no code, no deploy - Generic schema-driven review form + generic Word template renderer - Document ingestion pipeline with translation, chunking, pgvector RAG - Pluggable auth provider (password now, Entra SSO later); admin/user roles - Re-roll / retry on every deliverable; cascading delete; brief editing; inline document upload; progress hints; router-level ErrorBoundary - Admin panel with test-render preview for new deliverable types - Help page at /help with architecture overview and usage guide 82 backend tests passing, 18 skipped (gated live-API tests). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
58 lines
2 KiB
Python
58 lines
2 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
import bcrypt
|
|
from jose import JWTError, jwt
|
|
|
|
from app.core.config import settings
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Password hashing — use bcrypt directly; passlib has compat issues with
|
|
# bcrypt>=4 on recent Pythons. bcrypt hashes the first 72 bytes only, so we
|
|
# truncate explicitly for a consistent contract.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_BCRYPT_MAX_BYTES = 72
|
|
|
|
|
|
def _truncate(plain: str) -> bytes:
|
|
return plain.encode("utf-8")[:_BCRYPT_MAX_BYTES]
|
|
|
|
|
|
def hash_password(plain: str) -> str:
|
|
"""Return a bcrypt hash of *plain* as a UTF-8 string."""
|
|
return bcrypt.hashpw(_truncate(plain), bcrypt.gensalt()).decode("utf-8")
|
|
|
|
|
|
def verify_password(plain: str, hashed: str) -> bool:
|
|
"""Return True if *plain* matches *hashed*."""
|
|
try:
|
|
return bcrypt.checkpw(_truncate(plain), hashed.encode("utf-8"))
|
|
except ValueError:
|
|
return False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# JWT
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_access_token(
|
|
subject: str,
|
|
extra_claims: dict[str, Any] | None = None,
|
|
expires_delta: timedelta | None = None,
|
|
) -> str:
|
|
"""Encode a JWT with *subject* (user id or email) and optional extra claims."""
|
|
expire = datetime.now(timezone.utc) + (
|
|
expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
|
|
)
|
|
payload: dict[str, Any] = {"sub": str(subject), "exp": expire}
|
|
if extra_claims:
|
|
payload.update(extra_claims)
|
|
return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
|
|
|
|
|
|
def decode_access_token(token: str) -> dict[str, Any]:
|
|
"""Decode and verify a JWT. Raises ``jose.JWTError`` on failure."""
|
|
return jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])
|