amazon-transcreation/backend/app/services/report_service.py
DJP 9825b0497c Round 2 feedback: parser fix, dynamic max_tokens, polling, TM auto-discovery, reviewer comments in export
A1 Export columns shifted (critical):
- V25 LLM occasionally emits 12/13-col tables with Copy Type/Char Limit prefix
- Parser now anchors on "Option 1" header position; robust to any prefix shift
- Verified with 23/23 unit tests covering 11/12/13-col variants
- Source-line block in prompt no longer uses pipe separators (defence in depth)

A2 Linguistic summary fallback:
- Drop the metadata key/value table fallback on Tab 2
- Show "No linguistic summary was generated" when the agent didn't produce one

A3 Dashboard stuck on "Running":
- useJobs / useJob now poll every 5s while any job/locale is in an active state
- Stops polling once everything is COMPLETED or ERROR

B1 TM auto-config: respect empty selection
- Send no TM files when user unchecks all (was auto-adding campaign channel)
- Backend distinguishes empty list vs missing field

B2 Auto-discover channels from TM registry:
- New GET /api/v1/files/tm/channels endpoint reads distinct channels from registry
- Frontend StepConfigure fetches channels per client; falls back to static list
- Pipeline TM resolution falls back to flat_<Channel>_<lc>.json pattern for any
  registered channel (no hardcoded map needed for new channels like PrimeCBM)

B3 Job inputs visible on monitoring:
- New "Inputs sent to the agent" card on /jobs/[id] showing AI model, TM files,
  supplementary file list, and context override
- New GET /api/v1/jobs/{id}/supplementary endpoint listing on-disk supplementary files

C1 Context cap (large briefs truncating):
- max_tokens scales with source line count (8k/16k/32k/64k by tier)
- 172-line briefs now have ~64k output budget instead of fixed 16k

D1 Reviewer comments in xlsx export:
- Export endpoint now copies xlsx to temp path on download, queries Feedback
  joined with User, and appends "Reviewer (Name): comment" to the rationale
  cells of options that have feedback
- Original generated file remains untouched

D2 Hide Clients & Voice from sidebar (page still reachable by URL)
D3 Remove dead notifications + settings icons from header
D4 Cost by Locale table added to Analytics with total + avg cost per brief

Makefile seed target now also runs register_storage_files so TM registry is
populated from disk on first setup (deploy.sh already does this via --init).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-05-04 16:12:47 -04:00

226 lines
8.1 KiB
Python

