#!/usr/bin/env python3 """ Backfill cost_usd / token counts for all existing sessions. Reads ALL JSONL files in ~/.claude/projects/ (no lookback limit), computes input_tokens / output_tokens / cost_usd per session-day bucket, and re-POSTs them to /api/ingest (ON CONFLICT DO UPDATE updates cost fields). Usage: CC_API_KEY=cc_xxx CC_SERVER=https://optical-dev.oliver.solutions/cc-dashboard \ python3 scripts/backfill_session_costs.py Optional env vars (same as cc-collector.py): CC_ROOT_PATH — comma-separated project roots (default: $HOME) CC_SERVER — dashboard base URL CC_API_KEY — API key """ import json import os import sys from collections import defaultdict from datetime import datetime, timezone from pathlib import Path SERVER = os.environ.get("CC_SERVER", "https://optical-dev.oliver.solutions/cc-dashboard").rstrip("/") API_KEY = os.environ.get("CC_API_KEY", "") _raw_root = os.environ.get("CC_ROOT_PATH", str(Path.home())) ROOT_PATHS = [p.strip() for p in _raw_root.split(",") if p.strip()] ROOT_PATH = ROOT_PATHS[0] CLAUDE_PROJECTS = Path.home() / ".claude" / "projects" # Pricing per million tokens: (input, output, cache_read, cache_creation) _MODEL_PRICING: dict[str, tuple[float, float, float, float]] = { "claude-opus-4": (15.0, 75.0, 1.50, 18.75), "claude-sonnet-4": (3.0, 15.0, 0.30, 3.75), "claude-haiku-4": (0.80, 4.0, 0.08, 1.00), "claude-opus-3": (15.0, 75.0, 1.50, 18.75), "claude-sonnet-3": (3.0, 15.0, 0.30, 3.75), "claude-haiku-3": (0.25, 1.25, 0.03, 0.30), } _DEFAULT_PRICING = (15.0, 75.0, 1.50, 18.75) def _get_pricing(model: str) -> tuple[float, float, float, float]: model = (model or "").lower() for key, price in _MODEL_PRICING.items(): if key in model: return price return _DEFAULT_PRICING def _root_prefix(root_path: str) -> str: return root_path.rstrip("/").replace("/", "-") def _match_root(folder_key: str) -> str | None: for rp in ROOT_PATHS: prefix = _root_prefix(rp) if folder_key == prefix or folder_key.startswith(prefix + "-"): return rp return None def _infer_slug(folder_name: str, root_path: str) -> str: prefix = _root_prefix(root_path).lstrip("-") name = folder_name.lstrip("-") if name == prefix: return "general" if name.startswith(prefix + "-"): return name[len(prefix) + 1:] return name.split("-")[-1] or name def collect_all() -> list[dict]: if not CLAUDE_PROJECTS.exists(): print("~/.claude/projects/ not found", file=sys.stderr) return [] sessions_to_send: list[dict] = [] total_files = 0 for folder in sorted(CLAUDE_PROJECTS.iterdir()): if not folder.is_dir(): continue matched_root = _match_root(folder.name) if matched_root is None: continue slug = _infer_slug(folder.name, matched_root) raw_sessions: dict = defaultdict(lambda: {"timestamps": [], "messages": []}) for jf in sorted(folder.glob("*.jsonl")): total_files += 1 try: with open(jf, encoding="utf-8", errors="ignore") as f: for line in f: line = line.strip() if not line: continue try: obj = json.loads(line) except json.JSONDecodeError: continue ts = obj.get("timestamp") sid = obj.get("sessionId") if not ts or not sid: continue try: dt = datetime.fromisoformat(ts.replace("Z", "+00:00")) except ValueError: continue raw_sessions[sid]["timestamps"].append(dt) raw_sessions[sid]["messages"].append(obj) except Exception as e: print(f" Warning: could not read {jf}: {e}", file=sys.stderr) for sid, data in raw_sessions.items(): if not data["timestamps"]: continue paired = sorted(zip(data["timestamps"], data["messages"]), key=lambda x: x[0]) day_buckets: dict = defaultdict(lambda: {"timestamps": [], "messages": []}) for dt, obj in paired: day_buckets[dt.strftime("%Y-%m-%d")]["timestamps"].append(dt) day_buckets[dt.strftime("%Y-%m-%d")]["messages"].append(obj) for date_str, bucket in day_buckets.items(): ts_sorted = bucket["timestamps"] start = ts_sorted[0] end = ts_sorted[-1] # Count tokens input_tokens = output_tokens = 0 cost_usd = 0.0 for obj in bucket["messages"]: msg = obj.get("message", {}) if not isinstance(msg, dict) or msg.get("role") != "assistant": continue usage = msg.get("usage") if not isinstance(usage, dict): continue model = msg.get("model", "") inp_p, out_p, cr_p, cc_p = _get_pricing(model) m = 1_000_000 i = usage.get("input_tokens", 0) o = usage.get("output_tokens", 0) cr = usage.get("cache_read_input_tokens", 0) cc = usage.get("cache_creation_input_tokens", 0) input_tokens += i output_tokens += o cost_usd += i * inp_p / m + o * out_p / m + cr * cr_p / m + cc * cc_p / m if input_tokens == 0 and output_tokens == 0: continue # skip sessions with no usage data sessions_to_send.append({ "session_id": sid, "project_slug": slug, "date": date_str, "start_at": start.isoformat(), "end_at": end.isoformat(), "message_count": len(ts_sorted), "active_hours": 0.0, # not updating hours in backfill "work_summary": "", "commits": [], "tools_used": {}, "files_changed": [], "repo_url": "", "raw_stats": {}, "input_tokens": input_tokens, "output_tokens": output_tokens, "cost_usd": round(cost_usd, 6), }) print(f"Scanned {total_files} JSONL files, found {len(sessions_to_send)} session-days with token data") return sessions_to_send def send_batch(sessions: list[dict]) -> None: import urllib.request BATCH = 50 total_accepted = total_skipped = 0 for i in range(0, len(sessions), BATCH): batch = sessions[i:i + BATCH] payload = json.dumps({"root_path": ROOT_PATH, "sessions": batch}).encode() req = urllib.request.Request( f"{SERVER}/api/ingest", data=payload, headers={"Content-Type": "application/json", "X-API-Key": API_KEY}, method="POST", ) try: with urllib.request.urlopen(req, timeout=30) as resp: result = json.loads(resp.read()) total_accepted += result.get("accepted", 0) total_skipped += result.get("skipped", 0) print(f" Batch {i // BATCH + 1}: accepted={result.get('accepted', 0)}, skipped={result.get('skipped', 0)}") except Exception as e: print(f" Batch {i // BATCH + 1} failed: {e}", file=sys.stderr) print(f"\nDone. Total accepted={total_accepted}, skipped={total_skipped}") if __name__ == "__main__": if not API_KEY: raise SystemExit("CC_API_KEY not set") print(f"Server: {SERVER}") print(f"Root paths: {ROOT_PATHS}\n") sessions = collect_all() if not sessions: print("No sessions with token data found — nothing to send.") sys.exit(0) total_cost = sum(s["cost_usd"] for s in sessions) print(f"Total estimated cost across all sessions: ${total_cost:.4f}\n") send_batch(sessions)