pdf-accessibility/db_manager.py
Vadym Samoilenko ac8aedf4a3 Implement QA report fixes: scoring, Matterhorn, dismiss, PDF report, UX
Part 1 — CSS/Contrast/Accessibility:
- Raise --text-muted contrast to WCAG AA (#696969 light, #9a9a9a dark)
- Add body font-size: 16px baseline
- Enlarge #themeToggle to 15px / 10px 20px padding

Part 2 — Start Button (user-controlled analysis):
- Upload no longer auto-starts check; shows ready state with filename/size
- New showReadyState() / removeFile() functions in upload.js
- beginCheck() now shows progress + hides ready state on click
- Add prominent "Check Another PDF" button at bottom of results

Part 3 — Scoring recalibration:
- Replace deduction formula with check-pass ratio + soft penalty (cap 20)
- Fix run_check() to only examine issues added by the current check
- Add score_breakdown (per-check table) to JSON output + results UI
- Downgrade readability ERROR → WARNING (advisory, not hard failure)

Part 4 — Auto-fix debugging:
- Remediation failure now returns up to 2000 chars of log (was 500)
- pdf_remediation.py: stderr output, sys.exit(0/1), output dir creation

Part 5 — Error location: View on Page button on each issue card

Part 6 — Matterhorn Protocol PDF/UA-1:
- _build_matterhorn_summary() maps 19 checks → 31 checkpoints
- Matterhorn card in index.html with grouped PASS/FAIL/Not-tested table
- Correct M/H badges per checkpoint

Part 7 — Dismiss / False Positive:
- dismissed_issues table in db/init.sql + dismiss/undismiss in db_manager.py
- api.php: dismiss/undismiss endpoints (file-backed), dismissed_indices
  injected into both handleStatus and handleResult responses
- results.js: dismissIssue/undismissIssue with visual strikethrough
- CSS: .dismissed, .btn-dismiss, .btn-undismiss styles

Part 8 — PDF Report (WeasyPrint):
- generate_pdf() in report_generator.py: PAC-style A4, Oliver branding
- api.php handleExport() supports format=pdf
- index.html: "PDF Report" download button in results header
- requirements.txt: weasyprint>=60.0

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-12 18:06:32 +00:00

180 lines
6.5 KiB
Python

"""
PostgreSQL Database Manager — CRUD for jobs, audit logging, aggregate job
statistics, and dismissed (false-positive) issue tracking
"""
import json
import os
import hashlib
import time
import psycopg2
from psycopg2.extras import RealDictCursor
from contextlib import contextmanager
# Database connection settings, resolved from the environment once at import
# time. Each fallback applies only when the corresponding variable is unset.
DB_HOST = os.environ.get('DB_HOST', 'localhost')
# Environment values arrive as strings, so the port is coerced to an int.
DB_PORT = int(os.environ.get('DB_PORT', 5432))
DB_NAME = os.environ.get('DB_NAME', 'pdf_checker')
DB_USER = os.environ.get('DB_USER', 'pdf_checker')
DB_PASSWORD = os.environ.get('DB_PASSWORD', 'dev_password')
@contextmanager
def get_conn():
    """Yield a freshly opened psycopg2 connection scoped to one transaction.

    A new connection is created from the module-level ``DB_*`` settings on
    every call. If the ``with`` body finishes normally the transaction is
    committed. If the body (or the commit itself) raises an ``Exception``,
    the transaction is rolled back and the exception is re-raised. The
    connection is closed in every case.
    """
    settings = {
        'host': DB_HOST,
        'port': DB_PORT,
        'dbname': DB_NAME,
        'user': DB_USER,
        'password': DB_PASSWORD,
    }
    connection = psycopg2.connect(**settings)
    try:
        try:
            yield connection
            # Commit stays inside the guarded region so that a failing
            # commit is also followed by a rollback.
            connection.commit()
        except Exception:
            connection.rollback()
            raise
    finally:
        connection.close()
def create_job(job_id: str, filename: str, ip: str = None, api_key: str = None):
    """Insert a new row into ``jobs`` with status ``'queued'``.

    Args:
        job_id: Value stored in the ``job_id`` column.
        filename: Value stored in the ``filename`` column.
        ip: Value stored in ``ip_address``; ``None`` is passed through.
        api_key: When truthy, the first 16 hex characters of its SHA-256
            digest are stored in ``api_key_hash`` (the raw key is not
            written). When falsy, ``None`` is stored instead.
    """
    if api_key:
        digest_prefix = hashlib.sha256(api_key.encode()).hexdigest()[:16]
    else:
        digest_prefix = None

    insert_sql = (
        "INSERT INTO jobs (job_id, filename, status, api_key_hash, ip_address)\n"
        "VALUES (%s, %s, 'queued', %s, %s)"
    )
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(insert_sql, (job_id, filename, digest_prefix, ip))
def update_job_status(job_id: str, status: str, result_json: dict = None,
                      score: int = None, grade: str = None,
                      total_issues: int = None, critical_count: int = None,
                      error_count: int = None, warning_count: int = None,
                      processing_time: float = None):
    """Set a job's status and, optionally, its result columns.

    ``status`` is always written. Every other keyword argument is written
    to the column of the same name only when it is not ``None``, so
    omitted values leave the existing column contents untouched.
    ``result_json`` is serialized with ``json.dumps`` before being bound.
    When ``status`` equals ``'completed'``, ``completed_at`` is set to the
    database's ``NOW()``.

    Args:
        job_id: Selects the row to update (``WHERE job_id = %s``).
        status: New value for the ``status`` column.
        result_json: Result payload, stored as JSON text.
        score, grade, total_issues, critical_count, error_count,
        warning_count, processing_time: Optional column values.
    """
    with get_conn() as conn, conn.cursor() as cur:
        # The order here fixes both the order of the SET clauses and the
        # order of the bound parameters.
        optional_columns = (
            ("result_json", result_json),
            ("score", score),
            ("grade", grade),
            ("total_issues", total_issues),
            ("critical_count", critical_count),
            ("error_count", error_count),
            ("warning_count", warning_count),
            ("processing_time", processing_time),
        )

        assignments = ["status = %s"]
        params = [status]
        for column, value in optional_columns:
            if value is None:
                continue
            assignments.append(f"{column} = %s")
            # Only the result payload needs serializing; the rest bind as-is.
            params.append(json.dumps(value) if column == "result_json" else value)

        if status == 'completed':
            # Timestamp comes from the database, so no parameter is bound.
            assignments.append("completed_at = NOW()")

        params.append(job_id)
        cur.execute(
            f"UPDATE jobs SET {', '.join(assignments)} WHERE job_id = %s",
            params,
        )
def get_job(job_id: str) -> dict:
    """Fetch one row from ``jobs`` by its ``job_id``.

    Returns:
        A plain ``dict`` of all columns for the matching row, or ``None``
        when no row matches.
    """
    with get_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute("SELECT * FROM jobs WHERE job_id = %s", (job_id,))
        record = cur.fetchone()
        if not record:
            return None
        return dict(record)
def list_jobs(limit: int = 50, offset: int = 0, status_filter: str = None) -> list:
    """Return a page of job summaries, newest first.

    Args:
        limit: Bound to the query's ``LIMIT``.
        offset: Bound to the query's ``OFFSET``.
        status_filter: When truthy, only rows whose ``status`` equals this
            value are returned; otherwise no status filter is applied.

    Returns:
        A list of dicts with the summary columns selected below, ordered
        by ``created_at`` descending.
    """
    with get_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
        sql_parts = [
            "SELECT job_id, filename, status, score, grade, total_issues, created_at, completed_at, processing_time FROM jobs"
        ]
        params = []
        if status_filter:
            sql_parts.append(" WHERE status = %s")
            params.append(status_filter)
        sql_parts.append(" ORDER BY created_at DESC LIMIT %s OFFSET %s")
        params += [limit, offset]

        cur.execute("".join(sql_parts), params)
        return list(map(dict, cur.fetchall()))
def log_audit(job_id: str, action: str, details: dict = None, ip: str = None):
    """Append one row to ``audit_log``.

    Args:
        job_id: Value stored in the ``job_id`` column.
        action: Value stored in the ``action`` column.
        details: Serialized with ``json.dumps`` into ``details``; a falsy
            value (including ``None``) is stored as an empty JSON object.
        ip: Value stored in ``ip_address``; ``None`` is passed through.
    """
    insert_sql = (
        "INSERT INTO audit_log (job_id, action, details, ip_address)\n"
        "VALUES (%s, %s, %s, %s)"
    )
    with get_conn() as conn, conn.cursor() as cur:
        serialized_details = json.dumps(details or {})
        cur.execute(insert_sql, (job_id, action, serialized_details, ip))
def get_stats() -> dict:
    """Compute aggregate figures over the whole ``jobs`` table.

    Returns:
        A dict with the keys ``total_jobs``, ``completed_jobs``,
        ``failed_jobs``, ``active_jobs`` (rows with status
        ``'processing'``), ``avg_score`` (rounded to a whole number) and
        ``avg_processing_time`` (rounded to two decimals). Both averages
        ignore rows where the column is NULL.
    """
    stats_sql = (
        "\n"
        "SELECT\n"
        "COUNT(*) as total_jobs,\n"
        "COUNT(*) FILTER (WHERE status = 'completed') as completed_jobs,\n"
        "COUNT(*) FILTER (WHERE status = 'failed') as failed_jobs,\n"
        "COUNT(*) FILTER (WHERE status = 'processing') as active_jobs,\n"
        "ROUND(AVG(score) FILTER (WHERE score IS NOT NULL)) as avg_score,\n"
        "ROUND(AVG(processing_time) FILTER (WHERE processing_time IS NOT NULL)::numeric, 2) as avg_processing_time\n"
        "FROM jobs\n"
    )
    with get_conn() as conn, conn.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(stats_sql)
        summary_row = cur.fetchone()
        return dict(summary_row)
def dismiss_issue(job_id: str, issue_index: int, reason: str = None):
    """Mark an issue of a job as dismissed (false positive).

    Inserts a row into ``dismissed_issues``. If a row for the same
    ``(job_id, issue_index)`` pair already exists, its ``reason`` is
    overwritten and ``dismissed_at`` is reset to the database's ``NOW()``.

    Args:
        job_id: Job the issue belongs to.
        issue_index: Index of the issue being dismissed.
        reason: Optional explanation; ``None`` is stored as-is.
    """
    upsert_sql = (
        "INSERT INTO dismissed_issues (job_id, issue_index, reason)\n"
        "VALUES (%s, %s, %s)\n"
        "ON CONFLICT (job_id, issue_index) DO UPDATE\n"
        "SET reason = EXCLUDED.reason, dismissed_at = NOW()"
    )
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(upsert_sql, (job_id, issue_index, reason))
def undismiss_issue(job_id: str, issue_index: int):
    """Delete the dismissal row for ``(job_id, issue_index)``.

    The statement simply matches no rows when the issue was never
    dismissed.
    """
    delete_sql = "DELETE FROM dismissed_issues WHERE job_id = %s AND issue_index = %s"
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(delete_sql, (job_id, issue_index))
def get_dismissed_indices(job_id: str) -> list:
    """List the dismissed issue indices recorded for a job.

    Returns:
        The ``issue_index`` values from ``dismissed_issues`` for
        ``job_id``, in ascending order; an empty list when none exist.
    """
    select_sql = "SELECT issue_index FROM dismissed_issues WHERE job_id = %s ORDER BY issue_index"
    with get_conn() as conn, conn.cursor() as cur:
        cur.execute(select_sql, (job_id,))
        # Each row is a one-column tuple; unpack it to the bare index.
        return [index for (index,) in cur.fetchall()]