""" Machine-side technical pre-flight inspection for uploaded assets. Runs before any LLM analysis. Extracts dimensions, format, page count, duration, codec, etc. via PIL/PyMuPDF/ffprobe. Also opportunistically parses dimension hints from the filename and compares them to the actual file. Returns a JSON-serializable dict. Never raises — errors land in `errors` so the caller can still surface partial results. """ import json import os import re import subprocess from typing import Any, Dict, Optional from PIL import Image import fitz # PyMuPDF _DIMS_RE = re.compile(r'(\d{2,5})\s*[xX×]\s*(\d{2,5})') IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.tif', '.webp'} PDF_EXTENSIONS = {'.pdf'} VIDEO_EXTENSIONS = {'.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm'} MIME_BY_EXT: Dict[str, str] = { '.jpg': 'image/jpeg', '.jpeg': 'image/jpeg', '.png': 'image/png', '.gif': 'image/gif', '.bmp': 'image/bmp', '.tiff': 'image/tiff', '.tif': 'image/tiff', '.webp': 'image/webp', '.pdf': 'application/pdf', '.mp4': 'video/mp4', '.avi': 'video/x-msvideo', '.mov': 'video/quicktime', '.mkv': 'video/x-matroska', '.wmv': 'video/x-ms-wmv', '.flv': 'video/x-flv', '.webm': 'video/webm', } def parse_filename_specs(filename: str) -> Dict[str, Any]: """Extract dimension hints from a filename — pattern like '1920x1080'. Returns {} when nothing parseable is found. """ hints: Dict[str, Any] = {} base = os.path.splitext(os.path.basename(filename))[0] match = _DIMS_RE.search(base) if match: w, h = int(match.group(1)), int(match.group(2)) if 50 <= w <= 50000 and 50 <= h <= 50000: hints['width'] = w hints['height'] = h return hints def compare_filename_to_actual( hints: Dict[str, Any], actual: Dict[str, Any] ) -> Optional[Dict[str, Any]]: """Compare filename-extracted dimensions against actual file dimensions.""" if not hints or 'width' not in hints or 'height' not in hints: return None actual_dims = actual.get('dimensions') if not actual_dims: return None fw, fh = hints['width'], hints['height'] aw, ah = actual_dims['width'], actual_dims['height'] match = (fw == aw and fh == ah) return { 'checked': True, 'match': match, 'filename_says': f'{fw}x{fh}', 'actual_is': f'{aw}x{ah}', 'detail': ( f'Filename suggests {fw}x{fh}; file is {aw}x{ah}' + (' — match' if match else ' — MISMATCH') ), } def _inspect_image(file_path: str) -> Dict[str, Any]: report: Dict[str, Any] = {'kind': 'image'} try: with Image.open(file_path) as img: report['dimensions'] = {'width': img.width, 'height': img.height} report['format'] = img.format report['mode'] = img.mode report['has_alpha'] = img.mode in ('RGBA', 'LA') or 'transparency' in img.info dpi = img.info.get('dpi') if dpi: report['dpi'] = [int(round(dpi[0])), int(round(dpi[1]))] except Exception as exc: report.setdefault('errors', []).append(f'image inspection failed: {exc}') return report def _inspect_pdf(file_path: str) -> Dict[str, Any]: report: Dict[str, Any] = {'kind': 'pdf'} try: doc = fitz.open(file_path) report['page_count'] = doc.page_count if doc.metadata and doc.metadata.get('format'): report['pdf_version'] = doc.metadata['format'].replace('PDF ', '') page_dims = [] fonts = set() has_text = False for page in doc: rect = page.rect page_dims.append({'width': round(rect.width, 1), 'height': round(rect.height, 1)}) if not has_text and page.get_text().strip(): has_text = True for font_info in page.get_fonts(full=False): if len(font_info) > 3 and font_info[3]: fonts.add(font_info[3]) report['page_dimensions'] = page_dims report['embedded_fonts'] = sorted(fonts) report['has_text'] = has_text if page_dims: report['dimensions'] = { 'width': int(round(page_dims[0]['width'])), 'height': int(round(page_dims[0]['height'])), } doc.close() except Exception as exc: report.setdefault('errors', []).append(f'pdf inspection failed: {exc}') return report def _inspect_video(file_path: str) -> Dict[str, Any]: report: Dict[str, Any] = {'kind': 'video'} try: result = subprocess.run( [ 'ffprobe', '-v', 'error', '-print_format', 'json', '-show_format', '-show_streams', file_path, ], capture_output=True, text=True, timeout=30, ) if result.returncode != 0: report.setdefault('errors', []).append( f'ffprobe error: {result.stderr.strip()[:200]}' ) return report data = json.loads(result.stdout) fmt = data.get('format', {}) if 'duration' in fmt: report['duration_seconds'] = round(float(fmt['duration']), 2) if 'bit_rate' in fmt: report['bitrate_kbps'] = int(int(fmt['bit_rate']) / 1000) v_streams = [s for s in data.get('streams', []) if s.get('codec_type') == 'video'] a_streams = [s for s in data.get('streams', []) if s.get('codec_type') == 'audio'] if v_streams: v = v_streams[0] w, h = v.get('width'), v.get('height') if w and h: report['dimensions'] = {'width': w, 'height': h} report['video_codec'] = v.get('codec_name') fps_raw = v.get('avg_frame_rate', '0/0') if '/' in fps_raw: num, den = fps_raw.split('/') try: if int(den) > 0: report['fps'] = round(int(num) / int(den), 2) except ValueError: pass report['audio_codec'] = a_streams[0].get('codec_name') if a_streams else None except FileNotFoundError: report.setdefault('errors', []).append('ffprobe not installed on this server') except subprocess.TimeoutExpired: report.setdefault('errors', []).append('ffprobe timed out after 30s') except Exception as exc: report.setdefault('errors', []).append(f'video inspection failed: {exc}') return report def inspect(file_path: str) -> Dict[str, Any]: """Inspect any uploaded asset. Never raises.""" report: Dict[str, Any] = { 'kind': 'unknown', 'mime_type': None, 'file_size_bytes': None, 'file_size_mb': None, 'errors': [], } if not os.path.exists(file_path): report['errors'].append(f'file not found: {file_path}') return report try: size_bytes = os.path.getsize(file_path) report['file_size_bytes'] = size_bytes report['file_size_mb'] = round(size_bytes / (1024 * 1024), 3) except OSError as exc: report['errors'].append(f'stat failed: {exc}') ext = os.path.splitext(file_path)[1].lower() report['mime_type'] = MIME_BY_EXT.get(ext) if ext in IMAGE_EXTENSIONS: report.update(_inspect_image(file_path)) elif ext in PDF_EXTENSIONS: report.update(_inspect_pdf(file_path)) elif ext in VIDEO_EXTENSIONS: report.update(_inspect_video(file_path)) else: report['errors'].append(f'unsupported extension: {ext}') hints = parse_filename_specs(os.path.basename(file_path)) if hints: report['filename_hints'] = hints verdict = compare_filename_to_actual(hints, report) if verdict is not None: report['filename_match'] = verdict return report def format_for_llm_prompt(report: Dict[str, Any]) -> str: """Render the technical report as a short Markdown block for LLM prompts.""" lines = ['**Technical metadata (machine-inspected, pre-LLM):**'] kind = report.get('kind', 'unknown') lines.append(f'- File kind: {kind}') size_mb = report.get('file_size_mb') if size_mb is not None: lines.append(f'- File size: {size_mb} MB') dims = report.get('dimensions') if dims: lines.append(f"- Dimensions: {dims['width']} × {dims['height']}") dpi = report.get('dpi') if dpi: lines.append(f'- DPI: {dpi[0]} × {dpi[1]}') pc = report.get('page_count') if pc is not None: lines.append(f'- Pages: {pc}') duration = report.get('duration_seconds') if duration is not None: lines.append(f'- Duration: {duration}s') codec = report.get('video_codec') if codec: lines.append(f'- Video codec: {codec}') fonts = report.get('embedded_fonts') if fonts: suffix = ' …' if len(fonts) > 8 else '' lines.append(f"- Embedded fonts: {', '.join(fonts[:8])}{suffix}") fm = report.get('filename_match') if fm: verdict = 'MATCHES filename' if fm['match'] else 'DOES NOT match filename' lines.append(f"- Filename check: {verdict} ({fm['detail']})") if report.get('errors'): lines.append(f"- Inspection notes: {'; '.join(report['errors'])}") return '\n'.join(lines)