from datetime import datetime
from typing import Any
from uuid import UUID
from sqlalchemy import case, func, literal_column, select, text
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.audit import TokenUsageLog
from app.models.feedback import Feedback, FlagType
from app.models.job import Job, JobStatus, LocaleInstance, LocaleStatus
from app.models.output import OutputRow, ConfidenceTier
class ReportService:
    """Service for aggregation queries powering reports.

    Every public method takes an open ``AsyncSession``, runs one or two
    aggregate SELECTs, and returns plain dicts/lists of built-in values.
    Optional filters are applied only when the argument is truthy.
    """

    @staticmethod
    def _apply_job_filters(
        query: Any,
        client_id: UUID | None,
        date_from: datetime | None,
        date_to: datetime | None,
    ) -> Any:
        """Add the optional ``Job`` client/date WHERE clauses to *query*.

        The caller is responsible for making sure ``Job`` is already part of
        the statement (selected from or joined); this helper adds conditions
        only and returns the new statement.
        """
        if client_id:
            query = query.where(Job.client_id == client_id)
        if date_from:
            query = query.where(Job.created_at >= date_from)
        if date_to:
            query = query.where(Job.created_at <= date_to)
        return query

    @staticmethod
    def _enum_label(member: Any) -> Any:
        """Return ``member.value``, or ``"unknown"`` when the grouped column was NULL.

        Guards the breakdown dicts against ``AttributeError`` on ``None``.
        """
        return "unknown" if member is None else member.value

    async def get_usage_stats(
        self,
        db: AsyncSession,
        client_id: UUID | None = None,
        date_from: datetime | None = None,
        date_to: datetime | None = None,
    ) -> dict[str, Any]:
        """Get overall usage statistics.

        Args:
            db: Session used to execute the queries.
            client_id: If given, only jobs of this client are counted.
            date_from: If given, only jobs created at or after this instant.
            date_to: If given, only jobs created at or before this instant.

        Returns:
            A dict with ``total_jobs``, ``total_tokens``, ``total_cost`` and
            ``status_breakdown`` (job count keyed by status value).
        """
        totals_query = self._apply_job_filters(
            select(
                func.count(Job.id).label("total_jobs"),
                func.sum(Job.total_token_usage).label("total_tokens"),
                func.sum(Job.total_estimated_cost).label("total_cost"),
            ),
            client_id,
            date_from,
            date_to,
        )
        totals = (await db.execute(totals_query)).one()

        # Status breakdown uses exactly the same filters as the totals query.
        status_query = self._apply_job_filters(
            select(Job.status, func.count(Job.id)).group_by(Job.status),
            client_id,
            date_from,
            date_to,
        )
        status_result = await db.execute(status_query)
        status_breakdown = {
            self._enum_label(status): count
            for status, count in status_result.all()
        }

        return {
            "total_jobs": totals.total_jobs or 0,
            # SUM() yields NULL on an empty set and may come back as Decimal;
            # normalise to plain int/float for the response.
            "total_tokens": int(totals.total_tokens or 0),
            "total_cost": float(totals.total_cost or 0.0),
            "status_breakdown": status_breakdown,
        }

    async def get_token_cost_data(
        self,
        db: AsyncSession,
        client_id: UUID | None = None,
        date_from: datetime | None = None,
        date_to: datetime | None = None,
    ) -> list[dict[str, Any]]:
        """Get token usage and cost data grouped by date.

        Rows come from ``TokenUsageLog`` bucketed by day of ``timestamp``,
        ordered ascending. The date filters apply to the log timestamp;
        ``client_id`` is applied by joining through ``LocaleInstance`` to
        ``Job``.

        Returns:
            One dict per day with ``date`` (stringified bucket),
            ``input_tokens``, ``output_tokens``, ``total_tokens`` and
            ``total_cost``.
        """
        # The unit is inlined as a SQL literal rather than a bound parameter.
        # NOTE(review): presumably so the SELECT and GROUP BY expressions are
        # textually identical for the database - confirm.
        day_expr = func.date_trunc(literal_column("'day'"), TokenUsageLog.timestamp)
        query = (
            select(
                day_expr.label("date"),
                func.sum(TokenUsageLog.input_tokens).label("input_tokens"),
                func.sum(TokenUsageLog.output_tokens).label("output_tokens"),
                func.sum(TokenUsageLog.total_tokens).label("total_tokens"),
                func.sum(TokenUsageLog.estimated_cost_usd).label("total_cost"),
            )
            .group_by(day_expr)
            .order_by(day_expr)
        )
        if client_id:
            query = query.join(LocaleInstance).join(Job).where(
                Job.client_id == client_id
            )
        if date_from:
            query = query.where(TokenUsageLog.timestamp >= date_from)
        if date_to:
            query = query.where(TokenUsageLog.timestamp <= date_to)

        result = await db.execute(query)
        return [
            {
                "date": str(row.date),
                "input_tokens": int(row.input_tokens or 0),
                "output_tokens": int(row.output_tokens or 0),
                "total_tokens": int(row.total_tokens or 0),
                "total_cost": float(row.total_cost or 0.0),
            }
            for row in result.all()
        ]

    async def get_quality_metrics(
        self,
        db: AsyncSession,
        client_id: UUID | None = None,
    ) -> dict[str, Any]:
        """Get quality metrics from output confidence tiers and feedback.

        Args:
            db: Session used to execute the queries.
            client_id: If given, restricts the confidence-tier counts to
                output rows whose job belongs to this client.

        Returns:
            A dict with ``confidence_tiers`` (output-row count per tier value)
            and ``feedback_distribution`` (feedback count per flag-type value).
        """
        # Confidence tier distribution
        tier_query = select(
            OutputRow.confidence_tier, func.count(OutputRow.id)
        ).group_by(OutputRow.confidence_tier)
        if client_id:
            tier_query = tier_query.join(LocaleInstance).join(Job).where(
                Job.client_id == client_id
            )
        tier_result = await db.execute(tier_query)
        tier_breakdown = {
            self._enum_label(tier): count for tier, count in tier_result.all()
        }

        # Feedback distribution
        # NOTE(review): client_id is NOT applied here, so this breakdown is
        # global even when a client filter is requested. The join path from
        # Feedback to Job is not visible in this module - confirm whether
        # this is intended and add the join if not.
        feedback_query = select(
            Feedback.flag_type, func.count(Feedback.id)
        ).group_by(Feedback.flag_type)
        feedback_result = await db.execute(feedback_query)
        feedback_breakdown = {
            self._enum_label(flag): count
            for flag, count in feedback_result.all()
        }

        return {
            "confidence_tiers": tier_breakdown,
            "feedback_distribution": feedback_breakdown,
        }

    async def get_locale_stats(
        self,
        db: AsyncSession,
        client_id: UUID | None = None,
        date_from: datetime | None = None,
        date_to: datetime | None = None,
    ) -> list[dict[str, Any]]:
        """Get per-locale aggregate stats (tokens, cost, avg duration, job count).

        Only locale instances with status ``LocaleStatus.complete`` and both
        ``started_at`` and ``completed_at`` set are included. ``Job`` is
        joined only when at least one filter is supplied; the date filters
        apply to ``Job.created_at``.

        Returns:
            One dict per locale code with ``locale``, ``count``,
            ``total_tokens``, ``total_cost``, ``avg_cost_per_brief``
            (``total_cost / count`` rounded to 4 places, ``0.0`` when the
            count is zero) and ``avg_duration_minutes`` (rounded to 1 place).
        """
        query = select(
            LocaleInstance.locale_code,
            # Labelled "instance_count" rather than "count" so attribute
            # access on the result row cannot be confused with the row's
            # sequence-style ``count`` method.
            func.count(LocaleInstance.id).label("instance_count"),
            func.sum(LocaleInstance.token_usage).label("total_tokens"),
            func.sum(LocaleInstance.estimated_cost).label("total_cost"),
            # Difference of epoch seconds gives the run time in seconds.
            func.avg(
                func.extract("epoch", LocaleInstance.completed_at)
                - func.extract("epoch", LocaleInstance.started_at)
            ).label("avg_duration_seconds"),
        ).where(
            LocaleInstance.status == LocaleStatus.complete,
            LocaleInstance.started_at.isnot(None),
            LocaleInstance.completed_at.isnot(None),
        ).group_by(LocaleInstance.locale_code)

        if client_id or date_from or date_to:
            query = self._apply_job_filters(
                query.join(Job), client_id, date_from, date_to
            )

        result = await db.execute(query)
        out: list[dict[str, Any]] = []
        for row in result.all():
            count = int(row.instance_count or 0)
            total_cost = float(row.total_cost or 0.0)
            out.append({
                "locale": row.locale_code,
                "count": count,
                "total_tokens": int(row.total_tokens or 0),
                "total_cost": total_cost,
                "avg_cost_per_brief": round(total_cost / count, 4) if count else 0.0,
                "avg_duration_minutes": round(
                    float(row.avg_duration_seconds or 0) / 60, 1
                ),
            })
        return out

    async def get_jobs_over_time(
        self,
        db: AsyncSession,
        client_id: UUID | None = None,
        date_from: datetime | None = None,
        date_to: datetime | None = None,
    ) -> list[dict[str, Any]]:
        """Get job counts grouped by week.

        Jobs are bucketed by the week of ``Job.created_at`` and returned in
        ascending week order.

        Returns:
            One dict per week with ``name`` (bucket start formatted as
            ``"%b %d"``, or ``"?"`` when the bucket is NULL), ``jobs``
            (all jobs), ``completed`` and ``errors``.
        """
        week_expr = func.date_trunc(literal_column("'week'"), Job.created_at)
        query = self._apply_job_filters(
            select(
                week_expr.label("week"),
                func.count(Job.id).label("total"),
                # SUM over a 0/1 CASE counts the rows in each status.
                func.sum(
                    case((Job.status == JobStatus.complete, 1), else_=0)
                ).label("completed_raw"),
                func.sum(
                    case((Job.status == JobStatus.error, 1), else_=0)
                ).label("errors_raw"),
            )
            .group_by(week_expr)
            .order_by(week_expr),
            client_id,
            date_from,
            date_to,
        )

        result = await db.execute(query)
        return [
            {
                "name": row.week.strftime("%b %d") if row.week else "?",
                "jobs": row.total or 0,
                "completed": int(row.completed_raw or 0),
                "errors": int(row.errors_raw or 0),
            }
            for row in result.all()
        ]