New backend/technical_check.py extracts technical metadata from uploaded assets via PIL (images), PyMuPDF (PDFs), and ffprobe (videos) — no LLM, runs in milliseconds. Also opportunistically parses dimension hints from the filename and compares them to the actual file, returning a match/mismatch verdict. Output is a JSON-serializable dict; format_for_llm_prompt() renders it as a tight Markdown block that downstream prompts can prepend. Module never raises — inspection errors land in `errors` so partial reports still surface. Standalone for this commit. Wiring into the upload flow and UI lands in subsequent commits on this branch. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
248 lines
9.1 KiB
Python
248 lines
9.1 KiB
Python
"""
|
||
Machine-side technical pre-flight inspection for uploaded assets.
|
||
|
||
Runs before any LLM analysis. Extracts dimensions, format, page count,
|
||
duration, codec, etc. via PIL/PyMuPDF/ffprobe. Also opportunistically
|
||
parses dimension hints from the filename and compares them to the actual
|
||
file. Returns a JSON-serializable dict. Never raises — errors land in
|
||
`errors` so the caller can still surface partial results.
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import re
|
||
import subprocess
|
||
from typing import Any, Dict, Optional
|
||
|
||
from PIL import Image
|
||
import fitz # PyMuPDF
|
||
|
||
|
||
# "<width>x<height>" with 2-5 digits per side; 'x', 'X' or '×' as separator,
# optionally surrounded by whitespace. The look-arounds stop the pattern from
# biting into the middle of a longer digit run (e.g. '123456x789' must not be
# read as '23456x789').
_DIMS_RE = re.compile(r'(?<!\d)(\d{2,5})\s*[xX×]\s*(\d{2,5})(?!\d)')

# File extensions (lower-case, with leading dot) grouped by inspector.
IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif', '.webp'}
PDF_EXTENSIONS = {'.pdf'}
VIDEO_EXTENSIONS = {'.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm'}

# Extension -> MIME type. The lookup is purely extension-based; file contents
# are not sniffed.
MIME_BY_EXT: Dict[str, str] = {
    '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png',
    '.gif': 'image/gif', '.bmp': 'image/bmp', '.tiff': 'image/tiff',
    '.tif': 'image/tiff', '.webp': 'image/webp', '.pdf': 'application/pdf',
    '.mp4': 'video/mp4', '.avi': 'video/x-msvideo', '.mov': 'video/quicktime',
    '.mkv': 'video/x-matroska', '.wmv': 'video/x-ms-wmv', '.flv': 'video/x-flv',
    '.webm': 'video/webm',
}


def parse_filename_specs(filename: str) -> Dict[str, Any]:
    """Extract dimension hints from a filename — pattern like '1920x1080'.

    Only the basename without its extension is searched, so directory names
    never contribute a hint. Candidates are tried left to right and the first
    one whose width and height both fall within 50..50000 wins; implausible
    candidates (e.g. '10x20') are skipped rather than ending the search.

    Args:
        filename: File name or path.

    Returns:
        {'width': int, 'height': int} for the first plausible candidate,
        or {} when nothing parseable is found.
    """
    base = os.path.splitext(os.path.basename(filename))[0]
    for match in _DIMS_RE.finditer(base):
        w, h = int(match.group(1)), int(match.group(2))
        if 50 <= w <= 50000 and 50 <= h <= 50000:
            return {'width': w, 'height': h}
    return {}
|
||
|
||
|
||
def compare_filename_to_actual(
    hints: Dict[str, Any], actual: Dict[str, Any]
) -> Optional[Dict[str, Any]]:
    """Check filename-claimed dimensions against the inspected dimensions.

    Args:
        hints: Filename hints; must carry both 'width' and 'height' for a
            comparison to happen.
        actual: Inspection report; its 'dimensions' entry (a dict with
            'width' and 'height') is the measured size.

    Returns:
        None when either side lacks dimensions; otherwise a verdict dict with
        'checked', 'match', 'filename_says', 'actual_is' and a human-readable
        'detail'. Width and height must both be equal for a match — a swapped
        orientation counts as a mismatch.
    """
    if not hints:
        return None
    if not ('width' in hints and 'height' in hints):
        return None

    measured = actual.get('dimensions')
    if not measured:
        return None

    claimed_w = hints['width']
    claimed_h = hints['height']
    real_w = measured['width']
    real_h = measured['height']

    same = (claimed_w == real_w and claimed_h == real_h)
    claimed_text = f'{claimed_w}x{claimed_h}'
    real_text = f'{real_w}x{real_h}'
    outcome = ' — match' if same else ' — MISMATCH'

    return {
        'checked': True,
        'match': same,
        'filename_says': claimed_text,
        'actual_is': real_text,
        'detail': f'Filename suggests {claimed_w}x{claimed_h}; file is {real_w}x{real_h}' + outcome,
    }
|
||
|
||
|
||
def _inspect_image(file_path: str) -> Dict[str, Any]:
    """Collect technical metadata for an image file via PIL.

    Args:
        file_path: Path of the image to open.

    Returns:
        A dict with 'kind' == 'image' plus, when the file can be opened,
        'dimensions', 'format', 'mode', 'has_alpha' and (if the file carries
        it) 'dpi' as a two-item list of rounded ints. Never raises: any
        failure is recorded under 'errors', and fields gathered before the
        failure are kept.
    """
    report: Dict[str, Any] = {'kind': 'image'}
    try:
        with Image.open(file_path) as img:
            report['dimensions'] = {'width': img.width, 'height': img.height}
            report['format'] = img.format
            report['mode'] = img.mode
            # Alpha can live in a dedicated band (including palette+alpha and
            # the premultiplied variants) or in a 'transparency' info entry
            # (e.g. palette/GIF transparency).
            report['has_alpha'] = (
                img.mode in ('RGBA', 'LA', 'PA', 'RGBa', 'La')
                or 'transparency' in img.info
            )
            dpi = img.info.get('dpi')
            if dpi:
                report['dpi'] = [int(round(dpi[0])), int(round(dpi[1]))]
    except Exception as exc:
        # Best-effort by design: surface the problem, keep partial results.
        report.setdefault('errors', []).append(f'image inspection failed: {exc}')
    return report
|
||
|
||
|
||
def _inspect_pdf(file_path: str) -> Dict[str, Any]:
    """Collect technical metadata for a PDF via PyMuPDF.

    Args:
        file_path: Path of the PDF to open.

    Returns:
        A dict with 'kind' == 'pdf' plus, on success, 'page_count',
        'pdf_version' (when the metadata has a format entry),
        'page_dimensions' (per-page width/height rounded to 0.1),
        'embedded_fonts' (sorted unique font names reported by the pages),
        'has_text', and 'dimensions' (first page, rounded to ints). Never
        raises: failures are recorded under 'errors'.
    """
    report: Dict[str, Any] = {'kind': 'pdf'}
    try:
        doc = fitz.open(file_path)
        try:
            report['page_count'] = doc.page_count
            if doc.metadata and doc.metadata.get('format'):
                report['pdf_version'] = doc.metadata['format'].replace('PDF ', '')

            page_dims = []
            fonts = set()
            has_text = False
            for page in doc:
                rect = page.rect
                page_dims.append(
                    {'width': round(rect.width, 1), 'height': round(rect.height, 1)}
                )
                # Stop extracting text once any page has proven to contain some.
                if not has_text and page.get_text().strip():
                    has_text = True
                for font_info in page.get_fonts(full=False):
                    # Index 3 of the font tuple is used as the font's name.
                    if len(font_info) > 3 and font_info[3]:
                        fonts.add(font_info[3])

            report['page_dimensions'] = page_dims
            report['embedded_fonts'] = sorted(fonts)
            report['has_text'] = has_text
            if page_dims:
                # The first page stands in for the document's overall size.
                report['dimensions'] = {
                    'width': int(round(page_dims[0]['width'])),
                    'height': int(round(page_dims[0]['height'])),
                }
        finally:
            # Release the document even when a page fails to parse.
            doc.close()
    except Exception as exc:
        # Best-effort by design: surface the problem, keep partial results.
        report.setdefault('errors', []).append(f'pdf inspection failed: {exc}')
    return report
|
||
|
||
|
||
def _inspect_video(file_path: str) -> Dict[str, Any]:
    """Collect technical metadata for a video by shelling out to ffprobe.

    Args:
        file_path: Path of the video handed to ffprobe.

    Returns:
        A dict with 'kind' == 'video' plus whatever ffprobe's JSON output
        provides: 'duration_seconds', 'bitrate_kbps', 'dimensions',
        'video_codec' and 'fps' (taken from the first video stream), and
        'audio_codec' (first audio stream, or None when there is none).
        Never raises: a missing binary, a timeout, a non-zero exit code or a
        parse failure is recorded under 'errors'.
    """
    report: Dict[str, Any] = {'kind': 'video'}

    def note(message: str) -> None:
        # 'errors' is created lazily so a clean run carries no such key.
        report.setdefault('errors', []).append(message)

    command = [
        'ffprobe', '-v', 'error', '-print_format', 'json',
        '-show_format', '-show_streams', file_path,
    ]
    try:
        probe = subprocess.run(command, capture_output=True, text=True, timeout=30)
        if probe.returncode != 0:
            note(f'ffprobe error: {probe.stderr.strip()[:200]}')
            return report

        payload = json.loads(probe.stdout)

        container = payload.get('format', {})
        if 'duration' in container:
            report['duration_seconds'] = round(float(container['duration']), 2)
        if 'bit_rate' in container:
            report['bitrate_kbps'] = int(int(container['bit_rate']) / 1000)

        video_tracks = [
            track for track in payload.get('streams', [])
            if track.get('codec_type') == 'video'
        ]
        audio_tracks = [
            track for track in payload.get('streams', [])
            if track.get('codec_type') == 'audio'
        ]

        if video_tracks:
            primary = video_tracks[0]
            width = primary.get('width')
            height = primary.get('height')
            if width and height:
                report['dimensions'] = {'width': width, 'height': height}
            report['video_codec'] = primary.get('codec_name')

            # ffprobe gives the frame rate as a 'num/den' fraction string.
            rate = primary.get('avg_frame_rate', '0/0')
            if '/' in rate:
                numerator, denominator = rate.split('/')
                try:
                    if int(denominator) > 0:
                        report['fps'] = round(int(numerator) / int(denominator), 2)
                except ValueError:
                    # Non-numeric fraction parts: leave 'fps' unset.
                    pass

        report['audio_codec'] = (
            audio_tracks[0].get('codec_name') if audio_tracks else None
        )
    except FileNotFoundError:
        note('ffprobe not installed on this server')
    except subprocess.TimeoutExpired:
        note('ffprobe timed out after 30s')
    except Exception as exc:
        note(f'video inspection failed: {exc}')
    return report
|
||
|
||
|
||
def inspect(file_path: str) -> Dict[str, Any]:
    """Inspect any uploaded asset. Never raises.

    The inspector is chosen by file extension (image, PDF or video). Its
    fields are merged into a base report that also carries the MIME type
    (extension-derived) and the file size. Dimension hints found in the file
    name are attached as 'filename_hints' and, when the inspector produced
    dimensions, compared against them under 'filename_match'.

    Args:
        file_path: Path of the asset on disk.

    Returns:
        A JSON-serializable dict. 'errors' is always present and accumulates
        every problem met along the way (missing file, stat failure,
        unsupported extension, inspector failures).
    """
    report: Dict[str, Any] = {
        'kind': 'unknown',
        'mime_type': None,
        'file_size_bytes': None,
        'file_size_mb': None,
        'errors': [],
    }

    if not os.path.exists(file_path):
        report['errors'].append(f'file not found: {file_path}')
        return report

    try:
        size_bytes = os.path.getsize(file_path)
        report['file_size_bytes'] = size_bytes
        report['file_size_mb'] = round(size_bytes / (1024 * 1024), 3)
    except OSError as exc:
        report['errors'].append(f'stat failed: {exc}')

    ext = os.path.splitext(file_path)[1].lower()
    report['mime_type'] = MIME_BY_EXT.get(ext)

    details: Dict[str, Any] = {}
    if ext in IMAGE_EXTENSIONS:
        details = _inspect_image(file_path)
    elif ext in PDF_EXTENSIONS:
        details = _inspect_pdf(file_path)
    elif ext in VIDEO_EXTENSIONS:
        details = _inspect_video(file_path)
    else:
        report['errors'].append(f'unsupported extension: {ext}')

    # Merge the inspector's errors instead of letting dict.update() replace
    # the list — otherwise an earlier note (e.g. 'stat failed') would vanish.
    inspector_errors = details.pop('errors', None) or []
    report.update(details)
    report['errors'].extend(inspector_errors)

    hints = parse_filename_specs(os.path.basename(file_path))
    if hints:
        report['filename_hints'] = hints
        verdict = compare_filename_to_actual(hints, report)
        if verdict is not None:
            report['filename_match'] = verdict

    return report
|
||
|
||
|
||
def format_for_llm_prompt(report: Dict[str, Any]) -> str:
    """Render the technical report as a short Markdown block for LLM prompts.

    Emits a bold header line followed by one '- ' bullet per available field:
    kind, size, dimensions, DPI, page count, duration, video codec, fonts
    (first eight, with an ellipsis when more exist), the filename check and
    any inspection errors. Fields absent from the report are skipped.

    Args:
        report: Report dict with the keys read below; all are optional.

    Returns:
        The Markdown block, lines joined with '\\n', no trailing newline.
    """
    lines = ['**Technical metadata (machine-inspected, pre-LLM):**']

    kind = report.get('kind', 'unknown')
    lines.append(f'- File kind: {kind}')

    size_mb = report.get('file_size_mb')
    if size_mb is not None:
        lines.append(f'- File size: {size_mb} MB')

    dims = report.get('dimensions')
    if dims:
        lines.append(f"- Dimensions: {dims['width']} × {dims['height']}")

    dpi = report.get('dpi')
    if dpi:
        lines.append(f'- DPI: {dpi[0]} × {dpi[1]}')

    pc = report.get('page_count')
    if pc is not None:
        lines.append(f'- Pages: {pc}')

    duration = report.get('duration_seconds')
    if duration is not None:
        lines.append(f'- Duration: {duration}s')

    codec = report.get('video_codec')
    if codec:
        lines.append(f'- Video codec: {codec}')

    fonts = report.get('embedded_fonts')
    if fonts:
        suffix = ' …' if len(fonts) > 8 else ''
        lines.append(f"- Embedded fonts: {', '.join(fonts[:8])}{suffix}")

    fm = report.get('filename_match')
    if fm:
        verdict = 'MATCHES filename' if fm['match'] else 'DOES NOT match filename'
        lines.append(f"- Filename check: {verdict} ({fm['detail']})")

    errors = report.get('errors')
    if errors:
        # Tool output can span several lines; collapse all whitespace so each
        # note stays inside its single Markdown bullet.
        notes = [' '.join(str(err).split()) for err in errors]
        lines.append(f"- Inspection notes: {'; '.join(notes)}")

    return '\n'.join(lines)
|