TM upload-replacement bug (critical):
- Uploads were writing to /storage/clients/<uuid>/tm/... but the pipeline
reads from /storage/amazon/tm/... — replacements were silently ignored
- upload_tm_file now writes to the canonical pipeline path
/storage/amazon/tm/<locale>/flat_<channel>_<lc>.json (overwrites in place)
- Filename casing is preserved when an existing file is being replaced
(the on-disk seeded files use mixed casing: flat_MASS, flat_value,
flat_PrimeSpeed); falls back to CHANNEL_FILE_MAP, then to the channel exactly as the user typed it
- Registry upsert by (client_id, locale_code, channel): replaces row in
place rather than inserting duplicates
- Verified: the replacement file lands at the canonical path; registry COUNT=1 (no duplicate rows)
Supplementary files now reach the LLM (critical):
- New supplementary_files field on FileManifest
- _resolve_file_manifest scans /storage/jobs/<job_id>/supplementary/ and
populates the manifest, with per-locale gating by filename prefix
(e.g. de-DE_glossary.txt only goes to de-DE; global_brief.txt goes to all)
- _format_supplementary_for_prompt reads each file (.txt/.md/.json/.csv/.tsv
/.docx) and inlines its text into the LLM user message under a
"## SUPPLEMENTARY MATERIAL" header, capped at 40k chars per file
- .docx files are extracted via inline zipfile read (no new dependency)
New job wizard:
- Per-supplementary-file locale dropdown ("Global" or one of 12 locales)
- Filename gets prefixed with the locale on upload (de-DE_brief.docx)
Admin TM upload:
- Channel field is now a free-text input with autocomplete suggestions
(datalist of known channels) — lets users add brand-new channels like
PrimeCBM that didn't exist before
Pipeline scaling:
- Bumped the dynamic max_tokens tiers: briefs with 80+ lines now get a 64k
output budget (was 32k), so 132-line briefs no longer truncate. Sonnet 4.6 caps output at 64k
- Added stop_reason logging — a "max_tokens" stop is now logged prominently
instead of the response being silently truncated
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
315 lines
11 KiB
Python
315 lines
11 KiB
Python
import os
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import BinaryIO
|
|
from uuid import UUID, uuid4
|
|
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.config import settings
|
|
from app.models.files import ReferenceFile, ReferenceFileType, TMFileRegistry
|
|
from app.models.source import SourceLine
|
|
from app.pipeline.modules.source_file_parser import parse_source_file
|
|
|
|
|
|
class FileService:
    """Service for file upload, download, path resolution, and storage management.

    Files are stored under ``settings.STORAGE_ROOT``:

    * ``jobs/<job_id>/source/``                      — uploaded source xlsx files
    * ``jobs/<job_id>/supplementary/``               — per-job supplementary material
    * ``amazon/tm/<locale_code>/``                   — TM files read by the pipeline
    * ``clients/<client_id>/reference/<file_type>/`` — reference files

    Caller-supplied names (filenames, locale codes, channels) are validated so
    an upload can never be written outside the storage root.
    """

    def __init__(self) -> None:
        self.storage_root = Path(settings.STORAGE_ROOT)

    # ------------------------------------------------------------------ #
    # Path / IO helpers
    # ------------------------------------------------------------------ #

    def _resolve_path(self, *parts: str) -> Path:
        """Join *parts* onto the storage root and ensure the parent directory exists.

        Raises:
            ValueError: if the joined path escapes ``storage_root`` (e.g. an
                absolute part, or ``..`` segments inside a caller-supplied name).
        """
        path = self.storage_root.joinpath(*parts)
        # Lexical check via normpath rather than resolve(), so deployments that
        # symlink parts of the storage tree elsewhere keep working.
        root = Path(os.path.normpath(self.storage_root))
        if not Path(os.path.normpath(path)).is_relative_to(root):
            raise ValueError(f"Path escapes storage root: {path}")
        path.parent.mkdir(parents=True, exist_ok=True)
        return path

    @staticmethod
    def _safe_filename(filename: str) -> str:
        """Reduce a client-supplied filename to its final path component.

        A client-supplied name may contain directory components (POSIX or
        Windows-style backslash separators); only the last component is kept
        so the file always lands in the intended storage directory.

        Raises:
            ValueError: if no usable name remains (empty, ``.`` or ``..``).
        """
        name = filename.replace("\\", "/").rsplit("/", 1)[-1]
        if name in ("", ".", ".."):
            raise ValueError(f"Invalid filename: {filename!r}")
        return name

    @staticmethod
    def _require_path_component(value: str, label: str) -> str:
        """Return *value* unchanged if it is usable as a single path segment.

        Raises:
            ValueError: if *value* is empty, ``.``/``..``, or contains a path
                separator (which would create nested directories or escape the
                intended directory).
        """
        if value in ("", ".", "..") or "/" in value or "\\" in value:
            raise ValueError(f"Invalid {label}: {value!r}")
        return value

    @staticmethod
    def _write_upload(path: Path, file: BinaryIO) -> None:
        """Stream *file* into *path*, atomically replacing any existing file.

        The data is first copied to a uniquely named temporary file in the same
        directory (same filesystem, so the rename is atomic) and then moved into
        place with ``os.replace``. Readers of *path* — e.g. a pipeline run that
        reads a TM file while it is being replaced — therefore see either the
        old or the new content, never a partial write, and a failed upload
        leaves the previous file untouched.
        """
        tmp_path = path.parent / f".upload-{uuid4().hex}.tmp"
        try:
            with open(tmp_path, "xb") as out:
                shutil.copyfileobj(file, out)
            os.replace(tmp_path, path)
        except BaseException:
            tmp_path.unlink(missing_ok=True)
            raise

    @staticmethod
    def _count_nonblank_lines(path: Path) -> int:
        """Return the number of non-blank lines in the text file at *path*.

        Used as the TM segment count, which assumes one segment per line
        (JSONL). The file is decoded as UTF-8 with undecodable bytes replaced,
        so an unexpected encoding cannot fail the upload after the file has
        already been stored.
        """
        with open(path, encoding="utf-8", errors="replace") as fh:
            return sum(1 for line in fh if line.strip())

    def _canonical_tm_filename(self, channel: str, locale_code: str) -> str:
        """Return the TM filename to use for (channel, locale).

        Resolution order:

        1. An existing ``flat_<channel>_<lc>.json`` in the locale's TM
           directory, matched case-insensitively, keeping its on-disk casing so
           an upload truly replaces the seeded file rather than creating a
           sibling with different casing. Candidates are scanned in sorted
           order so the choice is deterministic if several casings exist.
        2. The pipeline's hardcoded ``CHANNEL_FILE_MAP`` pattern, looked up by
           the lower-cased channel.
        3. ``flat_<channel>_<lc>.json`` with the channel as the user typed it.

        ``<lc>`` is always the lower-cased locale code.
        """
        from app.pipeline.agents.agent_2_tm_retrieval import CHANNEL_FILE_MAP

        lc_lower = locale_code.lower()
        channel_lower = channel.lower()
        prefix = "flat_"
        suffix = f"_{lc_lower}.json"

        # 1) An existing file on disk for this channel (case-insensitive).
        tm_dir = self.storage_root / "amazon" / "tm" / locale_code
        if tm_dir.is_dir():
            for candidate in sorted(tm_dir.iterdir()):
                lowered = candidate.name.lower()
                if not (lowered.startswith(prefix) and lowered.endswith(suffix)):
                    continue
                # Channel segment of "flat_<Channel>_<lc>.json".
                if lowered[len(prefix) : -len(suffix)] == channel_lower:
                    return candidate.name  # preserve existing casing

        # 2) Hardcoded patterns for the original channels.
        pattern = CHANNEL_FILE_MAP.get(channel_lower)
        if pattern:
            return pattern.format(lc=lc_lower)

        # 3) New channel — use the user-typed casing.
        return f"flat_{channel}_{lc_lower}.json"

    # ------------------------------------------------------------------ #
    # Uploads
    # ------------------------------------------------------------------ #

    async def upload_source_file(
        self,
        db: AsyncSession,
        job_id: UUID,
        file: BinaryIO,
        filename: str,
    ) -> list[SourceLine]:
        """Store a job's source xlsx and replace its SourceLine rows with the parsed content.

        Only the final component of *filename* is used for storage. Existing
        SourceLine rows for the job are deleted; new rows are numbered from 1
        in parse order via ``row_order``.

        Raises:
            ValueError: if *filename* has no usable name component.
        """
        file_path = self._resolve_path(
            "jobs", str(job_id), "source", self._safe_filename(filename)
        )
        self._write_upload(file_path, file)

        parsed_lines = parse_source_file(str(file_path))

        # Delete existing source lines for this job.
        existing = await db.execute(
            select(SourceLine).where(SourceLine.job_id == job_id)
        )
        for line in existing.scalars().all():
            await db.delete(line)
        # Flush the deletes before adding replacements: within one flush the
        # SQLAlchemy unit of work emits a mapper's INSERTs before its DELETEs,
        # which would trip any uniqueness constraint covering the old rows.
        await db.flush()

        source_lines = [
            SourceLine(
                job_id=job_id,
                row_order=row_order,
                en_gb=row["en_gb"],
                copy_type=row.get("copy_type"),
                creative_guidance=row.get("creative_guidance"),
                visual_ref=row.get("visual_ref"),
                char_limit=row.get("char_limit"),
                is_display_format=row.get("is_display_format", False),
            )
            for row_order, row in enumerate(parsed_lines, start=1)
        ]
        db.add_all(source_lines)
        await db.flush()
        return source_lines

    async def upload_supplementary_file(
        self,
        db: AsyncSession,
        job_id: UUID,
        file: BinaryIO,
        filename: str,
    ) -> str:
        """Store a supplementary file (TM, glossary, etc.) for a job and return its path.

        The file goes to ``jobs/<job_id>/supplementary/`` under the final
        component of *filename*. *db* is accepted for interface symmetry with
        the other upload methods and is not used.

        Raises:
            ValueError: if *filename* has no usable name component.
        """
        file_path = self._resolve_path(
            "jobs", str(job_id), "supplementary", self._safe_filename(filename)
        )
        self._write_upload(file_path, file)
        return str(file_path)

    async def upload_tm_file(
        self,
        db: AsyncSession,
        client_id: UUID,
        locale_code: str,
        channel: str,
        file: BinaryIO,
        filename: str,
        uploaded_by: UUID | None = None,
    ) -> TMFileRegistry:
        """Upload a TM file to the pipeline's canonical path and register it.

        The file is written to ``<storage_root>/amazon/tm/<locale_code>/``
        under the name chosen by ``_canonical_tm_filename``: an existing
        on-disk file for the channel (keeping its casing) first, then
        ``CHANNEL_FILE_MAP``, then ``flat_<channel>_<lc>.json``. A replacement
        upload therefore overrides the version the pipeline reads at runtime.
        The uploaded *filename* is not used for storage.

        If registry rows already exist for this (client_id, locale_code,
        channel) — channel compared case-insensitively — the most recently
        uploaded one is updated in place instead of inserting a duplicate.

        Raises:
            ValueError: if *locale_code* or *channel* is not usable as a single
                path segment.
        """
        from sqlalchemy import func

        self._require_path_component(locale_code, "locale code")
        self._require_path_component(channel, "channel")

        canonical_filename = self._canonical_tm_filename(channel, locale_code)
        file_path = self._resolve_path(
            "amazon", "tm", locale_code, canonical_filename
        )
        self._write_upload(file_path, file)

        segment_count = self._count_nonblank_lines(file_path)

        # Case-insensitive equality rather than ILIKE: ILIKE would treat "_"
        # and "%" in a free-text channel name as wildcards and could match
        # (and overwrite) another channel's row.
        existing_q = await db.execute(
            select(TMFileRegistry)
            .where(
                TMFileRegistry.client_id == client_id,
                TMFileRegistry.locale_code == locale_code,
                func.lower(TMFileRegistry.channel) == channel.lower(),
            )
            .order_by(TMFileRegistry.uploaded_at.desc())
        )
        existing = existing_q.scalars().first()
        if existing is not None:
            existing.filename = canonical_filename
            existing.file_path = str(file_path)
            existing.segment_count = segment_count
            if uploaded_by is not None:
                existing.uploaded_by = uploaded_by
            # NOTE(review): uploaded_at is not refreshed here; confirm whether
            # the model updates it automatically on change.
            await db.flush()
            return existing

        tm_file = TMFileRegistry(
            client_id=client_id,
            locale_code=locale_code,
            channel=channel,
            filename=canonical_filename,
            file_path=str(file_path),
            segment_count=segment_count,
            uploaded_by=uploaded_by,
        )
        db.add(tm_file)
        await db.flush()
        return tm_file

    async def upload_reference_file(
        self,
        db: AsyncSession,
        client_id: UUID,
        file_type: ReferenceFileType,
        locale_scope: str,
        file: BinaryIO,
        filename: str,
        uploaded_by: UUID | None = None,
    ) -> ReferenceFile:
        """Store a reference file and create its registry entry.

        The file goes to ``clients/<client_id>/reference/<file_type>/`` under
        the final component of *filename*, which is also the registered name.

        Raises:
            ValueError: if *filename* has no usable name component.
        """
        safe_name = self._safe_filename(filename)
        file_path = self._resolve_path(
            "clients", str(client_id), "reference", file_type.value, safe_name
        )
        self._write_upload(file_path, file)

        ref_file = ReferenceFile(
            client_id=client_id,
            file_type=file_type,
            locale_scope=locale_scope,
            filename=safe_name,
            file_path=str(file_path),
            uploaded_by=uploaded_by,
        )
        db.add(ref_file)
        await db.flush()
        return ref_file

    # ------------------------------------------------------------------ #
    # Listing / lookup
    # ------------------------------------------------------------------ #

    async def list_tm_files(
        self,
        db: AsyncSession,
        client_id: UUID,
        locale_code: str | None = None,
        channel: str | None = None,
    ) -> list[TMFileRegistry]:
        """List a client's TM files, newest first, optionally filtered by locale/channel.

        Empty-string filters are treated the same as ``None`` (no filter).
        """
        query = select(TMFileRegistry).where(TMFileRegistry.client_id == client_id)
        if locale_code:
            query = query.where(TMFileRegistry.locale_code == locale_code)
        if channel:
            query = query.where(TMFileRegistry.channel == channel)

        result = await db.execute(query.order_by(TMFileRegistry.uploaded_at.desc()))
        return list(result.scalars().all())

    async def list_reference_files(
        self,
        db: AsyncSession,
        client_id: UUID,
        file_type: ReferenceFileType | None = None,
        locale_scope: str | None = None,
    ) -> list[ReferenceFile]:
        """List a client's reference files, newest first, with optional filters.

        Falsy filters (``None``, empty string) are ignored.
        """
        query = select(ReferenceFile).where(ReferenceFile.client_id == client_id)
        if file_type:
            query = query.where(ReferenceFile.file_type == file_type)
        if locale_scope:
            query = query.where(ReferenceFile.locale_scope == locale_scope)

        result = await db.execute(query.order_by(ReferenceFile.uploaded_at.desc()))
        return list(result.scalars().all())

    def get_file_path(self, stored_path: str) -> Path | None:
        """Return *stored_path* as a Path if it exists on disk, else None."""
        path = Path(stored_path)
        return path if path.exists() else None

    # ------------------------------------------------------------------ #
    # Deletion
    # ------------------------------------------------------------------ #

    async def delete_tm_file(
        self, db: AsyncSession, file_id: UUID
    ) -> bool:
        """Delete a TM registry row and its file; return False if the row doesn't exist.

        NOTE(review): TM files now live at the shared canonical pipeline path
        (no client id in it), so this removes the file the pipeline reads —
        confirm that is intended, especially if several clients register the
        same locale/channel.
        """
        result = await db.execute(
            select(TMFileRegistry).where(TMFileRegistry.id == file_id)
        )
        tm_file = result.scalar_one_or_none()
        if tm_file is None:
            return False

        # Capture the path before deleting; remove the file only once the row
        # deletion has flushed, so a DB failure doesn't orphan the registry row.
        file_path = Path(tm_file.file_path)
        await db.delete(tm_file)
        await db.flush()
        file_path.unlink(missing_ok=True)
        return True

    async def delete_reference_file(
        self, db: AsyncSession, file_id: UUID
    ) -> bool:
        """Delete a reference-file row and its file; return False if the row doesn't exist."""
        result = await db.execute(
            select(ReferenceFile).where(ReferenceFile.id == file_id)
        )
        ref_file = result.scalar_one_or_none()
        if ref_file is None:
            return False

        file_path = Path(ref_file.file_path)
        await db.delete(ref_file)
        await db.flush()
        file_path.unlink(missing_ok=True)
        return True

    # ------------------------------------------------------------------ #
    # Validation
    # ------------------------------------------------------------------ #

    def validate_file_extension(
        self, filename: str, allowed_extensions: list[str]
    ) -> bool:
        """Return True if *filename*'s final extension is in *allowed_extensions*.

        Matching is case-insensitive, and allowed entries may be given with or
        without the leading dot (``".xlsx"``, ``"XLSX"``). An empty-string entry
        matches files with no extension.
        """
        ext = Path(filename).suffix.lower()
        allowed = {
            entry.lower() if not entry or entry.startswith(".") else f".{entry.lower()}"
            for entry in allowed_extensions
        }
        return ext in allowed