"""Audit analyser: classifies registered AI agents with Gemini.

For each agent the module builds a prompt from the agent's instructions and
metadata, asks Gemini for a structured JSON verdict, stores that verdict on
the agent document and in the audit history collection, and fills in empty
classification fields on the agent.
"""

import os
import json
import re
import asyncio
import logging
from datetime import datetime
from bson import ObjectId
from google import genai
from database import agents_collection, audit_history_collection

logger = logging.getLogger("audit_analyzer")
logger.setLevel(logging.INFO)
# Attach a handler only once so repeated imports/reloads do not duplicate output.
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(asctime)s [AUDIT] %(message)s"))
    logger.addHandler(handler)

# Module-level progress record for the current/last batch run.
_batch_state = {
    "running": False,
    "mode": None,
    "started_at": None,
    "completed_at": None,
    "total": 0,
    "audited": 0,
    "failed": 0,
    "skipped": 0,
    "current_agent": None,
    "error": None,
    "started_by": None,
}


def get_batch_state() -> dict:
    """Return a shallow copy of the batch progress state."""
    return dict(_batch_state)


def _reset_batch_state(mode: str, started_by: str = None):
    """Mark a new batch as running and zero all counters.

    Args:
        mode: Label stored under ``"mode"``.
        started_by: Identifier stored under ``"started_by"``.
    """
    _batch_state.update({
        "running": True,
        "mode": mode,
        "started_at": datetime.utcnow().isoformat(),
        "completed_at": None,
        "total": 0,
        "audited": 0,
        "failed": 0,
        "skipped": 0,
        "current_agent": None,
        "error": None,
        "started_by": started_by,
    })


def is_gemini_configured() -> bool:
    """Return True when the GOOGLE_API_KEY environment variable is non-empty."""
    return bool(os.getenv("GOOGLE_API_KEY"))


def _get_client():
    """Build a Gemini client from the GOOGLE_API_KEY environment variable."""
    api_key = os.getenv("GOOGLE_API_KEY")
    return genai.Client(api_key=api_key)


def _get_model_name():
    """Return the model name from AUDIT_GEMINI_MODEL, defaulting to gemini-2.5-pro."""
    return os.getenv("AUDIT_GEMINI_MODEL", "gemini-2.5-pro")


