agent_tracker/audit_analyzer.py
nickviljoen 32b08f8b0c Add Prompt Audit & Auto-Classification feature with Gemini integration
Adds Gemini-powered agent classification system that analyzes agent instructions
to determine category, risk level, discipline, and client detection. Includes
admin Prompt Audit tab, audit review workflow, and auto-classification on sync.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 18:29:35 +02:00

461 lines
18 KiB
Python

import os
import json
import re
import asyncio
import logging
from datetime import datetime
from bson import ObjectId
from google import genai
from database import agents_collection, audit_history_collection
logger = logging.getLogger("audit_analyzer")
logger.setLevel(logging.INFO)
if not logger.handlers:
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s [AUDIT] %(message)s"))
logger.addHandler(handler)
def is_gemini_configured() -> bool:
return bool(os.getenv("GOOGLE_API_KEY"))
def _get_client():
api_key = os.getenv("GOOGLE_API_KEY")
return genai.Client(api_key=api_key)
def _get_model_name():
return os.getenv("AUDIT_GEMINI_MODEL", "gemini-2.5-pro")
SYSTEM_INSTRUCTION = """You are an AI agent compliance analyst and classifier. Analyse AI agents based on their
system prompts/instructions and metadata. You must:
1. Classify into business risk categories
2. Assign a business discipline
3. Infer the department/team from the instructions
4. Detect whether the agent is used for client-specific work
Respond ONLY with valid JSON matching this schema:
{
"category": "1" | "1B" | "2" | "3",
"category_reasoning": "string",
"discipline": "Strategy" | "Creative" | "Oversight including delivery" | "Optimization" | "Back Office including operations" | "Pencil Agents",
"discipline_reasoning": "string - why this discipline was chosen",
"department": "string or null - inferred team/department from instructions (e.g. Project Management, Creative Services, Finance, Media, Strategy). null if not determinable",
"is_client_work": true | false,
"client_work_reasoning": "string - evidence from instructions that this agent handles client-specific work, references client names, brands, deliverables, or external-facing outputs. Empty string if not client work",
"client_name_detected": "string or null - specific client or brand name found in instructions, null if none",
"flags": ["array", "of", "strings"],
"summary": "2-3 sentence analysis",
"recommendations": "which team(s) should review and why",
"risk_level": "low" | "medium" | "high" | "critical"
}
CATEGORY DEFINITIONS:
Cat 1 - Internal Sandbox/Experimentation (Oliver AI Sandbox, behind the scenes, needs IT/Compliance)
Cat 1B - High Cost Internal (may incur large cost, not Cat 2 or 3)
Cat 2 - Client-Exposed Not Sold (Pencil platform, exposed to clients, needs Legal)
Cat 3 - Client-Sold (Pencil platform, sold to clients, needs Commercial team)
DISCIPLINE DEFINITIONS (pick the best fit):
- Strategy: Agents focused on strategic planning, research, market analysis, insights
- Creative: Agents focused on creative work, content creation, design, copywriting
- Oversight including delivery: Agents focused on project management, delivery, QA, oversight, compliance
- Optimization: Agents focused on performance optimization, data analysis, efficiency, media planning
- Back Office including operations: Agents focused on internal operations, HR, finance, IT support, admin tasks
- Pencil Agents: Agents built on or for the Pencil platform specifically
DEPARTMENT INFERENCE:
Look for clues in the instructions about which team or role uses this agent. Examples:
- "I am a project manager" -> department: "Project Management"
- "help the media team" -> department: "Media"
- "creative brief" -> department: "Creative"
- If no department/team clues found, set department to null
CLIENT WORK DETECTION:
An agent is client work (is_client_work = true) if the instructions reference:
- Specific client names or brand names
- Client deliverables, client presentations, client reports
- External-facing outputs meant for clients
- Client briefs, client feedback, client approvals
- Work explicitly described as "for the client" or "client-facing"
Do NOT flag as client work if the agent merely mentions "users" or "stakeholders" generically.
RISK LEVELS:
- low: Internal-only, limited capabilities
- medium: Internal with external tool access or moderate cost
- high: Client-facing or accesses sensitive data
- critical: Client-sold or handles financial/legal/PII data
FLAGS TO CONSIDER:
internal_only, experimental, sandbox, client_facing, pencil_platform,
revenue_generating, not_for_sale, uses_external_tools, uses_code_interpreter,
uses_file_search, accesses_sensitive_data, handles_pii, high_cost,
resource_intensive, legal_review_needed, commercial_review_needed,
compliance_review_needed, no_instructions"""
USER_PROMPT_TEMPLATE = """Analyse this AI agent:
AGENT NAME: {name}
DESCRIPTION: {description}
AUTHOR: {author}
INSTRUCTIONS (SYSTEM PROMPT):
---
{instructions}
---
TOOLS: {tools}"""
def _parse_json_response(text: str) -> dict:
"""Parse JSON from Gemini response, with regex fallback."""
# Try direct parse first
try:
return json.loads(text)
except json.JSONDecodeError:
pass
# Regex fallback: extract JSON block
match = re.search(r'\{[\s\S]*\}', text)
if match:
try:
return json.loads(match.group())
except json.JSONDecodeError:
pass
# Last resort: return raw text as summary
return {
"category": "1",
"category_reasoning": "Could not parse LLM response",
"discipline": "Back Office including operations",
"discipline_reasoning": "Default — LLM response unparseable",
"department": None,
"is_client_work": False,
"client_work_reasoning": "",
"client_name_detected": None,
"flags": ["parse_error"],
"summary": text[:500],
"recommendations": "Manual review required — automated analysis failed to produce structured output",
"risk_level": "medium"
}
async def analyze_single_agent(agent_name: str, instructions: str, tools: str,
description: str, author: str, max_retries: int = 3) -> dict:
"""Analyse a single agent using Gemini and return structured results.
Includes retry with exponential backoff for rate limits."""
client = _get_client()
model_name = _get_model_name()
prompt = USER_PROMPT_TEMPLATE.format(
name=agent_name,
description=description or "No description provided",
author=author or "Unknown",
instructions=instructions or "No instructions available",
tools=tools or "None specified"
)
for attempt in range(max_retries):
try:
response = await asyncio.to_thread(
client.models.generate_content,
model=model_name,
contents=prompt,
config=genai.types.GenerateContentConfig(
system_instruction=SYSTEM_INSTRUCTION
)
)
result = _parse_json_response(response.text)
result["agent_name"] = agent_name
return result
except Exception as e:
error_str = str(e).lower()
is_rate_limit = "429" in error_str or "rate" in error_str or "quota" in error_str or "resource_exhausted" in error_str
if is_rate_limit and attempt < max_retries - 1:
wait_time = (2 ** attempt) * 10 # 10s, 20s, 40s
logger.warning(f"Rate limited on '{agent_name}', retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})")
await asyncio.sleep(wait_time)
continue
# Final attempt failed or non-rate-limit error
logger.error(f"Failed to analyse '{agent_name}': {e}")
e_final = e
return {
"error": str(e_final),
"agent_name": agent_name,
"category": "1",
"category_reasoning": f"Analysis failed: {e_final}",
"discipline": None,
"discipline_reasoning": f"Analysis failed: {e_final}",
"department": None,
"is_client_work": False,
"client_work_reasoning": "",
"client_name_detected": None,
"flags": ["analysis_error"],
"summary": f"Automated analysis failed: {e_final}",
"recommendations": "Manual review required",
"risk_level": "medium"
}
async def store_audit_result(agent_id: str, audit_data: dict):
"""Store audit results on the agent document and in audit_history."""
now = datetime.utcnow()
# Update agent document with audit fields
update_fields = {
"audit_status": "flagged",
"audit_date": now.isoformat(),
"audit_category": audit_data.get("category"),
"audit_risk_level": audit_data.get("risk_level"),
"audit_summary": audit_data.get("summary"),
"audit_flags": audit_data.get("flags", []),
"audit_recommendations": audit_data.get("recommendations"),
"audit_category_reasoning": audit_data.get("category_reasoning"),
"audit_discipline": audit_data.get("discipline"),
"audit_discipline_reasoning": audit_data.get("discipline_reasoning"),
"audit_department": audit_data.get("department"),
"audit_is_client_work": audit_data.get("is_client_work", False),
"audit_client_work_reasoning": audit_data.get("client_work_reasoning", ""),
"audit_client_name_detected": audit_data.get("client_name_detected"),
}
await agents_collection.update_one(
{"_id": ObjectId(agent_id)},
{"$set": update_fields}
)
# Insert into audit_history for historical tracking
history_entry = {
"agent_id": agent_id,
"audit_date": now,
**audit_data
}
await audit_history_collection.insert_one(history_entry)
async def apply_classification_fields(agent_id: str, audit_data: dict):
"""Apply discipline, department, and client detection to the agent document.
Only overwrites fields that are currently empty/null, to respect manual edits.
"""
agent = await agents_collection.find_one({"_id": ObjectId(agent_id)})
if not agent:
return
update_fields = {}
# Auto-assign discipline if not already set
discipline = audit_data.get("discipline")
if discipline and not agent.get("discipline"):
update_fields["discipline"] = discipline
# Auto-assign department if not already set
department = audit_data.get("department")
if department and not agent.get("agent_department"):
update_fields["agent_department"] = department
# Auto-flag client work if not already manually set
is_client_work = audit_data.get("is_client_work", False)
if is_client_work and not agent.get("client"):
update_fields["client"] = "yes"
update_fields["verification_status"] = "needs_verification"
detected_name = audit_data.get("client_name_detected")
if detected_name and not agent.get("client_name"):
update_fields["client_name"] = detected_name
if update_fields:
await agents_collection.update_one(
{"_id": ObjectId(agent_id)},
{"$set": update_fields}
)
async def classify_single_agent(agent_id: str):
"""Convenience function for post-sync: load agent, analyse, store, apply fields."""
if not is_gemini_configured():
return
try:
agent = await agents_collection.find_one({"_id": ObjectId(agent_id)})
if not agent:
logger.warning(f"Agent {agent_id} not found")
return
instructions = agent.get("instructions")
if not instructions:
logger.info(f"Agent '{agent.get('agent_name')}' has no instructions, skipping")
return
# Build tools string from agent metadata
tools = ", ".join(agent.get("agent_capabilities", []) or [])
audit_data = await analyze_single_agent(
agent_name=agent.get("agent_name", "Unknown"),
instructions=instructions,
tools=tools,
description=agent.get("agent_description", ""),
author=agent.get("created_by", "")
)
await store_audit_result(agent_id, audit_data)
await apply_classification_fields(agent_id, audit_data)
logger.info(f"Classified '{agent.get('agent_name')}' as Cat {audit_data.get('category')}, "
f"discipline={audit_data.get('discipline')}, client={audit_data.get('is_client_work')}")
except Exception as e:
logger.error(f"Failed to classify agent {agent_id}: {e}")
async def run_audit_batch(unclassified_only: bool = False, single_agent_id: str = None,
concurrency: int = None) -> dict:
"""Run audit on multiple agents with concurrency control.
Returns summary dict with counts and results.
"""
if not is_gemini_configured():
return {"error": "GOOGLE_API_KEY not configured"}
if concurrency is None:
concurrency = int(os.getenv("AUDIT_CONCURRENCY", "2"))
# Build query
query = {}
if single_agent_id:
query["_id"] = ObjectId(single_agent_id)
elif unclassified_only:
query["audit_status"] = {"$exists": False}
agents = await agents_collection.find(query).to_list(length=None)
total = len(agents)
audited = 0
failed = 0
skipped = 0
results = []
# Filter out agents without instructions first
agents_with_instructions = []
for agent in agents:
if not agent.get("instructions"):
skipped += 1
else:
agents_with_instructions.append(agent)
logger.info(f"Audit batch: {total} total, {len(agents_with_instructions)} with instructions, {skipped} skipped")
# Process agents sequentially in small batches to respect Gemini rate limits
batch_size = concurrency
for i in range(0, len(agents_with_instructions), batch_size):
batch = agents_with_instructions[i:i + batch_size]
for agent in batch:
agent_id = str(agent["_id"])
tools = ", ".join(agent.get("agent_capabilities", []) or [])
try:
audit_data = await analyze_single_agent(
agent_name=agent.get("agent_name", "Unknown"),
instructions=agent.get("instructions"),
tools=tools,
description=agent.get("agent_description", ""),
author=agent.get("created_by", "")
)
if audit_data.get("error"):
failed += 1
results.append({"agent_name": agent.get("agent_name"), "error": audit_data["error"]})
else:
await store_audit_result(agent_id, audit_data)
await apply_classification_fields(agent_id, audit_data)
audited += 1
results.append({
"agent_name": agent.get("agent_name"),
"category": audit_data.get("category"),
"discipline": audit_data.get("discipline"),
"risk_level": audit_data.get("risk_level"),
"is_client_work": audit_data.get("is_client_work")
})
logger.info(f"[{audited + failed}/{len(agents_with_instructions)}] "
f"Classified '{agent.get('agent_name')}' -> Cat {audit_data.get('category')}")
except Exception as e:
failed += 1
results.append({"agent_name": agent.get("agent_name"), "error": str(e)})
logger.error(f"[{audited + failed}/{len(agents_with_instructions)}] "
f"Failed '{agent.get('agent_name')}': {e}")
# Pause between batches to avoid rate limits (4 seconds per batch)
if i + batch_size < len(agents_with_instructions):
await asyncio.sleep(4)
logger.info(f"Audit batch complete: {audited} audited, {failed} failed, {skipped} skipped")
return {
"status": "completed",
"total": total,
"audited_count": audited,
"failed_count": failed,
"skipped_count": skipped,
"results_summary": results[:50]
}
async def get_all_audit_results() -> list:
"""Get all agents with audit data for the audit tab."""
agents = await agents_collection.find(
{},
{
"agent_name": 1, "audit_status": 1, "audit_date": 1,
"audit_category": 1, "audit_risk_level": 1, "audit_summary": 1,
"audit_flags": 1, "audit_recommendations": 1,
"audit_category_reasoning": 1,
"audit_discipline": 1, "audit_discipline_reasoning": 1,
"audit_department": 1,
"audit_is_client_work": 1, "audit_client_work_reasoning": 1,
"audit_client_name_detected": 1,
"discipline": 1, "agent_department": 1,
"client": 1, "client_name": 1, "verification_status": 1,
"instructions": 1, "agent_capabilities": 1,
"created_by": 1, "agent_description": 1,
"audit_reviewer": 1, "audit_reviewer_notes": 1,
"audit_reviewed_date": 1
}
).to_list(length=None)
# Convert ObjectId to string
for agent in agents:
agent["_id"] = str(agent["_id"])
return agents
async def update_audit_review(agent_id: str, status: str, notes: str, reviewer_email: str):
"""Mark an agent's audit as reviewed/cleared."""
now = datetime.utcnow()
await agents_collection.update_one(
{"_id": ObjectId(agent_id)},
{"$set": {
"audit_status": status,
"audit_reviewer": reviewer_email,
"audit_reviewer_notes": notes,
"audit_reviewed_date": now.isoformat()
}}
)
# Also record in history
await audit_history_collection.insert_one({
"agent_id": agent_id,
"audit_date": now,
"action": "review",
"status": status,
"reviewer": reviewer_email,
"notes": notes
})