255 lines
9.3 KiB
Python
255 lines
9.3 KiB
Python
"""
|
|
Video Parse Check - Extract video metadata and technical information using FFmpeg.
|
|
|
|
This check loads the video file and extracts comprehensive metadata including:
|
|
- Video codec, resolution, frame rate, bitrate, duration
|
|
- Audio codec, sample rate, channels, bitrate
|
|
- Color space information
|
|
- Container format
|
|
- Frame thumbnails for visual reference
|
|
|
|
Stores extracted information in context for downstream checks.
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import subprocess
|
|
import shutil
|
|
from typing import Dict, Any, List
|
|
from pathlib import Path
|
|
import ffmpeg
|
|
from PIL import Image
|
|
|
|
|
|
def _to_number(value: Any, cast: type, default: Any) -> Any:
    """Convert a probe field with ``cast``; return ``default`` if it is missing or not numeric."""
    if value is None:
        return default
    try:
        return cast(value)
    except (TypeError, ValueError):
        return default


def _parse_frame_rate(rate: Any) -> float:
    """Turn a frame-rate value such as ``'30000/1001'`` or ``'25'`` into frames per second."""
    text = str(rate)
    try:
        if '/' in text:
            num, den = map(int, text.split('/'))
            # A zero denominator (e.g. '0/0') means the rate is unknown
            return round(num / den, 3) if den != 0 else 0.0
        return float(text)
    except (TypeError, ValueError):
        # Malformed value: report "unknown" rather than failing the whole probe
        return 0.0


def get_video_metadata(video_path: str) -> Dict[str, Any]:
    """
    Extract comprehensive video metadata using FFmpeg probe.

    The first audio stream and the first video stream are used. A video stream
    whose disposition has ``attached_pic`` set (embedded cover art) is only
    chosen when the file has no other video stream.

    :param video_path: Path to the video file
    :return: Dictionary with container-level fields (``container``,
        ``container_long_name``, ``duration``, ``size_bytes``, ``bitrate``),
        a ``video`` dictionary, and an ``audio`` dictionary (``None`` when the
        file has no audio stream)
    :raises ValueError: If the file cannot be probed or has no video stream
    """
    try:
        probe = ffmpeg.probe(video_path)
    except ffmpeg.Error as e:
        # stderr may be absent or contain bytes that are not valid UTF-8;
        # decoding must not mask the real probe failure.
        stderr_text = (e.stderr or b'').decode('utf-8', errors='replace')
        raise ValueError(f"Failed to probe video file: {stderr_text}") from e

    # Extract format information
    format_info = probe.get('format', {})

    # Find video and audio streams
    streams = probe.get('streams', [])
    video_streams = [s for s in streams if s.get('codec_type') == 'video']
    audio_stream = next(
        (s for s in streams if s.get('codec_type') == 'audio'), None
    )

    if not video_streams:
        raise ValueError("No video stream found in file")

    # Prefer a real video track over an attached picture; fall back to the
    # first video stream so cover-art-only files still behave as before.
    video_stream = next(
        (
            s for s in video_streams
            if not (s.get('disposition') or {}).get('attached_pic')
        ),
        video_streams[0],
    )

    # Parse video metadata
    metadata = {
        'container': format_info.get('format_name', 'unknown'),
        'container_long_name': format_info.get('format_long_name', 'unknown'),
        'duration': _to_number(format_info.get('duration'), float, 0.0),
        'size_bytes': _to_number(format_info.get('size'), int, 0),
        'bitrate': _to_number(format_info.get('bit_rate'), int, 0),
        'video': {
            'codec': video_stream.get('codec_name', 'unknown'),
            'codec_long_name': video_stream.get('codec_long_name', 'unknown'),
            'width': _to_number(video_stream.get('width'), int, 0),
            'height': _to_number(video_stream.get('height'), int, 0),
            'aspect_ratio': video_stream.get('display_aspect_ratio', 'N/A'),
            'pix_fmt': video_stream.get('pix_fmt', 'unknown'),
            'color_space': video_stream.get('color_space', 'unknown'),
            'color_range': video_stream.get('color_range', 'unknown'),
            # Per-stream bitrate is optional; None means "not reported"
            'bitrate': _to_number(video_stream.get('bit_rate'), int, None),
        },
        'audio': None,
    }

    # Parse frame rate
    frame_rate = _parse_frame_rate(video_stream.get('r_frame_rate', '0/1'))
    metadata['video']['frame_rate'] = frame_rate

    # Use the reported frame count when available, otherwise estimate it
    # from duration and frame rate.
    frame_count = _to_number(video_stream.get('nb_frames'), int, None)
    if not frame_count:
        frame_count = int(metadata['duration'] * frame_rate)
    metadata['video']['frame_count'] = frame_count

    # Parse audio metadata if present
    if audio_stream:
        metadata['audio'] = {
            'codec': audio_stream.get('codec_name', 'unknown'),
            'codec_long_name': audio_stream.get('codec_long_name', 'unknown'),
            'sample_rate': _to_number(audio_stream.get('sample_rate'), int, 0),
            'channels': _to_number(audio_stream.get('channels'), int, 0),
            'channel_layout': audio_stream.get('channel_layout', 'unknown'),
            'bitrate': _to_number(audio_stream.get('bit_rate'), int, None),
        }

    return metadata
|
|
|
|
|
|
def extract_thumbnails(video_path: str, output_dir: str, count: int = 5) -> List[str]:
    """
    Extract thumbnail frames from video at uniform intervals.

    Timestamps are the ``count`` evenly spaced interior points of the video
    duration, so the very first and very last frames are never sampled. A
    failure on one frame is reported with a printed warning and skipped; it
    does not abort the remaining frames.

    :param video_path: Path to the video file
    :param output_dir: Directory to save thumbnails (created if missing)
    :param count: Number of thumbnails to extract; zero or a negative value
        yields an empty list without probing the file
    :return: List of paths to the thumbnail files that were actually written
    """
    os.makedirs(output_dir, exist_ok=True)

    # Nothing requested: skip the (comparatively expensive) probe entirely
    if count <= 0:
        return []

    # Get video duration; treat a missing or non-numeric duration as 0 so the
    # behavior matches get_video_metadata instead of raising KeyError.
    probe = ffmpeg.probe(video_path)
    try:
        duration = float(probe.get('format', {}).get('duration', 0))
    except (TypeError, ValueError):
        duration = 0.0

    thumbnail_paths = []

    # Extract frames at uniform intervals
    for i in range(count):
        # Interior points only (avoid very start and end); for count == 1
        # this evaluates to the midpoint of the video.
        timestamp = (duration * (i + 1)) / (count + 1)

        output_path = os.path.join(output_dir, f'thumbnail_{i+1}.jpg')

        try:
            (
                ffmpeg
                .input(video_path, ss=timestamp)
                .output(output_path, vframes=1, format='image2', vcodec='mjpeg')
                .overwrite_output()
                .run(capture_stdout=True, capture_stderr=True, quiet=True)
            )
        except ffmpeg.Error as e:
            # stderr may be absent or not valid UTF-8; never let the warning crash
            stderr_text = (e.stderr or b'').decode('utf-8', errors='replace')
            print(f"Warning: Failed to extract thumbnail at {timestamp}s: {stderr_text}")
            continue

        # Only report paths that exist on disk, in case the run finished
        # without raising but wrote no frame.
        if os.path.isfile(output_path):
            thumbnail_paths.append(output_path)
        else:
            print(f"Warning: No frame was written for thumbnail at {timestamp}s")

    return thumbnail_paths
|
|
|
|
|
|
def run_check(config: Dict[str, Any], context: Dict[str, Any], check_id: str) -> Dict[str, Any]:
    """
    Parse video file and extract all metadata.

    On success the video is copied into ``<working_dir>/<check_id>/``,
    thumbnails are written to a ``thumbnails`` sub-directory, and the results
    are stored under ``context[check_id]`` for downstream checks.

    :param config: Configuration dictionary with keys:
        - input_file (str): Path to video file
        - working_dir (str, optional): Directory for temporary files (default: './tmp')
        - extract_frames (int, optional): Number of thumbnails to extract (default: 5)
        - frame_sampling (str, optional): Sampling method - 'uniform'; this
          key is not read by this function, sampling is always uniform
    :param context: Shared context dictionary; ``context[check_id]`` is set
        with ``video_path``, ``original_path``, ``metadata``,
        ``thumbnail_paths`` and ``working_dir``
    :param check_id: Unique identifier for this check
    :return: ``{"status": "passed", "details": {...}}`` on success, otherwise
        ``{"status": "error", "error_message": "..."}``
    """
    input_file = config.get('input_file')
    working_dir = config.get('working_dir', './tmp')
    extract_frame_count = config.get('extract_frames', 5)

    if not input_file:
        return {
            "status": "error",
            "error_message": "No input_file specified in config"
        }

    if not os.path.exists(input_file):
        return {
            "status": "error",
            "error_message": f"Video file not found: {input_file}"
        }

    # A directory (even one named like 'clip.mp4') would otherwise pass the
    # extension check and fail later with an opaque probe error.
    if not os.path.isfile(input_file):
        return {
            "status": "error",
            "error_message": f"Input path is not a file: {input_file}"
        }

    # Validate file extension
    file_ext = os.path.splitext(input_file)[1].lower()
    supported_formats = ['.mp4', '.mov', '.avi', '.mkv', '.webm', '.flv', '.wmv', '.m4v']

    if file_ext not in supported_formats:
        return {
            "status": "error",
            "error_message": f"Unsupported video format: {file_ext}. Supported: {', '.join(supported_formats)}"
        }

    try:
        # Extract metadata
        metadata = get_video_metadata(input_file)

        # Create working directory for this video
        video_working_dir = os.path.join(working_dir, check_id)
        os.makedirs(video_working_dir, exist_ok=True)

        # Copy video to working directory for potential further processing
        video_filename = os.path.basename(input_file)
        working_video_path = os.path.join(video_working_dir, video_filename)

        # Only copy if not already in working dir
        if os.path.abspath(input_file) != os.path.abspath(working_video_path):
            shutil.copy2(input_file, working_video_path)

        # Extract thumbnails
        thumbnail_dir = os.path.join(video_working_dir, 'thumbnails')
        thumbnail_paths = extract_thumbnails(input_file, thumbnail_dir, extract_frame_count)

        # Store in context for downstream checks
        context[check_id] = {
            'video_path': working_video_path,
            'original_path': input_file,
            'metadata': metadata,
            'thumbnail_paths': thumbnail_paths,
            'working_dir': video_working_dir
        }

        # Format metadata for display
        video_info = metadata['video']
        audio_info = metadata['audio']

        if audio_info:
            audio_details = {
                'codec': audio_info['codec'],
                'sample_rate': audio_info['sample_rate'],
                'channels': audio_info['channels'],
                'channel_layout': audio_info['channel_layout'],
                'bitrate_kbps': round(audio_info['bitrate'] / 1000, 0) if audio_info['bitrate'] else 'N/A',
            }
        else:
            audio_details = 'No audio stream found'

        details = {
            'file_path': input_file,
            'file_size_mb': round(metadata['size_bytes'] / (1024 * 1024), 2),
            'container': metadata['container'],
            'duration_seconds': round(metadata['duration'], 2),
            'overall_bitrate_kbps': round(metadata['bitrate'] / 1000, 0) if metadata['bitrate'] > 0 else 'N/A',
            'video': {
                'codec': video_info['codec'],
                'resolution': f"{video_info['width']}x{video_info['height']}",
                'frame_rate': video_info['frame_rate'],
                'frame_count': video_info['frame_count'],
                'aspect_ratio': video_info['aspect_ratio'],
                'color_space': video_info['color_space'],
                'pixel_format': video_info['pix_fmt'],
                'bitrate_kbps': round(video_info['bitrate'] / 1000, 0) if video_info['bitrate'] else 'N/A',
            },
            'audio': audio_details,
            'thumbnails_extracted': len(thumbnail_paths)
        }

        return {
            "status": "passed",
            "details": details
        }

    except Exception as e:
        # Check boundary: any failure is reported as an error result rather
        # than propagating to the caller.
        return {
            "status": "error",
            "error_message": f"Failed to parse video: {str(e)}"
        }
|