feat(w-13): structured Job.failure schema, PROCESSING_FAILED status, audit actions

Add JobFailure model (step, type, message, retriable, occurred_at,
retry_count) to job.py. Add PROCESSING_FAILED to JobStatus (legacy
TTS_FAILED/RENDER_FAILED preserved for back-compat).

Add missing Job fields that existed in DB but not the Pydantic model:
organization_id, brief_id, gcs_prefix, initial_linguist_id,
initial_reviewer_id, failure, retry_count.

Add JOB_TASK_FAILED, JOB_RETRY, JOB_BULK_RETRY to AuditAction enum.

Add migration 2026-04-29-000000: add processing_failed to the jobs schema validator and
create compound indexes (failure.step, status) and (status, organization_id, created_at).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-04-29 20:27:28 +01:00
parent 38038862c9
commit 3e3be935c6
3 changed files with 77 additions and 2 deletions

View file

@ -0,0 +1,53 @@
"""Add PROCESSING_FAILED status to job schema validator and create failure indexes."""
from ..migrator import Migration
class Migration(Migration):
    """Add the ``processing_failed`` job status and failure-dashboard indexes.

    ``up`` appends ``processing_failed`` to the ``status`` enum of the ``jobs``
    collection validator (only when such a validator already exists) and
    creates two compound indexes. ``down`` drops those two indexes.
    """

    version = "2026-04-29-000000"
    description = "Add processing_failed status and failure/status compound indexes on jobs"

    async def up(self) -> None:
        """Patch the jobs validator (best effort), then create the indexes."""
        import logging

        db = self.db

        # Patching the validator is best effort: a missing validator or an
        # unsupported command must not block index creation. The failure is
        # logged rather than silently discarded so a skipped patch is visible.
        try:
            await self._allow_processing_failed_status()
        except Exception:
            logging.getLogger(__name__).warning(
                "Could not add processing_failed to the jobs schema validator; skipping",
                exc_info=True,
            )

        # Indexes for failure dashboard queries.
        # NOTE(review): `background` is ignored by MongoDB 4.2+; kept as in the
        # original for older servers.
        await db.jobs.create_index(
            [("failure.step", 1), ("status", 1)],
            name="idx_jobs_failure_step_status",
            background=True,
        )
        await db.jobs.create_index(
            [("status", 1), ("organization_id", 1), ("created_at", -1)],
            name="idx_jobs_status_org_created",
            background=True,
        )

    async def _allow_processing_failed_status(self) -> None:
        """Append ``processing_failed`` to the validator's status enum if needed.

        Does nothing when the ``jobs`` collection has no validator, when the
        validator has no non-empty ``status`` enum, or when the value is
        already present.
        """
        db = self.db
        reply = await db.command("listCollections", filter={"name": "jobs"})
        # `command` returns the raw server reply (assuming self.db is a
        # Motor/PyMongo async database): the matching collections are the
        # plain list under cursor.firstBatch, not an async-iterable cursor.
        collections = reply.get("cursor", {}).get("firstBatch", [])
        if not collections:
            return

        existing_validator = collections[0].get("options", {}).get("validator")
        if not existing_validator:
            return

        allowed_statuses = (
            existing_validator.get("$jsonSchema", {})
            .get("properties", {})
            .get("status", {})
            .get("enum", [])
        )
        if not allowed_statuses or "processing_failed" in allowed_statuses:
            return

        # The enum list is mutated in place, so `existing_validator` now
        # carries the extended enum when sent back via collMod.
        allowed_statuses.append("processing_failed")
        # NOTE(review): this forces validationAction to "warn" regardless of
        # the collection's previous setting — confirm that is intended.
        await db.command(
            "collMod",
            "jobs",
            validator=existing_validator,
            validationAction="warn",
        )

    async def down(self) -> None:
        """Drop the two indexes created by ``up``.

        The validator enum is left untouched; ``processing_failed`` stays
        allowed after a rollback.
        """
        db = self.db
        await db.jobs.drop_index("idx_jobs_failure_step_status")
        await db.jobs.drop_index("idx_jobs_status_org_created")

