180 lines
6.5 KiB
Python
180 lines
6.5 KiB
Python
"""
|
|
PostgreSQL Database Manager — CRUD for jobs and audit logging
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
import hashlib
|
|
import time
|
|
import psycopg2
|
|
from psycopg2.extras import RealDictCursor
|
|
from contextlib import contextmanager
|
|
|
|
# Connection settings, resolved once at import time from the environment.
# The fallback values look like local-development defaults; production
# deployments are presumably expected to set every DB_* variable.
DB_HOST = os.environ.get('DB_HOST', 'localhost')
DB_PORT = int(os.environ.get('DB_PORT', '5432'))  # env values are strings
DB_NAME = os.environ.get('DB_NAME', 'pdf_checker')
DB_USER = os.environ.get('DB_USER', 'pdf_checker')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'dev_password')
|
|
|
|
|
|
@contextmanager
def get_conn():
    """Open a PostgreSQL connection for the duration of a ``with`` block.

    The connection is built from the module-level ``DB_*`` settings.

    * On a clean exit from the ``with`` block, the transaction is committed.
    * If the block (or the commit) raises an ``Exception``, the transaction
      is rolled back and the original exception is re-raised unchanged.
    * The connection is always closed afterwards.

    Yields:
        An open psycopg2 connection.
    """
    conn = psycopg2.connect(
        host=DB_HOST,
        port=DB_PORT,
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
    )
    try:
        yield conn
        conn.commit()
    except Exception:
        # Rollback is best-effort. If the connection itself has died,
        # rollback() raises its own psycopg2 error (e.g. InterfaceError),
        # which would replace the exception the caller actually needs to see.
        try:
            conn.rollback()
        except psycopg2.Error:
            pass
        raise  # re-raises the original exception, not the rollback failure
    finally:
        conn.close()
|
|
|
|
|
|
def create_job(job_id: str, filename: str, ip: str = None, api_key: str = None):
    """Insert a new ``jobs`` row with status ``'queued'``.

    Args:
        job_id: Value stored in ``jobs.job_id``.
        filename: File name recorded for the job.
        ip: Client IP address stored in ``ip_address``; may be None.
        api_key: Raw API key. The key itself is never stored. Only the first
            16 hex characters of its SHA-256 digest go into ``api_key_hash``.
            A missing or empty key stores NULL.
    """
    key_hash = None
    if api_key:
        digest = hashlib.sha256(api_key.encode()).hexdigest()
        key_hash = digest[:16]

    params = (job_id, filename, key_hash, ip)
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            """INSERT INTO jobs (job_id, filename, status, api_key_hash, ip_address)
               VALUES (%s, %s, 'queued', %s, %s)""",
            params,
        )
|
|
|
|
|
|
def update_job_status(job_id: str, status: str, result_json: dict = None,
                      score: int = None, grade: str = None,
                      total_issues: int = None, critical_count: int = None,
                      error_count: int = None, warning_count: int = None,
                      processing_time: float = None):
    """Set a job's ``status`` and record whichever result fields were supplied.

    ``status`` is always written. Each optional argument is written to the
    column of the same name only when it is not ``None``. Omitted values
    leave the existing column contents untouched.

    ``result_json`` is stored as the string produced by ``json.dumps``. When
    ``status`` is ``'completed'``, ``completed_at`` is also set to ``NOW()``.

    An unknown ``job_id`` matches no rows, so the update is a silent no-op.
    """
    with get_conn() as conn, conn.cursor() as cur:
        # Order matters only for the generated SQL text; it mirrors the
        # parameter order of the signature.
        optional_columns = (
            ("result_json", result_json),
            ("score", score),
            ("grade", grade),
            ("total_issues", total_issues),
            ("critical_count", critical_count),
            ("error_count", error_count),
            ("warning_count", warning_count),
            ("processing_time", processing_time),
        )

        assignments = ["status = %s"]
        params = [status]
        for column, value in optional_columns:
            if value is None:
                continue
            assignments.append(f"{column} = %s")
            params.append(json.dumps(value) if column == "result_json" else value)

        if status == 'completed':
            assignments.append("completed_at = NOW()")

        # Column names come only from the fixed tuple above, never from
        # callers, so interpolating them into the statement is safe; all
        # values still travel as bound parameters.
        params.append(job_id)
        cur.execute(
            f"UPDATE jobs SET {', '.join(assignments)} WHERE job_id = %s",
            params,
        )
|
|
|
|
|
|
def get_job(job_id: str) -> "dict | None":
    """Fetch one row from ``jobs`` by its ``job_id``.

    Args:
        job_id: The job identifier to look up.

    Returns:
        The full row (``SELECT *``) as a plain ``dict`` keyed by column name,
        or ``None`` when no job with that id exists. The annotation reflects
        this; it previously claimed ``dict`` only.
    """
    with get_conn() as conn:
        with conn.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("SELECT * FROM jobs WHERE job_id = %s", (job_id,))
            row = cur.fetchone()
            # Copy out of the psycopg2 row type so callers get a plain dict.
            return dict(row) if row else None
|
|
|
|
|
|
def list_jobs(limit: int = 50, offset: int = 0, status_filter: str = None) -> list:
    """Return one page of job summaries, newest ``created_at`` first.

    Args:
        limit: Maximum number of rows to return (SQL ``LIMIT``).
        offset: Number of rows to skip (SQL ``OFFSET``).
        status_filter: If truthy, only jobs with exactly this ``status`` are
            returned. ``None`` or ``""`` means no filter.

    Returns:
        A list of dicts holding the summary columns selected below.
    """
    columns = ("job_id, filename, status, score, grade, total_issues, "
               "created_at, completed_at, processing_time")
    if status_filter:
        where_clause, params = " WHERE status = %s", [status_filter]
    else:
        where_clause, params = "", []
    sql = (f"SELECT {columns} FROM jobs{where_clause}"
           " ORDER BY created_at DESC LIMIT %s OFFSET %s")
    params += [limit, offset]

    with get_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(sql, params)
        return list(map(dict, cur.fetchall()))
|
|
|
|
|
|
def log_audit(job_id: str, action: str, details: dict = None, ip: str = None):
    """Append one row to ``audit_log``.

    Args:
        job_id: Job the event relates to.
        action: Short label describing the event.
        details: Extra data, stored as a JSON string. Falsy values
            (``None``, ``{}``) are stored as ``"{}"``.
        ip: Client IP address stored in ``ip_address``; may be None.
    """
    with get_conn() as conn, conn.cursor() as cur:
        payload = json.dumps(details or {})
        cur.execute(
            """INSERT INTO audit_log (job_id, action, details, ip_address)
               VALUES (%s, %s, %s, %s)""",
            (job_id, action, payload, ip),
        )
|
|
|
|
|
|
def get_stats() -> dict:
    """Return aggregate statistics over every row in ``jobs``.

    Keys of the returned dict:
        total_jobs: number of jobs.
        completed_jobs / failed_jobs: jobs whose status is ``'completed'``
            / ``'failed'``.
        active_jobs: jobs whose status is ``'processing'`` (queued jobs are
            not counted here).
        avg_score: mean ``score`` rounded to a whole number. NULL (``None``)
            when no job has a score.
        avg_processing_time: mean ``processing_time`` rounded to 2 decimals.
            NULL (``None``) when none is recorded.
    """
    query = """
        SELECT
            COUNT(*) as total_jobs,
            COUNT(*) FILTER (WHERE status = 'completed') as completed_jobs,
            COUNT(*) FILTER (WHERE status = 'failed') as failed_jobs,
            COUNT(*) FILTER (WHERE status = 'processing') as active_jobs,
            ROUND(AVG(score) FILTER (WHERE score IS NOT NULL)) as avg_score,
            ROUND(AVG(processing_time) FILTER (WHERE processing_time IS NOT NULL)::numeric, 2) as avg_processing_time
        FROM jobs
    """
    with get_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(query)
        summary = cur.fetchone()
        return dict(summary)
|
|
|
|
|
|
def dismiss_issue(job_id: str, issue_index: int, reason: str = None):
    """Mark one issue of a job as dismissed (a false positive).

    This is an upsert keyed on ``(job_id, issue_index)``; PostgreSQL's
    ``ON CONFLICT`` requires a unique constraint on that pair. Dismissing an
    already-dismissed issue overwrites its ``reason`` and resets
    ``dismissed_at`` to ``NOW()``.
    """
    upsert = """INSERT INTO dismissed_issues (job_id, issue_index, reason)
               VALUES (%s, %s, %s)
               ON CONFLICT (job_id, issue_index) DO UPDATE
               SET reason = EXCLUDED.reason, dismissed_at = NOW()"""
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(upsert, (job_id, issue_index, reason))
|
|
|
|
|
|
def undismiss_issue(job_id: str, issue_index: int):
    """Delete the dismissal record for one issue of a job.

    Deleting a record that does not exist affects no rows and is not an error.
    """
    key = (job_id, issue_index)
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "DELETE FROM dismissed_issues WHERE job_id = %s AND issue_index = %s",
            key,
        )
|
|
|
|
|
|
def get_dismissed_indices(job_id: str) -> list:
    """Return the dismissed ``issue_index`` values for a job, in ascending order.

    Returns an empty list when nothing has been dismissed.
    """
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(
            "SELECT issue_index FROM dismissed_issues WHERE job_id = %s ORDER BY issue_index",
            (job_id,),
        )
        # Each row is a one-element tuple; unpack it directly.
        return [index for (index,) in cur.fetchall()]
|