modcomms/backend/alembic/versions/006_add_knowledge_base.py
Vadym Samoilenko aeab7d3b18 Rename Legal Agent to Risk & Control Agent across frontend and backend
Updates all display labels (PDF report, campaign page, Knowledge Base card, analytics, status dashboard, checks overview) and aligns internal agent name in backend. Adds migration 010 to update the knowledge base display_name in production DB.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-14 15:10:32 +01:00

179 lines
8.5 KiB
Python

"""Add knowledge base tables and seed initial data
Revision ID: 006_add_knowledge_base
Revises: 005_add_file_hash
Create Date: 2025-02-12
"""
from pathlib import Path
from typing import Sequence, Union
from uuid import uuid4
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
# `revision` names this migration; `down_revision` names the migration it
# revises (see "Revises" in the module docstring).
revision: str = '006_add_knowledge_base'
down_revision: Union[str, None] = '005_add_file_hash'
# No branch labels and no extra dependencies are declared for this revision.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
    """Create the knowledge base tables and seed their initial data.

    Creates ``knowledge_bases``, ``source_documents``, ``processing_jobs`` and
    ``spec_versions`` together with their indexes and the deferred
    ``processing_jobs.spec_version_id`` foreign key, then inserts the five
    knowledge base rows and loads each matching ``prompts/*.md`` file as
    version 1 of that knowledge base's spec.
    """
    # ===========================================
    # 1. Create knowledge_bases table
    # ===========================================
    op.create_table(
        'knowledge_bases',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('agent_key', sa.String(100), unique=True, nullable=False),
        sa.Column('display_name', sa.String(255), nullable=False),
        sa.Column('description', sa.Text(), nullable=True),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
    )

    # ===========================================
    # 2. Create source_documents table
    # ===========================================
    op.create_table(
        'source_documents',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('knowledge_base_id', postgresql.UUID(as_uuid=True),
                  sa.ForeignKey('knowledge_bases.id', ondelete='CASCADE'), nullable=False),
        sa.Column('filename', sa.String(500), nullable=False),
        sa.Column('file_storage_key', sa.String(500), nullable=False),
        sa.Column('file_size_bytes', sa.Integer(), nullable=False),
        sa.Column('mime_type', sa.String(255), nullable=False),
        sa.Column('uploaded_by_id', postgresql.UUID(as_uuid=True),
                  sa.ForeignKey('users.id'), nullable=True),
        sa.Column('uploaded_by_name', sa.String(255), nullable=True),
        sa.Column('parsed_markdown', sa.Text(), nullable=True),
        sa.Column('parse_status', sa.String(50), nullable=False, server_default='pending'),
        sa.Column('parse_error', sa.Text(), nullable=True),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index('ix_source_documents_kb_id', 'source_documents', ['knowledge_base_id'])

    # ===========================================
    # 3. Create processing_jobs table (before spec_versions due to FK)
    # ===========================================
    op.create_table(
        'processing_jobs',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('knowledge_base_id', postgresql.UUID(as_uuid=True),
                  sa.ForeignKey('knowledge_bases.id', ondelete='CASCADE'), nullable=False),
        sa.Column('status', sa.String(50), nullable=False, server_default='pending'),
        sa.Column('triggered_by_id', postgresql.UUID(as_uuid=True),
                  sa.ForeignKey('users.id'), nullable=True),
        sa.Column('triggered_by_name', sa.String(255), nullable=True),
        sa.Column('total_documents', sa.Integer(), nullable=False, server_default='0'),
        sa.Column('parsed_documents', sa.Integer(), nullable=False, server_default='0'),
        # Plain column for now: spec_versions does not exist yet, so its FK is
        # attached after step 4.
        sa.Column('spec_version_id', postgresql.UUID(as_uuid=True), nullable=True),
        sa.Column('error_message', sa.Text(), nullable=True),
        sa.Column('log', postgresql.JSONB(), nullable=True),
        sa.Column('started_at', sa.DateTime(timezone=True), nullable=True),
        sa.Column('completed_at', sa.DateTime(timezone=True), nullable=True),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
    )
    op.create_index('ix_processing_jobs_kb_id', 'processing_jobs', ['knowledge_base_id'])

    # ===========================================
    # 4. Create spec_versions table
    # ===========================================
    op.create_table(
        'spec_versions',
        sa.Column('id', postgresql.UUID(as_uuid=True), primary_key=True),
        sa.Column('knowledge_base_id', postgresql.UUID(as_uuid=True),
                  sa.ForeignKey('knowledge_bases.id', ondelete='CASCADE'), nullable=False),
        sa.Column('version_number', sa.Integer(), nullable=False),
        sa.Column('content', sa.Text(), nullable=False),
        sa.Column('source_document_ids', postgresql.JSONB(), nullable=True),
        sa.Column('generated_by_id', postgresql.UUID(as_uuid=True),
                  sa.ForeignKey('users.id'), nullable=True),
        sa.Column('generated_by_name', sa.String(255), nullable=True),
        sa.Column('processing_job_id', postgresql.UUID(as_uuid=True),
                  sa.ForeignKey('processing_jobs.id'), nullable=True),
        sa.Column('is_active', sa.Boolean(), nullable=False, server_default='true'),
        sa.Column('char_count', sa.Integer(), nullable=False),
        sa.Column('created_at', sa.DateTime(timezone=True), server_default=sa.func.now()),
        sa.UniqueConstraint('knowledge_base_id', 'version_number', name='uq_kb_version_number'),
    )
    op.create_index('ix_spec_versions_kb_id', 'spec_versions', ['knowledge_base_id'])

    # Add the FK from processing_jobs.spec_version_id -> spec_versions.id
    # (deferred because of circular reference)
    op.create_foreign_key(
        'fk_processing_jobs_spec_version',
        'processing_jobs', 'spec_versions',
        ['spec_version_id'], ['id'],
    )

    # ===========================================
    # 5. Seed 5 knowledge base rows
    # ===========================================
    conn = op.get_bind()
    kb_ids = _seed_knowledge_bases(conn)

    # ===========================================
    # 6. Seed existing prompts/*.md as spec_versions v1
    # ===========================================
    _seed_initial_spec_versions(conn, kb_ids)


def _seed_knowledge_bases(conn) -> dict:
    """Insert the five knowledge base rows and return ``{agent_key: id}``.

    Args:
        conn: The connection returned by ``op.get_bind()``.

    Returns:
        A dict mapping each seeded ``agent_key`` to the string id of the row
        stored in ``knowledge_bases``.
    """
    kb_seeds = [
        ("legal", "Risk & Control", "Legal compliance, advertising standards, disclaimers, and financial promotion rules."),
        ("brand_barclays", "Brand (Barclays)", "Barclays brand guidelines: logo usage, colors, typography, and design principles."),
        ("brand_barclaycard", "Brand (Barclaycard)", "Barclaycard brand guidelines: logo usage, colors, typography, and design principles."),
        ("channel_best_practices", "Channel Best Practices", "Channel-specific best practices for social, display, email, print, and OOH."),
        ("channel_tech_specs", "Channel Tech Specs", "Technical specifications, dimensions, file formats, and platform requirements."),
    ]
    insert_stmt = sa.text("""
        INSERT INTO knowledge_bases (id, agent_key, display_name, description)
        VALUES (:id, :agent_key, :display_name, :description)
        ON CONFLICT (agent_key) DO NOTHING
    """)
    select_stmt = sa.text(
        "SELECT id FROM knowledge_bases WHERE agent_key = :agent_key"
    )

    kb_ids = {}
    for agent_key, display_name, description in kb_seeds:
        conn.execute(
            insert_stmt,
            {
                "id": str(uuid4()),
                "agent_key": agent_key,
                "display_name": display_name,
                "description": description,
            },
        )
        # ON CONFLICT DO NOTHING may skip the insert, in which case the UUID
        # generated above was never stored. Read the id back so later inserts
        # always reference a row that really exists.
        stored_id = conn.execute(select_stmt, {"agent_key": agent_key}).scalar()
        kb_ids[agent_key] = str(stored_id)
    return kb_ids


def _seed_initial_spec_versions(conn, kb_ids: dict) -> None:
    """Insert each existing ``prompts/*.md`` file as spec version 1.

    Args:
        conn: The connection returned by ``op.get_bind()``.
        kb_ids: Mapping of ``agent_key`` to the owning knowledge base id.
    """
    # The prompts directory is looked up four directory levels above this file.
    prompts_dir = Path(__file__).parent.parent.parent.parent / "prompts"
    spec_file_map = {
        "legal": "legal.md",
        "brand_barclays": "brand_barclays.md",
        "brand_barclaycard": "brand_barclaycard.md",
        "channel_best_practices": "channel_best_practices.md",
        "channel_tech_specs": "channel_tech_specs.md",
    }
    insert_stmt = sa.text("""
        INSERT INTO spec_versions (id, knowledge_base_id, version_number, content, generated_by_name, is_active, char_count)
        VALUES (:id, :kb_id, 1, :content, :generated_by_name, true, :char_count)
    """)

    for agent_key, filename in spec_file_map.items():
        spec_path = prompts_dir / filename
        if not spec_path.exists():
            # Best effort: a knowledge base without a prompt file simply
            # starts with no spec version.
            continue
        content = spec_path.read_text(encoding="utf-8")
        conn.execute(
            insert_stmt,
            {
                "id": str(uuid4()),
                "kb_id": kb_ids[agent_key],
                "content": content,
                "generated_by_name": "System (Migration)",
                "char_count": len(content),
            },
        )
def downgrade() -> None:
    """Remove everything created by ``upgrade``.

    The ``processing_jobs`` -> ``spec_versions`` foreign key is dropped first
    because the two tables reference each other; the tables are then dropped
    in the order listed below.
    """
    op.drop_constraint(
        'fk_processing_jobs_spec_version',
        'processing_jobs',
        type_='foreignkey',
    )

    tables_in_drop_order = (
        'spec_versions',
        'processing_jobs',
        'source_documents',
        'knowledge_bases',
    )
    for table_name in tables_in_drop_order:
        op.drop_table(table_name)