Round 2.5 feedback: TM replacements take effect, supplementary files reach LLM, larger briefs fit, free-text channel uploads

TM upload-replacement bug (critical):
- Uploads were writing to /storage/clients/<uuid>/tm/... but the pipeline
  reads from /storage/amazon/tm/... — replacements were silently ignored
- upload_tm_file now writes to the canonical pipeline path
  /storage/amazon/tm/<locale>/flat_<channel>_<lc>.json (overwrites in place)
- Filename casing is preserved when an existing file is being replaced
  (the on-disk seeded files use mixed casing: flat_MASS, flat_value,
  flat_PrimeSpeed); falls back to CHANNEL_FILE_MAP, then user-typed case
- Registry upsert by (client_id, locale_code, channel): replaces row in
  place rather than inserting duplicates
- Verified: replacement file at canonical path, registry COUNT=1, no dupes

Supplementary files now reach the LLM (critical):
- New supplementary_files field on FileManifest
- _resolve_file_manifest scans /storage/jobs/<job_id>/supplementary/ and
  populates the manifest, with per-locale gating by filename prefix
  (e.g. de-DE_glossary.txt only goes to de-DE; global_brief.txt goes to all)
- _format_supplementary_for_prompt reads each file (.txt/.md/.json/.csv/.tsv
  /.docx) and inlines its text into the LLM user message under a
  "## SUPPLEMENTARY MATERIAL" header, capped at 40k chars per file
- .docx files are extracted via inline zipfile read (no new dependency)

New job wizard:
- Per-supplementary-file locale dropdown ("Global" or one of 12 locales)
- Filename gets prefixed with the locale on upload (de-DE_brief.docx)

Admin TM upload:
- Channel field is now a free-text input with autocomplete suggestions
  (datalist of known channels) — lets users add brand-new channels like
  PrimeCBM that didn't exist before

Pipeline scaling:
- Bumped dynamic max_tokens tiers: briefs with more than 80 lines now get
  a 64k output budget (was 32k); 132-line briefs no longer truncate.
  Sonnet 4.6 caps at 64k
- Added stop_reason logging — "max_tokens" stop now shows up in logs
  loud and clear rather than silently truncating

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
DJP 2026-05-05 14:28:20 -04:00
parent 9825b0497c
commit d3f6a57386
9 changed files with 334 additions and 31 deletions

View file

@ -95,12 +95,21 @@ class LLMClient:
+ output_tokens * output_rate
)
stop_reason = getattr(response, "stop_reason", None)
if stop_reason == "max_tokens":
logger.warning(
"LLM hit max_tokens (%d) — response was truncated. "
"Consider raising max_tokens or batching the input.",
max_tokens,
)
usage = {
"input_tokens": input_tokens,
"output_tokens": output_tokens,
"total_tokens": total_tokens,
"estimated_cost_usd": round(estimated_cost, 6),
"model": self.model,
"stop_reason": stop_reason,
}
self.last_usage = usage

View file

@ -157,6 +157,88 @@ def _format_ref_data_for_prompt(ref_data: dict[str, Any]) -> str:
return "\n".join(sections)
# Per-supplementary-file size cap. Big enough to fit a typical reference
# brief or glossary; truncated if larger to protect the input budget.
_SUPP_MAX_CHARS_PER_FILE = 40_000
def _read_docx_text(path: str) -> str:
    """Extract plain text from a .docx file.

    Avoids adding python-docx as a dependency by reading the underlying
    XML directly: a .docx is a zip archive whose document body lives at
    ``word/document.xml``. Each paragraph (``<w:p>``) becomes one line,
    ``<w:tab/>`` and ``<w:br/>`` become whitespace, and all remaining
    markup is dropped.

    Args:
        path: Filesystem path of the .docx file.

    Returns:
        The document text, one non-empty line per paragraph with runs of
        spaces/tabs collapsed to a single space. Returns "" (and logs a
        warning) when the archive or its document body cannot be read.
    """
    import html
    import re
    import zipfile

    try:
        with zipfile.ZipFile(path) as z:
            with z.open("word/document.xml") as fh:
                xml = fh.read().decode("utf-8", errors="replace")
    except Exception as exc:
        logger.warning("Could not unzip .docx %s: %s", path, exc)
        return ""

    # Turn structural markup into whitespace before stripping all tags.
    xml = re.sub(r"</w:p>", "\n", xml)
    xml = re.sub(r"<w:tab/>", "\t", xml)
    xml = re.sub(r"<w:br/>", "\n", xml)
    text = re.sub(r"<[^>]+>", "", xml)

    # Decode entities in ONE pass. Chained str.replace() calls that handle
    # "&amp;" first double-decode escaped text: the literal "&lt;" is stored
    # as "&amp;lt;" and would wrongly end up as "<". html.unescape also
    # covers numeric character references such as "&#233;".
    text = html.unescape(text)

    lines = [re.sub(r"[ \t]+", " ", ln).strip() for ln in text.split("\n")]
    return "\n".join(ln for ln in lines if ln)
