video-query/backend/video_splitter.py
2025-11-27 02:48:15 +05:30

619 lines
25 KiB
Python

"""
Video Splitter Module
This module provides functionality to detect video duration and split long videos
into smaller chunks for processing with APIs that have duration limitations.
Key Features:
- Duration-based splitting (respects Gemini API 53-55 min limit)
- Size-based splitting (respects 1GB base64-encoded API limit)
- Variable Bitrate (VBR) handling with safety margins
- Automatic chunk validation and re-splitting for oversized chunks
"""
import ffmpeg
import os
import tempfile
import logging
import math
from typing import List, Tuple, Optional, Dict, Any
from system_utils import system_utils
from error_reporter import ErrorReporter, ErrorCategory
logger = logging.getLogger('video_query')
class VideoSplitter:
    """
    Handles video duration detection and splitting operations.

    Supports robust splitting with:
    - Duration limits (Gemini API ~53-55 min)
    - Size limits (1GB base64-encoded API request)
    - VBR (Variable Bitrate) safety margins
    - Automatic chunk validation and bounded, recursive re-splitting

    All file sizes computed by this class are bytes / 1024 / 1024 (MiB),
    labelled "MB" in logs for brevity.
    """

    # Duration constraint
    # Default chunk duration in minutes (53 min to stay under 55 min Gemini API limit for videos with audio)
    # Google Gemini 2.5 Pro limits: ~55 min with audio, ~60 min without audio
    DEFAULT_CHUNK_DURATION = 53

    # Size constraints (for base64 inline encoding approach)
    # API limit for base64-encoded requests: 1GB (1000MB)
    API_LIMIT_AFTER_ENCODING_MB = 1000
    # Base64 turns every 3 input bytes into 4 output characters (~1.33x);
    # 1.37 leaves a little extra headroom on top of that.
    BASE64_OVERHEAD = 1.37

    # VBR (Variable Bitrate) safety margin
    # Videos with VBR can have sections with significantly higher bitrate than average.
    # A 30% margin keeps chunks within limits even with bitrate spikes.
    VBR_SAFETY_MARGIN = 1.30

    # Safe target chunk size: API_LIMIT / BASE64_OVERHEAD / VBR_MARGIN
    # 1000MB / 1.37 / 1.30 ≈ 560MB
    SAFE_CHUNK_SIZE_MB = API_LIMIT_AFTER_ENCODING_MB / BASE64_OVERHEAD / VBR_SAFETY_MARGIN

    # Hard limit (absolute maximum raw chunk size before encoding): API_LIMIT / BASE64_OVERHEAD
    # 1000MB / 1.37 ≈ 730MB
    # NOTE(review): sizes here are MiB (bytes / 1024**2) while the API limit is
    # described as "1GB (1000MB)". If the real limit is 10**9 bytes, a chunk just
    # under this hard limit could exceed it after encoding — confirm the unit.
    HARD_LIMIT_MB = API_LIMIT_AFTER_ENCODING_MB / BASE64_OVERHEAD

    # Minimum sensible chunk duration; going below it only triggers a warning.
    MIN_CHUNK_DURATION_MIN = 3

    # How many nested levels of re-splitting are attempted for an oversized chunk.
    # Bounded because stream-copy cuts snap to keyframes, so a chunk dominated by
    # one huge keyframe interval may not shrink however often it is cut.
    MAX_RESPLIT_DEPTH = 2

    # Divisor turning byte counts into the MiB figures used throughout.
    _BYTES_PER_MB = 1024 * 1024

    def __init__(self, chunk_duration_minutes: Optional[int] = None):
        """
        Initialize VideoSplitter with specified chunk duration.

        Args:
            chunk_duration_minutes: Maximum duration of each chunk in minutes
                (default: from CHUNK_DURATION_MINUTES env var or 53)

        Environment variables:
            CHUNK_DURATION_MINUTES: default chunk duration (integer minutes)
            VBR_SAFETY_MARGIN: VBR multiplier (float, default 1.30)
            BASE64_API_LIMIT_MB: API size limit after encoding (integer MB, default 1000)

        Raises:
            ValueError: if an environment variable cannot be parsed, or if the
                chunk duration, VBR margin or API limit is not positive.
        """
        if chunk_duration_minutes is None:
            # Load from environment variable or use default
            chunk_duration_minutes = int(os.getenv("CHUNK_DURATION_MINUTES", self.DEFAULT_CHUNK_DURATION))

        # Load configurable parameters from environment
        vbr_safety_margin = float(os.getenv("VBR_SAFETY_MARGIN", self.VBR_SAFETY_MARGIN))
        api_limit_mb = int(os.getenv("BASE64_API_LIMIT_MB", self.API_LIMIT_AFTER_ENCODING_MB))

        # Fail fast: non-positive values would otherwise surface later as
        # ZeroDivisionError or as nonsensical (negative) size limits.
        if chunk_duration_minutes <= 0:
            raise ValueError(f"chunk_duration_minutes must be positive, got {chunk_duration_minutes}")
        if vbr_safety_margin <= 0:
            raise ValueError(f"VBR_SAFETY_MARGIN must be positive, got {vbr_safety_margin}")
        if api_limit_mb <= 0:
            raise ValueError(f"BASE64_API_LIMIT_MB must be positive, got {api_limit_mb}")

        self.chunk_duration_minutes = chunk_duration_minutes
        self.chunk_duration_seconds = chunk_duration_minutes * 60
        self.vbr_safety_margin = vbr_safety_margin
        self.api_limit_mb = api_limit_mb

        # Recalculate limits so custom env values take effect
        self.safe_chunk_size_mb = self.api_limit_mb / self.BASE64_OVERHEAD / self.vbr_safety_margin
        self.hard_limit_mb = self.api_limit_mb / self.BASE64_OVERHEAD

        logger.info("VideoSplitter initialized:")
        logger.info(f" - Max chunk duration: {chunk_duration_minutes} minutes")
        logger.info(f" - Safe chunk size: {self.safe_chunk_size_mb:.0f}MB (with {(self.vbr_safety_margin-1)*100:.0f}% VBR margin)")
        logger.info(f" - Hard chunk limit: {self.hard_limit_mb:.0f}MB (API: {self.api_limit_mb}MB after encoding)")

    # ------------------------------------------------------------------ helpers

    @staticmethod
    def _ffmpeg_error_text(error: Exception) -> str:
        """
        Return the captured stderr of an ffmpeg error as text, else ``str(error)``.

        Decoding uses ``errors='replace'`` so that non-UTF-8 bytes in ffmpeg's
        output cannot raise UnicodeDecodeError from inside an exception handler.
        """
        stderr = getattr(error, 'stderr', None)
        if stderr:
            if isinstance(stderr, bytes):
                return stderr.decode('utf-8', errors='replace')
            return str(stderr)
        return str(error)

    @staticmethod
    def _duration_from_probe(probe: Dict[str, Any]) -> Optional[float]:
        """
        Extract a positive, finite duration (seconds) from an ffprobe result dict.

        The first video stream's ``duration`` is preferred; if it is missing or
        unusable (e.g. ``'N/A'`` or ``0``), the container ``format.duration`` is
        tried. Returns None when neither yields a usable value.
        """
        video_info = next(
            (stream for stream in probe.get('streams', []) if stream.get('codec_type') == 'video'),
            None,
        )
        candidates = []
        if video_info and 'duration' in video_info:
            candidates.append(video_info['duration'])
        format_info = probe.get('format') or {}
        if 'duration' in format_info:
            candidates.append(format_info['duration'])

        for raw_value in candidates:
            try:
                value = float(raw_value)
            except (TypeError, ValueError):
                continue
            if math.isfinite(value) and value > 0:
                return value
        return None

    @staticmethod
    def _count_chunks(total_seconds: float, chunk_seconds: float) -> int:
        """
        Return how many ``chunk_seconds``-long pieces are needed to cover ``total_seconds``.

        A tiny relative tolerance absorbs float rounding. When chunk_seconds was
        itself computed as ``total / n``, ``total / chunk_seconds`` can come out
        slightly above n. A plain ceil() would then add a spurious, near-empty
        (n+1)-th chunk starting at the very end of the video.

        Raises:
            ValueError: if chunk_seconds is not positive.
        """
        if chunk_seconds <= 0:
            raise ValueError(f"Chunk duration must be positive, got {chunk_seconds}")
        if total_seconds <= 0:
            return 0
        return math.ceil((total_seconds / chunk_seconds) * (1 - 1e-9))

    @staticmethod
    def _extract_segment(source_path: str, output_path: str, start_time: float,
                         duration: Optional[float]) -> None:
        """
        Stream-copy all streams of ``source_path`` from ``start_time`` into ``output_path``.

        Args:
            source_path: Input media file.
            output_path: Destination file (overwritten if present).
            start_time: Input seek position in seconds. With stream copy, ffmpeg may
                start at the preceding seek point (keyframe).
            duration: Seconds to copy, or None to copy through end of file.

        Raises:
            ffmpeg.Error: if ffmpeg exits with an error.
        """
        input_kwargs: Dict[str, float] = {'ss': start_time}
        if duration is not None:
            input_kwargs['t'] = duration
        stream = ffmpeg.input(source_path, **input_kwargs)
        stream = ffmpeg.output(stream, output_path, c='copy', map='0',
                               avoid_negative_ts='make_zero')
        ffmpeg.run(stream, capture_stdout=True, capture_stderr=True, overwrite_output=True)

    @staticmethod
    def _remove_dir_if_empty(directory: str) -> None:
        """Best-effort removal of ``directory`` when it exists and is empty; failures are logged."""
        try:
            if os.path.isdir(directory) and not os.listdir(directory):
                os.rmdir(directory)
                logger.debug(f"Deleted temporary directory: {directory}")
        except OSError as e:
            logger.warning(f"Could not delete temporary directory {directory}: {str(e)}")

    # --------------------------------------------------------------- public API

    def get_video_duration(self, video_path: str) -> Optional[float]:
        """
        Get the duration of a video file in seconds.

        Args:
            video_path: Path to the video file

        Returns:
            Duration in seconds, or None if unable to determine (errors are
            reported via ErrorReporter and logged, never raised).
        """
        try:
            logger.info(f"Detecting duration for video: {video_path}")
            # Use cross-platform ffprobe detection
            ffprobe_path = system_utils.find_ffprobe()
            probe = ffmpeg.probe(video_path, cmd=ffprobe_path)

            duration = self._duration_from_probe(probe)
            if duration is None:
                logger.error("Could not find duration in video metadata")
                return None

            logger.info(f"Video duration: {duration:.2f} seconds ({duration/60:.2f} minutes)")
            return duration
        except ffmpeg.Error as e:
            ErrorReporter.capture_error(
                e,
                category=ErrorCategory.VIDEO_ERROR,
                context={'video_path': video_path, 'operation': 'detect_duration'}
            )
            logger.error(f"FFmpeg error while detecting duration: {self._ffmpeg_error_text(e)}")
            return None
        except FileNotFoundError as e:
            ErrorReporter.capture_error(
                e,
                category=ErrorCategory.SYSTEM_ERROR,
                context={'video_path': video_path, 'operation': 'detect_duration'}
            )
            logger.error(f"ffprobe not found: {str(e)}")
            return None
        except Exception as e:
            ErrorReporter.capture_error(
                e,
                category=ErrorCategory.VIDEO_ERROR,
                context={'video_path': video_path, 'operation': 'detect_duration'}
            )
            logger.error(f"Error detecting video duration: {str(e)}")
            return None

    def needs_splitting(self, video_path: str, max_chunk_size_mb: Optional[float] = None) -> bool:
        """
        Check if a video needs to be split based on duration OR file size.

        Uses robust calculation considering:
        - Duration limit (default 53 minutes)
        - Size limit with VBR safety margin (default ~560MB)
        - Base64 encoding overhead (1.37x)

        Args:
            video_path: Path to the video file
            max_chunk_size_mb: Maximum chunk size in MB (default: use calculated safe size)

        Returns:
            True if video needs splitting based on duration or size, False otherwise.
            Also False when the duration cannot be determined.
        """
        if max_chunk_size_mb is None:
            max_chunk_size_mb = self.safe_chunk_size_mb

        duration = self.get_video_duration(video_path)
        if duration is None:
            logger.warning("Could not determine video duration for splitting check")
            return False

        needs_split_duration = duration > self.chunk_duration_seconds

        file_size_mb = os.path.getsize(video_path) / self._BYTES_PER_MB
        needs_split_size = file_size_mb > max_chunk_size_mb

        needs_split = needs_split_duration or needs_split_size
        if needs_split:
            reasons = []
            if needs_split_duration:
                reasons.append(f"duration {duration/60:.2f} min > {self.chunk_duration_minutes} min")
            if needs_split_size:
                reasons.append(f"file size {file_size_mb:.1f}MB > {max_chunk_size_mb:.0f}MB")
            logger.info(f"Video needs splitting: {' AND '.join(reasons)}")
        else:
            logger.info(f"Video does not need splitting: duration {duration/60:.2f} min <= {self.chunk_duration_minutes} min, size {file_size_mb:.1f}MB <= {max_chunk_size_mb:.0f}MB")
        return needs_split

    def calculate_optimal_chunks(self, video_path: str) -> Dict[str, Any]:
        """
        Calculate optimal number of chunks and chunk duration considering:
        1. File size with VBR safety margin
        2. Duration limits
        3. Base64 encoding overhead

        This is the core robust algorithm that prevents oversized chunks.

        Args:
            video_path: Path to the video file

        Returns:
            Dictionary with splitting strategy:
            - needs_split: bool
            - num_chunks: int
            - chunk_duration_seconds: float
            - estimated_chunk_size_mb: float
            - estimated_after_encoding_mb: float
            - split_reason: str
            - bitrate_mb_per_min: float
            - duration_seconds: float (probed total duration)
            - constraints: {'by_size': int, 'by_duration': int}

        Raises:
            ValueError: if the duration cannot be determined or is not positive.
        """
        duration_seconds = self.get_video_duration(video_path)
        if duration_seconds is None:
            raise ValueError("Cannot determine video duration")
        if duration_seconds <= 0:
            # Guards the bitrate division below against ZeroDivisionError.
            raise ValueError(f"Invalid video duration: {duration_seconds}")

        file_size_mb = os.path.getsize(video_path) / self._BYTES_PER_MB
        duration_minutes = duration_seconds / 60

        bitrate_mb_per_min = file_size_mb / duration_minutes
        logger.info(
            f"Video properties: {duration_minutes:.1f} min, {file_size_mb:.1f} MB, "
            f"{bitrate_mb_per_min:.2f} MB/min bitrate"
        )

        # Chunks needed by SIZE (VBR safety margin already built into the target)
        chunks_by_size = math.ceil(file_size_mb / self.safe_chunk_size_mb)
        # Chunks needed by DURATION
        chunks_by_duration = math.ceil(duration_minutes / self.chunk_duration_minutes)
        # Use the MORE restrictive constraint (more chunks = safer)
        num_chunks = max(chunks_by_size, chunks_by_duration, 1)

        chunk_duration_seconds = duration_seconds / num_chunks
        chunk_duration_minutes = chunk_duration_seconds / 60

        if chunk_duration_minutes < self.MIN_CHUNK_DURATION_MIN and num_chunks > 1:
            logger.warning(
                f"Calculated chunk duration {chunk_duration_minutes:.1f} min is below "
                f"minimum {self.MIN_CHUNK_DURATION_MIN} min. Video has extremely high bitrate."
            )

        estimated_chunk_size_mb = file_size_mb / num_chunks
        estimated_after_encoding_mb = estimated_chunk_size_mb * self.BASE64_OVERHEAD

        split_reasons = []
        if chunks_by_size > 1:
            split_reasons.append(
                f"size ({file_size_mb:.1f}MB requires {chunks_by_size} chunks "
                f"with {self.safe_chunk_size_mb:.0f}MB safe target)"
            )
        if chunks_by_duration > 1:
            split_reasons.append(
                f"duration ({duration_minutes:.1f}min requires {chunks_by_duration} chunks "
                f"with {self.chunk_duration_minutes}min limit)"
            )
        split_reason = " AND ".join(split_reasons) if split_reasons else "Within limits, no split needed"
        needs_split = num_chunks > 1

        logger.info("Split calculation:")
        logger.info(f" - Chunks by size: {chunks_by_size}")
        logger.info(f" - Chunks by duration: {chunks_by_duration}")
        logger.info(f" - Final chunks: {num_chunks}")
        logger.info(f" - Chunk duration: {chunk_duration_minutes:.1f} min")
        logger.info(
            f" - Est. chunk size: {estimated_chunk_size_mb:.1f}MB raw, "
            f"{estimated_after_encoding_mb:.1f}MB encoded"
        )
        logger.info(f" - Decision: {split_reason}")

        return {
            'needs_split': needs_split,
            'num_chunks': num_chunks,
            'chunk_duration_seconds': chunk_duration_seconds,
            'estimated_chunk_size_mb': estimated_chunk_size_mb,
            'estimated_after_encoding_mb': estimated_after_encoding_mb,
            'split_reason': split_reason,
            'bitrate_mb_per_min': bitrate_mb_per_min,
            'duration_seconds': duration_seconds,
            'constraints': {
                'by_size': chunks_by_size,
                'by_duration': chunks_by_duration
            }
        }

    def calculate_optimal_chunk_duration(self, video_path: str, max_chunk_size_mb: Optional[float] = None) -> int:
        """
        Legacy method for backward compatibility.

        Calls calculate_optimal_chunks() and returns just the duration.

        Args:
            video_path: Path to the video file
            max_chunk_size_mb: Ignored (kept for compatibility)

        Returns:
            Optimal chunk duration in seconds (truncated to int), or the configured
            chunk duration if the calculation fails.
        """
        try:
            split_info = self.calculate_optimal_chunks(video_path)
            return int(split_info['chunk_duration_seconds'])
        except Exception as e:
            logger.warning(f"Could not calculate optimal chunks: {e}, using default duration")
            return self.chunk_duration_seconds

    def _split_by_duration(self, video_path: str, chunk_duration_seconds: float,
                           output_dir: str, num_chunks: Optional[int] = None) -> List[str]:
        """
        Perform the actual ffmpeg splitting by duration.

        Args:
            video_path: Path to video file
            chunk_duration_seconds: Duration of each chunk in seconds
            output_dir: Output directory for chunks
            num_chunks: Number of chunks to produce. When None, it is derived from
                the probed duration with a float-tolerant ceil.

        Returns:
            List of chunk file paths, in order.

        Raises:
            ValueError: if the chunk duration is not positive or the video
                duration cannot be determined.
            RuntimeError: if ffmpeg fails. Chunks created so far, including a
                partially written one, are deleted first.
        """
        if chunk_duration_seconds <= 0:
            raise ValueError(f"Chunk duration must be positive, got {chunk_duration_seconds}")
        if num_chunks is None:
            duration = self.get_video_duration(video_path)
            if duration is None:
                raise ValueError("Cannot determine video duration")
            num_chunks = self._count_chunks(duration, chunk_duration_seconds)

        video_basename, video_extension = os.path.splitext(os.path.basename(video_path))
        chunk_paths: List[str] = []

        for index in range(num_chunks):
            start_time = index * chunk_duration_seconds
            is_last = index == num_chunks - 1
            chunk_output = os.path.join(
                output_dir,
                f"{video_basename}_chunk_{index+1:02d}{video_extension}"
            )
            logger.info(f"Creating chunk {index+1}/{num_chunks}: start={start_time:.1f}s")

            try:
                # The last chunk runs to end of file so that float rounding or
                # keyframe snapping cannot drop the tail of the video.
                self._extract_segment(video_path, chunk_output, start_time,
                                      None if is_last else chunk_duration_seconds)
            except ffmpeg.Error as e:
                error_msg = self._ffmpeg_error_text(e)
                ErrorReporter.capture_error(
                    e,
                    category=ErrorCategory.VIDEO_ERROR,
                    context={
                        'video_path': video_path,
                        'chunk_number': index+1,
                        'total_chunks': num_chunks,
                        'operation': 'split_by_duration'
                    }
                )
                logger.error(f"FFmpeg error creating chunk {index+1}: {error_msg}")
                # Include the current (possibly partially written) output in the cleanup.
                self.cleanup_chunks(chunk_paths + [chunk_output])
                raise RuntimeError(f"Failed to create video chunk {index+1}: {error_msg}") from e

            chunk_paths.append(chunk_output)
            chunk_size_mb = os.path.getsize(chunk_output) / self._BYTES_PER_MB
            logger.info(f"Created chunk {index+1}/{num_chunks}: {chunk_size_mb:.1f}MB")

        return chunk_paths

    def _re_split_chunk(self, chunk_path: str, target_size_mb: float) -> List[str]:
        """
        Re-split a single oversized chunk into smaller sub-chunks next to it.

        Args:
            chunk_path: Path to oversized chunk
            target_size_mb: Target size for sub-chunks (must be positive)

        Returns:
            List of sub-chunk paths, in order. The original chunk is not deleted.

        Raises:
            ValueError: if target_size_mb is not positive or the chunk's duration
                cannot be determined.
            ffmpeg.Error: if ffmpeg fails. Sub-chunks written so far are deleted first.
        """
        if target_size_mb <= 0:
            raise ValueError(f"target_size_mb must be positive, got {target_size_mb}")
        duration = self.get_video_duration(chunk_path)
        if duration is None or duration <= 0:
            raise ValueError(f"Cannot determine duration of chunk: {chunk_path}")
        file_size_mb = os.path.getsize(chunk_path) / self._BYTES_PER_MB

        num_sub_chunks = max(1, math.ceil(file_size_mb / target_size_mb))
        sub_chunk_duration = duration / num_sub_chunks
        logger.info(
            f"Re-splitting oversized chunk: {file_size_mb:.1f}MB into {num_sub_chunks} sub-chunks "
            f"of ~{sub_chunk_duration/60:.1f} min each"
        )

        chunk_dir = os.path.dirname(chunk_path)
        chunk_basename, chunk_extension = os.path.splitext(os.path.basename(chunk_path))
        sub_chunks: List[str] = []

        for j in range(num_sub_chunks):
            start_time = j * sub_chunk_duration
            is_last = j == num_sub_chunks - 1
            sub_chunk_output = os.path.join(
                chunk_dir,
                f"{chunk_basename}_sub_{j+1:02d}{chunk_extension}"
            )
            try:
                self._extract_segment(chunk_path, sub_chunk_output, start_time,
                                      None if is_last else sub_chunk_duration)
            except Exception:
                # Leave no orphaned sub-chunks (including a partial one) behind.
                self.cleanup_chunks(sub_chunks + [sub_chunk_output])
                raise
            sub_chunks.append(sub_chunk_output)
            sub_size_mb = os.path.getsize(sub_chunk_output) / self._BYTES_PER_MB
            logger.info(f"Created sub-chunk {j+1}/{num_sub_chunks}: {sub_size_mb:.1f}MB")

        return sub_chunks

    def validate_and_fix_chunks(self, chunk_paths: List[str]) -> List[str]:
        """
        Validate chunk sizes and re-split any chunks exceeding the hard limit.

        This is the safety net that prevents oversized chunks from reaching the API.
        Sub-chunks produced by a re-split are validated again, up to
        MAX_RESPLIT_DEPTH nested levels. Every returned path is therefore at or
        below hard_limit_mb.

        Args:
            chunk_paths: List of chunk file paths

        Returns:
            List of valid chunk paths, in order (may include re-split sub-chunks).
            Original chunks that were re-split are deleted.

        Raises:
            RuntimeError: if an oversized chunk cannot be re-split, or is still
                oversized after MAX_RESPLIT_DEPTH levels. Sub-chunk files created
                by this call are removed before raising. The files in
                ``chunk_paths`` are left for the caller to clean up.
        """
        created_sub_chunks: List[str] = []
        try:
            return self._validate_chunk_level(chunk_paths, "", 0, created_sub_chunks)
        except Exception:
            # Sub-chunks are not in the caller's list, so the caller cannot clean them.
            self.cleanup_chunks(created_sub_chunks)
            raise

    def _validate_chunk_level(self, chunk_paths: List[str], label_prefix: str, depth: int,
                              created_sub_chunks: List[str]) -> List[str]:
        """
        Validate one level of chunks, recursively re-splitting the oversized ones.

        label_prefix produces log labels such as "3" or "3.1". depth bounds the
        recursion. Every sub-chunk written is appended to created_sub_chunks so
        the caller can remove them on failure.
        """
        valid_chunks: List[str] = []
        replaced_chunks: List[str] = []

        for i, chunk_path in enumerate(chunk_paths, 1):
            label = f"{label_prefix}{i}"
            chunk_size_mb = os.path.getsize(chunk_path) / self._BYTES_PER_MB
            encoded_size_mb = chunk_size_mb * self.BASE64_OVERHEAD

            if chunk_size_mb <= self.hard_limit_mb:
                logger.info(
                    f"Chunk {label} validated: {chunk_size_mb:.1f}MB raw, "
                    f"{encoded_size_mb:.1f}MB encoded ✓"
                )
                valid_chunks.append(chunk_path)
                continue

            logger.warning(
                f"Chunk {label} EXCEEDS hard limit: {chunk_size_mb:.1f}MB > "
                f"{self.hard_limit_mb:.1f}MB (would be {encoded_size_mb:.1f}MB encoded)"
            )
            too_large_message = (
                f"Chunk {label} is too large ({chunk_size_mb:.1f}MB) and could not be re-split. "
                f"Video may have extremely high bitrate. Consider reducing video quality."
            )
            if depth >= self.MAX_RESPLIT_DEPTH:
                logger.error(
                    f"Chunk {label} still exceeds the hard limit after {depth} re-split level(s)"
                )
                raise RuntimeError(too_large_message)

            logger.info(f"Re-splitting oversized chunk {label}...")
            try:
                # Re-split with extra conservative target (80% of safe size)
                sub_chunks = self._re_split_chunk(chunk_path, target_size_mb=self.safe_chunk_size_mb * 0.8)
            except Exception as e:
                logger.error(f"Failed to re-split chunk {label}: {self._ffmpeg_error_text(e)}")
                raise RuntimeError(too_large_message) from e
            created_sub_chunks.extend(sub_chunks)
            logger.info(f"Successfully re-split chunk {label} into {len(sub_chunks)} sub-chunks")

            # A VBR spike can leave a sub-chunk above the limit too, so validate again.
            valid_chunks.extend(
                self._validate_chunk_level(sub_chunks, f"{label}.", depth + 1, created_sub_chunks)
            )
            replaced_chunks.append(chunk_path)

        # Remove originals that were replaced by validated sub-chunks
        for chunk_path in replaced_chunks:
            try:
                os.remove(chunk_path)
                logger.debug(f"Cleaned up oversized chunk: {chunk_path}")
            except OSError as e:
                logger.warning(f"Could not remove oversized chunk: {e}")

        return valid_chunks

    def split_video(self, video_path: str, output_dir: Optional[str] = None) -> List[str]:
        """
        Split video into safe-sized chunks with validation and automatic re-splitting.

        This method:
        1. Calculates optimal splitting strategy using calculate_optimal_chunks()
        2. Performs initial split using ffmpeg
        3. Validates all chunks against hard limit
        4. Automatically re-splits any oversized chunks
        5. Returns list of all valid chunks ready for processing

        Args:
            video_path: Path to the video file to split
            output_dir: Directory to save chunks (default: new system temp directory)

        Returns:
            List of paths to validated chunk files (all at or below the hard limit).
            When no split is needed, the list contains only ``video_path`` itself.
            Do not pass that list to cleanup_chunks, or the original video is deleted.

        Raises:
            ValueError: if the video duration cannot be determined.
            RuntimeError: if splitting or validation fails. Chunk files created by
                this call are removed first.
        """
        split_info = self.calculate_optimal_chunks(video_path)
        if not split_info['needs_split']:
            logger.info("Video within limits, no splitting required")
            return [video_path]

        created_temp_dir = output_dir is None
        if created_temp_dir:
            output_dir = tempfile.mkdtemp(prefix="video_chunks_")
            logger.info(f"Using temporary directory for chunks: {output_dir}")
        else:
            os.makedirs(output_dir, exist_ok=True)

        logger.info(f"Splitting video into {split_info['num_chunks']} chunks...")
        try:
            # Pass the planned count so it is not re-derived from float division.
            chunk_paths = self._split_by_duration(
                video_path,
                split_info['chunk_duration_seconds'],
                output_dir,
                num_chunks=split_info['num_chunks'],
            )
            logger.info(f"Initial split complete: {len(chunk_paths)} chunks created")
        except Exception as e:
            logger.error(f"Failed during initial split: {e}")
            if created_temp_dir:
                # Partial chunks are already removed; drop the temp dir we created.
                self._remove_dir_if_empty(output_dir)
            raise

        logger.info("Validating chunk sizes...")
        try:
            valid_chunks = self.validate_and_fix_chunks(chunk_paths)
            logger.info(f"Validation complete: {len(valid_chunks)} valid chunks ready for processing")
        except Exception as e:
            logger.error(f"Failed during chunk validation: {e}")
            # validate_and_fix_chunks removed its own sub-chunks; remove the originals.
            self.cleanup_chunks(chunk_paths)
            raise

        return valid_chunks

    def cleanup_chunks(self, chunk_paths: List[str]) -> None:
        """
        Delete chunk files, then remove the first chunk's directory if it is now empty.

        Failures are logged as warnings, never raised.

        Args:
            chunk_paths: List of paths to chunk files to delete
        """
        if not chunk_paths:
            return

        logger.info(f"Cleaning up {len(chunk_paths)} chunk files")
        for chunk_path in chunk_paths:
            try:
                if os.path.exists(chunk_path):
                    os.remove(chunk_path)
                    logger.debug(f"Deleted chunk: {chunk_path}")
            except OSError as e:
                logger.warning(f"Could not delete chunk {chunk_path}: {str(e)}")

        # Try to remove the temp directory if it's empty
        self._remove_dir_if_empty(os.path.dirname(chunk_paths[0]))

    def get_chunk_info(self, video_path: str) -> Tuple[int, float]:
        """
        Get information about how a video would be chunked without actually splitting it.

        Uses the robust multi-constraint algorithm (size + duration). Falls back to
        duration-only logic if that fails.

        Args:
            video_path: Path to the video file

        Returns:
            Tuple of (number_of_chunks, total_duration_in_minutes); (0, 0.0) when
            the duration cannot be determined.
        """
        try:
            split_info = self.calculate_optimal_chunks(video_path)
            # Reuse the probed duration instead of running ffprobe a second time.
            return (split_info['num_chunks'], split_info['duration_seconds'] / 60)
        except Exception as e:
            logger.warning(f"Could not calculate chunk info: {e}")
            # Fallback to duration-only logic
            duration = self.get_video_duration(video_path)
            if duration is None:
                return (0, 0.0)
            duration_minutes = duration / 60
            num_chunks = self._count_chunks(duration, self.chunk_duration_seconds)
            return (num_chunks, duration_minutes)
