- Fix Twenty CRM filter syntax (dot notation for composite fields) - Fix noteTargets/taskTargets to use targetPerson instead of personId - Handle duplicate person creation gracefully (find existing on 400) - Add task creation for new leads (follow-up TODO) - Save conversation transcript to CRM on escalation and rate limit - Strengthen system prompt to make Claude call update_lead proactively - Tell Claude that form-submitted leads are already in CRM Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
149 lines
6.1 KiB
Python
149 lines
6.1 KiB
Python
import anthropic
|
|
import httpx
|
|
import logging
|
|
from config import settings
|
|
from knowledge import SYSTEM_PROMPT, TOOLS
|
|
from twenty_crm import create_lead_in_crm, enrich_person
|
|
|
|
# Module-level logger; all records from this file are emitted under the fixed name "llm".
logger = logging.getLogger("llm")
|
|
|
|
# Shared Anthropic SDK client, constructed once at import time with the API key
# from settings; every messages.create call in this module goes through it.
client = anthropic.Anthropic(api_key=settings.anthropic_api_key)
|
|
|
|
|
|
async def _handle_tool_calls(
|
|
response, messages: list[dict], session_meta: dict
|
|
) -> tuple[str, dict | None]:
|
|
"""Process tool calls from Claude, execute them, and get follow-up text."""
|
|
lead_data = None
|
|
tool_results = []
|
|
|
|
for block in response.content:
|
|
if block.type != "tool_use":
|
|
continue
|
|
|
|
if block.name == "capture_lead":
|
|
lead_data = block.input
|
|
# Send to n8n webhook
|
|
try:
|
|
async with httpx.AsyncClient() as http:
|
|
await http.post(
|
|
f"{settings.n8n_webhook_url}/chatbot-lead",
|
|
json=lead_data,
|
|
timeout=10,
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
# Create lead in Twenty CRM
|
|
person_id = await create_lead_in_crm(
|
|
name=lead_data.get("name", ""),
|
|
email=lead_data.get("email", ""),
|
|
company=lead_data.get("company", ""),
|
|
need=lead_data.get("need", ""),
|
|
page_context=session_meta.get("page_context", "/"),
|
|
)
|
|
if person_id:
|
|
session_meta["twenty_person_id"] = person_id
|
|
logger.info(f"Lead created in Twenty CRM: {person_id}")
|
|
|
|
tool_results.append({
|
|
"type": "tool_result",
|
|
"tool_use_id": block.id,
|
|
"content": "Lead captured and added to CRM. Confirm to the visitor IN THEIR LANGUAGE and offer to book a free consultation at https://cal.ai-impress.com.",
|
|
})
|
|
|
|
elif block.name == "escalate_to_human":
|
|
session_meta["_escalate"] = {
|
|
"reason": block.input.get("reason", ""),
|
|
"summary": block.input.get("summary", ""),
|
|
}
|
|
tool_results.append({
|
|
"type": "tool_result",
|
|
"tool_use_id": block.id,
|
|
"content": "Escalation initiated. A human team member will be notified. Send a warm message to the visitor confirming someone will join shortly.",
|
|
})
|
|
|
|
elif block.name == "update_lead":
|
|
enrichment = block.input
|
|
person_id = session_meta.get("twenty_person_id")
|
|
if person_id:
|
|
await enrich_person(person_id, enrichment)
|
|
logger.info(f"Lead enriched in Twenty CRM: {person_id} with {list(enrichment.keys())}")
|
|
tool_results.append({
|
|
"type": "tool_result",
|
|
"tool_use_id": block.id,
|
|
"content": "Lead profile updated with new information. Continue the conversation naturally.",
|
|
})
|
|
else:
|
|
tool_results.append({
|
|
"type": "tool_result",
|
|
"tool_use_id": block.id,
|
|
"content": "No lead captured yet. Continue the conversation to gather their details first.",
|
|
})
|
|
|
|
if not tool_results:
|
|
# No tool calls — extract text
|
|
text_parts = [b.text for b in response.content if b.type == "text"]
|
|
return " ".join(text_parts) if text_parts else "I'm sorry, could you rephrase that?", lead_data
|
|
|
|
# Send tool results back to get follow-up text
|
|
followup = client.messages.create(
|
|
model=settings.model,
|
|
max_tokens=settings.max_response_tokens,
|
|
system=SYSTEM_PROMPT,
|
|
tools=TOOLS,
|
|
messages=messages + [
|
|
{"role": "assistant", "content": response.content},
|
|
{"role": "user", "content": tool_results},
|
|
],
|
|
)
|
|
|
|
# Check if follow-up also has tool calls (recursive, max 1 level)
|
|
has_more_tools = any(b.type == "tool_use" for b in followup.content)
|
|
if has_more_tools:
|
|
return await _handle_tool_calls(followup, messages + [
|
|
{"role": "assistant", "content": response.content},
|
|
{"role": "user", "content": tool_results},
|
|
], session_meta)
|
|
|
|
text_parts = [b.text for b in followup.content if b.type == "text"]
|
|
if not text_parts:
|
|
# Fallback: ask Claude to generate a proper response in the visitor's language
|
|
fallback_resp = client.messages.create(
|
|
model=settings.model,
|
|
max_tokens=settings.max_response_tokens,
|
|
system=SYSTEM_PROMPT,
|
|
messages=messages + [
|
|
{"role": "assistant", "content": response.content},
|
|
{"role": "user", "content": tool_results},
|
|
{"role": "assistant", "content": followup.content},
|
|
{"role": "user", "content": [{"type": "text", "text": "Please respond to the visitor in their language. Confirm you noted their details and offer to book at https://cal.ai-impress.com"}]},
|
|
],
|
|
)
|
|
text_parts = [b.text for b in fallback_resp.content if b.type == "text"]
|
|
reply = " ".join(text_parts) if text_parts else "https://cal.ai-impress.com"
|
|
return reply, lead_data
|
|
|
|
|
|
async def get_ai_response(
    messages: list[dict], session_meta: dict | None = None
) -> tuple[str, dict | None]:
    """Get response from Claude. Returns (text_reply, lead_data_or_none).

    Args:
        messages: Conversation history passed to the model as ``messages``.
        session_meta: Optional mutable per-session dict handed to the
            tool-call handler; a fresh empty dict is used when omitted.

    Returns:
        ``(reply, lead_data)``. When the model requests tools, the result of
        ``_handle_tool_calls`` is returned as-is; otherwise ``lead_data`` is
        ``None`` and ``reply`` is the model's text, or a generic
        "please rephrase" message if it produced no text block.
    """
    # Local import keeps this block self-contained.
    import asyncio

    if session_meta is None:
        session_meta = {}

    # The SDK client is synchronous; calling it directly here would stall the
    # event loop (and every other session) until the HTTP request completes,
    # so it runs in a worker thread instead.
    response = await asyncio.to_thread(
        client.messages.create,
        model=settings.model,
        max_tokens=settings.max_response_tokens,
        system=SYSTEM_PROMPT,
        tools=TOOLS,
        messages=messages,
    )

    if any(b.type == "tool_use" for b in response.content):
        return await _handle_tool_calls(response, messages, session_meta)

    text_parts = [b.text for b in response.content if b.type == "text"]
    reply = " ".join(text_parts) if text_parts else "I'm sorry, could you rephrase that?"
    return reply, None
|