SYSTEM_INSTRUCTION = """You are an AI agent compliance analyst and classifier. Analyse AI agents based on their system prompts/instructions and metadata.

You must:
1. Classify into business risk categories
2. Assign a business discipline
3. Infer the department/team from the instructions
4. Detect whether the agent is used for client-specific work
5. Infer an Agent Type (single-task / workflow / orchestrator / guardrail)
6. Infer an autonomy hint (does the agent ask the user for sign-off, run partly automated, or fully automated?)

Respond ONLY with valid JSON matching this schema:
{
  "category": "1" | "1B" | "2" | "3",
  "category_reasoning": "string",
  "discipline": "Strategy" | "Creative" | "Oversight including delivery" | "Optimisation" | "Back Office including operations" | "Pencil Agents",
  "discipline_reasoning": "string - why this discipline was chosen",
  "department": "string or null - inferred team/department from instructions (e.g. Project Management, Creative Services, Finance, Media, Strategy). null if not determinable",
  "is_client_work": true | false,
  "client_work_reasoning": "string - evidence from instructions that this agent handles client-specific work, references client names, brands, deliverables, or external-facing outputs. Empty string if not client work",
  "client_name_detected": "string or null - specific client or brand name found in instructions, null if none",
  "agent_classification": "Utility" | "Functional" | "Supervisory" | "Guardian",
  "agent_classification_reasoning": "string - one short sentence on why this type was chosen",
  "autonomy_level_hint": "Human-Led" | "Hybrid" | "Autopilot" | null,
  "autonomy_reasoning": "string - one short sentence on autonomy signals in the prompt; empty string if no signal",
  "flags": ["array", "of", "strings"],
  "summary": "2-3 sentence analysis",
  "recommendations": "which team(s) should review and why",
  "risk_level": "low" | "medium" | "high" | "critical"
}

CATEGORY DEFINITIONS:
Cat 1 - Internal Sandbox/Experimentation (Oliver AI Sandbox, behind the scenes, needs IT/Compliance)
Cat 1B - High Cost Internal (may incur large cost, not Cat 2 or 3)
Cat 2 - Client-Exposed Not Sold (Pencil platform, exposed to clients, needs Legal)
Cat 3 - Client-Sold (Pencil platform, sold to clients, needs Commercial team)

DISCIPLINE DEFINITIONS (pick the best fit):
- Strategy: Agents focused on strategic planning, research, market analysis, insights
- Creative: Agents focused on creative work, content creation, design, copywriting
- Oversight including delivery: Agents focused on project management, delivery, QA, oversight, compliance
- Optimisation: Agents focused on performance optimisation, data analysis, efficiency, media planning
- Back Office including operations: Agents focused on internal operations, HR, finance, IT support, admin tasks
- Pencil Agents: Agents built on or for the Pencil platform specifically

DEPARTMENT INFERENCE:
Look for clues in the instructions about which team or role uses this agent. Examples:
- "I am a project manager" -> department: "Project Management"
- "help the media team" -> department: "Media"
- "creative brief" -> department: "Creative"
- If no department/team clues found, set department to null

CLIENT WORK DETECTION:
An agent is client work (is_client_work = true) if the instructions reference:
- Specific client names or brand names
- Client deliverables, client presentations, client reports
- External-facing outputs meant for clients
- Client briefs, client feedback, client approvals
- Work explicitly described as "for the client" or "client-facing"
Do NOT flag as client work if the agent merely mentions "users" or "stakeholders" generically.

RISK LEVELS:
- low: Internal-only, limited capabilities
- medium: Internal with external tool access or moderate cost
- high: Client-facing or accesses sensitive data
- critical: Client-sold or handles financial/legal/PII data

AGENT TYPE DEFINITIONS:
- Utility: Single-purpose tool that does one specific task — summariser, formatter, tone adjuster, glossary lookup.
- Functional: Multi-step agent completing a defined workflow — brief writer, campaign planner, document analyser.
- Supervisory: Orchestrates or oversees other agents — coordinator, router, planner, multi-agent dispatcher.
- Guardian: Monitors, filters, and enforces safety/compliance guardrails on other agents or outputs.

AUTONOMY HINT:
Look in the instructions for explicit signals about how independently the agent acts.
- "Human-Led": prompt says "ask the user", "confirm before", "wait for approval", "always check with"
- "Hybrid": some steps automated, others gated — "you may automatically X but must confirm Y"
- "Autopilot": runs end-to-end without asking — "autonomously", "without confirmation", "do not ask"
- null: no clear signal in the prompt — leave for the human to decide.
This is a hint only; the human can override on the registration form.

FLAGS TO CONSIDER:
internal_only, experimental, sandbox, client_facing, pencil_platform, revenue_generating, not_for_sale, uses_external_tools, uses_code_interpreter, uses_file_search, accesses_sensitive_data, handles_pii, high_cost, resource_intensive, legal_review_needed, commercial_review_needed, compliance_review_needed, no_instructions"""

USER_PROMPT_TEMPLATE = """Analyse this AI agent:

AGENT NAME: {name}
DESCRIPTION: {description}
AUTHOR: {author}

INSTRUCTIONS (SYSTEM PROMPT):
---
{instructions}
---

TOOLS: {tools}"""

# "rate" must stand alone (or as "ratelimit"): a bare substring test also
# matches words such as "generate" or "moderate" and triggers bogus retries.
_RATE_WORD_RE = re.compile(r"(?<![a-z])rate(?:limit)?(?![a-z])")


def _normalise_discipline(discipline):
    """Map the US spelling "Optimization" onto the schema value "Optimisation"."""
    return "Optimisation" if discipline == "Optimization" else discipline


def _is_rate_limit_error(exc: Exception) -> bool:
    """Return True when the exception text looks like a rate-limit/quota error."""
    message = str(exc).lower()
    if "429" in message or "quota" in message or "resource_exhausted" in message:
        return True
    return _RATE_WORD_RE.search(message) is not None


