sandbox-notebookllamalm/tests/test_sql_engine.py
Clelia (Astra) Bertelli ca9ee1250c Renaming to NotebookLlama
2025-06-30 22:31:22 +02:00

84 lines
2.9 KiB
Python

import socket
import pytest
import pandas as pd
import os
from dotenv import load_dotenv
from src.notebookllama.instrumentation import OtelTracesSqlEngine
from sqlalchemy import text
ENV = load_dotenv()
def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
"""Check if a TCP port is open on a given host."""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.settimeout(timeout)
result = sock.connect_ex((host, port))
return result == 0
@pytest.fixture()
def otel_data() -> pd.DataFrame:
return pd.DataFrame(
{
"trace_id": ["abc123", "abc123", "def456"],
"span_id": ["span1", "span2", "span3"],
"parent_span_id": [None, "span1", "span2"],
"operation_name": [
"ServiceA.handle_request",
"ServiceA.query_db",
"ServiceB.send_email",
],
"start_time": [1750618321000000, 1750618321000100, 1750618321000200],
"duration": [150, 300, 500],
"status_code": ["OK", "OK", "ERROR"],
"service_name": ["service-a", "service-a", "service-b"],
}
)
@pytest.mark.skipif(
condition=not is_port_open(host="localhost", port=5432) and not ENV,
reason="Either Postgres is currently unavailable or you did not set any env variables in a .env file",
)
def test_engine(otel_data: pd.DataFrame) -> None:
engine_url = f"postgresql+psycopg2://{os.getenv('pgql_user')}:{os.getenv('pgql_psw')}@localhost:5432/{os.getenv('pgql_db')}"
sql_engine = OtelTracesSqlEngine(engine_url=engine_url, table_name="test")
res = sql_engine.execute(text("DROP TABLE IF EXISTS test;"))
res.close()
sql_engine._to_sql(dataframe=otel_data)
res1 = sql_engine.execute(
text(
"SELECT span_id, operation_name, duration FROM test WHERE status_code = 'ERROR'"
)
)
res2 = sql_engine.execute(
text(
"SELECT service_name, AVG(duration) AS avg_duration FROM test GROUP BY service_name;"
)
)
res1_data = res1.fetchall()
res2_data = res2.fetchall()
# Compare just the values
assert len(res1_data) == 1
assert res1_data[0].span_id == "span3"
assert res1_data[0].operation_name == "ServiceB.send_email"
assert res1_data[0].duration == 500
assert len(res2_data) == 2
# Sort by service_name for consistent comparison
res2_sorted = sorted(res2_data, key=lambda x: x.service_name)
assert res2_sorted[0].service_name == "service-a"
assert res2_sorted[0].avg_duration == 225.0
assert res2_sorted[1].service_name == "service-b"
assert res2_sorted[1].avg_duration == 500.0
assert isinstance(sql_engine.to_pandas(), pd.DataFrame)
res3 = sql_engine.execute(
text(
"SELECT service_name, AVG(duration) AS avg_duration FROM test GROUP BY service_name;"
),
return_pandas=True,
)
assert isinstance(res3, pd.DataFrame)