#!/usr/bin/env python3
"""
Brand Guidelines Database Management
Handles storage and retrieval of brand guidelines files for QC checks.
"""

import os
import json
import shutil
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional

# Client assigned to records that do not name one explicitly.
_DEFAULT_CLIENT_ID = "general"


class BrandGuidelinesDB:
    """
    Simple file-based database for brand guidelines management.

    Guideline files are copied into ``<base_dir>/files`` and described by
    records kept in ``<base_dir>/guidelines_db.json``. The JSON document has
    two sections: ``"files"`` (file_id -> file record) and ``"brands"``
    (brand name -> brand record holding the list of its file ids).
    """

    def __init__(self, base_dir: str = "brand_guidelines"):
        """
        Initialize the brand guidelines database.

        Args:
            base_dir: Directory to store brand guidelines files. Missing
                parent directories are created as well.
        """
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)

        # Create subdirectories
        self.files_dir = self.base_dir / "files"
        self.files_dir.mkdir(exist_ok=True)

        # Database metadata file
        self.db_file = self.base_dir / "guidelines_db.json"

        # Load existing database
        self.db = self._load_db()

    @staticmethod
    def _empty_db() -> Dict:
        """Return a fresh, empty database structure."""
        now = datetime.now().isoformat()
        return {
            "brands": {},
            "files": {},
            "created_at": now,
            "last_updated": now
        }

    def _preserve_unreadable_db(self, reason: str) -> None:
        """Keep a copy of an unreadable database file before it is replaced."""
        backup = self.db_file.with_name(self.db_file.name + ".corrupt")
        try:
            shutil.copy2(self.db_file, backup)
            print(f"Warning: could not read {self.db_file}: {reason}; "
                  f"copy kept at {backup}")
        except OSError as e:
            print(f"Warning: could not read {self.db_file}: {reason}; "
                  f"backup failed: {e}")

    def _write_db(self, db: Dict) -> None:
        """Stamp ``last_updated`` on *db* and write it to the JSON file."""
        db["last_updated"] = datetime.now().isoformat()

        # Write to a sibling temp file and swap it in, so a crash or a
        # non-serializable value never truncates the existing database.
        tmp_file = self.db_file.with_name(self.db_file.name + ".tmp")
        try:
            with open(tmp_file, 'w', encoding="utf-8") as f:
                json.dump(db, f, indent=2)
            os.replace(tmp_file, self.db_file)
        finally:
            # Only present here if the dump or the swap failed.
            if tmp_file.exists():
                tmp_file.unlink()

    def _load_db(self) -> Dict:
        """
        Load the database from JSON file.

        Records without a ``client_id`` are backfilled with the default
        client and the file is rewritten when that happens. An unreadable
        file (invalid JSON or a non-object top level) is copied aside with a
        ``.corrupt`` suffix and an empty database is returned instead.

        Returns:
            The database dictionary, always containing ``"brands"`` and
            ``"files"`` sections.
        """
        if not self.db_file.exists():
            return self._empty_db()

        try:
            with open(self.db_file, 'r', encoding="utf-8") as f:
                db = json.load(f)
        except FileNotFoundError:
            # File vanished between the exists() check and open().
            return self._empty_db()
        except ValueError as e:
            # Covers json.JSONDecodeError and undecodable bytes.
            self._preserve_unreadable_db(f"invalid JSON ({e})")
            return self._empty_db()

        if not isinstance(db, dict):
            self._preserve_unreadable_db("top-level value is not an object")
            return self._empty_db()

        # Every other method indexes these sections directly.
        db.setdefault("brands", {})
        db.setdefault("files", {})

        # Backfill missing client_id on existing records
        modified = False
        for section in ("files", "brands"):
            for record in db[section].values():
                if "client_id" not in record:
                    record["client_id"] = _DEFAULT_CLIENT_ID
                    modified = True

        if modified:
            self._write_db(db)
        return db

    def _save_db(self):
        """Save the database to JSON file."""
        self._write_db(self.db)

    def _new_file_id(self, brand_name: str, when: datetime,
                     file_ext: str) -> str:
        """Build a file id that is unused in both the database and storage."""
        # The id becomes a filename, so path separators must not survive.
        safe_brand = brand_name
        for separator in ("/", "\\", "\0"):
            safe_brand = safe_brand.replace(separator, "_")

        base_id = f"{safe_brand}_{when.strftime('%Y%m%d_%H%M%S')}"
        candidate = base_id
        counter = 1
        # The timestamp has one-second resolution; add a numeric suffix when
        # several files for the same brand arrive within the same second.
        while (candidate in self.db["files"]
               or (self.files_dir / f"{candidate}{file_ext}").exists()):
            candidate = f"{base_id}_{counter}"
            counter += 1
        return candidate

    def _locate_stored_file(self, record: Dict) -> Optional[str]:
        """Return the on-disk path of a record's stored file, or None."""
        recorded = record.get("stored_path")
        if recorded and os.path.exists(recorded):
            return recorded

        # The recorded path may be relative to a working directory that is
        # no longer current; look in this database's own files directory.
        stored_filename = record.get("stored_filename")
        if stored_filename:
            fallback = self.files_dir / os.path.basename(stored_filename)
            if fallback.exists():
                return str(fallback)
        return None

    def _existing_extra_path(self, file_id: str, key: str) -> Optional[str]:
        """Return the path stored under *key* for *file_id* if it exists."""
        record = self.db["files"].get(file_id)
        if record is None:
            return None
        path = record.get(key)
        if path and os.path.exists(path):
            return path
        return None

    @staticmethod
    def _remove_file(path: Path, label: str) -> None:
        """Best-effort file removal; failures are reported, not raised."""
        try:
            if path.exists():
                path.unlink()
        except OSError as e:
            print(f"Error deleting {label}: {e}")

    def add_brand_guideline(self, brand_name: str, file_path: str,
                            description: str = "",
                            tags: Optional[List[str]] = None,
                            client_id: str = _DEFAULT_CLIENT_ID) -> Dict:
        """
        Add a brand guideline file to the database.

        The file is copied into the storage directory under a generated id
        of the form ``<brand>_<YYYYmmdd_HHMMSS>`` (with a numeric suffix if
        that id is already taken).

        Args:
            brand_name: Name of the brand
            file_path: Path to the guideline file
            description: Optional description
            tags: Optional list of tags
            client_id: Client this guideline belongs to (default "general")

        Returns:
            Dictionary with file information

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
        """
        if not os.path.exists(file_path):
            raise FileNotFoundError(f"File not found: {file_path}")

        now = datetime.now()

        # Get file info
        original_filename = os.path.basename(file_path)
        file_ext = os.path.splitext(original_filename)[1]

        # Generate unique file ID
        file_id = self._new_file_id(brand_name, now, file_ext)
        stored_filename = f"{file_id}{file_ext}"
        stored_path = self.files_dir / stored_filename

        # Copy file to storage
        shutil.copy2(file_path, stored_path)

        # Get file stats
        file_stats = os.stat(stored_path)

        # Create file record
        file_record = {
            "id": file_id,
            "brand_name": brand_name,
            "client_id": client_id,
            "original_filename": original_filename,
            "stored_filename": stored_filename,
            "stored_path": str(stored_path),
            "file_size": file_stats.st_size,
            "description": description,
            "tags": tags or [],
            "upload_date": now.isoformat(),
            "file_type": file_ext.lower()
        }

        # Add to database
        self.db["files"][file_id] = file_record

        # Add brand if not exists
        brand = self.db["brands"].setdefault(brand_name, {
            "name": brand_name,
            "client_id": client_id,
            "created_at": now.isoformat(),
            "file_count": 0,
            "guidelines": []
        })

        # Update brand info; the count is derived from the list so the two
        # can never drift apart.
        guidelines = brand.setdefault("guidelines", [])
        guidelines.append(file_id)
        brand["file_count"] = len(guidelines)

        # Save database
        self._save_db()

        return file_record

    def get_brand_guidelines(self, brand_name: str) -> List[Dict]:
        """
        Get all guidelines for a specific brand.

        Args:
            brand_name: Name of the brand

        Returns:
            List of guideline file records (empty for an unknown brand)
        """
        brand = self.db["brands"].get(brand_name)
        if brand is None:
            return []

        files = self.db["files"]
        # Skip ids that no longer have a file record.
        return [
            files[file_id]
            for file_id in brand.get("guidelines", [])
            if file_id in files
        ]

    def get_all_brands(self) -> List[str]:
        """Get list of all brand names."""
        return list(self.db["brands"].keys())

    def get_all_guidelines(self) -> List[Dict]:
        """Get all guideline files."""
        return list(self.db["files"].values())

    def get_guidelines_by_client(self, client_id: str) -> Dict:
        """
        Get guidelines filtered by client_id.

        Args:
            client_id: The client identifier to filter by

        Returns:
            Dict with 'brands' and 'files' filtered to matching client.
            Each returned brand lists only the matching file ids and its
            'file_count' reflects that filtered list.
        """
        filtered_files = {
            file_id: record
            for file_id, record in self.db.get("files", {}).items()
            if record.get("client_id", _DEFAULT_CLIENT_ID) == client_id
        }

        filtered_brands = {}
        for brand_name, brand_data in self.db.get("brands", {}).items():
            # Filter brand's guidelines list to only matching files
            matching_guidelines = [
                gid for gid in brand_data.get("guidelines", [])
                if gid in filtered_files
            ]
            if matching_guidelines:
                filtered_brands[brand_name] = {
                    **brand_data,
                    "guidelines": matching_guidelines,
                    "file_count": len(matching_guidelines)
                }

        return {"brands": filtered_brands, "files": filtered_files}

    def update_file_record(self, file_id: str, updates: Dict) -> bool:
        """
        Update fields on an existing file record and save.

        Args:
            file_id: ID of the file record to update
            updates: Fields to merge into the record

        Returns:
            True if the record exists and was updated, False otherwise
        """
        record = self.db["files"].get(file_id)
        if record is None:
            return False
        record.update(updates)
        self._save_db()
        return True

    def get_summary_path(self, file_id: str) -> Optional[str]:
        """Return the path to a pre-computed summary file, or None."""
        return self._existing_extra_path(file_id, "summary_path")

    def get_cover_image_path(self, file_id: str) -> Optional[str]:
        """Return the path to a pre-extracted PDF cover image, or None."""
        return self._existing_extra_path(file_id, "cover_image_path")

    def delete_guideline(self, file_id: str) -> bool:
        """
        Delete a guideline file.

        Removes the stored file, any associated summary and cover image
        files, the file record, and the brand record once it has no
        guidelines left. File removal is best-effort: failures are printed
        and the record is still removed.

        Args:
            file_id: ID of the file to delete

        Returns:
            True if deleted successfully, False otherwise
        """
        file_record = self.db["files"].get(file_id)
        if file_record is None:
            return False

        # Delete physical file
        stored_path = self._locate_stored_file(file_record)
        if stored_path is not None:
            self._remove_file(Path(stored_path), "file")

        # Delete associated summary and cover image files
        for key in ("summary_path", "cover_image_path"):
            extra_path = file_record.get(key)
            if extra_path:
                self._remove_file(Path(extra_path), key)

        # Remove from database
        del self.db["files"][file_id]

        # Update brand info
        brand_name = file_record.get("brand_name")
        brand = self.db["brands"].get(brand_name)
        if brand is not None:
            guidelines = brand.setdefault("guidelines", [])
            if file_id in guidelines:
                guidelines.remove(file_id)
            brand["file_count"] = len(guidelines)

            # Remove brand if no files left
            if not guidelines:
                del self.db["brands"][brand_name]

        # Save database
        self._save_db()

        return True

    def search_guidelines(self, query: str,
                          brand_name: Optional[str] = None) -> List[Dict]:
        """
        Search guidelines by description, tags, or filename.

        Matching is a case-insensitive substring test, so an empty query
        matches every record.

        Args:
            query: Search query
            brand_name: Optional brand filter

        Returns:
            List of matching guideline records
        """
        results = []
        query_lower = query.lower()

        for file_record in self.db["files"].values():
            # Filter by brand if specified
            if brand_name and file_record.get("brand_name") != brand_name:
                continue

            # Search in various fields; tolerate None or non-string values
            # that update_file_record may have stored.
            searchable_text = " ".join([
                str(file_record.get("description") or ""),
                str(file_record.get("original_filename") or ""),
                " ".join(str(tag) for tag in (file_record.get("tags") or []))
            ]).lower()

            if query_lower in searchable_text:
                results.append(file_record)

        return results

    def get_file_path(self, file_id: str) -> Optional[str]:
        """
        Get the stored file path for a guideline.

        Args:
            file_id: ID of the file

        Returns:
            File path if found, None otherwise. When the recorded path no
            longer exists, the file of the same name inside this database's
            files directory is returned if present.
        """
        record = self.db["files"].get(file_id)
        if record is None:
            return None
        return self._locate_stored_file(record)