def _parse_json_response(text: str) -> dict:
    """Parse the JSON object from a Gemini response.

    Tries the whole text first, then the outermost ``{...}`` span found by
    regex. Only a JSON *object* is accepted; any other JSON value (list,
    string, number) is treated as unparseable so callers always get a dict.

    Args:
        text: Raw response text. ``None`` is treated as an empty string.

    Returns:
        The parsed dict, or a default "parse_error" result whose ``summary``
        holds the first 500 characters of the raw text.
    """
    raw = text or ""

    candidates = [raw]
    match = re.search(r'\{[\s\S]*\}', raw)
    if match:
        candidates.append(match.group())

    for candidate in candidates:
        try:
            parsed = json.loads(candidate)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return parsed

    # Last resort: return raw text as summary
    return {
        "category": "1",
        "category_reasoning": "Could not parse LLM response",
        "discipline": "Back Office including operations",
        "discipline_reasoning": "Default — LLM response unparseable",
        "department": None,
        "is_client_work": False,
        "client_work_reasoning": "",
        "client_name_detected": None,
        "agent_classification": None,
        "agent_classification_reasoning": "",
        "autonomy_level_hint": None,
        "autonomy_reasoning": "",
        "flags": ["parse_error"],
        "summary": raw[:500],
        "recommendations": "Manual review required — automated analysis failed to produce structured output",
        "risk_level": "medium"
    }


async def analyze_single_agent(agent_name: str, instructions: str, tools: str,
                               description: str, author: str, max_retries: int = 3) -> dict:
    """Analyse a single agent using Gemini and return structured results.

    Rate-limit errors are retried with exponential backoff (10s, 20s, 40s, ...).
    Any other error, or a rate-limit error on the last attempt, ends the call.

    Args:
        agent_name: Name used in the prompt, logs and the returned dict.
        instructions: The agent's system prompt; a placeholder is sent if empty.
        tools: Human-readable tool list; a placeholder is sent if empty.
        description: Agent description; a placeholder is sent if empty.
        author: Agent author; "Unknown" is sent if empty.
        max_retries: Maximum number of attempts. Values below 1 are treated
            as 1 so the function always returns a dict.

    Returns:
        The parsed classification with ``agent_name`` added, or a failure dict
        carrying an ``"error"`` key and the "analysis_error" flag.
    """
    client = _get_client()
    model_name = _get_model_name()

    prompt = USER_PROMPT_TEMPLATE.format(
        name=agent_name,
        description=description or "No description provided",
        author=author or "Unknown",
        instructions=instructions or "No instructions available",
        tools=tools or "None specified"
    )

    attempts = max(1, max_retries)
    for attempt in range(attempts):
        try:
            # The SDK call is blocking, so run it off the event loop.
            response = await asyncio.to_thread(
                client.models.generate_content,
                model=model_name,
                contents=prompt,
                config=genai.types.GenerateContentConfig(
                    system_instruction=SYSTEM_INSTRUCTION
                )
            )
            result = _parse_json_response(response.text)
            result["agent_name"] = agent_name
            return result
        except Exception as e:
            if _is_rate_limit_error(e) and attempt < attempts - 1:
                wait_time = (2 ** attempt) * 10  # 10s, 20s, 40s
                logger.warning(f"Rate limited on '{agent_name}', retrying in {wait_time}s (attempt {attempt + 1}/{attempts})")
                await asyncio.sleep(wait_time)
                continue

            # Final attempt failed or non-rate-limit error
            logger.error(f"Failed to analyse '{agent_name}': {e}")
            return {
                "error": str(e),
                "agent_name": agent_name,
                "category": "1",
                "category_reasoning": f"Analysis failed: {e}",
                "discipline": None,
                "discipline_reasoning": f"Analysis failed: {e}",
                "department": None,
                "is_client_work": False,
                "client_work_reasoning": "",
                "client_name_detected": None,
                "agent_classification": None,
                "agent_classification_reasoning": "",
                "autonomy_level_hint": None,
                "autonomy_reasoning": "",
                "flags": ["analysis_error"],
                "summary": f"Automated analysis failed: {e}",
                "recommendations": "Manual review required",
                "risk_level": "medium"
            }


