""" asyncpg connection pool with JSONB codec registration. Call init_pool() once at startup and close_pool() at shutdown. """ import json import logging import asyncpg logger = logging.getLogger(__name__) _pool: asyncpg.Pool | None = None async def _init_conn(conn: asyncpg.Connection): """Register JSONB/JSON codecs so Python dicts/lists are passed transparently.""" await conn.set_type_codec('jsonb', encoder=json.dumps, decoder=json.loads, schema='pg_catalog') await conn.set_type_codec('json', encoder=json.dumps, decoder=json.loads, schema='pg_catalog') async def init_pool(dsn: str): global _pool _pool = await asyncpg.create_pool( dsn, min_size=2, max_size=10, command_timeout=30, init=_init_conn, ) logger.info("PostgreSQL pool initialized") async def close_pool(): global _pool if _pool: await _pool.close() _pool = None logger.info("PostgreSQL pool closed") def get_pool() -> asyncpg.Pool: if _pool is None: raise RuntimeError("Database pool not initialized — call init_pool() first") return _pool