4-stage agent pipeline (Data Mapper, Input Collector, Strategy Writer, Report Builder) with React wizard, Postgres persistence, HITL + YOLO modes, Apify embed hydration, clone-for-next-month, and slide-deck HTML output. Proven end-to-end against real Cif Meltwater data (Instagram + TikTok) with Anthropic Opus 4.7 (strategy) and Sonnet 4.6 (report builder).
167 lines
5.5 KiB
Python
167 lines
5.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Standalone end-to-end harness: run the full pipeline against the two real
|
|
Cif Excels and write the final HTML slide deck to /app/rendered/cif-e2e-<report_id>.html.
|
|
|
|
Run from inside the api container:
|
|
docker compose run --rm api python scripts/run_e2e_cif.py
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import sys
|
|
from datetime import date
|
|
from pathlib import Path
|
|
from uuid import uuid4
|
|
|
|
|
|
async def _main() -> int:
    """Run the four pipeline stages against the Cif fixtures and save the deck.

    Stages 1 (Data Mapper) and 2 (Input Collector) always run. Stages 3
    (Strategy Writer) and 4 (Report Builder) are skipped when
    ``ANTHROPIC_API_KEY`` is not set in the environment.

    Returns:
        Process exit code:

        * ``0`` - the deck was written, or stages 3 & 4 were skipped because
          no API key is configured.
        * ``2`` - one of the two fixture workbooks is missing.
        * ``3`` - stage 4 finished but yielded no HTML (nothing is written).
    """
    # Project imports are kept function-local, as in the original script.
    from app.agents.base import StageInput
    from app.agents.data_mapper import DataMapperStage
    from app.agents.input_collector import InputCollectorStage
    from app.agents.strategy_writer import StrategyWriterStage
    from app.agents.report_builder import ReportBuilderStage

    ig = Path("/app/Build-Information/Sample Data Input/Cif MW YTD Instagram 2026.xlsx")
    tk = Path("/app/Build-Information/Sample Data Input/Cif MW YTD TikTok 2026.xlsx")
    if not ig.exists() or not tk.exists():
        print(f"missing fixtures: {ig} / {tk}", file=sys.stderr)
        return 2

    # Throwaway brand and report identity; a fresh UUID is generated per run.
    brand = {
        "id": str(uuid4()),
        "name": "Cif",
        "slug": "cif",
        "primary_colour": "#00A19A",
        "secondary_colour": "#111827",
        "accent_colour": "#67E8F9",
        "logo_data_uri": None,
    }
    report_id = uuid4()

    print("Stage 1: Data Mapper…")
    # The same benchmark dict is passed to stages 1, 3 and 4.
    benchmarks = {
        "instagram": {"posts": 10, "views_growth": 40, "er": 2.0, "vtr": 10.0},
        "tiktok": {"posts": 10, "views_growth": 40, "er": 2.0, "vtr": 10.0},
    }
    s1 = await DataMapperStage().run(
        StageInput(
            report_id=report_id,
            stage_no=1,
            attempt=1,
            upstream_artefacts={},
            user_inputs={
                "selected_platforms": ["instagram", "tiktok"],
                "reporting_month": date(2026, 4, 1),
                "prev_month": date(2026, 3, 1),
                "benchmarks": benchmarks,
                "uploads": [
                    {
                        "kind": "meltwater_export",
                        "platform": "Instagram",
                        "path": str(ig),
                        "original_filename": ig.name,
                    },
                    {
                        "kind": "meltwater_export",
                        "platform": "TikTok",
                        "path": str(tk),
                        "original_filename": tk.name,
                    },
                ],
            },
            brand=brand,
            upfront_context_md="",
        )
    )
    print(f" ✓ {len(s1.inline_json.get('top_posts', {}))} top-post groups")

    print("Stage 2: Input Collector…")
    # Placeholder embeds and insights for three posts per platform, standing in
    # for the values a user would normally supply at this stage.
    tiktok_embed = (
        "<blockquote class='tiktok-embed'></blockquote>"
        "<script async src='https://www.tiktok.com/embed.js'></script>"
    )
    instagram_embed = (
        "<blockquote class='instagram-media'></blockquote>"
        "<script async src='//www.instagram.com/embed.js'></script>"
    )
    embeds = {}
    insights = {}
    for key_prefix in ("Cif_Instagram", "Cif_TikTok"):
        for i in range(1, 4):
            key = f"{key_prefix}_Post{i}"
            embeds[key] = tiktok_embed if "TikTok" in key else instagram_embed
            insights[key] = "Strong hook, clear benefit, trending audio."
    s2 = await InputCollectorStage().run(
        StageInput(
            report_id=report_id,
            stage_no=2,
            attempt=1,
            upstream_artefacts={"stage_1_output": s1.inline_json},
            user_inputs={
                "logo_data_uri": None,
                "embeds": embeds,
                "insights": insights,
            },
            brand=brand,
            upfront_context_md="",
        )
    )
    print(" ✓")

    # Stages 3 and 4 are gated on the API key; without it the run still
    # counts as a success.
    if not os.environ.get("ANTHROPIC_API_KEY"):
        print("ANTHROPIC_API_KEY not set — skipping stages 3 & 4.")
        return 0

    print("Stage 3: Strategy Writer…")
    s3 = await StrategyWriterStage().run(
        StageInput(
            report_id=report_id,
            stage_no=3,
            attempt=1,
            upstream_artefacts={
                "stage_1_output": s1.inline_json,
                "stage_2_output": s2.inline_json,
            },
            user_inputs={"benchmarks": benchmarks},
            brand=brand,
            upfront_context_md="Cif — affordable, trusted, satisfying clean.",
        )
    )
    print(f" ✓ {len((s3.inline_text or '').split())} words")

    print("Stage 4: Report Builder…")
    s4 = await ReportBuilderStage().run(
        StageInput(
            report_id=report_id,
            stage_no=4,
            attempt=1,
            upstream_artefacts={
                "stage_1_output": s1.inline_json,
                "stage_2_output": s2.inline_json,
                # Fall back to wrapping the text when stage 3 has no JSON payload.
                "stage_3_output": s3.inline_json or {"text": s3.inline_text},
                "stage_3_markdown": s3.inline_text or "",
            },
            user_inputs={"benchmarks": benchmarks},
            brand=brand,
            upfront_context_md="",
        )
    )

    # Prefer the inline HTML; otherwise fetch it from object storage.
    html = s4.inline_text or ""
    if not html and s4.storage_key:
        from app.config import get_settings
        from app.services.storage import get_storage

        settings = get_settings()
        html = (
            get_storage()
            .get_bytes(settings.minio_bucket_reports, s4.storage_key)
            .decode("utf-8", errors="ignore")
        )

    # An empty deck is a failed run: do not write a zero-byte file or report
    # success for it.
    if not html:
        print("stage 4 produced no HTML — nothing written", file=sys.stderr)
        return 3

    out_path = Path("/app/rendered") / f"cif-e2e-{report_id}.html"
    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text(html, encoding="utf-8")
    print(f" ✓ wrote {out_path} ({len(html):,} chars)")
    return 0
|
|
|
|
|
|
# Script entry point: drive the async harness to completion and hand its
# integer result to the interpreter as the process exit status.
if __name__ == "__main__":
    exit_code = asyncio.run(_main())
    sys.exit(exit_code)
|