Aimpress_site/chatbot-api/main.py
Vadym Samoilenko 8e73d77abc Fix chatbot: lead persistence, CRM creation, RC delivery, mobile UI
- Store lead info in Redis session meta so bot remembers name across messages
- Create Twenty CRM lead immediately from form data (bypass tool-call flow)
- Store room→session reverse mapping in Redis for RC webhook delivery
- Update system prompt: don't re-ask for form-provided info
- Fix "Most Popular" badge clipped on mobile (overflow: visible)
- Fix contact form inputs overflowing on small screens (box-sizing)
- Reduce chat tooltip size on mobile to avoid overlapping content

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 20:18:29 +00:00

198 lines
7.5 KiB
Python

import asyncio
import json
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from models import ChatRequest, ChatResponse, RocketChatWebhook
from config import settings
from security import (
detect_injection,
is_off_topic,
check_rate_limit,
store_messages,
get_session_mode,
set_session_mode,
get_redis,
)
from llm import get_ai_response
from rocketchat import get_or_create_room, send_message
from twenty_crm import create_lead_in_crm
app = FastAPI(title="AImpress Chatbot API", docs_url=None, redoc_url=None)
@app.get("/api/chat/health")
async def health():
return {"status": "ok"}
@app.post("/api/chat")
async def chat(req: ChatRequest):
# Rate limit check
limited, count = await check_rate_limit(req.session_id)
if limited:
return ChatResponse(
reply="You've reached the message limit for this session. Please leave your contact details at hello@ai-impress.com and we'll follow up personally.",
sender="bot",
session_id=req.session_id,
message_count=count,
rate_limited=True,
)
# Message length validation (Pydantic handles max_length, but double-check)
message = req.message[:settings.max_message_length]
# Prompt injection detection
if detect_injection(message):
return ChatResponse(
reply="I'm here to help with AImpress services. What can I assist you with today?",
sender="bot",
session_id=req.session_id,
message_count=count,
)
# Check session mode (AI vs human takeover)
mode = await get_session_mode(req.session_id)
if mode == "human":
# Store message and forward to Rocket.Chat only
messages = await store_messages(req.session_id, "user", message)
room_id = await get_or_create_room(req.session_id)
if room_id:
asyncio.create_task(send_message(room_id, req.session_id, message, "visitor"))
return ChatResponse(
reply="", # Empty — waiting for human response via webhook
sender="human",
session_id=req.session_id,
message_count=count,
)
# Off-topic filter (lenient — only rejects clearly off-topic messages)
if is_off_topic(message):
reply = "I'm here to help with AImpress AI and automation services. What would you like to know about our solutions?"
await store_messages(req.session_id, "user", message)
await store_messages(req.session_id, "assistant", reply)
return ChatResponse(
reply=reply,
sender="bot",
session_id=req.session_id,
message_count=count,
)
# Store user message and get conversation history
messages = await store_messages(req.session_id, "user", message)
# Load session meta (twenty_person_id, lead info, etc.)
r = await get_redis()
meta_raw = await r.get(f"chat:meta:{req.session_id}")
session_meta = json.loads(meta_raw) if meta_raw else {}
session_meta["page_context"] = req.page_context
# If lead info provided (first message), save to session meta + create CRM lead
if req.lead and req.lead.name and "lead" not in session_meta:
lead_info = {"name": req.lead.name}
if req.lead.email:
lead_info["email"] = req.lead.email
if req.lead.company:
lead_info["company"] = req.lead.company
session_meta["lead"] = lead_info
# Create lead in Twenty CRM immediately from form data
person_id = await create_lead_in_crm(
name=req.lead.name,
email=req.lead.email or "",
company=req.lead.company or "",
need=message,
page_context=req.page_context or "/",
)
if person_id:
session_meta["twenty_person_id"] = person_id
# Always prepend lead context from session meta (persists across messages)
stored_lead = session_meta.get("lead")
if stored_lead:
lead_context = f"[System: The visitor has introduced themselves. Name: {stored_lead['name']}"
if stored_lead.get("email"):
lead_context += f", Email: {stored_lead['email']}"
if stored_lead.get("company"):
lead_context += f", Company: {stored_lead['company']}"
lead_context += ". Use this info naturally — greet them by name. Don't re-ask for info you already have.]"
messages = [{"role": "user", "content": lead_context}, {"role": "assistant", "content": "Understood."}] + messages
# Get AI response
try:
reply, lead_data = await get_ai_response(messages, session_meta)
except Exception as e:
reply = "I'm sorry, I'm having a technical issue. Please try again or contact us at hello@ai-impress.com."
print(f"LLM error: {e}")
lead_data = None
# Persist session meta (may contain new twenty_person_id)
await r.set(f"chat:meta:{req.session_id}", json.dumps(session_meta), ex=settings.conversation_ttl)
# Store bot reply
await store_messages(req.session_id, "assistant", reply)
# Mirror to Rocket.Chat (async, don't block response)
visitor_name = req.lead.name if req.lead and req.lead.name else "Website Visitor"
room_id = await get_or_create_room(req.session_id, visitor_name)
if room_id:
asyncio.create_task(send_message(room_id, req.session_id, message, "visitor"))
asyncio.create_task(send_message(room_id, req.session_id, reply, "bot"))
return ChatResponse(
reply=reply,
sender="bot",
session_id=req.session_id,
message_count=count,
)
@app.post("/api/chat/webhook/rocketchat")
async def rocketchat_webhook(req: Request):
"""Webhook from Rocket.Chat when a manager sends a message."""
body = await req.json()
text = body.get("text", "")
channel_id = body.get("channel_id", "")
user_name = body.get("user_name", "")
if not text or not channel_id:
return {"status": "ignored"}
# Extract session_id from the channel (stored in Redis when room was created)
from security import get_redis
r = await get_redis()
# Check for /ai command to resume AI mode
if text.strip().lower() == "/ai":
session_id = await r.get(f"chat:room_session:{channel_id}")
if session_id:
await set_session_mode(session_id, "ai")
return {"status": "ai_resumed"}
# Set session to human mode and store the manager's message
session_id = await r.get(f"chat:room_session:{channel_id}")
if session_id:
await set_session_mode(session_id, "human")
await store_messages(session_id, "assistant", f"[{user_name}] {text}")
# Store for SSE/polling delivery to frontend
import json
await r.rpush(
f"chat:pending:{session_id}",
json.dumps({"sender": "human", "text": text, "agent": user_name}),
)
await r.expire(f"chat:pending:{session_id}", 300)
return {"status": "delivered"}
@app.get("/api/chat/pending/{session_id}")
async def get_pending_messages(session_id: str):
"""Poll for pending human messages (Phase 1 polling, Phase 2 SSE)."""
from security import get_redis
import json
r = await get_redis()
key = f"chat:pending:{session_id}"
raw = await r.lrange(key, 0, -1)
await r.delete(key)
messages = [json.loads(m) for m in raw]
return {"messages": messages}