PDF-accessibility-saas/db_manager.py

180 lines
6.5 KiB
Python

"""
PostgreSQL database manager: CRUD for jobs, audit logging, aggregate statistics, and issue dismissals.
"""
import json
import os
import hashlib
import time
import psycopg2
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager
# Connection settings are read from the environment once, at import time.
# Each falls back to a local-development default when the variable is unset.
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_PORT = int(os.environ.get('DB_PORT', 5432))  # env values are strings; normalise to int
DB_NAME = os.environ.get('DB_NAME', 'pdf_checker')
DB_USER = os.environ.get('DB_USER', 'pdf_checker')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'dev_password')
@contextmanager
def get_conn():
    """Yield a fresh PostgreSQL connection scoped to one transaction.

    A new connection is opened from the module-level ``DB_*`` settings on
    every call. When the ``with`` body finishes normally the transaction is
    committed. If the body (or the commit itself) raises an ``Exception``,
    the transaction is rolled back and the exception propagates. The
    connection is closed in every case.
    """
    connect_kwargs = {
        'host': DB_HOST,
        'port': DB_PORT,
        'dbname': DB_NAME,
        'user': DB_USER,
        'password': DB_PASSWORD,
    }
    connection = psycopg2.connect(**connect_kwargs)
    try:
        try:
            yield connection
            # Commit stays inside the guarded region so that a failing
            # commit is also followed by a rollback.
            connection.commit()
        except Exception:
            connection.rollback()
            raise
    finally:
        connection.close()
def create_job(job_id: str, filename: str, ip: str = None, api_key: str = None):
    """Insert a new row into ``jobs`` with status ``'queued'``.

    Args:
        job_id: Identifier stored in the ``job_id`` column.
        filename: File name stored as given.
        ip: Optional client address stored in ``ip_address``.
        api_key: Optional API key. The raw key is not stored; only the first
            16 hex characters of its SHA-256 digest go into ``api_key_hash``.
            A missing or empty key results in ``None`` being stored.
    """
    if api_key:
        # Keep a short fingerprint rather than the secret itself.
        digest = hashlib.sha256(api_key.encode()).hexdigest()
        key_fingerprint = digest[:16]
    else:
        key_fingerprint = None

    row = (job_id, filename, key_fingerprint, ip)
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO jobs (job_id, filename, status, api_key_hash, ip_address)\n"
            "VALUES (%s, %s, 'queued', %s, %s)",
            row,
        )
def update_job_status(job_id: str, status: str, result_json: dict = None,
                      score: int = None, grade: str = None,
                      total_issues: int = None, critical_count: int = None,
                      error_count: int = None, warning_count: int = None,
                      processing_time: float = None):
    """Set a job's status and, optionally, its result columns.

    ``status`` is always written. Every other keyword argument is written
    only when it is not ``None``, so omitted values leave the existing
    column untouched. ``result_json`` is serialised with ``json.dumps``
    before being stored. When ``status`` is ``'completed'`` the
    ``completed_at`` column is additionally set to ``NOW()``.

    Args:
        job_id: Identifier of the row to update (``WHERE job_id = %s``).
        status: New value for the ``status`` column.
        result_json: Optional result payload, stored as a JSON string.
        score, grade, total_issues, critical_count, error_count,
        warning_count, processing_time: Optional values for the columns of
            the same name.
    """
    with get_conn() as conn, conn.cursor() as cur:
        assignments = ["status = %s"]
        params = [status]

        # The result payload needs encoding, so it is handled on its own.
        if result_json is not None:
            assignments.append("result_json = %s")
            params.append(json.dumps(result_json))

        # Remaining optional columns are passed through unchanged, in the
        # order they appear in the SET clause.
        passthrough = (
            ("score = %s", score),
            ("grade = %s", grade),
            ("total_issues = %s", total_issues),
            ("critical_count = %s", critical_count),
            ("error_count = %s", error_count),
            ("warning_count = %s", warning_count),
            ("processing_time = %s", processing_time),
        )
        for fragment, value in passthrough:
            if value is not None:
                assignments.append(fragment)
                params.append(value)

        # The completion timestamp is taken from the database clock and
        # therefore contributes no bound parameter.
        if status == 'completed':
            assignments.append("completed_at = NOW()")

        # The WHERE placeholder comes last, so job_id is the final parameter.
        params.append(job_id)
        cur.execute(
            f"UPDATE jobs SET {', '.join(assignments)} WHERE job_id = %s",
            params
        )
def get_job(job_id: str) -> dict:
    """Fetch a single row from ``jobs`` by its ``job_id``.

    Returns:
        A plain ``dict`` holding every selected column of the matching row,
        or ``None`` when no row matches.
    """
    with get_conn() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("SELECT * FROM jobs WHERE job_id = %s", (job_id,))
            record = cur.fetchone()
            if not record:
                return None
            # Copy the cursor's row object into an ordinary dict.
            return dict(record)
