import re import redis.asyncio as redis from config import settings _redis: redis.Redis | None = None INJECTION_PATTERNS = [ re.compile(r"ignore\s+(all\s+)?(previous|prior|above)\s+(instructions|prompts|rules)", re.I), re.compile(r"(system\s*prompt|system\s*message|initial\s*prompt)", re.I), re.compile(r"you\s+are\s+now\s+", re.I), re.compile(r"(pretend|act)\s+(to\s+be|as\s+if|like|you.re)\s+", re.I), re.compile(r"(reveal|show|print|output|repeat)\s+(your|the)\s+(system|instructions|prompt|rules)", re.I), re.compile(r"do\s+not\s+follow\s+(your|the)\s+(rules|instructions)", re.I), re.compile(r"disregard\s+(all|your|the)\s+(previous|prior|rules|instructions)", re.I), re.compile(r"jailbreak", re.I), re.compile(r"\bDAN\b"), re.compile(r"override\s+(your|safety|the)\s+", re.I), ] BUSINESS_KEYWORDS = [ "ai", "automation", "chatbot", "bot", "service", "price", "pricing", "cost", "plan", "retainer", "consult", "book", "meeting", "call", "demo", "workflow", "process", "solution", "business", "company", "help", "website", "analytics", "data", "custom", "integrate", "integration", "email", "contact", "phone", "name", "hi", "hello", "hey", "thanks", "thank", "yes", "no", "ok", "okay", "sure", "please", "what", "how", "when", "where", "who", "why", "can", "do", "does", "is", "are", "tell", "more", "about", "your", "offer", "work", "project", "startup", "charity", "discount", "free", "trial", "poc", "mvp", "assessment", "briefing", "challenge", "case", "study", "result", "aimpress", "impress", "uk", "client", "customer", "need", "want", "looking", "interested", "budget", "timeline", "estimate", "quote", ] async def get_redis() -> redis.Redis: global _redis if _redis is None: _redis = redis.from_url(settings.redis_url, decode_responses=True) return _redis def detect_injection(message: str) -> bool: for pattern in INJECTION_PATTERNS: if pattern.search(message): return True return False def is_off_topic(message: str) -> bool: """Returns True only if we're fairly confident the message is off-topic. When uncertain, returns False and lets Claude handle it via system prompt.""" words = set(re.findall(r"[a-zA-Z]+", message.lower())) if len(words) <= 3: return False matches = words & set(BUSINESS_KEYWORDS) return len(matches) == 0 and len(words) > 5 async def check_rate_limit(session_id: str) -> tuple[bool, int]: r = await get_redis() key = f"chat:rate:{session_id}" count = await r.get(key) current = int(count) if count else 0 if current >= settings.max_messages_per_session: return True, current pipe = r.pipeline() pipe.incr(key) pipe.expire(key, settings.conversation_ttl) results = await pipe.execute() return False, results[0] async def store_messages(session_id: str, role: str, content: str) -> list[dict]: r = await get_redis() key = f"chat:history:{session_id}" import json msg = json.dumps({"role": role, "content": content}) await r.rpush(key, msg) await r.expire(key, settings.conversation_ttl) raw = await r.lrange(key, -settings.conversation_window, -1) return [json.loads(m) for m in raw] async def get_session_mode(session_id: str) -> str: r = await get_redis() mode = await r.get(f"chat:mode:{session_id}") return mode or "ai" async def set_session_mode(session_id: str, mode: str) -> None: r = await get_redis() await r.set(f"chat:mode:{session_id}", mode, ex=settings.conversation_ttl)