agent_tracker/audit_analyzer.py
nickviljoen 938691e598 Add filtered XLSX export, fix completion reminders, consolidate admin UI
Export & filtering
- Replace /api/admin/agents/export/csv with /api/admin/agents/export/xlsx
  (openpyxl). Multi-line system prompts stay inside one cell instead of
  fragmenting into thousands of rows when opened in Excel.
- Accept filter query params on export: status, discipline, audit,
  business_entity, agent_classification, autonomy_level, risks_only, search.
- Move Export/Import CSV/Delete by CSV buttons into the Agents Management
  tab, drop the duplicate links from the top nav, and rebuild the cramped
  filter row as a wrappable two-row layout.
- Add a Discipline dropdown to the Agents Management filter row to match
  the Prompt Audit tab.

Completion-reminder emails (fix for the broken Complete-button links)
- Add h:Reply-To header to every Mailgun send so users can reply to a real
  mailbox instead of noreply@. Default Nick.Viljoen@oliver.agency, overridable
  via NOTIFICATION_REPLY_TO env var.
- send_completion_reminders now skips with an error log when
  AGENTHUB_PUBLIC_URL is unset instead of mailing relative links email
  clients can't follow.

UI polish
- Restrict the 5-second alert auto-hide to .alert-dismissible so the
  load-bearing 'unresolved owner' banner stays visible until acted on.
- Move the orange brand gradient to a fixed body::before pseudo-element so
  it can't be covered by the white content card or scrolled out of view.
- Lock the navbar to viewport top (position: fixed) and reserve body
  padding-top so content doesn't sit beneath it.

Audit polish (carried over from previous WIP)
- Add batch-state tracking to audit_analyzer for in-flight progress visibility.
- Update PLAN-prompt-audit.md to match the shipped behaviour.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-12 22:45:29 +02:00


import os
import json
import re
import asyncio
import logging
from datetime import datetime
from bson import ObjectId
from google import genai
from database import agents_collection, audit_history_collection
logger = logging.getLogger("audit_analyzer")
logger.setLevel(logging.INFO)
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s [AUDIT] %(message)s"))
    logger.addHandler(handler)

_batch_state = {
    "running": False,
    "mode": None,
    "started_at": None,
    "completed_at": None,
    "total": 0,
    "audited": 0,
    "failed": 0,
    "skipped": 0,
    "current_agent": None,
    "error": None,
    "started_by": None,
}


def get_batch_state() -> dict:
    return dict(_batch_state)


def _reset_batch_state(mode: str, started_by: str = None):
    _batch_state.update({
        "running": True,
        "mode": mode,
        "started_at": datetime.utcnow().isoformat(),
        "completed_at": None,
        "total": 0,
        "audited": 0,
        "failed": 0,
        "skipped": 0,
        "current_agent": None,
        "error": None,
        "started_by": started_by,
    })


def is_gemini_configured() -> bool:
    return bool(os.getenv("GOOGLE_API_KEY"))


def _get_client():
    api_key = os.getenv("GOOGLE_API_KEY")
    return genai.Client(api_key=api_key)


def _get_model_name():
    return os.getenv("AUDIT_GEMINI_MODEL", "gemini-2.5-pro")

SYSTEM_INSTRUCTION = """You are an AI agent compliance analyst and classifier. Analyse AI agents based on their
system prompts/instructions and metadata. You must:
1. Classify into business risk categories
2. Assign a business discipline
3. Infer the department/team from the instructions
4. Detect whether the agent is used for client-specific work
5. Infer an Agent Type (single-task / workflow / orchestrator / guardrail)
6. Infer an autonomy hint (does the agent ask the user for sign-off, run partly automated, or fully automated?)
Respond ONLY with valid JSON matching this schema:
{
"category": "1" | "1B" | "2" | "3",
"category_reasoning": "string",
"discipline": "Strategy" | "Creative" | "Oversight including delivery" | "Optimisation" | "Back Office including operations" | "Pencil Agents",
"discipline_reasoning": "string - why this discipline was chosen",
"department": "string or null - inferred team/department from instructions (e.g. Project Management, Creative Services, Finance, Media, Strategy). null if not determinable",
"is_client_work": true | false,
"client_work_reasoning": "string - evidence from instructions that this agent handles client-specific work, references client names, brands, deliverables, or external-facing outputs. Empty string if not client work",
"client_name_detected": "string or null - specific client or brand name found in instructions, null if none",
"agent_classification": "Utility" | "Functional" | "Supervisory" | "Guardian",
"agent_classification_reasoning": "string - one short sentence on why this type was chosen",
"autonomy_level_hint": "Human-Led" | "Hybrid" | "Autopilot" | null,
"autonomy_reasoning": "string - one short sentence on autonomy signals in the prompt; empty string if no signal",
"flags": ["array", "of", "strings"],
"summary": "2-3 sentence analysis",
"recommendations": "which team(s) should review and why",
"risk_level": "low" | "medium" | "high" | "critical"
}
CATEGORY DEFINITIONS:
Cat 1 - Internal Sandbox/Experimentation (Oliver AI Sandbox, behind the scenes, needs IT/Compliance)
Cat 1B - High Cost Internal (may incur large cost, not Cat 2 or 3)
Cat 2 - Client-Exposed Not Sold (Pencil platform, exposed to clients, needs Legal)
Cat 3 - Client-Sold (Pencil platform, sold to clients, needs Commercial team)
DISCIPLINE DEFINITIONS (pick the best fit):
- Strategy: Agents focused on strategic planning, research, market analysis, insights
- Creative: Agents focused on creative work, content creation, design, copywriting
- Oversight including delivery: Agents focused on project management, delivery, QA, oversight, compliance
- Optimisation: Agents focused on performance optimisation, data analysis, efficiency, media planning
- Back Office including operations: Agents focused on internal operations, HR, finance, IT support, admin tasks
- Pencil Agents: Agents built on or for the Pencil platform specifically
DEPARTMENT INFERENCE:
Look for clues in the instructions about which team or role uses this agent. Examples:
- "I am a project manager" -> department: "Project Management"
- "help the media team" -> department: "Media"
- "creative brief" -> department: "Creative"
- If no department/team clues found, set department to null
CLIENT WORK DETECTION:
An agent is client work (is_client_work = true) if the instructions reference:
- Specific client names or brand names
- Client deliverables, client presentations, client reports
- External-facing outputs meant for clients
- Client briefs, client feedback, client approvals
- Work explicitly described as "for the client" or "client-facing"
Do NOT flag as client work if the agent merely mentions "users" or "stakeholders" generically.
RISK LEVELS:
- low: Internal-only, limited capabilities
- medium: Internal with external tool access or moderate cost
- high: Client-facing or accesses sensitive data
- critical: Client-sold or handles financial/legal/PII data
AGENT TYPE DEFINITIONS:
- Utility: Single-purpose tool that does one specific task — summariser, formatter, tone adjuster, glossary lookup.
- Functional: Multi-step agent completing a defined workflow — brief writer, campaign planner, document analyser.
- Supervisory: Orchestrates or oversees other agents — coordinator, router, planner, multi-agent dispatcher.
- Guardian: Monitors, filters, and enforces safety/compliance guardrails on other agents or outputs.
AUTONOMY HINT:
Look in the instructions for explicit signals about how independently the agent acts.
- "Human-Led": prompt says "ask the user", "confirm before", "wait for approval", "always check with"
- "Hybrid": some steps automated, others gated — "you may automatically X but must confirm Y"
- "Autopilot": runs end-to-end without asking — "autonomously", "without confirmation", "do not ask"
- null: no clear signal in the prompt — leave for the human to decide.
This is a hint only; the human can override on the registration form.
FLAGS TO CONSIDER:
internal_only, experimental, sandbox, client_facing, pencil_platform,
revenue_generating, not_for_sale, uses_external_tools, uses_code_interpreter,
uses_file_search, accesses_sensitive_data, handles_pii, high_cost,
resource_intensive, legal_review_needed, commercial_review_needed,
compliance_review_needed, no_instructions"""
USER_PROMPT_TEMPLATE = """Analyse this AI agent:
AGENT NAME: {name}
DESCRIPTION: {description}
AUTHOR: {author}
INSTRUCTIONS (SYSTEM PROMPT):
---
{instructions}
---
TOOLS: {tools}"""


def _parse_json_response(text: str) -> dict:
    """Parse JSON from Gemini response, with regex fallback."""
    # Try direct parse first
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # Regex fallback: extract JSON block
    match = re.search(r'\{[\s\S]*\}', text)
    if match:
        try:
            return json.loads(match.group())
        except json.JSONDecodeError:
            pass

    # Last resort: return raw text as summary
    return {
        "category": "1",
        "category_reasoning": "Could not parse LLM response",
        "discipline": "Back Office including operations",
        "discipline_reasoning": "Default — LLM response unparseable",
        "department": None,
        "is_client_work": False,
        "client_work_reasoning": "",
        "client_name_detected": None,
        "agent_classification": None,
        "agent_classification_reasoning": "",
        "autonomy_level_hint": None,
        "autonomy_reasoning": "",
        "flags": ["parse_error"],
        "summary": text[:500],
        "recommendations": "Manual review required — automated analysis failed to produce structured output",
        "risk_level": "medium"
    }


async def analyze_single_agent(agent_name: str, instructions: str, tools: str,
                               description: str, author: str, max_retries: int = 3) -> dict:
    """Analyse a single agent using Gemini and return structured results.

    Includes retry with exponential backoff for rate limits."""
    client = _get_client()
    model_name = _get_model_name()
    prompt = USER_PROMPT_TEMPLATE.format(
        name=agent_name,
        description=description or "No description provided",
        author=author or "Unknown",
        instructions=instructions or "No instructions available",
        tools=tools or "None specified"
    )

    for attempt in range(max_retries):
        try:
            response = await asyncio.to_thread(
                client.models.generate_content,
                model=model_name,
                contents=prompt,
                config=genai.types.GenerateContentConfig(
                    system_instruction=SYSTEM_INSTRUCTION
                )
            )
            result = _parse_json_response(response.text)
            result["agent_name"] = agent_name
            return result
        except Exception as e:
            error_str = str(e).lower()
            is_rate_limit = "429" in error_str or "rate" in error_str or "quota" in error_str or "resource_exhausted" in error_str
            if is_rate_limit and attempt < max_retries - 1:
                wait_time = (2 ** attempt) * 10  # 10s, 20s, 40s, ... (the final attempt does not sleep)
                logger.warning(f"Rate limited on '{agent_name}', retrying in {wait_time}s (attempt {attempt + 1}/{max_retries})")
                await asyncio.sleep(wait_time)
                continue
            # Final attempt failed or non-rate-limit error
            logger.error(f"Failed to analyse '{agent_name}': {e}")
            e_final = e
            return {
                "error": str(e_final),
                "agent_name": agent_name,
                "category": "1",
                "category_reasoning": f"Analysis failed: {e_final}",
                "discipline": None,
                "discipline_reasoning": f"Analysis failed: {e_final}",
                "department": None,
                "is_client_work": False,
                "client_work_reasoning": "",
                "client_name_detected": None,
                "flags": ["analysis_error"],
                "summary": f"Automated analysis failed: {e_final}",
                "recommendations": "Manual review required",
                "risk_level": "medium"
            }


async def store_audit_result(agent_id: str, audit_data: dict):
    """Store audit results on the agent document and in audit_history."""
    now = datetime.utcnow()
    audit_discipline = audit_data.get("discipline")
    if audit_discipline == "Optimization":
        audit_discipline = "Optimisation"

    # Update agent document with audit fields
    update_fields = {
        "audit_status": "flagged",
        "audit_date": now.isoformat(),
        "audit_category": audit_data.get("category"),
        "audit_risk_level": audit_data.get("risk_level"),
        "audit_summary": audit_data.get("summary"),
        "audit_flags": audit_data.get("flags", []),
        "audit_recommendations": audit_data.get("recommendations"),
        "audit_category_reasoning": audit_data.get("category_reasoning"),
        "audit_discipline": audit_discipline,
        "audit_discipline_reasoning": audit_data.get("discipline_reasoning"),
        "audit_department": audit_data.get("department"),
        "audit_is_client_work": audit_data.get("is_client_work", False),
        "audit_client_work_reasoning": audit_data.get("client_work_reasoning", ""),
        "audit_client_name_detected": audit_data.get("client_name_detected"),
        "audit_agent_classification": audit_data.get("agent_classification"),
        "audit_agent_classification_reasoning": audit_data.get("agent_classification_reasoning"),
        "audit_autonomy_level_hint": audit_data.get("autonomy_level_hint"),
        "audit_autonomy_reasoning": audit_data.get("autonomy_reasoning"),
    }
    await agents_collection.update_one(
        {"_id": ObjectId(agent_id)},
        {"$set": update_fields}
    )

    # Insert into audit_history for historical tracking
    history_entry = {
        "agent_id": agent_id,
        "audit_date": now,
        **audit_data
    }
    await audit_history_collection.insert_one(history_entry)


async def apply_classification_fields(agent_id: str, audit_data: dict):
    """Apply discipline, department, and client detection to the agent document.

    Only overwrites fields that are currently empty/null, to respect manual edits.
    """
    agent = await agents_collection.find_one({"_id": ObjectId(agent_id)})
    if not agent:
        return

    update_fields = {}

    # Auto-assign discipline if not already set
    discipline = audit_data.get("discipline")
    if discipline == "Optimization":
        discipline = "Optimisation"
    if discipline and not agent.get("discipline"):
        update_fields["discipline"] = discipline

    # Auto-assign department if not already set
    department = audit_data.get("department")
    if department and not agent.get("agent_department"):
        update_fields["agent_department"] = department

    # Auto-flag client work if not already manually set
    is_client_work = audit_data.get("is_client_work", False)
    if is_client_work and not agent.get("client"):
        update_fields["client"] = "yes"
        update_fields["verification_status"] = "needs_verification"
        detected_name = audit_data.get("client_name_detected")
        if detected_name and not agent.get("client_name"):
            update_fields["client_name"] = detected_name

    # Auto-assign agent_classification if Gemini gave a usable answer and the field is empty
    classification = audit_data.get("agent_classification")
    if classification in ("Utility", "Functional", "Supervisory", "Guardian") and not agent.get("agent_classification"):
        update_fields["agent_classification"] = classification

    # Autonomy is a hint — only fill if Gemini was confident enough to return non-null AND
    # the field is empty. The user can override on the registration form.
    autonomy_hint = audit_data.get("autonomy_level_hint")
    if autonomy_hint in ("Human-Led", "Hybrid", "Autopilot") and not agent.get("autonomy_level"):
        update_fields["autonomy_level"] = autonomy_hint

    if update_fields:
        await agents_collection.update_one(
            {"_id": ObjectId(agent_id)},
            {"$set": update_fields}
        )


async def classify_single_agent(agent_id: str):
    """Convenience function for post-sync: load agent, analyse, store, apply fields."""
    if not is_gemini_configured():
        return
    try:
        agent = await agents_collection.find_one({"_id": ObjectId(agent_id)})
        if not agent:
            logger.warning(f"Agent {agent_id} not found")
            return

        instructions = agent.get("instructions")
        if not instructions:
            logger.info(f"Agent '{agent.get('agent_name')}' has no instructions, skipping")
            return

        # Build tools string from agent metadata
        tools = ", ".join(agent.get("agent_capabilities", []) or [])

        audit_data = await analyze_single_agent(
            agent_name=agent.get("agent_name", "Unknown"),
            instructions=instructions,
            tools=tools,
            description=agent.get("agent_description", ""),
            author=agent.get("created_by", "")
        )
        await store_audit_result(agent_id, audit_data)
        await apply_classification_fields(agent_id, audit_data)
        logger.info(f"Classified '{agent.get('agent_name')}' as Cat {audit_data.get('category')}, "
                    f"discipline={audit_data.get('discipline')}, client={audit_data.get('is_client_work')}")
    except Exception as e:
        logger.error(f"Failed to classify agent {agent_id}: {e}")


async def run_audit_batch(unclassified_only: bool = False, single_agent_id: str = None,
                          concurrency: int = None) -> dict:
    """Run audit on multiple agents with concurrency control.

    Updates the module-level _batch_state as it progresses so the frontend
    can poll /api/admin/audit/status. Returns summary dict with counts and results.
    """
    if not is_gemini_configured():
        return {"error": "GOOGLE_API_KEY not configured"}

    if concurrency is None:
        concurrency = int(os.getenv("AUDIT_CONCURRENCY", "2"))

    query = {}
    if single_agent_id:
        query["_id"] = ObjectId(single_agent_id)
    elif unclassified_only:
        query["audit_status"] = {"$exists": False}

    agents = await agents_collection.find(query).to_list(length=None)
    total = len(agents)
    results = []

    agents_with_instructions = []
    for agent in agents:
        if not agent.get("instructions"):
            _batch_state["skipped"] += 1
        else:
            agents_with_instructions.append(agent)

    _batch_state["total"] = total
    logger.info(f"Audit batch: {total} total, {len(agents_with_instructions)} with instructions, {_batch_state['skipped']} skipped")

    batch_size = concurrency
    for i in range(0, len(agents_with_instructions), batch_size):
        batch = agents_with_instructions[i:i + batch_size]
        for agent in batch:
            agent_id = str(agent["_id"])
            agent_name = agent.get("agent_name", "Unknown")
            tools = ", ".join(agent.get("agent_capabilities", []) or [])
            _batch_state["current_agent"] = agent_name
            try:
                audit_data = await analyze_single_agent(
                    agent_name=agent_name,
                    instructions=agent.get("instructions"),
                    tools=tools,
                    description=agent.get("agent_description", ""),
                    author=agent.get("created_by", "")
                )
                if audit_data.get("error"):
                    _batch_state["failed"] += 1
                    results.append({"agent_name": agent_name, "error": audit_data["error"]})
                else:
                    await store_audit_result(agent_id, audit_data)
                    await apply_classification_fields(agent_id, audit_data)
                    _batch_state["audited"] += 1
                    results.append({
                        "agent_name": agent_name,
                        "category": audit_data.get("category"),
                        "discipline": audit_data.get("discipline"),
                        "risk_level": audit_data.get("risk_level"),
                        "is_client_work": audit_data.get("is_client_work")
                    })
                    logger.info(f"[{_batch_state['audited'] + _batch_state['failed']}/{len(agents_with_instructions)}] "
                                f"Classified '{agent_name}' -> Cat {audit_data.get('category')}")
            except Exception as e:
                _batch_state["failed"] += 1
                results.append({"agent_name": agent_name, "error": str(e)})
                logger.error(f"[{_batch_state['audited'] + _batch_state['failed']}/{len(agents_with_instructions)}] "
                             f"Failed '{agent_name}': {e}")

        if i + batch_size < len(agents_with_instructions):
            await asyncio.sleep(4)

    _batch_state["current_agent"] = None
    logger.info(f"Audit batch complete: {_batch_state['audited']} audited, {_batch_state['failed']} failed, {_batch_state['skipped']} skipped")

    return {
        "status": "completed",
        "total": total,
        "audited_count": _batch_state["audited"],
        "failed_count": _batch_state["failed"],
        "skipped_count": _batch_state["skipped"],
        "results_summary": results[:50]
    }


async def get_all_audit_results() -> list:
    """Get all agents with audit data for the audit tab."""
    agents = await agents_collection.find(
        {},
        {
            "agent_name": 1, "audit_status": 1, "audit_date": 1,
            "audit_category": 1, "audit_risk_level": 1, "audit_summary": 1,
            "audit_flags": 1, "audit_recommendations": 1,
            "audit_category_reasoning": 1,
            "audit_discipline": 1, "audit_discipline_reasoning": 1,
            "audit_department": 1,
            "audit_is_client_work": 1, "audit_client_work_reasoning": 1,
            "audit_client_name_detected": 1,
            "discipline": 1, "agent_department": 1,
            "client": 1, "client_name": 1, "verification_status": 1,
            "instructions": 1, "agent_capabilities": 1,
            "created_by": 1, "agent_description": 1,
            "audit_reviewer": 1, "audit_reviewer_notes": 1,
            "audit_reviewed_date": 1
        }
    ).to_list(length=None)

    # Convert ObjectId to string
    for agent in agents:
        agent["_id"] = str(agent["_id"])
    return agents


async def update_audit_review(agent_id: str, status: str, notes: str, reviewer_email: str):
    """Mark an agent's audit as reviewed/cleared."""
    now = datetime.utcnow()
    await agents_collection.update_one(
        {"_id": ObjectId(agent_id)},
        {"$set": {
            "audit_status": status,
            "audit_reviewer": reviewer_email,
            "audit_reviewer_notes": notes,
            "audit_reviewed_date": now.isoformat()
        }}
    )

    # Also record in history
    await audit_history_collection.insert_one({
        "agent_id": agent_id,
        "audit_date": now,
        "action": "review",
        "status": status,
        "reviewer": reviewer_email,
        "notes": notes
    })
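
The direct-parse-then-regex fallback in `_parse_json_response` can be tried in isolation, without a Gemini key or database. The sketch below is a simplified stand-in, not the module's function: `extract_json` and the sample reply are hypothetical, and it returns `None` rather than the default classification dict when nothing parses.

```python
import json
import re


def extract_json(text: str):
    """Return parsed JSON from text, or None if nothing parses (hypothetical helper)."""
    # First attempt: the whole reply is already valid JSON.
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass
    # Fallback: greedy match from the first "{" to the last "}" in the reply.
    match = re.search(r"\{[\s\S]*\}", text)
    if match:
        try:
            return json.loads(match.group())
        except json.JSONDecodeError:
            pass
    return None


# Hypothetical model reply with chatter around the JSON object.
reply = (
    "Sure, here is the analysis:\n"
    '{"category": "1B", "risk_level": "medium"}\n'
    "Let me know if you need more."
)
parsed = extract_json(reply)
```

Because the pattern is greedy, a reply containing two separate JSON objects would be matched as one span and fail to parse, which is the case where the module falls through to its `parse_error` default.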