cc-dashboard/scripts/backfill_session_costs.py
Vadym Samoilenko 38d7da93e2 fix(auth): restore session before registering router to prevent refresh redirect
- Move app.use(router) after await authStore.init() so the beforeEach guard
  sees the correct isAuthenticated state on page load
- Fix backfill_session_costs._root_prefix: remove underscore→dash replacement
  that caused paths like /Users/ai_leed/... to not match ~/.claude/projects/ folders

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-13 14:14:27 +01:00

221 lines
8.2 KiB
Python

#!/usr/bin/env python3
"""
Backfill cost_usd / token counts for all existing sessions.
Reads ALL JSONL files in ~/.claude/projects/ (no lookback limit),
computes input_tokens / output_tokens / cost_usd per session-day bucket,
and re-POSTs them to /api/ingest (ON CONFLICT DO UPDATE updates cost fields).
Usage:
CC_API_KEY=cc_xxx CC_SERVER=https://optical-dev.oliver.solutions/cc-dashboard \
python3 scripts/backfill_session_costs.py
Environment variables (same as cc-collector.py):
CC_ROOT_PATH — optional; comma-separated project roots (default: $HOME)
CC_SERVER — optional; dashboard base URL
CC_API_KEY — required; API key (the script exits if it is not set)
"""
import json
import os
import sys
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
# Dashboard base URL. Trailing slashes are stripped so endpoint paths can be
# appended with a single "/".
SERVER = os.environ.get("CC_SERVER", "https://optical-dev.oliver.solutions/cc-dashboard").rstrip("/")
# API key sent in the X-API-Key header; empty when the variable is not set.
API_KEY = os.environ.get("CC_API_KEY", "")
_raw_root = os.environ.get("CC_ROOT_PATH", str(Path.home()))
# Comma-separated roots, trimmed, blanks dropped. If CC_ROOT_PATH is set but
# empty (or only commas/whitespace) the list would be empty and ROOT_PATHS[0]
# below would raise IndexError at import time, so fall back to $HOME.
ROOT_PATHS = [p.strip() for p in _raw_root.split(",") if p.strip()] or [str(Path.home())]
# First configured root; used as the "root_path" field of every ingest request.
ROOT_PATH = ROOT_PATHS[0]
# Directory whose sub-folders hold the per-project *.jsonl session logs.
CLAUDE_PROJECTS = Path.home() / ".claude" / "projects"
# Pricing per million tokens: (input, output, cache_read, cache_creation)
_MODEL_PRICING: dict[str, tuple[float, float, float, float]] = {
"claude-opus-4": (15.0, 75.0, 1.50, 18.75),
"claude-sonnet-4": (3.0, 15.0, 0.30, 3.75),
"claude-haiku-4": (0.80, 4.0, 0.08, 1.00),
"claude-opus-3": (15.0, 75.0, 1.50, 18.75),
"claude-sonnet-3": (3.0, 15.0, 0.30, 3.75),
"claude-haiku-3": (0.25, 1.25, 0.03, 0.30),
}
_DEFAULT_PRICING = (15.0, 75.0, 1.50, 18.75)
def _get_pricing(model: str) -> tuple[float, float, float, float]:
model = (model or "").lower()
for key, price in _MODEL_PRICING.items():
if key in model:
return price
return _DEFAULT_PRICING
def _root_prefix(root_path: str) -> str:
return root_path.rstrip("/").replace("/", "-")
def _match_root(folder_key: str) -> str | None:
    """Return the first configured root that *folder_key* belongs to.

    A folder belongs to a root when its name equals the root's encoded prefix
    or starts with that prefix followed by "-". Roots are tried in
    ``ROOT_PATHS`` order; ``None`` is returned when none of them match.
    """

    def _covers(root: str) -> bool:
        encoded = _root_prefix(root)
        return folder_key == encoded or folder_key.startswith(encoded + "-")

    return next((root for root in ROOT_PATHS if _covers(root)), None)
def _infer_slug(folder_name: str, root_path: str) -> str:
    """Derive a project slug from a project folder name.

    Leading dashes are ignored on both the folder name and the encoded root.
    The root folder itself maps to "general"; a folder below the root maps to
    whatever follows "<root>-"; anything else falls back to its last
    dash-separated segment (or the whole name if that segment is empty).
    """
    stem = folder_name.lstrip("-")
    base = _root_prefix(root_path).lstrip("-")

    if stem == base:
        return "general"

    marker = base + "-"
    if stem.startswith(marker):
        return stem.removeprefix(marker)

    last_segment = stem.rsplit("-", 1)[-1]
    return last_segment if last_segment else stem
