feat(security): PR-1 multi-tenant isolation foundations

- Add `organization_id` field (denormalized from project.client_id) to Job,
  ReviewNote, VttVersion, and AuditLog models
- Introduce `get_user_org_ids()` and `assert_job_in_user_org()` helpers in
  `core/dependencies.py` — all staff roles now scope to their orgs; the
  dangerous `None` (unrestricted) fallback for LINGUIST/REVIEWER/PRODUCTION
  with no team assignment is eliminated (returns `[]` instead)
- Apply `assert_job_in_user_org` to `GET /jobs/{id}`, review-notes, and
  vtt-versions endpoints; bulk delete/approve/return-to-qc now skip jobs
  outside the requester's org instead of mutating cross-tenant data
- WebSocket `/ws/jobs/{job_id}` subscribe checks org membership before
  accepting the connection
- `POST /jobs` accepts `client_id` form field; derives `organization_id`
  from project lookup; removes blocking `time.sleep(1)` debug artifact
- `audit_logger.log_action` and `log_job_action` propagate `organization_id`
  so audit entries are org-scoped
- Add migration script `migrations/2026_05_add_organization_id.py` to
  backfill existing documents and create compound indexes
- Add `tests/unit/test_cross_tenant_isolation.py` with 13 unit tests
  covering ADMIN bypass, same-org pass, cross-org 404, project fallback,
  and legacy-job owner check

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-04-29 18:01:09 +01:00
parent a168af1aa7
commit 4949873440
12 changed files with 506 additions and 70 deletions

View file

