Add Twenty CRM integration + lead enrichment + pulsating chat bubble

- New twenty_crm.py: full CRUD for people, companies, notes via Twenty REST API
- Lead capture now creates person + company in Twenty CRM automatically
- New update_lead tool: enriches CRM profile as conversation progresses
  (job title, phone, city, budget, requirements)
- Session meta stored in Redis to track Twenty person ID across messages
- Docker-compose updated with TWENTY_CRM env vars
- Chat bubble: pulsating ring animation with gradient background

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Vadym Samoilenko 2026-03-08 18:13:26 +00:00
parent a8e8d8a71b
commit e78d6dc1c6
7 changed files with 457 additions and 55 deletions

View file

@ -8,6 +8,8 @@ class Settings(BaseSettings):
rocketchat_auth_token: str = ""
rocketchat_user_id: str = ""
n8n_webhook_url: str = "https://n8n.ai-impress.com/webhook"
twenty_crm_url: str = "https://crm.ai-impress.com"
twenty_crm_api_key: str = ""
max_messages_per_session: int = 30
max_message_length: int = 500
conversation_window: int = 15

View file

@ -10,6 +10,7 @@ RULES:
- After 3-4 exchanges of genuine interest suggest booking a free consultation
- Naturally collect visitor's name, email, company — don't ask all at once, weave into conversation
- When you have collected name + email + company + their need, use the capture_lead tool
- After capturing a lead, if the visitor reveals additional useful info (job title, phone, city, budget, timeline, specific requirements), use the update_lead tool to enrich their profile
- Be warm, professional, and concise. No waffle
SERVICES:
@ -75,5 +76,35 @@ TOOLS = [
},
"required": ["name", "email"],
},
}
},
{
"name": "update_lead",
"description": "Update a lead's profile with additional information gathered during conversation. Use this when the visitor reveals new details like their job title, phone number, city, specific requirements, budget, or timeline after the initial lead capture.",
"input_schema": {
"type": "object",
"properties": {
"job_title": {
"type": "string",
"description": "The visitor's job title or role",
},
"phone": {
"type": "string",
"description": "The visitor's phone number",
},
"city": {
"type": "string",
"description": "The visitor's city or location",
},
"company": {
"type": "string",
"description": "Company name if not captured earlier",
},
"note": {
"type": "string",
"description": "Additional context: budget, timeline, specific requirements, interests discussed",
},
},
"required": [],
},
},
]

View file

