Aimpress_site/chatbot-api/llm.py
Vadym Samoilenko 93288aa981 Fix CRM enrichment: correct API filters, note linking, add tasks & transcripts
- Fix Twenty CRM filter syntax (dot notation for composite fields)
- Fix noteTargets/taskTargets to use targetPerson instead of personId
- Handle duplicate person creation gracefully (find existing on 400)
- Add task creation for new leads (follow-up TODO)
- Save conversation transcript to CRM on escalation and rate limit
- Strengthen system prompt to make Claude call update_lead proactively
- Tell Claude that form-submitted leads are already in CRM

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 21:42:56 +00:00

149 lines
6.1 KiB
Python

import asyncio
import logging

import anthropic
import httpx

from config import settings
from knowledge import SYSTEM_PROMPT, TOOLS
from twenty_crm import create_lead_in_crm, enrich_person
# Module logger, registered under the fixed name "llm".
logger = logging.getLogger("llm")
# Shared Anthropic SDK client (the synchronous variant), authenticated with
# the API key taken from the application settings.
client = anthropic.Anthropic(api_key=settings.anthropic_api_key)
# Hard cap on nested tool rounds. The original comment promised "max 1 level"
# but nothing enforced it, so a model that kept requesting tools could recurse
# (and bill API calls) without bound.
_MAX_TOOL_DEPTH = 3

_REPHRASE_REPLY = "I'm sorry, could you rephrase that?"
_BOOKING_LINK_REPLY = "https://cal.ai-impress.com"


def _join_text(content) -> str:
    """Return the text blocks of a Claude response joined by spaces ("" if none)."""
    return " ".join(b.text for b in content if b.type == "text")


def _tool_result(tool_use_id: str, content: str, is_error: bool = False) -> dict:
    """Build one ``tool_result`` content block answering the given tool call."""
    result = {"type": "tool_result", "tool_use_id": tool_use_id, "content": content}
    if is_error:
        result["is_error"] = True
    return result


async def _post_lead_to_webhook(lead_data: dict) -> None:
    """Best-effort POST of a captured lead to the n8n webhook; never raises."""
    try:
        async with httpx.AsyncClient() as http:
            await http.post(
                f"{settings.n8n_webhook_url}/chatbot-lead",
                json=lead_data,
                timeout=10,
            )
    except Exception:
        # Deliberately non-fatal: the CRM write that follows is independent of
        # the webhook. Log instead of silently discarding the failure.
        logger.warning("Failed to deliver lead to n8n webhook", exc_info=True)