# Convenience functions for direct use
def get_video_duration(video_path: str) -> Optional[float]:
    """
    Return the duration of ``video_path`` in seconds, or None if it cannot be read.

    Convenience wrapper: builds a default VideoSplitter and delegates to its
    get_video_duration method.
    """
    return VideoSplitter().get_video_duration(video_path)
def split_video(video_path: str, chunk_duration_minutes: Optional[int] = None,
                output_dir: Optional[str] = None) -> List[str]:
    """
    Split a video into validated chunks using a new VideoSplitter.

    Args:
        video_path: Path to the video file to split.
        chunk_duration_minutes: Maximum chunk duration in minutes. When None (the
            default), VideoSplitter resolves it from the CHUNK_DURATION_MINUTES
            environment variable or its own class default. This keeps the
            wrapper consistent with the class instead of hard-coding a
            different value.
        output_dir: Directory for the chunks; a new temp directory when None.

    Returns:
        The chunk paths returned by VideoSplitter.split_video.
    """
    splitter = VideoSplitter(chunk_duration_minutes=chunk_duration_minutes)
    return splitter.split_video(video_path, output_dir=output_dir)
def cleanup_chunks(chunk_paths: List[str]) -> None:
    """
    Delete the given chunk files.

    Convenience wrapper: builds a default VideoSplitter and delegates to its
    cleanup_chunks method.
    """
    VideoSplitter().cleanup_chunks(chunk_paths)