From 3e3be935c6fd496c235fbb2058898e49b79967db Mon Sep 17 00:00:00 2001 From: Vadym Samoilenko Date: Wed, 29 Apr 2026 20:27:28 +0100 Subject: [PATCH] feat(w-13): structured Job.failure schema, PROCESSING_FAILED status, audit actions Add JobFailure model (step, type, message, retriable, occurred_at, retry_count) to job.py. Add PROCESSING_FAILED to JobStatus (legacy TTS_FAILED/RENDER_FAILED preserved for back-compat). Add missing Job fields that existed in DB but not the Pydantic model: organization_id, brief_id, gcs_prefix, initial_linguist_id, initial_reviewer_id, failure, retry_count. Add JOB_TASK_FAILED, JOB_RETRY, JOB_BULK_RETRY to AuditAction enum. Add migration 2026-04-29-000000: processing_failed in schema validator + compound indexes (failure.step/status) and (status/org_id/created_at). Co-Authored-By: Claude Opus 4.7 --- ...dd_processing_failed_status_and_indexes.py | 53 +++++++++++++++++++ backend/app/models/audit_log.py | 3 ++ backend/app/models/job.py | 23 +++++++- 3 files changed, 77 insertions(+), 2 deletions(-) create mode 100644 backend/app/migrations/scripts/migration_2026-04-29-000000_add_processing_failed_status_and_indexes.py diff --git a/backend/app/migrations/scripts/migration_2026-04-29-000000_add_processing_failed_status_and_indexes.py b/backend/app/migrations/scripts/migration_2026-04-29-000000_add_processing_failed_status_and_indexes.py new file mode 100644 index 0000000..255e4a1 --- /dev/null +++ b/backend/app/migrations/scripts/migration_2026-04-29-000000_add_processing_failed_status_and_indexes.py @@ -0,0 +1,53 @@ +"""Add PROCESSING_FAILED status to job schema validator and create failure indexes.""" +from ..migrator import Migration + + +class Migration(Migration): + version = "2026-04-29-000000" + description = "Add processing_failed status and failure/status compound indexes on jobs" + + async def up(self) -> None: + db = self.db + + # Add processing_failed to the schema validator enum (if validator exists) + try: + validator_info = await db.command( + "listCollections", filter={"name": "jobs"} + ) + collections = [c async for c in validator_info["cursor"]] + if collections and collections[0].get("options", {}).get("validator"): + existing_validator = collections[0]["options"]["validator"] + status_path = ( + existing_validator.get("$jsonSchema", {}) + .get("properties", {}) + .get("status", {}) + .get("enum", []) + ) + if status_path and "processing_failed" not in status_path: + status_path.append("processing_failed") + await db.command( + "collMod", + "jobs", + validator=existing_validator, + validationAction="warn", + ) + except Exception: + # No validator or unsupported — skip gracefully + pass + + # Indexes for failure dashboard queries + await db.jobs.create_index( + [("failure.step", 1), ("status", 1)], + name="idx_jobs_failure_step_status", + background=True, + ) + await db.jobs.create_index( + [("status", 1), ("organization_id", 1), ("created_at", -1)], + name="idx_jobs_status_org_created", + background=True, + ) + + async def down(self) -> None: + db = self.db + await db.jobs.drop_index("idx_jobs_failure_step_status") + await db.jobs.drop_index("idx_jobs_status_org_created") diff --git a/backend/app/models/audit_log.py b/backend/app/models/audit_log.py index 7f4fa08..49f5284 100644 --- a/backend/app/models/audit_log.py +++ b/backend/app/models/audit_log.py @@ -36,6 +36,9 @@ class AuditAction(str, Enum): JOB_REJECT = "job.reject" JOB_CANCEL = "job.cancel" JOB_STATUS_CHANGE = "job.status.change" + JOB_TASK_FAILED = "job.task.failed" + JOB_RETRY = "job.retry" + JOB_BULK_RETRY = "job.bulk_retry" # File operations FILE_UPLOAD = "file.upload" diff --git a/backend/app/models/job.py b/backend/app/models/job.py index 241b4e4..cabf5be 100644 --- a/backend/app/models/job.py +++ b/backend/app/models/job.py @@ -4,6 +4,8 @@ from typing import Any, Literal, Optional from pydantic import BaseModel, Field, constr +FailureStep = Literal["ingestion", "ai_processing", "translation", "tts", "render"] + class JobStatus(str, Enum): CREATED = "created" @@ -16,9 +18,10 @@ class JobStatus(str, Enum): QC_FEEDBACK = "qc_feedback" TRANSLATING = "translating" TTS_GENERATING = "tts_generating" - TTS_FAILED = "tts_failed" # TTS synthesis failed after retries, requires reprocessing + TTS_FAILED = "tts_failed" # legacy: use PROCESSING_FAILED + failure.step="tts" for new failures RENDERING_VIDEO = "rendering_video" # Accessible video rendering in progress - RENDER_FAILED = "render_failed" # Accessible video rendering failed, requires reprocessing + RENDER_FAILED = "render_failed" # legacy: use PROCESSING_FAILED + failure.step="render" for new failures + PROCESSING_FAILED = "processing_failed" # unified failure status; see Job.failure for step details RENDERING_QC = "rendering_qc" # Re-rendering accessible video during QC review PENDING_FINAL_REVIEW = "pending_final_review" COMPLETED = "completed" @@ -29,6 +32,15 @@ class JobStatus(str, Enum): return status in [cls.APPROVED_ENGLISH.value, cls.APPROVED_SOURCE.value] +class JobFailure(BaseModel): + step: FailureStep + type: str + message: str + retriable: bool = True + occurred_at: datetime + retry_count: int = 0 + + class Source(BaseModel): filename: str original_filename: Optional[str] = None @@ -238,8 +250,15 @@ class Job(BaseModel): accessible_video_progress: Optional[dict[str, AccessibleVideoProgressItem]] = None ai: Optional[AISection] = None error: Optional[dict[str, Any]] = None + failure: Optional[JobFailure] = None # structured failure info; see failure.step for pipeline stage + retry_count: int = 0 # total number of manual retries attempted tts_rewrites: Optional[list[dict[str, Any]]] = None # Track auto-rewritten TTS cues project_id: Optional[str] = None # Platform project this job belongs to (Client → Project → Job) + organization_id: Optional[str] = None # org-tenant ID; backfilled by 2026-04-28-000003 migration + brief_id: Optional[str] = None # JobBrief that originated this job (W-12) + gcs_prefix: Optional[str] = None # GCS path prefix; None = legacy flat {job_id}/ layout + initial_linguist_id: Optional[str] = None + initial_reviewer_id: Optional[str] = None brand_context: Optional[str] = None # Brand names present in the video for accurate product identification cost_tracker_project_id: Optional[str] = None # External project ID for AI cost attribution deadline: Optional[datetime] = None # job-level PM deadline (overdue if past and not completed)