diff --git a/backend/app/api/v1/routes_jobs.py b/backend/app/api/v1/routes_jobs.py index 66c1fb4..7f94f40 100644 --- a/backend/app/api/v1/routes_jobs.py +++ b/backend/app/api/v1/routes_jobs.py @@ -8,7 +8,13 @@ from motor.motor_asyncio import AsyncIOMotorDatabase from ...core.config import settings from ...core.database import get_database -from ...core.dependencies import get_accessible_project_ids, get_current_user, require_roles +from ...core.dependencies import ( + assert_job_in_user_org, + get_accessible_project_ids, + get_current_user, + get_user_org_ids, + require_roles, +) from ...core.logging import get_logger from ...lib.vtt import VTTEditor from ...models.job import JobStatus, RequestedOutputs @@ -75,6 +81,7 @@ async def create_job( file: UploadFile = File(...), brand_context: Optional[str] = Form(None), project_id: Optional[str] = Form(None), + client_id: Optional[str] = Form(None), request: Request = None, current_user: User = Depends(get_current_user), db: AsyncIOMotorDatabase = Depends(get_database), @@ -103,11 +110,21 @@ async def create_job( file, f"{job_id}/source.mp4" ) + # Resolve organization_id: prefer project's client_id, then form-supplied client_id + organization_id: Optional[str] = None + if project_id: + project_doc = await db.projects.find_one({"_id": project_id}, {"client_id": 1}) + if project_doc: + organization_id = str(project_doc["client_id"]) + if not organization_id and client_id: + organization_id = client_id + # Create job document # Language is always "auto" - will be detected by AI during processing job_data = { "_id": job_id, - "client_id": str(current_user.id), + "client_id": organization_id or str(current_user.id), + "organization_id": organization_id, "title": title, "source": { "filename": f"{job_id}/source.mp4", @@ -145,25 +162,6 @@ async def create_job( logger.info(f"Task state: {task.state}") logger.info(f"Task backend: {task.backend}") - # Try to get the task result to see if it was actually queued - try: - # This should timeout quickly since task just started - import time - time.sleep(1) # Give it a moment - task_info = task.state - logger.info(f"Task state after 1 second: {task_info}") - - # Check celery inspect to see active/scheduled tasks - from celery import current_app - i = current_app.control.inspect() - active_tasks = i.active() - scheduled_tasks = i.scheduled() - logger.info(f"Active tasks across all workers: {active_tasks}") - logger.info(f"Scheduled tasks across all workers: {scheduled_tasks}") - - except Exception as e: - logger.warning(f"Could not inspect task status: {e}") - # Store task ID in job document for monitoring await db.jobs.update_one( {"_id": job_id}, @@ -180,6 +178,7 @@ async def create_job( await log_job_action( AuditAction.JOB_CREATE, job_id, current_user, request, details={"title": title, "filename": file.filename}, + organization_id=organization_id, ) return JobResponse( id=job_id, @@ -211,6 +210,8 @@ async def bulk_delete_jobs( already_deleted = 0 errors = [] + user_org_ids = await get_user_org_ids(current_user, db) + for job_id in unique_job_ids: try: job_doc = await db.jobs.find_one({"_id": job_id}) @@ -220,6 +221,15 @@ async def bulk_delete_jobs( logger.debug(f"Job {job_id} not found (may have been deleted by concurrent request)") continue + # Tenant isolation: skip jobs outside the user's org + if user_org_ids is not None: + job_org = job_doc.get("organization_id") or job_doc.get("client_id") + if job_org not in user_org_ids: + logger.warning( + f"User {current_user.id} attempted cross-tenant bulk-delete on job {job_id}" + ) + continue + # Cancel task if exists task_id = job_doc.get("task_id") if task_id: @@ -271,6 +281,7 @@ async def bulk_approve_jobs( approved_count = 0 errors = [] + user_org_ids = await get_user_org_ids(current_user, db) for job_id in unique_job_ids: try: @@ -280,6 +291,16 @@ async def bulk_approve_jobs( errors.append(f"Job {job_id}: not found") continue + # Tenant isolation + if user_org_ids is not None: + job_org = job_doc.get("organization_id") or job_doc.get("client_id") + if job_org not in user_org_ids: + logger.warning( + f"User {current_user.id} attempted cross-tenant bulk-approve on job {job_id}" + ) + errors.append(f"Job {job_id}: access denied") + continue + if job_doc["status"] != JobStatus.PENDING_QC.value: errors.append(f"Job {job_id}: not in pending QC status") continue @@ -353,6 +374,7 @@ async def bulk_return_to_qc( returned_count = 0 errors = [] + user_org_ids = await get_user_org_ids(current_user, db) for job_id in unique_job_ids: try: @@ -361,6 +383,16 @@ async def bulk_return_to_qc( errors.append(f"Job {job_id}: not found") continue + # Tenant isolation + if user_org_ids is not None: + job_org = job_doc.get("organization_id") or job_doc.get("client_id") + if job_org not in user_org_ids: + logger.warning( + f"User {current_user.id} attempted cross-tenant bulk-return-to-qc on job {job_id}" + ) + errors.append(f"Job {job_id}: access denied") + continue + if job_doc["status"] not in RETURN_TO_QC_ELIGIBLE_STATUSES: errors.append(f"Job {job_id}: cannot return to QC from status '{job_doc['status']}'") continue @@ -613,14 +645,8 @@ async def get_job( detail="Job not found" ) - # Check access permissions - if current_user.role in (UserRole.CLIENT, UserRole.PROJECT_MANAGER): - accessible = await get_accessible_project_ids(current_user, db) - job_project_id = job_doc.get("project_id") - is_own_legacy = job_doc.get("client_id") == str(current_user.id) and not job_project_id - in_project = accessible is None or (job_project_id and job_project_id in (accessible or [])) - if not is_own_legacy and not in_project: - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Access denied") + # Enforce tenant isolation for every non-admin role + await assert_job_in_user_org(job_doc, current_user, db) # Check task status if task_id exists task_id = job_doc.get("task_id") diff --git a/backend/app/api/v1/routes_review_notes.py b/backend/app/api/v1/routes_review_notes.py index 9618f77..f7b2442 100644 --- a/backend/app/api/v1/routes_review_notes.py +++ b/backend/app/api/v1/routes_review_notes.py @@ -8,7 +8,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query, status from motor.motor_asyncio import AsyncIOMotorDatabase from ...core.database import get_database -from ...core.dependencies import get_current_user, require_roles +from ...core.dependencies import assert_job_in_user_org, get_current_user, require_roles from ...core.logging import get_logger from ...models.user import User, UserRole from ...schemas.review_note import ( @@ -26,17 +26,18 @@ router = APIRouter(prefix="/jobs/{job_id}/review-notes", tags=["review-notes"]) async def list_review_notes( job_id: str, asset_key: Optional[str] = Query(None, description="Filter notes by asset key"), - current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN)), + current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.PROJECT_MANAGER, UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """List all review notes for a job, optionally filtered by asset key.""" - # Verify job exists + # Verify job exists and enforce tenant isolation job = await db.jobs.find_one({"_id": job_id}) if not job: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Job not found" ) + await assert_job_in_user_org(job, current_user, db) # Build query query = {"job_id": job_id} @@ -57,24 +58,27 @@ async def list_review_notes( async def create_review_note( job_id: str, request: ReviewNoteCreateRequest, - current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN)), + current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.PROJECT_MANAGER, UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Create a new review note for a video asset.""" - # Verify job exists + # Verify job exists and enforce tenant isolation job = await db.jobs.find_one({"_id": job_id}) if not job: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="Job not found" ) + await assert_job_in_user_org(job, current_user, db) # Create note document note_id = str(ObjectId()) + organization_id = job.get("organization_id") now = datetime.utcnow() note_data = { "_id": note_id, + "organization_id": organization_id, "job_id": job_id, "asset_key": request.asset_key, "timestamp_seconds": request.timestamp_seconds, @@ -95,10 +99,14 @@ async def create_review_note( async def get_review_note( job_id: str, note_id: str, - current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN)), + current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.PROJECT_MANAGER, UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Get a single review note by ID.""" + job = await db.jobs.find_one({"_id": job_id}, {"_id": 1, "organization_id": 1, "project_id": 1, "client_id": 1}) + if not job: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found") + await assert_job_in_user_org(job, current_user, db) note = await db.review_notes.find_one({"_id": note_id, "job_id": job_id}) if not note: raise HTTPException( @@ -114,10 +122,14 @@ async def update_review_note( job_id: str, note_id: str, request: ReviewNoteUpdateRequest, - current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN)), + current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.PROJECT_MANAGER, UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Update a review note. Only the note owner can update.""" + job = await db.jobs.find_one({"_id": job_id}, {"_id": 1, "organization_id": 1, "project_id": 1, "client_id": 1}) + if not job: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found") + await assert_job_in_user_org(job, current_user, db) note = await db.review_notes.find_one({"_id": note_id, "job_id": job_id}) if not note: raise HTTPException( @@ -150,10 +162,14 @@ async def update_review_note( async def delete_review_note( job_id: str, note_id: str, - current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN)), + current_user: User = Depends(require_roles(UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.PROJECT_MANAGER, UserRole.ADMIN)), db: AsyncIOMotorDatabase = Depends(get_database), ): """Delete a review note. Only the note owner can delete.""" + job = await db.jobs.find_one({"_id": job_id}, {"_id": 1, "organization_id": 1, "project_id": 1, "client_id": 1}) + if not job: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found") + await assert_job_in_user_org(job, current_user, db) note = await db.review_notes.find_one({"_id": note_id, "job_id": job_id}) if not note: raise HTTPException( diff --git a/backend/app/api/v1/routes_vtt_versions.py b/backend/app/api/v1/routes_vtt_versions.py index 90ce300..d213949 100644 --- a/backend/app/api/v1/routes_vtt_versions.py +++ b/backend/app/api/v1/routes_vtt_versions.py @@ -4,7 +4,7 @@ from fastapi import APIRouter, Depends, HTTPException, Query, Request, status from motor.motor_asyncio import AsyncIOMotorDatabase from ...core.database import get_database -from ...core.dependencies import require_roles +from ...core.dependencies import assert_job_in_user_org, require_roles from ...models.user import User, UserRole from ...models.vtt_version import VttDiffResponse, VttKind, VttVersionListResponse, VttVersionSummary from ...services import vtt_versioning @@ -15,7 +15,16 @@ from ...core.config import settings router = APIRouter(prefix="/jobs", tags=["vtt-versions"]) -_EDITABLE_ROLES = (UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.ADMIN) +_EDITABLE_ROLES = (UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION, UserRole.PROJECT_MANAGER, UserRole.ADMIN) + + +async def _assert_job_access(job_id: str, user: User, db) -> None: + job = await db.jobs.find_one( + {"_id": job_id}, {"_id": 1, "organization_id": 1, "project_id": 1, "client_id": 1} + ) + if not job: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found") + await assert_job_in_user_org(job, user, db) @router.get("/{job_id}/vtt/versions", response_model=VttVersionListResponse) @@ -29,6 +38,7 @@ async def list_vtt_versions( db: AsyncIOMotorDatabase = Depends(get_database), ): """List all VTT versions for a job/lang/kind, newest first.""" + await _assert_job_access(job_id, current_user, db) return await vtt_versioning.list_versions(db, job_id, lang, kind, skip, limit) @@ -42,6 +52,7 @@ async def get_vtt_version( db: AsyncIOMotorDatabase = Depends(get_database), ): """Get full VTT content for a specific version.""" + await _assert_job_access(job_id, current_user, db) v = await vtt_versioning.get_version(db, job_id, lang, kind, version) if not v: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Version not found") @@ -72,6 +83,7 @@ async def diff_vtt_versions( db: AsyncIOMotorDatabase = Depends(get_database), ): """Line-level diff between two versions of a VTT file.""" + await _assert_job_access(job_id, current_user, db) v_from = await vtt_versioning.get_version(db, job_id, lang, kind, from_version) v_to = await vtt_versioning.get_version(db, job_id, lang, kind, to_version) if not v_from: @@ -100,6 +112,7 @@ async def restore_vtt_version( Non-destructive: creates a new version entry whose content mirrors the old one, then overwrites the live GCS file. """ + await _assert_job_access(job_id, current_user, db) src = await vtt_versioning.get_version(db, job_id, lang, kind, version) if not src: raise HTTPException(status_code=404, detail="Version not found") diff --git a/backend/app/api/v1/routes_websockets.py b/backend/app/api/v1/routes_websockets.py index b1e3180..623e669 100644 --- a/backend/app/api/v1/routes_websockets.py +++ b/backend/app/api/v1/routes_websockets.py @@ -18,8 +18,9 @@ from ...services.websocket import ( ConnectionManager ) from ...models.job import Job +from ...models.user import UserRole from ...core.database import get_database -from ...core.dependencies import get_current_user +from ...core.dependencies import get_current_user, get_user_org_ids logger = logging.getLogger(__name__) @@ -68,23 +69,25 @@ async def websocket_job_status( await websocket.close(code=4004, reason="Job not found") return - # Check permissions - users can only access their own jobs unless they're admin/reviewer - user = await db["users"].find_one({"_id": user_id}) - if not user: - try: - from bson import ObjectId - user = await db["users"].find_one({"_id": ObjectId(user_id)}) - except Exception: - pass # Invalid ObjectId format - - if not user: + # Verify user and check org membership + user_doc = await db["users"].find_one({"_id": user_id}) + if not user_doc: await websocket.close(code=4001, reason="User not found") return - - # Check access permissions - if user["role"] == "client" and job.get("created_by") != user_id: - await websocket.close(code=4003, reason="Access denied") - return + + from ...models.user import User + user_obj = User(**user_doc) + + # ADMIN sees everything; others must belong to the job's org + if user_obj.role != UserRole.ADMIN: + org_ids = await get_user_org_ids(user_obj, db) + job_org = job.get("organization_id") or job.get("client_id") + if job_org and org_ids is not None and job_org not in org_ids: + logger.warning( + f"WS: user {user_id} tried to subscribe to job {job_id} from a different org" + ) + await websocket.close(code=4003, reason="Access denied") + return # Connect to job status updates await manager.connect_job_status(websocket, user_id, job_id) diff --git a/backend/app/core/dependencies.py b/backend/app/core/dependencies.py index 8184d80..c854e05 100644 --- a/backend/app/core/dependencies.py +++ b/backend/app/core/dependencies.py @@ -1,11 +1,9 @@ -from typing import Optional from fastapi import Depends, HTTPException, Request, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer from motor.motor_asyncio import AsyncIOMotorDatabase from ..models.user import User, UserRole -from .config import settings from .database import get_database from .security import decode_token @@ -15,6 +13,57 @@ security = HTTPBearer() STAFF_ROLES = {UserRole.ADMIN} +async def get_user_org_ids(user: User, db: AsyncIOMotorDatabase) -> list[str] | None: + """Return organization IDs accessible to the user, or None meaning unrestricted (ADMIN only).""" + if user.role == UserRole.ADMIN: + return None # ADMIN sees everything + user_id = str(user.id) + membership_cursor = db.memberships.find({"user_id": user_id}, {"organization_id": 1}) + org_ids = [doc["organization_id"] async for doc in membership_cursor] + if org_ids: + return org_ids + # Legacy: team membership + teams = await db.teams.find({"member_user_ids": user_id}, {"client_id": 1}).to_list(None) + client_ids = list({t["client_id"] for t in teams if t.get("client_id")}) + if client_ids: + return client_ids + # PM legacy via pm_client_ids + if user.role == UserRole.PROJECT_MANAGER and user.pm_client_ids: + return list(user.pm_client_ids) + # No membership at all — return empty list (see nothing) + return [] + + +async def assert_job_in_user_org(job_doc: dict, user: User, db: AsyncIOMotorDatabase) -> None: + """Raise 404 if the user cannot access the job's organization.""" + if user.role == UserRole.ADMIN: + return + org_ids = await get_user_org_ids(user, db) + if org_ids is None: + return # unrestricted (should only happen for ADMIN but defensive) + + # Check explicit organization_id field first (new jobs) + job_org = job_doc.get("organization_id") + if job_org: + if job_org not in org_ids: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found") + return + + # Fallback: derive org from project + project_id = job_doc.get("project_id") + if project_id: + project = await db.projects.find_one({"_id": project_id}, {"client_id": 1}) + if project and project.get("client_id") in org_ids: + return + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found") + + # Legacy jobs with no project: only visible to the user who created them + if job_doc.get("client_id") == str(user.id): + return + # Admin-only for fully legacy jobs with no org context + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found") + + async def get_current_user( credentials: HTTPAuthorizationCredentials = Depends(security), db: AsyncIOMotorDatabase = Depends(get_database), @@ -73,7 +122,7 @@ def require_roles(*required_roles: UserRole): async def get_current_user_optional( request: Request, db: AsyncIOMotorDatabase = Depends(get_database), -) -> Optional[User]: +) -> User | None: authorization: str = request.headers.get("Authorization") if not authorization: return None @@ -104,7 +153,7 @@ async def get_current_user_optional( async def get_accessible_project_ids( user: User, db: AsyncIOMotorDatabase, -) -> Optional[list[str]]: +) -> list[str] | None: """ Returns project IDs the user may access, or None meaning "see everything". @@ -156,11 +205,8 @@ async def get_accessible_project_ids( ).to_list(None) return [str(p["_id"]) for p in projects] - # Staff with no team assignments → unrestricted until teams are configured - if user.role in {UserRole.REVIEWER, UserRole.LINGUIST, UserRole.PRODUCTION}: - return None - - # CLIENT with no memberships and no teams → show nothing + # Staff with no team assignments → show nothing (no org context yet) + # Returning None here would give unrestricted cross-tenant access. return [] diff --git a/backend/app/models/audit_log.py b/backend/app/models/audit_log.py index 7f4fa08..17c92d1 100644 --- a/backend/app/models/audit_log.py +++ b/backend/app/models/audit_log.py @@ -93,11 +93,14 @@ class AuditLog(BaseModel): id: Optional[PyObjectId] = Field(default_factory=lambda: str(ObjectId()), alias="_id") + # Tenant isolation + organization_id: Optional[str] = None + # Core audit fields timestamp: datetime = Field(default_factory=datetime.utcnow) action: AuditAction severity: AuditLogSeverity = AuditLogSeverity.INFO - + # Actor information user_id: Optional[PyObjectId] = None user_email: Optional[str] = None diff --git a/backend/app/models/job.py b/backend/app/models/job.py index 6a1effc..d308ed9 100644 --- a/backend/app/models/job.py +++ b/backend/app/models/job.py @@ -235,6 +235,7 @@ class Job(BaseModel): ai: Optional[AISection] = None error: Optional[dict[str, Any]] = None tts_rewrites: Optional[list[dict[str, Any]]] = None # Track auto-rewritten TTS cues + organization_id: Optional[str] = None # Denormalized client_id (org) for tenant isolation project_id: Optional[str] = None # Platform project this job belongs to (Client → Project → Job) brand_context: Optional[str] = None # Brand names present in the video for accurate product identification cost_tracker_project_id: Optional[str] = None # External project ID for AI cost attribution diff --git a/backend/app/models/review_note.py b/backend/app/models/review_note.py index 4611264..0da37e4 100644 --- a/backend/app/models/review_note.py +++ b/backend/app/models/review_note.py @@ -10,6 +10,7 @@ class ReviewNote(BaseModel): """A timestamped note attached to a video asset during review.""" id: Optional[str] = Field(None, alias="_id") + organization_id: Optional[str] = None # Denormalized for tenant isolation job_id: str asset_key: str # e.g., "en", "es", "en_accessible" timestamp_seconds: float # Video timestamp when note was created diff --git a/backend/app/models/vtt_version.py b/backend/app/models/vtt_version.py index acecdfa..8a43a2d 100644 --- a/backend/app/models/vtt_version.py +++ b/backend/app/models/vtt_version.py @@ -14,6 +14,7 @@ class VttVersionActor(BaseModel): class VttVersion(BaseModel): id: Optional[str] = Field(None, alias="_id") + organization_id: Optional[str] = None # Denormalized for tenant isolation job_id: str lang: str kind: VttKind diff --git a/backend/app/services/audit_logger.py b/backend/app/services/audit_logger.py index 39261b4..429fa10 100644 --- a/backend/app/services/audit_logger.py +++ b/backend/app/services/audit_logger.py @@ -47,7 +47,8 @@ class AuditLogger: details: Optional[Dict[str, Any]] = None, severity: AuditLogSeverity = AuditLogSeverity.INFO, success: bool = True, - error_message: Optional[str] = None + error_message: Optional[str] = None, + organization_id: Optional[str] = None, ) -> str: """ Log an audit event. @@ -77,6 +78,7 @@ class AuditLogger: action=action, severity=severity, description=description, + organization_id=organization_id, user_id=user.id if user else None, user_email=user.email if user else None, user_role=(user.role.value if hasattr(user.role, "value") else user.role) if user else None, @@ -277,16 +279,23 @@ async def log_auth_failure(email: str, request: Request, reason: str): ) -async def log_job_action(action: AuditAction, job_id: str, user: User, request: Request, details: Optional[Dict] = None): +async def log_job_action( + action: AuditAction, + job_id: str, + user: User, + request: Request, + details: Optional[Dict] = None, + organization_id: Optional[str] = None, +): """Log job-related actions.""" action_descriptions = { AuditAction.JOB_CREATE: "Job created", - AuditAction.JOB_APPROVE: "Job approved", + AuditAction.JOB_APPROVE: "Job approved", AuditAction.JOB_REJECT: "Job rejected", AuditAction.JOB_CANCEL: "Job cancelled", AuditAction.JOB_UPDATE: "Job updated" } - + await audit_logger.log_action( action=action, description=f"{action_descriptions.get(action, str(action))} by {user.email}", @@ -294,7 +303,8 @@ async def log_job_action(action: AuditAction, job_id: str, user: User, request: request=request, resource_type="job", resource_id=job_id, - details=details + details=details, + organization_id=organization_id, ) diff --git a/backend/migrations/2026_05_add_organization_id.py b/backend/migrations/2026_05_add_organization_id.py new file mode 100644 index 0000000..4f3dc0c --- /dev/null +++ b/backend/migrations/2026_05_add_organization_id.py @@ -0,0 +1,125 @@ +""" +Migration: add organization_id to jobs, review_notes, vtt_versions, audit_logs. + +Run once after deploying the multi-tenant isolation PR. + +Usage (inside the backend container): + python -m migrations.2026_05_add_organization_id + +Strategy: +- Jobs with a project_id → organization_id = project.client_id +- Jobs with no project_id → organization_id stays None (legacy single-tenant data; + the assert_job_in_user_org fallback handles these via + client_id == user.id check) +- review_notes / vtt_versions / audit_logs → copy organization_id from their parent job +""" + +import asyncio +import logging +from motor.motor_asyncio import AsyncIOMotorClient + +from app.core.config import get_settings + +logger = logging.getLogger(__name__) +logging.basicConfig(level=logging.INFO) + + +async def run(): + settings = get_settings() + client = AsyncIOMotorClient(settings.mongodb_url) + db = client[settings.mongodb_db_name] + + # ── Step 1: backfill jobs ──────────────────────────────────────────────── + logger.info("Backfilling organization_id on jobs…") + + # Build project_id → client_id lookup + project_map: dict[str, str] = {} + async for proj in db.projects.find({}, {"_id": 1, "client_id": 1}): + project_map[str(proj["_id"])] = str(proj["client_id"]) + + jobs_updated = 0 + async for job in db.jobs.find( + {"organization_id": {"$exists": False}, "project_id": {"$ne": None}}, + {"_id": 1, "project_id": 1}, + ): + pid = job.get("project_id") + if pid and pid in project_map: + await db.jobs.update_one( + {"_id": job["_id"]}, + {"$set": {"organization_id": project_map[pid]}}, + ) + jobs_updated += 1 + + logger.info(f"Jobs updated: {jobs_updated}") + + # ── Step 2: build job_id → organization_id for downstream tables ───────── + job_org_map: dict[str, str] = {} + async for job in db.jobs.find( + {"organization_id": {"$exists": True}}, + {"_id": 1, "organization_id": 1}, + ): + if job.get("organization_id"): + job_org_map[str(job["_id"])] = job["organization_id"] + + # ── Step 3: backfill review_notes ──────────────────────────────────────── + logger.info("Backfilling organization_id on review_notes…") + notes_updated = 0 + async for note in db.review_notes.find( + {"organization_id": {"$exists": False}}, + {"_id": 1, "job_id": 1}, + ): + org = job_org_map.get(note.get("job_id", "")) + if org: + await db.review_notes.update_one( + {"_id": note["_id"]}, + {"$set": {"organization_id": org}}, + ) + notes_updated += 1 + logger.info(f"Review notes updated: {notes_updated}") + + # ── Step 4: backfill vtt_versions ──────────────────────────────────────── + logger.info("Backfilling organization_id on vtt_versions…") + vtt_updated = 0 + async for v in db.vtt_versions.find( + {"organization_id": {"$exists": False}}, + {"_id": 1, "job_id": 1}, + ): + org = job_org_map.get(v.get("job_id", "")) + if org: + await db.vtt_versions.update_one( + {"_id": v["_id"]}, + {"$set": {"organization_id": org}}, + ) + vtt_updated += 1 + logger.info(f"VTT versions updated: {vtt_updated}") + + # ── Step 5: backfill audit_logs ────────────────────────────────────────── + logger.info("Backfilling organization_id on audit_logs…") + audit_updated = 0 + async for entry in db.audit_logs.find( + {"organization_id": {"$exists": False}, "resource_type": "job"}, + {"_id": 1, "resource_id": 1}, + ): + org = job_org_map.get(entry.get("resource_id", "")) + if org: + await db.audit_logs.update_one( + {"_id": entry["_id"]}, + {"$set": {"organization_id": org}}, + ) + audit_updated += 1 + logger.info(f"Audit logs updated: {audit_updated}") + + # ── Step 6: create indexes ─────────────────────────────────────────────── + logger.info("Creating organization_id indexes…") + await db.jobs.create_index([("organization_id", 1), ("created_at", -1)]) + await db.review_notes.create_index([("organization_id", 1), ("job_id", 1)]) + await db.vtt_versions.create_index([("organization_id", 1), ("job_id", 1)]) + await db.audit_logs.create_index([("organization_id", 1), ("timestamp", -1)]) + logger.info("Indexes created.") + + client.close() + logger.info("Migration complete.") + + +if __name__ == "__main__": + asyncio.run(run()) diff --git a/backend/tests/unit/test_cross_tenant_isolation.py b/backend/tests/unit/test_cross_tenant_isolation.py new file mode 100644 index 0000000..b9bf060 --- /dev/null +++ b/backend/tests/unit/test_cross_tenant_isolation.py @@ -0,0 +1,191 @@ +""" +Cross-tenant isolation tests. + +Verifies that assert_job_in_user_org raises 404 (not 403, to avoid information +disclosure) when a user from org A tries to access a job belonging to org B. +Also verifies ADMIN always passes and legacy (no organization_id) jobs fall back +to project-based checks. +""" + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch +from fastapi import HTTPException + +from app.core.dependencies import assert_job_in_user_org, get_user_org_ids +from app.models.user import User, UserRole + + +# ── Helpers ──────────────────────────────────────────────────────────────────── + +def _make_user(role: UserRole, user_id: str = "user-a") -> User: + return User( + **{ + "_id": user_id, + "email": f"{user_id}@example.com", + "full_name": "Test User", + "role": role, + "is_active": True, + } + ) + + +def _make_db(memberships: list[dict], teams: list[dict] | None = None, project: dict | None = None): + """Return a mock AsyncIOMotorDatabase with memberships/teams/projects collections.""" + db = MagicMock() + + # memberships.find(...) + async def _mem_find(*args, **kwargs): + return memberships + + mem_cursor = MagicMock() + mem_cursor.__aiter__ = AsyncMock( + return_value=iter(memberships) + ) + # Use async list comprehension-compatible mock + async def _mem_iter(): + for m in memberships: + yield m + + db.memberships.find.return_value = _AsyncIterableMock(memberships) + + # teams.find(...) + team_list = teams or [] + db.teams.find.return_value = MagicMock() + db.teams.find.return_value.to_list = AsyncMock(return_value=team_list) + + # projects.find_one(...) + db.projects.find_one = AsyncMock(return_value=project) + + return db + + +class _AsyncIterableMock: + """Supports `async for doc in cursor` pattern used in get_user_org_ids.""" + + def __init__(self, items: list): + self._items = items + + def __aiter__(self): + return self._aiter() + + async def _aiter(self): + for item in self._items: + yield item + + +# ── get_user_org_ids ─────────────────────────────────────────────────────────── + +class TestGetUserOrgIds: + @pytest.mark.asyncio + async def test_admin_returns_none(self): + user = _make_user(UserRole.ADMIN) + db = _make_db([]) + result = await get_user_org_ids(user, db) + assert result is None + + @pytest.mark.asyncio + async def test_staff_with_membership_returns_orgs(self): + user = _make_user(UserRole.LINGUIST) + db = _make_db([{"organization_id": "org-a"}, {"organization_id": "org-b"}]) + result = await get_user_org_ids(user, db) + assert result == ["org-a", "org-b"] + + @pytest.mark.asyncio + async def test_staff_no_membership_no_team_returns_empty(self): + user = _make_user(UserRole.REVIEWER) + db = _make_db([]) + result = await get_user_org_ids(user, db) + assert result == [] + + @pytest.mark.asyncio + async def test_pm_legacy_pm_client_ids(self): + user = _make_user(UserRole.PROJECT_MANAGER) + user.pm_client_ids = ["client-x"] + db = _make_db([]) + result = await get_user_org_ids(user, db) + assert result == ["client-x"] + + @pytest.mark.asyncio + async def test_staff_falls_back_to_team_membership(self): + user = _make_user(UserRole.PRODUCTION) + db = _make_db( + memberships=[], # no memberships + teams=[{"client_id": "client-y"}], + ) + result = await get_user_org_ids(user, db) + assert result == ["client-y"] + + +# ── assert_job_in_user_org ───────────────────────────────────────────────────── + +class TestAssertJobInUserOrg: + @pytest.mark.asyncio + async def test_admin_always_passes(self): + user = _make_user(UserRole.ADMIN) + job = {"_id": "job-1", "organization_id": "org-b"} + db = _make_db([]) + await assert_job_in_user_org(job, user, db) # must not raise + + @pytest.mark.asyncio + async def test_same_org_passes(self): + user = _make_user(UserRole.LINGUIST) + db = _make_db([{"organization_id": "org-a"}]) + job = {"_id": "job-1", "organization_id": "org-a"} + await assert_job_in_user_org(job, user, db) # must not raise + + @pytest.mark.asyncio + async def test_cross_org_raises_404(self): + user = _make_user(UserRole.LINGUIST) + db = _make_db([{"organization_id": "org-a"}]) + job = {"_id": "job-1", "organization_id": "org-b"} + with pytest.raises(HTTPException) as exc: + await assert_job_in_user_org(job, user, db) + assert exc.value.status_code == 404 + + @pytest.mark.asyncio + async def test_no_organization_id_project_fallback_passes(self): + user = _make_user(UserRole.REVIEWER) + db = _make_db( + memberships=[{"organization_id": "org-a"}], + project={"_id": "proj-1", "client_id": "org-a"}, + ) + job = {"_id": "job-1", "project_id": "proj-1"} # no organization_id + await assert_job_in_user_org(job, user, db) + + @pytest.mark.asyncio + async def test_no_organization_id_project_from_other_org_raises_404(self): + user = _make_user(UserRole.REVIEWER) + db = _make_db( + memberships=[{"organization_id": "org-a"}], + project={"_id": "proj-1", "client_id": "org-b"}, + ) + job = {"_id": "job-1", "project_id": "proj-1"} + with pytest.raises(HTTPException) as exc: + await assert_job_in_user_org(job, user, db) + assert exc.value.status_code == 404 + + @pytest.mark.asyncio + async def test_legacy_job_own_user_passes(self): + user = _make_user(UserRole.LINGUIST, user_id="user-a") + db = _make_db([]) # no memberships + job = {"_id": "job-1", "client_id": "user-a"} # legacy: client_id = creator user_id + await assert_job_in_user_org(job, user, db) + + @pytest.mark.asyncio + async def test_legacy_job_other_user_raises_404(self): + user = _make_user(UserRole.LINGUIST, user_id="user-a") + db = _make_db([]) # no memberships + job = {"_id": "job-1", "client_id": "user-b"} # belongs to a different user + with pytest.raises(HTTPException) as exc: + await assert_job_in_user_org(job, user, db) + assert exc.value.status_code == 404 + + @pytest.mark.asyncio + async def test_user_with_no_org_context_denied_access_to_org_job(self): + """Staff with no memberships and no team cannot access any org-tagged job.""" + user = _make_user(UserRole.PRODUCTION) + db = _make_db([]) + job = {"_id": "job-1", "organization_id": "org-c"} + with pytest.raises(HTTPException) as exc: + await assert_job_in_user_org(job, user, db) + assert exc.value.status_code == 404