#!/usr/bin/env python3 """ Register agents from a JSON file with the Agent Registration API. Usage: python register_agents.py --input /path/to/shared_agents.json \ [--base-url https://ai-sandbox.oliver.solutions/agent_collector/agents] \ [--api-key YOUR_KEY] \ [--dry-run] Notes: - If --api-key is not provided, the script will fall back to the static key from the docs. - The input can be either: * a list of agent documents, or * a list of wrapper objects that contain an "agentDetails" object (common when exported from MongoDB aggregations). - The script maps fields best-effort to the API schema and prunes any empty/None fields. """ import argparse import json import os import re import sys import time import urllib3 from typing import Any, Dict, List, Optional # Suppress SSL warnings when verification is disabled urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) try: import requests # type: ignore except Exception as e: print("This script requires the 'requests' package. Install with: pip install requests") raise DEFAULT_BASE_URL = "https://ai-sandbox.oliver.solutions/agent_collector/agents" # Fallback to the static key from the documentation if not supplied via --api-key DEFAULT_API_KEY = "agent-collector-static-key-2024-secure" def parse_iso(value: Any) -> Optional[str]: """ Best-effort: return ISO8601 string or None. - If value is already a string, return it (assuming it's already ISO8601). - If value looks like 'ISODate(\"...\")', extract the inner string. - Otherwise, return None. """ if value is None: return None if isinstance(value, str): # Handle Mongo shell export style: ISODate('2025-07-09T06:33:12.682Z') m = re.match(r"^ISODate\(['\"]?([^'\"]+)['\"]?\)$", value.strip()) return m.group(1) if m else value # Could try to parse datetime objects here if needed; for now, prefer string passthrough return None def ensure_list_of_str(value: Any) -> Optional[List[str]]: """Convert to a list[str] if reasonable; otherwise return None.""" if value is None: return None if isinstance(value, list): out = [] for v in value: if isinstance(v, str): out.append(v) else: out.append(str(v)) return out if out else None # Single string -> wrap if isinstance(value, str) and value.strip(): return [value] return None def extract_agent(doc: Dict[str, Any]) -> Dict[str, Any]: """Handle two shapes: raw agent doc OR wrapper { ..., agentDetails: {...} }""" if isinstance(doc, dict) and "agentDetails" in doc and isinstance(doc["agentDetails"], dict): return doc["agentDetails"] return doc def build_payload(agent: Dict[str, Any]) -> Dict[str, Any]: """ Map agent fields to Agent Registration API payload. Required by API: name, description, purpose, tool Optional fields are filled best-effort; empty/None/blank fields are pruned. """ name = agent.get("name") or agent.get("id") or str(agent.get("_id") or "Unnamed Agent") description = agent.get("description") or agent.get("instructions") or f"{name} agent" purpose = agent.get("purpose") or description # API's "tool" = platform where agent operates; best-effort map to provider tool = "LibreChat (chat-sandbox)" # Optional mappings version = agent.get("version") or agent.get("model") creation_date = parse_iso(agent.get("createdAt")) last_updated = parse_iso(agent.get("updatedAt")) capabilities = ensure_list_of_str(agent.get("capabilities") or agent.get("tools")) department = agent.get("department") or agent.get("category") # contact_person: prefer a support contact email or name; else use author (often email) contact_person = None sc = agent.get("support_contact") if isinstance(sc, dict): contact_person = sc.get("email") or sc.get("name") if not contact_person: contact_person = agent.get("contact_person") or agent.get("author") # tags: compact context about provider/model/category tags = [t for t in [agent.get("provider"), agent.get("model"), agent.get("category")] if t] # metadata: stash extras for traceability avatar_path = None avatar = agent.get("avatar") if isinstance(avatar, dict): avatar_path = avatar.get("filepath") elif isinstance(avatar, str): avatar_path = avatar metadata = { "source_id": agent.get("id"), "provider": agent.get("provider"), "model": agent.get("model"), "artifacts": agent.get("artifacts"), "tool_kwargs": agent.get("tool_kwargs"), "agent_ids": agent.get("agent_ids"), "projectIds": agent.get("projectIds"), "avatar": avatar_path, "author_email": agent.get("author"), "raw_category": agent.get("category"), } # Prune empty metadata metadata = {k: v for k, v in metadata.items() if v not in (None, "", [], {})} # Extract usage data usage_timeline = agent.get("usage_timeline", []) usage_summary = agent.get("usage_summary", {}) payload: Dict[str, Any] = { "name": name, "description": description, "purpose": purpose, "tool": tool, # Optionals: "version": version, "creation_date": creation_date, "last_updated": last_updated, "capabilities": capabilities, "department": department, "contact_person": contact_person, "tags": tags or None, "metadata": metadata or None, # Usage data: "usage_timeline": usage_timeline or None, "conversation_count": usage_summary.get("conversation_count", 0), "unique_users": usage_summary.get("unique_users", 0), "total_messages": usage_summary.get("total_messages", 0), "first_used": parse_iso(usage_summary.get("first_used")), "last_used": parse_iso(usage_summary.get("last_used")), } # Final prune of empties so the API only sees meaningful data for k in list(payload.keys()): if payload[k] in (None, "", [], {}): payload.pop(k, None) return payload def post_agent(session: requests.Session, base_url: str, api_key: str, payload: Dict[str, Any], retries: int = 3, backoff: float = 1.5) -> Dict[str, Any]: """POST with simple retry on transient errors. Return parsed JSON or a structured error.""" headers = { "Content-Type": "application/json", "X-API-Key": api_key, } attempt = 0 last_err: Optional[str] = None while attempt <= retries: try: resp = session.post(base_url, headers=headers, json=payload, timeout=30) if resp.status_code >= 200 and resp.status_code < 300: try: return {"ok": True, "status_code": resp.status_code, "data": resp.json(), "payload_name": payload.get("name")} except Exception: return {"ok": True, "status_code": resp.status_code, "data": {"raw": resp.text}, "payload_name": payload.get("name")} else: # Retry on 429/5xx if resp.status_code in (429, 500, 502, 503, 504) and attempt < retries: time.sleep((attempt + 1) * backoff) attempt += 1 continue try: detail = resp.json() except Exception: detail = {"raw": resp.text} return {"ok": False, "status_code": resp.status_code, "error": detail, "payload_name": payload.get("name")} except requests.RequestException as e: last_err = str(e) if attempt < retries: time.sleep((attempt + 1) * backoff) attempt += 1 continue return {"ok": False, "status_code": None, "error": {"exception": last_err}, "payload_name": payload.get("name")} return {"ok": False, "status_code": None, "error": {"exception": last_err or "unknown"}, "payload_name": payload.get("name")} def load_records(path: str) -> List[Dict[str, Any]]: with open(path, "r", encoding="utf-8") as f: data = json.load(f) if isinstance(data, dict) and "agents" in data and isinstance(data["agents"], list): items = data["agents"] elif isinstance(data, list): items = data else: raise ValueError("Unrecognized JSON structure. Expected a list, or an object with an 'agents' list.") return [extract_agent(item) for item in items] def main(): parser = argparse.ArgumentParser(description="Register agents with the Agent Registration API") parser.add_argument("--input", "-i", default="shared_agents.json", help="Path to input JSON (default: shared_agents.json)") parser.add_argument("--base-url", default=os.environ.get("AGENT_REG_URL", DEFAULT_BASE_URL), help=f"API endpoint URL (default: {DEFAULT_BASE_URL})") parser.add_argument("--api-key", default=os.environ.get("AGENT_REG_KEY", DEFAULT_API_KEY), help="API key (default: uses static key from the docs unless AGENT_REG_KEY is set)") parser.add_argument("--dry-run", action="store_true", help="Print payloads without sending to the API") parser.add_argument("--save-log", default="registration_results.json", help="Path to write results log JSON") args = parser.parse_args() # Load items try: agents = load_records(args.input) except Exception as e: print(f"Failed to load input JSON: {e}") sys.exit(2) session = requests.Session() # Disable SSL certificate verification for development/internal APIs session.verify = False # Create an adapter with SSL verification disabled from requests.adapters import HTTPAdapter from urllib3.poolmanager import PoolManager from urllib3.util import ssl_ class SSLAdapter(HTTPAdapter): def init_poolmanager(self, *args, **kwargs): kwargs['ssl_context'] = ssl_.create_urllib3_context() kwargs['ssl_context'].check_hostname = False kwargs['ssl_context'].verify_mode = 0 return super().init_poolmanager(*args, **kwargs) session.mount('https://', SSLAdapter()) results: List[Dict[str, Any]] = [] success = 0 usage_logged = 0 failures = 0 for idx, agent in enumerate(agents, start=1): payload = build_payload(agent) if args.dry_run: print(f"[DRY RUN {idx}/{len(agents)}] Would register: {payload.get('name')}") print(json.dumps(payload, indent=2, ensure_ascii=False)) results.append({"ok": True, "dry_run": True, "payload": payload}) continue res = post_agent(session, args.base_url, args.api_key, payload) results.append(res) if res.get("ok"): data = res.get("data", {}) status = str(data.get("status", "")).lower() if status == "usage_logged": usage_logged += 1 print(f"[{idx}/{len(agents)}] Usage tracked for existing agent: {payload.get('name')}") else: success += 1 print(f"[{idx}/{len(agents)}] Registered: {payload.get('name')}") else: failures += 1 print(f"[{idx}/{len(agents)}] FAILED for {payload.get('name')}: {res.get('error')}") # Write log try: with open(args.save_log, "w", encoding="utf-8") as f: json.dump(results, f, indent=2, ensure_ascii=False) print(f"\nWrote results to {args.save_log}") except Exception as e: print(f"Failed to write results log: {e}") print(f"\nSummary: registered={success}, usage_logged={usage_logged}, failed={failures}, total={len(agents)}") if __name__ == "__main__": main()