social-reporting-tool/v2/pipeline/cli.ts
DJP 98bcae6f31 Build before QA: dashboard ready when sign-off panel appears
Two complaints, both right:
1. QA sign-off was asking humans to OK trends + paid/organic without
   anything visual to review — the dashboard wasn't built yet.
2. The QA panel just said "yes/no" and gave reviewers no specific
   guidance on what they should be looking at.

Reordered the pipeline: after Stage 9 QA gates pass, immediately run
MoM compare (if prior_report_id) + Stage 10 Build, THEN park at
status=qa with the dashboard already rendered. Stage 10 has no Claude
cost (just file-copy + Vite build) so running it before sign-off costs
nothing meaningful, and the rationale "save spend if QA fails" never
actually applied — paid review work is already done by Stage 8.

QA panel rewrite:
- Big "Open dashboard ↗" button at the top
- Inline "What you're signing off" checklist split CM vs Strategist
  (paid/organic + comments themes + sentiment risk on one side; trend
  names + relevance + lens artefacts on the other)
- Removed the now-redundant Build button
- Second sign-off auto-completes the report; no extra click needed
- Status text walks them through the next action ("Open the dashboard,
  walk through the views you're responsible for, then sign.")

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-04-30 11:44:42 -04:00

428 lines
22 KiB
TypeScript

#!/usr/bin/env tsx
// V2 pipeline CLI. Mirrors brief §11 verbatim:
// pnpm pipe brief|seed|scrape1|select|scrape2|validate|analyse|insights|trends|qa|build|deploy --report <id>
// Each subcommand loads inputs from DB + disk, runs its stage, writes outputs to
// briefs/{report_id}/. Idempotent + resumable via .state/stage{N}.done sentinels.
import { writeFileSync, existsSync } from 'node:fs';
import { sql } from '../server/db/client.js';
import { getBriefById } from '../server/db/briefs.js';
import { BRIEF_INPUT } from '../server/schemas/brief.js';
import { runStage1Seeds } from './stages/stage_1_seeds.js';
import { runStage2Pass1Scrape } from './stages/stage_2_pass1_scrape.js';
import { runStage3Select } from './stages/stage_3_select.js';
import { runStage4Pass2Enrich } from './stages/stage_4_pass2_enrich.js';
import { backfillCoversFromRawDumps } from './lib/backfill_covers.js';
import { runStage5Manifest } from './stages/stage_5_manifest.js';
import { runStage6Analyse } from './stages/stage_6_analyse.js';
import { runStage7AtomicInsights } from './stages/stage_7_atomic_insights.js';
import { runStage8Trends } from './stages/stage_8_trends.js';
import { runStage8cLenses } from './stages/stage_8c_lenses.js';
import { runStage9Qa } from './stages/stage_9_qa.js';
import { runStage10Build } from './stages/stage_10_build.js';
import { runMomCompare } from './lib/mom_compare.js';
import { PATHS, ensureDir } from './lib/paths.js';
import { onClaudeUsage } from './lib/claude.js';
import { onApifyCost } from './lib/apify_client.js';
import { onDriftEvent } from './lib/drift_log.js';
import { configureLiveActivity, setStage as setLiveStage, writeActivity, clearActivity } from './lib/live_activity.js';
import {
finishReport, logCostEvent, updateReportStatus, type ReportStatus,
} from '../server/db/reports.js';
import type { RecipeId } from './lib/recipes.js';
// Parsed command line for one CLI invocation, as produced by parseArgs().
interface Args {
  // First positional token after the script path; '' when none was given.
  command: string;

  // Value of --report when it was followed by a value, otherwise null.
  reportId: string | null;

  // Value of --run-id when it was followed by a value, otherwise null.
  runId: string | null;

  // Every --flag seen: the following token when one was consumed as its value,
  // or `true` for a bare flag.
  flags: Record<string, string | boolean>;
}
// Stage context used for cost attribution. withStage() overwrites this right
// before a stage runs; the usage callbacks registered in logCost() read it when
// they write a cost event to the DB. It stays at stage 0 / 'pending' until the
// first withStage() call, so costs incurred outside withStage() are attributed
// to whatever value was set last.
let currentStage: { num: number; name: ReportStatus } = { num: 0, name: 'pending' };
// Splits a process.argv-style array into the subcommand plus its --flags.
//
// argv[0] and argv[1] (runtime + script path) are ignored, argv[2] is the
// command ('' when absent), and everything after it is scanned for `--name`
// tokens. A flag takes the next token as its string value unless that token is
// missing, empty, or itself starts with `--`, in which case the flag is stored
// as `true`. Tokens that are neither a flag nor a consumed value are skipped.
// reportId / runId are lifted from --report / --run-id only when they carry a
// string value.
function parseArgs(argv: string[]): Args {
  const command = argv[2] ?? '';
  const tokens = argv.slice(3);
  const flags: Record<string, string | boolean> = {};

  let cursor = 0;
  while (cursor < tokens.length) {
    const token = tokens[cursor]!;
    cursor += 1;
    if (!token.startsWith('--')) continue;

    const name = token.slice(2);
    const candidate = tokens[cursor];
    const hasValue = Boolean(candidate) && !candidate!.startsWith('--');
    if (hasValue) {
      flags[name] = candidate!;
      cursor += 1; // the value token has been consumed
    } else {
      flags[name] = true;
    }
  }

  const stringFlag = (name: string): string | null => {
    const value = flags[name];
    return typeof value === 'string' ? value : null;
  };

  return { command, reportId: stringFlag('report'), runId: stringFlag('run-id'), flags };
}
// Prints the command and flag reference to stderr, then terminates the process
// with exit code 1. Never returns, which lets callers rely on it for narrowing.
// The command list includes `lenses`, which main() dispatches on but which was
// previously missing from this help text.
function usage(): never {
  console.error(`Usage: npm run pipe <command> --report <brief-id> [flags]
Commands:
brief Dump the brief from DB to brief.json on disk
backfill-covers Re-derive cover URLs from pass1/raw dumps when normaliseRaw missed them
seed Stage 1 — expand the brief into hashtag tiers + search terms
scrape1 Stage 2 — broad Apify pull with engagement floor
select Stage 3 — recipe-led selection (--recipe A|B|C|D, --custom "...")
scrape2 Stage 4 — id-keyed deep enrichment per video
validate Stage 5 — manifest gate (--drop-failing for auto-backfill)
analyse Stage 6 — per-video Claude analysis
insights Stage 7 — atomic insight extraction
trends Stage 8 — trend synthesis (§4.5 rubrics)
lenses Stage 8c — lens views from atomic insights (hooks, visual, audio, sentiment)
qa Stage 9 — paid/organic + coverage gates
build Stage 10 — dataset_v2.json + claude.ai HTML bundle (--target dashboard|html|compare)
all Drive every stage end-to-end (used by the operator-app run trigger)
Flags:
--report <brief-id> UUID of the brief in the briefs table
--run-id <uuid> Reports row to track this run against (writes cost_events + status)
--force Invalidate stage sentinels and rerun
--drop-failing Stage 5: auto-backfill failures from next-best recipe candidates
`);
  process.exit(1);
}
// Fetches the brief row for `reportId` and validates its `brief_yaml` payload
// through BRIEF_INPUT. Resolves with both the raw row and the parsed brief.
// Rejects with "Brief not found: <id>" when no row comes back; whatever
// BRIEF_INPUT.parse throws on invalid input propagates unchanged.
async function loadBrief(reportId: string): Promise<{ briefRow: NonNullable<Awaited<ReturnType<typeof getBriefById>>>; brief: ReturnType<typeof BRIEF_INPUT.parse> }> {
  const row = await getBriefById(reportId);
  if (row) {
    const brief = BRIEF_INPUT.parse(row.brief_yaml);
    return { briefRow: row, brief };
  }
  throw new Error(`Brief not found: ${reportId}`);
}
// Alias of loadBrief(), used at call sites that need the DB row as well as the
// parsed brief. Same arguments, same result, same rejections.
async function loadBriefAndRow(reportId: string): ReturnType<typeof loadBrief> {
  const loaded = await loadBrief(reportId);
  return loaded;
}
// Registers the process-wide listeners for Claude usage, Apify cost and drift
// events. Despite the name, nothing is logged at call time; the work happens
// inside the callbacks.
//
// Each Claude / Apify event:
//   1. adds to that source's running total,
//   2. prints a console line with the event cost and the source's running total,
//   3. refreshes the live-activity heartbeat with the combined running cost,
//   4. when `runId` is set, writes a cost event attributed to `currentStage`
//      (fire-and-forget; a failed DB write is only logged).
// Drift events are only printed as console warnings.
function logCost(runId: string | null): void {
  const spent = { claude: 0, apify: 0 };
  const combinedCost = (): number => spent.apify + spent.claude;
  const reportDbFailure = (err: unknown): void =>
    console.error('[cost-event] db write failed:', (err as Error).message);

  onClaudeUsage((call, label) => {
    spent.claude += call.cost_usd;
    console.log(`[claude] ${label}: ${call.input_tokens} in / ${call.output_tokens} out / $${call.cost_usd.toFixed(4)} (running: $${spent.claude.toFixed(4)})`);
    // Long Claude stages (per-video analysis, atomic insights, trend synthesis)
    // would otherwise let the run-page banner go stale after 90s and look
    // wedged; every completed call doubles as a heartbeat.
    writeActivity(`claude · ${label}`, 'completed', { runningCostUsd: combinedCost() });
    if (!runId) return;
    const { num, name } = currentStage;
    logCostEvent({
      report_id: runId,
      stage: num,
      stage_name: name,
      source: 'claude',
      label,
      model: call.model,
      input_tokens: call.input_tokens,
      output_tokens: call.output_tokens,
      cost_usd: call.cost_usd,
    }).catch(reportDbFailure);
  });

  onApifyCost((charge) => {
    spent.apify += charge.cost_usd;
    console.log(`[apify] ${charge.label}: $${charge.cost_usd.toFixed(4)} (running: $${spent.apify.toFixed(4)})`);
    writeActivity(`apify · ${charge.label}`, 'completed', { runningCostUsd: combinedCost() });
    if (!runId) return;
    const { num, name } = currentStage;
    logCostEvent({
      report_id: runId,
      stage: num,
      stage_name: name,
      source: 'apify',
      label: charge.label,
      model: charge.actor_id,
      cost_usd: charge.cost_usd,
      metadata: { run_id: charge.run_id, dataset_id: charge.dataset_id },
    }).catch(reportDbFailure);
  });

  onDriftEvent((drift) => {
    console.warn(`[drift] ${drift.actor} ${drift.reason}: id=${drift.extracted_id ?? '?'}`);
  });
}
// Runs `fn` as pipeline stage `stage` / `name`.
//
// Before invoking `fn` it publishes the stage everywhere it is observed: the
// module-level `currentStage` (read by the cost listeners), the live-activity
// stage, a 'starting' activity entry, and, when `runId` is set, the report's
// status row. A failed status write is logged and does not stop the stage.
// Resolves with whatever `fn` resolves with; rejections from `fn` propagate.
async function withStage<T>(
  runId: string | null,
  stage: number,
  name: ReportStatus,
  fn: () => Promise<T>,
): Promise<T> {
  const reportStatusFailure = (e: unknown): void =>
    console.error('[status] db write failed:', (e as Error).message);

  currentStage = { num: stage, name };
  setLiveStage(stage, name);
  writeActivity(name, 'starting');

  if (runId) {
    await updateReportStatus(runId, name, stage).catch(reportStatusFailure);
  }

  const outcome = await fn();
  return outcome;
}
// Writes the "stage done" sentinel for stage `n` of `reportId`: ensures the
// report's state directory exists, then stores `payload` as 2-space-indented
// JSON at the stage's sentinel path (overwriting any previous sentinel).
function writeStageDone(reportId: string, n: number, payload: Record<string, unknown>): void {
  const stateDir = PATHS.stateDir(reportId);
  ensureDir(stateDir);

  const sentinelPath = PATHS.stageDone(reportId, n);
  writeFileSync(sentinelPath, JSON.stringify(payload, null, 2));
}
// True when stage `n` of `reportId` already has a sentinel file on disk.
// With `force` set the sentinel is ignored (and not even looked up), so the
// stage is always reported as not done.
function isStageDone(reportId: string, n: number, force = false): boolean {
  return !force && existsSync(PATHS.stageDone(reportId, n));
}
// CLI entry point. Parses process.argv, wires up live-activity and cost
// listeners, dispatches on the subcommand, and finally closes the DB client.
//
// Exit paths other than a normal return:
//   - usage() (exit 1) when the command or --report is missing, or the command
//     is unknown;
//   - exit 3 when `validate` fails the manifest gate;
//   - exit 4 when `build --target compare` is requested without a prior report;
//   - a thrown Error (handled by the caller) when `all` fails the manifest gate
//     or any stage rejects.
//
// Single-stage commands for stages 1-4 and 6-9 are skipped when their sentinel
// exists unless --force is passed; `validate`, `lenses`, `brief`,
// `backfill-covers` and `build` always run.
async function main(): Promise<void> {
  const { command, reportId, runId, flags } = parseArgs(process.argv);
  if (!command) usage();
  if (!reportId) { console.error('Missing --report <brief-id>'); usage(); }
  const force = !!flags.force;

  // Report-row writes made directly from here are best-effort, but a failure is
  // logged (same wording withStage() uses) instead of being silently dropped.
  const logStatusWriteFailure = (e: unknown): void =>
    console.error('[status] db write failed:', (e as Error).message);

  configureLiveActivity(reportId);
  logCost(runId);

  switch (command) {
    case 'seed': {
      if (isStageDone(reportId, 1, force)) {
        console.log(`[stage 1] already done; pass --force to rerun. Output: ${PATHS.seedsJson(reportId)}`);
        break;
      }
      const { brief } = await loadBrief(reportId);
      const result = await runStage1Seeds({ reportId, brief });
      writeStageDone(reportId, 1, { command, at: new Date().toISOString(), outputs: result.outputs });
      console.log(`[stage 1] OK — seeds → ${result.outputs.seeds}`);
      break;
    }
    case 'brief': {
      // Dump the validated brief to disk; no sentinel is involved.
      const { brief } = await loadBrief(reportId);
      const path = PATHS.briefJson(reportId);
      writeFileSync(path, JSON.stringify(brief, null, 2));
      console.log(`[brief] dumped → ${path}`);
      break;
    }
    case 'backfill-covers': {
      const result = backfillCoversFromRawDumps(reportId);
      console.log(`[backfill] patched ${result.patched} of ${result.total} pass1 records with cover/mp4 URLs`);
      break;
    }
    case 'scrape1': {
      if (isStageDone(reportId, 2, force)) {
        console.log(`[stage 2] already done; pass --force to rerun. Output: ${PATHS.pass1Videos(reportId)}`);
        break;
      }
      const { brief } = await loadBrief(reportId);
      const result = await runStage2Pass1Scrape({ reportId, brief });
      writeStageDone(reportId, 2, { command, at: new Date().toISOString(), outputs: result.outputs, total_videos: result.total_videos, total_cost_usd: result.total_cost_usd });
      console.log(`[stage 2] OK — ${result.total_videos} videos, $${result.total_cost_usd.toFixed(2)}`);
      break;
    }
    case 'select': {
      if (isStageDone(reportId, 3, force)) {
        console.log(`[stage 3] already done; pass --force to rerun. Output: ${PATHS.selectedIds(reportId)}`);
        break;
      }
      const { brief } = await loadBrief(reportId);
      // --recipe is upper-cased and cast, not validated, before being handed to stage 3.
      const recipe = (typeof flags.recipe === 'string' ? flags.recipe.toUpperCase() : undefined) as RecipeId | undefined;
      const customFilter = typeof flags.custom === 'string' ? flags.custom : undefined;
      const argsObj: Parameters<typeof runStage3Select>[0] = { reportId, brief };
      // Optional keys are only set when provided, so they are never present as `undefined`.
      if (recipe) argsObj.forceRecipe = recipe;
      if (customFilter) argsObj.customFilter = customFilter;
      const result = await runStage3Select(argsObj);
      writeStageDone(reportId, 3, { command, at: new Date().toISOString(), outputs: result.outputs, rules: result.rules });
      console.log(`[stage 3] OK — ${result.selected.length} selected, recipe=${result.rules.recipe_id}`);
      break;
    }
    case 'scrape2': {
      if (isStageDone(reportId, 4, force)) {
        console.log(`[stage 4] already done; pass --force to rerun. Output: ${PATHS.enriched(reportId)}`);
        break;
      }
      const { brief } = await loadBrief(reportId);
      const result = await runStage4Pass2Enrich({ reportId, brief });
      writeStageDone(reportId, 4, { command, at: new Date().toISOString(), outputs: result.outputs, total_attempted: result.total_attempted, total_bundled: result.total_bundled, total_dropped: result.total_dropped, drift_events: result.drift_events });
      console.log(`[stage 4] OK — bundled=${result.total_bundled} dropped=${result.total_dropped} drift=${result.drift_events}`);
      break;
    }
    case 'validate': {
      // No sentinel check: the gate is re-evaluated on every invocation, and the
      // sentinel is written whether or not the gate passed (it records `passed`).
      const dropFailing = !!flags['drop-failing'];
      const { brief } = await loadBrief(reportId);
      const result = await runStage5Manifest({ reportId, brief, dropFailing });
      writeStageDone(reportId, 5, { command, at: new Date().toISOString(), passed: result.passed, coverage_pct: result.manifest.summary.coverage_pct, backfill_rounds: result.backfill_rounds });
      if (!result.passed) {
        console.error(`[stage 5] FAIL — coverage ${result.manifest.summary.coverage_pct}% (${result.manifest.summary.all_ok}/${result.manifest.selected_count}). Missing per video printed in manifest.json.`);
        process.exit(3);
      }
      console.log(`[stage 5] PASS — coverage 100%`);
      break;
    }
    case 'analyse': {
      if (isStageDone(reportId, 6, force)) {
        console.log(`[stage 6] already done; pass --force to rerun.`);
        break;
      }
      const result = await runStage6Analyse(reportId);
      writeStageDone(reportId, 6, { command, at: new Date().toISOString(), total: result.total, cached: result.cached, fresh: result.fresh });
      console.log(`[stage 6] OK — ${result.total} analyses (${result.cached} cached, ${result.fresh} fresh)`);
      break;
    }
    case 'insights': {
      if (isStageDone(reportId, 7, force)) {
        console.log(`[stage 7] already done; pass --force to rerun.`);
        break;
      }
      const result = await runStage7AtomicInsights(reportId);
      writeStageDone(reportId, 7, { command, at: new Date().toISOString(), total_insights: result.total_insights, by_type: result.by_type });
      console.log(`[stage 7] OK — ${result.total_insights} atomic insights (hook=${result.by_type.hook} visual=${result.by_type.visual} audio=${result.by_type.audio} narrative=${result.by_type.narrative})`);
      break;
    }
    case 'trends': {
      if (isStageDone(reportId, 8, force)) {
        console.log(`[stage 8] already done; pass --force to rerun.`);
        break;
      }
      const { brief } = await loadBrief(reportId);
      const result = await runStage8Trends(reportId, brief);
      writeStageDone(reportId, 8, { command, at: new Date().toISOString(), total_trends: result.total_trends, core_trends: result.core_trends, peripheral_trends: result.peripheral_trends, dropped_trends: result.dropped_trends });
      console.log(`[stage 8] OK — ${result.total_trends} trends across ${result.categories.length} categories`);
      break;
    }
    case 'lenses': {
      // Stage 8c always runs here and writes no sentinel (only `all` tracks it, as stage 80).
      const result = await runStage8cLenses(reportId);
      console.log(`[stage 8c] OK — hooks=${result.hooks} visual=${result.visual} audio=${result.audio} sentiment=${result.sentiment}`);
      break;
    }
    case 'qa': {
      if (isStageDone(reportId, 9, force)) {
        console.log(`[stage 9] already done; pass --force to rerun.`);
        break;
      }
      const result = await runStage9Qa(reportId);
      writeStageDone(reportId, 9, { command, at: new Date().toISOString(), paid_creators: result.paid_creators, mixed_creators: result.mixed_creators, coverage_pct: result.coverage_pct });
      console.log(`[stage 9] OK — paid=${result.paid_creators} mixed=${result.mixed_creators} coverage=${result.coverage_pct}%`);
      break;
    }
    case 'build': {
      // --target selects what to produce; anything other than a string value means 'all'.
      const target = typeof flags.target === 'string' ? flags.target : 'all';
      const { brief, briefRow } = await loadBriefAndRow(reportId);
      // 8c is the gate from atomic insights → lens views; build needs them present
      // to populate dataset_v2.json.lenses. If 8c was skipped during `all`, run it now.
      try { await runStage8cLenses(reportId); }
      catch (e) { console.warn(`[stage 8c] skipping or failed: ${(e as Error).message}`); }
      if (target === 'all' || target === 'compare') {
        if (briefRow.prior_report_id) {
          console.log(`[mom] running compare against prior_report_id=${briefRow.prior_report_id}`);
          // Unlike in `all`, a compare failure here is not caught and aborts the command.
          await runMomCompare(reportId, briefRow.prior_report_id);
        } else if (target === 'compare') {
          console.error('[mom] target=compare but brief has no prior_report_id; refusing per §16');
          process.exit(4);
        }
      }
      if (target === 'all' || target === 'dashboard' || target === 'html') {
        const result = await withStage(runId, 10, 'build', () => runStage10Build(reportId, brief));
        writeStageDone(reportId, 10, { command, at: new Date().toISOString(), trend_count: result.trend_count, html_size_bytes: result.html_size_bytes });
        console.log(`[stage 10] OK — dataset=${(result.dataset_size_bytes / 1024).toFixed(1)} KB, html=${(result.html_size_bytes / 1024).toFixed(1)} KB, trends=${result.trend_count}`);
        // A standalone build marks the tracked run completed directly.
        if (runId) await finishReport(runId, 'completed').catch(logStatusWriteFailure);
      }
      break;
    }
    case 'all': {
      // Drive every stage end-to-end. validate runs with --drop-failing so the
      // manifest gate isn't a hard stop on first try; auto-backfill kicks in.
      // Each stage is idempotent: completed stages (sentinel present) are
      // skipped unless --force. That makes "retry pipeline" automatically
      // resume from where it failed.
      const { brief, briefRow } = await loadBriefAndRow(reportId);
      // parseArgs() only ever stores a string or `true` for a flag, never `false`,
      // so this evaluates to true on every run: `all` always validates with
      // drop-failing enabled.
      const dropFailing = flags['drop-failing'] !== false;
      const recipe = (typeof flags.recipe === 'string' ? flags.recipe.toUpperCase() : undefined) as RecipeId | undefined;
      const rid = reportId; // close over a narrowed non-null reference for the helpers

      // Runs `fn` as a tracked stage unless its sentinel exists. A cached stage
      // still pushes its status to the report row and yields undefined; a fresh
      // run writes the sentinel after `fn` resolves and yields its result.
      async function maybeRun<T>(
        stage: number, name: ReportStatus, fn: () => Promise<T>, sentinelExtras?: Record<string, unknown>,
      ): Promise<T | undefined> {
        if (isStageDone(rid, stage, force)) {
          console.log(`[stage ${stage}] cached (use --force to re-run)`);
          if (runId) await updateReportStatus(runId, name, stage).catch(logStatusWriteFailure);
          return undefined;
        }
        const result = await withStage(runId, stage, name, fn);
        writeStageDone(rid, stage, { command, at: new Date().toISOString(), ...(sentinelExtras ?? {}) });
        return result;
      }

      await maybeRun(1, 'seeds', () => runStage1Seeds({ reportId, brief }));
      await maybeRun(2, 'pass1', () => runStage2Pass1Scrape({ reportId, brief }));
      await maybeRun(3, 'select', () => {
        const a: Parameters<typeof runStage3Select>[0] = { reportId, brief };
        if (recipe) a.forceRecipe = recipe;
        return runStage3Select(a);
      });
      await maybeRun(4, 'pass2', () => runStage4Pass2Enrich({ reportId, brief }));

      // Validate is special: re-runs every time even if a sentinel exists,
      // because --drop-failing changes selected_video_ids and the gate must
      // re-evaluate on the (possibly updated) selection.
      const validateResult = await withStage(runId, 5, 'validate', () => runStage5Manifest({ reportId, brief, dropFailing }));
      writeStageDone(reportId, 5, { command, at: new Date().toISOString(), passed: validateResult.passed, coverage_pct: validateResult.manifest.summary.coverage_pct });
      if (!validateResult.passed) {
        // Fail loudly — the manifest panel + retry button in the UI will surface this.
        const passingCount = validateResult.manifest.videos.filter((v) => v.all_ok).length;
        const missingCount = validateResult.manifest.videos.filter((v) => !v.all_ok).length;
        if (passingCount < 5) {
          throw new Error(`Stage 5: only ${passingCount} videos have complete assets — trend synthesis needs ≥5. The seed pool isn't producing enough analysable content. Lower min_likes/min_plays on the brief, widen seed hashtags, or raise budget_usd, then Force re-run.`);
        }
        throw new Error(`Manifest gate failed at ${validateResult.manifest.summary.coverage_pct}% coverage; ${missingCount} videos missing assets. Retry to drop-failing-backfill. Frames + transcripts are advisory by default — if comments dominate the missing list, raise min_plays (e.g. 5000) so low-engagement videos with no comments stop being selected.`);
      }

      await maybeRun(6, 'analyse', () => runStage6Analyse(reportId));
      await maybeRun(7, 'insights', () => runStage7AtomicInsights(reportId));
      await maybeRun(8, 'trends', () => runStage8Trends(reportId, brief));

      // 8c is optional — if it fails the rest of the pipeline still ships.
      // Its sentinel uses stage number 80; a failed run writes no sentinel, so
      // the next `all` attempts it again.
      if (!isStageDone(reportId, 80, force)) {
        try {
          await runStage8cLenses(reportId);
          writeStageDone(reportId, 80, { command, at: new Date().toISOString() });
        } catch (e) { console.warn(`[stage 8c] lenses failed (continuing): ${(e as Error).message}`); }
      } else {
        console.log('[stage 8c] cached');
      }

      await maybeRun(9, 'qa', () => runStage9Qa(reportId));

      // Build the dashboard BEFORE the human QA gate. The dashboard is the
      // artefact CM and Strategist are reviewing — asking for their sign-off
      // without showing them the rendered output is asking them to evaluate
      // a JSON file. Stage 10 has no Claude cost (it's just file copying +
      // Vite build), so running it here is essentially free; we hold at
      // status=qa AFTER the dashboard exists, and the QA panel can link
      // straight to /api/reports/:id/dashboard/.
      if (briefRow.prior_report_id) {
        console.log(`[mom] running compare against prior_report_id=${briefRow.prior_report_id}`);
        try { await runMomCompare(reportId, briefRow.prior_report_id); }
        catch (e) { console.warn(`[mom] compare failed (continuing without it): ${(e as Error).message}`); }
      }
      // Stage 10 is rebuilt on every `all` run; its sentinel is written but never consulted here.
      const buildResult = await withStage(runId, 10, 'build', () => runStage10Build(reportId, brief));
      writeStageDone(reportId, 10, { command, at: new Date().toISOString(), trend_count: buildResult.trend_count, html_size_bytes: buildResult.html_size_bytes });
      console.log(`[stage 10] OK — dashboard built; trends=${buildResult.trend_count}, html=${(buildResult.html_size_bytes / 1024).toFixed(1)} KB`);

      // Now park at status=qa with the dashboard ready for review. The two
      // sign-off endpoints (/api/reports/:id/qa/sign) auto-complete the row
      // when both signatures land — no second pipeline spawn needed.
      if (runId) await updateReportStatus(runId, 'qa', 10).catch(logStatusWriteFailure);
      // The dashboard URL is keyed by the run id; without --run-id there is no
      // tracked report, so avoid printing a bogus ".../reports/null/..." link.
      const dashboardNote = runId
        ? `dashboard ready at /api/reports/${runId}/dashboard/`
        : 'dashboard built (no --run-id given, so there is no tracked report URL)';
      console.log(`[pipe] stages 1-10 complete; ${dashboardNote}. Awaiting CM + Strategist sign-off.`);
      break;
    }
    default:
      console.error(`Unknown command: ${command}`);
      usage();
  }

  // Close the DB client so the process can exit.
  await sql.end({ timeout: 1 });
}
// Process bootstrap. On success the live-activity marker is cleared. On any
// failure the error is printed, a tracked run is marked failed, the activity
// marker is cleared, the DB client is closed, and the process exits with 1.
main().then(() => clearActivity()).catch(async (err) => {
  const msg = err instanceof Error ? err.message : String(err);
  console.error('[pipe] error:', msg);
  // If we're driving a tracked run, mark it failed so the UI doesn't poll forever.
  // The run id is resolved through parseArgs() so this handler agrees with main()
  // about what counts as a --run-id value (e.g. `--run-id --force` carries none,
  // where a raw "token after --run-id" scan would have picked up '--force').
  const { runId: runArg } = parseArgs(process.argv);
  if (runArg) {
    await finishReport(runArg, 'failed', msg).catch(() => {});
  }
  clearActivity();
  await sql.end({ timeout: 1 }).catch(() => {});
  process.exit(1);
});