Fix backfill script: use focus_group_messages collection + correct field names
This commit is contained in:
parent
39ad2f00b5
commit
539c5eaaee
1 changed files with 97 additions and 78 deletions
|
|
@ -31,7 +31,6 @@ def _estimate_tokens(text: str, model: str) -> dict:
|
|||
if not text:
|
||||
return {"prompt": 0, "completion": 0}
|
||||
|
||||
# Try tiktoken for OpenAI models, fall back to char-based estimate
|
||||
if model and ("gpt" in model.lower() or "openai" in model.lower()):
|
||||
try:
|
||||
import tiktoken
|
||||
|
|
@ -47,10 +46,9 @@ def _estimate_tokens(text: str, model: str) -> dict:
|
|||
|
||||
|
||||
def _estimate_cost(prompt_tokens: int, completion_tokens: int, model: str) -> float:
|
||||
"""Very rough cost estimate in USD (used only for backfill estimates)."""
|
||||
# Approximate per-million-token prices for common models
|
||||
"""Rough cost estimate in USD."""
|
||||
rate_per_m = {
|
||||
"gemini": (0.35, 1.05), # input, output USD/1M tokens
|
||||
"gemini": (0.35, 1.05),
|
||||
"gpt-4": (30.00, 60.00),
|
||||
"gpt-3": (0.50, 1.50),
|
||||
}
|
||||
|
|
@ -86,73 +84,87 @@ def connect():
|
|||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Backfill focus-group messages
|
||||
# Messages are in the separate `focus_group_messages` collection (NOT embedded).
|
||||
# Fields: focus_group_id (str), text, type, senderId, created_at
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def backfill_messages(db, dry_run: bool) -> int:
|
||||
"""Walk all focus groups and create estimated usage events for messages."""
|
||||
created = 0
|
||||
focus_groups = list(db.focus_groups.find({}))
|
||||
print(f"\n[messages] Found {len(focus_groups)} focus groups to process")
|
||||
|
||||
for fg in focus_groups:
|
||||
fg_id = str(fg["_id"])
|
||||
fg_model = fg.get("llm_model") or "gemini-3.1-pro-preview"
|
||||
messages = fg.get("messages", [])
|
||||
# Build a lookup: focus_group_id -> {llm_model, user_id}
|
||||
fg_meta = {}
|
||||
for fg in db.focus_groups.find({}, {"llm_model": 1, "created_by": 1}):
|
||||
fg_meta[str(fg["_id"])] = {
|
||||
"model": fg.get("llm_model") or "gemini-3.1-pro-preview",
|
||||
"user_id": str(fg.get("created_by") or ""),
|
||||
}
|
||||
|
||||
for msg in messages:
|
||||
msg_id = str(msg.get("id") or msg.get("_id") or "")
|
||||
if not msg_id:
|
||||
continue
|
||||
total_messages = db.focus_group_messages.count_documents({})
|
||||
print(f"\n[messages] Found {total_messages} messages across all focus groups")
|
||||
|
||||
# Idempotent: skip if an estimated event already exists for this message
|
||||
existing = db.usage_events.find_one({
|
||||
"source_message_id": msg_id,
|
||||
"is_estimated": True,
|
||||
})
|
||||
if existing:
|
||||
continue
|
||||
for msg in db.focus_group_messages.find({}):
|
||||
msg_id = str(msg["_id"])
|
||||
fg_id = str(msg.get("focus_group_id") or "")
|
||||
|
||||
text = msg.get("content") or ""
|
||||
tokens = _estimate_tokens(text, fg_model)
|
||||
# For responses we add a rough output token estimate
|
||||
tokens["completion"] = max(1, int(len(text) / 5.0))
|
||||
cost = _estimate_cost(tokens["prompt"], tokens["completion"], fg_model)
|
||||
# Skip non-AI messages (only persona responses and moderator questions cost money)
|
||||
msg_type = msg.get("type", "")
|
||||
if msg_type not in ("response", "question", "moderator", "ai", ""):
|
||||
continue
|
||||
|
||||
ts = msg.get("timestamp")
|
||||
if isinstance(ts, str):
|
||||
try:
|
||||
ts = datetime.fromisoformat(ts)
|
||||
except Exception:
|
||||
ts = None
|
||||
ts = ts or fg.get("date") or datetime.now(timezone.utc)
|
||||
# Idempotent check
|
||||
if db.usage_events.find_one({"source_message_id": msg_id, "is_estimated": True}):
|
||||
continue
|
||||
|
||||
event = {
|
||||
"ts": ts,
|
||||
"provider": "gemini" if "gemini" in fg_model.lower() else "openai",
|
||||
"model": fg_model,
|
||||
"feature": "autonomous_conversation",
|
||||
"user_id": str(fg.get("user_id") or ""),
|
||||
"focus_group_id": fg_id,
|
||||
"persona_id": str(msg.get("personaId") or msg.get("persona_id") or ""),
|
||||
"prompt_tokens": tokens["prompt"],
|
||||
"completion_tokens": tokens["completion"],
|
||||
"cached_tokens": 0,
|
||||
"reasoning_tokens": 0,
|
||||
"cost_usd": {
|
||||
"input": round(cost * 0.4, 8),
|
||||
"output": round(cost * 0.6, 8),
|
||||
"total": cost,
|
||||
},
|
||||
"duration_ms": 0,
|
||||
"retry_count": 0,
|
||||
"status": "estimated",
|
||||
"is_estimated": True,
|
||||
"source_message_id": msg_id,
|
||||
}
|
||||
meta = fg_meta.get(fg_id, {"model": "gemini-3.1-pro-preview", "user_id": ""})
|
||||
fg_model = meta["model"]
|
||||
user_id = meta["user_id"]
|
||||
|
||||
if not dry_run:
|
||||
db.usage_events.insert_one(event)
|
||||
created += 1
|
||||
text = msg.get("text") or msg.get("content") or ""
|
||||
tokens = _estimate_tokens(text, fg_model)
|
||||
tokens["completion"] = max(1, int(len(text) / 5.0))
|
||||
cost = _estimate_cost(tokens["prompt"], tokens["completion"], fg_model)
|
||||
|
||||
ts = msg.get("created_at") or msg.get("timestamp")
|
||||
if isinstance(ts, str):
|
||||
try:
|
||||
ts = datetime.fromisoformat(ts)
|
||||
except Exception:
|
||||
ts = None
|
||||
ts = ts or datetime.now(timezone.utc)
|
||||
|
||||
feature = "moderator" if msg_type in ("question", "moderator") else "persona_response"
|
||||
|
||||
event = {
|
||||
"ts": ts,
|
||||
"provider": "gemini" if "gemini" in fg_model.lower() else "openai",
|
||||
"model": fg_model,
|
||||
"feature": feature,
|
||||
"user_id": user_id,
|
||||
"focus_group_id": fg_id,
|
||||
"persona_id": str(msg.get("senderId") or msg.get("persona_id") or ""),
|
||||
"prompt_tokens": tokens["prompt"],
|
||||
"completion_tokens": tokens["completion"],
|
||||
"cached_tokens": 0,
|
||||
"reasoning_tokens": 0,
|
||||
"total_tokens": tokens["prompt"] + tokens["completion"],
|
||||
"cost_usd": {
|
||||
"input": round(cost * 0.4, 8),
|
||||
"output": round(cost * 0.6, 8),
|
||||
"cached": 0,
|
||||
"reasoning": 0,
|
||||
"total": cost,
|
||||
},
|
||||
"duration_ms": 0,
|
||||
"retry_count": 0,
|
||||
"status": "success",
|
||||
"is_estimated": True,
|
||||
"estimate_method": "char_div_3_8",
|
||||
"source_message_id": msg_id,
|
||||
}
|
||||
|
||||
if not dry_run:
|
||||
db.usage_events.insert_one(event)
|
||||
created += 1
|
||||
|
||||
print(f"[messages] {'Would create' if dry_run else 'Created'} {created} estimated usage events")
|
||||
return created
|
||||
|
|
@ -160,32 +172,34 @@ def backfill_messages(db, dry_run: bool) -> int:
|
|||
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
# Backfill persona generation
|
||||
# Personas: fields background, description, name; created_by = user_id
|
||||
# ─────────────────────────────────────────────────────────────────────────────
|
||||
|
||||
def backfill_personas(db, dry_run: bool) -> int:
|
||||
"""Walk all personas and create an estimated usage event for narrative generation."""
|
||||
created = 0
|
||||
personas = list(db.personas.find({}))
|
||||
print(f"\n[personas] Found {len(personas)} personas to process")
|
||||
|
||||
for persona in personas:
|
||||
persona_id = str(persona["_id"])
|
||||
narrative = persona.get("narrative") or ""
|
||||
if not narrative:
|
||||
continue # No narrative to estimate from — skip
|
||||
|
||||
# Idempotent check
|
||||
existing = db.usage_events.find_one({
|
||||
"persona_id": persona_id,
|
||||
"feature": "persona_generate",
|
||||
"is_estimated": True,
|
||||
})
|
||||
if existing:
|
||||
# Use background + description as the generation text
|
||||
text = " ".join(filter(None, [
|
||||
persona.get("background") or "",
|
||||
persona.get("description") or "",
|
||||
persona.get("goals") or "",
|
||||
])).strip()
|
||||
|
||||
if not text:
|
||||
continue
|
||||
|
||||
model = "gemini-3.1-pro-preview" # default; personas are usually generated via default model
|
||||
tokens = _estimate_tokens(narrative, model)
|
||||
tokens["completion"] = max(1, int(len(narrative) / 4.0))
|
||||
# Idempotent check
|
||||
if db.usage_events.find_one({"source_persona_id": persona_id, "feature": "persona_generate", "is_estimated": True}):
|
||||
continue
|
||||
|
||||
model = "gemini-3.1-pro-preview"
|
||||
tokens = _estimate_tokens(text, model)
|
||||
tokens["completion"] = max(1, int(len(text) / 4.0))
|
||||
cost = _estimate_cost(tokens["prompt"], tokens["completion"], model)
|
||||
|
||||
ts = persona.get("created_at") or persona.get("updatedAt") or datetime.now(timezone.utc)
|
||||
|
|
@ -200,22 +214,27 @@ def backfill_personas(db, dry_run: bool) -> int:
|
|||
"provider": "gemini",
|
||||
"model": model,
|
||||
"feature": "persona_generate",
|
||||
"user_id": str(persona.get("user_id") or ""),
|
||||
"focus_group_id": str(persona.get("focus_group_id") or ""),
|
||||
"user_id": str(persona.get("created_by") or persona.get("user_id") or ""),
|
||||
"focus_group_id": "",
|
||||
"persona_id": persona_id,
|
||||
"prompt_tokens": tokens["prompt"],
|
||||
"completion_tokens": tokens["completion"],
|
||||
"cached_tokens": 0,
|
||||
"reasoning_tokens": 0,
|
||||
"total_tokens": tokens["prompt"] + tokens["completion"],
|
||||
"cost_usd": {
|
||||
"input": round(cost * 0.4, 8),
|
||||
"output": round(cost * 0.6, 8),
|
||||
"cached": 0,
|
||||
"reasoning": 0,
|
||||
"total": cost,
|
||||
},
|
||||
"duration_ms": 0,
|
||||
"retry_count": 0,
|
||||
"status": "estimated",
|
||||
"status": "success",
|
||||
"is_estimated": True,
|
||||
"estimate_method": "char_div_3_8",
|
||||
"source_persona_id": persona_id,
|
||||
}
|
||||
|
||||
if not dry_run:
|
||||
|
|
@ -232,7 +251,7 @@ def backfill_personas(db, dry_run: bool) -> int:
|
|||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Backfill usage_events from existing data")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Preview what would be created without writing")
|
||||
parser.add_argument("--dry-run", action="store_true", help="Preview without writing")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.dry_run:
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue