From 539c5eaaeecafc410ee370e84a094087743ad0ef Mon Sep 17 00:00:00 2001 From: Vadym Samoilenko Date: Fri, 24 Apr 2026 18:49:59 +0100 Subject: [PATCH] Fix backfill script: use focus_group_messages collection + correct field names --- backend/scripts/backfill_usage.py | 175 +++++++++++++++++------------- 1 file changed, 97 insertions(+), 78 deletions(-) diff --git a/backend/scripts/backfill_usage.py b/backend/scripts/backfill_usage.py index 9598096d..e83a8b73 100644 --- a/backend/scripts/backfill_usage.py +++ b/backend/scripts/backfill_usage.py @@ -31,7 +31,6 @@ def _estimate_tokens(text: str, model: str) -> dict: if not text: return {"prompt": 0, "completion": 0} - # Try tiktoken for OpenAI models, fall back to char-based estimate if model and ("gpt" in model.lower() or "openai" in model.lower()): try: import tiktoken @@ -47,10 +46,9 @@ def _estimate_tokens(text: str, model: str) -> dict: def _estimate_cost(prompt_tokens: int, completion_tokens: int, model: str) -> float: - """Very rough cost estimate in USD (used only for backfill estimates).""" - # Approximate per-million-token prices for common models + """Rough cost estimate in USD.""" rate_per_m = { - "gemini": (0.35, 1.05), # input, output USD/1M tokens + "gemini": (0.35, 1.05), "gpt-4": (30.00, 60.00), "gpt-3": (0.50, 1.50), } @@ -86,73 +84,87 @@ def connect(): # ───────────────────────────────────────────────────────────────────────────── # Backfill focus-group messages +# Messages are in the separate `focus_group_messages` collection (NOT embedded). +# Fields: focus_group_id (str), text, type, senderId, created_at # ───────────────────────────────────────────────────────────────────────────── def backfill_messages(db, dry_run: bool) -> int: - """Walk all focus groups and create estimated usage events for messages.""" created = 0 - focus_groups = list(db.focus_groups.find({})) - print(f"\n[messages] Found {len(focus_groups)} focus groups to process") - for fg in focus_groups: - fg_id = str(fg["_id"]) - fg_model = fg.get("llm_model") or "gemini-3.1-pro-preview" - messages = fg.get("messages", []) + # Build a lookup: focus_group_id -> {llm_model, user_id} + fg_meta = {} + for fg in db.focus_groups.find({}, {"llm_model": 1, "created_by": 1}): + fg_meta[str(fg["_id"])] = { + "model": fg.get("llm_model") or "gemini-3.1-pro-preview", + "user_id": str(fg.get("created_by") or ""), + } - for msg in messages: - msg_id = str(msg.get("id") or msg.get("_id") or "") - if not msg_id: - continue + total_messages = db.focus_group_messages.count_documents({}) + print(f"\n[messages] Found {total_messages} messages across all focus groups") - # Idempotent: skip if an estimated event already exists for this message - existing = db.usage_events.find_one({ - "source_message_id": msg_id, - "is_estimated": True, - }) - if existing: - continue + for msg in db.focus_group_messages.find({}): + msg_id = str(msg["_id"]) + fg_id = str(msg.get("focus_group_id") or "") - text = msg.get("content") or "" - tokens = _estimate_tokens(text, fg_model) - # For responses we add a rough output token estimate - tokens["completion"] = max(1, int(len(text) / 5.0)) - cost = _estimate_cost(tokens["prompt"], tokens["completion"], fg_model) + # Skip non-AI messages (only persona responses and moderator questions cost money) + msg_type = msg.get("type", "") + if msg_type not in ("response", "question", "moderator", "ai", ""): + continue - ts = msg.get("timestamp") - if isinstance(ts, str): - try: - ts = datetime.fromisoformat(ts) - except Exception: - ts = None - ts = ts or fg.get("date") or datetime.now(timezone.utc) + # Idempotent check + if db.usage_events.find_one({"source_message_id": msg_id, "is_estimated": True}): + continue - event = { - "ts": ts, - "provider": "gemini" if "gemini" in fg_model.lower() else "openai", - "model": fg_model, - "feature": "autonomous_conversation", - "user_id": str(fg.get("user_id") or ""), - "focus_group_id": fg_id, - "persona_id": str(msg.get("personaId") or msg.get("persona_id") or ""), - "prompt_tokens": tokens["prompt"], - "completion_tokens": tokens["completion"], - "cached_tokens": 0, - "reasoning_tokens": 0, - "cost_usd": { - "input": round(cost * 0.4, 8), - "output": round(cost * 0.6, 8), - "total": cost, - }, - "duration_ms": 0, - "retry_count": 0, - "status": "estimated", - "is_estimated": True, - "source_message_id": msg_id, - } + meta = fg_meta.get(fg_id, {"model": "gemini-3.1-pro-preview", "user_id": ""}) + fg_model = meta["model"] + user_id = meta["user_id"] - if not dry_run: - db.usage_events.insert_one(event) - created += 1 + text = msg.get("text") or msg.get("content") or "" + tokens = _estimate_tokens(text, fg_model) + tokens["completion"] = max(1, int(len(text) / 5.0)) + cost = _estimate_cost(tokens["prompt"], tokens["completion"], fg_model) + + ts = msg.get("created_at") or msg.get("timestamp") + if isinstance(ts, str): + try: + ts = datetime.fromisoformat(ts) + except Exception: + ts = None + ts = ts or datetime.now(timezone.utc) + + feature = "moderator" if msg_type in ("question", "moderator") else "persona_response" + + event = { + "ts": ts, + "provider": "gemini" if "gemini" in fg_model.lower() else "openai", + "model": fg_model, + "feature": feature, + "user_id": user_id, + "focus_group_id": fg_id, + "persona_id": str(msg.get("senderId") or msg.get("persona_id") or ""), + "prompt_tokens": tokens["prompt"], + "completion_tokens": tokens["completion"], + "cached_tokens": 0, + "reasoning_tokens": 0, + "total_tokens": tokens["prompt"] + tokens["completion"], + "cost_usd": { + "input": round(cost * 0.4, 8), + "output": round(cost * 0.6, 8), + "cached": 0, + "reasoning": 0, + "total": cost, + }, + "duration_ms": 0, + "retry_count": 0, + "status": "success", + "is_estimated": True, + "estimate_method": "char_div_3_8", + "source_message_id": msg_id, + } + + if not dry_run: + db.usage_events.insert_one(event) + created += 1 print(f"[messages] {'Would create' if dry_run else 'Created'} {created} estimated usage events") return created @@ -160,32 +172,34 @@ def backfill_messages(db, dry_run: bool) -> int: # ───────────────────────────────────────────────────────────────────────────── # Backfill persona generation +# Personas: fields background, description, name; created_by = user_id # ───────────────────────────────────────────────────────────────────────────── def backfill_personas(db, dry_run: bool) -> int: - """Walk all personas and create an estimated usage event for narrative generation.""" created = 0 personas = list(db.personas.find({})) print(f"\n[personas] Found {len(personas)} personas to process") for persona in personas: persona_id = str(persona["_id"]) - narrative = persona.get("narrative") or "" - if not narrative: - continue # No narrative to estimate from — skip - # Idempotent check - existing = db.usage_events.find_one({ - "persona_id": persona_id, - "feature": "persona_generate", - "is_estimated": True, - }) - if existing: + # Use background + description as the generation text + text = " ".join(filter(None, [ + persona.get("background") or "", + persona.get("description") or "", + persona.get("goals") or "", + ])).strip() + + if not text: continue - model = "gemini-3.1-pro-preview" # default; personas are usually generated via default model - tokens = _estimate_tokens(narrative, model) - tokens["completion"] = max(1, int(len(narrative) / 4.0)) + # Idempotent check + if db.usage_events.find_one({"source_persona_id": persona_id, "feature": "persona_generate", "is_estimated": True}): + continue + + model = "gemini-3.1-pro-preview" + tokens = _estimate_tokens(text, model) + tokens["completion"] = max(1, int(len(text) / 4.0)) cost = _estimate_cost(tokens["prompt"], tokens["completion"], model) ts = persona.get("created_at") or persona.get("updatedAt") or datetime.now(timezone.utc) @@ -200,22 +214,27 @@ def backfill_personas(db, dry_run: bool) -> int: "provider": "gemini", "model": model, "feature": "persona_generate", - "user_id": str(persona.get("user_id") or ""), - "focus_group_id": str(persona.get("focus_group_id") or ""), + "user_id": str(persona.get("created_by") or persona.get("user_id") or ""), + "focus_group_id": "", "persona_id": persona_id, "prompt_tokens": tokens["prompt"], "completion_tokens": tokens["completion"], "cached_tokens": 0, "reasoning_tokens": 0, + "total_tokens": tokens["prompt"] + tokens["completion"], "cost_usd": { "input": round(cost * 0.4, 8), "output": round(cost * 0.6, 8), + "cached": 0, + "reasoning": 0, "total": cost, }, "duration_ms": 0, "retry_count": 0, - "status": "estimated", + "status": "success", "is_estimated": True, + "estimate_method": "char_div_3_8", + "source_persona_id": persona_id, } if not dry_run: @@ -232,7 +251,7 @@ def backfill_personas(db, dry_run: bool) -> int: def main(): parser = argparse.ArgumentParser(description="Backfill usage_events from existing data") - parser.add_argument("--dry-run", action="store_true", help="Preview what would be created without writing") + parser.add_argument("--dry-run", action="store_true", help="Preview without writing") args = parser.parse_args() if args.dry_run: