- Replaced `get_container_db_async_session` with `async_session_maker` for improved session handling in background tasks. - Refactored chat memory services to utilize a shared `mem0` client for better memory management. - Introduced new methods for retrieving and storing chat history, integrating with SQL and memory layers. - Enhanced error handling and response management in chat-related services. - Cleaned up unused code and improved overall structure for maintainability.
131 lines
4.5 KiB
Python
131 lines
4.5 KiB
Python
"""Single shared mem0 OSS ``Memory`` client for the process.
|
|
|
|
All callers (presentation context, chat turns) share the same on-disk Qdrant/SQLite stores
|
|
and distinguish data via mem0 ``user_id``:
|
|
|
|
- Deck-level (no chat thread): ``{namespace}:{presentation_id}``
|
|
- Chat thread: ``{namespace}:{presentation_id}:conversation:{conversation_id}``
|
|
|
|
The chat flow calls ``ensure_conversation_id`` before the first turn, so a
|
|
``conversation_id`` exists before any mem0 write for that thread.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import threading
|
|
from importlib import import_module
|
|
from typing import Any, Optional
|
|
|
|
# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Lock taken by ``get_shared_mem0_client`` while it constructs the shared client.
_memory_init_lock = threading.Lock()
# Process-wide mem0 client; stays ``None`` until construction succeeds.
_shared_client: Any | None = None
# Set once construction has been tried, so a failed attempt is not repeated.
_init_attempted = False
|
|
|
|
|
|
def _to_bool(value: Optional[str], default: bool = False) -> bool:
    """Interpret an environment-style string as a boolean flag.

    Args:
        value: Raw setting text, or ``None`` when the setting is absent.
        default: Result used when ``value`` is ``None`` or blank.

    Returns:
        ``True`` when the trimmed, lower-cased text is one of ``1``, ``true``,
        ``yes`` or ``on``; ``default`` when there is no text to interpret;
        ``False`` for any other text.
    """
    if value is None:
        return default
    normalized = str(value).strip().lower()
    if not normalized:
        # A blank variable (``FLAG=``) counts as unset, in line with how the
        # other settings in this module fall back to their defaults.
        return default
    return normalized in {"1", "true", "yes", "on"}
|
|
|
|
|
|
def _to_int(value: Optional[str], default: int) -> int:
    """Parse a positive integer setting, falling back to ``default``.

    Args:
        value: Raw setting text, or ``None`` when the setting is absent.
        default: Value used when ``value`` is ``None`` or is not an integer.

    Returns:
        The parsed integer (or ``default`` when ``value`` is ``None``),
        raised to a minimum of 1. Text that cannot be parsed yields
        ``default`` unchanged.
    """
    try:
        parsed = default if value is None else int(value)
    except (TypeError, ValueError):
        # Only conversion failures mean "bad setting"; anything else should
        # surface instead of being silently replaced by the default.
        return default
    return max(1, parsed)
|
|
|
|
|
|
def _env_text(name: str, fallback: str) -> str:
    """Return the stripped value of env var ``name``, or ``fallback`` if unset or blank."""
    return (os.getenv(name) or "").strip() or fallback


def _oss_config_from_env() -> tuple[str, str, str, str, int, dict[str, Any]]:
    """Return (mem0_dir, qdrant_path, history_db, collection, dims, from_config_dict).

    Every value is read from the environment, with these fallbacks when a
    variable is unset or blank:

    - ``APP_DATA_DIRECTORY``: ``/tmp/presenton``
    - ``MEM0_DIR``: ``<APP_DATA_DIRECTORY>/mem0``
    - ``MEM0_QDRANT_PATH``: ``<MEM0_DIR>/qdrant``
    - ``MEM0_HISTORY_DB_PATH``: ``<MEM0_DIR>/history.db``
    - ``MEM0_COLLECTION_NAME``: ``presenton_memories``
    - ``MEM0_EMBEDDER_PROVIDER``: ``fastembed``
    - ``MEM0_EMBEDDER_MODEL``: ``BAAI/bge-small-en-v1.5``
    - ``MEM0_EMBEDDING_DIMS``: ``384``

    The last tuple element is the dictionary handed to ``Memory.from_config``.
    This function only reads the environment; it creates no directories.
    """
    # Whitespace-only path variables are treated as unset so they cannot
    # collapse into an empty path.
    app_data_dir = _env_text("APP_DATA_DIRECTORY", "/tmp/presenton")
    mem0_dir = _env_text("MEM0_DIR", os.path.join(app_data_dir, "mem0"))
    qdrant_path = _env_text("MEM0_QDRANT_PATH", os.path.join(mem0_dir, "qdrant"))
    history_db_path = _env_text(
        "MEM0_HISTORY_DB_PATH", os.path.join(mem0_dir, "history.db")
    )
    collection = _env_text("MEM0_COLLECTION_NAME", "presenton_memories")
    embedder = _env_text("MEM0_EMBEDDER_PROVIDER", "fastembed")
    model = _env_text("MEM0_EMBEDDER_MODEL", "BAAI/bge-small-en-v1.5")
    dims = _to_int(os.getenv("MEM0_EMBEDDING_DIMS"), default=384)

    config: dict[str, Any] = {
        "vector_store": {
            "provider": "qdrant",
            "config": {
                "collection_name": collection,
                "path": qdrant_path,
                "on_disk": True,
                "embedding_model_dims": dims,
            },
        },
        "embedder": {
            "provider": embedder,
            "config": {
                "model": model,
                # Same value as the vector store's ``embedding_model_dims``.
                "embedding_dims": dims,
            },
        },
        "history_db_path": history_db_path,
    }
    return mem0_dir, qdrant_path, history_db_path, collection, dims, config
|
|
|
|
|
|
def memory_from_config(config: dict[str, Any], *, telemetry_base: str) -> Any:
    """Build a ``mem0.Memory`` instance from ``config``.

    ``telemetry_base`` is created when missing and assigned to the ``mem0_dir``
    attribute of ``mem0.memory.main`` before the client is constructed. When the
    result feeds the shared module state, the caller must hold
    ``_memory_init_lock``.
    """
    os.makedirs(telemetry_base, exist_ok=True)

    # mem0 is imported here, at call time, rather than when this module loads.
    mem0_main = import_module("mem0.memory.main")
    mem0_main.mem0_dir = telemetry_base

    memory_type = import_module("mem0").Memory
    return memory_type.from_config(config)
|
|
|
|
|
|
def get_shared_mem0_client() -> Any | None:
    """Return the process-wide mem0 client, or ``None`` if disabled or init failed.

    The client is built at most once. ``MEM0_ENABLED`` (default: enabled) is
    checked on every call; when it is off, ``None`` is returned without touching
    the shared state. A failed construction is logged and remembered, so later
    calls return ``None`` instead of retrying.

    Raises:
        KeyboardInterrupt, SystemExit: Re-raised if they occur during
            construction; the attempt is then not recorded as a failure.
    """
    global _shared_client, _init_attempted

    if not _to_bool(os.getenv("MEM0_ENABLED"), default=True):
        return None
    if _shared_client is not None:
        return _shared_client
    if _init_attempted:
        return None

    with _memory_init_lock:
        # Check again under the lock: another thread may have completed (or
        # failed) construction while this one was waiting.
        if _shared_client is not None:
            return _shared_client
        if _init_attempted:
            return None
        _init_attempted = True
        try:
            mem0_dir, qdrant_path, history_db, collection, dims, config = (
                _oss_config_from_env()
            )
            os.makedirs(mem0_dir, exist_ok=True)
            os.makedirs(qdrant_path, exist_ok=True)
            telemetry_base = os.path.join(mem0_dir, "telemetry", "oss")
            _shared_client = memory_from_config(
                config,
                telemetry_base=telemetry_base,
            )
        except (KeyboardInterrupt, SystemExit):
            # Interrupt and shutdown requests must reach the caller. Clear the
            # flag so an interrupted attempt does not count as a real failure.
            _shared_client = None
            _init_attempted = False
            raise
        except BaseException:
            # NOTE(review): the broad catch is kept from the original; it
            # presumably guards against non-``Exception`` errors raised by
            # native dependencies -- confirm before narrowing.
            LOGGER.exception("Failed to initialize shared Mem0 OSS Memory")
            _shared_client = None
        else:
            LOGGER.info(
                "Mem0 OSS shared memory initialized (qdrant_path=%s, history_db_path=%s, collection=%s, dims=%s)",
                qdrant_path,
                history_db,
                collection,
                dims,
            )

    return _shared_client
|