def list_jobs(limit: int = 50, offset: int = 0, status_filter: str = None) -> list:
    """Return a page of job summaries, newest first.

    Args:
        limit: Maximum number of rows to return (SQL ``LIMIT``).
        offset: Number of rows to skip (SQL ``OFFSET``).
        status_filter: When truthy, only rows whose ``status`` equals this
            value are returned; otherwise no status filter is applied.

    Returns:
        A list of plain dicts, one per row, ordered by ``created_at``
        descending. Only the summary columns named in the query are included.
    """
    sql_parts = [
        "SELECT job_id, filename, status, score, grade, total_issues, created_at, completed_at, processing_time FROM jobs"
    ]
    params = []
    if status_filter:
        sql_parts.append(" WHERE status = %s")
        params.append(status_filter)
    sql_parts.append(" ORDER BY created_at DESC LIMIT %s OFFSET %s")
    params += [limit, offset]

    with get_conn() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("".join(sql_parts), params)
            return list(map(dict, cur.fetchall()))
def log_audit(job_id: str, action: str, details: dict = None, ip: str = None):
    """Append one event to the ``audit_log`` table.

    Args:
        job_id: Job the event relates to.
        action: Name of the action being recorded.
        details: Optional extra data, stored as a JSON string. ``None`` or
            an empty mapping is stored as ``"{}"``.
        ip: Optional client address stored in ``ip_address``.
    """
    with get_conn() as conn, conn.cursor() as cur:
        payload = json.dumps(details if details else {})
        cur.execute(
            "INSERT INTO audit_log (job_id, action, details, ip_address)\n"
            "VALUES (%s, %s, %s, %s)",
            (job_id, action, payload, ip),
        )
def get_stats() -> dict:
    """Compute aggregate figures over the whole ``jobs`` table.

    Returns:
        A dict with the keys ``total_jobs``, ``completed_jobs``,
        ``failed_jobs``, ``active_jobs`` (rows whose status is
        ``'processing'``), ``avg_score`` (rounded average of non-null
        scores) and ``avg_processing_time`` (average of non-null processing
        times, rounded to two decimals).
    """
    stats_sql = (
        "\n"
        "SELECT\n"
        "COUNT(*) as total_jobs,\n"
        "COUNT(*) FILTER (WHERE status = 'completed') as completed_jobs,\n"
        "COUNT(*) FILTER (WHERE status = 'failed') as failed_jobs,\n"
        "COUNT(*) FILTER (WHERE status = 'processing') as active_jobs,\n"
        "ROUND(AVG(score) FILTER (WHERE score IS NOT NULL)) as avg_score,\n"
        "ROUND(AVG(processing_time) FILTER (WHERE processing_time IS NOT NULL)::numeric, 2)"
        " as avg_processing_time\n"
        "FROM jobs\n"
    )
    with get_conn() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute(stats_sql)
            # An aggregate query without GROUP BY yields exactly one row.
            totals = cur.fetchone()
            return dict(totals)
def dismiss_issue(job_id: str, issue_index: int, reason: str = None):
    """Mark one issue of a job as dismissed (e.g. a false positive).

    If the ``(job_id, issue_index)`` pair is already dismissed, the stored
    reason is replaced and ``dismissed_at`` is reset to ``NOW()`` instead of
    inserting a second row.

    Args:
        job_id: Job the issue belongs to.
        issue_index: Index of the issue being dismissed.
        reason: Optional free-text justification.
    """
    upsert_sql = (
        "INSERT INTO dismissed_issues (job_id, issue_index, reason)\n"
        "VALUES (%s, %s, %s)\n"
        "ON CONFLICT (job_id, issue_index) DO UPDATE\n"
        "SET reason = EXCLUDED.reason, dismissed_at = NOW()"
    )
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(upsert_sql, (job_id, issue_index, reason))
def undismiss_issue(job_id: str, issue_index: int):
    """Delete the dismissal row for ``(job_id, issue_index)``, if any.

    Args:
        job_id: Job the issue belongs to.
        issue_index: Index of the issue whose dismissal is being removed.
    """
    key = (job_id, issue_index)
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "DELETE FROM dismissed_issues WHERE job_id = %s AND issue_index = %s",
            key,
        )
def get_dismissed_indices(job_id: str) -> list:
    """List the issue indices that have been dismissed for a job.

    Returns:
        The ``issue_index`` values recorded for ``job_id``, in ascending
        order. The list is empty when nothing has been dismissed.
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT issue_index FROM dismissed_issues WHERE job_id = %s ORDER BY issue_index",
            (job_id,),
        )
        # Each row holds a single column; unpack it to get the bare index.
        return [index for (index,) in cur.fetchall()]