def _iter_session_events(jsonl_path: Path):
    """Yield ``(session_id, timestamp, record)`` for each usable line of a JSONL file.

    Lines are skipped (never raised on) when they are blank, are not valid
    JSON, are valid JSON but not an object, lack a non-empty string
    ``timestamp`` / ``sessionId``, or carry a timestamp that
    ``datetime.fromisoformat`` cannot parse. A trailing "Z" is rewritten to
    "+00:00" before parsing.

    Raises:
        OSError: if the file cannot be opened or read.
    """
    with open(jsonl_path, encoding="utf-8", errors="ignore") as fh:
        for raw_line in fh:
            raw_line = raw_line.strip()
            if not raw_line:
                continue
            try:
                record = json.loads(raw_line)
            except json.JSONDecodeError:
                continue
            # A line can be valid JSON without being an object (list, string,
            # number); calling .get() on it would abort the rest of the file.
            if not isinstance(record, dict):
                continue
            ts = record.get("timestamp")
            sid = record.get("sessionId")
            if not (isinstance(ts, str) and ts and isinstance(sid, str) and sid):
                continue
            try:
                when = datetime.fromisoformat(ts.replace("Z", "+00:00"))
            except ValueError:
                continue
            yield sid, when, record


def _token_count(usage: dict, key: str) -> int | float:
    """Return ``usage[key]`` if it is a real number, otherwise 0 (missing, null, text)."""
    value = usage.get(key, 0)
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return 0
    return value


def _tally_usage(records: list[dict]) -> tuple[int | float, int | float, float]:
    """Sum token counts and estimated cost over the assistant records in *records*.

    Only records whose ``message`` is a dict with ``role == "assistant"`` and a
    dict ``usage`` contribute. Cache-read and cache-creation tokens add to the
    cost but not to the returned token totals.

    Returns:
        ``(input_tokens, output_tokens, cost_usd)``.
    """
    per_million = 1_000_000
    input_tokens = output_tokens = 0
    cost_usd = 0.0
    for record in records:
        msg = record.get("message", {})
        if not isinstance(msg, dict) or msg.get("role") != "assistant":
            continue
        usage = msg.get("usage")
        if not isinstance(usage, dict):
            continue
        model = msg.get("model", "")
        inp_p, out_p, cr_p, cc_p = _get_pricing(model if isinstance(model, str) else "")
        i = _token_count(usage, "input_tokens")
        o = _token_count(usage, "output_tokens")
        cr = _token_count(usage, "cache_read_input_tokens")
        cc = _token_count(usage, "cache_creation_input_tokens")
        input_tokens += i
        output_tokens += o
        cost_usd += (
            i * inp_p / per_million
            + o * out_p / per_million
            + cr * cr_p / per_million
            + cc * cc_p / per_million
        )
    return input_tokens, output_tokens, cost_usd


