PDF brand guidelines were previously ignored: QC checks received no content from uploaded PDFs. Now, on upload, all pages are text-extracted and summarized by Gemini into a structured brand guidelines summary, and a cover image is extracted. QC checks receive the full summary in their prompt and the cover image as a visual reference.
- New backend/pdf_processor.py: text extraction, cover image, LLM summary
- brand_guidelines_db.py: summary/cover path tracking, cleanup on delete
- api_server.py: background processing on upload, summary-aware content retrieval, PDF cover image support, status/reprocess endpoints, startup backfill for existing unprocessed PDFs
- web_ui.html: processing status badges and upload feedback for PDFs
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
333 lines
No EOL
11 KiB
Python
333 lines
No EOL
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Brand Guidelines Database Management
|
|
Handles storage and retrieval of brand guidelines files for QC checks.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import shutil
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
class BrandGuidelinesDB:
    """
    Simple file-based database for brand guidelines management.

    Guideline files are copied into ``<base_dir>/files`` and described by
    records kept in ``<base_dir>/guidelines_db.json``.  The JSON document has
    two top-level maps: ``"files"`` (file ID -> file record) and ``"brands"``
    (brand name -> brand record listing the IDs of its guideline files).
    """

    def __init__(self, base_dir: str = "brand_guidelines"):
        """
        Initialize the brand guidelines database.

        Args:
            base_dir: Directory to store brand guidelines files. Missing
                parent directories are created as well.
        """
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

        # Create subdirectories
        self.files_dir = self.base_dir / "files"
        self.files_dir.mkdir(exist_ok=True)

        # Database metadata file
        self.db_file = self.base_dir / "guidelines_db.json"

        # Load existing database
        self.db = self._load_db()

    def _load_db(self) -> Dict:
        """
        Load the database from the JSON file, or start an empty one.

        Records without a ``client_id`` are backfilled with ``"general"`` and
        the result is written back (best effort).  An unreadable database
        file is copied aside before an empty database is returned, so the
        next save does not destroy the only copy of the damaged data.

        Returns:
            The database dictionary; always contains "brands" and "files".
        """
        db = None
        if self.db_file.exists():
            try:
                with open(self.db_file, "r", encoding="utf-8") as f:
                    db = json.load(f)
            except FileNotFoundError:
                # File vanished between exists() and open(): treat as new.
                db = None
            except ValueError as e:
                # Covers json.JSONDecodeError and undecodable bytes.
                self._quarantine_corrupt_db(e)
                db = None
            else:
                if not isinstance(db, dict):
                    self._quarantine_corrupt_db("top-level JSON value is not an object")
                    db = None

        if db is None:
            # Return empty database structure
            now = datetime.now().isoformat()
            return {
                "brands": {},
                "files": {},
                "created_at": now,
                "last_updated": now,
            }

        # Every other method indexes these two maps directly.
        db.setdefault("brands", {})
        db.setdefault("files", {})

        # Backfill missing client_id on existing records
        modified = False
        for section in ("files", "brands"):
            for entry in db[section].values():
                if isinstance(entry, dict) and "client_id" not in entry:
                    entry["client_id"] = "general"
                    modified = True

        if modified:
            db["last_updated"] = datetime.now().isoformat()
            try:
                self._write_db(db)
            except OSError as e:
                # The backfill still applies in memory; it is persisted by
                # the next successful save.
                print(f"Warning: could not persist client_id backfill: {e}")

        return db

    def _quarantine_corrupt_db(self, reason) -> None:
        """Report an unreadable database file and keep a ``.corrupt`` copy of it."""
        backup_path = self.db_file.with_name(self.db_file.name + ".corrupt")
        print(f"Warning: could not read {self.db_file} ({reason}); "
              f"starting with an empty database")
        try:
            shutil.copy2(self.db_file, backup_path)
        except OSError as e:
            print(f"Warning: could not back up corrupt database: {e}")

    def _write_db(self, db: Dict) -> None:
        """
        Serialize ``db`` to the database file.

        The JSON is written to a temporary sibling file which then replaces
        the real file, so a failure part-way through (including a
        serialization error) leaves the previous database file untouched.
        """
        tmp_path = self.db_file.with_name(self.db_file.name + ".tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(db, f, indent=2)
            os.replace(tmp_path, self.db_file)
        except BaseException:
            # Do not leave a half-written temp file behind.
            try:
                tmp_path.unlink()
            except OSError:
                pass
            raise

    def _save_db(self):
        """Save the database to JSON file, refreshing ``last_updated``."""
        self.db["last_updated"] = datetime.now().isoformat()
        self._write_db(self.db)

    def add_brand_guideline(self, brand_name: str, file_path: str,
                            description: str = "", tags: Optional[List[str]] = None,
                            client_id: str = "general") -> Dict:
        """
        Add a brand guideline file to the database.

        The file is copied into the storage directory under a generated ID of
        the form ``<brand>_<YYYYmmdd_HHMMSS>``; a numeric suffix is appended
        when that ID is already taken.

        Args:
            brand_name: Name of the brand
            file_path: Path to the guideline file
            description: Optional description
            tags: Optional list of tags
            client_id: Client this guideline belongs to (default "general")

        Returns:
            Dictionary with file information

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        # Get file info
        original_filename = os.path.basename(file_path)
        file_ext = os.path.splitext(original_filename)[1]

        # Generate unique file ID.  Path separators are replaced because the
        # ID doubles as the stored filename and must stay inside files_dir.
        safe_brand = brand_name.replace("/", "_").replace("\\", "_")
        base_id = f"{safe_brand}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        file_id = base_id
        attempt = 1
        # The timestamp only has one-second resolution, so two uploads for
        # the same brand can collide; never reuse an ID or stored filename.
        while (file_id in self.db["files"]
               or (self.files_dir / f"{file_id}{file_ext}").exists()):
            attempt += 1
            file_id = f"{base_id}_{attempt}"

        stored_filename = f"{file_id}{file_ext}"
        stored_path = self.files_dir / stored_filename

        # Copy file to storage
        shutil.copy2(file_path, stored_path)

        # Get file stats
        file_stats = os.stat(stored_path)

        # Create file record
        file_record = {
            "id": file_id,
            "brand_name": brand_name,
            "client_id": client_id,
            "original_filename": original_filename,
            "stored_filename": stored_filename,
            "stored_path": str(stored_path),
            "file_size": file_stats.st_size,
            "description": description,
            # Copy so later changes to the caller's list do not leak in.
            "tags": list(tags) if tags else [],
            "upload_date": datetime.now().isoformat(),
            "file_type": file_ext.lower(),
        }

        # Add to database
        self.db["files"][file_id] = file_record

        # Add brand if not exists
        brand = self.db["brands"].setdefault(brand_name, {
            "name": brand_name,
            "client_id": client_id,
            "created_at": datetime.now().isoformat(),
            "file_count": 0,
            "guidelines": [],
        })

        # Update brand info; the count is derived from the list so the two
        # cannot drift apart.
        guidelines = brand.setdefault("guidelines", [])
        guidelines.append(file_id)
        brand["file_count"] = len(guidelines)

        # Save database
        self._save_db()

        return file_record

    def get_brand_guidelines(self, brand_name: str) -> List[Dict]:
        """
        Get all guidelines for a specific brand.

        Args:
            brand_name: Name of the brand

        Returns:
            List of guideline file records (empty for an unknown brand).
            IDs listed on the brand that have no file record are skipped.
        """
        brand = self.db["brands"].get(brand_name)
        if brand is None:
            return []

        files = self.db["files"]
        return [files[file_id]
                for file_id in brand.get("guidelines", [])
                if file_id in files]

    def get_all_brands(self) -> List[str]:
        """Get list of all brand names."""
        return list(self.db["brands"].keys())

    def get_all_guidelines(self) -> List[Dict]:
        """Get all guideline files."""
        return list(self.db["files"].values())

    def get_guidelines_by_client(self, client_id: str) -> Dict:
        """
        Get guidelines filtered by client_id.

        Args:
            client_id: The client identifier to filter by

        Returns:
            Dict with 'brands' and 'files' filtered to matching client.
            A brand is included only if at least one of its files matches;
            its 'guidelines' and 'file_count' then cover matching files only.
        """
        filtered_files = {
            file_id: record
            for file_id, record in self.db.get("files", {}).items()
            if record.get("client_id", "general") == client_id
        }

        filtered_brands = {}
        for brand_name, brand_data in self.db.get("brands", {}).items():
            # Filter brand's guidelines list to only matching files
            matching_guidelines = [
                gid for gid in brand_data.get("guidelines", [])
                if gid in filtered_files
            ]
            if matching_guidelines:
                filtered_brands[brand_name] = {
                    **brand_data,
                    "guidelines": matching_guidelines,
                    "file_count": len(matching_guidelines),
                }

        return {"brands": filtered_brands, "files": filtered_files}

    def update_file_record(self, file_id: str, updates: Dict) -> bool:
        """
        Update fields on an existing file record and save.

        Args:
            file_id: ID of the file record to update
            updates: Field names and their new values

        Returns:
            True if the record exists and was updated, False otherwise
        """
        if file_id not in self.db["files"]:
            return False
        self.db["files"][file_id].update(updates)
        self._save_db()
        return True

    def _existing_path(self, file_id: str, key: str) -> Optional[str]:
        """Return the path stored under ``key`` for ``file_id`` if it exists on disk."""
        record = self.db["files"].get(file_id)
        if not record:
            return None
        path = record.get(key)
        if path and os.path.exists(path):
            return path
        return None

    def get_summary_path(self, file_id: str) -> Optional[str]:
        """Return the path to a pre-computed summary file, or None."""
        return self._existing_path(file_id, "summary_path")

    def get_cover_image_path(self, file_id: str) -> Optional[str]:
        """Return the path to a pre-extracted PDF cover image, or None."""
        return self._existing_path(file_id, "cover_image_path")

    def delete_guideline(self, file_id: str) -> bool:
        """
        Delete a guideline file.

        Removes the stored file, its summary and cover image files (when the
        record names them), the file record, and the brand record once the
        brand has no guidelines left.  Failures to delete files on disk are
        reported but do not stop the record from being removed.

        Args:
            file_id: ID of the file to delete

        Returns:
            True if deleted successfully, False otherwise
        """
        if file_id not in self.db["files"]:
            return False

        file_record = self.db["files"][file_id]
        brand_name = file_record.get("brand_name")

        # Delete the physical file plus associated summary and cover image
        # files.  Each entry pairs a record key with the label used in the
        # error message for that key.
        for key, label in (("stored_path", "file"),
                           ("summary_path", "summary_path"),
                           ("cover_image_path", "cover_image_path")):
            target = file_record.get(key)
            if not target:
                continue
            try:
                Path(target).unlink()
            except FileNotFoundError:
                pass  # Already gone: nothing to clean up.
            except (OSError, TypeError, ValueError) as e:
                print(f"Error deleting {label}: {e}")

        # Remove from database
        del self.db["files"][file_id]

        # Update brand info
        brand = self.db["brands"].get(brand_name)
        if brand is not None:
            remaining = [gid for gid in brand.get("guidelines", [])
                         if gid != file_id]
            brand["guidelines"] = remaining
            brand["file_count"] = len(remaining)

            # Remove brand if no files left
            if not remaining:
                del self.db["brands"][brand_name]

        # Save database
        self._save_db()

        return True

    def search_guidelines(self, query: str,
                          brand_name: Optional[str] = None) -> List[Dict]:
        """
        Search guidelines by description, tags, or filename.

        Matching is a case-insensitive substring test.

        Args:
            query: Search query
            brand_name: Optional brand filter

        Returns:
            List of matching guideline records
        """
        results = []
        query_lower = query.lower()

        for file_record in self.db["files"].values():
            # Filter by brand if specified
            if brand_name and file_record.get("brand_name") != brand_name:
                continue

            # Search in various fields; "or" guards against fields that
            # were stored as null instead of being omitted.
            searchable_text = " ".join([
                str(file_record.get("description") or ""),
                str(file_record.get("original_filename") or ""),
                " ".join(str(tag) for tag in (file_record.get("tags") or [])),
            ]).lower()

            if query_lower in searchable_text:
                results.append(file_record)

        return results

    def get_file_path(self, file_id: str) -> Optional[str]:
        """
        Get the stored file path for a guideline.

        Args:
            file_id: ID of the file

        Returns:
            File path if the record exists and the file is on disk,
            None otherwise
        """
        return self._existing_path(file_id, "stored_path")