Approval workflow backend: notifications + Mailgun email

The two gated stages (3 = qualification, 14 = approval gates) now have a
real workflow. An admin / orchestrator picks approvers, the backend creates
Approval rows, fires in-app Notifications, and sends a Mailgun email with
a deeplink to the approval page. Each approval gets a unique email_token
so links are per-recipient. When the approver submits a decision the
opportunity owner is notified back.

End-to-end smoke test on the Versuni opp:
1. POST /opportunities/2/stages/3/approvals — created Approval #1,
   notification queued, Mailgun "skipped" (no API key in dev) and logged.
2. POST /opportunities/2/stages/3/complete blocked: "pending approvals
   from commercial".
3. POST /approvals/1/decision {"decision":"approve",...} — status 'approved'.
4. POST /opportunities/2/stages/3/complete — succeeds, advances to stage 4.

Schema:
- New table 'notifications' (user_id, type enum, title, body, related FKs,
  read flag, indexes on user_id+read and user_id+created_at).
- Approvals table gets email_token (unique), email_sent_at, email_to.
- Migration 0004 (filename + revision id kept short — alembic_version is
  varchar(32), names longer than that crash the migration runner).

Services:
- mailgun.send_email — gracefully no-ops when MAILGUN_API_KEY is empty,
  logging the would-be payload so dev environments work without creds.
  Selects api.eu.mailgun.net when MAILGUN_REGION=eu.
