netflix/init_mongodb.py
michael 236d1ddbd8 Initial commit: Netflix GraphRAG marketing chatbot
Full-stack application combining LlamaIndex vector search with Neo4j
knowledge graph (GraphRAG) for answering queries about Netflix marketing
materials. Flask/Hypercorn backend with custom ReAct agent, React frontend.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 10:28:33 -06:00

127 lines
No EOL
4.7 KiB
Python

"""
MongoDB Initialization Script for Netflix Chatbot

This script initializes the MongoDB database used by the Netflix chatbot.
It creates the users, conversations, and messages collections together with
their indexes, and inserts a sample user when the users collection is empty.

Usage:
    python init_mongodb.py
"""
import logging
import os
import sys
from datetime import datetime, timezone

import pymongo
# Configure logging: INFO and above goes to both the console and a
# 'mongodb_init.log' file in the current working directory.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[logging.StreamHandler(), logging.FileHandler("mongodb_init.log")],
)

# Module-level logger used by every function in this script.
logger = logging.getLogger(__name__)
# MongoDB connection information.
# The URI embeds credentials, so it can be overridden through the MONGO_URI
# environment variable instead of editing this file; the default is the
# original local-development connection string.
MONGO_URI = os.environ.get("MONGO_URI", "mongodb://netflix:netflix@localhost:27017")
DB_NAME = "netflix_chatbot"

# Collection names
USERS_COLLECTION = "users"
CONVERSATIONS_COLLECTION = "conversations"
MESSAGES_COLLECTION = "messages"
def init_mongodb():
    """Initialize the MongoDB database, its collections, and their indexes.

    Connects to ``MONGO_URI``, checks that the server answers a ``ping``,
    creates whichever of the users / conversations / messages collections are
    missing from ``DB_NAME``, and ensures each collection has its indexes.

    Indexes are ensured on every call, not only when a collection is newly
    created: MongoDB creates a collection implicitly on first insert, so a
    collection can already exist without its indexes. Requesting an index
    that already exists with the same definition is a no-op on the server.

    The client opened here is closed before the function returns.

    Returns:
        bool: True when initialization completed; False when the connection
        or any database operation failed (the error is logged).
    """
    # (collection name, ((indexed field, create_index options), ...))
    collection_indexes = (
        (USERS_COLLECTION, (
            ("username", {"unique": True}),
            # Sparse: uniqueness is only enforced when an email is present.
            ("email", {"unique": True, "sparse": True}),
        )),
        (CONVERSATIONS_COLLECTION, (
            ("user_id", {}),
            ("session_id", {"unique": True}),
            ("created_at", {}),
            ("last_updated", {}),
        )),
        (MESSAGES_COLLECTION, (
            ("conversation_id", {}),
            ("timestamp", {}),
        )),
    )

    try:
        logger.info("Connecting to MongoDB...")
        # The context manager closes the client (and its connection pool)
        # on both the success path and the error paths.
        with pymongo.MongoClient(MONGO_URI) as client:
            # MongoClient connects lazily; ping forces a round trip so
            # connection problems surface here.
            client.admin.command('ping')
            logger.info("Successfully connected to MongoDB")

            db = client[DB_NAME]
            logger.info(f"Using database: {DB_NAME}")

            # One round trip instead of one per collection.
            existing = set(db.list_collection_names())

            for name, indexes in collection_indexes:
                if name not in existing:
                    db.create_collection(name)
                    logger.info(f"Created collection: {name}")

                for field, options in indexes:
                    db[name].create_index(field, **options)
                logger.info(f"Created indexes for {name} collection")

        logger.info("MongoDB initialization completed successfully")
        return True
    except pymongo.errors.ConnectionFailure as e:
        logger.error(f"Could not connect to MongoDB: {e}")
        return False
    except Exception as e:
        # Top-level boundary for the script: log with traceback, report failure.
        logger.exception(f"An error occurred during MongoDB initialization: {e}")
        return False
def display_collection_info(client):
    """Log the document count and index names of every collection in DB_NAME.

    Args:
        client: A connected MongoDB client; the database is looked up on it
            by name.
    """
    database = client[DB_NAME]
    logger.info("=== Database Structure ===")

    for name in database.list_collection_names():
        collection = database[name]

        total = collection.count_documents({})
        logger.info(f"Collection: {name}, Documents: {total}")

        # index_information() is keyed by index name.
        index_names = list(collection.index_information().keys())
        logger.info(f" Indexes: {index_names}")
if __name__ == "__main__":
    # Script entry point: initialize the database, report its structure, and
    # seed a sample user when the users collection is empty.
    if not init_mongodb():
        logger.error("Failed to initialize MongoDB. See logs for details.")
        sys.exit(1)

    # The context manager closes this client when the block exits.
    with pymongo.MongoClient(MONGO_URI) as client:
        # Display collection information
        display_collection_info(client)

        # Add sample user if none exist (optional)
        users = client[DB_NAME][USERS_COLLECTION]
        if users.count_documents({}) == 0:
            # Single timezone-aware UTC timestamp shared by both fields;
            # datetime.utcnow() is deprecated and returns a naive value.
            now = datetime.now(timezone.utc)
            users.insert_one({
                "username": "sample_user",
                "email": "sample@example.com",
                "created_at": now,
                "last_login": now,
            })
            logger.info("Added sample user for testing")

    logger.info("Initialization complete. The database is ready for use.")