presenton/servers/fastapi/tests/test_chat_memory_store.py
sudipnext 4e87dc8b70 refactor: Update database session management and enhance chat memory services
- Replaced `get_container_db_async_session` with `async_session_maker` for improved session handling in background tasks.
- Refactored chat memory services to utilize a shared `mem0` client for better memory management.
- Introduced new methods for retrieving and storing chat history, integrating with SQL and memory layers.
- Enhanced error handling and response management in chat-related services.
- Cleaned up unused code and improved overall structure for maintainability.
2026-04-25 19:10:39 +05:45

249 lines
8.7 KiB
Python

import asyncio
import uuid
from unittest.mock import patch
import services.mem0_oss_memory as mem0_oss
from services.chat.chat_memory_store import ChatMemoryStore
class FakeMemoryClient:
    """Recording stand-in for the mem0 memory client used by these tests.

    Every call to ``add``, ``search`` and ``get_all`` is appended to the
    matching ``*_calls`` list as a plain dict so tests can assert on the
    arguments that were passed. ``search`` and ``get_all`` return whatever
    the test has placed in ``next_search_response`` /
    ``next_get_all_response``.
    """

    # Registry of every instance ever constructed. It is a class attribute,
    # so it is shared across instances and must be reset between tests.
    instances: list["FakeMemoryClient"] = []

    def __init__(self, config=None):
        """Store ``config``, start with empty call logs, and register self."""
        self.config = config
        self.add_calls = []
        self.search_calls = []
        self.get_all_calls = []
        # Canned responses; tests overwrite these before calling the store.
        self.next_search_response = {"results": []}
        self.next_get_all_response = {"results": []}
        FakeMemoryClient.instances.append(self)

    @classmethod
    def from_config(cls, config):
        """Alternate constructor: build an instance holding ``config``."""
        return cls(config=config)

    def add(self, *args, **kwargs):
        """Record an ``add`` call and return ``{"ok": True}``.

        ``messages`` is taken from the keyword argument when given; if it is
        missing or ``None`` the first positional argument is used instead.
        """
        messages = kwargs.get("messages")
        if messages is None and args:
            messages = args[0]
        self.add_calls.append(
            {
                "messages": messages,
                "user_id": kwargs.get("user_id"),
                "infer": kwargs.get("infer"),
            }
        )
        return {"ok": True}

    def search(self, query, *args, **kwargs):
        """Record a ``search`` call and return ``next_search_response``."""
        self.search_calls.append(
            {
                "query": query,
                "filters": kwargs.get("filters"),
                "user_id": kwargs.get("user_id"),
                "top_k": kwargs.get("top_k"),
            }
        )
        return self.next_search_response

    def get_all(self, *args, **kwargs):
        """Record a ``get_all`` call and return ``next_get_all_response``."""
        self.get_all_calls.append(
            {
                "filters": kwargs.get("filters"),
                "user_id": kwargs.get("user_id"),
                "limit": kwargs.get("limit"),
            }
        )
        return self.next_get_all_response
def _mem0_oss_fresh() -> None:
    """Reset the module-level state of ``services.mem0_oss_memory``.

    Sets ``_shared_client`` back to ``None`` and ``_init_attempted`` back to
    ``False`` on the imported module, presumably so each test starts without
    a cached client (confirm against the module's initialisation logic).
    """
    pristine_state = (
        ("_shared_client", None),
        ("_init_attempted", False),
    )
    for attribute, value in pristine_state:
        setattr(mem0_oss, attribute, value)