async def store_audit_result(agent_id: str, audit_data: dict):
    """Store audit results on the agent document and in audit_history.

    Args:
        agent_id: String form of the agent's ObjectId.
        audit_data: Classification dict as returned by analyze_single_agent.
    """
    now = datetime.utcnow()
    audit_discipline = _normalise_discipline(audit_data.get("discipline"))

    # Update agent document with audit fields
    update_fields = {
        "audit_status": "flagged",
        "audit_date": now.isoformat(),
        "audit_category": audit_data.get("category"),
        "audit_risk_level": audit_data.get("risk_level"),
        "audit_summary": audit_data.get("summary"),
        "audit_flags": audit_data.get("flags", []),
        "audit_recommendations": audit_data.get("recommendations"),
        "audit_category_reasoning": audit_data.get("category_reasoning"),
        "audit_discipline": audit_discipline,
        "audit_discipline_reasoning": audit_data.get("discipline_reasoning"),
        "audit_department": audit_data.get("department"),
        "audit_is_client_work": audit_data.get("is_client_work", False),
        "audit_client_work_reasoning": audit_data.get("client_work_reasoning", ""),
        "audit_client_name_detected": audit_data.get("client_name_detected"),
        "audit_agent_classification": audit_data.get("agent_classification"),
        "audit_agent_classification_reasoning": audit_data.get("agent_classification_reasoning"),
        "audit_autonomy_level_hint": audit_data.get("autonomy_level_hint"),
        "audit_autonomy_reasoning": audit_data.get("autonomy_reasoning"),
    }

    await agents_collection.update_one(
        {"_id": ObjectId(agent_id)},
        {"$set": update_fields}
    )

    # Insert into audit_history for historical tracking. audit_data is model
    # output, so it must not be able to set the document id or override the
    # trusted agent_id / audit_date values.
    history_entry = {key: value for key, value in audit_data.items() if key != "_id"}
    history_entry["agent_id"] = agent_id
    history_entry["audit_date"] = now
    await audit_history_collection.insert_one(history_entry)


async def apply_classification_fields(agent_id: str, audit_data: dict):
    """Apply discipline, department, and client detection to the agent document.

    Only overwrites fields that are currently empty/null, to respect manual edits.
    Does nothing when the agent cannot be found.

    Args:
        agent_id: String form of the agent's ObjectId.
        audit_data: Classification dict as returned by analyze_single_agent.
    """
    agent = await agents_collection.find_one({"_id": ObjectId(agent_id)})
    if not agent:
        return

    update_fields = {}

    # Auto-assign discipline if not already set
    discipline = _normalise_discipline(audit_data.get("discipline"))
    if discipline and not agent.get("discipline"):
        update_fields["discipline"] = discipline

    # Auto-assign department if not already set
    department = audit_data.get("department")
    if department and not agent.get("agent_department"):
        update_fields["agent_department"] = department

    # Auto-flag client work if not already manually set
    is_client_work = audit_data.get("is_client_work", False)
    if is_client_work and not agent.get("client"):
        update_fields["client"] = "yes"
        update_fields["verification_status"] = "needs_verification"
        detected_name = audit_data.get("client_name_detected")
        if detected_name and not agent.get("client_name"):
            update_fields["client_name"] = detected_name

    # Auto-assign agent_classification if Gemini gave a usable answer and the field is empty
    classification = audit_data.get("agent_classification")
    if classification in ("Utility", "Functional", "Supervisory", "Guardian") and not agent.get("agent_classification"):
        update_fields["agent_classification"] = classification

    # Autonomy is a hint — only fill if Gemini was confident enough to return non-null AND
    # the field is empty. The user can override on the registration form.
    autonomy_hint = audit_data.get("autonomy_level_hint")
    if autonomy_hint in ("Human-Led", "Hybrid", "Autopilot") and not agent.get("autonomy_level"):
        update_fields["autonomy_level"] = autonomy_hint

    if update_fields:
        await agents_collection.update_one(
            {"_id": ObjectId(agent_id)},
            {"$set": update_fields}
        )