@ -1,29 +1,29 @@
import anthropic
import httpx
import logging
from config import settings
from knowledge import SYSTEM_PROMPT, TOOLS
from twenty_crm import create_lead_in_crm, enrich_person
logger = logging.getLogger("llm")
client = anthropic.Anthropic(api_key=settings.anthropic_api_key)
async def get_ai_response(messages: list[dict]) -> tuple[str, dict | None]:
"""Get response from Claude. Returns (text_reply, lead_data_or_none)."""
response = client.messages.create(
model=settings.model,
max_tokens=settings.max_response_tokens,
system=SYSTEM_PROMPT,
tools=TOOLS,
messages=messages,
)
async def _handle_tool_calls(
response, messages: list[dict], session_meta: dict
) -> tuple[str, dict | None]:
"""Process tool calls from Claude, execute them, and get follow-up text."""
lead_data = None
tool_block = None
tool_results = []
for block in response.content:
if block.type == "tool_use" and block.name == "capture_lead":
if block.type != "tool_use":
continue
if block.name == "capture_lead":
lead_data = block.input
tool_block = block
# Send lead to n8n webhook
# Send to n8n webhook
try:
async with httpx.AsyncClient() as http:
await http.post(
@ -32,39 +32,95 @@ async def get_ai_response(messages: list[dict]) -> tuple[str, dict | None]:
timeout=10,
)
except Exception:
pass # Don't fail the chat if webhook fails
pass
# If there was a tool use, always send tool result back to get a proper follow-up
if tool_block:
followup = client.messages.create(
model=settings.model,
max_tokens=settings.max_response_tokens,
system=SYSTEM_PROMPT,
tools=TOOLS,
messages=messages + [
{"role": "assistant", "content": response.content},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tool_block.id,
"content": "Lead captured successfully. Now confirm to the visitor and offer to book a free consultation at https://cal.ai-impress.com.",
}
],
},
],
)
text_parts = []
for block in followup.content:
if block.type == "text":
text_parts.append(block.text)
reply = " ".join(text_parts) if text_parts else "Thank you! I've noted your details. You can book a free consultation at https://cal.ai-impress.com — we'll be in touch shortly!"
else:
text_parts = []
for block in response.content:
if block.type == "text":
text_parts.append(block.text)
reply = " ".join(text_parts) if text_parts else "I'm sorry, could you rephrase that?"
# Create lead in Twenty CRM
person_id = await create_lead_in_crm(
name=lead_data.get("name", ""),
email=lead_data.get("email", ""),
company=lead_data.get("company", ""),
need=lead_data.get("need", ""),
page_context=session_meta.get("page_context", "/"),
)
if person_id:
session_meta["twenty_person_id"] = person_id
logger.info(f"Lead created in Twenty CRM: {person_id}")
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": "Lead captured and added to CRM. Confirm to the visitor and offer to book a free consultation at https://cal.ai-impress.com.",
})
elif block.name == "update_lead":
enrichment = block.input
person_id = session_meta.get("twenty_person_id")
if person_id:
await enrich_person(person_id, enrichment)
logger.info(f"Lead enriched in Twenty CRM: {person_id} with {list(enrichment.keys())}")
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": "Lead profile updated with new information. Continue the conversation naturally.",
})
else:
tool_results.append({
"type": "tool_result",
"tool_use_id": block.id,
"content": "No lead captured yet. Continue the conversation to gather their details first.",
})
if not tool_results:
# No tool calls — extract text
text_parts = [b.text for b in response.content if b.type == "text"]
return " ".join(text_parts) if text_parts else "I'm sorry, could you rephrase that?", lead_data
# Send tool results back to get follow-up text
followup = client.messages.create(
model=settings.model,
max_tokens=settings.max_response_tokens,
system=SYSTEM_PROMPT,
tools=TOOLS,
messages=messages + [
{"role": "assistant", "content": response.content},
{"role": "user", "content": tool_results},
],
)
# Check if follow-up also has tool calls (recursive, max 1 level)
has_more_tools = any(b.type == "tool_use" for b in followup.content)
if has_more_tools:
return await _handle_tool_calls(followup, messages + [
{"role": "assistant", "content": response.content},
{"role": "user", "content": tool_results},
], session_meta)
text_parts = [b.text for b in followup.content if b.type == "text"]
reply = " ".join(text_parts) if text_parts else (
"Thank you! I've noted your details. You can book a free consultation at https://cal.ai-impress.com"
)
return reply, lead_data
async def get_ai_response(
messages: list[dict], session_meta: dict | None = None
) -> tuple[str, dict | None]:
"""Get response from Claude. Returns (text_reply, lead_data_or_none)."""
if session_meta is None:
session_meta = {}
response = client.messages.create(
model=settings.model,
max_tokens=settings.max_response_tokens,
system=SYSTEM_PROMPT,
tools=TOOLS,
messages=messages,
)
has_tools = any(b.type == "tool_use" for b in response.content)
if has_tools:
return await _handle_tool_calls(response, messages, session_meta)
text_parts = [b.text for b in response.content if b.type == "text"]
reply = " ".join(text_parts) if text_parts else "I'm sorry, could you rephrase that?"
return reply, None

View file

@ -1,4 +1,5 @@
import asyncio
import json
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from models import ChatRequest, ChatResponse, RocketChatWebhook
@ -10,6 +11,7 @@ from security import (
store_messages,
get_session_mode,
set_session_mode,
get_redis,
)
from llm import get_ai_response
from rocketchat import get_or_create_room, send_message
@ -88,14 +90,23 @@ async def chat(req: ChatRequest):
lead_context += ". Use this info naturally — greet them by name. Don't re-ask for info you already have.]"
messages = [{"role": "user", "content": lead_context}, {"role": "assistant", "content": "Understood."}] + messages
# Load session meta (twenty_person_id, etc.)
r = await get_redis()
meta_raw = await r.get(f"chat:meta:{req.session_id}")
session_meta = json.loads(meta_raw) if meta_raw else {}
session_meta["page_context"] = req.page_context
# Get AI response
try:
reply, lead_data = await get_ai_response(messages)
reply, lead_data = await get_ai_response(messages, session_meta)
except Exception as e:
reply = "I'm sorry, I'm having a technical issue. Please try again or contact us at hello@ai-impress.com."
print(f"LLM error: {e}")
lead_data = None
# Persist session meta (may contain new twenty_person_id)
await r.set(f"chat:meta:{req.session_id}", json.dumps(session_meta), ex=settings.conversation_ttl)
# Store bot reply
await store_messages(req.session_id, "assistant", reply)

