amazon-transcreation/backend/app/services/file_service.py
DJP d3f6a57386 Round 2.5 feedback: TM replacements take effect, supplementary files reach LLM, larger briefs fit, free-text channel uploads
TM upload-replacement bug (critical):
- Uploads were writing to /storage/clients/<uuid>/tm/... but the pipeline
  reads from /storage/amazon/tm/... — replacements were silently ignored
- upload_tm_file now writes to the canonical pipeline path
  /storage/amazon/tm/<locale>/flat_<channel>_<lc>.json (overwrites in place)
- Filename casing is preserved when an existing file is being replaced
  (the on-disk seeded files use mixed casing: flat_MASS, flat_value,
  flat_PrimeSpeed); falls back to CHANNEL_FILE_MAP, then user-typed case
- Registry upsert by (client_id, locale_code, channel): replaces row in
  place rather than inserting duplicates
- Verified: replacement file at canonical path, registry COUNT=1, no dupes

Supplementary files now reach the LLM (critical):
- New supplementary_files field on FileManifest
- _resolve_file_manifest scans /storage/jobs/<job_id>/supplementary/ and
  populates the manifest, with per-locale gating by filename prefix
  (e.g. de-DE_glossary.txt only goes to de-DE; global_brief.txt goes to all)
- _format_supplementary_for_prompt reads each file (.txt/.md/.json/.csv/.tsv
  /.docx) and inlines its text into the LLM user message under a
  "## SUPPLEMENTARY MATERIAL" header, capped at 40k chars per file
- .docx files are extracted via inline zipfile read (no new dependency)

New job wizard:
- Per-supplementary-file locale dropdown ("Global" or one of 12 locales)
- Filename gets prefixed with the locale on upload (de-DE_brief.docx)

Admin TM upload:
- Channel field is now a free-text input with autocomplete suggestions
  (datalist of known channels) — lets users add brand-new channels like
  PrimeCBM that didn't exist before

Pipeline scaling:
- Bumped dynamic max_tokens tiers: briefs with 80+ lines now get a 64k output
  budget (was 32k); 132-line briefs no longer truncate. Sonnet 4.6 caps at 64k
- Added stop_reason logging — "max_tokens" stop now shows up in logs
  loud and clear rather than silently truncating

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-05 14:28:20 -04:00

315 lines
11 KiB
Python