View file

@ -36,6 +36,9 @@ class AuditAction(str, Enum):
JOB_REJECT = "job.reject"
JOB_CANCEL = "job.cancel"
JOB_STATUS_CHANGE = "job.status.change"
JOB_TASK_FAILED = "job.task.failed"
JOB_RETRY = "job.retry"
JOB_BULK_RETRY = "job.bulk_retry"
# File operations
FILE_UPLOAD = "file.upload"

View file

@ -4,6 +4,8 @@ from typing import Any, Literal, Optional
from pydantic import BaseModel, Field, constr
# Pipeline stages a job failure can be attributed to; the type of JobFailure.step.
FailureStep = Literal["ingestion", "ai_processing", "translation", "tts", "render"]
class JobStatus(str, Enum):
CREATED = "created"
@ -16,9 +18,10 @@ class JobStatus(str, Enum):
QC_FEEDBACK = "qc_feedback"
TRANSLATING = "translating"
TTS_GENERATING = "tts_generating"
TTS_FAILED = "tts_failed" # TTS synthesis failed after retries, requires reprocessing
TTS_FAILED = "tts_failed" # legacy: use PROCESSING_FAILED + failure.step="tts" for new failures
RENDERING_VIDEO = "rendering_video" # Accessible video rendering in progress
RENDER_FAILED = "render_failed" # Accessible video rendering failed, requires reprocessing
RENDER_FAILED = "render_failed" # legacy: use PROCESSING_FAILED + failure.step="render" for new failures
PROCESSING_FAILED = "processing_failed" # unified failure status; see Job.failure for step details
RENDERING_QC = "rendering_qc" # Re-rendering accessible video during QC review
PENDING_FINAL_REVIEW = "pending_final_review"
COMPLETED = "completed"
@ -29,6 +32,15 @@ class JobStatus(str, Enum):
return status in [cls.APPROVED_ENGLISH.value, cls.APPROVED_SOURCE.value]
class JobFailure(BaseModel):
    # Structured description of a job failure, stored on Job.failure.
    # JobStatus.PROCESSING_FAILED is the unified failure status whose step
    # details live in this model.
    step: FailureStep  # pipeline stage that failed (one of the FailureStep literals)
    type: str  # failure category; presumably an exception class name or error code — TODO confirm with producers
    message: str  # failure description; presumably human-readable — verify against writers
    retriable: bool = True  # whether the failure may be retried; defaults to True
    occurred_at: datetime  # when the failure happened; NOTE(review): timezone is not enforced here — confirm UTC
    retry_count: int = 0  # presumably retries already attempted for this failure — TODO confirm relation to Job.retry_count
class Source(BaseModel):
filename: str
original_filename: Optional[str] = None
@ -238,8 +250,15 @@ class Job(BaseModel):
accessible_video_progress: Optional[dict[str, AccessibleVideoProgressItem]] = None
ai: Optional[AISection] = None
error: Optional[dict[str, Any]] = None
failure: Optional[JobFailure] = None # structured failure info; see failure.step for pipeline stage
retry_count: int = 0 # total number of manual retries attempted
tts_rewrites: Optional[list[dict[str, Any]]] = None # Track auto-rewritten TTS cues
project_id: Optional[str] = None # Platform project this job belongs to (Client → Project → Job)
organization_id: Optional[str] = None # org-tenant ID; backfilled by 2026-04-28-000003 migration
brief_id: Optional[str] = None # JobBrief that originated this job (W-12)
gcs_prefix: Optional[str] = None # GCS path prefix; None = legacy flat {job_id}/ layout
initial_linguist_id: Optional[str] = None
initial_reviewer_id: Optional[str] = None
brand_context: Optional[str] = None # Brand names present in the video for accurate product identification
cost_tracker_project_id: Optional[str] = None # External project ID for AI cost attribution
deadline: Optional[datetime] = None # job-level PM deadline (overdue if past and not completed)