hp-studios-ai-content-agent/backend/app/core/security.py
DJP 72c8a0d0fe Initial import — HP Studios AI Content Agent
Full-stack app that turns HP customer briefs (master asset + regional
supporting docs) into a set of branded Word deliverables via a RAG +
agent pipeline.

Stack
- FastAPI + SQLAlchemy + pgvector + RQ (backend, Python 3.12)
- React + Vite + TypeScript + Tailwind + TanStack Query (frontend)
- Claude Opus 4.7 (generation) + Haiku 4.5 (translation/OCR)
- Voyage voyage-3 or OpenAI text-embedding-3-small (embeddings)
- python-docx (branded Word output, Montserrat + HP blue)
- Docker Compose (5 services)

Features
- 6 built-in deliverable types (leadership themes, regional enrichment,
  LinkedIn posts, webinar spec, infographic specs, ABM enablement)
- Data-driven deliverable types: admins add new types at runtime via
  prompt + JSON schema + template_json — no code, no deploy
- Generic schema-driven review form + generic Word template renderer
- Document ingestion pipeline with translation, chunking, pgvector RAG
- Pluggable auth provider (password now, Entra SSO later); admin/user roles
- Re-roll / retry on every deliverable; cascading delete; brief editing;
  inline document upload; progress hints; router-level ErrorBoundary
- Admin panel with test-render preview for new deliverable types
- Help page at /help with architecture overview and usage guide

82 backend tests passing, 18 skipped (gated live-API tests).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-17 17:11:25 -04:00

58 lines
2 KiB
Python

from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
import bcrypt
from jose import JWTError, jwt
from app.core.config import settings
# ---------------------------------------------------------------------------
# Password hashing — use bcrypt directly; passlib has compat issues with
# bcrypt>=4 on recent Pythons. bcrypt hashes the first 72 bytes only, so we
# truncate explicitly for a consistent contract.
# ---------------------------------------------------------------------------
_BCRYPT_MAX_BYTES = 72
def _truncate(plain: str) -> bytes:
return plain.encode("utf-8")[:_BCRYPT_MAX_BYTES]
def hash_password(plain: str) -> str:
"""Return a bcrypt hash of *plain* as a UTF-8 string."""
return bcrypt.hashpw(_truncate(plain), bcrypt.gensalt()).decode("utf-8")
def verify_password(plain: str, hashed: str) -> bool:
"""Return True if *plain* matches *hashed*."""
try:
return bcrypt.checkpw(_truncate(plain), hashed.encode("utf-8"))
except ValueError:
return False
# ---------------------------------------------------------------------------
# JWT
# ---------------------------------------------------------------------------
def create_access_token(
subject: str,
extra_claims: dict[str, Any] | None = None,
expires_delta: timedelta | None = None,
) -> str:
"""Encode a JWT with *subject* (user id or email) and optional extra claims."""
expire = datetime.now(timezone.utc) + (
expires_delta or timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
)
payload: dict[str, Any] = {"sub": str(subject), "exp": expire}
if extra_claims:
payload.update(extra_claims)
return jwt.encode(payload, settings.JWT_SECRET, algorithm=settings.JWT_ALGORITHM)
def decode_access_token(token: str) -> dict[str, Any]:
"""Decode and verify a JWT. Raises ``jose.JWTError`` on failure."""
return jwt.decode(token, settings.JWT_SECRET, algorithms=[settings.JWT_ALGORITHM])