modcomms/backend/app/services/knowledge_base_service.py
michael 1800e71229 Fix cache invalidation falling back to static files after reprocessing
After processing a new knowledge base spec, invalidate_cache() was
clearing the DB spec from the cache without replacing it. The next
analysis would then fall back to static prompts/*.md files instead of
using the newly generated DB spec.

Now invalidate_cache() accepts optional new_spec_content to immediately
populate the DB cache, and knowledge_base_service passes the freshly
distilled spec content so it's available for the next analysis without
a server restart.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 17:56:11 -06:00

276 lines
14 KiB
Python

import asyncio
import logging
import uuid
from datetime import datetime, timezone
from typing import Optional
from app.models.database import async_session_factory
from app.repositories.knowledge_base_repository import KnowledgeBaseRepository
from app.services.storage_service import storage_service
logger = logging.getLogger(__name__)
MAX_CONCURRENT_PARSES = 10
# Distillation prompt templates per agent type
DISTILLATION_PROMPTS = {
"legal": """You are a compliance documentation specialist. Below is raw reference material about legal compliance, advertising standards, financial promotions, and disclaimers relevant to Barclays marketing materials.
Your task is to distil this into a clear, structured specification document that an AI compliance agent can use to review marketing proofs. Organise the content into logical sections with clear headings. Focus on actionable rules, required disclaimers, prohibited content, and compliance requirements.
Remove any redundancy, marketing fluff, or content not relevant to compliance checking. Preserve all specific rules, thresholds, and requirements verbatim.
RAW REFERENCE MATERIAL:
{combined_markdown}
OUTPUT: A well-structured markdown specification document.""",
"brand_barclays": """You are a brand guidelines specialist. Below is raw reference material about Barclays brand guidelines including logo usage, colour palettes, typography, design principles, and visual identity standards.
Your task is to distil this into a clear, structured specification document that an AI brand compliance agent can use to review marketing proofs. Organise the content into logical sections: logo rules, colour specifications (with exact hex/RGB values), typography rules, spacing/layout requirements, do's and don'ts.
Remove any redundancy or content not directly relevant to visual brand compliance checking. Preserve all specific measurements, colour values, and rules verbatim.
RAW REFERENCE MATERIAL:
{combined_markdown}
OUTPUT: A well-structured markdown specification document.""",
"brand_barclaycard": """You are a brand guidelines specialist. Below is raw reference material about Barclaycard brand guidelines including logo usage, colour palettes, typography, design principles, and visual identity standards.
Your task is to distil this into a clear, structured specification document that an AI brand compliance agent can use to review marketing proofs. Organise the content into logical sections: logo rules, colour specifications (with exact hex/RGB values), typography rules, spacing/layout requirements, do's and don'ts.
Remove any redundancy or content not directly relevant to visual brand compliance checking. Preserve all specific measurements, colour values, and rules verbatim.
RAW REFERENCE MATERIAL:
{combined_markdown}
OUTPUT: A well-structured markdown specification document.""",
"channel_best_practices": """You are a marketing channel specialist. Below is raw reference material about best practices for various marketing channels (social media, display, email, print, OOH) relevant to Barclays marketing.
Your task is to distil this into a clear, structured specification document that an AI channel compliance agent can use to review marketing proofs. Organise by channel type, then by platform/format. Focus on content guidelines, accessibility requirements, and engagement best practices.
Remove any redundancy or content not directly relevant to proof review. Preserve all specific recommendations and requirements.
RAW REFERENCE MATERIAL:
{combined_markdown}
OUTPUT: A well-structured markdown specification document.""",
"channel_tech_specs": """You are a marketing production specialist. Below is raw reference material about technical specifications for various marketing channels (dimensions, file formats, file sizes, resolution requirements, platform constraints).
Your task is to distil this into a clear, structured specification document that an AI technical compliance agent can use to review marketing proofs. Organise by channel, then platform, then format. Use tables where appropriate for dimensions and specs.
Remove any redundancy or content not directly relevant to technical spec checking. Preserve all specific dimensions, file size limits, format requirements, and platform constraints verbatim.
RAW REFERENCE MATERIAL:
{combined_markdown}
OUTPUT: A well-structured markdown specification document.""",
}
class KnowledgeBaseService:
"""Orchestrates the document processing pipeline."""
def __init__(self, llamaparse_service, gemini_service, reference_docs_service):
self.llamaparse = llamaparse_service
self.gemini = gemini_service
self.reference_docs = reference_docs_service
async def process_documents(
self,
kb_id: uuid.UUID,
job_id: uuid.UUID,
agent_key: str,
user_id: Optional[uuid.UUID] = None,
user_name: Optional[str] = None,
) -> None:
"""
Run the full processing pipeline as a background task.
1. Parse each source document with LlamaParse
2. Combine parsed markdown
3. Distil with Gemini into a spec
4. Save as new SpecVersion
5. Invalidate ReferenceDocsService cache
"""
async with async_session_factory() as session:
try:
repo = KnowledgeBaseRepository(session)
# Update job status to parsing
await repo.update_processing_job(job_id, status="parsing_documents")
await session.commit()
# Get all source documents for this KB
docs = await repo.get_source_documents(kb_id)
if not docs:
await repo.update_processing_job(
job_id, status="failed",
error_message="No source documents found.",
completed_at=datetime.now(timezone.utc),
)
await session.commit()
return
# Parse documents in parallel (up to MAX_CONCURRENT_PARSES at a time)
semaphore = asyncio.Semaphore(MAX_CONCURRENT_PARSES)
progress_lock = asyncio.Lock()
parsed_count = 0
async def _parse_doc(doc):
nonlocal parsed_count
async with semaphore:
try:
async with async_session_factory() as doc_session:
doc_repo = KnowledgeBaseRepository(doc_session)
# Load file data from storage
file_data = await storage_service.get_file(doc.file_storage_key)
if not file_data:
logger.error(f"[KB_SERVICE] File not found: {doc.file_storage_key}")
await doc_repo.update_source_document_parse_status(
doc.id, "error", parse_error="File not found in storage."
)
await doc_session.commit()
async with progress_lock:
parsed_count += 1
await repo.update_processing_job(job_id, parsed_documents=parsed_count)
await session.commit()
return None
# Parse with LlamaParse
await doc_repo.update_source_document_parse_status(doc.id, "parsing")
await doc_session.commit()
result = await self.llamaparse.parse_document(file_data, doc.filename)
# Determine status based on failed pages
if result.failed_pages:
if not result.markdown.strip():
# All pages failed
failed_desc = "; ".join(
f"Page {fp['page']}: {fp['error']}" for fp in result.failed_pages
)
status = "error"
error_msg = f"All {result.total_pages} pages failed to parse. {failed_desc}"
else:
failed_desc = "; ".join(
f"Page {fp['page']}: {fp['error']}" for fp in result.failed_pages
)
status = "partial"
error_msg = (
f"{len(result.failed_pages)} of {result.total_pages} pages "
f"failed to parse. {failed_desc}"
)
else:
status = "parsed"
error_msg = None
async with async_session_factory() as doc_session:
doc_repo = KnowledgeBaseRepository(doc_session)
await doc_repo.update_source_document_parse_status(
doc.id, status,
parsed_markdown=result.markdown if result.markdown.strip() else None,
parse_error=error_msg,
)
await doc_session.commit()
async with progress_lock:
parsed_count += 1
await repo.update_processing_job(job_id, parsed_documents=parsed_count)
await session.commit()
if result.markdown.strip():
return (doc.filename, str(doc.id), result.markdown)
return None
except Exception as e:
logger.error(f"[KB_SERVICE] Error parsing {doc.filename}: {e}")
try:
async with async_session_factory() as doc_session:
doc_repo = KnowledgeBaseRepository(doc_session)
await doc_repo.update_source_document_parse_status(
doc.id, "error", parse_error=str(e)
)
await doc_session.commit()
except Exception:
logger.error(f"[KB_SERVICE] Failed to update error status for {doc.filename}")
async with progress_lock:
parsed_count += 1
await repo.update_processing_job(job_id, parsed_documents=parsed_count)
await session.commit()
return None
results = await asyncio.gather(*[_parse_doc(doc) for doc in docs])
combined_parts = [
f"# {filename}\n\n{markdown}"
for filename, doc_id, markdown in (r for r in results if r is not None)
]
source_doc_ids = [
doc_id
for _, doc_id, _ in (r for r in results if r is not None)
]
if not combined_parts:
await repo.update_processing_job(
job_id, status="failed",
error_message="No documents were successfully parsed.",
completed_at=datetime.now(timezone.utc),
)
await session.commit()
return
# Distil with Gemini
await repo.update_processing_job(job_id, status="distilling")
await session.commit()
combined_markdown = "\n\n---\n\n".join(combined_parts)
prompt_template = DISTILLATION_PROMPTS.get(agent_key, DISTILLATION_PROMPTS["legal"])
prompt = prompt_template.format(combined_markdown=combined_markdown)
logger.info(f"[KB_SERVICE] Sending {len(combined_markdown)} chars to Gemini for distillation")
response = await self.gemini.client.aio.models.generate_content(
model=self.gemini.model,
contents=prompt,
)
spec_content = response.text.strip()
logger.info(f"[KB_SERVICE] Distillation complete: {len(spec_content)} chars")
# Save as new spec version
spec = await repo.create_spec_version(
knowledge_base_id=kb_id,
content=spec_content,
source_document_ids=source_doc_ids,
generated_by_id=user_id,
generated_by_name=user_name,
processing_job_id=job_id,
)
await session.commit()
# Update job as completed
await repo.update_processing_job(
job_id,
status="completed",
spec_version_id=spec.id,
completed_at=datetime.now(timezone.utc),
)
await session.commit()
# Update reference docs cache with new spec content
self.reference_docs.invalidate_cache(agent_key, new_spec_content=spec_content)
logger.info(f"[KB_SERVICE] Pipeline complete for {agent_key}, spec version {spec.version_number}")
except Exception as e:
logger.error(f"[KB_SERVICE] Pipeline failed for job {job_id}: {e}")
try:
await repo.update_processing_job(
job_id, status="failed",
error_message=str(e),
completed_at=datetime.now(timezone.utc),
)
await session.commit()
except Exception:
logger.error("[KB_SERVICE] Failed to update job status after error")