- approval_service.request_approval — creates approval + notification +
  fires email, all in one call. Email is best-effort (logs on failure
  but doesn't roll back the approval).
- approval_service.record_decision — flips status, stamps decided_at,
  notifies the opportunity owner if there is one.
- notification_service.create_notification — thin helper.

Auth middleware now upserts the AppUser on every authenticated request
(including the dev bypass), so logged-in users automatically appear in
the approver directory at /api/users.

API:
- POST /opportunities/{id}/stages/{n}/approvals (only stages 3, 14)
- GET /opportunities/{id}/stages/{n}/approvals
- GET /approvals/me — my pending approvals
- GET /approvals/{id} — context (approval + opportunity summary)
- GET /approvals/by-token/{token} — same context, opened by email link
- POST /approvals/{id}/decision — {decision: 'approve'|'reject', comment}
- GET /notifications/me, /me/unread-count, PUT {id}/read,
  POST /me/mark-all-read
- GET /users, /users/me

Config:
- New env vars: MAILGUN_API_KEY, MAILGUN_DOMAIN, MAILGUN_FROM,
  MAILGUN_REGION, APP_PUBLIC_URL, APP_PATH_PREFIX (used to build
  email links). Plumbed into docker-compose.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
DJP 2026-04-27 13:28:22 -04:00
parent 5ad6d01846
commit 3f7531a1bf
16 changed files with 997 additions and 44 deletions

View file

@ -8,3 +8,11 @@ VITE_DEV_AUTH_BYPASS=
# Absolute path to the directory containing reference data (GMAL Excel etc.)
# Defaults to ./data (relative to repo) if not set
DATA_DIR=./data
# Mailgun — leave MAILGUN_API_KEY blank in dev to log emails instead of sending
MAILGUN_API_KEY=
MAILGUN_DOMAIN=
MAILGUN_FROM=OLIVER Sales Ops <noreply@example.com>
MAILGUN_REGION=us
# Public URL the app serves on — used to build approval email links
APP_PUBLIC_URL=http://localhost:3011
APP_PATH_PREFIX=/osop

View file

@ -0,0 +1,76 @@
"""notifications + approval email tokens
Revision ID: 0004_notifications
Revises: 0003_clarification_questions
Create Date: 2026-04-27
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import ENUM
revision: str = "0004_notifications"
down_revision: Union[str, None] = "0003_clarification_questions"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
notification_type_enum = ENUM(
"APPROVAL_REQUESTED", "APPROVAL_APPROVED", "APPROVAL_REJECTED",
"STAGE_COMPLETED", "GENERIC",
name="notificationtype", create_type=False,
)
def upgrade() -> None:
bind = op.get_bind()
notification_type_enum.create(bind, checkfirst=True)
op.create_table(
"notifications",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("user_id", sa.Integer(), sa.ForeignKey("app_users.id", ondelete="CASCADE"), nullable=False),
sa.Column("type", notification_type_enum, nullable=False, server_default="GENERIC"),
sa.Column("title", sa.String(255), nullable=False),
sa.Column("body", sa.Text()),
sa.Column(
"related_opportunity_id",
sa.Integer(),
sa.ForeignKey("opportunities.id", ondelete="CASCADE"),
),
sa.Column(
"related_approval_id",
sa.Integer(),
sa.ForeignKey("approvals.id", ondelete="CASCADE"),
),
sa.Column("link_path", sa.String(500)),
sa.Column("read", sa.Boolean(), nullable=False, server_default=sa.false()),
sa.Column("created_at", sa.DateTime(), server_default=sa.func.now(), nullable=False),
sa.Column("read_at", sa.DateTime()),
)
op.create_index("idx_notification_user_read", "notifications", ["user_id", "read"])
op.create_index("idx_notification_user_created", "notifications", ["user_id", "created_at"])
# Approvals get an email_token + email_sent_at + email_to columns. The token
# is a uuid that lets the recipient open the approval page without an active
# SSO session yet — they can still complete auth before submitting a decision.
op.add_column("approvals", sa.Column("email_token", sa.String(64), unique=True))
op.add_column("approvals", sa.Column("email_sent_at", sa.DateTime()))
op.add_column("approvals", sa.Column("email_to", sa.String(255)))
op.create_index("idx_approval_email_token", "approvals", ["email_token"])
def downgrade() -> None:
op.drop_index("idx_approval_email_token", table_name="approvals")
op.drop_column("approvals", "email_to")
op.drop_column("approvals", "email_sent_at")
op.drop_column("approvals", "email_token")
op.drop_index("idx_notification_user_created", table_name="notifications")
op.drop_index("idx_notification_user_read", table_name="notifications")
op.drop_table("notifications")
bind = op.get_bind()
notification_type_enum.drop(bind, checkfirst=True)

View file

@ -0,0 +1,223 @@
"""Approval endpoints: request, decide, list, by-token."""
import logging
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.middleware.auth import get_current_user
from app.models.opportunity import Opportunity
from app.models.stage import Approval, ApprovalStatus
from app.models.user import AppUser
from app.schemas.approval import (
ApprovalContextOut,
ApprovalDecisionPayload,
ApprovalOut,
ApprovalRequestPayload,
ApproverInfo,
)
from app.services.approval_service import record_decision, request_approval
router = APIRouter()
logger = logging.getLogger(__name__)
GATED_STAGES = {3, 14}
def _approval_to_out(a: Approval, approver: AppUser | None = None) -> ApprovalOut:
return ApprovalOut(
id=a.id,
opportunity_id=a.opportunity_id,
stage_number=a.stage_number,
role_required=a.role_required,
approver_user_id=a.approver_user_id,
approver=ApproverInfo(id=approver.id, email=approver.email, name=approver.name) if approver else None,
status=a.status.value,
comment=a.comment,
requested_at=a.requested_at,
decided_at=a.decided_at,
email_sent_at=a.email_sent_at,
email_to=a.email_to,
)
@router.post("/opportunities/{opportunity_id}/stages/{stage_number}/approvals", response_model=list[ApprovalOut])
async def create_approval_requests(
opportunity_id: int,
stage_number: int,
payload: ApprovalRequestPayload,
db: AsyncSession = Depends(get_db),
current=Depends(get_current_user),
):
if stage_number not in GATED_STAGES:
raise HTTPException(
status_code=400,
detail=f"Stage {stage_number} is not a gated stage. Approval requests are only valid for stages {sorted(GATED_STAGES)}.",
)
opp_result = await db.execute(select(Opportunity).where(Opportunity.id == opportunity_id))
opp = opp_result.scalar_one_or_none()
if opp is None:
raise HTTPException(status_code=404, detail=f"Opportunity {opportunity_id} not found")
created: list[tuple[Approval, AppUser]] = []
for item in payload.requests:
user_result = await db.execute(select(AppUser).where(AppUser.id == item.approver_user_id))
approver = user_result.scalar_one_or_none()
if approver is None:
raise HTTPException(
status_code=400,
detail=f"Approver user {item.approver_user_id} not found",
)
approval = await request_approval(
db,
opportunity=opp,
stage_number=stage_number,
role_required=item.role_required,
approver=approver,
)
created.append((approval, approver))
await db.commit()
return [_approval_to_out(a, approver) for a, approver in created]
@router.get("/opportunities/{opportunity_id}/stages/{stage_number}/approvals", response_model=list[ApprovalOut])
async def list_stage_approvals(
opportunity_id: int,
stage_number: int,
db: AsyncSession = Depends(get_db),
current=Depends(get_current_user),
):
opp_result = await db.execute(select(Opportunity).where(Opportunity.id == opportunity_id))
opp = opp_result.scalar_one_or_none()
if opp is None:
raise HTTPException(status_code=404, detail=f"Opportunity {opportunity_id} not found")
result = await db.execute(
select(Approval, AppUser)
.outerjoin(AppUser, Approval.approver_user_id == AppUser.id)
.where(
Approval.opportunity_id == opportunity_id,
Approval.stage_number == stage_number,
)
.order_by(Approval.requested_at)
)
return [_approval_to_out(a, u) for a, u in result.all()]
@router.get("/approvals/me", response_model=list[ApprovalOut])
async def my_pending_approvals(
db: AsyncSession = Depends(get_db),
current=Depends(get_current_user),
):
user_id = current.get("user_id")
if user_id is None:
raise HTTPException(status_code=401, detail="No user context")
result = await db.execute(
select(Approval, AppUser)
.outerjoin(AppUser, Approval.approver_user_id == AppUser.id)
.where(
Approval.approver_user_id == user_id,
Approval.status == ApprovalStatus.PENDING,
)
.order_by(Approval.requested_at)
)
return [_approval_to_out(a, u) for a, u in result.all()]
@router.get("/approvals/{approval_id}", response_model=ApprovalContextOut)
async def get_approval_context(
approval_id: int,
db: AsyncSession = Depends(get_db),
current=Depends(get_current_user),
):
return await _build_context(db, approval_id=approval_id)
@router.get("/approvals/by-token/{token}", response_model=ApprovalContextOut)
async def get_approval_by_token(
token: str,
db: AsyncSession = Depends(get_db),
current=Depends(get_current_user),
):
"""Open an approval by its email-link token. Authenticated session still
required so we can attribute the decision later."""
return await _build_context(db, token=token)
@router.post("/approvals/{approval_id}/decision", response_model=ApprovalOut)
async def submit_decision(
approval_id: int,
payload: ApprovalDecisionPayload,
db: AsyncSession = Depends(get_db),
current=Depends(get_current_user),
):
user_id = current.get("user_id")
result = await db.execute(
select(Approval, AppUser)
.outerjoin(AppUser, Approval.approver_user_id == AppUser.id)
.where(Approval.id == approval_id)
)
row = result.first()
if row is None:
raise HTTPException(status_code=404, detail=f"Approval {approval_id} not found")
approval, approver = row
# Only the assigned approver (or admin) can decide
if approval.approver_user_id and approval.approver_user_id != user_id and current.get("role") != "admin":
raise HTTPException(status_code=403, detail="You aren't the approver for this request")
decision = (payload.decision or "").lower().strip()
if decision == "approve":
new_status = ApprovalStatus.APPROVED
elif decision == "reject":
new_status = ApprovalStatus.REJECTED
else:
raise HTTPException(status_code=400, detail="decision must be 'approve' or 'reject'")
try:
await record_decision(db, approval=approval, new_status=new_status, comment=payload.comment)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
await db.commit()
return _approval_to_out(approval, approver)
async def _build_context(
db: AsyncSession,
approval_id: int | None = None,
token: str | None = None,
) -> ApprovalContextOut:
if approval_id is not None:
clause = Approval.id == approval_id
elif token is not None:
clause = Approval.email_token == token
else:
raise HTTPException(status_code=400, detail="approval_id or token required")
result = await db.execute(
select(Approval, AppUser, Opportunity)
.outerjoin(AppUser, Approval.approver_user_id == AppUser.id)
.join(Opportunity, Approval.opportunity_id == Opportunity.id)
.where(clause)
)
row = result.first()
if row is None:
raise HTTPException(status_code=404, detail="Approval not found")
approval, approver, opp = row
return ApprovalContextOut(
approval=_approval_to_out(approval, approver),
opportunity_name=opp.name,
opportunity_id=opp.id,
client_name=opp.client_name,
region=opp.region,
deadline=opp.deadline.isoformat() if opp.deadline else None,
summary=opp.description,
)

View file

@ -0,0 +1,109 @@
"""In-app notification endpoints."""
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.middleware.auth import get_current_user
from app.models.notification import Notification
from app.schemas.approval import NotificationOut
router = APIRouter()
def _to_out(n: Notification) -> NotificationOut:
return NotificationOut(
id=n.id,
type=n.type.value,
title=n.title,
body=n.body,
related_opportunity_id=n.related_opportunity_id,
related_approval_id=n.related_approval_id,
link_path=n.link_path,
read=n.read,
created_at=n.created_at,
read_at=n.read_at,
)
@router.get("/me", response_model=list[NotificationOut])
async def list_my_notifications(
unread_only: bool = False,
db: AsyncSession = Depends(get_db),
current=Depends(get_current_user),
):
user_id = current.get("user_id")
if user_id is None:
raise HTTPException(status_code=401, detail="No user context")
query = select(Notification).where(Notification.user_id == user_id)
if unread_only:
query = query.where(Notification.read == False)
query = query.order_by(Notification.created_at.desc()).limit(50)
result = await db.execute(query)
return [_to_out(n) for n in result.scalars().all()]
@router.get("/me/unread-count")
async def unread_count(
db: AsyncSession = Depends(get_db),
current=Depends(get_current_user),
):
user_id = current.get("user_id")
if user_id is None:
raise HTTPException(status_code=401, detail="No user context")
result = await db.execute(
select(func.count(Notification.id)).where(
Notification.user_id == user_id,
Notification.read == False,
)
)
return {"count": int(result.scalar() or 0)}
@router.put("/{notification_id}/read", response_model=NotificationOut)
async def mark_read(
notification_id: int,
db: AsyncSession = Depends(get_db),
current=Depends(get_current_user),
):
user_id = current.get("user_id")
result = await db.execute(
select(Notification).where(
Notification.id == notification_id,
Notification.user_id == user_id,
)
)
n = result.scalar_one_or_none()
if n is None:
raise HTTPException(status_code=404, detail="Notification not found")
if not n.read:
n.read = True
n.read_at = datetime.utcnow()
await db.commit()
await db.refresh(n)
return _to_out(n)
@router.post("/me/mark-all-read")
async def mark_all_read(
db: AsyncSession = Depends(get_db),
current=Depends(get_current_user),
):
user_id = current.get("user_id")
result = await db.execute(
select(Notification).where(
Notification.user_id == user_id,
Notification.read == False,
)
)
now = datetime.utcnow()
count = 0
for n in result.scalars().all():
n.read = True
n.read_at = now
count += 1
await db.commit()
return {"marked_read": count}

43
backend/app/api/users.py Normal file
View file

@ -0,0 +1,43 @@
"""User directory endpoints — list users (used by approver picker)."""
from fastapi import APIRouter, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.middleware.auth import get_current_user
from app.models.user import AppUser
from app.schemas.approval import UserBrief
router = APIRouter()
@router.get("", response_model=list[UserBrief])
async def list_users(
db: AsyncSession = Depends(get_db),
current=Depends(get_current_user),
):
"""Returns all known AppUsers — the directory of potential approvers.
Users land in this table on first authenticated request."""
result = await db.execute(select(AppUser).order_by(AppUser.email))
return [
UserBrief(
id=u.id, email=u.email, name=u.name,
role=u.role.value, last_login=u.last_login,
)
for u in result.scalars().all()
]
@router.get("/me", response_model=UserBrief)
async def me(
db: AsyncSession = Depends(get_db),
current=Depends(get_current_user),
):
"""Current authenticated user's own row."""
result = await db.execute(select(AppUser).where(AppUser.id == current["user_id"]))
u = result.scalar_one()
return UserBrief(
id=u.id, email=u.email, name=u.name,
role=u.role.value, last_login=u.last_login,
)

View file

@ -8,6 +8,19 @@ class Settings(BaseSettings):
anthropic_api_key: str = ""
data_dir: str = "/app/data"
# Mailgun (https://api.mailgun.net/v3/<domain>/messages). When mailgun_api_key
# is empty, the email service logs the would-be message and skips the API call,
# so dev environments work without mail credentials.
mailgun_api_key: str = ""
mailgun_domain: str = "" # e.g. "mg.oliver.solutions"
mailgun_from: str = "OLIVER Sales Ops <noreply@example.com>"
mailgun_region: str = "us" # 'us' -> api.mailgun.net, 'eu' -> api.eu.mailgun.net
# Public URL the app is served on, used to build email links.
# Local dev: http://localhost:3011, prod: https://optical-dev.oliver.solutions
app_public_url: str = "http://localhost:3011"
app_path_prefix: str = "/osop"
class Config:
env_file = ".env"

View file

@ -7,7 +7,7 @@ from fastapi.middleware.cors import CORSMiddleware
logging.basicConfig(level=logging.INFO, format="%(levelname)s [%(name)s] %(message)s")
from app.api import health, gmal, ingest, opportunities
from app.api import health, gmal, ingest, opportunities, approvals, notifications, users
from app.middleware.auth import get_current_user
app = FastAPI(title="OLIVER Sales Ops Platform", version="0.1.0")
@ -31,6 +31,9 @@ app.include_router(health.router, prefix="/api", tags=["Health"])
app.include_router(gmal.router, prefix="/api/gmal", tags=["GMAL"], dependencies=[_auth])
app.include_router(ingest.router, prefix="/api/gmal", tags=["Ingest"], dependencies=[_auth])
app.include_router(opportunities.router, prefix="/api/opportunities", tags=["Opportunities"], dependencies=[_auth])
app.include_router(approvals.router, prefix="/api", tags=["Approvals"], dependencies=[_auth])
app.include_router(notifications.router, prefix="/api/notifications", tags=["Notifications"], dependencies=[_auth])
app.include_router(users.router, prefix="/api/users", tags=["Users"], dependencies=[_auth])
@app.get("/api/ai/usage", dependencies=[_auth])

View file

@ -1,10 +1,22 @@
"""Azure SSO middleware — validates Microsoft Entra ID-issued ID tokens."""
"""Azure SSO middleware — validates Microsoft Entra ID-issued ID tokens
and upserts the AppUser row so the user is available as a potential approver.
"""
import logging
import os
from datetime import datetime
import httpx
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.models.user import AppUser, UserRole
logger = logging.getLogger(__name__)
TENANT_ID = os.environ.get("AZURE_TENANT_ID", "")
CLIENT_ID = os.environ.get("AZURE_CLIENT_ID", "")
@ -28,50 +40,89 @@ async def _get_jwks() -> dict:
return _jwks_cache
async def _decode_token(token: str) -> dict:
jwks = await _get_jwks()
header = jwt.get_unverified_header(token)
key = next((k for k in jwks["keys"] if k.get("kid") == header.get("kid")), None)
if key is None:
global _jwks_cache
_jwks_cache = None
jwks = await _get_jwks()
key = next((k for k in jwks["keys"] if k.get("kid") == header.get("kid")), None)
if key is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unknown signing key")
payload = jwt.decode(
token, key,
algorithms=["RS256"],
audience=CLIENT_ID,
issuer=ISSUER,
options={"verify_at_hash": False},
)
return {
"oid": payload.get("oid"),
"name": payload.get("name"),
"email": (
payload.get("preferred_username")
or payload.get("upn")
or payload.get("email")
),
}
async def _upsert_app_user(db: AsyncSession, identity: dict) -> AppUser:
"""Find or create the AppUser for this identity. Stamps last_login on hit."""
email = (identity.get("email") or "").strip().lower()
oid = identity.get("oid")
if not email:
# Best-effort: use oid as fallback synthetic email so we still get a row.
email = f"{oid or 'unknown'}@unknown.local"
result = await db.execute(select(AppUser).where(AppUser.email == email))
user = result.scalar_one_or_none()
if user is None:
user = AppUser(
email=email,
name=identity.get("name"),
azure_oid=oid,
role=UserRole.EDITOR,
last_login=datetime.utcnow(),
)
db.add(user)
await db.flush()
logger.info("Created AppUser %s (%s)", user.id, email)
else:
user.last_login = datetime.utcnow()
if oid and not user.azure_oid:
user.azure_oid = oid
if identity.get("name") and not user.name:
user.name = identity["name"]
await db.flush()
return user
async def get_current_user(
credentials: HTTPAuthorizationCredentials | None = Depends(bearer_scheme),
db: AsyncSession = Depends(get_db),
) -> dict:
"""Validate the bearer token (or accept dev bypass), upsert the AppUser,
and return a dict of identity + the user's row id/role so downstream
handlers can use it as the actor.
"""
if os.environ.get("DEV_AUTH_BYPASS", "").lower() in ("1", "true", "yes"):
return {"oid": "dev-user", "name": "Dev User", "email": "dev@localhost"}
identity = {"oid": "dev-user", "name": "Dev User", "email": "dev@localhost"}
else:
if credentials is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
try:
identity = await _decode_token(credentials.credentials)
except JWTError as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Invalid token: {e}")
if credentials is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Not authenticated")
token = credentials.credentials
try:
jwks = await _get_jwks()
header = jwt.get_unverified_header(token)
key = next(
(k for k in jwks["keys"] if k.get("kid") == header.get("kid")),
None,
)
if key is None:
global _jwks_cache
_jwks_cache = None
jwks = await _get_jwks()
key = next(
(k for k in jwks["keys"] if k.get("kid") == header.get("kid")),
None,
)
if key is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unknown signing key")
payload = jwt.decode(
token,
key,
algorithms=["RS256"],
audience=CLIENT_ID,
issuer=ISSUER,
options={"verify_at_hash": False},
)
return {
"oid": payload.get("oid"),
"name": payload.get("name"),
"email": (
payload.get("preferred_username")
or payload.get("upn")
or payload.get("email")
),
}
except JWTError as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=f"Invalid token: {e}")
user = await _upsert_app_user(db, identity)
await db.commit()
return {
**identity,
"user_id": user.id,
"role": user.role.value,
}

