Aimpress_site/chatbot-api/main.py
Vadym Samoilenko 73b1a0feda Add AI chatbot: FastAPI backend + React chat widget
- Python FastAPI backend (chatbot-api/) with Claude Sonnet 4.6, prompt injection
  protection, rate limiting (30 msg/session), off-topic filtering, Redis session storage
- Rocket.Chat integration for live monitoring and human takeover
- Lead capture via n8n webhook
- React chat widget: floating bubble, auto-greeting after 30s, glassmorphism chat
  window, mobile responsive, lazy loaded, Mixpanel analytics
- Nginx proxy /api/chat → chatbot-api:8000
- Docker: chatbot-api + Redis services added to docker-compose

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 17:14:07 +00:00

154 lines
5.4 KiB
Python

import asyncio
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from models import ChatRequest, ChatResponse, RocketChatWebhook
from config import settings
from security import (
detect_injection,
is_off_topic,
check_rate_limit,
store_messages,
get_session_mode,
set_session_mode,
)
from llm import get_ai_response
from rocketchat import get_or_create_room, send_message
app = FastAPI(title="AImpress Chatbot API", docs_url=None, redoc_url=None)
@app.get("/api/chat/health")
async def health():
return {"status": "ok"}
@app.post("/api/chat")
async def chat(req: ChatRequest):
# Rate limit check
limited, count = await check_rate_limit(req.session_id)
if limited:
return ChatResponse(
reply="You've reached the message limit for this session. Please leave your contact details at hello@ai-impress.com and we'll follow up personally.",
sender="bot",
session_id=req.session_id,
message_count=count,
rate_limited=True,
)
# Message length validation (Pydantic handles max_length, but double-check)
message = req.message[:settings.max_message_length]
# Prompt injection detection
if detect_injection(message):
return ChatResponse(
reply="I'm here to help with AImpress services. What can I assist you with today?",
sender="bot",
session_id=req.session_id,
message_count=count,
)
# Check session mode (AI vs human takeover)
mode = await get_session_mode(req.session_id)
if mode == "human":
# Store message and forward to Rocket.Chat only
messages = await store_messages(req.session_id, "user", message)
room_id = await get_or_create_room(req.session_id)
if room_id:
asyncio.create_task(send_message(room_id, req.session_id, message, "visitor"))
return ChatResponse(
reply="", # Empty — waiting for human response via webhook
sender="human",
session_id=req.session_id,
message_count=count,
)
# Off-topic filter (lenient — only rejects clearly off-topic messages)
if is_off_topic(message):
reply = "I'm here to help with AImpress AI and automation services. What would you like to know about our solutions?"
await store_messages(req.session_id, "user", message)
await store_messages(req.session_id, "assistant", reply)
return ChatResponse(
reply=reply,
sender="bot",
session_id=req.session_id,
message_count=count,
)
# Store user message and get conversation history
messages = await store_messages(req.session_id, "user", message)
# Get AI response
try:
reply, lead_data = await get_ai_response(messages)
except Exception as e:
reply = "I'm sorry, I'm having a technical issue. Please try again or contact us at hello@ai-impress.com."
print(f"LLM error: {e}")
lead_data = None
# Store bot reply
await store_messages(req.session_id, "assistant", reply)
# Mirror to Rocket.Chat (async, don't block response)
room_id = await get_or_create_room(req.session_id)
if room_id:
asyncio.create_task(send_message(room_id, req.session_id, message, "visitor"))
asyncio.create_task(send_message(room_id, req.session_id, reply, "bot"))
return ChatResponse(
reply=reply,
sender="bot",
session_id=req.session_id,
message_count=count,
)
@app.post("/api/chat/webhook/rocketchat")
async def rocketchat_webhook(req: Request):
"""Webhook from Rocket.Chat when a manager sends a message."""
body = await req.json()
text = body.get("text", "")
channel_id = body.get("channel_id", "")
user_name = body.get("user_name", "")
if not text or not channel_id:
return {"status": "ignored"}
# Extract session_id from the channel (stored in Redis when room was created)
from security import get_redis
r = await get_redis()
# Check for /ai command to resume AI mode
if text.strip().lower() == "/ai":
session_id = await r.get(f"chat:room_session:{channel_id}")
if session_id:
await set_session_mode(session_id, "ai")
return {"status": "ai_resumed"}
# Set session to human mode and store the manager's message
session_id = await r.get(f"chat:room_session:{channel_id}")
if session_id:
await set_session_mode(session_id, "human")
await store_messages(session_id, "assistant", f"[{user_name}] {text}")
# Store for SSE/polling delivery to frontend
import json
await r.rpush(
f"chat:pending:{session_id}",
json.dumps({"sender": "human", "text": text, "agent": user_name}),
)
await r.expire(f"chat:pending:{session_id}", 300)
return {"status": "delivered"}
@app.get("/api/chat/pending/{session_id}")
async def get_pending_messages(session_id: str):
"""Poll for pending human messages (Phase 1 polling, Phase 2 SSE)."""
from security import get_redis
import json
r = await get_redis()
key = f"chat:pending:{session_id}"
raw = await r.lrange(key, 0, -1)
await r.delete(key)
messages = [json.loads(m) for m in raw]
return {"messages": messages}