def _format_supplementary_for_prompt(file_paths: list[str]) -> str:
    """Inline the text of each supplementary file into a prompt section.

    Plain-text formats (.txt, .md, .json, .jsonl, .csv, .tsv) are read as
    UTF-8 with undecodable bytes replaced; Word documents (.docx) are
    converted to plain text. Any other extension (e.g. .pdf, images) is
    listed by filename with a note so the agent at least knows it was
    attached. Each file's text is truncated to the per-file character cap.

    Args:
        file_paths: Paths of the supplementary files to include.

    Returns:
        A block headed "## SUPPLEMENTARY MATERIAL" with one "### File:"
        subsection per path, or "" when ``file_paths`` is empty.
    """
    if not file_paths:
        return ""

    plain_text_exts = (".txt", ".md", ".json", ".csv", ".tsv", ".jsonl")
    out: list[str] = ["## SUPPLEMENTARY MATERIAL", ""]

    for path in file_paths:
        base = os.path.basename(path)
        suffix = os.path.splitext(base)[1].lower()
        out.append(f"### File: {base}")

        body = ""
        try:
            if suffix in plain_text_exts:
                with open(path, "r", encoding="utf-8", errors="replace") as handle:
                    body = handle.read()
            elif suffix == ".docx":
                body = _read_docx_text(path)
            else:
                out.append("(Binary or unsupported format — filename only attached.)")
        except Exception as exc:  # pragma: no cover - defensive
            # Best effort: an unreadable file keeps its header but no body.
            logger.warning("Could not read supplementary %s: %s", path, exc)
            body = ""

        if body:
            limit = _SUPP_MAX_CHARS_PER_FILE
            if len(body) > limit:
                body = body[:limit] + f"\n[...truncated at {limit} chars]"
            out.append(body)
        out.append("")

    return "\n".join(out)
# ---------------------------------------------------------------------------
# Markdown table parser
# ---------------------------------------------------------------------------
@ -428,6 +510,13 @@ class AgentSingle(BaseAgent):
if ref_data:
parts.append(_format_ref_data_for_prompt(ref_data))
# Supplementary files attached to the job (locale-gated by filename
# prefix in _resolve_file_manifest).
supp_paths = list(getattr(context.file_manifest, "supplementary_files", []) or [])
if supp_paths:
logger.info("Including %d supplementary files in prompt", len(supp_paths))
parts.append(_format_supplementary_for_prompt(supp_paths))
user_message = "\n".join(parts)
# ── Call LLM ─────────────────────────────────────────────────
@ -439,17 +528,19 @@ class AgentSingle(BaseAgent):
len(system_prompt), len(user_message),
)
# Scale max_tokens with source line count. Empirically each output
# row consumes ~250 output tokens (option text + BT + rationale x3
# + table padding). 64k is the Sonnet 4.6 cap; 32k covers ~120 rows
# with comfortable headroom; 16k was the Round 1 default and was
# truncating large briefs (172 lines → 65 rows).
# Scale max_tokens with source line count. In practice each output
# row averages closer to ~400 output tokens (option text + BT +
# rationale x3 + the LLM's pre-table padding). The 32k tier was
# truncating real 132-line briefs, so we now budget 64k for any
# job past ~80 rows. Sonnet 4.6 caps at 64k; if a brief is bigger
# than that headroom allows (~160 rows with margin) we will need
# to introduce real source-line batching with prompt caching.
line_count = len(context.source_lines)
if line_count <= 30:
if line_count <= 25:
max_tokens = 8192
elif line_count <= 80:
elif line_count <= 60:
max_tokens = 16384
elif line_count <= 150:
elif line_count <= 80:
max_tokens = 32768
else:
max_tokens = 64000
@ -467,10 +558,11 @@ class AgentSingle(BaseAgent):
)
logger.info(
"LLM response: %d chars, input_tokens=%d, output_tokens=%d",
"LLM response: %d chars, input_tokens=%d, output_tokens=%d, stop_reason=%s",
len(response_text),
usage.get("input_tokens", 0),
usage.get("output_tokens", 0),
usage.get("stop_reason"),
)
# Track tokens

