From 376802db4112f04735954fc0758ee83acac69d86 Mon Sep 17 00:00:00 2001 From: DJP Date: Wed, 29 Apr 2026 20:11:37 -0400 Subject: [PATCH] Pipeline retry: idempotent `all` + retry endpoint + UI buttons MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pipeline `all` command now skips stages whose .state/stage{N}.done sentinel is present (unless --force), so retrying picks up exactly where the previous run failed without re-spending on completed stages. Stage 5 (validate) is the deliberate exception — it always re-runs because --drop-failing changes the selection set. - pipeline/cli.ts: `all` wraps every stage in a maybeRun() helper that checks the sentinel + writes one after running. Validate runs every retry; if it doesn't reach 100% coverage the pipeline now fails LOUDLY with a clear error rather than crashing in Stage 6. - routes/reports.ts: handleRetryReport — POST /api/reports/:id/retry, resets reports.status to pending (clears error_message + finished_at), spawns `pipeline cli all` with --drop-failing (and --force when the client passes {force:true}). Same singleton-running guard as run. - operator-app: useRetryReport hook + two new buttons on Reports detail: - "Retry pipeline" / "Force re-run" on the failure panel. - "Retry with drop-failing" on the manifest panel (Stage 5 specific). Items explicitly deferred (documented in head): - SSE for live progress (3s React Query polling already in place). - Conversational CLI brief intake (V3 §0; the operator-app form covers it). 62/62 unit tests pass. SPA build 284 kB. Co-Authored-By: Claude Opus 4.7 (1M context) --- v2/operator-app/src/api/reports.ts | 9 ++ v2/operator-app/src/routes/reports/detail.tsx | 83 ++++++++++++++++--- v2/pipeline/cli.ts | 54 +++++++++--- v2/server/routes/reports.ts | 49 +++++++++++ 4 files changed, 171 insertions(+), 24 deletions(-) diff --git a/v2/operator-app/src/api/reports.ts b/v2/operator-app/src/api/reports.ts index ac51b9f..ffea5da 100644 --- a/v2/operator-app/src/api/reports.ts +++ b/v2/operator-app/src/api/reports.ts @@ -113,3 +113,12 @@ export function useBuildReport(reportId: string | undefined) { onSuccess: () => qc.invalidateQueries({ queryKey: ['reports', reportId] }), }); } + +export function useRetryReport(reportId: string | undefined) { + const qc = useQueryClient(); + return useMutation<{ ok: true }, Error, { force?: boolean } | void>({ + mutationFn: (body) => + fetcher(`/reports/${reportId}/retry`, { method: 'POST', body: JSON.stringify(body ?? {}) }), + onSuccess: () => qc.invalidateQueries({ queryKey: ['reports', reportId] }), + }); +} diff --git a/v2/operator-app/src/routes/reports/detail.tsx b/v2/operator-app/src/routes/reports/detail.tsx index 348ccaa..b6dd8e2 100644 --- a/v2/operator-app/src/routes/reports/detail.tsx +++ b/v2/operator-app/src/routes/reports/detail.tsx @@ -1,7 +1,7 @@ import { useState } from 'react'; import { useParams } from 'react-router-dom'; import { - useReport, useQaSignoff, useBuildReport, + useReport, useQaSignoff, useBuildReport, useRetryReport, type CostEvent, type Report, type ReportStatus, TERMINAL_STATUSES, type ManifestSummary, type QaState, } from '../../api/reports'; @@ -80,14 +80,9 @@ export default function ReportDetail() { - {report.error_message && ( -
-
Pipeline failed
-
{report.error_message}
-
- )} + {report.error_message && } - {manifest && manifest.missing.length > 0 && } + {manifest && manifest.missing.length > 0 && } {showSignoffPanel && qa && } @@ -215,13 +210,75 @@ function EventLog({ events }: { events: CostEvent[] }) { ); } -function ManifestPanel({ manifest, reportBriefSlug }: { manifest: ManifestSummary; reportBriefSlug: string }) { +function FailurePanel({ reportId, error }: { reportId: string; error: string }) { + const retry = useRetryReport(reportId); + const [err, setErr] = useState(null); + return ( +
+
+
+
Pipeline failed
+
{error}
+
+
+ + +
+
+

+ Retry resumes from the failed stage (idempotent via .state sentinels). + --drop-failing is set automatically so Stage 5 backfills missing assets. +

+ {err &&
{err}
} +
+ ); +} + +function ManifestPanel({ manifest, reportId, reportBriefSlug }: { manifest: ManifestSummary; reportId: string; reportBriefSlug: string }) { const s = manifest.summary; + const retry = useRetryReport(reportId); return (
-

- Manifest gate — {s.coverage_pct}% coverage ({s.all_ok}/{s.selected_count}) -

+
+

+ Manifest gate — {s.coverage_pct}% coverage ({s.all_ok}/{s.selected_count}) +

+ +
{([ ['metadata', s.metadata_ok], @@ -251,7 +308,7 @@ function ManifestPanel({ manifest, reportBriefSlug }: { manifest: ManifestSummar

- To auto-backfill failures, on the server run: + Click Retry with drop-failing above, or run on the server: docker exec social-reporting-v2-app-v2-1 npx tsx pipeline/cli.ts validate --report {reportBriefSlug} --drop-failing diff --git a/v2/pipeline/cli.ts b/v2/pipeline/cli.ts index 95271b5..317b39b 100644 --- a/v2/pipeline/cli.ts +++ b/v2/pipeline/cli.ts @@ -316,26 +316,58 @@ async function main(): Promise { case 'all': { // Drive every stage end-to-end. validate runs with --drop-failing so the // manifest gate isn't a hard stop on first try; auto-backfill kicks in. + // Each stage is idempotent: completed stages (sentinel present) are + // skipped unless --force. That makes "retry pipeline" automatically + // resume from where it failed. const { brief, briefRow } = await loadBriefAndRow(reportId); const dropFailing = flags['drop-failing'] !== false; const recipe = (typeof flags.recipe === 'string' ? flags.recipe.toUpperCase() : undefined) as RecipeId | undefined; - await withStage(runId, 1, 'seeds', () => runStage1Seeds({ reportId, brief })); - await withStage(runId, 2, 'pass1', () => runStage2Pass1Scrape({ reportId, brief })); - await withStage(runId, 3, 'select', () => { + const rid = reportId; // close over a narrowed non-null reference for the helpers + async function maybeRun( + stage: number, name: ReportStatus, fn: () => Promise, sentinelExtras?: Record, + ): Promise { + if (isStageDone(rid, stage, force)) { + console.log(`[stage ${stage}] cached (use --force to re-run)`); + if (runId) await updateReportStatus(runId, name, stage).catch(() => {}); + return undefined; + } + const result = await withStage(runId, stage, name, fn); + writeStageDone(rid, stage, { command, at: new Date().toISOString(), ...(sentinelExtras ?? {}) }); + return result; + } + + await maybeRun(1, 'seeds', () => runStage1Seeds({ reportId, brief })); + await maybeRun(2, 'pass1', () => runStage2Pass1Scrape({ reportId, brief })); + await maybeRun(3, 'select', () => { const a: Parameters[0] = { reportId, brief }; if (recipe) a.forceRecipe = recipe; return runStage3Select(a); }); - await withStage(runId, 4, 'pass2', () => runStage4Pass2Enrich({ reportId, brief })); - await withStage(runId, 5, 'validate', () => runStage5Manifest({ reportId, brief, dropFailing })); - await withStage(runId, 6, 'analyse', () => runStage6Analyse(reportId)); - await withStage(runId, 7, 'insights', () => runStage7AtomicInsights(reportId)); - await withStage(runId, 8, 'trends', () => runStage8Trends(reportId, brief)); + await maybeRun(4, 'pass2', () => runStage4Pass2Enrich({ reportId, brief })); + // Validate is special: re-runs every time even if a sentinel exists, + // because --drop-failing changes selected_video_ids and the gate must + // re-evaluate on the (possibly updated) selection. + const validateResult = await withStage(runId, 5, 'validate', () => runStage5Manifest({ reportId, brief, dropFailing })); + writeStageDone(reportId, 5, { command, at: new Date().toISOString(), passed: validateResult.passed, coverage_pct: validateResult.manifest.summary.coverage_pct }); + if (!validateResult.passed) { + // Fail loudly — the manifest panel + retry button in the UI will surface this. + const missingCount = validateResult.manifest.videos.filter((v) => !v.all_ok).length; + throw new Error(`Manifest gate failed at ${validateResult.manifest.summary.coverage_pct}% coverage; ${missingCount} videos missing assets. Retry to drop-failing-backfill, or relax MANIFEST_FRAMES_OPTIONAL=true if frames are out of scope.`); + } + await maybeRun(6, 'analyse', () => runStage6Analyse(reportId)); + await maybeRun(7, 'insights', () => runStage7AtomicInsights(reportId)); + await maybeRun(8, 'trends', () => runStage8Trends(reportId, brief)); // 8c is optional — if it fails the rest of the pipeline still ships. - try { await runStage8cLenses(reportId); } - catch (e) { console.warn(`[stage 8c] lenses failed (continuing): ${(e as Error).message}`); } - await withStage(runId, 9, 'qa', () => runStage9Qa(reportId)); + if (!isStageDone(reportId, 80, force)) { + try { + await runStage8cLenses(reportId); + writeStageDone(reportId, 80, { command, at: new Date().toISOString() }); + } catch (e) { console.warn(`[stage 8c] lenses failed (continuing): ${(e as Error).message}`); } + } else { + console.log('[stage 8c] cached'); + } + await maybeRun(9, 'qa', () => runStage9Qa(reportId)); // V3 §9: Stage 10 needs CM + Strategist sign-offs from two different // humans. The operator-app's Build button (POST /api/reports/:id/build) diff --git a/v2/server/routes/reports.ts b/v2/server/routes/reports.ts index df91d82..9295c4b 100644 --- a/v2/server/routes/reports.ts +++ b/v2/server/routes/reports.ts @@ -141,6 +141,55 @@ export async function handleRunPipeline(req: IncomingMessage, res: ServerRespons sendJSON(res, 201, { report: publicReport(report) }); } +// ── Retry: re-run `all` against an existing report row ───────────────── +// +// The pipeline is idempotent via .state/stage{N}.done sentinels — completed +// stages are skipped, so retry resumes from the failed step. Pass --force +// (`?force=1`) to rerun every stage from scratch. +import { sql } from '../db/client.js'; + +export async function handleRetryReport(req: IncomingMessage, res: ServerResponse, id: string): Promise { + const session = requireAuth(req, res); + if (!session) return; + const report = await getReport(id); + if (!report) { sendJSON(res, 404, { error: 'Report not found' }); return; } + if (!await requireTeamRole(res, session, report.team_id, 'editor')) return; + if (runningReportId) { + sendJSON(res, 409, { error: `Another pipeline is already running (report ${runningReportId}).` }); + return; + } + + let body: { force?: boolean } = {}; + try { body = await parseJSONBody<{ force?: boolean }>(req); } catch { /* tolerate empty body */ } + + // Reset the row so the UI re-polls and the cost ticker starts fresh on this retry. + await sql` + UPDATE reports SET status = 'pending', error_message = NULL, finished_at = NULL + WHERE id = ${report.id} + `; + runningReportId = report.id; + + const cwd = resolve(process.cwd()); + const tsxBin = existsSync(resolve(cwd, 'node_modules/.bin/tsx')) + ? resolve(cwd, 'node_modules/.bin/tsx') + : 'npx'; + const baseArgs = ['pipeline/cli.ts', 'all', '--report', report.brief_id, '--run-id', report.id, '--drop-failing']; + if (body.force) baseArgs.push('--force'); + const args = tsxBin === 'npx' ? ['tsx', ...baseArgs] : baseArgs; + const child = spawn(tsxBin, args, { cwd, detached: true, stdio: ['ignore', 'pipe', 'pipe'], env: { ...process.env } }); + child.stdout?.on('data', (c) => process.stdout.write(`[retry ${report.id.slice(0, 8)}] ${c}`)); + child.stderr?.on('data', (c) => process.stderr.write(`[retry ${report.id.slice(0, 8)}] ${c}`)); + child.on('exit', async (code) => { + if (code !== 0) { + await finishReport(report.id, 'failed', `Pipeline exited with code ${code}`).catch(() => {}); + } + if (runningReportId === report.id) runningReportId = null; + }); + child.unref(); + + sendJSON(res, 202, { ok: true, message: 'Retry started.' }); +} + // ── Dashboard static serve ─────────────────────────────────────────────── const STATIC_MIME: Record = { '.html': 'text/html; charset=utf-8',