View file

@ -25,3 +25,4 @@ from app.models.clarification import ( # noqa: F401
QuestionPriority,
QuestionStatus,
)
from app.models.notification import Notification, NotificationType # noqa: F401

View file

@ -0,0 +1,51 @@
"""Per-user in-app notifications.
Notifications are surfaced via a bell icon in the nav. They link to the
relevant opportunity or approval. The Mailgun email is fired alongside the
notification record (best-effort) so users get an external prompt too.
"""
import enum
from datetime import datetime
from sqlalchemy import String, Text, DateTime, Boolean, Enum, ForeignKey, Index
from sqlalchemy.orm import Mapped, mapped_column
from app.database import Base
class NotificationType(str, enum.Enum):
APPROVAL_REQUESTED = "approval_requested"
APPROVAL_APPROVED = "approval_approved"
APPROVAL_REJECTED = "approval_rejected"
STAGE_COMPLETED = "stage_completed"
GENERIC = "generic"
class Notification(Base):
__tablename__ = "notifications"
id: Mapped[int] = mapped_column(primary_key=True)
user_id: Mapped[int] = mapped_column(
ForeignKey("app_users.id", ondelete="CASCADE"), nullable=False
)
type: Mapped[NotificationType] = mapped_column(
Enum(NotificationType), default=NotificationType.GENERIC, nullable=False
)
title: Mapped[str] = mapped_column(String(255), nullable=False)
body: Mapped[str | None] = mapped_column(Text)
related_opportunity_id: Mapped[int | None] = mapped_column(
ForeignKey("opportunities.id", ondelete="CASCADE")
)
related_approval_id: Mapped[int | None] = mapped_column(
ForeignKey("approvals.id", ondelete="CASCADE")
)
link_path: Mapped[str | None] = mapped_column(String(500)) # e.g. '/approvals/42'
read: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
created_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
read_at: Mapped[datetime | None] = mapped_column(DateTime)
__table_args__ = (
Index("idx_notification_user_read", "user_id", "read"),
Index("idx_notification_user_created", "user_id", "created_at"),
)