View file

@ -41,6 +41,11 @@ class FileManifest(BaseModel):
tov_supplement_file: str | None = None
locale_considerations_file: str | None = None
date_pct_formats_file: str | None = None
# Per-job user-uploaded supplementary files (paths on disk). Each
# entry's filename may begin with a locale code prefix to gate the
# file to that locale only (e.g. "de-DE_glossary.txt"); files without
# a recognised locale prefix apply to all locales.
supplementary_files: list[str] = []
class JobParams(BaseModel):

View file

@ -25,6 +25,40 @@ class FileService:
path.parent.mkdir(parents=True, exist_ok=True)
return path
def _canonical_tm_filename(self, channel: str, locale_code: str) -> str:
    """Return the canonical TM filename to use for (channel, locale).

    Resolution order:
      1. An existing file in ``<storage_root>/amazon/tm/<locale_code>/``
         named ``flat_<channel>_<lc>.json`` (compared case-insensitively);
         its on-disk name is returned unchanged so an upload replaces the
         existing file instead of creating a sibling with different casing.
      2. The pattern registered for the channel in the pipeline's
         ``CHANNEL_FILE_MAP``.
      3. ``flat_<channel>_<lc>.json`` using the channel exactly as typed.

    Args:
        channel: Channel name; may be free text supplied by a user.
        locale_code: Locale code, e.g. "de-DE".

    Returns:
        The bare filename (no directory component).

    Raises:
        ValueError: If ``channel`` or ``locale_code`` contains a path
            separator or NUL byte, or is "." / "..". Both values end up
            in a filesystem path, so such input could otherwise place
            the file outside the TM directory.
    """
    # Validate before touching the filesystem: both values are
    # interpolated into a path and channel is user-typed free text.
    for label, value in (("channel", channel), ("locale_code", locale_code)):
        if value in (".", "..") or any(ch in value for ch in ("/", "\\", "\x00")):
            raise ValueError(f"Unsafe {label} for TM filename: {value!r}")

    from app.pipeline.agents.agent_2_tm_retrieval import CHANNEL_FILE_MAP

    lc_lower = locale_code.lower()
    channel_lower = channel.lower()

    # 1) An existing file on disk for this channel (case-insensitive).
    tm_dir = self.storage_root / "amazon" / "tm" / locale_code
    if tm_dir.is_dir():
        target_lc_suffix = f"_{lc_lower}.json"
        for existing in tm_dir.iterdir():
            name = existing.name
            name_lower = name.lower()
            if not (
                name_lower.startswith("flat_")
                and name_lower.endswith(target_lc_suffix)
            ):
                continue
            # Extract the channel segment from "flat_<Channel>_<lc>.json"
            stem = name[len("flat_") : -len(target_lc_suffix)]
            if stem.lower() == channel_lower:
                return name  # preserve existing casing

    # 2) Hardcoded patterns for the original channels.
    pattern = CHANNEL_FILE_MAP.get(channel_lower)
    if pattern:
        return pattern.format(lc=lc_lower)

    # 3) New channel — use the user-typed casing.
    return f"flat_{channel}_{lc_lower}.json"
