From d3f6a57386d9df8cf8c94e997c472d7ffb6d7b61 Mon Sep 17 00:00:00 2001 From: DJP Date: Tue, 5 May 2026 14:28:20 -0400 Subject: [PATCH] Round 2.5 feedback: TM replacements take effect, supplementary files reach LLM, larger briefs fit, free-text channel uploads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit TM upload-replacement bug (critical): - Uploads were writing to /storage/clients/{client_id}/tm/... but the pipeline reads from /storage/amazon/tm/... — replacements were silently ignored - upload_tm_file now writes to the canonical pipeline path /storage/amazon/tm/{locale_code}/flat_{channel}_{lc}.json (overwrites in place) - Filename casing is preserved when an existing file is being replaced (the on-disk seeded files use mixed casing: flat_MASS, flat_value, flat_PrimeSpeed); falls back to CHANNEL_FILE_MAP, then user-typed case - Registry upsert by (client_id, locale_code, channel): replaces row in place rather than inserting duplicates - Verified: replacement file at canonical path, registry COUNT=1, no dupes Supplementary files now reach the LLM (critical): - New supplementary_files field on FileManifest - _resolve_file_manifest scans /storage/jobs/{job_id}/supplementary/ and populates the manifest, with per-locale gating by filename prefix (e.g.
de-DE_glossary.txt only goes to de-DE; global_brief.txt goes to all) - _format_supplementary_for_prompt reads each file (.txt/.md/.json/.csv/.tsv /.docx) and inlines its text into the LLM user message under a "## SUPPLEMENTARY MATERIAL" header, capped at 40k chars per file - .docx files are extracted via inline zipfile read (no new dependency) New job wizard: - Per-supplementary-file locale dropdown ("Global" or one of 12 locales) - Filename gets prefixed with the locale on upload (de-DE_brief.docx) Admin TM upload: - Channel field is now a free-text input with autocomplete suggestions (datalist of known channels) — lets users add brand-new channels like PrimeCBM that didn't exist before Pipeline scaling: - Bumped dynamic max_tokens tiers: 80+ lines now gets 64k output budget (was 32k); 132-line briefs no longer truncate. Sonnet 4.6 caps at 64k - Added stop_reason logging — "max_tokens" stop now shows up in logs loud and clear rather than silently truncating Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/app/llm/client.py | 9 ++ backend/app/pipeline/agents/agent_single.py | 110 ++++++++++++++++-- backend/app/pipeline/contracts.py | 5 + backend/app/services/file_service.py | 87 +++++++++++++- backend/app/tasks/job_tasks.py | 48 +++++++- frontend/src/app/admin/files/tm/page.tsx | 24 ++-- frontend/src/app/jobs/new/page.tsx | 5 + .../components/jobs/JobWizard/StepReview.tsx | 23 +++- .../components/jobs/JobWizard/StepUpload.tsx | 54 ++++++++- 9 files changed, 334 insertions(+), 31 deletions(-) diff --git a/backend/app/llm/client.py b/backend/app/llm/client.py index 1ef687b..4ec2bf3 100644 --- a/backend/app/llm/client.py +++ b/backend/app/llm/client.py @@ -95,12 +95,21 @@ class LLMClient: + output_tokens * output_rate ) + stop_reason = getattr(response, "stop_reason", None) + if stop_reason == "max_tokens": + logger.warning( + "LLM hit max_tokens (%d) — response was truncated. 
" + "Consider raising max_tokens or batching the input.", + max_tokens, + ) + usage = { "input_tokens": input_tokens, "output_tokens": output_tokens, "total_tokens": total_tokens, "estimated_cost_usd": round(estimated_cost, 6), "model": self.model, + "stop_reason": stop_reason, } self.last_usage = usage diff --git a/backend/app/pipeline/agents/agent_single.py b/backend/app/pipeline/agents/agent_single.py index 69675ef..a478500 100644 --- a/backend/app/pipeline/agents/agent_single.py +++ b/backend/app/pipeline/agents/agent_single.py @@ -157,6 +157,88 @@ def _format_ref_data_for_prompt(ref_data: dict[str, Any]) -> str: return "\n".join(sections) +# Per-supplementary-file size cap. Big enough to fit a typical reference +# brief or glossary; truncated if larger to protect the input budget. +_SUPP_MAX_CHARS_PER_FILE = 40_000 + + +def _read_docx_text(path: str) -> str: + """Extract plain text from a .docx file. + + Avoids adding python-docx as a dependency by reading the underlying + XML directly: a .docx is a zip archive whose document body lives at + word/document.xml. Each paragraph (``) is converted to a line. + """ + import re + import zipfile + + try: + with zipfile.ZipFile(path) as z: + with z.open("word/document.xml") as fh: + xml = fh.read().decode("utf-8", errors="replace") + except Exception as exc: + logger.warning("Could not unzip .docx %s: %s", path, exc) + return "" + + xml = re.sub(r"", "\n", xml) + xml = re.sub(r"", "\t", xml) + xml = re.sub(r"", "\n", xml) + text = re.sub(r"<[^>]+>", "", xml) + # Decode common XML entities + text = ( + text.replace("&", "&") + .replace("<", "<") + .replace(">", ">") + .replace(""", '"') + .replace("'", "'") + ) + lines = [re.sub(r"[ \t]+", " ", ln).strip() for ln in text.split("\n")] + return "\n".join(ln for ln in lines if ln) + + +def _format_supplementary_for_prompt(file_paths: list[str]) -> str: + """Read each supplementary file and inline its text into the prompt. 
+ + Supports text-based formats (.txt, .md, .json, .csv, .tsv) and Word + documents (.docx) — for .docx we extract the plain text. Binary + formats we can't read (e.g. .pdf without a parser, images) are noted + by filename only so the agent at least knows they were attached. + """ + if not file_paths: + return "" + + sections: list[str] = ["## SUPPLEMENTARY MATERIAL", ""] + for fpath in file_paths: + fname = os.path.basename(fpath) + ext = os.path.splitext(fname)[1].lower() + sections.append(f"### File: {fname}") + try: + if ext in (".txt", ".md", ".json", ".csv", ".tsv", ".jsonl"): + with open(fpath, "r", encoding="utf-8", errors="replace") as f: + text = f.read() + elif ext == ".docx": + text = _read_docx_text(fpath) + else: + text = "" + sections.append( + f"(Binary or unsupported format — filename only attached.)" + ) + except Exception as exc: # pragma: no cover - defensive + logger.warning("Could not read supplementary %s: %s", fpath, exc) + text = "" + + if text: + if len(text) > _SUPP_MAX_CHARS_PER_FILE: + text = ( + text[:_SUPP_MAX_CHARS_PER_FILE] + + f"\n[...truncated at {_SUPP_MAX_CHARS_PER_FILE} chars]" + ) + sections.append(text) + sections.append("") + + return "\n".join(sections) + + # --------------------------------------------------------------------------- # Markdown table parser # --------------------------------------------------------------------------- @@ -428,6 +510,13 @@ class AgentSingle(BaseAgent): if ref_data: parts.append(_format_ref_data_for_prompt(ref_data)) + # Supplementary files attached to the job (locale-gated by filename + # prefix in _resolve_file_manifest). 
+ supp_paths = list(getattr(context.file_manifest, "supplementary_files", []) or []) + if supp_paths: + logger.info("Including %d supplementary files in prompt", len(supp_paths)) + parts.append(_format_supplementary_for_prompt(supp_paths)) + user_message = "\n".join(parts) # ── Call LLM ───────────────────────────────────────────────── @@ -439,17 +528,19 @@ class AgentSingle(BaseAgent): len(system_prompt), len(user_message), ) - # Scale max_tokens with source line count. Empirically each output - # row consumes ~250 output tokens (option text + BT + rationale x3 - # + table padding). 64k is the Sonnet 4.6 cap; 32k covers ~120 rows - # with comfortable headroom; 16k was the Round 1 default and was - # truncating large briefs (172 lines → 65 rows). + # Scale max_tokens with source line count. In practice each output + # row averages closer to ~400 output tokens (option text + BT + + # rationale x3 + the LLM's pre-table padding). The 32k tier was + # truncating real 132-line briefs, so we now budget 64k for any + # job past ~80 rows. Sonnet 4.6 caps at 64k; if a brief is bigger + # than that headroom allows (~160 rows with margin) we will need + # to introduce real source-line batching with prompt caching. 
line_count = len(context.source_lines) - if line_count <= 30: + if line_count <= 25: max_tokens = 8192 - elif line_count <= 80: + elif line_count <= 60: max_tokens = 16384 - elif line_count <= 150: + elif line_count <= 80: max_tokens = 32768 else: max_tokens = 64000 @@ -467,10 +558,11 @@ class AgentSingle(BaseAgent): ) logger.info( - "LLM response: %d chars, input_tokens=%d, output_tokens=%d", + "LLM response: %d chars, input_tokens=%d, output_tokens=%d, stop_reason=%s", len(response_text), usage.get("input_tokens", 0), usage.get("output_tokens", 0), + usage.get("stop_reason"), ) # Track tokens diff --git a/backend/app/pipeline/contracts.py b/backend/app/pipeline/contracts.py index c3261e0..d674550 100644 --- a/backend/app/pipeline/contracts.py +++ b/backend/app/pipeline/contracts.py @@ -41,6 +41,11 @@ class FileManifest(BaseModel): tov_supplement_file: str | None = None locale_considerations_file: str | None = None date_pct_formats_file: str | None = None + # Per-job user-uploaded supplementary files (paths on disk). Each + # entry's filename may begin with a locale code prefix to gate the + # file to that locale only (e.g. "de-DE_glossary.txt"); files without + # a recognised locale prefix apply to all locales. + supplementary_files: list[str] = [] class JobParams(BaseModel): diff --git a/backend/app/services/file_service.py b/backend/app/services/file_service.py index 51f071b..22a7fa9 100644 --- a/backend/app/services/file_service.py +++ b/backend/app/services/file_service.py @@ -25,6 +25,40 @@ class FileService: path.parent.mkdir(parents=True, exist_ok=True) return path + def _canonical_tm_filename(self, channel: str, locale_code: str) -> str: + """Return the canonical TM filename to use for (channel, locale). + + Honours the existing on-disk casing if a file already exists for + this channel — that way an upload truly replaces the seeded file + rather than creating a sibling with different casing. 
Falls back + to the pipeline's hardcoded CHANNEL_FILE_MAP, then to the + user-supplied channel as-typed. + """ + from app.pipeline.agents.agent_2_tm_retrieval import CHANNEL_FILE_MAP + + lc_lower = locale_code.lower() + # 1) An existing file on disk for this channel (case-insensitive). + tm_dir = self.storage_root / "amazon" / "tm" / locale_code + if tm_dir.is_dir(): + target_lc_suffix = f"_{lc_lower}.json" + channel_lower = channel.lower() + for existing in tm_dir.iterdir(): + name = existing.name + if not name.lower().startswith("flat_") or not name.lower().endswith(target_lc_suffix): + continue + # Extract the channel segment from "flat__.json" + stem = name[len("flat_") : -len(target_lc_suffix)] + if stem.lower() == channel_lower: + return name # preserve existing casing + + # 2) Hardcoded patterns for the original channels. + pattern = CHANNEL_FILE_MAP.get(channel.lower()) + if pattern: + return pattern.format(lc=lc_lower) + + # 3) New channel — use the user-typed casing. + return f"flat_{channel}_{lc_lower}.json" + async def upload_source_file( self, db: AsyncSession, @@ -90,9 +124,30 @@ class FileService: filename: str, uploaded_by: UUID | None = None, ) -> TMFileRegistry: - """Upload a TM file and create a registry entry.""" + """Upload a TM file and register it. + + Writes the file to the canonical path the pipeline reads from + (`/storage/amazon/tm/{locale_code}/flat_{channel}_{lc}.json`) so a + replacement upload always overrides the version used at runtime — + without this, the pipeline kept using the seeded original. The + filename is canonicalised to `flat_{channel}_{lc}.json` regardless + of what the user uploaded so replacements always match. + + If a registry row already exists for this (client_id, locale_code, + channel) tuple, it's updated in place instead of inserting a + duplicate row. + """ + # Canonical filename + path that the pipeline reads. + # Existing files on disk follow inconsistent channel casing + # (e.g. 
flat_MASS_de-de.json, flat_value_de-de.json, + # flat_Onsite_de-de.json). To replace one in place, we honour the + # casing already used for that channel — looked up first via the + # pipeline's CHANNEL_FILE_MAP, then by scanning the locale's TM + # directory for any matching filename. Falls back to the channel + # as the user typed it if neither yields a match. + canonical_filename = self._canonical_tm_filename(channel, locale_code) file_path = self._resolve_path( - "clients", str(client_id), "tm", locale_code, filename + "amazon", "tm", locale_code, canonical_filename ) with open(file_path, "wb") as f: shutil.copyfileobj(file, f) @@ -104,11 +159,37 @@ class FileService: if line.strip(): segment_count += 1 + # Upsert by (client_id, locale_code, channel). If a row exists + # (from prior upload OR seeder), update its fields rather than + # inserting a duplicate. Match on either filename to catch any + # legacy rows where casing differs. + from sqlalchemy import or_ + + existing_q = await db.execute( + select(TMFileRegistry).where( + TMFileRegistry.client_id == client_id, + TMFileRegistry.locale_code == locale_code, + or_( + TMFileRegistry.channel == channel, + TMFileRegistry.channel.ilike(channel), + ), + ) + ) + existing = existing_q.scalars().first() + if existing is not None: + existing.filename = canonical_filename + existing.file_path = str(file_path) + existing.segment_count = segment_count + if uploaded_by is not None: + existing.uploaded_by = uploaded_by + await db.flush() + return existing + tm_file = TMFileRegistry( client_id=client_id, locale_code=locale_code, channel=channel, - filename=filename, + filename=canonical_filename, file_path=str(file_path), segment_count=segment_count, uploaded_by=uploaded_by, diff --git a/backend/app/tasks/job_tasks.py b/backend/app/tasks/job_tasks.py index 02082be..283987f 100644 --- a/backend/app/tasks/job_tasks.py +++ b/backend/app/tasks/job_tasks.py @@ -27,7 +27,7 @@ TM_CHANNEL_REGISTRY: dict[str, str] = { def 
_resolve_file_manifest( - locale_code: str, channel: str, client_id: str + locale_code: str, channel: str, client_id: str, job_id: str | None = None, ) -> dict: """Resolve all reference and TM file paths for a locale. @@ -87,6 +87,20 @@ def _resolve_file_manifest( f"{storage}/amazon/ref/tov_supplement/DE_AT_TOV_Guidelines.json" ) + # Per-job supplementary files. Each filename may carry a locale + # prefix (e.g. "de-DE_terms.txt") to restrict the file to that locale + # only; files without a recognised locale prefix are global. + supplementary_files: list[str] = [] + if job_id: + supp_dir = os.path.join(storage, "jobs", str(job_id), "supplementary") + if os.path.isdir(supp_dir): + for fname in sorted(os.listdir(supp_dir)): + fpath = os.path.join(supp_dir, fname) + if not os.path.isfile(fpath): + continue + if _supplementary_applies_to_locale(fname, locale_code): + supplementary_files.append(fpath) + return { "tm_files": tm_files, "glossary_file": glossary, @@ -95,9 +109,39 @@ def _resolve_file_manifest( "tov_supplement_file": tov_supplement, "locale_considerations_file": locale_considerations, "date_pct_formats_file": date_pct, + "supplementary_files": supplementary_files, } +# All 12 supported locale codes (lowercased) — used to detect locale- +# prefixed supplementary filenames. +_LOCALE_CODES = { + "de-de", "de-at", "fr-fr", "fr-be", "it-it", "es-es", + "ca-es", "nl-nl", "nl-be", "sv-se", "pl-pl", "pt-pt", +} + + +def _supplementary_applies_to_locale(filename: str, locale_code: str) -> bool: + """Return True if a supplementary file applies to the given locale. + + Locale gating is by filename prefix. Examples: + "de-DE_terms.txt" → only de-DE + "de_DE_terms.txt" → only de-DE (underscore variant) + "fr-FR_brief.docx" → only fr-FR + "global_glossary.txt" → all locales (no recognised prefix) + "campaign-brief.pdf" → all locales (no recognised prefix) + """ + name = filename.lower() + target = locale_code.lower() + # Match either "de-de_..." or "de_de_..." 
prefixes + for code in _LOCALE_CODES: + underscored = code.replace("-", "_") + if name.startswith(code + "_") or name.startswith(underscored + "_"): + # File is locale-gated; include only if it matches. + return code == target + return True # No locale prefix → global, applies to all locales + + def _get_async_session_factory() -> async_sessionmaker[AsyncSession]: """Create a fresh async session factory for use in Celery tasks.""" engine = create_async_engine(settings.DATABASE_URL, pool_pre_ping=True) @@ -244,7 +288,7 @@ def process_locale_instance(self, job_id: str, locale_code: str) -> dict: # Resolve file manifest for this locale file_manifest = _resolve_file_manifest( - locale_code, job.channel, str(job.client_id) + locale_code, job.channel, str(job.client_id), job_id=str(job_id) ) # Run pipeline with progress callback diff --git a/frontend/src/app/admin/files/tm/page.tsx b/frontend/src/app/admin/files/tm/page.tsx index 7877092..bc65b27 100644 --- a/frontend/src/app/admin/files/tm/page.tsx +++ b/frontend/src/app/admin/files/tm/page.tsx @@ -233,16 +233,20 @@ export default function TMRegistryPage() {
- + setUploadChannel(e.target.value)} + placeholder="e.g. PrimeCBM (or pick from list)" + /> + + {CHANNELS.map((ch) => ( + +

+ Type a new channel name to add it, or pick an existing one. +