264
chatbot-api/twenty_crm.py Normal file
View file

@ -0,0 +1,264 @@
import httpx
import logging
from config import settings
logger = logging.getLogger("twenty_crm")
BASE_URL = f"{settings.twenty_crm_url}/rest"
HEADERS = {
"Authorization": f"Bearer {settings.twenty_crm_api_key}",
"Content-Type": "application/json",
}
def _split_name(full_name: str) -> tuple[str, str]:
parts = full_name.strip().split(maxsplit=1)
return parts[0], parts[1] if len(parts) > 1 else ""
async def find_person_by_email(email: str) -> dict | None:
"""Find existing person by email."""
if not settings.twenty_crm_api_key:
return None
try:
async with httpx.AsyncClient() as http:
resp = await http.get(
f"{BASE_URL}/people",
headers=HEADERS,
params={"filter": f'emails[primaryEmail][eq]:"{email}"', "limit": 1},
timeout=10,
)
if resp.status_code == 200:
data = resp.json().get("data", {}).get("people", [])
return data[0] if data else None
except Exception as e:
logger.error(f"Twenty find_person error: {e}")
return None
async def create_or_update_company(name: str, domain: str = "") -> str | None:
"""Create company in Twenty CRM, return company ID."""
if not settings.twenty_crm_api_key or not name:
return None
try:
async with httpx.AsyncClient() as http:
# Check if company exists
resp = await http.get(
f"{BASE_URL}/companies",
headers=HEADERS,
params={"filter": f'name[eq]:"{name}"', "limit": 1},
timeout=10,
)
if resp.status_code == 200:
companies = resp.json().get("data", {}).get("companies", [])
if companies:
return companies[0]["id"]
# Create new company
body = {"name": name}
if domain:
body["domainName"] = {
"primaryLinkLabel": "",
"primaryLinkUrl": domain,
"secondaryLinks": [],
}
resp = await http.post(
f"{BASE_URL}/companies",
headers=HEADERS,
json=body,
timeout=10,
)
if resp.status_code == 201:
return resp.json()["data"]["createCompany"]["id"]
logger.error(f"Twenty create_company: {resp.status_code} {resp.text}")
except Exception as e:
logger.error(f"Twenty create_company error: {e}")
return None
async def create_person(
name: str,
email: str,
company_id: str | None = None,
job_title: str = "",
city: str = "",
phone: str = "",
) -> str | None:
"""Create person in Twenty CRM, return person ID."""
if not settings.twenty_crm_api_key:
return None
try:
first, last = _split_name(name)
body: dict = {
"name": {"firstName": first, "lastName": last},
"emails": {"primaryEmail": email, "additionalEmails": []},
}
if company_id:
body["companyId"] = company_id
if job_title:
body["jobTitle"] = job_title
if city:
body["city"] = city
if phone:
body["phones"] = {
"primaryPhoneNumber": phone,
"primaryPhoneCountryCode": "",
"primaryPhoneCallingCode": "",
"additionalPhones": [],
}
async with httpx.AsyncClient() as http:
resp = await http.post(
f"{BASE_URL}/people",
headers=HEADERS,
json=body,
timeout=10,
)
if resp.status_code == 201:
person_id = resp.json()["data"]["createPerson"]["id"]
logger.info(f"Twenty person created: {person_id} ({name})")
return person_id
logger.error(f"Twenty create_person: {resp.status_code} {resp.text}")
except Exception as e:
logger.error(f"Twenty create_person error: {e}")
return None
async def update_person(person_id: str, updates: dict) -> bool:
"""Update person fields in Twenty CRM."""
if not settings.twenty_crm_api_key or not person_id:
return False
try:
async with httpx.AsyncClient() as http:
resp = await http.patch(
f"{BASE_URL}/people/{person_id}",
headers=HEADERS,
json=updates,
timeout=10,
)
if resp.status_code == 200:
logger.info(f"Twenty person updated: {person_id}")
return True
logger.error(f"Twenty update_person: {resp.status_code} {resp.text}")
except Exception as e:
logger.error(f"Twenty update_person error: {e}")
return False
async def create_note(title: str, person_id: str | None = None) -> str | None:
"""Create a note in Twenty CRM, optionally linked to a person."""
if not settings.twenty_crm_api_key:
return None
try:
async with httpx.AsyncClient() as http:
resp = await http.post(
f"{BASE_URL}/notes",
headers=HEADERS,
json={"title": title},
timeout=10,
)
if resp.status_code != 201:
logger.error(f"Twenty create_note: {resp.status_code} {resp.text}")
return None
note_id = resp.json()["data"]["createNote"]["id"]
# Link note to person via noteTargets
if person_id:
await http.post(
f"{BASE_URL}/noteTargets",
headers=HEADERS,
json={"noteId": note_id, "personId": person_id},
timeout=10,
)
return note_id
except Exception as e:
logger.error(f"Twenty create_note error: {e}")
return None
async def create_lead_in_crm(
name: str,
email: str,
company: str = "",
need: str = "",
job_title: str = "",
city: str = "",
phone: str = "",
page_context: str = "",
) -> str | None:
"""Full lead creation flow: company → person → note. Returns person ID."""
# Check if person already exists
existing = await find_person_by_email(email)
if existing:
person_id = existing["id"]
# Update with any new info
updates = {}
if job_title:
updates["jobTitle"] = job_title
if city:
updates["city"] = city
if updates:
await update_person(person_id, updates)
if need:
await create_note(
f"Chatbot: {need} (page: {page_context})", person_id
)
return person_id
# Create company first
company_id = None
if company:
company_id = await create_or_update_company(company)
# Create person
person_id = await create_person(
name=name,
email=email,
company_id=company_id,
job_title=job_title,
city=city,
phone=phone,
)
# Create note with the lead's need
if person_id and need:
await create_note(
f"Chatbot lead: {need} (page: {page_context})", person_id
)
return person_id
async def enrich_person(person_id: str, data: dict) -> bool:
"""Enrich person with additional data gathered during conversation."""
updates = {}
if data.get("job_title"):
updates["jobTitle"] = data["job_title"]
if data.get("city"):
updates["city"] = data["city"]
if data.get("phone"):
updates["phones"] = {
"primaryPhoneNumber": data["phone"],
"primaryPhoneCountryCode": "",
"primaryPhoneCallingCode": "",
"additionalPhones": [],
}
success = True
if updates:
success = await update_person(person_id, updates)
# Add conversation notes
if data.get("note"):
await create_note(data["note"], person_id)
# Update company info if provided
if data.get("company") and not data.get("_company_linked"):
company_id = await create_or_update_company(data["company"])
if company_id:
await update_person(person_id, {"companyId": company_id})
return success