View file

@ -100,6 +100,12 @@ class Approval(Base):
requested_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)
decided_at: Mapped[datetime | None] = mapped_column(DateTime)
# Email link auth — token sent in the Mailgun email, lets the approver
# land on the approval page directly. Final decision still requires SSO.
email_token: Mapped[str | None] = mapped_column(String(64), unique=True)
email_sent_at: Mapped[datetime | None] = mapped_column(DateTime)
email_to: Mapped[str | None] = mapped_column(String(255))
opportunity: Mapped["Opportunity"] = relationship(back_populates="approvals")
__table_args__ = (

View file

@ -0,0 +1,83 @@
"""Approval + notification + user schemas."""
from datetime import datetime
from pydantic import BaseModel
class ApproverInfo(BaseModel):
id: int
email: str
name: str | None
class Config:
from_attributes = True
class ApprovalOut(BaseModel):
id: int
opportunity_id: int
stage_number: int
role_required: str
approver_user_id: int | None
approver: ApproverInfo | None = None
status: str
comment: str | None
requested_at: datetime
decided_at: datetime | None
email_sent_at: datetime | None
email_to: str | None
class Config:
from_attributes = True
class ApprovalRequestItem(BaseModel):
role_required: str
approver_user_id: int
class ApprovalRequestPayload(BaseModel):
requests: list[ApprovalRequestItem]
class ApprovalDecisionPayload(BaseModel):
decision: str # 'approve' | 'reject'
comment: str | None = None
class ApprovalContextOut(BaseModel):
"""Approval + the opportunity context the approver needs on the page."""
approval: ApprovalOut
opportunity_name: str
opportunity_id: int
client_name: str | None
region: str | None
deadline: str | None
summary: str | None
class NotificationOut(BaseModel):
id: int
type: str
title: str
body: str | None
related_opportunity_id: int | None
related_approval_id: int | None
link_path: str | None
read: bool
created_at: datetime
read_at: datetime | None
class Config:
from_attributes = True
class UserBrief(BaseModel):
id: int
email: str
name: str | None
role: str
last_login: datetime | None
class Config:
from_attributes = True

View file

@ -0,0 +1,174 @@
"""Approval workflow — request approvals and record decisions.
Requesting an approval creates an Approval row, an in-app Notification for
the approver, and (best-effort) a Mailgun email with a deeplink to the
approval page. Recording a decision updates the approval and notifies
the opportunity owner (if any).
"""
import logging
import secrets
from datetime import datetime
from html import escape
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.models.opportunity import Opportunity
from app.models.stage import Approval, ApprovalStatus
from app.models.user import AppUser
from app.models.notification import NotificationType
from app.services.mailgun import send_email
from app.services.notification_service import create_notification
logger = logging.getLogger(__name__)
def _approval_link(token: str) -> str:
"""Build the public URL the approver clicks in the email."""
base = settings.app_public_url.rstrip("/")
prefix = settings.app_path_prefix.rstrip("/")
return f"{base}{prefix}/approvals/by-token/{token}"
def _generate_token() -> str:
return secrets.token_urlsafe(32)
def _build_email_html(opp: Opportunity, approval: Approval, role_required: str, link: str) -> tuple[str, str]:
"""Return (html, text) for the approval-request email."""
deadline = opp.deadline.isoformat() if opp.deadline else ""
region = opp.region or ""
summary = (opp.description or "(no summary yet — Stage 1 intake hasn't run)").strip()
stage_label = "Qualification (Stage 3)" if approval.stage_number == 3 else f"Approval Gate (Stage {approval.stage_number})"
html = f"""
<div style="font-family: -apple-system, Segoe UI, sans-serif; max-width: 640px; margin: 0 auto; color: #111;">
<h2 style="font-size: 18px; letter-spacing:-0.01em;">Approval needed: {escape(opp.name)}</h2>
<p style="color:#555; font-size:13px;">You've been asked to review and decide on <strong>{escape(stage_label)}</strong> for this opportunity.</p>
<table style="border-collapse:collapse; font-size:13px; margin: 16px 0;">
<tr><td style="padding:4px 12px 4px 0; color:#666;">Client</td><td>{escape(opp.client_name or "")}</td></tr>
<tr><td style="padding:4px 12px 4px 0; color:#666;">Region</td><td>{escape(region)}</td></tr>
<tr><td style="padding:4px 12px 4px 0; color:#666;">Deadline</td><td>{escape(deadline)}</td></tr>
<tr><td style="padding:4px 12px 4px 0; color:#666;">Your role</td><td>{escape(role_required)}</td></tr>
</table>
<p style="font-size:13px; line-height:1.55; margin: 16px 0;">{escape(summary)}</p>
<p style="margin: 24px 0;">
<a href="{link}" style="background:#FFC407; color:#0e0f13; font-weight:600; padding:12px 22px; border-radius:8px; text-decoration:none; font-size:13px;">
Open approval page
</a>
</p>
<p style="color:#888; font-size:11px;">You'll see the full opportunity context, can approve or decline, and add notes before submitting. The link above is unique to you.</p>
</div>
""".strip()
text = (
f"Approval needed: {opp.name}\n\n"
f"You've been asked to decide on {stage_label} for this opportunity.\n\n"
f"Client: {opp.client_name or ''}\n"
f"Region: {region}\n"
f"Deadline: {deadline}\n"
f"Your role: {role_required}\n\n"
f"{summary}\n\n"
f"Open approval page: {link}\n"
)
return html, text
async def request_approval(
db: AsyncSession,
*,
opportunity: Opportunity,
stage_number: int,
role_required: str,
approver: AppUser,
) -> Approval:
"""Create an Approval row, in-app Notification, and fire a Mailgun email.
Returns the persisted Approval. Email send is best-effort failures are
logged but don't roll back the approval/notification creation.
"""
token = _generate_token()
approval = Approval(
opportunity_id=opportunity.id,
stage_number=stage_number,
role_required=role_required,
approver_user_id=approver.id,
status=ApprovalStatus.PENDING,
email_token=token,
email_to=approver.email,
)
db.add(approval)
await db.flush()
# In-app notification
await create_notification(
db,
user_id=approver.id,
type_=NotificationType.APPROVAL_REQUESTED,
title=f"Approval needed: {opportunity.name}",
body=f"You've been asked to approve Stage {stage_number} ({role_required}).",
related_opportunity_id=opportunity.id,
related_approval_id=approval.id,
link_path=f"/approvals/{approval.id}",
)
# Email — best-effort, never break the request
link = _approval_link(token)
html, text = _build_email_html(opportunity, approval, role_required, link)
subject = f"[OSOP] Approval needed: {opportunity.name} — Stage {stage_number}"
result = await send_email(approver.email, subject, html, text)
if result.get("status") == "sent" or result.get("status") == "skipped":
approval.email_sent_at = datetime.utcnow()
elif result.get("status") == "error":
logger.warning("Email send failed for approval %s: %s", approval.id, result)
await db.flush()
return approval
async def record_decision(
db: AsyncSession,
*,
approval: Approval,
new_status: ApprovalStatus,
comment: str | None,
) -> Approval:
"""Mark an approval approved/rejected and notify the opportunity owner."""
if approval.status != ApprovalStatus.PENDING:
raise ValueError(f"Approval {approval.id} has already been decided ({approval.status.value})")
if new_status == ApprovalStatus.PENDING:
raise ValueError("Decision must be APPROVED or REJECTED")
approval.status = new_status
approval.comment = comment
approval.decided_at = datetime.utcnow()
await db.flush()
# Notify opportunity owner if they exist and aren't the approver
opp_result = await db.execute(select(Opportunity).where(Opportunity.id == approval.opportunity_id))
opp = opp_result.scalar_one_or_none()
if opp and opp.owner_user_id and opp.owner_user_id != approval.approver_user_id:
notif_type = (
NotificationType.APPROVAL_APPROVED
if new_status == ApprovalStatus.APPROVED
else NotificationType.APPROVAL_REJECTED
)
verb = "approved" if new_status == ApprovalStatus.APPROVED else "rejected"
await create_notification(
db,
user_id=opp.owner_user_id,
type_=notif_type,
title=f"{opp.name}: Stage {approval.stage_number} {verb}",
body=(comment or "")[:300] if comment else None,
related_opportunity_id=opp.id,
related_approval_id=approval.id,
link_path=f"/opportunities/{opp.id}/stage/{approval.stage_number}",
)
return approval

View file

@ -0,0 +1,72 @@
"""Mailgun email service.
Sends via https://api.mailgun.net/v3/{domain}/messages (or .eu. for region='eu').
When MAILGUN_API_KEY is empty the call is logged and skipped dev environments
work without credentials.
"""
import logging
from typing import Any
import httpx
from app.config import settings
logger = logging.getLogger(__name__)
def is_configured() -> bool:
return bool(settings.mailgun_api_key) and bool(settings.mailgun_domain)
def _api_base() -> str:
region = (settings.mailgun_region or "us").lower()
host = "api.eu.mailgun.net" if region == "eu" else "api.mailgun.net"
return f"https://{host}/v3/{settings.mailgun_domain}"
async def send_email(
to: str | list[str],
subject: str,
html: str,
text: str | None = None,
from_address: str | None = None,
) -> dict[str, Any]:
"""Send an email via Mailgun. Returns Mailgun's response or a stub when
not configured. Never raises failures are logged and reported in the
return dict so callers can surface them without breaking the request flow.
"""
recipients = to if isinstance(to, list) else [to]
sender = from_address or settings.mailgun_from
if not is_configured():
logger.info(
"[mailgun:skipped] from=%s to=%s subject=%s html_len=%d",
sender, recipients, subject, len(html),
)
return {"status": "skipped", "reason": "MAILGUN_API_KEY not set"}
url = f"{_api_base()}/messages"
data = {
"from": sender,
"to": recipients,
"subject": subject,
"html": html,
}
if text:
data["text"] = text
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.post(url, auth=("api", settings.mailgun_api_key), data=data)
if resp.status_code >= 400:
logger.error(
"Mailgun send failed: %s %s — to=%s subject=%s",
resp.status_code, resp.text, recipients, subject,
)
return {"status": "error", "code": resp.status_code, "body": resp.text}
logger.info("Mailgun sent: to=%s subject=%s", recipients, subject)
return {"status": "sent", "response": resp.json()}
except Exception as e:
logger.error("Mailgun exception: %s", e, exc_info=True)
return {"status": "error", "error": str(e)}

View file

@ -0,0 +1,34 @@
"""Helpers for creating in-app notifications."""
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.notification import Notification, NotificationType
logger = logging.getLogger(__name__)
async def create_notification(
db: AsyncSession,
*,
user_id: int,
type_: NotificationType,
title: str,
body: str | None = None,
related_opportunity_id: int | None = None,
related_approval_id: int | None = None,
link_path: str | None = None,
) -> Notification:
n = Notification(
user_id=user_id,
type=type_,
title=title,
body=body,
related_opportunity_id=related_opportunity_id,
related_approval_id=related_approval_id,
link_path=link_path,
)
db.add(n)
await db.flush()
return n

View file

@ -45,6 +45,12 @@ services:
AZURE_TENANT_ID: ${AZURE_TENANT_ID}
AZURE_CLIENT_ID: ${AZURE_CLIENT_ID}
DEV_AUTH_BYPASS: ${DEV_AUTH_BYPASS:-}
MAILGUN_API_KEY: ${MAILGUN_API_KEY:-}
MAILGUN_DOMAIN: ${MAILGUN_DOMAIN:-}
MAILGUN_FROM: ${MAILGUN_FROM:-OLIVER Sales Ops <noreply@example.com>}
MAILGUN_REGION: ${MAILGUN_REGION:-us}
APP_PUBLIC_URL: ${APP_PUBLIC_URL:-http://localhost:3011}
APP_PATH_PREFIX: ${APP_PATH_PREFIX:-/osop}
ports:
- "127.0.0.1:8003:8000"
volumes: