Aimpress_site/chatbot-api/llm.py
Vadym Samoilenko 99e6c37827 Add sales escalation to chatbot + CRM integration for all forms
- Bot now acts as sales consultant: identifies needs, proposes services, pushes for booking
- escalate_to_human tool: triggers on user request, bot stuck, or hot lead
- Escalation notifies RC with reason + conversation summary
- Contact form and quote form now create leads in Twenty CRM
- Fix RC webhook to use correct payload format (visitor.token as session_id)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 21:22:11 +00:00

137 lines
5.3 KiB
Python

import asyncio
import logging

import anthropic
import httpx

from config import settings
from knowledge import SYSTEM_PROMPT, TOOLS
from twenty_crm import create_lead_in_crm, enrich_person

# Logger for this module, registered under the fixed name "llm".
logger = logging.getLogger("llm")

# Single Anthropic API client, constructed once at import time from the
# configured API key and shared by every request handled in this module.
client = anthropic.Anthropic(
    api_key=settings.anthropic_api_key,
)
async def _handle_tool_calls(
response, messages: list[dict], session_meta: dict
) -> tuple[str, dict | None]:
"""Process tool calls from Claude, execute them, and get follow-up text."""
lead_data = None
tool_results = []
for block in response.content:
if block.type != "tool_use":
continue
if block.name == "capture_lead":
lead_data = block.input
# Send to n8n webhook
try:
async with httpx.AsyncClient() as http:
await http.post(
f"{settings.n8n_webhook_url}/chatbot-lead",
json=lead_data,
timeout=10,
)
except Exception:
pass
# Create lead in Twenty CRM
person_id = await create_lead_in_crm(
name=lead_data.get("name", ""),
email=lead_data.get("email", ""),
company=lead_data.get("company", ""),
need=lead_data.get("need", ""),
page_context=session_meta.get("page_context", "/"),
)
if person_id:
session_meta["twenty_person_id"] = person_id
logger.info(f"Lead created in Twenty CRM: {person_id}")
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": "Lead captured and added to CRM. Confirm to the visitor and offer to book a free consultation at https://cal.ai-impress.com.",
})
elif block.name == "escalate_to_human":
session_meta["_escalate"] = {
"reason": block.input.get("reason", ""),
"summary": block.input.get("summary", ""),
}
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": "Escalation initiated. A human team member will be notified. Send a warm message to the visitor confirming someone will join shortly.",
})
elif block.name == "update_lead":
enrichment = block.input
person_id = session_meta.get("twenty_person_id")
if person_id:
await enrich_person(person_id, enrichment)
logger.info(f"Lead enriched in Twenty CRM: {person_id} with {list(enrichment.keys())}")
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": "Lead profile updated with new information. Continue the conversation naturally.",
})
else:
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": "No lead captured yet. Continue the conversation to gather their details first.",
})
if not tool_results:
# No tool calls — extract text
text_parts = [b.text for b in response.content if b.type == "text"]
return " ".join(text_parts) if text_parts else "I'm sorry, could you rephrase that?", lead_data
# Send tool results back to get follow-up text
followup = client.messages.create(
model=settings.model,
max_tokens=settings.max_response_tokens,
system=SYSTEM_PROMPT,
tools=TOOLS,
messages=messages + [
{"role": "assistant", "content": response.content},
{"role": "user", "content": tool_results},
],
)
# Check if follow-up also has tool calls (recursive, max 1 level)
has_more_tools = any(b.type == "tool_use" for b in followup.content)
if has_more_tools:
return await _handle_tool_calls(followup, messages + [
{"role": "assistant", "content": response.content},
{"role": "user", "content": tool_results},
], session_meta)
text_parts = [b.text for b in followup.content if b.type == "text"]
reply = " ".join(text_parts) if text_parts else (
"Thank you! I've noted your details. You can book a free consultation at https://cal.ai-impress.com"
)
return reply, lead_data
async def get_ai_response(
    messages: list[dict], session_meta: dict | None = None
) -> tuple[str, dict | None]:
    """Get response from Claude. Returns (text_reply, lead_data_or_none).

    Args:
        messages: Conversation history passed as the ``messages`` argument of
            the Claude request.
        session_meta: Optional mutable per-session dict handed to the tool
            handler. When omitted, a fresh dict is used, so anything the tool
            handler records in it is not visible to the caller.

    Returns:
        ``(reply, lead_data)``. When the model answers without tool calls,
        ``lead_data`` is ``None`` and ``reply`` is the joined text blocks (or
        a rephrase prompt if there are none). Otherwise the result of
        ``_handle_tool_calls`` is returned unchanged.
    """
    if session_meta is None:
        session_meta = {}

    # `client` is the blocking Anthropic client. Calling it directly inside
    # this coroutine would stall the event loop for the whole LLM round-trip,
    # so the request runs in a worker thread instead.
    response = await asyncio.to_thread(
        client.messages.create,
        model=settings.model,
        max_tokens=settings.max_response_tokens,
        system=SYSTEM_PROMPT,
        tools=TOOLS,
        messages=messages,
    )

    if any(b.type == "tool_use" for b in response.content):
        return await _handle_tool_calls(response, messages, session_meta)

    text_parts = [b.text for b in response.content if b.type == "text"]
    reply = " ".join(text_parts) if text_parts else "I'm sorry, could you rephrase that?"
    return reply, None