""" Code Interpreter Tool Executes code in a sandboxed environment via the LibreCodeInterpreter microservice. Supports 13 languages. Python sessions are persistent within a conversation. """ import logging from typing import Any, Dict import httpx from app.config import settings from app.tools.base import BaseTool, ToolContext, ToolResult logger = logging.getLogger(__name__) # Map friendly language names to LibreCodeInterpreter language codes _LANG_CODES: Dict[str, str] = { "python": "py", "javascript": "js", "typescript": "ts", "go": "go", "java": "java", "c": "c", "cpp": "cpp", "rust": "rs", "php": "php", "r": "r", "bash": "bash", "fortran": "f90", } class CodeInterpreterTool(BaseTool): """Execute code in a secure sandboxed environment.""" @property def name(self) -> str: return "code_interpreter" @property def display_name(self) -> str: return "Code Interpreter" @property def description(self) -> str: return ( "Execute code in a secure sandboxed environment. " "Supports Python, JavaScript, TypeScript, Go, Java, C, C++, Rust, PHP, R, Bash, and Fortran. " "Use for calculations, data analysis, generating files, visualizations, or any task " "that benefits from running code. Returns stdout, stderr, and any generated files. " "Python sessions maintain state between calls within the same conversation." ) @property def parameters_schema(self) -> Dict[str, Any]: return { "type": "object", "properties": { "code": { "type": "string", "description": "The code to execute.", }, "language": { "type": "string", "enum": list(_LANG_CODES.keys()), "description": "Programming language to use. Defaults to python.", "default": "python", }, }, "required": ["code"], } @property def category(self) -> str: return "code_execution" @property def requires_graph_consent(self) -> bool: return False async def execute(self, arguments: Dict[str, Any], context: ToolContext) -> ToolResult: if not settings.CODE_INTERPRETER_URL: return ToolResult( success=False, error="Code interpreter is not configured. Set CODE_INTERPRETER_URL.", ) code = arguments.get("code", "").strip() if not code: return ToolResult(success=False, error="No code provided.") language = arguments.get("language", "python").lower() lang_code = _LANG_CODES.get(language, "py") # Use conversation_id as session_id for state persistence (Python REPL) session_id = str(context.conversation_id) if context.conversation_id else str(context.user_id) payload = { "code": code, "lang": lang_code, "session_id": session_id, "user_id": str(context.user_id), } if context.agent_slug: payload["entity_id"] = context.agent_slug[:40] try: async with httpx.AsyncClient(timeout=90.0) as client: # /exec streams keepalive whitespace then the JSON body — read full response resp = await client.post( f"{settings.CODE_INTERPRETER_URL.rstrip('/')}/exec", json=payload, headers={ "X-API-Key": settings.CODE_INTERPRETER_API_KEY, "Content-Type": "application/json", }, ) except httpx.TimeoutException: return ToolResult(success=False, error="Code execution timed out after 90 seconds.") except Exception as exc: # noqa: BLE001 logger.error("code_interpreter: request failed: %s", exc) return ToolResult(success=False, error=f"Failed to reach code interpreter: {exc}") if resp.status_code != 200: logger.warning("code_interpreter: non-200 response %s: %s", resp.status_code, resp.text[:200]) return ToolResult(success=False, error=f"Code interpreter returned HTTP {resp.status_code}.") try: # Response may have leading whitespace keepalive before JSON body import json as _json result = _json.loads(resp.text.strip()) except Exception: return ToolResult(success=False, error="Invalid response from code interpreter.") stdout = result.get("stdout", "") stderr = result.get("stderr", "") raw_files = result.get("files", []) effective_session_id = result.get("session_id", session_id) # Normalise file objects: ensure consistent {id, name, session_id} shape # LibreCodeInterpreter may use "file_id"/"filename" variants normalized_files = [] for f in raw_files: file_id = f.get("id") or f.get("file_id") or "" name = f.get("name") or f.get("filename") or file_id or "file" if file_id: normalized_files.append({ "id": file_id, "name": name, "session_id": effective_session_id, }) # Build human-readable display string parts = [] if stdout: parts.append(f"Output:\n{stdout.rstrip()}") if stderr: parts.append(f"Errors:\n{stderr.rstrip()}") if normalized_files: file_lines = [f" - {f['name']}" for f in normalized_files] parts.append("Generated files:\n" + "\n".join(file_lines)) if not parts: parts.append("Code executed successfully (no output).") return ToolResult( success=True, data={ "language": language, "code": code, "stdout": stdout, "stderr": stderr, "files": normalized_files, "session_id": effective_session_id, }, display="\n".join(parts), )