View file

@ -38,6 +38,8 @@ services:
- ROCKETCHAT_AUTH_TOKEN=${ROCKETCHAT_AUTH_TOKEN}
- ROCKETCHAT_USER_ID=${ROCKETCHAT_USER_ID}
- N8N_WEBHOOK_URL=https://n8n.ai-impress.com/webhook
- TWENTY_CRM_URL=https://crm.ai-impress.com
- TWENTY_CRM_API_KEY=${TWENTY_CRM_API_KEY}
chatbot-redis:
image: redis:alpine

View file

@ -10,22 +10,58 @@
}
.chat-bubble {
width: 56px;
height: 56px;
width: 60px;
height: 60px;
border-radius: 50%;
background: var(--orange-100);
background: linear-gradient(135deg, var(--orange-100) 0%, #ff7b33 100%);
border: none;
color: #fff;
cursor: pointer;
display: flex;
align-items: center;
justify-content: center;
box-shadow: 0 4px 20px rgba(255, 91, 4, 0.4);
transition: background 0.3s;
box-shadow: 0 4px 24px rgba(255, 91, 4, 0.45);
transition: background 0.3s, box-shadow 0.3s;
position: relative;
z-index: 1;
}
.chat-bubble::before,
.chat-bubble::after {
content: '';
position: absolute;
inset: -4px;
border-radius: 50%;
border: 2px solid rgba(255, 91, 4, 0.4);
animation: chat-pulse 2s ease-out infinite;
pointer-events: none;
}
.chat-bubble::after {
inset: -4px;
animation-delay: 1s;
}
@keyframes chat-pulse {
0% {
transform: scale(1);
opacity: 0.6;
}
100% {
transform: scale(1.6);
opacity: 0;
}
}
.chat-bubble:hover {
background: #e65200;
background: linear-gradient(135deg, #e65200 0%, var(--orange-100) 100%);
box-shadow: 0 6px 30px rgba(255, 91, 4, 0.6);
}
.chat-bubble:hover::before,
.chat-bubble:hover::after {
animation: none;
opacity: 0;
}
.chat-bubble svg {