diff --git a/backend/app/llm/client.py b/backend/app/llm/client.py
index 1ef687b..4ec2bf3 100644
--- a/backend/app/llm/client.py
+++ b/backend/app/llm/client.py
@@ -95,12 +95,21 @@ class LLMClient:
+ output_tokens * output_rate
)
+ stop_reason = getattr(response, "stop_reason", None)
+ if stop_reason == "max_tokens":
+ logger.warning(
+ "LLM hit max_tokens (%d) — response was truncated. "
+ "Consider raising max_tokens or batching the input.",
+ max_tokens,
+ )
+
usage = {
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"total_tokens": total_tokens,
"estimated_cost_usd": round(estimated_cost, 6),
"model": self.model,
+ "stop_reason": stop_reason,
}
self.last_usage = usage
diff --git a/backend/app/pipeline/agents/agent_single.py b/backend/app/pipeline/agents/agent_single.py
index 69675ef..a478500 100644
--- a/backend/app/pipeline/agents/agent_single.py
+++ b/backend/app/pipeline/agents/agent_single.py
@@ -157,6 +157,88 @@ def _format_ref_data_for_prompt(ref_data: dict[str, Any]) -> str:
return "\n".join(sections)
+ # Per-supplementary-file size cap, counted in characters of extracted
+ # text (not bytes or tokens). Big enough to fit a typical reference
+ # brief or glossary; longer files are cut at this length and a
+ # "[...truncated ...]" marker is appended, to protect the input budget.
+ _SUPP_MAX_CHARS_PER_FILE = 40_000
+
+
+ def _read_docx_text(path: str) -> str:
+     """Extract plain text from a .docx file.
+
+     Avoids adding python-docx as a dependency by reading the underlying
+     XML directly: a .docx is a zip archive whose document body lives at
+     word/document.xml. Each paragraph (``<w:p>``) is converted to a line;
+     ``<w:tab/>`` becomes a tab and ``<w:br/>`` / ``<w:cr/>`` a line break.
+     All remaining markup is dropped and XML entities are decoded.
+
+     Args:
+         path: Filesystem path of the .docx file.
+
+     Returns:
+         The document text, one non-empty line per paragraph or break,
+         with runs of spaces/tabs collapsed to a single space. Returns ""
+         if the archive cannot be opened or word/document.xml is missing
+         (a warning is logged in that case).
+     """
+     import html
+     import re
+     import zipfile
+
+     try:
+         with zipfile.ZipFile(path) as z:
+             with z.open("word/document.xml") as fh:
+                 xml = fh.read().decode("utf-8", errors="replace")
+     except Exception as exc:
+         # Deliberately broad: a bad upload (not a zip, encrypted, missing
+         # member, ...) must never abort prompt construction.
+         logger.warning("Could not unzip .docx %s: %s", path, exc)
+         return ""
+
+     # Turn structural markers into whitespace before stripping all tags.
+     xml = re.sub(r"</w:p>", "\n", xml)
+     xml = re.sub(r"<w:tab\b[^>]*/>", "\t", xml)
+     xml = re.sub(r"<w:(?:br|cr)\b[^>]*/>", "\n", xml)
+     text = re.sub(r"<[^>]+>", "", xml)
+     # Decode entities in a single pass. Chained str.replace() calls that
+     # handle "&amp;" first would double-decode text such as "&amp;lt;".
+     text = html.unescape(text)
+     lines = [re.sub(r"[ \t]+", " ", ln).strip() for ln in text.split("\n")]
+     return "\n".join(ln for ln in lines if ln)
+
+
+ def _format_supplementary_for_prompt(file_paths: list[str]) -> str:
+     """Read each supplementary file and inline its text into the prompt.
+
+     Supports text-based formats (.txt, .md, .json, .jsonl, .csv, .tsv) and
+     Word documents (.docx) — for .docx we extract the plain text. Binary
+     formats we can't read (e.g. .pdf without a parser, images) are noted
+     by filename only so the agent at least knows they were attached.
+
+     Args:
+         file_paths: Paths of the supplementary files to include.
+
+     Returns:
+         A markdown section headed "## SUPPLEMENTARY MATERIAL" with one
+         "### File: <name>" sub-section per path, or "" when file_paths is
+         empty. Each file's text is capped at _SUPP_MAX_CHARS_PER_FILE
+         characters; a file that cannot be read contributes only its
+         heading (a warning is logged).
+     """
+     if not file_paths:
+         return ""
+
+     text_exts = (".txt", ".md", ".json", ".csv", ".tsv", ".jsonl")
+     sections: list[str] = ["## SUPPLEMENTARY MATERIAL", ""]
+     for fpath in file_paths:
+         fname = os.path.basename(fpath)
+         ext = os.path.splitext(fname)[1].lower()
+         sections.append(f"### File: {fname}")
+         text = ""
+         try:
+             if ext in text_exts:
+                 with open(fpath, "r", encoding="utf-8", errors="replace") as f:
+                     # Read one character past the cap: enough to detect
+                     # overflow without loading a huge upload into memory.
+                     text = f.read(_SUPP_MAX_CHARS_PER_FILE + 1)
+             elif ext == ".docx":
+                 text = _read_docx_text(fpath)
+             else:
+                 sections.append(
+                     "(Binary or unsupported format — filename only attached.)"
+                 )
+         except Exception as exc:  # pragma: no cover - defensive
+             # Best effort: one unreadable attachment must not fail the job.
+             logger.warning("Could not read supplementary %s: %s", fpath, exc)
+             text = ""
+
+         if text:
+             if len(text) > _SUPP_MAX_CHARS_PER_FILE:
+                 text = (
+                     text[:_SUPP_MAX_CHARS_PER_FILE]
+                     + f"\n[...truncated at {_SUPP_MAX_CHARS_PER_FILE} chars]"
+                 )
+             sections.append(text)
+         sections.append("")
+
+     return "\n".join(sections)
+
+
# ---------------------------------------------------------------------------
# Markdown table parser
# ---------------------------------------------------------------------------
@@ -428,6 +510,13 @@ class AgentSingle(BaseAgent):
if ref_data:
parts.append(_format_ref_data_for_prompt(ref_data))
+ # Supplementary files attached to the job (locale-gated by filename
+ # prefix in _resolve_file_manifest).
+ supp_paths = list(getattr(context.file_manifest, "supplementary_files", []) or [])
+ if supp_paths:
+ logger.info("Including %d supplementary files in prompt", len(supp_paths))
+ parts.append(_format_supplementary_for_prompt(supp_paths))
+
user_message = "\n".join(parts)
# ── Call LLM ─────────────────────────────────────────────────
@@ -439,17 +528,19 @@ class AgentSingle(BaseAgent):
len(system_prompt), len(user_message),
)
- # Scale max_tokens with source line count. Empirically each output
- # row consumes ~250 output tokens (option text + BT + rationale x3
- # + table padding). 64k is the Sonnet 4.6 cap; 32k covers ~120 rows
- # with comfortable headroom; 16k was the Round 1 default and was
- # truncating large briefs (172 lines → 65 rows).
+ # Scale max_tokens with source line count. In practice each output
+ # row averages closer to ~400 output tokens (option text + BT +
+ # rationale x3 + the LLM's pre-table padding). The 32k tier was
+ # truncating real 132-line briefs, so we now budget 64k for any
+ # job past ~80 rows. Sonnet 4.6 caps at 64k; if a brief is bigger
+ # than that headroom allows (~160 rows with margin) we will need
+ # to introduce real source-line batching with prompt caching.
line_count = len(context.source_lines)
- if line_count <= 30:
+ if line_count <= 25:
max_tokens = 8192
- elif line_count <= 80:
+ elif line_count <= 60:
max_tokens = 16384
- elif line_count <= 150:
+ elif line_count <= 80:
max_tokens = 32768
else:
max_tokens = 64000
@@ -467,10 +558,11 @@ class AgentSingle(BaseAgent):
)
logger.info(
- "LLM response: %d chars, input_tokens=%d, output_tokens=%d",
+ "LLM response: %d chars, input_tokens=%d, output_tokens=%d, stop_reason=%s",
len(response_text),
usage.get("input_tokens", 0),
usage.get("output_tokens", 0),
+ usage.get("stop_reason"),
)
# Track tokens
diff --git a/backend/app/pipeline/contracts.py b/backend/app/pipeline/contracts.py
index c3261e0..d674550 100644
--- a/backend/app/pipeline/contracts.py
+++ b/backend/app/pipeline/contracts.py
@@ -41,6 +41,11 @@ class FileManifest(BaseModel):
tov_supplement_file: str | None = None
locale_considerations_file: str | None = None
date_pct_formats_file: str | None = None
+ # Per-job user-uploaded supplementary files (paths on disk). Each
+ # entry's filename may begin with a locale code prefix to gate the
+ # file to that locale only (e.g. "de-DE_glossary.txt"); files without
+ # a recognised locale prefix apply to all locales.
+ supplementary_files: list[str] = []
class JobParams(BaseModel):
diff --git a/backend/app/services/file_service.py b/backend/app/services/file_service.py
index 51f071b..22a7fa9 100644
--- a/backend/app/services/file_service.py
+++ b/backend/app/services/file_service.py
@@ -25,6 +25,40 @@ class FileService:
path.parent.mkdir(parents=True, exist_ok=True)
return path
+ def _canonical_tm_filename(self, channel: str, locale_code: str) -> str:
+     """Pick the on-disk TM filename for a (channel, locale) pair.
+
+     Resolution order:
+       1. A file already present in the locale's TM directory whose
+          channel segment matches case-insensitively — its exact name is
+          returned so an upload overwrites it instead of creating a
+          sibling with different casing.
+       2. The pipeline's hardcoded CHANNEL_FILE_MAP pattern for the
+          lowercased channel, formatted with the lowercased locale.
+       3. "flat_<channel>_<locale>.json" using the channel as typed.
+     """
+     from app.pipeline.agents.agent_2_tm_retrieval import CHANNEL_FILE_MAP
+
+     locale_lower = locale_code.lower()
+     wanted_channel = channel.lower()
+
+     # 1) Reuse an existing file for this channel, whatever its casing.
+     locale_dir = self.storage_root / "amazon" / "tm" / locale_code
+     if locale_dir.is_dir():
+         prefix = "flat_"
+         suffix = f"_{locale_lower}.json"
+         for entry in locale_dir.iterdir():
+             candidate = entry.name
+             folded = candidate.lower()
+             if folded.startswith(prefix) and folded.endswith(suffix):
+                 # Channel segment of "flat_<channel>_<locale>.json".
+                 segment = candidate[len(prefix) : -len(suffix)]
+                 if segment.lower() == wanted_channel:
+                     return candidate  # keep the casing already on disk
+
+     # 2) Known channels: use the pipeline's hardcoded filename pattern.
+     template = CHANNEL_FILE_MAP.get(wanted_channel)
+     if template:
+         return template.format(lc=locale_lower)
+
+     # 3) Brand-new channel: fall back to the user-typed casing.
+     return f"flat_{channel}_{locale_lower}.json"
+
async def upload_source_file(
self,
db: AsyncSession,
@@ -90,9 +124,30 @@ class FileService:
filename: str,
uploaded_by: UUID | None = None,
) -> TMFileRegistry:
- """Upload a TM file and create a registry entry."""
+ """Upload a TM file and register it.
+
+ Writes the file to the canonical path the pipeline reads from
+ (`/storage/amazon/tm/{locale_code}/flat_{channel}_{lc}.json`) so a
+ replacement upload always overrides the version used at runtime —
+ without this, the pipeline kept using the seeded original. The
+ filename is canonicalised to `flat_{channel}_{lc}.json` regardless
+ of what the user uploaded so replacements always match.
+
+ If a registry row already exists for this (client_id, locale_code,
+ channel) tuple, it's updated in place instead of inserting a
+ duplicate row.
+ """
+ # Canonical filename + path that the pipeline reads.
+ # Existing files on disk follow inconsistent channel casing
+ # (e.g. flat_MASS_de-de.json, flat_value_de-de.json,
+ # flat_Onsite_de-de.json). To replace one in place, we honour the
+ # casing already used for that channel — looked up first by scanning
+ # the locale's TM directory for a matching filename, then via the
+ # pipeline's CHANNEL_FILE_MAP. Falls back to the channel as the user
+ # typed it if neither yields a match.
+ canonical_filename = self._canonical_tm_filename(channel, locale_code)
file_path = self._resolve_path(
- "clients", str(client_id), "tm", locale_code, filename
+ "amazon", "tm", locale_code, canonical_filename
)
with open(file_path, "wb") as f:
shutil.copyfileobj(file, f)
@@ -104,11 +159,37 @@ class FileService:
if line.strip():
segment_count += 1
+ # Upsert by (client_id, locale_code, channel). If a row exists
+ # (from prior upload OR seeder), update its fields rather than
+ # inserting a duplicate. The channel is matched case-insensitively
+ # (ILIKE) to catch any legacy rows where casing differs.
+ # NOTE: "_" and "%" in the channel act as wildcards under ILIKE.
+ from sqlalchemy import or_
+
+ existing_q = await db.execute(
+ select(TMFileRegistry).where(
+ TMFileRegistry.client_id == client_id,
+ TMFileRegistry.locale_code == locale_code,
+ or_(
+ TMFileRegistry.channel == channel,
+ TMFileRegistry.channel.ilike(channel),
+ ),
+ )
+ )
+ existing = existing_q.scalars().first()
+ if existing is not None:
+ existing.filename = canonical_filename
+ existing.file_path = str(file_path)
+ existing.segment_count = segment_count
+ if uploaded_by is not None:
+ existing.uploaded_by = uploaded_by
+ await db.flush()
+ return existing
+
tm_file = TMFileRegistry(
client_id=client_id,
locale_code=locale_code,
channel=channel,
- filename=filename,
+ filename=canonical_filename,
file_path=str(file_path),
segment_count=segment_count,
uploaded_by=uploaded_by,
diff --git a/backend/app/tasks/job_tasks.py b/backend/app/tasks/job_tasks.py
index 02082be..283987f 100644
--- a/backend/app/tasks/job_tasks.py
+++ b/backend/app/tasks/job_tasks.py
@@ -27,7 +27,7 @@ TM_CHANNEL_REGISTRY: dict[str, str] = {
def _resolve_file_manifest(
- locale_code: str, channel: str, client_id: str
+ locale_code: str, channel: str, client_id: str, job_id: str | None = None,
) -> dict:
"""Resolve all reference and TM file paths for a locale.
@@ -87,6 +87,20 @@ def _resolve_file_manifest(
f"{storage}/amazon/ref/tov_supplement/DE_AT_TOV_Guidelines.json"
)
+ # Per-job supplementary files. Each filename may carry a locale
+ # prefix (e.g. "de-DE_terms.txt") to restrict the file to that locale
+ # only; files without a recognised locale prefix are global.
+ supplementary_files: list[str] = []
+ if job_id:
+ supp_dir = os.path.join(storage, "jobs", str(job_id), "supplementary")
+ if os.path.isdir(supp_dir):
+ for fname in sorted(os.listdir(supp_dir)):
+ fpath = os.path.join(supp_dir, fname)
+ if not os.path.isfile(fpath):
+ continue
+ if _supplementary_applies_to_locale(fname, locale_code):
+ supplementary_files.append(fpath)
+
return {
"tm_files": tm_files,
"glossary_file": glossary,
@@ -95,9 +109,39 @@ def _resolve_file_manifest(
"tov_supplement_file": tov_supplement,
"locale_considerations_file": locale_considerations,
"date_pct_formats_file": date_pct,
+ "supplementary_files": supplementary_files,
}
+ # All 12 supported locale codes (lowercased, hyphenated) — used to
+ # detect locale-prefixed supplementary filenames.
+ _LOCALE_CODES = {
+     "de-de", "de-at", "fr-fr", "fr-be", "it-it", "es-es",
+     "ca-es", "nl-nl", "nl-be", "sv-se", "pl-pl", "pt-pt",
+ }
+
+
+ def _supplementary_applies_to_locale(filename: str, locale_code: str) -> bool:
+     """Return True if a supplementary file applies to the given locale.
+
+     Locale gating is by filename prefix (case-insensitive). Examples:
+         "de-DE_terms.txt"     → only de-DE
+         "de_DE_terms.txt"     → only de-DE (underscore variant)
+         "fr-FR_brief.docx"    → only fr-FR
+         "global_glossary.txt" → all locales (no recognised prefix)
+         "campaign-brief.pdf"  → all locales (no recognised prefix)
+
+     Args:
+         filename: Base name of the supplementary file.
+         locale_code: Locale being processed; "de-DE" and "de_DE" are
+             treated as the same locale.
+
+     Returns:
+         True when the file is global or gated to locale_code; False when
+         it is gated to a different locale.
+     """
+     name = filename.lower()
+     # Accept the underscore spelling for the target too, mirroring what
+     # is accepted for filename prefixes.
+     target = locale_code.lower().replace("_", "-")
+     for code in _LOCALE_CODES:
+         # Match either "de-de_..." or "de_de_..." prefixes.
+         if name.startswith((f"{code}_", f"{code.replace('-', '_')}_")):
+             # File is locale-gated; include only if it matches.
+             return code == target
+     return True  # No locale prefix → global, applies to all locales
+
+
def _get_async_session_factory() -> async_sessionmaker[AsyncSession]:
"""Create a fresh async session factory for use in Celery tasks."""
engine = create_async_engine(settings.DATABASE_URL, pool_pre_ping=True)
@@ -244,7 +288,7 @@ def process_locale_instance(self, job_id: str, locale_code: str) -> dict:
# Resolve file manifest for this locale
file_manifest = _resolve_file_manifest(
- locale_code, job.channel, str(job.client_id)
+ locale_code, job.channel, str(job.client_id), job_id=str(job_id)
)
# Run pipeline with progress callback
diff --git a/frontend/src/app/admin/files/tm/page.tsx b/frontend/src/app/admin/files/tm/page.tsx
index 7877092..bc65b27 100644
--- a/frontend/src/app/admin/files/tm/page.tsx
+++ b/frontend/src/app/admin/files/tm/page.tsx
@@ -233,16 +233,20 @@ export default function TMRegistryPage() {