async def _handle_tool_calls(
    response, messages: list[dict], session_meta: dict, _depth: int = 0
) -> tuple[str, dict | None]:
    """Execute the tool calls in a Claude response and return the follow-up text.

    Each ``tool_use`` block in ``response.content`` is handled by name:

    * ``capture_lead`` -- posts the input to the n8n webhook (best effort),
      creates the lead in Twenty CRM and, when an id comes back, stores it in
      ``session_meta["twenty_person_id"]``.
    * ``escalate_to_human`` -- records reason/summary under
      ``session_meta["_escalate"]``.
    * ``update_lead`` -- enriches the CRM person stored in ``session_meta``,
      or tells the model that no lead has been captured yet.
    * any other name -- answered with an error ``tool_result`` so that every
      ``tool_use`` block has a matching result in the follow-up request.

    The results are then sent back to Claude. If the follow-up itself requests
    tools, they are processed recursively up to ``_MAX_TOOL_DEPTH`` nested
    rounds. If the follow-up has no text, one more request nudges the model
    for a reply; failing that, the booking link is returned.

    Args:
        response: Claude message whose ``content`` may hold ``tool_use`` blocks.
        messages: Conversation history that produced ``response``.
        session_meta: Mutable per-session state; updated in place.
        _depth: Internal recursion counter; callers should not pass it.

    Returns:
        ``(reply_text, lead_data)`` where ``lead_data`` is the input of the
        most recent ``capture_lead`` call across all rounds, or ``None``.
    """
    lead_data = None
    tool_results = []

    for block in response.content:
        if block.type != "tool_use":
            continue

        if block.name == "capture_lead":
            lead_data = block.input
            await _post_lead_to_webhook(lead_data)

            person_id = await create_lead_in_crm(
                name=lead_data.get("name", ""),
                email=lead_data.get("email", ""),
                company=lead_data.get("company", ""),
                need=lead_data.get("need", ""),
                page_context=session_meta.get("page_context", "/"),
            )
            if person_id:
                session_meta["twenty_person_id"] = person_id
                logger.info("Lead created in Twenty CRM: %s", person_id)

            tool_results.append(_tool_result(
                block.id,
                "Lead captured and added to CRM. Confirm to the visitor IN THEIR LANGUAGE and offer to book a free consultation at https://cal.ai-impress.com.",
            ))

        elif block.name == "escalate_to_human":
            session_meta["_escalate"] = {
                "reason": block.input.get("reason", ""),
                "summary": block.input.get("summary", ""),
            }
            tool_results.append(_tool_result(
                block.id,
                "Escalation initiated. A human team member will be notified. Send a warm message to the visitor confirming someone will join shortly.",
            ))

        elif block.name == "update_lead":
            enrichment = block.input
            person_id = session_meta.get("twenty_person_id")
            if person_id:
                await enrich_person(person_id, enrichment)
                logger.info(
                    "Lead enriched in Twenty CRM: %s with %s",
                    person_id,
                    list(enrichment.keys()),
                )
                tool_results.append(_tool_result(
                    block.id,
                    "Lead profile updated with new information. Continue the conversation naturally.",
                ))
            else:
                tool_results.append(_tool_result(
                    block.id,
                    "No lead captured yet. Continue the conversation to gather their details first.",
                ))

        else:
            # Without a result for this call the follow-up request would
            # contain an unanswered tool_use block.
            logger.warning("Claude requested unknown tool: %s", block.name)
            tool_results.append(
                _tool_result(block.id, f"Unknown tool: {block.name}", is_error=True)
            )

    if not tool_results:
        # No tool calls at all -- just surface whatever text there is.
        return _join_text(response.content) or _REPHRASE_REPLY, lead_data

    tool_turns = [
        {"role": "assistant", "content": response.content},
        {"role": "user", "content": tool_results},
    ]

    # The SDK client is synchronous; run it in a worker thread so the event
    # loop keeps serving other coroutines while waiting on the API.
    followup = await asyncio.to_thread(
        client.messages.create,
        model=settings.model,
        max_tokens=settings.max_response_tokens,
        system=SYSTEM_PROMPT,
        tools=TOOLS,
        messages=messages + tool_turns,
    )

    if any(b.type == "tool_use" for b in followup.content):
        if _depth < _MAX_TOOL_DEPTH:
            reply, nested_lead = await _handle_tool_calls(
                followup, messages + tool_turns, session_meta, _depth + 1
            )
            # Keep a lead captured in this round if the nested round had none.
            return reply, nested_lead or lead_data
        logger.warning(
            "Tool-call depth limit (%d) reached; ignoring further tool calls",
            _MAX_TOOL_DEPTH,
        )
        return _join_text(followup.content) or _BOOKING_LINK_REPLY, lead_data

    reply = _join_text(followup.content)
    if not reply:
        # Fallback: ask Claude once more for a proper reply in the visitor's
        # language. The nudge rides in the same user turn as the tool results
        # (results first, text after) so no empty assistant turn is replayed,
        # and ``tools`` is passed because the history contains tool blocks.
        nudge = {
            "type": "text",
            "text": "Please respond to the visitor in their language. Confirm you noted their details and offer to book at https://cal.ai-impress.com",
        }
        try:
            fallback_resp = await asyncio.to_thread(
                client.messages.create,
                model=settings.model,
                max_tokens=settings.max_response_tokens,
                system=SYSTEM_PROMPT,
                tools=TOOLS,
                messages=messages + [
                    {"role": "assistant", "content": response.content},
                    {"role": "user", "content": tool_results + [nudge]},
                ],
            )
        except anthropic.APIError:
            # Last-resort path: degrade to the static booking link below.
            logger.exception("Fallback completion failed")
        else:
            reply = _join_text(fallback_resp.content)

    return reply or _BOOKING_LINK_REPLY, lead_data
async def get_ai_response(
    messages: list[dict], session_meta: dict | None = None
) -> tuple[str, dict | None]:
    """Get a reply from Claude for the given conversation.

    Args:
        messages: Conversation history in the Anthropic Messages format.
        session_meta: Optional mutable per-session state. It is handed to the
            tool-call handler, which may update it in place; a fresh dict is
            used when omitted.

    Returns:
        ``(text_reply, lead_data_or_none)``. ``lead_data`` is only ever set
        by the tool-call path; a plain text answer always yields ``None``.
    """
    if session_meta is None:
        session_meta = {}

    # The SDK client is synchronous; calling it directly here would stall the
    # event loop for the whole request, so run it in a worker thread.
    response = await asyncio.to_thread(
        client.messages.create,
        model=settings.model,
        max_tokens=settings.max_response_tokens,
        system=SYSTEM_PROMPT,
        tools=TOOLS,
        messages=messages,
    )

    if any(b.type == "tool_use" for b in response.content):
        return await _handle_tool_calls(response, messages, session_meta)

    text_parts = [b.text for b in response.content if b.type == "text"]
    reply = " ".join(text_parts) if text_parts else "I'm sorry, could you rephrase that?"
    return reply, None