Aimpress-site/chatbot-api/security.py
Vadym Samoilenko 73b1a0feda Add AI chatbot: FastAPI backend + React chat widget
- Python FastAPI backend (chatbot-api/) with Claude Sonnet 4.6, prompt injection
  protection, rate limiting (30 msg/session), off-topic filtering, Redis session storage
- Rocket.Chat integration for live monitoring and human takeover
- Lead capture via n8n webhook
- React chat widget: floating bubble, auto-greeting after 30s, glassmorphism chat
  window, mobile responsive, lazy loaded, Mixpanel analytics
- Nginx proxy /api/chat → chatbot-api:8000
- Docker: chatbot-api + Redis services added to docker-compose

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-08 17:14:07 +00:00

93 lines
3.6 KiB
Python

import re
import redis.asyncio as redis
from config import settings
_redis: redis.Redis | None = None
INJECTION_PATTERNS = [
re.compile(r"ignore\s+(all\s+)?(previous|prior|above)\s+(instructions|prompts|rules)", re.I),
re.compile(r"(system\s*prompt|system\s*message|initial\s*prompt)", re.I),
re.compile(r"you\s+are\s+now\s+", re.I),
re.compile(r"(pretend|act)\s+(to\s+be|as\s+if|like|you.re)\s+", re.I),
re.compile(r"(reveal|show|print|output|repeat)\s+(your|the)\s+(system|instructions|prompt|rules)", re.I),
re.compile(r"do\s+not\s+follow\s+(your|the)\s+(rules|instructions)", re.I),
re.compile(r"disregard\s+(all|your|the)\s+(previous|prior|rules|instructions)", re.I),
re.compile(r"jailbreak", re.I),
re.compile(r"\bDAN\b"),
re.compile(r"override\s+(your|safety|the)\s+", re.I),
]
BUSINESS_KEYWORDS = [
"ai", "automation", "chatbot", "bot", "service", "price", "pricing", "cost",
"plan", "retainer", "consult", "book", "meeting", "call", "demo",
"workflow", "process", "solution", "business", "company", "help",
"website", "analytics", "data", "custom", "integrate", "integration",
"email", "contact", "phone", "name", "hi", "hello", "hey", "thanks",
"thank", "yes", "no", "ok", "okay", "sure", "please", "what", "how",
"when", "where", "who", "why", "can", "do", "does", "is", "are",
"tell", "more", "about", "your", "offer", "work", "project",
"startup", "charity", "discount", "free", "trial", "poc", "mvp",
"assessment", "briefing", "challenge", "case", "study", "result",
"aimpress", "impress", "uk", "client", "customer", "need", "want",
"looking", "interested", "budget", "timeline", "estimate", "quote",
]
async def get_redis() -> redis.Redis:
global _redis
if _redis is None:
_redis = redis.from_url(settings.redis_url, decode_responses=True)
return _redis
def detect_injection(message: str) -> bool:
for pattern in INJECTION_PATTERNS:
if pattern.search(message):
return True
return False
def is_off_topic(message: str) -> bool:
"""Returns True only if we're fairly confident the message is off-topic.
When uncertain, returns False and lets Claude handle it via system prompt."""
words = set(re.findall(r"[a-zA-Z]+", message.lower()))
if len(words) <= 3:
return False
matches = words & set(BUSINESS_KEYWORDS)
return len(matches) == 0 and len(words) > 5
async def check_rate_limit(session_id: str) -> tuple[bool, int]:
r = await get_redis()
key = f"chat:rate:{session_id}"
count = await r.get(key)
current = int(count) if count else 0
if current >= settings.max_messages_per_session:
return True, current
pipe = r.pipeline()
pipe.incr(key)
pipe.expire(key, settings.conversation_ttl)
results = await pipe.execute()
return False, results[0]
async def store_messages(session_id: str, role: str, content: str) -> list[dict]:
r = await get_redis()
key = f"chat:history:{session_id}"
import json
msg = json.dumps({"role": role, "content": content})
await r.rpush(key, msg)
await r.expire(key, settings.conversation_ttl)
raw = await r.lrange(key, -settings.conversation_window, -1)
return [json.loads(m) for m in raw]
async def get_session_mode(session_id: str) -> str:
r = await get_redis()
mode = await r.get(f"chat:mode:{session_id}")
return mode or "ai"
async def set_session_mode(session_id: str, mode: str) -> None:
r = await get_redis()
await r.set(f"chat:mode:{session_id}", mode, ex=settings.conversation_ttl)