async def classify_single_agent(agent_id: str):
    """Convenience function for post-sync: load agent, analyse, store, apply fields.

    Returns without doing anything when Gemini is not configured, the agent is
    missing, or it has no instructions. A failed analysis is logged and NOT
    persisted, matching run_audit_batch, so the agent stays eligible for a
    later "unclassified only" run. All exceptions are logged, never raised.

    Args:
        agent_id: String form of the agent's ObjectId.
    """
    if not is_gemini_configured():
        return

    try:
        agent = await agents_collection.find_one({"_id": ObjectId(agent_id)})
        if not agent:
            logger.warning(f"Agent {agent_id} not found")
            return

        instructions = agent.get("instructions")
        if not instructions:
            logger.info(f"Agent '{agent.get('agent_name')}' has no instructions, skipping")
            return

        # Build tools string from agent metadata
        tools = ", ".join(agent.get("agent_capabilities", []) or [])

        audit_data = await analyze_single_agent(
            agent_name=agent.get("agent_name", "Unknown"),
            instructions=instructions,
            tools=tools,
            description=agent.get("agent_description", ""),
            author=agent.get("created_by", "")
        )

        if audit_data.get("error"):
            logger.warning(f"Analysis failed for '{agent.get('agent_name')}', audit result not stored: {audit_data['error']}")
            return

        await store_audit_result(agent_id, audit_data)
        await apply_classification_fields(agent_id, audit_data)
        logger.info(f"Classified '{agent.get('agent_name')}' as Cat {audit_data.get('category')}, "
                    f"discipline={audit_data.get('discipline')}, client={audit_data.get('is_client_work')}")
    except Exception as e:
        logger.error(f"Failed to classify agent {agent_id}: {e}")


async def run_audit_batch(unclassified_only: bool = False, single_agent_id: str = None,
                          concurrency: int = None) -> dict:
    """Run audit on multiple agents, pausing between batches.

    Updates the module-level _batch_state as it progresses so the frontend
    can poll /api/admin/audit/status.

    Agents are analysed one after another; ``concurrency`` only sets how many
    agents are processed between the 4-second pauses.

    NOTE(review): this function increments the _batch_state counters but does
    not reset them or clear "running" — presumably the caller invokes
    _reset_batch_state first and finalises the state afterwards; confirm.

    Args:
        unclassified_only: Only audit agents without an ``audit_status`` field.
        single_agent_id: Audit just this agent; takes precedence over
            ``unclassified_only``.
        concurrency: Batch size. Defaults to the AUDIT_CONCURRENCY environment
            variable (2 if unset or not an integer). Values below 1 become 1.

    Returns:
        Summary dict with counts and up to 50 per-agent results, or
        ``{"error": ...}`` when GOOGLE_API_KEY is not configured.
    """
    if not is_gemini_configured():
        return {"error": "GOOGLE_API_KEY not configured"}

    if concurrency is None:
        raw_concurrency = os.getenv("AUDIT_CONCURRENCY", "2")
        try:
            concurrency = int(raw_concurrency)
        except ValueError:
            logger.warning(f"Invalid AUDIT_CONCURRENCY '{raw_concurrency}', using 2")
            concurrency = 2

    query = {}
    if single_agent_id:
        query["_id"] = ObjectId(single_agent_id)
    elif unclassified_only:
        query["audit_status"] = {"$exists": False}

    agents = await agents_collection.find(query).to_list(length=None)
    total = len(agents)

    results = []

    agents_with_instructions = []
    for agent in agents:
        if not agent.get("instructions"):
            _batch_state["skipped"] += 1
        else:
            agents_with_instructions.append(agent)

    _batch_state["total"] = total
    logger.info(f"Audit batch: {total} total, {len(agents_with_instructions)} with instructions, {_batch_state['skipped']} skipped")

    # A step of 0 (or less) would make range() raise or never advance.
    batch_size = max(1, concurrency)
    for i in range(0, len(agents_with_instructions), batch_size):
        batch = agents_with_instructions[i:i + batch_size]

        for agent in batch:
            agent_id = str(agent["_id"])
            agent_name = agent.get("agent_name", "Unknown")
            tools = ", ".join(agent.get("agent_capabilities", []) or [])
            _batch_state["current_agent"] = agent_name

            try:
                audit_data = await analyze_single_agent(
                    agent_name=agent_name,
                    instructions=agent.get("instructions"),
                    tools=tools,
                    description=agent.get("agent_description", ""),
                    author=agent.get("created_by", "")
                )

                if audit_data.get("error"):
                    # Failed analyses are counted but never persisted.
                    _batch_state["failed"] += 1
                    results.append({"agent_name": agent_name, "error": audit_data["error"]})
                else:
                    await store_audit_result(agent_id, audit_data)
                    await apply_classification_fields(agent_id, audit_data)
                    _batch_state["audited"] += 1
                    results.append({
                        "agent_name": agent_name,
                        "category": audit_data.get("category"),
                        "discipline": audit_data.get("discipline"),
                        "risk_level": audit_data.get("risk_level"),
                        "is_client_work": audit_data.get("is_client_work")
                    })
                    logger.info(f"[{_batch_state['audited'] + _batch_state['failed']}/{len(agents_with_instructions)}] "
                                f"Classified '{agent_name}' -> Cat {audit_data.get('category')}")
            except Exception as e:
                _batch_state["failed"] += 1
                results.append({"agent_name": agent_name, "error": str(e)})
                logger.error(f"[{_batch_state['audited'] + _batch_state['failed']}/{len(agents_with_instructions)}] "
                             f"Failed '{agent_name}': {e}")

        # Pause between batches, but not after the last one.
        if i + batch_size < len(agents_with_instructions):
            await asyncio.sleep(4)

    _batch_state["current_agent"] = None
    logger.info(f"Audit batch complete: {_batch_state['audited']} audited, {_batch_state['failed']} failed, {_batch_state['skipped']} skipped")

    return {
        "status": "completed",
        "total": total,
        "audited_count": _batch_state["audited"],
        "failed_count": _batch_state["failed"],
        "skipped_count": _batch_state["skipped"],
        "results_summary": results[:50]
    }


