- Replaced `get_container_db_async_session` with `async_session_maker` for improved session handling in background tasks. - Refactored chat memory services to utilize a shared `mem0` client for better memory management. - Introduced new methods for retrieving and storing chat history, integrating with SQL and memory layers. - Enhanced error handling and response management in chat-related services. - Cleaned up unused code and improved overall structure for maintainability.
188 lines
6.4 KiB
Python
188 lines
6.4 KiB
Python
import asyncio
|
|
import uuid
|
|
from unittest.mock import patch
|
|
|
|
import services.mem0_oss_memory as mem0_oss
|
|
from services.mem0_presentation_memory_service import Mem0PresentationMemoryService
|
|
|
|
|
|
class FakeMemoryClient:
|
|
instances: list["FakeMemoryClient"] = []
|
|
|
|
def __init__(self, config=None):
|
|
self.config = config
|
|
self.add_calls = []
|
|
self.search_calls = []
|
|
self.next_search_response = {"results": []}
|
|
FakeMemoryClient.instances.append(self)
|
|
|
|
@classmethod
|
|
def from_config(cls, config):
|
|
return cls(config=config)
|
|
|
|
def add(self, *args, **kwargs):
|
|
messages = kwargs.get("messages") if "messages" in kwargs else None
|
|
if messages is None and args:
|
|
messages = args[0]
|
|
|
|
self.add_calls.append(
|
|
{
|
|
"messages": messages,
|
|
"user_id": kwargs.get("user_id"),
|
|
"infer": kwargs.get("infer"),
|
|
}
|
|
)
|
|
return {"ok": True}
|
|
|
|
def search(self, query, *args, **kwargs):
|
|
self.search_calls.append(
|
|
{
|
|
"query": query,
|
|
"filters": kwargs.get("filters"),
|
|
"user_id": kwargs.get("user_id"),
|
|
"top_k": kwargs.get("top_k"),
|
|
}
|
|
)
|
|
return self.next_search_response
|
|
|
|
|
|
def _mem0_oss_fresh() -> None:
|
|
mem0_oss._shared_client = None # type: ignore[attr-defined]
|
|
mem0_oss._init_attempted = False # type: ignore[attr-defined]
|
|
|
|
|
|
class TestMem0PresentationMemoryService:
|
|
def setup_method(self):
|
|
FakeMemoryClient.instances = []
|
|
_mem0_oss_fresh()
|
|
|
|
def test_store_generation_context_uses_presentation_scope(self):
|
|
with patch.dict(
|
|
"os.environ",
|
|
{
|
|
"MEM0_ENABLED": "true",
|
|
"APP_DATA_DIRECTORY": "/tmp/presenton-test",
|
|
},
|
|
clear=False,
|
|
), patch(
|
|
"services.mem0_presentation_memory_service.get_shared_mem0_client",
|
|
return_value=FakeMemoryClient.from_config(
|
|
{
|
|
"vector_store": {
|
|
"provider": "qdrant",
|
|
"config": {
|
|
"on_disk": True,
|
|
"embedding_model_dims": 384,
|
|
},
|
|
},
|
|
"embedder": {
|
|
"provider": "fastembed",
|
|
"config": {
|
|
"model": "BAAI/bge-small-en-v1.5",
|
|
"embedding_dims": 384,
|
|
},
|
|
},
|
|
}
|
|
),
|
|
):
|
|
service = Mem0PresentationMemoryService()
|
|
presentation_id = uuid.uuid4()
|
|
asyncio.run(
|
|
service.store_generation_context(
|
|
presentation_id=presentation_id,
|
|
system_prompt="system prompt",
|
|
user_prompt="user prompt",
|
|
extracted_document_text="doc text",
|
|
source_content="seed prompt",
|
|
instructions="be concise",
|
|
)
|
|
)
|
|
|
|
assert len(FakeMemoryClient.instances) == 1
|
|
client = FakeMemoryClient.instances[0]
|
|
assert client.config is not None
|
|
assert client.config["vector_store"]["provider"] == "qdrant"
|
|
assert client.config["embedder"]["provider"] == "fastembed"
|
|
assert (
|
|
client.config["embedder"]["config"]["model"]
|
|
== "BAAI/bge-small-en-v1.5"
|
|
)
|
|
assert client.config["embedder"]["config"]["embedding_dims"] == 384
|
|
assert client.config["vector_store"]["config"]["on_disk"] is True
|
|
assert client.config["vector_store"]["config"]["embedding_model_dims"] == 384
|
|
assert len(client.add_calls) == 5
|
|
|
|
scoped_user_id = f"presentation:{presentation_id}"
|
|
for call in client.add_calls:
|
|
assert call["user_id"] == scoped_user_id
|
|
assert call["infer"] is False
|
|
|
|
serialized_messages = "\n".join(
|
|
str(call["messages"][0]["content"]) for call in client.add_calls
|
|
)
|
|
assert "[outline_system_prompt]" in serialized_messages
|
|
assert "[outline_user_prompt]" in serialized_messages
|
|
assert "[document_extracted_text]" in serialized_messages
|
|
assert "[presentation_source_prompt]" in serialized_messages
|
|
|
|
def test_retrieve_context_uses_same_scope_and_deduplicates(self):
|
|
with patch.dict(
|
|
"os.environ",
|
|
{
|
|
"MEM0_ENABLED": "true",
|
|
"MEM0_TOP_K": "5",
|
|
"APP_DATA_DIRECTORY": "/tmp/presenton-test",
|
|
},
|
|
clear=False,
|
|
), patch(
|
|
"services.mem0_presentation_memory_service.get_shared_mem0_client",
|
|
return_value=FakeMemoryClient.from_config(
|
|
{
|
|
"vector_store": {"provider": "qdrant", "config": {}},
|
|
"embedder": {
|
|
"provider": "fastembed",
|
|
"config": {
|
|
"model": "BAAI/bge-small-en-v1.5",
|
|
"embedding_dims": 384,
|
|
},
|
|
},
|
|
}
|
|
),
|
|
):
|
|
service = Mem0PresentationMemoryService()
|
|
presentation_id = uuid.uuid4()
|
|
|
|
asyncio.run(
|
|
service.store_generated_outlines(
|
|
presentation_id,
|
|
{"slides": [{"content": "One"}]},
|
|
)
|
|
)
|
|
|
|
client = FakeMemoryClient.instances[0]
|
|
client.next_search_response = {
|
|
"results": [
|
|
{"memory": "Memory A"},
|
|
{"memory": "Memory A"},
|
|
{"memory": "Memory B"},
|
|
]
|
|
}
|
|
|
|
context = asyncio.run(
|
|
service.retrieve_context(
|
|
presentation_id=presentation_id,
|
|
query="change the conclusion",
|
|
)
|
|
)
|
|
|
|
assert "Memory A" in context
|
|
assert "Memory B" in context
|
|
assert context.count("Memory A") == 1
|
|
|
|
assert len(client.search_calls) == 1
|
|
assert client.search_calls[0]["query"] == "change the conclusion"
|
|
assert client.search_calls[0]["filters"] == {
|
|
"user_id": f"presentation:{presentation_id}"
|
|
}
|
|
assert client.search_calls[0]["top_k"] == 5
|
|
|