@ -8,7 +8,13 @@ from motor.motor_asyncio import AsyncIOMotorDatabase
from ...core.config import settings
from ...core.database import get_database
from ...core.dependencies import get_accessible_project_ids, get_current_user, require_roles
from ...core.dependencies import (
assert_job_in_user_org,
get_accessible_project_ids,
get_current_user,
get_user_org_ids,
require_roles,
)
from ...core.logging import get_logger
from ...lib.vtt import VTTEditor
from ...models.job import JobStatus, RequestedOutputs
@ -75,6 +81,7 @@ async def create_job(
file: UploadFile = File(...),
brand_context: Optional[str] = Form(None),
project_id: Optional[str] = Form(None),
client_id: Optional[str] = Form(None),
request: Request = None,
current_user: User = Depends(get_current_user),
db: AsyncIOMotorDatabase = Depends(get_database),
@ -103,11 +110,21 @@ async def create_job(
file, f"{job_id}/source.mp4"
)
# Resolve organization_id: prefer project's client_id, then form-supplied client_id
organization_id: Optional[str] = None
if project_id:
project_doc = await db.projects.find_one({"_id": project_id}, {"client_id": 1})
if project_doc:
organization_id = str(project_doc["client_id"])
if not organization_id and client_id:
organization_id = client_id
# Create job document
# Language is always "auto" - will be detected by AI during processing
job_data = {
"_id": job_id,
"client_id": str(current_user.id),
"client_id": organization_id or str(current_user.id),
"organization_id": organization_id,
"title": title,
"source": {
"filename": f"{job_id}/source.mp4",
@ -145,25 +162,6 @@ async def create_job(
logger.info(f"Task state: {task.state}")
logger.info(f"Task backend: {task.backend}")
# Try to get the task result to see if it was actually queued
try:
# This should timeout quickly since task just started
import time
time.sleep(1) # Give it a moment
task_info = task.state
logger.info(f"Task state after 1 second: {task_info}")
# Check celery inspect to see active/scheduled tasks
from celery import current_app
i = current_app.control.inspect()
active_tasks = i.active()
scheduled_tasks = i.scheduled()
logger.info(f"Active tasks across all workers: {active_tasks}")
logger.info(f"Scheduled tasks across all workers: {scheduled_tasks}")
except Exception as e:
logger.warning(f"Could not inspect task status: {e}")
# Store task ID in job document for monitoring
await db.jobs.update_one(
{"_id": job_id},
@ -180,6 +178,7 @@ async def create_job(
await log_job_action(
AuditAction.JOB_CREATE, job_id, current_user, request,
details={"title": title, "filename": file.filename},
organization_id=organization_id,
)
return JobResponse(
id=job_id,
@ -211,6 +210,8 @@ async def bulk_delete_jobs(
already_deleted = 0
errors = []
user_org_ids = await get_user_org_ids(current_user, db)
for job_id in unique_job_ids:
try:
job_doc = await db.jobs.find_one({"_id": job_id})
@ -220,6 +221,15 @@ async def bulk_delete_jobs(
logger.debug(f"Job {job_id} not found (may have been deleted by concurrent request)")
continue
# Tenant isolation: skip jobs outside the user's org
if user_org_ids is not None:
job_org = job_doc.get("organization_id") or job_doc.get("client_id")
if job_org not in user_org_ids:
logger.warning(
f"User {current_user.id} attempted cross-tenant bulk-delete on job {job_id}"
)
continue
# Cancel task if exists
task_id = job_doc.get("task_id")
if task_id:
@ -271,6 +281,7 @@ async def bulk_approve_jobs(
approved_count = 0
errors = []
user_org_ids = await get_user_org_ids(current_user, db)
for job_id in unique_job_ids:
try:
@ -280,6 +291,16 @@ async def bulk_approve_jobs(
errors.append(f"Job {job_id}: not found")
continue
# Tenant isolation
if user_org_ids is not None:
job_org = job_doc.get("organization_id") or job_doc.get("client_id")
if job_org not in user_org_ids:
logger.warning(
f"User {current_user.id} attempted cross-tenant bulk-approve on job {job_id}"
)
errors.append(f"Job {job_id}: access denied")
continue
if job_doc["status"] != JobStatus.PENDING_QC.value:
errors.append(f"Job {job_id}: not in pending QC status")
continue
@ -353,6 +374,7 @@ async def bulk_return_to_qc(
returned_count = 0
errors = []
user_org_ids = await get_user_org_ids(current_user, db)
for job_id in unique_job_ids:
try:
@ -361,6 +383,16 @@ async def bulk_return_to_qc(
errors.append(f"Job {job_id}: not found")
continue
# Tenant isolation
if user_org_ids is not None:
job_org = job_doc.get("organization_id") or job_doc.get("client_id")
if job_org not in user_org_ids:
logger.warning(
f"User {current_user.id} attempted cross-tenant bulk-return-to-qc on job {job_id}"
)
errors.append(f"Job {job_id}: access denied")
continue
if job_doc["status"] not in RETURN_TO_QC_ELIGIBLE_STATUSES:
errors.append(f"Job {job_id}: cannot return to QC from status '{job_doc['status']}'")
continue
@ -613,14 +645,8 @@ async def get_job(
detail="Job not found"
)
# Check access permissions
if current_user.role in (UserRole.CLIENT, UserRole.PROJECT_MANAGER):
accessible = await get_accessible_project_ids(current_user, db)
job_project_id = job_doc.get("project_id")
is_own_legacy = job_doc.get("client_id") == str(current_user.id) and not job_project_id
in_project = accessible is None or (job_project_id and job_project_id in (accessible or []))
if not is_own_legacy and not in_project:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied")
# Enforce tenant isolation for every non-admin role
await assert_job_in_user_org(job_doc, current_user, db)
# Check task status if task_id exists
task_id = job_doc.get("task_id")

View file

@ -8,7 +8,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query, status
from motor.motor_asyncio import AsyncIOMotorDatabase
from ...core.database import get_database
from ...core.dependencies import get_current_user, require_roles
from ...core.dependencies import assert_job_in_user_org, get_current_user, require_roles
from ...core.logging import get_logger
from ...models.user import User, UserRole
from ...schemas.review_note import (
@ -26,17 +26,18 @@ router = APIRouter(prefix="/jobs/{job_id}/review-notes", tags=["review-notes"])
async def list_review_notes(
job_id: str,
asset_key: Optional[str] = Query(None, description="Filter notes by asset key"),
current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN)),
current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.PROJECT_MANAGER, UserRole.ADMIN)),
db: AsyncIOMotorDatabase = Depends(get_database),
):
"""List all review notes for a job, optionally filtered by asset key."""
# Verify job exists
# Verify job exists and enforce tenant isolation
job = await db.jobs.find_one({"_id": job_id})
if not job:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Job not found"
)
await assert_job_in_user_org(job, current_user, db)
# Build query
query = {"job_id": job_id}
@ -57,24 +58,27 @@ async def list_review_notes(
async def create_review_note(
job_id: str,
request: ReviewNoteCreateRequest,
current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN)),
current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.PROJECT_MANAGER, UserRole.ADMIN)),
db: AsyncIOMotorDatabase = Depends(get_database),
):
"""Create a new review note for a video asset."""
# Verify job exists
# Verify job exists and enforce tenant isolation
job = await db.jobs.find_one({"_id": job_id})
if not job:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Job not found"
)
await assert_job_in_user_org(job, current_user, db)
# Create note document
note_id = str(ObjectId())
organization_id = job.get("organization_id")
now = datetime.utcnow()
note_data = {
"_id": note_id,
"organization_id": organization_id,
"job_id": job_id,
"asset_key": request.asset_key,
"timestamp_seconds": request.timestamp_seconds,
@ -95,10 +99,14 @@ async def create_review_note(
async def get_review_note(
job_id: str,
note_id: str,
current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN)),
current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.PROJECT_MANAGER, UserRole.ADMIN)),
db: AsyncIOMotorDatabase = Depends(get_database),
):
"""Get a single review note by ID."""
job = await db.jobs.find_one({"_id": job_id}, {"_id": 1, "organization_id": 1, "project_id": 1, "client_id": 1})
if not job:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
await assert_job_in_user_org(job, current_user, db)
note = await db.review_notes.find_one({"_id": note_id, "job_id": job_id})
if not note:
raise HTTPException(
@ -114,10 +122,14 @@ async def update_review_note(
job_id: str,
note_id: str,
request: ReviewNoteUpdateRequest,
current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN)),
current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.PROJECT_MANAGER, UserRole.ADMIN)),
db: AsyncIOMotorDatabase = Depends(get_database),
):
"""Update a review note. Only the note owner can update."""
job = await db.jobs.find_one({"_id": job_id}, {"_id": 1, "organization_id": 1, "project_id": 1, "client_id": 1})
if not job:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
await assert_job_in_user_org(job, current_user, db)
note = await db.review_notes.find_one({"_id": note_id, "job_id": job_id})
if not note:
raise HTTPException(
@ -150,10 +162,14 @@ async def update_review_note(
async def delete_review_note(
job_id: str,
note_id: str,
current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN)),
current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.PROJECT_MANAGER, UserRole.ADMIN)),
db: AsyncIOMotorDatabase = Depends(get_database),
):
"""Delete a review note. Only the note owner can delete."""
job = await db.jobs.find_one({"_id": job_id}, {"_id": 1, "organization_id": 1, "project_id": 1, "client_id": 1})
if not job:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
await assert_job_in_user_org(job, current_user, db)
note = await db.review_notes.find_one({"_id": note_id, "job_id": job_id})
if not note:
raise HTTPException(

View file

@ -4,7 +4,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query, Request, status
from motor.motor_asyncio import AsyncIOMotorDatabase
from ...core.database import get_database
from ...core.dependencies import require_roles
from ...core.dependencies import assert_job_in_user_org, require_roles
from ...models.user import User, UserRole
from ...models.vtt_version import VttDiffResponse, VttKind, VttVersionListResponse, VttVersionSummary
from ...services import vtt_versioning
@ -15,7 +15,16 @@ from ...core.config import settings
router = APIRouter(prefix="/jobs", tags=["vtt-versions"])
_EDITABLE_ROLES = (UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN)
_EDITABLE_ROLES = (UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.PROJECT_MANAGER, UserRole.ADMIN)
async def _assert_job_access(job_id: str, user: User, db) -> None:
    """Fetch the job's tenancy fields and run the org-membership check on it.

    Only ``_id``, ``organization_id``, ``project_id`` and ``client_id`` are
    projected, since the job document is used here solely for the access
    check.

    Raises:
        HTTPException: 404 when no job with ``job_id`` exists. Any exception
            raised by ``assert_job_in_user_org`` propagates unchanged.
    """
    tenancy_projection = {"_id": 1, "organization_id": 1, "project_id": 1, "client_id": 1}
    job_doc = await db.jobs.find_one({"_id": job_id}, tenancy_projection)
    if not job_doc:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")
    await assert_job_in_user_org(job_doc, user, db)
@router.get("/{job_id}/vtt/versions", response_model=VttVersionListResponse)
@ -29,6 +38,7 @@ async def list_vtt_versions(
db: AsyncIOMotorDatabase = Depends(get_database),
):
"""List all VTT versions for a job/lang/kind, newest first."""
await _assert_job_access(job_id, current_user, db)
return await vtt_versioning.list_versions(db, job_id, lang, kind, skip, limit)
@ -42,6 +52,7 @@ async def get_vtt_version(
db: AsyncIOMotorDatabase = Depends(get_database),
):
"""Get full VTT content for a specific version."""
await _assert_job_access(job_id, current_user, db)
v = await vtt_versioning.get_version(db, job_id, lang, kind, version)
if not v:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Version not found")
@ -72,6 +83,7 @@ async def diff_vtt_versions(
db: AsyncIOMotorDatabase = Depends(get_database),
):
"""Line-level diff between two versions of a VTT file."""
await _assert_job_access(job_id, current_user, db)
v_from = await vtt_versioning.get_version(db, job_id, lang, kind, from_version)
v_to = await vtt_versioning.get_version(db, job_id, lang, kind, to_version)
if not v_from:
@ -100,6 +112,7 @@ async def restore_vtt_version(
Non-destructive: creates a new version entry whose content mirrors the old one,
then overwrites the live GCS file.
"""
await _assert_job_access(job_id, current_user, db)
src = await vtt_versioning.get_version(db, job_id, lang, kind, version)
if not src:
raise HTTPException(status_code=404, detail="Version not found")

View file

@ -18,8 +18,9 @@ from ...services.websocket import (
ConnectionManager
)
from ...models.job import Job
from ...models.user import UserRole
from ...core.database import get_database
from ...core.dependencies import get_current_user
from ...core.dependencies import get_current_user, get_user_org_ids
logger = logging.getLogger(__name__)
@ -68,23 +69,25 @@ async def websocket_job_status(
await websocket.close(code=4004, reason="Job not found")
return
# Check permissions - users can only access their own jobs unless they're admin/reviewer
user = await db["users"].find_one({"_id": user_id})
if not user:
try:
from bson import ObjectId
user = await db["users"].find_one({"_id": ObjectId(user_id)})
except Exception:
pass # Invalid ObjectId format
if not user:
# Verify user and check org membership
user_doc = await db["users"].find_one({"_id": user_id})
if not user_doc:
await websocket.close(code=4001, reason="User not found")
return
# Check access permissions
if user["role"] == "client" and job.get("created_by") != user_id:
await websocket.close(code=4003, reason="Access denied")
return
from ...models.user import User
user_obj = User(**user_doc)
# ADMIN sees everything; others must belong to the job's org
if user_obj.role != UserRole.ADMIN:
org_ids = await get_user_org_ids(user_obj, db)
job_org = job.get("organization_id") or job.get("client_id")
if job_org and org_ids is not None and job_org not in org_ids:
logger.warning(
f"WS: user {user_id} tried to subscribe to job {job_id} from a different org"
)
await websocket.close(code=4003, reason="Access denied")
return
# Connect to job status updates
await manager.connect_job_status(websocket, user_id, job_id)

View file

@ -1,11 +1,9 @@
from typing import Optional
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from motor.motor_asyncio import AsyncIOMotorDatabase
from ..models.user import User, UserRole
from .config import settings
from .database import get_database
from .security import decode_token
@ -15,6 +13,57 @@ security = HTTPBearer()
STAFF_ROLES = {UserRole.ADMIN}
async def get_user_org_ids(user: User, db: AsyncIOMotorDatabase) -> list[str] | None:
    """Return the organization IDs the user may access.

    Sources are consulted in order and the first non-empty one wins:

    1. ``memberships`` documents whose ``user_id`` matches the user;
    2. legacy ``teams`` documents listing the user in ``member_user_ids``
       (their ``client_id`` is used as the organization ID);
    3. ``user.pm_client_ids``, for PROJECT_MANAGER only.

    IDs are normalized to ``str`` and de-duplicated in first-seen order, so
    the result honours the annotated ``list[str]`` and compares correctly
    with the string ``organization_id`` written onto job documents.

    Returns:
        ``None`` for ADMIN, meaning unrestricted. Otherwise a list of
        organization IDs; an empty list means the user sees nothing.
    """
    if user.role == UserRole.ADMIN:
        return None  # ADMIN sees everything

    user_id = str(user.id)

    # 1. Explicit memberships. Documents without a usable organization_id are
    #    skipped instead of aborting the whole lookup with a KeyError.
    membership_orgs: list[str] = []
    async for doc in db.memberships.find({"user_id": user_id}, {"organization_id": 1}):
        org = doc.get("organization_id")
        if org:
            membership_orgs.append(str(org))
    if membership_orgs:
        return list(dict.fromkeys(membership_orgs))

    # 2. Legacy: team membership.
    teams = await db.teams.find({"member_user_ids": user_id}, {"client_id": 1}).to_list(None)
    team_clients = list(
        dict.fromkeys(str(team["client_id"]) for team in teams if team.get("client_id"))
    )
    if team_clients:
        return team_clients

    # 3. Legacy: project managers assigned through pm_client_ids.
    if user.role == UserRole.PROJECT_MANAGER and user.pm_client_ids:
        return [str(client_id) for client_id in user.pm_client_ids]

    # No membership at all: an empty list means "see nothing". Returning None
    # here would grant unrestricted cross-tenant access.
    return []
async def assert_job_in_user_org(job_doc: dict, user: User, db: AsyncIOMotorDatabase) -> None:
    """Raise 404 unless the user may access the organization that owns the job.

    The owning organization is resolved in this order:

    1. the job's own ``organization_id``;
    2. otherwise the ``client_id`` of the project named by ``project_id``;
    3. otherwise (no org and no project) the job is visible only to the user
       whose ID equals the job's ``client_id``.

    All ID comparisons are done on string forms, so an ID stored as a
    non-string value still matches its string representation.

    Args:
        job_doc: Job document; must carry whichever of ``organization_id``,
            ``project_id`` and ``client_id`` it has.
        user: The requesting user.
        db: Database handle, used for membership and project lookups.

    Raises:
        HTTPException: 404 ("Job not found") when access is denied. 404 is
            used for every denial, so the response is the same whether or
            not the job exists.
    """
    if user.role == UserRole.ADMIN:
        return

    org_ids = await get_user_org_ids(user, db)
    if org_ids is None:
        return  # unrestricted (should only happen for ADMIN, kept defensively)

    allowed = {str(org_id) for org_id in org_ids}
    not_found = HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found")

    # New jobs: explicit organization_id on the document.
    job_org = job_doc.get("organization_id")
    if job_org:
        if str(job_org) not in allowed:
            raise not_found
        return

    # Fallback: derive the organization from the job's project.
    project_id = job_doc.get("project_id")
    if project_id:
        project = await db.projects.find_one({"_id": project_id}, {"client_id": 1})
        project_org = project.get("client_id") if project else None
        # A project without a client_id never grants access.
        if project_org and str(project_org) in allowed:
            return
        raise not_found

    # Legacy jobs with no project: only visible to the user who created them.
    legacy_owner = job_doc.get("client_id")
    if legacy_owner is not None and str(legacy_owner) == str(user.id):
        return

    # Fully legacy jobs with no org context are ADMIN-only.
    raise not_found
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncIOMotorDatabase = Depends(get_database),
@ -73,7 +122,7 @@ def require_roles(*required_roles: UserRole):
async def get_current_user_optional(
request: Request,
db: AsyncIOMotorDatabase = Depends(get_database),
) -> Optional[User]:
) -> User | None:
authorization: str = request.headers.get("Authorization")
if not authorization:
return None
@ -104,7 +153,7 @@ async def get_current_user_optional(
async def get_accessible_project_ids(
user: User,
db: AsyncIOMotorDatabase,
) -> Optional[list[str]]:
) -> list[str] | None:
"""
Returns project IDs the user may access, or None meaning "see everything".
@ -156,11 +205,8 @@ async def get_accessible_project_ids(
).to_list(None)
return [str(p["_id"]) for p in projects]
# Staff with no team assignments → unrestricted until teams are configured
if user.role in {UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION}:
return None
# CLIENT with no memberships and no teams → show nothing
# Staff with no team assignments → show nothing (no org context yet)
# Returning None here would give unrestricted cross-tenant access.
return []

View file

@ -93,11 +93,14 @@ class AuditLog(BaseModel):
id: Optional[PyObjectId] = Field(default_factory=lambda: str(ObjectId()), alias="_id")
# Tenant isolation
organization_id: Optional[str] = None
# Core audit fields
timestamp: datetime = Field(default_factory=datetime.utcnow)
action: AuditAction
severity: AuditLogSeverity = AuditLogSeverity.INFO
# Actor information
user_id: Optional[PyObjectId] = None
user_email: Optional[str] = None

View file

@ -235,6 +235,7 @@ class Job(BaseModel):
ai: Optional[AISection] = None
error: Optional[dict[str, Any]] = None
tts_rewrites: Optional[list[dict[str, Any]]] = None # Track auto-rewritten TTS cues
organization_id: Optional[str] = None # Denormalized client_id (org) for tenant isolation
project_id: Optional[str] = None # Platform project this job belongs to (Client → Project → Job)
brand_context: Optional[str] = None # Brand names present in the video for accurate product identification
cost_tracker_project_id: Optional[str] = None # External project ID for AI cost attribution

View file

@ -10,6 +10,7 @@ class ReviewNote(BaseModel):
"""A timestamped note attached to a video asset during review."""
id: Optional[str] = Field(None, alias="_id")
organization_id: Optional[str] = None # Denormalized for tenant isolation
job_id: str
asset_key: str # e.g., "en", "es", "en_accessible"
timestamp_seconds: float # Video timestamp when note was created

View file

@ -14,6 +14,7 @@ class VttVersionActor(BaseModel):
class VttVersion(BaseModel):
id: Optional[str] = Field(None, alias="_id")
organization_id: Optional[str] = None # Denormalized for tenant isolation
job_id: str
lang: str
kind: VttKind

View file

@ -47,7 +47,8 @@ class AuditLogger:
details: Optional[Dict[str, Any]] = None,
severity: AuditLogSeverity = AuditLogSeverity.INFO,
success: bool = True,
error_message: Optional[str] = None
error_message: Optional[str] = None,
organization_id: Optional[str] = None,
) -> str:
"""
Log an audit event.
@ -77,6 +78,7 @@ class AuditLogger:
action=action,
severity=severity,
description=description,
organization_id=organization_id,
user_id=user.id if user else None,
user_email=user.email if user else None,
user_role=(user.role.value if hasattr(user.role, "value") else user.role) if user else None,
@ -277,16 +279,23 @@ async def log_auth_failure(email: str, request: Request, reason: str):
)
async def log_job_action(action: AuditAction, job_id: str, user: User, request: Request, details: Optional[Dict] = None):
async def log_job_action(
action: AuditAction,
job_id: str,
user: User,
request: Request,
details: Optional[Dict] = None,
organization_id: Optional[str] = None,
):
"""Log job-related actions."""
action_descriptions = {
AuditAction.JOB_CREATE: "Job created",
AuditAction.JOB_APPROVE: "Job approved",
AuditAction.JOB_APPROVE: "Job approved",
AuditAction.JOB_REJECT: "Job rejected",
AuditAction.JOB_CANCEL: "Job cancelled",
AuditAction.JOB_UPDATE: "Job updated"
}
await audit_logger.log_action(
action=action,
description=f"{action_descriptions.get(action, str(action))} by {user.email}",
@ -294,7 +303,8 @@ async def log_job_action(action: AuditAction, job_id: str, user: User, request:
request=request,
resource_type="job",
resource_id=job_id,
details=details
details=details,
organization_id=organization_id,
)

View file

@ -0,0 +1,125 @@
"""
Migration: add organization_id to jobs, review_notes, vtt_versions, audit_logs.
Run once after deploying the multi-tenant isolation PR.
Usage (inside the backend container):
python -m migrations.2026_05_add_organization_id
Strategy:
- Jobs with a project_id → organization_id = project.client_id
- Jobs with no project_id → organization_id stays None (legacy single-tenant data;
  the assert_job_in_user_org fallback handles these via the
  client_id == user.id check)
- review_notes / vtt_versions / audit_logs → copy organization_id from their parent job
"""
import asyncio
import logging
from motor.motor_asyncio import AsyncIOMotorClient
from app.core.config import get_settings
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
async def _backfill_from_parent_job(
    collection,
    job_org_map: dict[str, str],
    job_id_field: str,
    extra_filter: dict | None = None,
) -> int:
    """Copy organization_id from the parent job onto documents that lack one.

    Args:
        collection: Motor collection to update.
        job_org_map: Mapping of job ID (as str) to organization ID.
        job_id_field: Name of the field holding the parent job's ID.
        extra_filter: Optional extra query conditions.

    Returns:
        The number of documents updated.
    """
    # {"organization_id": None} matches both a missing field and an explicit
    # null, so documents written with organization_id=None are covered too.
    query = {"organization_id": None, **(extra_filter or {})}
    updated = 0
    async for doc in collection.find(query, {"_id": 1, job_id_field: 1}):
        org = job_org_map.get(str(doc.get(job_id_field, "")))
        if org:
            await collection.update_one(
                {"_id": doc["_id"]},
                {"$set": {"organization_id": org}},
            )
            updated += 1
    return updated


async def run():
    """Backfill organization_id on existing documents and create the indexes.

    Steps: backfill jobs from their project's client_id, propagate the value
    to review_notes, vtt_versions and job-typed audit_logs, then create the
    compound indexes. Only documents without an organization_id are touched,
    so re-running skips anything already backfilled. The client is closed
    even when a step fails.
    """
    settings = get_settings()
    client = AsyncIOMotorClient(settings.mongodb_url)
    try:
        db = client[settings.mongodb_db_name]

        # ── Step 1: backfill jobs ────────────────────────────────────────────
        logger.info("Backfilling organization_id on jobs…")

        # Build project_id → client_id lookup. Projects without a client_id
        # are skipped: str(None) would otherwise become the bogus org "None".
        project_map: dict[str, str] = {}
        async for proj in db.projects.find({}, {"_id": 1, "client_id": 1}):
            owner = proj.get("client_id")
            if owner:
                project_map[str(proj["_id"])] = str(owner)

        jobs_updated = 0
        async for job in db.jobs.find(
            {"organization_id": None, "project_id": {"$ne": None}},
            {"_id": 1, "project_id": 1},
        ):
            # Keys in project_map are strings, so stringify the lookup key too.
            org = project_map.get(str(job.get("project_id")))
            if org:
                await db.jobs.update_one(
                    {"_id": job["_id"]},
                    {"$set": {"organization_id": org}},
                )
                jobs_updated += 1
        logger.info("Jobs updated: %s", jobs_updated)

        # ── Step 2: build job_id → organization_id for downstream tables ─────
        job_org_map: dict[str, str] = {}
        async for job in db.jobs.find(
            {"organization_id": {"$ne": None}},
            {"_id": 1, "organization_id": 1},
        ):
            if job.get("organization_id"):
                job_org_map[str(job["_id"])] = job["organization_id"]

        # ── Step 3: backfill review_notes ────────────────────────────────────
        logger.info("Backfilling organization_id on review_notes…")
        notes_updated = await _backfill_from_parent_job(db.review_notes, job_org_map, "job_id")
        logger.info("Review notes updated: %s", notes_updated)

        # ── Step 4: backfill vtt_versions ────────────────────────────────────
        logger.info("Backfilling organization_id on vtt_versions…")
        vtt_updated = await _backfill_from_parent_job(db.vtt_versions, job_org_map, "job_id")
        logger.info("VTT versions updated: %s", vtt_updated)

        # ── Step 5: backfill audit_logs ──────────────────────────────────────
        logger.info("Backfilling organization_id on audit_logs…")
        audit_updated = await _backfill_from_parent_job(
            db.audit_logs, job_org_map, "resource_id", {"resource_type": "job"}
        )
        logger.info("Audit logs updated: %s", audit_updated)

        # ── Step 6: create indexes ───────────────────────────────────────────
        logger.info("Creating organization_id indexes…")
        await db.jobs.create_index([("organization_id", 1), ("created_at", -1)])
        await db.review_notes.create_index([("organization_id", 1), ("job_id", 1)])
        await db.vtt_versions.create_index([("organization_id", 1), ("job_id", 1)])
        await db.audit_logs.create_index([("organization_id", 1), ("timestamp", -1)])
        logger.info("Indexes created.")
    finally:
        client.close()

    logger.info("Migration complete.")


if __name__ == "__main__":
    asyncio.run(run())

View file

@ -0,0 +1,191 @@
"""
Cross-tenant isolation tests.
Verifies that assert_job_in_user_org raises 404 (not 403, to avoid information
disclosure) when a user from org A tries to access a job belonging to org B.
Also verifies ADMIN always passes and legacy (no organization_id) jobs fall back
to project-based checks.
"""
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
from fastapi import HTTPException
from app.core.dependencies import assert_job_in_user_org, get_user_org_ids
from app.models.user import User, UserRole
# ── Helpers ────────────────────────────────────────────────────────────────────
def _make_user(role: UserRole, user_id: str = "user-a") -> User:
    """Build an active test ``User`` with the given role and ID.

    The email is derived from ``user_id``; the ID is passed under the
    ``_id`` key.
    """
    fields = {
        "_id": user_id,
        "email": f"{user_id}@example.com",
        "full_name": "Test User",
        "role": role,
        "is_active": True,
    }
    return User(**fields)
def _make_db(memberships: list[dict], teams: list[dict] | None = None, project: dict | None = None):
    """Return a mock database exposing memberships, teams and projects.

    Args:
        memberships: Documents yielded by ``async for`` over
            ``db.memberships.find(...)``.
        teams: Documents returned by ``await db.teams.find(...).to_list(...)``;
            defaults to an empty list.
        project: Value returned by ``await db.projects.find_one(...)``.
    """
    db = MagicMock()

    # memberships.find(...) is consumed with `async for`.
    db.memberships.find.return_value = _AsyncIterableMock(memberships)

    # teams.find(...).to_list(...) is awaited.
    team_cursor = MagicMock()
    team_cursor.to_list = AsyncMock(return_value=teams or [])
    db.teams.find.return_value = team_cursor

    # projects.find_one(...) is awaited.
    db.projects.find_one = AsyncMock(return_value=project)

    return db
class _AsyncIterableMock:
"""Supports `async for doc in cursor` pattern used in get_user_org_ids."""
def __init__(self, items: list):
self._items = items
def __aiter__(self):
return self._aiter()
async def _aiter(self):
for item in self._items:
yield item
# ── get_user_org_ids ───────────────────────────────────────────────────────────
class TestGetUserOrgIds:
    """Cover each source get_user_org_ids can resolve organization IDs from."""

    @pytest.mark.asyncio
    async def test_admin_returns_none(self):
        """ADMIN resolves to None (unrestricted)."""
        admin = _make_user(UserRole.ADMIN)
        fake_db = _make_db([])
        org_ids = await get_user_org_ids(admin, fake_db)
        assert org_ids is None

    @pytest.mark.asyncio
    async def test_staff_with_membership_returns_orgs(self):
        """Membership documents are returned in order."""
        linguist = _make_user(UserRole.LINGUIST)
        fake_db = _make_db([{"organization_id": "org-a"}, {"organization_id": "org-b"}])
        org_ids = await get_user_org_ids(linguist, fake_db)
        assert org_ids == ["org-a", "org-b"]

    @pytest.mark.asyncio
    async def test_staff_no_membership_no_team_returns_empty(self):
        """Staff with neither memberships nor teams get an empty list."""
        reviewer = _make_user(UserRole.REVIEWER)
        fake_db = _make_db([])
        org_ids = await get_user_org_ids(reviewer, fake_db)
        assert org_ids == []

    @pytest.mark.asyncio
    async def test_pm_legacy_pm_client_ids(self):
        """A project manager without memberships falls back to pm_client_ids."""
        manager = _make_user(UserRole.PROJECT_MANAGER)
        manager.pm_client_ids = ["client-x"]
        fake_db = _make_db([])
        org_ids = await get_user_org_ids(manager, fake_db)
        assert org_ids == ["client-x"]

    @pytest.mark.asyncio
    async def test_staff_falls_back_to_team_membership(self):
        """Without memberships, the client_id of the user's teams is used."""
        producer = _make_user(UserRole.PRODUCTION)
        fake_db = _make_db(
            memberships=[],  # no memberships
            teams=[{"client_id": "client-y"}],
        )
        org_ids = await get_user_org_ids(producer, fake_db)
        assert org_ids == ["client-y"]
# ── assert_job_in_user_org ─────────────────────────────────────────────────────
class TestAssertJobInUserOrg:
    """Check when assert_job_in_user_org allows access and when it raises 404."""

    @pytest.mark.asyncio
    async def test_admin_always_passes(self):
        """ADMIN may access a job of any organization."""
        admin = _make_user(UserRole.ADMIN)
        job_doc = {"_id": "job-1", "organization_id": "org-b"}
        fake_db = _make_db([])
        await assert_job_in_user_org(job_doc, admin, fake_db)  # must not raise

    @pytest.mark.asyncio
    async def test_same_org_passes(self):
        """A member of the job's organization is allowed."""
        linguist = _make_user(UserRole.LINGUIST)
        fake_db = _make_db([{"organization_id": "org-a"}])
        job_doc = {"_id": "job-1", "organization_id": "org-a"}
        await assert_job_in_user_org(job_doc, linguist, fake_db)  # must not raise

    @pytest.mark.asyncio
    async def test_cross_org_raises_404(self):
        """A member of another organization gets 404."""
        linguist = _make_user(UserRole.LINGUIST)
        fake_db = _make_db([{"organization_id": "org-a"}])
        job_doc = {"_id": "job-1", "organization_id": "org-b"}
        with pytest.raises(HTTPException) as raised:
            await assert_job_in_user_org(job_doc, linguist, fake_db)
        assert raised.value.status_code == 404

    @pytest.mark.asyncio
    async def test_no_organization_id_project_fallback_passes(self):
        """Without organization_id, a project in the user's org grants access."""
        reviewer = _make_user(UserRole.REVIEWER)
        fake_db = _make_db(
            memberships=[{"organization_id": "org-a"}],
            project={"_id": "proj-1", "client_id": "org-a"},
        )
        job_doc = {"_id": "job-1", "project_id": "proj-1"}  # no organization_id
        await assert_job_in_user_org(job_doc, reviewer, fake_db)

    @pytest.mark.asyncio
    async def test_no_organization_id_project_from_other_org_raises_404(self):
        """Without organization_id, a project in another org yields 404."""
        reviewer = _make_user(UserRole.REVIEWER)
        fake_db = _make_db(
            memberships=[{"organization_id": "org-a"}],
            project={"_id": "proj-1", "client_id": "org-b"},
        )
        job_doc = {"_id": "job-1", "project_id": "proj-1"}
        with pytest.raises(HTTPException) as raised:
            await assert_job_in_user_org(job_doc, reviewer, fake_db)
        assert raised.value.status_code == 404

    @pytest.mark.asyncio
    async def test_legacy_job_own_user_passes(self):
        """A legacy job whose client_id is the user's own ID is allowed."""
        linguist = _make_user(UserRole.LINGUIST, user_id="user-a")
        fake_db = _make_db([])  # no memberships
        job_doc = {"_id": "job-1", "client_id": "user-a"}  # legacy: client_id = creator user_id
        await assert_job_in_user_org(job_doc, linguist, fake_db)

    @pytest.mark.asyncio
    async def test_legacy_job_other_user_raises_404(self):
        """A legacy job whose client_id is someone else's ID yields 404."""
        linguist = _make_user(UserRole.LINGUIST, user_id="user-a")
        fake_db = _make_db([])  # no memberships
        job_doc = {"_id": "job-1", "client_id": "user-b"}  # belongs to a different user
        with pytest.raises(HTTPException) as raised:
            await assert_job_in_user_org(job_doc, linguist, fake_db)
        assert raised.value.status_code == 404

    @pytest.mark.asyncio
    async def test_user_with_no_org_context_denied_access_to_org_job(self):
        """Staff with no memberships and no team cannot access any org-tagged job."""
        producer = _make_user(UserRole.PRODUCTION)
        fake_db = _make_db([])
        job_doc = {"_id": "job-1", "organization_id": "org-c"}
        with pytest.raises(HTTPException) as raised:
            await assert_job_in_user_org(job_doc, producer, fake_db)
        assert raised.value.status_code == 404