class TestChatMemoryStore:
    """Tests that ChatMemoryStore scopes mem0 calls to a single conversation.

    Each test swaps ``get_shared_mem0_client`` in the chat memory store module
    for a ``FakeMemoryClient`` and then inspects the calls the fake recorded.
    """

    def setup_method(self):
        """Clear the fake-client registry and the mem0 module state."""
        FakeMemoryClient.instances = []
        _mem0_oss_fresh()

    @staticmethod
    def _mem0_patches(extra_env=None):
        """Build the two patchers every test in this class runs under.

        Args:
            extra_env: optional mapping of additional environment variables
                (for example ``MEM0_TOP_K``) merged over the common ones.

        Returns:
            A ``(env_patch, client_patch)`` tuple. ``env_patch`` overlays the
            environment variables without clearing existing ones;
            ``client_patch`` makes ``get_shared_mem0_client`` return one
            freshly built ``FakeMemoryClient``.
        """
        env = {
            "MEM0_ENABLED": "true",
            "MEM0_PRESENTATION_NAMESPACE_PREFIX": "presentation",
            "APP_DATA_DIRECTORY": "/tmp/presenton-test",
        }
        env.update(extra_env or {})
        fake_client = FakeMemoryClient.from_config(
            {
                "vector_store": {"provider": "qdrant", "config": {}},
                "embedder": {"provider": "fastembed", "config": {}},
            }
        )
        return (
            patch.dict("os.environ", env, clear=False),
            patch(
                "services.chat.chat_memory_store.get_shared_mem0_client",
                return_value=fake_client,
            ),
        )

    def test_store_chat_turn_uses_conversation_scoped_user_id(self):
        """Storing a turn issues one ``add`` keyed by the conversation id."""
        env_patch, client_patch = self._mem0_patches({"MEM0_TOP_K": "4"})
        # The whole test runs under the patches so the fake is in place
        # whenever the store happens to resolve its client.
        with env_patch, client_patch:
            store = ChatMemoryStore()
            presentation_id = uuid.uuid4()
            conversation_id = uuid.uuid4()

            asyncio.run(
                store.store_chat_turn(
                    presentation_id=presentation_id,
                    conversation_id=conversation_id,
                    user_message="Can you tighten slide 3?",
                    assistant_message="Yes, I can make it shorter.",
                )
            )

            assert len(FakeMemoryClient.instances) == 1
            client = FakeMemoryClient.instances[0]
            assert len(client.add_calls) == 1

            expected_user_id = (
                f"presentation:{presentation_id}:conversation:{conversation_id}"
            )
            assert client.add_calls[0]["user_id"] == expected_user_id
            assert client.add_calls[0]["infer"] is False

            payload = str(client.add_calls[0]["messages"][0]["content"])
            assert "[chat_turn]" in payload
            assert "user=Can you tighten slide 3?" in payload
            assert "assistant=Yes, I can make it shorter." in payload

    def test_retrieve_context_reads_only_conversation_scoped_user_id(self):
        """Retrieval searches once, filtered by user id, and dedupes hits."""
        env_patch, client_patch = self._mem0_patches({"MEM0_TOP_K": "6"})
        with env_patch, client_patch:
            store = ChatMemoryStore()
            presentation_id = uuid.uuid4()
            conversation_id = uuid.uuid4()
            expected_user_id = (
                f"presentation:{presentation_id}:conversation:{conversation_id}"
            )

            asyncio.run(
                store.store_chat_turn(
                    presentation_id=presentation_id,
                    conversation_id=conversation_id,
                    user_message="First turn",
                    assistant_message="First answer",
                )
            )

            client = FakeMemoryClient.instances[0]
            # "Chat memory A" appears twice so the test can check that the
            # returned context contains it only once.
            client.next_search_response = {
                "results": [
                    {"memory": "Chat memory A"},
                    {"memory": "Chat memory A"},
                    {"memory": "Chat memory B"},
                ]
            }

            context = asyncio.run(
                store.retrieve_context(
                    presentation_id=presentation_id,
                    conversation_id=conversation_id,
                    query="What did we decide?",
                )
            )

            assert "Chat memory A" in context
            assert "Chat memory B" in context
            assert context.count("Chat memory A") == 1

            assert len(client.search_calls) == 1
            assert client.search_calls[0]["query"] == "What did we decide?"
            assert client.search_calls[0]["filters"] == {"user_id": expected_user_id}
            # Matches the MEM0_TOP_K value patched into the environment above.
            assert client.search_calls[0]["top_k"] == 6

    def test_load_history_reads_conversation_scoped_turns(self):
        """Stored ``[chat_turn]`` memories come back as user/assistant pairs."""
        env_patch, client_patch = self._mem0_patches()
        with env_patch, client_patch:
            store = ChatMemoryStore()
            presentation_id = uuid.uuid4()
            conversation_id = uuid.uuid4()
            expected_user_id = (
                f"presentation:{presentation_id}:conversation:{conversation_id}"
            )

            asyncio.run(
                store.store_chat_turn(
                    presentation_id=presentation_id,
                    conversation_id=conversation_id,
                    user_message="Draft intro",
                    assistant_message="Updated intro done.",
                )
            )

            client = FakeMemoryClient.instances[0]
            client.next_get_all_response = {
                "results": [
                    {
                        "memory": (
                            "[chat_turn]\n"
                            "turn_created_at=2026-04-25T10:00:00+00:00\n"
                            "user=Draft intro\nassistant=Updated intro done."
                        ),
                        "created_at": "2026-04-25T10:00:01+00:00",
                    },
                    {
                        "memory": (
                            "[chat_turn]\n"
                            "turn_created_at=2026-04-25T10:01:00+00:00\n"
                            "user=Add roadmap\nassistant=Roadmap slide added."
                        ),
                        "created_at": "2026-04-25T10:01:01+00:00",
                    },
                ]
            }

            history = asyncio.run(
                store.load_history(
                    presentation_id=presentation_id,
                    conversation_id=conversation_id,
                )
            )

            assert history == [
                {"role": "user", "content": "Draft intro"},
                {"role": "assistant", "content": "Updated intro done."},
                {"role": "user", "content": "Add roadmap"},
                {"role": "assistant", "content": "Roadmap slide added."},
            ]

            assert len(client.get_all_calls) == 1
            assert client.get_all_calls[0]["filters"] == {"user_id": expected_user_id}
            assert client.get_all_calls[0]["limit"] >= 10