async def upload_source_file(
self,
db: AsyncSession,
@ -90,9 +124,30 @@ class FileService:
filename: str,
uploaded_by: UUID | None = None,
) -> TMFileRegistry:
"""Upload a TM file and create a registry entry."""
"""Upload a TM file and register it.
Writes the file to the canonical path the pipeline reads from
(`/storage/amazon/tm/{locale_code}/flat_{channel}_{lc}.json`) so a
replacement upload always overrides the version used at runtime
without this, the pipeline kept using the seeded original. The
filename is canonicalised to `flat_{channel}_{lc}.json` regardless
of what the user uploaded so replacements always match.
If a registry row already exists for this (client_id, locale_code,
channel) tuple, it's updated in place instead of inserting a
duplicate row.
"""
# Canonical filename + path that the pipeline reads.
# Existing files on disk follow inconsistent channel casing
# (e.g. flat_MASS_de-de.json, flat_value_de-de.json,
# flat_Onsite_de-de.json). To replace one in place, we honour the
# casing already used for that channel — looked up first by scanning
# the locale's TM directory for a matching filename, then via the
# pipeline's CHANNEL_FILE_MAP. Falls back to the channel as the user
# typed it if neither yields a match.
canonical_filename = self._canonical_tm_filename(channel, locale_code)
file_path = self._resolve_path(
"clients", str(client_id), "tm", locale_code, filename
"amazon", "tm", locale_code, canonical_filename
)
with open(file_path, "wb") as f:
shutil.copyfileobj(file, f)
@ -104,11 +159,37 @@ class FileService:
if line.strip():
segment_count += 1
# Upsert by (client_id, locale_code, channel). If a row exists
# (from prior upload OR seeder), update its fields rather than
# inserting a duplicate. Match the channel case-insensitively to catch
# any legacy rows where casing differs.
from sqlalchemy import or_
existing_q = await db.execute(
select(TMFileRegistry).where(
TMFileRegistry.client_id == client_id,
TMFileRegistry.locale_code == locale_code,
or_(
TMFileRegistry.channel == channel,
TMFileRegistry.channel.ilike(channel),
),
)
)
existing = existing_q.scalars().first()
if existing is not None:
existing.filename = canonical_filename
existing.file_path = str(file_path)
existing.segment_count = segment_count
if uploaded_by is not None:
existing.uploaded_by = uploaded_by
await db.flush()
return existing
tm_file = TMFileRegistry(
client_id=client_id,
locale_code=locale_code,
channel=channel,
filename=filename,
filename=canonical_filename,
file_path=str(file_path),
segment_count=segment_count,
uploaded_by=uploaded_by,

View file

@ -27,7 +27,7 @@ TM_CHANNEL_REGISTRY: dict[str, str] = {
def _resolve_file_manifest(
locale_code: str, channel: str, client_id: str
locale_code: str, channel: str, client_id: str, job_id: str | None = None,
) -> dict:
"""Resolve all reference and TM file paths for a locale.
@ -87,6 +87,20 @@ def _resolve_file_manifest(
f"{storage}/amazon/ref/tov_supplement/DE_AT_TOV_Guidelines.json"
)
# Per-job supplementary files. Each filename may carry a locale
# prefix (e.g. "de-DE_terms.txt") to restrict the file to that locale
# only; files without a recognised locale prefix are global.
supplementary_files: list[str] = []
if job_id:
supp_dir = os.path.join(storage, "jobs", str(job_id), "supplementary")
if os.path.isdir(supp_dir):
for fname in sorted(os.listdir(supp_dir)):
fpath = os.path.join(supp_dir, fname)
if not os.path.isfile(fpath):
continue
if _supplementary_applies_to_locale(fname, locale_code):
supplementary_files.append(fpath)
return {
"tm_files": tm_files,
"glossary_file": glossary,
@ -95,9 +109,39 @@ def _resolve_file_manifest(
"tov_supplement_file": tov_supplement,
"locale_considerations_file": locale_considerations,
"date_pct_formats_file": date_pct,
"supplementary_files": supplementary_files,
}
# The twelve supported locale codes, lowercased. A supplementary filename
# beginning with one of these followed by "_" is treated as locale-gated.
_LOCALE_CODES = {
    "de-de", "de-at", "fr-fr", "fr-be", "it-it", "es-es",
    "ca-es", "nl-nl", "nl-be", "sv-se", "pl-pl", "pt-pt",
}


def _supplementary_applies_to_locale(filename: str, locale_code: str) -> bool:
    """Decide whether a supplementary file should be sent to a locale.

    Gating is driven purely by the filename prefix (case-insensitive):

        "de-DE_terms.txt"      -> de-DE only
        "de_DE_terms.txt"      -> de-DE only (underscore variant)
        "fr-FR_brief.docx"     -> fr-FR only
        "global_glossary.txt"  -> every locale (no recognised prefix)
        "campaign-brief.pdf"   -> every locale (no recognised prefix)

    Args:
        filename: Bare filename of the supplementary file.
        locale_code: Locale being processed, e.g. "de-DE".

    Returns:
        True when the file has no recognised locale prefix, or when its
        prefix matches ``locale_code``; False otherwise.
    """
    lowered = filename.lower()
    wanted = locale_code.lower()

    # Find the locale (if any) this filename is gated to; both the
    # hyphenated and the underscored spelling of the code are accepted.
    gated_to = next(
        (
            code
            for code in _LOCALE_CODES
            if lowered.startswith((code + "_", code.replace("-", "_") + "_"))
        ),
        None,
    )

    # No locale prefix means the file is global.
    return True if gated_to is None else gated_to == wanted
def _get_async_session_factory() -> async_sessionmaker[AsyncSession]:
"""Create a fresh async session factory for use in Celery tasks."""
engine = create_async_engine(settings.DATABASE_URL, pool_pre_ping=True)
@ -244,7 +288,7 @@ def process_locale_instance(self, job_id: str, locale_code: str) -> dict:
# Resolve file manifest for this locale
file_manifest = _resolve_file_manifest(
locale_code, job.channel, str(job.client_id)
locale_code, job.channel, str(job.client_id), job_id=str(job_id)
)
# Run pipeline with progress callback

View file

@ -233,16 +233,20 @@ export default function TMRegistryPage() {
</div>
<div className="space-y-2">
<Label>Channel</Label>
<Select value={uploadChannel} onValueChange={setUploadChannel}>
<SelectTrigger>
<SelectValue placeholder="Select channel" />
</SelectTrigger>
<SelectContent>
{CHANNELS.map((ch) => (
<SelectItem key={ch} value={ch}>{ch}</SelectItem>
))}
</SelectContent>
</Select>
<Input
list="tm-channel-suggestions"
value={uploadChannel}
onChange={(e) => setUploadChannel(e.target.value)}
placeholder="e.g. PrimeCBM (or pick from list)"
/>
<datalist id="tm-channel-suggestions">
{CHANNELS.map((ch) => (
<option key={ch} value={ch} />
))}
</datalist>
<p className="text-xs text-gray-400">
Type a new channel name to add it, or pick an existing one.
</p>
</div>
<Button
onClick={handleUpload}

View file

@ -20,6 +20,10 @@ export interface JobFormData {
locales: string[];
source_file: File | null;
supplementary_files: File[];
// Parallel array: locale code each file is gated to, or empty string
// for "global" (applies to all locales). Always the same length as
// supplementary_files; index lines up.
supplementary_locales: string[];
context_override: string;
}
@ -36,6 +40,7 @@ const initialFormData: JobFormData = {
locales: [],
source_file: null,
supplementary_files: [],
supplementary_locales: [],
context_override: "",
};

View file

@ -16,7 +16,7 @@ import {
ChevronDown,
Rocket,
FileSpreadsheet,
File,
File as FileIcon,
AlertTriangle,
AlertCircle,
Globe,
@ -64,10 +64,23 @@ export function StepReview({ data, onBack }: StepReviewProps) {
await uploadSource(job.id, data.source_file);
}
// 3. Upload supplementary files
// 3. Upload supplementary files. Prefix the filename with the
// selected locale so the backend can gate the file to that
// locale only (e.g. de-DE_brief.docx). Empty locale = global.
for (let i = 0; i < data.supplementary_files.length; i++) {
setLaunchStatus(`Uploading supplementary file ${i + 1}/${data.supplementary_files.length}...`);
await uploadSupplementary(job.id, data.supplementary_files[i]);
const original = data.supplementary_files[i];
const locale = data.supplementary_locales[i] || "";
// Use the global File constructor (the imported `File` symbol
// from lucide-react is the icon component).
const upload = locale
? new (window as unknown as { File: typeof File }).File(
[original],
`${locale}_${original.name}`,
{ type: original.type, lastModified: original.lastModified }
)
: original;
await uploadSupplementary(job.id, upload);
}
// 4. Launch the job
@ -225,7 +238,7 @@ export function StepReview({ data, onBack }: StepReviewProps) {
<Card>
<CardHeader>
<CardTitle className="flex items-center gap-2">
<File className="h-5 w-5 text-amazon-teal" />
<FileIcon className="h-5 w-5 text-amazon-teal" />
Supplementary Files ({data.supplementary_files.length})
</CardTitle>
</CardHeader>
@ -236,7 +249,7 @@ export function StepReview({ data, onBack }: StepReviewProps) {
key={i}
className="flex items-center gap-3 p-2 bg-gray-50 rounded"
>
<File className="h-4 w-4 text-gray-400" />
<FileIcon className="h-4 w-4 text-gray-400" />
<span className="text-sm">{file.name}</span>
<span className="text-xs text-gray-400">
{(file.size / 1024).toFixed(1)} KB

View file

@ -5,7 +5,20 @@ import { Button } from "@/components/ui/button";
import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card";
import { Badge } from "@/components/ui/badge";
import { Textarea } from "@/components/ui/textarea";
import {
Select,
SelectContent,
SelectItem,
SelectTrigger,
SelectValue,
} from "@/components/ui/select";
import type { JobFormData } from "@/app/jobs/new/page";
// Locale options for supplementary file gating. "" = global.
const ALL_LOCALES = [
"de-DE", "fr-FR", "it-IT", "es-ES", "nl-NL",
"sv-SE", "pl-PL", "pt-PT", "de-AT", "fr-BE", "nl-BE", "ca-ES",
];
import type { ValidationResult } from "@/lib/types";
import { validateSource } from "@/lib/api";
import {
@ -77,9 +90,17 @@ export function StepUpload({ data, onChange, onBack, onNext }: StepUploadProps)
const handleSupplementarySelect = useCallback(
(e: React.ChangeEvent<HTMLInputElement>) => {
const files = Array.from(e.target.files || []);
onChange({ supplementary_files: [...data.supplementary_files, ...files] });
onChange({
supplementary_files: [...data.supplementary_files, ...files],
// New uploads default to "global" (empty string); user can
// change per-file via the locale dropdown.
supplementary_locales: [
...data.supplementary_locales,
...files.map(() => ""),
],
});
},
[data.supplementary_files, onChange]
[data.supplementary_files, data.supplementary_locales, onChange]
);
const removeSupplementary = (index: number) => {
@ -87,9 +108,18 @@ export function StepUpload({ data, onChange, onBack, onNext }: StepUploadProps)
supplementary_files: data.supplementary_files.filter(
(_, i) => i !== index
),
supplementary_locales: data.supplementary_locales.filter(
(_, i) => i !== index
),
});
};
const setSupplementaryLocale = (index: number, locale: string) => {
const next = [...data.supplementary_locales];
next[index] = locale;
onChange({ supplementary_locales: next });
};
const clearSource = () => {
onChange({ source_file: null });
setValidation(null);
@ -297,6 +327,10 @@ export function StepUpload({ data, onChange, onBack, onNext }: StepUploadProps)
{data.supplementary_files.length > 0 && (
<div className="space-y-2">
<p className="text-xs text-gray-500">
Pick a locale per file to gate it (saves tokens). Default is
<strong> Global</strong> sent to every locale.
</p>
{data.supplementary_files.map((file, i) => (
<div
key={i}
@ -304,6 +338,22 @@ export function StepUpload({ data, onChange, onBack, onNext }: StepUploadProps)
>
<File className="h-4 w-4 text-gray-400" />
<span className="text-sm flex-1 truncate">{file.name}</span>
<Select
value={data.supplementary_locales[i] || "__global__"}
onValueChange={(val) =>
setSupplementaryLocale(i, val === "__global__" ? "" : val)
}
>
<SelectTrigger className="w-[120px] h-8 text-xs">
<SelectValue />
</SelectTrigger>
<SelectContent>
<SelectItem value="__global__">Global</SelectItem>
{ALL_LOCALES.map((lc) => (
<SelectItem key={lc} value={lc}>{lc}</SelectItem>
))}
</SelectContent>
</Select>
<span className="text-xs text-gray-400">
{(file.size / 1024).toFixed(1)} KB
</span>