56 lines
No EOL
1.6 KiB
Python
56 lines
No EOL
1.6 KiB
Python
from motor.motor_asyncio import AsyncIOMotorClient
|
|
from typing import Optional
|
|
import redis.asyncio as redis
|
|
from .settings import settings
|
|
|
|
class Database:
    """Container for the shared MongoDB and Redis connection handles.

    Every attribute defaults to ``None`` at class level; the connect
    helpers in this module assign the live objects onto the shared instance.
    """

    # Motor client for MongoDB; None until a connection has been created.
    client: Optional[AsyncIOMotorClient] = None

    # Database handle obtained from ``client``; None until connected.
    database = None

    # Async Redis client; None when no Redis connection is available.
    redis_client: Optional[redis.Redis] = None
|
|
|
|
# Module-level shared instance: the connect/close helpers in this module
# store the live client handles on it and the get_* accessors read from it.
db = Database()
|
|
|
|
async def connect_to_mongo():
    """Create the MongoDB client and verify it with a ``ping`` command.

    The client and the database handle are stored on the shared ``db``
    object only after the ping succeeds. If the ping fails, the new client
    is closed, the Mongo handles on ``db`` are cleared, and the original
    exception is re-raised so the caller can abort start-up.

    Raises:
        Exception: whatever the driver raised while pinging the server.
    """
    client = AsyncIOMotorClient(settings.mongodb_url)

    # Test connection before exposing the handles to the rest of the app.
    try:
        await client.admin.command('ping')
    except Exception as e:
        print(f"Error connecting to MongoDB: {e}")
        # Clear the shared state first so a dead handle is never left
        # behind, then release the failed client's resources.
        db.client = None
        db.database = None
        client.close()
        raise
    else:
        db.client = client
        db.database = client[settings.database_name]
        print("Connected to MongoDB successfully!")
|
|
|
|
async def close_mongo_connection():
    """Close the MongoDB client, if one is open, and clear the shared state.

    Safe to call more than once: after the first call ``db.client`` and
    ``db.database`` are ``None``, so later calls do nothing and
    ``get_database()`` no longer returns a handle tied to a closed client.
    """
    client = db.client
    if client is None:
        return

    # Drop the shared references before closing so no stale handle remains
    # even if close() raises.
    db.client = None
    db.database = None
    client.close()
    print("Disconnected from MongoDB")
|
|
|
|
async def connect_to_redis():
    """Open the Redis connection when ``settings.cache_enabled`` is true.

    This is best-effort: any error while creating or pinging the client is
    reported, ``db.redis_client`` is reset to ``None``, and no exception is
    propagated, so the application keeps running without a cache.
    """
    if not settings.cache_enabled:
        return

    try:
        db.redis_client = redis.from_url(settings.redis_url)
        await db.redis_client.ping()
        print("Connected to Redis successfully!")
    except Exception as exc:
        print(f"Error connecting to Redis: {exc}")
        print("Continuing without Redis cache...")
        db.redis_client = None
|
|
|
|
async def close_redis_connection():
    """Close the Redis client, if one is open, and clear the shared state.

    Safe to call more than once: after the first call ``db.redis_client``
    is ``None``, so later calls do nothing and ``get_redis()`` no longer
    returns a closed client.
    """
    client = db.redis_client
    if client is None:
        return

    # Drop the shared reference before closing so no stale handle remains
    # even if close() raises.
    db.redis_client = None
    await client.close()
    print("Disconnected from Redis")
|
|
|
|
def get_database():
    """Return the database handle stored on the shared ``db`` object.

    The value is whatever ``db.database`` currently holds, which is
    ``None`` when no handle has been assigned.
    """
    return db.database
|
|
|
|
def get_redis():
    """Return the Redis client stored on the shared ``db`` object.

    The value is whatever ``db.redis_client`` currently holds, which is
    ``None`` when no client has been assigned.
    """
    return db.redis_client