import os
import shutil
from pathlib import Path
from typing import BinaryIO
from uuid import UUID, uuid4
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import settings
from app.models.files import ReferenceFile, ReferenceFileType, TMFileRegistry
from app.models.source import SourceLine
from app.pipeline.modules.source_file_parser import parse_source_file
class FileService:
    """Service for file upload, download, path resolution, and storage management.

    Every path is built under ``settings.STORAGE_ROOT``. Database work is
    done on the ``AsyncSession`` handed to each method; the methods only
    ``flush`` and never commit, so the caller owns the transaction.

    Values that originate from a client (uploaded filenames, the free-text
    TM channel, locale codes) are validated before they are used as path
    components so an upload cannot write outside its intended directory.
    """

    def __init__(self) -> None:
        """Bind the service to the configured storage root."""
        self.storage_root = Path(settings.STORAGE_ROOT)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _safe_component(value: str, what: str) -> str:
        """Return *value* if it is usable as a single path component.

        Args:
            value: Candidate path component.
            what: Human-readable label used in the error message.

        Raises:
            ValueError: If *value* is empty, is ``.`` or ``..``, or contains
                a path separator or NUL byte.
        """
        if (
            not value
            or value in (".", "..")
            or any(ch in value for ch in ("/", "\\", "\x00"))
        ):
            raise ValueError(f"Invalid {what}: {value!r}")
        return value

    @classmethod
    def _safe_filename(cls, filename: str) -> str:
        """Reduce a client-supplied filename to a safe basename.

        Any directory part (POSIX or Windows style) is dropped, so
        ``../../etc/passwd`` becomes ``passwd``.

        Raises:
            ValueError: If nothing usable is left (empty, ``.`` or ``..``).
        """
        base = filename.replace("\\", "/").rsplit("/", 1)[-1]
        return cls._safe_component(base, "filename")

    @staticmethod
    def _count_nonblank_lines(path: Path) -> int:
        """Count the lines of *path* that contain non-whitespace text.

        The file is decoded as UTF-8 with undecodable bytes replaced, so the
        count neither depends on the platform default encoding nor fails on
        a stray byte.
        """
        with open(path, "r", encoding="utf-8", errors="replace") as f:
            return sum(1 for line in f if line.strip())

    def _resolve_path(self, *parts: str) -> Path:
        """Resolve a storage path and ensure parent directories exist."""
        path = self.storage_root.joinpath(*parts)
        path.parent.mkdir(parents=True, exist_ok=True)
        return path

    def _canonical_tm_filename(self, channel: str, locale_code: str) -> str:
        """Return the canonical TM filename to use for (channel, locale).

        Resolution order:

        1. A file already on disk for this channel, matched
           case-insensitively. Its existing casing is returned so an upload
           truly replaces that file rather than creating a sibling that
           differs only in casing.
        2. The pipeline's hardcoded ``CHANNEL_FILE_MAP`` pattern.
        3. ``flat_{channel}_{lc}.json`` with the channel as the user typed it.
        """
        # Imported here rather than at module level, as in the original code.
        from app.pipeline.agents.agent_2_tm_retrieval import CHANNEL_FILE_MAP

        lc_lower = locale_code.lower()
        channel_lower = channel.lower()

        # 1) An existing file on disk for this channel (case-insensitive).
        tm_dir = self.storage_root / "amazon" / "tm" / locale_code
        if tm_dir.is_dir():
            suffix = f"_{lc_lower}.json"
            # Sorted so the pick is deterministic if several files differ
            # only in casing.
            for existing in sorted(tm_dir.iterdir()):
                name = existing.name
                lowered = name.lower()
                if not (lowered.startswith("flat_") and lowered.endswith(suffix)):
                    continue
                # Extract the channel segment from "flat_<Channel>_<lc>.json"
                stem = name[len("flat_"):-len(suffix)]
                if stem.lower() == channel_lower:
                    return name  # preserve existing casing

        # 2) Hardcoded patterns for the original channels.
        pattern = CHANNEL_FILE_MAP.get(channel_lower)
        if pattern:
            return pattern.format(lc=lc_lower)

        # 3) New channel — use the user-typed casing.
        return f"flat_{channel}_{lc_lower}.json"

    # ------------------------------------------------------------------
    # Uploads
    # ------------------------------------------------------------------

    async def upload_source_file(
        self,
        db: AsyncSession,
        job_id: UUID,
        file: BinaryIO,
        filename: str,
    ) -> list[SourceLine]:
        """Upload and parse a source xlsx file, creating SourceLine records.

        The file is stored under ``jobs/<job_id>/source/``, parsed with
        ``parse_source_file``, and the job's existing ``SourceLine`` rows are
        replaced by the freshly parsed ones (``row_order`` starts at 1).

        Args:
            db: Session used for the delete/insert; flushed, not committed.
            job_id: Job the source file belongs to.
            file: Binary stream positioned at the start of the upload.
            filename: Client-supplied name; only its basename is used.

        Returns:
            The newly created ``SourceLine`` objects in row order.

        Raises:
            ValueError: If *filename* has no usable basename.
        """
        safe_name = self._safe_filename(filename)

        # Save to storage
        file_path = self._resolve_path("jobs", str(job_id), "source", safe_name)
        with open(file_path, "wb") as f:
            shutil.copyfileobj(file, f)

        # Parse the xlsx
        parsed_lines = parse_source_file(str(file_path))

        # Delete existing source lines for this job
        existing = await db.execute(
            select(SourceLine).where(SourceLine.job_id == job_id)
        )
        for line in existing.scalars().all():
            await db.delete(line)

        # Create new source lines
        source_lines = []
        for row_order, row in enumerate(parsed_lines, start=1):
            source_line = SourceLine(
                job_id=job_id,
                row_order=row_order,
                en_gb=row["en_gb"],
                copy_type=row.get("copy_type"),
                creative_guidance=row.get("creative_guidance"),
                visual_ref=row.get("visual_ref"),
                char_limit=row.get("char_limit"),
                is_display_format=row.get("is_display_format", False),
            )
            db.add(source_line)
            source_lines.append(source_line)

        await db.flush()
        return source_lines

    async def upload_supplementary_file(
        self,
        db: AsyncSession,
        job_id: UUID,
        file: BinaryIO,
        filename: str,
    ) -> str:
        """Upload a supplementary file (TM, glossary, etc.) for a job.

        The file is written to ``jobs/<job_id>/supplementary/``. ``db`` is
        accepted for signature parity with the other uploads but is not used.

        Returns:
            The stored file path as a string.

        Raises:
            ValueError: If *filename* has no usable basename.
        """
        safe_name = self._safe_filename(filename)
        file_path = self._resolve_path(
            "jobs", str(job_id), "supplementary", safe_name
        )
        with open(file_path, "wb") as f:
            shutil.copyfileobj(file, f)
        return str(file_path)

    async def upload_tm_file(
        self,
        db: AsyncSession,
        client_id: UUID,
        locale_code: str,
        channel: str,
        file: BinaryIO,
        filename: str,
        uploaded_by: UUID | None = None,
    ) -> TMFileRegistry:
        """Upload a TM file and register it.

        Writes the file to the canonical path the pipeline reads from
        (``<storage>/amazon/tm/{locale_code}/flat_{channel}_{lc}.json``) so a
        replacement upload always overrides the version used at runtime.
        The stored name is chosen by ``_canonical_tm_filename``; the
        *filename* the user uploaded is ignored.

        If a registry row already exists for this (client_id, locale_code,
        channel) tuple — channel compared case-insensitively — it is updated
        in place instead of inserting a duplicate row.

        Args:
            db: Session used for the upsert; flushed, not committed.
            client_id: Owning client.
            locale_code: Locale directory name, e.g. ``de-DE``.
            channel: Channel name; free text, so it is validated here.
            file: Binary stream positioned at the start of the upload.
            filename: Unused; kept for interface compatibility.
            uploaded_by: Optional uploader id; an existing row keeps its
                previous value when this is ``None``.

        Returns:
            The updated or newly created ``TMFileRegistry`` row.

        Raises:
            ValueError: If *locale_code* or *channel* is empty or contains a
                path separator / traversal segment.
        """
        # Both values become path components; refuse anything that could
        # step outside <storage>/amazon/tm/<locale>/.
        self._safe_component(locale_code, "locale_code")
        self._safe_component(channel, "channel")

        # Existing files on disk follow inconsistent channel casing
        # (e.g. flat_MASS_de-de.json, flat_value_de-de.json,
        # flat_Onsite_de-de.json). To replace one in place we honour the
        # casing already on disk for that channel, then fall back to the
        # pipeline's CHANNEL_FILE_MAP, then to the channel as typed.
        canonical_filename = self._canonical_tm_filename(channel, locale_code)
        file_path = self._resolve_path(
            "amazon", "tm", locale_code, canonical_filename
        )
        with open(file_path, "wb") as f:
            shutil.copyfileobj(file, f)

        # Segment count = number of non-blank lines in the stored file.
        segment_count = self._count_nonblank_lines(file_path)

        # Upsert by (client_id, locale_code, channel). If a row exists
        # (from prior upload OR seeder), update its fields rather than
        # inserting a duplicate. The channel is compared case-insensitively
        # via LOWER() rather than ILIKE so that "%" and "_" in user input
        # are literal characters, not wildcards.
        from sqlalchemy import func

        existing_q = await db.execute(
            select(TMFileRegistry).where(
                TMFileRegistry.client_id == client_id,
                TMFileRegistry.locale_code == locale_code,
                func.lower(TMFileRegistry.channel) == channel.lower(),
            )
        )
        existing = existing_q.scalars().first()
        if existing is not None:
            existing.filename = canonical_filename
            existing.file_path = str(file_path)
            existing.segment_count = segment_count
            if uploaded_by is not None:
                existing.uploaded_by = uploaded_by
            await db.flush()
            return existing

        tm_file = TMFileRegistry(
            client_id=client_id,
            locale_code=locale_code,
            channel=channel,
            filename=canonical_filename,
            file_path=str(file_path),
            segment_count=segment_count,
            uploaded_by=uploaded_by,
        )
        db.add(tm_file)
        await db.flush()
        return tm_file

    async def upload_reference_file(
        self,
        db: AsyncSession,
        client_id: UUID,
        file_type: ReferenceFileType,
        locale_scope: str,
        file: BinaryIO,
        filename: str,
        uploaded_by: UUID | None = None,
    ) -> ReferenceFile:
        """Upload a reference file and create a registry entry.

        The file is stored under
        ``clients/<client_id>/reference/<file_type.value>/`` and a new
        ``ReferenceFile`` row is added for every call.

        Raises:
            ValueError: If *filename* has no usable basename.
        """
        safe_name = self._safe_filename(filename)
        file_path = self._resolve_path(
            "clients", str(client_id), "reference", file_type.value, safe_name
        )
        with open(file_path, "wb") as f:
            shutil.copyfileobj(file, f)

        ref_file = ReferenceFile(
            client_id=client_id,
            file_type=file_type,
            locale_scope=locale_scope,
            filename=safe_name,
            file_path=str(file_path),
            uploaded_by=uploaded_by,
        )
        db.add(ref_file)
        await db.flush()
        return ref_file

    # ------------------------------------------------------------------
    # Listing / lookup
    # ------------------------------------------------------------------

    async def list_tm_files(
        self,
        db: AsyncSession,
        client_id: UUID,
        locale_code: str | None = None,
        channel: str | None = None,
    ) -> list[TMFileRegistry]:
        """List TM files for a client with optional filters.

        Falsy filters are ignored. Results are ordered newest upload first.
        """
        query = select(TMFileRegistry).where(TMFileRegistry.client_id == client_id)
        if locale_code:
            query = query.where(TMFileRegistry.locale_code == locale_code)
        if channel:
            query = query.where(TMFileRegistry.channel == channel)
        result = await db.execute(query.order_by(TMFileRegistry.uploaded_at.desc()))
        return list(result.scalars().all())

    async def list_reference_files(
        self,
        db: AsyncSession,
        client_id: UUID,
        file_type: ReferenceFileType | None = None,
        locale_scope: str | None = None,
    ) -> list[ReferenceFile]:
        """List reference files for a client with optional filters.

        Falsy filters are ignored. Results are ordered newest upload first.
        """
        query = select(ReferenceFile).where(ReferenceFile.client_id == client_id)
        if file_type:
            query = query.where(ReferenceFile.file_type == file_type)
        if locale_scope:
            query = query.where(ReferenceFile.locale_scope == locale_scope)
        result = await db.execute(query.order_by(ReferenceFile.uploaded_at.desc()))
        return list(result.scalars().all())

    def get_file_path(self, stored_path: str) -> Path | None:
        """Return *stored_path* as a ``Path`` if it exists, else ``None``."""
        path = Path(stored_path)
        return path if path.exists() else None

    # ------------------------------------------------------------------
    # Deletion
    # ------------------------------------------------------------------

    async def delete_tm_file(
        self, db: AsyncSession, file_id: UUID
    ) -> bool:
        """Delete a TM file from storage and database.

        Returns:
            ``False`` if no registry row has this id, ``True`` otherwise.
        """
        result = await db.execute(
            select(TMFileRegistry).where(TMFileRegistry.id == file_id)
        )
        tm_file = result.scalar_one_or_none()
        if tm_file is None:
            return False

        # Remove from filesystem; an already-missing file is not an error.
        Path(tm_file.file_path).unlink(missing_ok=True)

        await db.delete(tm_file)
        await db.flush()
        return True

    async def delete_reference_file(
        self, db: AsyncSession, file_id: UUID
    ) -> bool:
        """Delete a reference file from storage and database.

        Returns:
            ``False`` if no registry row has this id, ``True`` otherwise.
        """
        result = await db.execute(
            select(ReferenceFile).where(ReferenceFile.id == file_id)
        )
        ref_file = result.scalar_one_or_none()
        if ref_file is None:
            return False

        # Remove from filesystem; an already-missing file is not an error.
        Path(ref_file.file_path).unlink(missing_ok=True)

        await db.delete(ref_file)
        await db.flush()
        return True

    # ------------------------------------------------------------------
    # Validation
    # ------------------------------------------------------------------

    def validate_file_extension(
        self, filename: str, allowed_extensions: list[str]
    ) -> bool:
        """Validate that a file has an allowed extension.

        The comparison is case-insensitive on both sides, and entries in
        *allowed_extensions* may be given with or without the leading dot
        (``".xlsx"``, ``"XLSX"``). An empty-string entry permits files
        that have no extension.
        """
        ext = Path(filename).suffix.lower()
        normalised = set()
        for entry in allowed_extensions:
            entry = entry.lower()
            if entry and not entry.startswith("."):
                entry = "." + entry
            normalised.add(entry)
        return ext in normalised