hm_video_ai_qc_tool/checks/video_parse.py
2025-12-31 12:59:50 +02:00

255 lines
9.3 KiB
Python

"""
Video Parse Check - Extract video metadata and technical information using FFmpeg.
This check loads the video file and extracts comprehensive metadata including:
- Video codec, resolution, frame rate, bitrate, duration
- Audio codec, sample rate, channels, bitrate
- Color space information
- Container format
- Frame thumbnails for visual reference
Stores extracted information in context for downstream checks.
"""
import os
import json
import subprocess
import shutil
from typing import Dict, Any, List
from pathlib import Path
import ffmpeg
from PIL import Image
def get_video_metadata(video_path: str) -> Dict[str, Any]:
"""
Extract comprehensive video metadata using FFmpeg probe.
:param video_path: Path to the video file
:return: Dictionary containing all video metadata
"""
try:
probe = ffmpeg.probe(video_path)
except ffmpeg.Error as e:
raise ValueError(f"Failed to probe video file: {e.stderr.decode()}")
# Extract format information
format_info = probe.get('format', {})
# Find video and audio streams
video_stream = None
audio_stream = None
for stream in probe.get('streams', []):
if stream.get('codec_type') == 'video' and video_stream is None:
video_stream = stream
elif stream.get('codec_type') == 'audio' and audio_stream is None:
audio_stream = stream
if not video_stream:
raise ValueError("No video stream found in file")
# Parse video metadata
metadata = {
'container': format_info.get('format_name', 'unknown'),
'container_long_name': format_info.get('format_long_name', 'unknown'),
'duration': float(format_info.get('duration', 0)),
'size_bytes': int(format_info.get('size', 0)),
'bitrate': int(format_info.get('bit_rate', 0)),
'video': {
'codec': video_stream.get('codec_name', 'unknown'),
'codec_long_name': video_stream.get('codec_long_name', 'unknown'),
'width': int(video_stream.get('width', 0)),
'height': int(video_stream.get('height', 0)),
'aspect_ratio': video_stream.get('display_aspect_ratio', 'N/A'),
'pix_fmt': video_stream.get('pix_fmt', 'unknown'),
'color_space': video_stream.get('color_space', 'unknown'),
'color_range': video_stream.get('color_range', 'unknown'),
'bitrate': int(video_stream.get('bit_rate', 0)) if video_stream.get('bit_rate') else None,
},
'audio': None
}
# Parse frame rate
fps_str = video_stream.get('r_frame_rate', '0/1')
if '/' in fps_str:
num, den = map(int, fps_str.split('/'))
metadata['video']['frame_rate'] = round(num / den, 3) if den != 0 else 0
else:
metadata['video']['frame_rate'] = float(fps_str)
# Calculate frame count
nb_frames = video_stream.get('nb_frames')
if nb_frames:
metadata['video']['frame_count'] = int(nb_frames)
else:
# Estimate from duration and frame rate
metadata['video']['frame_count'] = int(metadata['duration'] * metadata['video']['frame_rate'])
# Parse audio metadata if present
if audio_stream:
metadata['audio'] = {
'codec': audio_stream.get('codec_name', 'unknown'),
'codec_long_name': audio_stream.get('codec_long_name', 'unknown'),
'sample_rate': int(audio_stream.get('sample_rate', 0)),
'channels': int(audio_stream.get('channels', 0)),
'channel_layout': audio_stream.get('channel_layout', 'unknown'),
'bitrate': int(audio_stream.get('bit_rate', 0)) if audio_stream.get('bit_rate') else None,
}
return metadata
def extract_thumbnails(video_path: str, output_dir: str, count: int = 5) -> List[str]:
"""
Extract thumbnail frames from video at uniform intervals.
:param video_path: Path to the video file
:param output_dir: Directory to save thumbnails
:param count: Number of thumbnails to extract
:return: List of paths to extracted thumbnail files
"""
os.makedirs(output_dir, exist_ok=True)
# Get video duration
probe = ffmpeg.probe(video_path)
duration = float(probe['format']['duration'])
thumbnail_paths = []
# Extract frames at uniform intervals
for i in range(count):
# Calculate timestamp (avoid very start and end)
if count == 1:
timestamp = duration / 2
else:
timestamp = (duration * (i + 1)) / (count + 1)
output_path = os.path.join(output_dir, f'thumbnail_{i+1}.jpg')
try:
(
ffmpeg
.input(video_path, ss=timestamp)
.output(output_path, vframes=1, format='image2', vcodec='mjpeg')
.overwrite_output()
.run(capture_stdout=True, capture_stderr=True, quiet=True)
)
thumbnail_paths.append(output_path)
except ffmpeg.Error as e:
print(f"Warning: Failed to extract thumbnail at {timestamp}s: {e.stderr.decode()}")
return thumbnail_paths
def run_check(config: Dict[str, Any], context: Dict[str, Any], check_id: str) -> Dict[str, Any]:
"""
Parse video file and extract all metadata.
:param config: Configuration dictionary with keys:
- input_file (str): Path to video file
- working_dir (str): Directory for temporary files
- extract_frames (int, optional): Number of thumbnails to extract (default: 5)
- frame_sampling (str, optional): Sampling method - 'uniform' (default)
:param context: Shared context dictionary
:param check_id: Unique identifier for this check
:return: Check result with status and details
"""
input_file = config.get('input_file')
working_dir = config.get('working_dir', './tmp')
extract_frame_count = config.get('extract_frames', 5)
if not input_file:
return {
"status": "error",
"error_message": "No input_file specified in config"
}
if not os.path.exists(input_file):
return {
"status": "error",
"error_message": f"Video file not found: {input_file}"
}
# Validate file extension
file_ext = os.path.splitext(input_file)[1].lower()
supported_formats = ['.mp4', '.mov', '.avi', '.mkv', '.webm', '.flv', '.wmv', '.m4v']
if file_ext not in supported_formats:
return {
"status": "error",
"error_message": f"Unsupported video format: {file_ext}. Supported: {', '.join(supported_formats)}"
}
try:
# Extract metadata
metadata = get_video_metadata(input_file)
# Create working directory for this video
video_working_dir = os.path.join(working_dir, check_id)
os.makedirs(video_working_dir, exist_ok=True)
# Copy video to working directory for potential further processing
video_filename = os.path.basename(input_file)
working_video_path = os.path.join(video_working_dir, video_filename)
# Only copy if not already in working dir
if os.path.abspath(input_file) != os.path.abspath(working_video_path):
shutil.copy2(input_file, working_video_path)
# Extract thumbnails
thumbnail_dir = os.path.join(video_working_dir, 'thumbnails')
thumbnail_paths = extract_thumbnails(input_file, thumbnail_dir, extract_frame_count)
# Store in context for downstream checks
context[check_id] = {
'video_path': working_video_path,
'original_path': input_file,
'metadata': metadata,
'thumbnail_paths': thumbnail_paths,
'working_dir': video_working_dir
}
# Format metadata for display
video_info = metadata['video']
audio_info = metadata['audio']
details = {
'file_path': input_file,
'file_size_mb': round(metadata['size_bytes'] / (1024 * 1024), 2),
'container': metadata['container'],
'duration_seconds': round(metadata['duration'], 2),
'overall_bitrate_kbps': round(metadata['bitrate'] / 1000, 0) if metadata['bitrate'] > 0 else 'N/A',
'video': {
'codec': video_info['codec'],
'resolution': f"{video_info['width']}x{video_info['height']}",
'frame_rate': video_info['frame_rate'],
'frame_count': video_info['frame_count'],
'aspect_ratio': video_info['aspect_ratio'],
'color_space': video_info['color_space'],
'pixel_format': video_info['pix_fmt'],
'bitrate_kbps': round(video_info['bitrate'] / 1000, 0) if video_info['bitrate'] else 'N/A',
},
'audio': audio_info if audio_info else 'No audio stream found',
'thumbnails_extracted': len(thumbnail_paths)
}
if audio_info:
details['audio'] = {
'codec': audio_info['codec'],
'sample_rate': audio_info['sample_rate'],
'channels': audio_info['channels'],
'channel_layout': audio_info['channel_layout'],
'bitrate_kbps': round(audio_info['bitrate'] / 1000, 0) if audio_info['bitrate'] else 'N/A',
}
return {
"status": "passed",
"details": details
}
except Exception as e:
return {
"status": "error",
"error_message": f"Failed to parse video: {str(e)}"
}