async def get_all_audit_results() -> list:
    """Get all agents with audit data for the audit tab.

    Returns:
        One dict per agent document, restricted to the projected fields, with
        ``_id`` converted to a string.
    """
    agents = await agents_collection.find(
        {},
        {
            "agent_name": 1,
            "audit_status": 1,
            "audit_date": 1,
            "audit_category": 1,
            "audit_risk_level": 1,
            "audit_summary": 1,
            "audit_flags": 1,
            "audit_recommendations": 1,
            "audit_category_reasoning": 1,
            "audit_discipline": 1,
            "audit_discipline_reasoning": 1,
            "audit_department": 1,
            "audit_is_client_work": 1,
            "audit_client_work_reasoning": 1,
            "audit_client_name_detected": 1,
            # Written by store_audit_result; included so they can be displayed.
            "audit_agent_classification": 1,
            "audit_agent_classification_reasoning": 1,
            "audit_autonomy_level_hint": 1,
            "audit_autonomy_reasoning": 1,
            "discipline": 1,
            "agent_department": 1,
            "client": 1,
            "client_name": 1,
            "verification_status": 1,
            "instructions": 1,
            "agent_capabilities": 1,
            "created_by": 1,
            "agent_description": 1,
            "audit_reviewer": 1,
            "audit_reviewer_notes": 1,
            "audit_reviewed_date": 1
        }
    ).to_list(length=None)

    # Convert ObjectId to string
    for agent in agents:
        agent["_id"] = str(agent["_id"])

    return agents


async def update_audit_review(agent_id: str, status: str, notes: str, reviewer_email: str):
    """Mark an agent's audit as reviewed/cleared.

    Writes the review outcome onto the agent document and appends a
    ``"review"`` entry to the audit history collection.

    Args:
        agent_id: String form of the agent's ObjectId.
        status: New value for ``audit_status``.
        notes: Reviewer's notes.
        reviewer_email: Identifier of the reviewer.
    """
    now = datetime.utcnow()
    await agents_collection.update_one(
        {"_id": ObjectId(agent_id)},
        {"$set": {
            "audit_status": status,
            "audit_reviewer": reviewer_email,
            "audit_reviewer_notes": notes,
            "audit_reviewed_date": now.isoformat()
        }}
    )

    # Also record in history
    await audit_history_collection.insert_one({
        "agent_id": agent_id,
        "audit_date": now,
        "action": "review",
        "status": status,
        "reviewer": reviewer_email,
        "notes": notes
    })