def collect_all() -> list[dict]:
    """Scan every project folder and build one ingest record per session-day.

    For each directory under ``CLAUDE_PROJECTS`` that matches a configured
    root, all ``*.jsonl`` files are read, records are grouped by ``sessionId``
    and then by calendar day (``%Y-%m-%d`` of the record's own timestamp), and
    token/cost totals are computed per group. Groups with no input and no
    output tokens are dropped.

    NOTE(review): a record that appears in several files of the same folder is
    counted each time it appears — confirm whether deduplication is needed.

    Returns:
        A list of session dicts ready to be posted to the ingest endpoint;
        empty if ``CLAUDE_PROJECTS`` does not exist.
    """
    if not CLAUDE_PROJECTS.exists():
        print("~/.claude/projects/ not found", file=sys.stderr)
        return []

    sessions_to_send: list[dict] = []
    total_files = 0

    for folder in sorted(CLAUDE_PROJECTS.iterdir()):
        if not folder.is_dir():
            continue
        matched_root = _match_root(folder.name)
        if matched_root is None:
            continue
        slug = _infer_slug(folder.name, matched_root)

        # session id -> [(timestamp, record), ...] across all files of the folder
        raw_sessions: dict[str, list[tuple[datetime, dict]]] = defaultdict(list)
        for jf in sorted(folder.glob("*.jsonl")):
            total_files += 1
            try:
                for sid, when, record in _iter_session_events(jf):
                    raw_sessions[sid].append((when, record))
            except OSError as e:
                # Best effort: keep whatever was read before the failure and
                # move on to the next file.
                print(f" Warning: could not read {jf}: {e}", file=sys.stderr)

        for sid, events in raw_sessions.items():
            if not events:
                continue
            # Stable sort on the timestamp only, so records with equal
            # timestamps keep file order and dicts are never compared.
            events.sort(key=lambda pair: pair[0])

            day_buckets: dict[str, list[tuple[datetime, dict]]] = defaultdict(list)
            for when, record in events:
                day_buckets[when.strftime("%Y-%m-%d")].append((when, record))

            for date_str, bucket in day_buckets.items():
                start = bucket[0][0]
                end = bucket[-1][0]
                input_tokens, output_tokens, cost_usd = _tally_usage(
                    [record for _, record in bucket]
                )
                if input_tokens == 0 and output_tokens == 0:
                    continue  # skip sessions with no usage data
                sessions_to_send.append({
                    "session_id": sid,
                    "project_slug": slug,
                    "date": date_str,
                    "start_at": start.isoformat(),
                    "end_at": end.isoformat(),
                    "message_count": len(bucket),
                    "active_hours": 0.0,  # not updating hours in backfill
                    "work_summary": "",
                    "commits": [],
                    "tools_used": {},
                    "files_changed": [],
                    "repo_url": "",
                    "raw_stats": {},
                    "input_tokens": input_tokens,
                    "output_tokens": output_tokens,
                    "cost_usd": round(cost_usd, 6),
                })

    print(f"Scanned {total_files} JSONL files, found {len(sessions_to_send)} session-days with token data")
    return sessions_to_send
def send_batch(sessions: list[dict]) -> None:
    """POST *sessions* to ``{SERVER}/api/ingest`` in chunks of 50.

    Each request body is ``{"root_path": ROOT_PATH, "sessions": [...]}`` and is
    authenticated with the ``X-API-Key`` header. The ``accepted`` / ``skipped``
    counters from each JSON response are summed and printed. A failing batch
    is reported on stderr and does not stop the remaining batches; the number
    of failed batches is reported at the end so that a run in which nothing
    was ingested is not mistaken for a clean one.

    NOTE(review): every batch is tagged with ``ROOT_PATH`` (the first
    configured root), including sessions matched under another root — confirm
    this is what the server expects.

    Args:
        sessions: Session dicts as produced by ``collect_all``.
    """
    import urllib.request

    BATCH = 50
    total_accepted = total_skipped = 0
    failed_batches = 0

    for batch_no, offset in enumerate(range(0, len(sessions), BATCH), start=1):
        batch = sessions[offset:offset + BATCH]
        payload = json.dumps({"root_path": ROOT_PATH, "sessions": batch}).encode()
        req = urllib.request.Request(
            f"{SERVER}/api/ingest",
            data=payload,
            headers={"Content-Type": "application/json", "X-API-Key": API_KEY},
            method="POST",
        )
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                result = json.loads(resp.read())
            # Read both counters before touching the totals so a malformed
            # response cannot leave them half-updated.
            accepted = result.get("accepted", 0)
            skipped = result.get("skipped", 0)
            total_accepted += accepted
            total_skipped += skipped
            print(f" Batch {batch_no}: accepted={accepted}, skipped={skipped}")
        except Exception as e:
            # Deliberately broad: network, HTTP, and response-format errors
            # must not abort the remaining batches.
            failed_batches += 1
            print(f" Batch {batch_no} failed: {e}", file=sys.stderr)

    print(f"\nDone. Total accepted={total_accepted}, skipped={total_skipped}")
    if failed_batches:
        print(f"{failed_batches} batch(es) failed and were not ingested", file=sys.stderr)
def main() -> None:
    """Run the backfill: check the API key, collect session-days, post them.

    Exits with the message "CC_API_KEY not set" when no API key is configured,
    and exits with status 0 without sending anything when no session-day with
    token data was found.
    """
    if not API_KEY:
        raise SystemExit("CC_API_KEY not set")

    print(f"Server: {SERVER}")
    print(f"Root paths: {ROOT_PATHS}\n")

    pending = collect_all()
    if pending:
        total_cost = sum(entry["cost_usd"] for entry in pending)
        print(f"Total estimated cost across all sessions: ${total_cost:.4f}\n")
        send_batch(pending)
    else:
        print("No sessions with token data found — nothing to send.")
        sys.exit(0)


if __name__ == "__main__":
    main()