""" PostgreSQL Database Manager — CRUD for jobs and audit logging """ import json import os import hashlib import time import psycopg2 from psycopg2.extras import RealDictCursor from contextlib import contextmanager DB_HOST = os.getenv('DB_HOST', 'localhost') DB_PORT = int(os.getenv('DB_PORT', 5432)) DB_NAME = os.getenv('DB_NAME', 'pdf_checker') DB_USER = os.getenv('DB_USER', 'pdf_checker') DB_PASSWORD = os.getenv('DB_PASSWORD', 'dev_password') @contextmanager def get_conn(): """Get a database connection (context manager).""" conn = psycopg2.connect( host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASSWORD ) try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close() def create_job(job_id: str, filename: str, ip: str = None, api_key: str = None): """Create a new job record.""" key_hash = hashlib.sha256(api_key.encode()).hexdigest()[:16] if api_key else None with get_conn() as conn: with conn.cursor() as cur: cur.execute( """INSERT INTO jobs (job_id, filename, status, api_key_hash, ip_address) VALUES (%s, %s, 'queued', %s, %s)""", (job_id, filename, key_hash, ip) ) def update_job_status(job_id: str, status: str, result_json: dict = None, score: int = None, grade: str = None, total_issues: int = None, critical_count: int = None, error_count: int = None, warning_count: int = None, processing_time: float = None): """Update job status and optionally store results.""" with get_conn() as conn: with conn.cursor() as cur: fields = ["status = %s"] values = [status] if result_json is not None: fields.append("result_json = %s") values.append(json.dumps(result_json)) if score is not None: fields.append("score = %s") values.append(score) if grade is not None: fields.append("grade = %s") values.append(grade) if total_issues is not None: fields.append("total_issues = %s") values.append(total_issues) if critical_count is not None: fields.append("critical_count = %s") values.append(critical_count) if error_count is not None: fields.append("error_count = %s") values.append(error_count) if warning_count is not None: fields.append("warning_count = %s") values.append(warning_count) if processing_time is not None: fields.append("processing_time = %s") values.append(processing_time) if status == 'completed': fields.append("completed_at = NOW()") values.append(job_id) cur.execute( f"UPDATE jobs SET {', '.join(fields)} WHERE job_id = %s", values ) def get_job(job_id: str) -> dict: """Get a job by ID.""" with get_conn() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute("SELECT * FROM jobs WHERE job_id = %s", (job_id,)) row = cur.fetchone() return dict(row) if row else None def list_jobs(limit: int = 50, offset: int = 0, status_filter: str = None) -> list: """List jobs with optional filtering.""" with get_conn() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: query = "SELECT job_id, filename, status, score, grade, total_issues, created_at, completed_at, processing_time FROM jobs" values = [] if status_filter: query += " WHERE status = %s" values.append(status_filter) query += " ORDER BY created_at DESC LIMIT %s OFFSET %s" values.extend([limit, offset]) cur.execute(query, values) return [dict(row) for row in cur.fetchall()] def log_audit(job_id: str, action: str, details: dict = None, ip: str = None): """Log an audit event.""" with get_conn() as conn: with conn.cursor() as cur: cur.execute( """INSERT INTO audit_log (job_id, action, details, ip_address) VALUES (%s, %s, %s, %s)""", (job_id, action, json.dumps(details or {}), ip) ) def get_stats() -> dict: """Get aggregate statistics.""" with get_conn() as conn: with conn.cursor(cursor_factory=RealDictCursor) as cur: cur.execute(""" SELECT COUNT(*) as total_jobs, COUNT(*) FILTER (WHERE status = 'completed') as completed_jobs, COUNT(*) FILTER (WHERE status = 'failed') as failed_jobs, COUNT(*) FILTER (WHERE status = 'processing') as active_jobs, ROUND(AVG(score) FILTER (WHERE score IS NOT NULL)) as avg_score, ROUND(AVG(processing_time) FILTER (WHERE processing_time IS NOT NULL)::numeric, 2) as avg_processing_time FROM jobs """) return dict(cur.fetchone()) def dismiss_issue(job_id: str, issue_index: int, reason: str = None): """Record a dismissed/false-positive issue.""" with get_conn() as conn: with conn.cursor() as cur: cur.execute( """INSERT INTO dismissed_issues (job_id, issue_index, reason) VALUES (%s, %s, %s) ON CONFLICT (job_id, issue_index) DO UPDATE SET reason = EXCLUDED.reason, dismissed_at = NOW()""", (job_id, issue_index, reason) ) def undismiss_issue(job_id: str, issue_index: int): """Remove a dismissal record.""" with get_conn() as conn: with conn.cursor() as cur: cur.execute( "DELETE FROM dismissed_issues WHERE job_id = %s AND issue_index = %s", (job_id, issue_index) ) def get_dismissed_indices(job_id: str) -> list: """Return list of dismissed issue indices for a job.""" with get_conn() as conn: with conn.cursor() as cur: cur.execute( "SELECT issue_index FROM dismissed_issues WHERE job_id = %s ORDER BY issue_index", (job_id,) ) return [row[0] for row in cur.fetchall()]