- analytics_service now queries usage_events (not usage_rollups), so data appears immediately without waiting for the nightly rollup
- match on a ts datetime range (was a string 'date' field — broken)
- correct units field names: token_input/token_output/char
- rollup_daily.py: same field-name fix for historical aggregation

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
175 lines
5.5 KiB
Python
175 lines
5.5 KiB
Python
"""MongoDB aggregation helpers for dashboard and pivot analytics."""
|
|
from datetime import datetime, timezone
|
|
from typing import Any, Optional
|
|
|
|
from motor.motor_asyncio import AsyncIOMotorDatabase
|
|
|
|
from ..core.logging import get_logger
|
|
|
|
# Module-level logger, obtained from the project's logging helper and keyed
# by this module's name. None of the functions visible in this section call it.
logger = get_logger(__name__)
|
|
|
|
|
|
def _build_match(
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    workspace_id: Optional[str] = None,
    team_id: Optional[str] = None,
    project_id: Optional[str] = None,
    user_external_id: Optional[str] = None,
    provider: Optional[str] = None,
    model: Optional[str] = None,
    source_app: Optional[str] = None,
) -> dict:
    """Assemble the ``$match`` criteria shared by the analytics pipelines.

    Args:
        start_date: Date string; ``T00:00:00+00:00`` is appended and the
            result becomes the inclusive lower bound on ``ts``.
        end_date: Date string; ``T23:59:59.999999+00:00`` is appended and
            the result becomes the inclusive upper bound on ``ts``.
        workspace_id, team_id, project_id, user_external_id, provider,
        model, source_app: Optional equality filters on the field of the
            same name.

    Returns:
        A dict holding only the criteria whose argument was truthy; an empty
        dict when no filter was supplied.

    Raises:
        ValueError: If a date string does not form a valid ISO timestamp
            once the time suffix is appended.
    """
    criteria: dict[str, Any] = {}

    # Gather both timestamp bounds first so "ts" is only added when needed.
    ts_bounds: dict[str, datetime] = {}
    if start_date:
        ts_bounds["$gte"] = datetime.fromisoformat(start_date + "T00:00:00+00:00")
    if end_date:
        ts_bounds["$lte"] = datetime.fromisoformat(end_date + "T23:59:59.999999+00:00")
    if ts_bounds:
        criteria["ts"] = ts_bounds

    # Plain equality filters; falsy values (None, "") mean "not filtered".
    equality_filters = (
        ("workspace_id", workspace_id),
        ("team_id", team_id),
        ("project_id", project_id),
        ("user_external_id", user_external_id),
        ("provider", provider),
        ("model", model),
        ("source_app", source_app),
    )
    for field_name, wanted in equality_filters:
        if wanted:
            criteria[field_name] = wanted

    return criteria
|
|
|
|
|
|
async def get_summary(
    db: AsyncIOMotorDatabase,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    **filters,
) -> dict:
    """Compute overall usage totals across all matching ``usage_events``.

    Args:
        db: Database handle whose ``usage_events`` collection is aggregated.
        start_date: Optional lower date bound, passed to ``_build_match``.
        end_date: Optional upper date bound, passed to ``_build_match``.
        **filters: Extra keyword filters forwarded to ``_build_match``.

    Returns:
        A dict with ``total_cost_usd``, ``total_calls``,
        ``total_input_tokens``, ``total_output_tokens`` and ``total_chars``.
        All values are ``0`` when the aggregation yields no document.
    """
    criteria = _build_match(start_date=start_date, end_date=end_date, **filters)

    # A single group (``_id`` of None) collapses every matching event into one row.
    totals_group = {
        "_id": None,
        "total_cost_usd": {"$sum": "$cost_usd"},
        "total_calls": {"$sum": 1},
        "total_input_tokens": {"$sum": "$units.token_input"},
        "total_output_tokens": {"$sum": "$units.token_output"},
        "total_chars": {"$sum": "$units.char"},
    }
    stages = [{"$match": criteria}, {"$group": totals_group}]

    docs = await db.usage_events.aggregate(stages).to_list(length=1)
    if docs:
        totals = docs[0]
        # The grouping key carries no information for callers.
        totals.pop("_id", None)
        return totals

    return {
        "total_cost_usd": 0,
        "total_calls": 0,
        "total_input_tokens": 0,
        "total_output_tokens": 0,
        "total_chars": 0,
    }
|
|
|
|
|
|
async def get_timeseries(
    db: AsyncIOMotorDatabase,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    metric: str = "total_cost_usd",
    **filters,
) -> list[dict]:
    """Return one metric aggregated per day, ordered by date ascending.

    Args:
        db: Database handle whose ``usage_events`` collection is aggregated.
        start_date: Optional lower date bound, passed to ``_build_match``.
        end_date: Optional upper date bound, passed to ``_build_match``.
        metric: One of ``total_cost_usd``, ``total_calls``,
            ``total_input_tokens``, ``total_output_tokens`` or
            ``total_chars``. Any other name falls back to summing
            ``cost_usd``.
        **filters: Extra keyword filters forwarded to ``_build_match``.

    Returns:
        Up to 365 dicts shaped ``{"date": "YYYY-MM-DD", "value": ...}``.
    """
    criteria = _build_match(start_date=start_date, end_date=end_date, **filters)

    # Metric name -> event field to sum; None means "count events".
    metric_sources = {
        "total_cost_usd": "cost_usd",
        "total_calls": None,
        "total_input_tokens": "units.token_input",
        "total_output_tokens": "units.token_output",
        "total_chars": "units.char",
    }
    source = metric_sources.get(metric, "cost_usd")
    if source is None:
        accumulator = {"$sum": 1}
    else:
        accumulator = {"$sum": "$" + source}

    # No timezone option is passed to $dateToString, so day boundaries follow
    # MongoDB's default for that operator (UTC per its documentation).
    day_key = {"$dateToString": {"format": "%Y-%m-%d", "date": "$ts"}}
    stages = [
        {"$match": criteria},
        {"$group": {"_id": day_key, "value": accumulator}},
        {"$sort": {"_id": 1}},
        {"$project": {"date": "$_id", "value": 1, "_id": 0}},
    ]
    cursor = db.usage_events.aggregate(stages)
    return await cursor.to_list(length=365)
