84 lines
2.9 KiB
Python
84 lines
2.9 KiB
Python
import socket
|
|
import pytest
|
|
import pandas as pd
|
|
import os
|
|
from dotenv import load_dotenv
|
|
|
|
from src.notebookllama.instrumentation import OtelTracesSqlEngine
|
|
from sqlalchemy import text
|
|
|
|
ENV = load_dotenv()
|
|
|
|
|
|
def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
|
|
"""Check if a TCP port is open on a given host."""
|
|
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
|
|
sock.settimeout(timeout)
|
|
result = sock.connect_ex((host, port))
|
|
return result == 0
|
|
|
|
|
|
@pytest.fixture()
|
|
def otel_data() -> pd.DataFrame:
|
|
return pd.DataFrame(
|
|
{
|
|
"trace_id": ["abc123", "abc123", "def456"],
|
|
"span_id": ["span1", "span2", "span3"],
|
|
"parent_span_id": [None, "span1", "span2"],
|
|
"operation_name": [
|
|
"ServiceA.handle_request",
|
|
"ServiceA.query_db",
|
|
"ServiceB.send_email",
|
|
],
|
|
"start_time": [1750618321000000, 1750618321000100, 1750618321000200],
|
|
"duration": [150, 300, 500],
|
|
"status_code": ["OK", "OK", "ERROR"],
|
|
"service_name": ["service-a", "service-a", "service-b"],
|
|
}
|
|
)
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
condition=not is_port_open(host="localhost", port=5432) and not ENV,
|
|
reason="Either Postgres is currently unavailable or you did not set any env variables in a .env file",
|
|
)
|
|
def test_engine(otel_data: pd.DataFrame) -> None:
|
|
engine_url = f"postgresql+psycopg2://{os.getenv('pgql_user')}:{os.getenv('pgql_psw')}@localhost:5432/{os.getenv('pgql_db')}"
|
|
sql_engine = OtelTracesSqlEngine(engine_url=engine_url, table_name="test")
|
|
res = sql_engine.execute(text("DROP TABLE IF EXISTS test;"))
|
|
res.close()
|
|
sql_engine._to_sql(dataframe=otel_data)
|
|
res1 = sql_engine.execute(
|
|
text(
|
|
"SELECT span_id, operation_name, duration FROM test WHERE status_code = 'ERROR'"
|
|
)
|
|
)
|
|
res2 = sql_engine.execute(
|
|
text(
|
|
"SELECT service_name, AVG(duration) AS avg_duration FROM test GROUP BY service_name;"
|
|
)
|
|
)
|
|
res1_data = res1.fetchall()
|
|
res2_data = res2.fetchall()
|
|
|
|
# Compare just the values
|
|
assert len(res1_data) == 1
|
|
assert res1_data[0].span_id == "span3"
|
|
assert res1_data[0].operation_name == "ServiceB.send_email"
|
|
assert res1_data[0].duration == 500
|
|
|
|
assert len(res2_data) == 2
|
|
# Sort by service_name for consistent comparison
|
|
res2_sorted = sorted(res2_data, key=lambda x: x.service_name)
|
|
assert res2_sorted[0].service_name == "service-a"
|
|
assert res2_sorted[0].avg_duration == 225.0
|
|
assert res2_sorted[1].service_name == "service-b"
|
|
assert res2_sorted[1].avg_duration == 500.0
|
|
assert isinstance(sql_engine.to_pandas(), pd.DataFrame)
|
|
res3 = sql_engine.execute(
|
|
text(
|
|
"SELECT service_name, AVG(duration) AS avg_duration FROM test GROUP BY service_name;"
|
|
),
|
|
return_pandas=True,
|
|
)
|
|
assert isinstance(res3, pd.DataFrame)
|