diff --git a/backend/app/api/v1/routes_jobs.py b/backend/app/api/v1/routes_jobs.py index c2236ce..9306bbc 100644 --- a/backend/app/api/v1/routes_jobs.py +++ b/backend/app/api/v1/routes_jobs.py @@ -21,6 +21,7 @@ from ...core.config import settings from ...core.database import get_database from ...core.authz import MembershipContext, get_job_or_403, get_membership_context from ...core.dependencies import ( + assert_job_in_user_org, get_accessible_project_ids, get_current_user, require_roles, @@ -43,6 +44,7 @@ from ...schemas.job import ( ApproveEnglishRequest, ApproveSourceRequest, AssetValidationResponse, + BlockedOnSourceRequest, BulkApproveRequest, BulkApproveResponse, BulkDeleteRequest, @@ -56,6 +58,7 @@ from ...schemas.job import ( JobListResponse, JobResponse, JobUpdateRequest, + PromoteToQCRequest, RejectJobRequest, ReturnToQCRequest, UpdateTTSPreferencesRequest, @@ -75,8 +78,6 @@ from ...services.validation import asset_validation_service from ...services.websocket import connection_manager from ...services.zip_download import generate_zip_filename, generate_zip_stream from ...tasks import celery_app -from ...tasks.ingest_and_ai import ingest_and_ai_task -from ...tasks.translate_and_synthesize import translate_and_synthesize_task logger = get_logger(__name__) router = APIRouter(prefix="/jobs", tags=["jobs"]) @@ -1093,6 +1094,120 @@ async def return_to_qc( ) +@router.post("/{job_id}/actions/blocked_on_source", response_model=JobResponse) +async def blocked_on_source( + job_id: str, + request: BlockedOnSourceRequest, + http_request: Request, + current_user: User = Depends(require_roles( + UserRole.LINGUIST, UserRole.REVIEWER, UserRole.PRODUCTION, UserRole.ADMIN, + )), + db: AsyncIOMotorDatabase = Depends(get_database), +): + """Linguist/Reviewer flags that the source video has an issue requiring PM attention (L-18). + Transitions job to QC_FEEDBACK and notifies the PM.""" + job_doc = await db.jobs.find_one({"_id": job_id}) + if not job_doc: + raise HTTPException(status_code=404, detail="Job not found") + await assert_job_in_user_org(job_doc, current_user, db) + + now = datetime.utcnow() + result = await db.jobs.find_one_and_update( + {"_id": job_id, "status": {"$in": [ + JobStatus.PENDING_QC.value, JobStatus.QC_FEEDBACK.value, + ]}}, + {"$set": { + "status": JobStatus.QC_FEEDBACK.value, + "review.notes": f"[BLOCKED ON SOURCE] {request.reason}", + "blocked_on_source_reason": request.reason, + "blocked_on_source_at": now, + "blocked_on_source_by": str(current_user.id), + "updated_at": now, + }}, + return_document=True, + ) + if not result: + raise HTTPException(status_code=400, detail="Job is not in a QC status") + + await log_job_action( + AuditAction.JOB_STATUS_CHANGE, job_id, current_user, http_request, + details={"new_status": JobStatus.QC_FEEDBACK.value, "reason": request.reason, "action": "blocked_on_source"}, + ) + return JobResponse( + id=str(result["_id"]), + title=result["title"], + status=result["status"], + source=result["source"], + requested_outputs=RequestedOutputs(**result["requested_outputs"]), + review=result.get("review", {"notes": "", "history": []}), + outputs=result.get("outputs"), + created_at=result["created_at"].isoformat(), + updated_at=result["updated_at"].isoformat(), + ) + + +@router.post("/{job_id}/actions/promote_to_qc", response_model=JobResponse) +async def promote_to_qc( + job_id: str, + request: PromoteToQCRequest, + http_request: Request, + current_user: User = Depends(require_roles(UserRole.PRODUCTION, UserRole.ADMIN)), + db: AsyncIOMotorDatabase = Depends(get_database), +): + """Production/Admin manually promotes a job to PENDING_QC, bypassing AI processing (PR-10). + Used when AI fails on an edge-case and Production wants to proceed manually.""" + job_doc = await db.jobs.find_one({"_id": job_id}) + if not job_doc: + raise HTTPException(status_code=404, detail="Job not found") + await assert_job_in_user_org(job_doc, current_user, db) + + promotable_statuses = [ + JobStatus.AI_PROCESSING.value, + JobStatus.INGESTING.value, + JobStatus.PROCESSING_FAILED.value, + JobStatus.TTS_FAILED.value, + JobStatus.RENDER_FAILED.value, + ] + now = datetime.utcnow() + result = await db.jobs.find_one_and_update( + {"_id": job_id, "status": {"$in": promotable_statuses}}, + {"$set": { + "status": JobStatus.PENDING_QC.value, + "updated_at": now, + }, + "$push": { + "review.history": { + "at": now, + "status": JobStatus.PENDING_QC.value, + "by": str(current_user.id), + "notes": request.notes or "Manually promoted to QC", + } + }}, + return_document=True, + ) + if not result: + raise HTTPException( + status_code=400, + detail=f"Job cannot be promoted to QC from its current status. Must be one of: {', '.join(promotable_statuses)}", + ) + + await log_job_action( + AuditAction.JOB_STATUS_CHANGE, job_id, current_user, http_request, + details={"new_status": JobStatus.PENDING_QC.value, "notes": request.notes, "action": "promote_to_qc"}, + ) + return JobResponse( + id=str(result["_id"]), + title=result["title"], + status=result["status"], + source=result["source"], + requested_outputs=RequestedOutputs(**result["requested_outputs"]), + review=result.get("review", {"notes": "", "history": []}), + outputs=result.get("outputs"), + created_at=result["created_at"].isoformat(), + updated_at=result["updated_at"].isoformat(), + ) + + @router.get("/{job_id}/downloads", response_model=JobDownloadsResponse) async def get_job_downloads( job_id: str, diff --git a/backend/app/core/dependencies.py b/backend/app/core/dependencies.py index e943e3e..4a5b0f9 100644 --- a/backend/app/core/dependencies.py +++ b/backend/app/core/dependencies.py @@ -170,6 +170,67 @@ async def get_accessible_project_ids( return [] +async def get_user_org_ids(user: User, db: AsyncIOMotorDatabase) -> list[str] | None: + """Return org IDs the user belongs to, or None meaning unrestricted (ADMIN). + + Priority: memberships → pm_client_ids (PM legacy) → team.member_user_ids (staff legacy) + """ + if user.role == UserRole.ADMIN: + return None + + user_id = str(user.id) + + # Primary: Membership collection + org_ids: list[str] = [] + async for m in db.memberships.find({"user_id": user_id}, {"organization_id": 1}): + if m.get("organization_id"): + org_ids.append(str(m["organization_id"])) + if org_ids: + return org_ids + + # PM legacy: pm_client_ids + if user.role == UserRole.PROJECT_MANAGER: + return list(user.pm_client_ids or []) + + # Staff legacy: team.member_user_ids + teams = await db.teams.find({"member_user_ids": user_id}, {"client_id": 1}).to_list(None) + if teams: + return [str(t["client_id"]) for t in teams if t.get("client_id")] + + return [] + + +async def assert_job_in_user_org(job: dict, user: User, db: AsyncIOMotorDatabase) -> None: + """Raise 404 (not 403) when user cannot access this job — avoids information disclosure.""" + if user.role == UserRole.ADMIN: + return + + org_ids = await get_user_org_ids(user, db) + if org_ids is None: + return # unrestricted + + job_org = job.get("organization_id") + if job_org: + if job_org in org_ids: + return + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found") + + # No organization_id — try project fallback + project_id = job.get("project_id") + if project_id: + project = await db.projects.find_one({"_id": project_id}, {"client_id": 1}) + if project and project.get("client_id") in org_ids: + return + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found") + + # Legacy: client_id == creator user_id + job_client_id = job.get("client_id") + if job_client_id and job_client_id == str(user.id): + return + + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Job not found") + + def require_pm_for_client(client_id_param: str = "client_id"): """Dependency: ensures the current user is an Admin or PM for the given client.""" async def checker( diff --git a/backend/app/schemas/job.py b/backend/app/schemas/job.py index 095b450..55bab4a 100644 --- a/backend/app/schemas/job.py +++ b/backend/app/schemas/job.py @@ -149,3 +149,11 @@ class BulkReturnToQCResponse(BaseModel): class BulkDownloadRequest(BaseModel): """Request to download multiple jobs as a single zip file""" job_ids: list[str] + + +class BlockedOnSourceRequest(BaseModel): + reason: str # brief description of what is wrong with the source video + + +class PromoteToQCRequest(BaseModel): + notes: str = "" # optional context for the QC team diff --git a/backend/app/services/language_qc.py b/backend/app/services/language_qc.py index 137f16b..b6b8634 100644 --- a/backend/app/services/language_qc.py +++ b/backend/app/services/language_qc.py @@ -515,6 +515,7 @@ async def submit_for_review( **(current_state_raw if isinstance(current_state_raw, dict) else {}), "status": LanguageQCStatus.PENDING_REVIEW.value, "submitted_for_review_at": now, + "reviewed_cues": 0, # R-12: reviewer must re-acknowledge cues after each resubmit "history": history, }