ai_qc/backend/brand_guidelines_db.py
nickviljoen 3f4cc149ad Process multi-page PDF reference assets with LLM summarization
PDF brand guidelines were previously ignored - QC checks received no
content from uploaded PDFs. Now on upload, all pages are text-extracted,
summarized by Gemini into a structured brand guidelines summary, and
a cover image is extracted. QC checks receive the full summary in their
prompt and the cover image as visual reference.

- New backend/pdf_processor.py: text extraction, cover image, LLM summary
- brand_guidelines_db.py: summary/cover path tracking, cleanup on delete
- api_server.py: background processing on upload, summary-aware content
  retrieval, PDF cover image support, status/reprocess endpoints, startup
  backfill for existing unprocessed PDFs
- web_ui.html: processing status badges and upload feedback for PDFs

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-26 22:02:47 +02:00

333 lines
No EOL
11 KiB
Python

#!/usr/bin/env python3
"""
Brand Guidelines Database Management
Handles storage and retrieval of brand guidelines files for QC checks.
"""
import os
import json
import shutil
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
class BrandGuidelinesDB:
    """
    Simple file-based database for brand guidelines management.

    Guideline files are copied into ``<base_dir>/files`` and described by
    records stored in ``<base_dir>/guidelines_db.json``. The JSON document
    has two top-level maps: ``"files"`` (file id -> file record) and
    ``"brands"`` (brand name -> brand record listing its file ids).
    """

    def __init__(self, base_dir: str = "brand_guidelines"):
        """
        Initialize the brand guidelines database.

        Args:
            base_dir: Directory to store brand guidelines files
        """
        self.base_dir = Path(base_dir)
        # parents=True so a nested base_dir can be created in one go.
        self.base_dir.mkdir(parents=True, exist_ok=True)

        # Create subdirectories
        self.files_dir = self.base_dir / "files"
        self.files_dir.mkdir(exist_ok=True)

        # Database metadata file
        self.db_file = self.base_dir / "guidelines_db.json"

        # Load existing database
        self.db = self._load_db()

    @staticmethod
    def _empty_db() -> Dict:
        """Return a fresh, empty database structure."""
        now = datetime.now().isoformat()
        return {
            "brands": {},
            "files": {},
            "created_at": now,
            "last_updated": now,
        }

    def _write_db(self, db: Dict) -> None:
        """Write ``db`` to the JSON file via a temp file and ``os.replace``.

        Writing to a sibling temp file first means a failure while
        serializing never leaves a half-written database behind.
        """
        tmp_file = self.db_file.with_name(self.db_file.name + ".tmp")
        try:
            with open(tmp_file, "w", encoding="utf-8") as f:
                json.dump(db, f, indent=2)
            os.replace(tmp_file, self.db_file)
        finally:
            # Only still present when the write or the replace failed.
            if tmp_file.exists():
                tmp_file.unlink()

    def _backup_unreadable_db(self, reason) -> None:
        """Keep a ``.corrupt`` copy of an unreadable database file.

        The caller falls back to an empty database, and the next save would
        otherwise overwrite the unreadable file without a trace.
        """
        backup = self.db_file.with_name(self.db_file.name + ".corrupt")
        print(f"Error loading database {self.db_file}: {reason}")
        try:
            shutil.copy2(self.db_file, backup)
            print(f"Unreadable database copied to {backup}")
        except OSError as e:
            print(f"Error backing up unreadable database: {e}")

    def _load_db(self) -> Dict:
        """Load the database from JSON file.

        Returns:
            The stored database, with missing ``client_id`` fields backfilled
            to ``"general"``; or an empty database structure when the file
            is missing or cannot be parsed.
        """
        if not self.db_file.exists():
            return self._empty_db()

        try:
            with open(self.db_file, "r", encoding="utf-8") as f:
                db = json.load(f)
        except FileNotFoundError:
            # File disappeared between the exists() check and open().
            return self._empty_db()
        except ValueError as e:
            # Covers json.JSONDecodeError and undecodable bytes.
            self._backup_unreadable_db(e)
            return self._empty_db()

        if not isinstance(db, dict):
            self._backup_unreadable_db("top-level JSON value is not an object")
            return self._empty_db()

        # Backfill missing client_id on existing records; setdefault also
        # guarantees both sections exist for the methods that index them.
        modified = False
        for section in ("files", "brands"):
            for record in db.setdefault(section, {}).values():
                if "client_id" not in record:
                    record["client_id"] = "general"
                    modified = True

        if modified:
            db["last_updated"] = datetime.now().isoformat()
            self._write_db(db)
        return db

    def _save_db(self):
        """Save the database to JSON file."""
        self.db["last_updated"] = datetime.now().isoformat()
        self._write_db(self.db)

    def _new_file_id(self, brand_name: str, file_ext: str) -> str:
        """Build a file id that is unique and safe to use as a filename.

        The id is ``<brand>_<YYYYmmdd_HHMMSS>``. Path separators and NUL in
        the brand are replaced with ``_`` so the stored file cannot land
        outside ``files_dir``. A numeric suffix is appended when the id (or
        its stored filename) is already taken, e.g. two uploads for the same
        brand within one second.
        """
        safe_brand = brand_name
        for ch in ("/", "\\", "\0"):
            safe_brand = safe_brand.replace(ch, "_")

        base_id = f"{safe_brand}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        candidate = base_id
        attempt = 0
        while (candidate in self.db["files"]
               or (self.files_dir / f"{candidate}{file_ext}").exists()):
            attempt += 1
            candidate = f"{base_id}_{attempt}"
        return candidate

    def add_brand_guideline(self, brand_name: str, file_path: str,
                            description: str = "",
                            tags: Optional[List[str]] = None,
                            client_id: str = "general") -> Dict:
        """
        Add a brand guideline file to the database.

        Args:
            brand_name: Name of the brand
            file_path: Path to the guideline file
            description: Optional description
            tags: Optional list of tags
            client_id: Client this guideline belongs to (default "general")

        Returns:
            Dictionary with file information

        Raises:
            FileNotFoundError: If ``file_path`` does not exist
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        # Get file info
        original_filename = os.path.basename(file_path)
        file_ext = os.path.splitext(original_filename)[1]

        # Generate unique file ID
        file_id = self._new_file_id(brand_name, file_ext)
        stored_filename = f"{file_id}{file_ext}"
        stored_path = self.files_dir / stored_filename

        # Copy file to storage
        shutil.copy2(file_path, stored_path)

        # Get file stats
        file_stats = os.stat(stored_path)

        # Create file record
        file_record = {
            "id": file_id,
            "brand_name": brand_name,
            "client_id": client_id,
            "original_filename": original_filename,
            "stored_filename": stored_filename,
            "stored_path": str(stored_path),
            "file_size": file_stats.st_size,
            "description": description,
            # Copy so later changes to the caller's list don't leak in.
            "tags": list(tags) if tags else [],
            "upload_date": datetime.now().isoformat(),
            "file_type": file_ext.lower(),
        }

        # Add to database
        self.db["files"][file_id] = file_record

        # Add brand if not exists
        brand = self.db["brands"].setdefault(brand_name, {
            "name": brand_name,
            "client_id": client_id,
            "created_at": datetime.now().isoformat(),
            "file_count": 0,
            "guidelines": [],
        })

        # Update brand info; the count is derived from the list so the two
        # cannot drift apart.
        brand.setdefault("guidelines", []).append(file_id)
        brand["file_count"] = len(brand["guidelines"])

        # Save database
        self._save_db()
        return file_record

    def get_brand_guidelines(self, brand_name: str) -> List[Dict]:
        """
        Get all guidelines for a specific brand.

        Args:
            brand_name: Name of the brand

        Returns:
            List of guideline file records
        """
        brand = self.db["brands"].get(brand_name)
        if brand is None:
            return []
        files = self.db["files"]
        # Ids with no matching file record are skipped.
        return [files[fid] for fid in brand.get("guidelines", []) if fid in files]

    def get_all_brands(self) -> List[str]:
        """Get list of all brand names."""
        return list(self.db["brands"].keys())

    def get_all_guidelines(self) -> List[Dict]:
        """Get all guideline files."""
        return list(self.db["files"].values())

    def get_guidelines_by_client(self, client_id: str) -> Dict:
        """
        Get guidelines filtered by client_id.

        Args:
            client_id: The client identifier to filter by

        Returns:
            Dict with 'brands' and 'files' filtered to matching client
        """
        filtered_files = {
            file_id: record
            for file_id, record in self.db.get("files", {}).items()
            if record.get("client_id", "general") == client_id
        }

        filtered_brands = {}
        for brand_name, brand_data in self.db.get("brands", {}).items():
            # Filter brand's guidelines list to only matching files
            matching_guidelines = [
                gid for gid in brand_data.get("guidelines", [])
                if gid in filtered_files
            ]
            if matching_guidelines:
                filtered_brands[brand_name] = {
                    **brand_data,
                    "guidelines": matching_guidelines,
                    "file_count": len(matching_guidelines),
                }

        return {"brands": filtered_brands, "files": filtered_files}

    def update_file_record(self, file_id: str, updates: Dict) -> bool:
        """Update fields on an existing file record and save.

        Returns:
            True if the record exists and was updated, False otherwise
        """
        if file_id not in self.db["files"]:
            return False
        self.db["files"][file_id].update(updates)
        self._save_db()
        return True

    def _existing_path(self, file_id: str, key: str) -> Optional[str]:
        """Return the path stored under ``key`` if it exists on disk."""
        record = self.db["files"].get(file_id)
        if record is None:
            return None
        path = record.get(key)
        if path and os.path.exists(path):
            return path
        return None

    def get_summary_path(self, file_id: str) -> Optional[str]:
        """Return the path to a pre-computed summary file, or None."""
        return self._existing_path(file_id, "summary_path")

    def get_cover_image_path(self, file_id: str) -> Optional[str]:
        """Return the path to a pre-extracted PDF cover image, or None."""
        return self._existing_path(file_id, "cover_image_path")

    @staticmethod
    def _remove_file(path, label: str) -> None:
        """Best-effort removal of ``path``; failures are printed, not raised."""
        if not path:
            return
        try:
            target = Path(path)
            if target.exists():
                target.unlink()
        except (OSError, TypeError, ValueError) as e:
            print(f"Error deleting {label}: {e}")

    def delete_guideline(self, file_id: str) -> bool:
        """
        Delete a guideline file.

        Removes the stored file, any associated summary and cover image
        files, the file record, and the brand record once it has no
        guidelines left.

        Args:
            file_id: ID of the file to delete

        Returns:
            True if deleted successfully, False otherwise
        """
        file_record = self.db["files"].get(file_id)
        if file_record is None:
            return False

        # Delete physical file
        self._remove_file(file_record.get("stored_path"), "file")

        # Delete associated summary and cover image files
        for key in ("summary_path", "cover_image_path"):
            self._remove_file(file_record.get(key), key)

        # Remove from database
        del self.db["files"][file_id]

        # Update brand info
        brand_name = file_record.get("brand_name")
        brand = self.db["brands"].get(brand_name)
        if brand is not None:
            # Drop every occurrence of the id and recompute the count from
            # the list so neither can go stale.
            remaining = [
                gid for gid in brand.get("guidelines", []) if gid != file_id
            ]
            brand["guidelines"] = remaining
            brand["file_count"] = len(remaining)

            # Remove brand if no files left
            if not remaining:
                del self.db["brands"][brand_name]

        # Save database
        self._save_db()
        return True

    def search_guidelines(self, query: str,
                          brand_name: Optional[str] = None) -> List[Dict]:
        """
        Search guidelines by description, tags, or filename.

        The match is a case-insensitive substring test.

        Args:
            query: Search query
            brand_name: Optional brand filter

        Returns:
            List of matching guideline records
        """
        query_lower = query.lower()
        results = []
        for file_record in self.db["files"].values():
            # Filter by brand if specified
            if brand_name and file_record.get("brand_name") != brand_name:
                continue

            # Search in various fields; `or` guards against stored nulls.
            searchable_text = " ".join([
                str(file_record.get("description") or ""),
                str(file_record.get("original_filename") or ""),
                " ".join(str(tag) for tag in (file_record.get("tags") or [])),
            ]).lower()

            if query_lower in searchable_text:
                results.append(file_record)
        return results

    def get_file_path(self, file_id: str) -> Optional[str]:
        """
        Get the stored file path for a guideline.

        Args:
            file_id: ID of the file

        Returns:
            File path if found, None otherwise
        """
        return self._existing_path(file_id, "stored_path")