|
|
|
|
|
|
async def get_breakdown(
    db: AsyncIOMotorDatabase,
    dim: str,
    metric: str = "total_cost_usd",
    limit: int = 20,
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    **filters,
) -> list[dict]:
    """Rank the values of one dimension by an aggregated metric.

    Args:
        db: Database handle whose ``usage_events`` collection is aggregated.
        dim: Event field to group by; it is interpolated directly into the
            ``$group`` key as a field path.
            NOTE(review): presumably validated by the caller — confirm.
        metric: One of ``total_cost_usd``, ``total_calls``,
            ``total_input_tokens``, ``total_output_tokens`` or
            ``total_chars``. Any other name falls back to summing
            ``cost_usd``.
        limit: Maximum number of groups returned.
        start_date: Optional lower date bound, passed to ``_build_match``.
        end_date: Optional upper date bound, passed to ``_build_match``.
        **filters: Extra keyword filters forwarded to ``_build_match``.

    Returns:
        Dicts shaped ``{"name": <dim value>, "value": ...}``, sorted by
        ``value`` descending.
    """
    criteria = _build_match(start_date=start_date, end_date=end_date, **filters)

    # Metric name -> event field to sum; None means "count events".
    metric_sources = {
        "total_cost_usd": "cost_usd",
        "total_calls": None,
        "total_input_tokens": "units.token_input",
        "total_output_tokens": "units.token_output",
        "total_chars": "units.char",
    }
    source = metric_sources.get(metric, "cost_usd")
    if source is None:
        accumulator = {"$sum": 1}
    else:
        accumulator = {"$sum": "$" + source}

    group_stage = {"$group": {"_id": f"${dim}", "value": accumulator}}
    stages = [
        {"$match": criteria},
        group_stage,
        {"$sort": {"value": -1}},
        {"$limit": limit},
        {"$project": {"name": "$_id", "value": 1, "_id": 0}},
    ]
    cursor = db.usage_events.aggregate(stages)
    return await cursor.to_list(length=limit)
|
|
|
|
|
|
async def get_pivot(
    db: AsyncIOMotorDatabase,
    row_dims: list[str],
    col_dim: str,
    metric: str = "total_cost_usd",
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    limit: int = 200,
    **filters,
) -> list[dict]:
    """Aggregate a metric for every (row dimensions, column dimension) cell.

    Args:
        db: Database handle whose ``usage_events`` collection is aggregated.
        row_dims: Event fields that together identify a pivot row.
        col_dim: Event field whose values identify the pivot column.
        metric: One of ``total_cost_usd``, ``total_calls``,
            ``total_input_tokens``, ``total_output_tokens`` or
            ``total_chars``. Any other name falls back to summing
            ``cost_usd``.
        start_date: Optional lower date bound, passed to ``_build_match``.
        end_date: Optional upper date bound, passed to ``_build_match``.
        limit: Maximum number of cells returned, taken after sorting by
            value descending.
        **filters: Extra keyword filters forwarded to ``_build_match``.

    Returns:
        One flat dict per cell: a key for each entry of ``row_dims``, plus
        ``col_key`` (the column value) and ``value`` (the aggregated metric).
    """
    criteria = _build_match(start_date=start_date, end_date=end_date, **filters)

    # Metric name -> event field to sum; None means "count events".
    metric_sources = {
        "total_cost_usd": "cost_usd",
        "total_calls": None,
        "total_input_tokens": "units.token_input",
        "total_output_tokens": "units.token_output",
        "total_chars": "units.char",
    }
    source = metric_sources.get(metric, "cost_usd")
    if source is None:
        accumulator = {"$sum": 1}
    else:
        accumulator = {"$sum": "$" + source}

    # Compound group key: one entry per row dimension, with the column
    # dimension stored under the reserved "__col__" key.
    cell_key = {
        **{dim: f"${dim}" for dim in row_dims},
        "__col__": f"${col_dim}",
    }
    stages = [
        {"$match": criteria},
        {"$group": {"_id": cell_key, "value": accumulator}},
        {"$sort": {"value": -1}},
        {"$limit": limit},
    ]
    cells = await db.usage_events.aggregate(stages).to_list(length=limit)

    # Flatten each grouped document into a single-level row dict.
    return [
        {
            **{dim: cell["_id"].get(dim) for dim in row_dims},
            "col_key": cell["_id"].get("__col__"),
            "value": cell["value"],
        }
        for cell in cells
    ]
|