ai_qc/backend/technical_check.py
nickviljoen f4a95914b5 feat(tech-check): add machine-side pre-flight inspection module
New backend/technical_check.py extracts technical metadata from
uploaded assets via PIL (images), PyMuPDF (PDFs), and ffprobe (videos)
— no LLM, runs in milliseconds. Also opportunistically parses
dimension hints from the filename and compares them to the actual
file, returning a match/mismatch verdict.

Output is a JSON-serializable dict; format_for_llm_prompt() renders it
as a tight Markdown block that downstream prompts can prepend. Module
never raises — inspection errors land in `errors` so partial reports
still surface.

Standalone for this commit. Wiring into the upload flow and UI lands
in subsequent commits on this branch.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-14 21:53:06 +02:00

248 lines
9.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Machine-side technical pre-flight inspection for uploaded assets.
Runs before any LLM analysis. Extracts dimensions, format, page count,
duration, codec, etc. via PIL/PyMuPDF/ffprobe. Also opportunistically
parses dimension hints from the filename and compares them to the actual
file. Returns a JSON-serializable dict. Never raises — errors land in
`errors` so the caller can still surface partial results.
"""
import json
import os
import re
import subprocess
from typing import Any, Dict, Optional
from PIL import Image
import fitz # PyMuPDF
# Matches a "<width>x<height>" hint such as "1920x1080" or "300 × 250".
# The lookbehind/lookahead make the 2-5 digit limit effective: without them
# a longer digit run (e.g. "123456x789") would still match on its trailing
# digits and yield a bogus 23456x789 hint.
_DIMS_RE = re.compile(r'(?<!\d)(\d{2,5})\s*[xX×]\s*(\d{2,5})(?!\d)')

# Lower-case file extensions (leading dot included) grouped by asset kind.
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif', '.webp'}
PDF_EXTENSIONS = {'.pdf'}
VIDEO_EXTENSIONS = {'.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm'}

# Extension -> MIME type. Derived purely from the extension; file content is
# never sniffed here.
MIME_BY_EXT: Dict[str, str] = {
    '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png',
    '.gif': 'image/gif', '.bmp': 'image/bmp', '.tiff': 'image/tiff',
    '.tif': 'image/tiff', '.webp': 'image/webp', '.pdf': 'application/pdf',
    '.mp4': 'video/mp4', '.avi': 'video/x-msvideo', '.mov': 'video/quicktime',
    '.mkv': 'video/x-matroska', '.wmv': 'video/x-ms-wmv', '.flv': 'video/x-flv',
    '.webm': 'video/webm',
}
def parse_filename_specs(filename: str) -> Dict[str, Any]:
    """Extract a dimension hint such as '1920x1080' from a filename.

    Only the base name without its extension is searched. Every candidate
    found by ``_DIMS_RE`` is examined in order and the first one whose width
    and height both lie within 50..50000 is returned, so an implausible
    early token (the '10x12' in 'v10x12_1920x1080.png') does not hide a
    valid hint further along the name.

    Args:
        filename: A file name or path.

    Returns:
        ``{'width': int, 'height': int}`` for the first plausible hint, or
        ``{}`` when nothing parseable is found.
    """
    base = os.path.splitext(os.path.basename(filename))[0]
    for candidate in _DIMS_RE.finditer(base):
        w, h = int(candidate.group(1)), int(candidate.group(2))
        # Range check filters out version numbers and similar small tokens.
        if 50 <= w <= 50000 and 50 <= h <= 50000:
            return {'width': w, 'height': h}
    return {}
def compare_filename_to_actual(
    hints: Dict[str, Any], actual: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """Check filename-derived dimensions against the inspected dimensions.

    Args:
        hints: Mapping expected to carry 'width' and 'height' parsed from
            the filename.
        actual: Inspection report; its 'dimensions' entry (a mapping with
            'width' and 'height') is the ground truth.

    Returns:
        ``None`` when either side lacks the data needed for a comparison,
        otherwise a verdict dict with the keys 'checked', 'match',
        'filename_says', 'actual_is' and 'detail'.
    """
    has_both_hints = bool(hints) and 'width' in hints and 'height' in hints
    if not has_both_hints:
        return None

    measured = actual.get('dimensions')
    if not measured:
        return None

    hinted_w = hints['width']
    hinted_h = hints['height']
    real_w = measured['width']
    real_h = measured['height']

    # Exact equality on both axes; a swapped orientation counts as a mismatch.
    same = hinted_w == real_w and hinted_h == real_h

    said = f'{hinted_w}x{hinted_h}'
    found = f'{real_w}x{real_h}'
    tail = ' — match' if same else ' — MISMATCH'

    verdict: Dict[str, Any] = {}
    verdict['checked'] = True
    verdict['match'] = same
    verdict['filename_says'] = said
    verdict['actual_is'] = found
    verdict['detail'] = f'Filename suggests {said}; file is {found}' + tail
    return verdict
def _inspect_image(file_path: str) -> Dict[str, Any]:
    """Collect basic technical facts about an image file via PIL.

    Args:
        file_path: Path of the image to open.

    Returns:
        A dict that always contains ``'kind': 'image'``. On success it also
        carries 'dimensions', 'format', 'mode', 'has_alpha' and, when the
        image info provides one, 'dpi' as a two-item list of rounded ints.
        Any exception is recorded under 'errors' instead of being raised;
        fields gathered before the failure are kept.
    """
    report: Dict[str, Any] = {'kind': 'image'}
    try:
        with Image.open(file_path) as picture:
            report['dimensions'] = {
                'width': picture.width,
                'height': picture.height,
            }
            report['format'] = picture.format
            report['mode'] = picture.mode

            # Alpha is reported for the RGBA/LA modes or when the image info
            # carries a 'transparency' entry.
            alpha_mode = picture.mode in ('RGBA', 'LA')
            report['has_alpha'] = alpha_mode or 'transparency' in picture.info

            resolution = picture.info.get('dpi')
            if resolution:
                report['dpi'] = [int(round(resolution[axis])) for axis in (0, 1)]
    except Exception as exc:
        report.setdefault('errors', []).append(f'image inspection failed: {exc}')
    return report
def _inspect_pdf(file_path: str) -> Dict[str, Any]:
    """Collect page, font and text facts about a PDF via PyMuPDF.

    Args:
        file_path: Path of the PDF to open.

    Returns:
        A dict that always contains ``'kind': 'pdf'``. On success it also
        carries 'page_count', optionally 'pdf_version', 'page_dimensions'
        (one rounded width/height dict per page), 'embedded_fonts' (sorted
        font names), 'has_text', and 'dimensions' (first page, rounded to
        ints) when the document has at least one page. Any exception is
        recorded under 'errors' instead of being raised; fields gathered
        before the failure are kept.
    """
    report: Dict[str, Any] = {'kind': 'pdf'}
    try:
        # The context manager releases the document handle even when a page
        # fails to parse half-way through the loop below.
        with fitz.open(file_path) as doc:
            report['page_count'] = doc.page_count

            metadata = doc.metadata
            if metadata and metadata.get('format'):
                report['pdf_version'] = metadata['format'].replace('PDF ', '')

            page_dims = []
            fonts = set()
            has_text = False
            for page in doc:
                rect = page.rect
                # NOTE(review): page.rect is presumably in PDF points, not
                # pixels — confirm before comparing with pixel-based hints.
                page_dims.append(
                    {'width': round(rect.width, 1), 'height': round(rect.height, 1)}
                )
                # Stop extracting text once any page has yielded some.
                if not has_text and page.get_text().strip():
                    has_text = True
                for font_info in page.get_fonts(full=False):
                    # Index 3 is taken as the font name. Nothing here checks
                    # whether the font program is actually embedded.
                    if len(font_info) > 3 and font_info[3]:
                        fonts.add(font_info[3])

            report['page_dimensions'] = page_dims
            report['embedded_fonts'] = sorted(fonts)
            report['has_text'] = has_text
            if page_dims:
                # The first page stands in for the document's dimensions.
                report['dimensions'] = {
                    'width': int(round(page_dims[0]['width'])),
                    'height': int(round(page_dims[0]['height'])),
                }
    except Exception as exc:
        report.setdefault('errors', []).append(f'pdf inspection failed: {exc}')
    return report
def _inspect_video(file_path: str) -> Dict[str, Any]:
    """Collect container and stream facts about a video by running ffprobe.

    ffprobe is invoked with JSON output and a 30 second timeout.

    Args:
        file_path: Path of the video handed to ffprobe.

    Returns:
        A dict that always contains ``'kind': 'video'``. On success it may
        carry 'duration_seconds', 'bitrate_kbps', 'dimensions',
        'video_codec' and 'fps' (all taken from the first video stream),
        plus 'audio_codec' (first audio stream, or ``None``). A missing
        ffprobe binary, a timeout, a non-zero exit code or any other
        exception is recorded under 'errors' instead of being raised.
    """
    report: Dict[str, Any] = {'kind': 'video'}
    probe_cmd = [
        'ffprobe', '-v', 'error', '-print_format', 'json',
        '-show_format', '-show_streams', file_path,
    ]
    try:
        proc = subprocess.run(probe_cmd, capture_output=True, text=True, timeout=30)
        if proc.returncode != 0:
            # Only the first 200 characters of stderr are kept.
            report.setdefault('errors', []).append(
                f'ffprobe error: {proc.stderr.strip()[:200]}'
            )
            return report

        probe = json.loads(proc.stdout)

        container = probe.get('format', {})
        if 'duration' in container:
            report['duration_seconds'] = round(float(container['duration']), 2)
        if 'bit_rate' in container:
            report['bitrate_kbps'] = int(int(container['bit_rate']) / 1000)

        # Split the streams by type in a single pass.
        video_tracks = []
        audio_tracks = []
        for stream in probe.get('streams', []):
            codec_type = stream.get('codec_type')
            if codec_type == 'video':
                video_tracks.append(stream)
            elif codec_type == 'audio':
                audio_tracks.append(stream)

        if video_tracks:
            primary = video_tracks[0]
            width = primary.get('width')
            height = primary.get('height')
            if width and height:
                report['dimensions'] = {'width': width, 'height': height}
            report['video_codec'] = primary.get('codec_name')

            # The frame rate arrives as a "num/den" string.
            rate = primary.get('avg_frame_rate', '0/0')
            if '/' in rate:
                numerator, denominator = rate.split('/')
                try:
                    den = int(denominator)
                    if den > 0:
                        report['fps'] = round(int(numerator) / den, 2)
                except ValueError:
                    # Non-numeric rate parts: leave 'fps' out of the report.
                    pass

        if audio_tracks:
            report['audio_codec'] = audio_tracks[0].get('codec_name')
        else:
            report['audio_codec'] = None
    except FileNotFoundError:
        report.setdefault('errors', []).append('ffprobe not installed on this server')
    except subprocess.TimeoutExpired:
        report.setdefault('errors', []).append('ffprobe timed out after 30s')
    except Exception as exc:
        report.setdefault('errors', []).append(f'video inspection failed: {exc}')
    return report
def inspect(file_path: str) -> Dict[str, Any]:
    """Inspect an uploaded asset and return a JSON-serializable report.

    The file extension selects the image, PDF or video inspector. Problems
    are collected in the report's 'errors' list rather than raised, so
    partial results still reach the caller.

    Args:
        file_path: Path of the asset on disk.

    Returns:
        A dict with at least 'kind', 'mime_type', 'file_size_bytes',
        'file_size_mb' and 'errors'. The kind-specific inspector's fields
        are merged in, plus 'filename_hints' and 'filename_match' when the
        filename carries a dimension hint.
    """
    report: Dict[str, Any] = {
        'kind': 'unknown',
        'mime_type': None,
        'file_size_bytes': None,
        'file_size_mb': None,
        'errors': [],
    }
    if not os.path.exists(file_path):
        report['errors'].append(f'file not found: {file_path}')
        return report

    try:
        size_bytes = os.path.getsize(file_path)
        report['file_size_bytes'] = size_bytes
        report['file_size_mb'] = round(size_bytes / (1024 * 1024), 3)
    except OSError as exc:
        report['errors'].append(f'stat failed: {exc}')

    ext = os.path.splitext(file_path)[1].lower()
    report['mime_type'] = MIME_BY_EXT.get(ext)

    details: Dict[str, Any] = {}
    if ext in IMAGE_EXTENSIONS:
        details = _inspect_image(file_path)
    elif ext in PDF_EXTENSIONS:
        details = _inspect_pdf(file_path)
    elif ext in VIDEO_EXTENSIONS:
        details = _inspect_video(file_path)
    else:
        report['errors'].append(f'unsupported extension: {ext}')

    # Merge the inspector's errors into the list collected so far. A plain
    # dict.update would replace that list and drop e.g. an earlier
    # 'stat failed' entry.
    inspector_errors = details.pop('errors', [])
    report.update(details)
    report['errors'].extend(inspector_errors)

    hints = parse_filename_specs(os.path.basename(file_path))
    if hints:
        report['filename_hints'] = hints
        verdict = compare_filename_to_actual(hints, report)
        if verdict is not None:
            report['filename_match'] = verdict
    return report
def format_for_llm_prompt(report: Dict[str, Any]) -> str:
    """Render a technical report as a short Markdown block for LLM prompts.

    Only fields present in the report produce a bullet. At most eight font
    names are listed; when more exist, the line ends with a ' (+N more)'
    marker so the reader can tell the list was truncated.

    Args:
        report: A report dict shaped like the one ``inspect`` returns.

    Returns:
        A newline-joined Markdown string starting with a bold heading.
    """
    max_fonts = 8
    lines = ['**Technical metadata (machine-inspected, pre-LLM):**']

    kind = report.get('kind', 'unknown')
    lines.append(f'- File kind: {kind}')

    size_mb = report.get('file_size_mb')
    if size_mb is not None:
        lines.append(f'- File size: {size_mb} MB')

    dims = report.get('dimensions')
    if dims:
        lines.append(f"- Dimensions: {dims['width']} × {dims['height']}")

    dpi = report.get('dpi')
    if dpi:
        lines.append(f'- DPI: {dpi[0]} × {dpi[1]}')

    pc = report.get('page_count')
    if pc is not None:
        lines.append(f'- Pages: {pc}')

    duration = report.get('duration_seconds')
    if duration is not None:
        lines.append(f'- Duration: {duration}s')

    codec = report.get('video_codec')
    if codec:
        lines.append(f'- Video codec: {codec}')

    fonts = report.get('embedded_fonts')
    if fonts:
        # Say how many names were cut so a truncated list is never mistaken
        # for the complete one.
        hidden = len(fonts) - max_fonts
        suffix = f' (+{hidden} more)' if hidden > 0 else ''
        lines.append(f"- Embedded fonts: {', '.join(fonts[:max_fonts])}{suffix}")

    fm = report.get('filename_match')
    if fm:
        verdict = 'MATCHES filename' if fm['match'] else 'DOES NOT match filename'
        lines.append(f"- Filename check: {verdict} ({fm['detail']})")

    if report.get('errors'):
        lines.append(f"- Inspection notes: {'; '.join(report['errors'])}")
    return '\n'.join(lines)