video-query/backend/video_splitter.py
2025-11-13 20:08:32 +05:30

366 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Video Splitter Module
This module provides functionality to detect video duration and split long videos
into smaller chunks for processing with APIs that have duration limitations.
"""
import ffmpeg
import os
import tempfile
import logging
from typing import List, Tuple, Optional
from system_utils import system_utils
from error_reporter import ErrorReporter, ErrorCategory
# Module-level logger; every message emitted by this module goes to the
# logger registered under the name 'video_query'.
logger = logging.getLogger('video_query')
class VideoSplitter:
    """
    Handles video duration detection and splitting operations.

    Durations are read through ffprobe and chunks are cut with ffmpeg using
    stream copy (no re-encoding). The length of each chunk is capped both by
    the configured chunk duration and by a target chunk file size.
    """

    # Default chunk duration in minutes (43 min to stay under 45 min Gemini API limit for videos with audio)
    # Google Gemini 2.5 Pro limits: ~45 min with audio, ~60 min without audio
    DEFAULT_CHUNK_DURATION = 43

    # Default target size of a single chunk, in MB (see calculate_optimal_chunk_duration).
    DEFAULT_MAX_CHUNK_SIZE_MB = 500

    # Lower bound, in seconds, applied to size-derived chunk durations (5 minutes).
    MIN_CHUNK_DURATION_SECONDS = 300

    # Chunk size, in MB, above which split_video logs a warning.
    CHUNK_SIZE_WARNING_MB = 550

    def __init__(self, chunk_duration_minutes: int = DEFAULT_CHUNK_DURATION):
        """
        Initialize VideoSplitter with specified chunk duration.

        Args:
            chunk_duration_minutes: Duration of each chunk in minutes
                (default: DEFAULT_CHUNK_DURATION, i.e. 43)

        Raises:
            ValueError: If chunk_duration_minutes is not positive. A zero or
                negative duration would otherwise cause a division by zero
                when the number of chunks is computed.
        """
        if chunk_duration_minutes <= 0:
            raise ValueError(
                f"chunk_duration_minutes must be positive, got {chunk_duration_minutes}"
            )
        self.chunk_duration_minutes = chunk_duration_minutes
        self.chunk_duration_seconds = chunk_duration_minutes * 60
        logger.info(f"VideoSplitter initialized with chunk duration: {chunk_duration_minutes} minutes")

    @staticmethod
    def _stderr_text(error: Exception) -> str:
        """Return the stderr attached to *error* as text, or str(error) if there is none."""
        stderr = getattr(error, 'stderr', None)
        if not stderr:
            return str(error)
        if isinstance(stderr, bytes):
            # errors='replace' so that undecodable tool output cannot raise
            # a second exception while the first one is being handled.
            return stderr.decode(errors='replace')
        return str(stderr)

    @staticmethod
    def _extract_duration(probe: dict) -> Optional[float]:
        """
        Pull a duration (seconds) out of an ffprobe result dictionary.

        The first video stream's 'duration' is preferred; the container-level
        'format' duration is the fallback, also used when the stream value
        cannot be parsed as a number.

        Returns:
            Duration in seconds, or None if no usable value is present.
        """
        video_info = next(
            (stream for stream in probe.get('streams') or [] if stream.get('codec_type') == 'video'),
            None
        )
        candidates = []
        if video_info is not None:
            candidates.append(video_info.get('duration'))
        candidates.append((probe.get('format') or {}).get('duration'))
        for raw_value in candidates:
            if raw_value is None:
                continue
            try:
                return float(raw_value)
            except (TypeError, ValueError):
                continue
        return None

    @staticmethod
    def _count_chunks(duration: float, chunk_seconds: float) -> int:
        """Return how many chunks of *chunk_seconds* are needed to cover *duration* seconds."""
        if duration <= 0 or chunk_seconds <= 0:
            return 0
        whole_chunks, remainder = divmod(duration, chunk_seconds)
        return int(whole_chunks) + (1 if remainder > 0 else 0)

    def _compute_chunk_seconds(self, duration: float, file_size_bytes: int,
                               max_chunk_size_mb: float) -> int:
        """
        Compute the chunk duration (seconds) for a video of known length and size.

        The result is the duration that would yield max_chunk_size_mb at the
        file's average bitrate, capped at the configured chunk duration and
        floored at MIN_CHUNK_DURATION_SECONDS. The floor never exceeds the
        configured chunk duration, and it may yield chunks larger than
        max_chunk_size_mb for very high bitrate files.
        """
        if duration <= 0 or file_size_bytes <= 0:
            # No meaningful average bitrate; avoid dividing by zero.
            return int(self.chunk_duration_seconds)
        max_chunk_size_bytes = max_chunk_size_mb * 1024 * 1024
        # Same value as max_bytes / (file_size / duration), with one division.
        size_limited = max_chunk_size_bytes * duration / file_size_bytes
        capped = min(size_limited, self.chunk_duration_seconds)
        floor = min(self.MIN_CHUNK_DURATION_SECONDS, self.chunk_duration_seconds)
        return int(max(capped, floor))

    def _plan_chunk_seconds(self, video_path: str, duration: float,
                            max_chunk_size_mb: float) -> int:
        """Read the file size of *video_path*, compute the chunk duration and log it."""
        file_size_bytes = os.path.getsize(video_path)
        file_size_mb = file_size_bytes / (1024 * 1024)
        file_size_gb = file_size_bytes / (1024 * 1024 * 1024)
        final_duration = self._compute_chunk_seconds(duration, file_size_bytes, max_chunk_size_mb)
        logger.info(
            f"Calculated optimal chunk duration: {final_duration:.0f}s ({final_duration/60:.1f} min) "
            f"based on file size {file_size_mb:.1f}MB ({file_size_gb:.2f}GB) and duration {duration/60:.1f} min. "
            f"Target chunk size: {max_chunk_size_mb}MB"
        )
        return final_duration

    def get_video_duration(self, video_path: str) -> Optional[float]:
        """
        Get the duration of a video file in seconds.

        Every failure is reported through ErrorReporter, logged, and turned
        into a None return value; this method does not raise.

        Args:
            video_path: Path to the video file

        Returns:
            Duration in seconds, or None if unable to determine
        """
        context = {'video_path': video_path, 'operation': 'detect_duration'}
        try:
            logger.info(f"Detecting duration for video: {video_path}")
            # Use cross-platform ffprobe detection
            ffprobe_path = system_utils.find_ffprobe()
            probe = ffmpeg.probe(video_path, cmd=ffprobe_path)
            duration = self._extract_duration(probe)
            if duration is None:
                logger.error("Could not find duration in video metadata")
                return None
            logger.info(f"Video duration: {duration:.2f} seconds ({duration/60:.2f} minutes)")
            return duration
        except ffmpeg.Error as e:
            ErrorReporter.capture_error(e, category=ErrorCategory.VIDEO_ERROR, context=context)
            logger.error(f"FFmpeg error while detecting duration: {self._stderr_text(e)}")
            return None
        except FileNotFoundError as e:
            ErrorReporter.capture_error(e, category=ErrorCategory.SYSTEM_ERROR, context=context)
            logger.error(f"ffprobe not found: {str(e)}")
            return None
        except Exception as e:
            ErrorReporter.capture_error(e, category=ErrorCategory.VIDEO_ERROR, context=context)
            logger.error(f"Error detecting video duration: {str(e)}")
            return None

    def needs_splitting(self, video_path: str, max_chunk_size_mb: float = DEFAULT_MAX_CHUNK_SIZE_MB) -> bool:
        """
        Check if a video needs to be split based on duration OR file size.

        A video needs splitting if:
        1. Duration > the configured chunk duration (43 minutes by default,
           chosen to stay under the Gemini API time limit), OR
        2. File size > max_chunk_size_mb (500MB by default, a conservative
           target to handle variable bitrate)
           With 30% variance: 500MB x 1.3 = 650MB max
           After base64 encoding: 650MB x 1.37 = 891MB (well under 1GB API limit)

        Args:
            video_path: Path to the video file
            max_chunk_size_mb: Maximum chunk size in MB (default: 500MB)

        Returns:
            True if video needs splitting based on duration or size, False
            otherwise. Also False when the duration cannot be determined.
        """
        duration = self.get_video_duration(video_path)
        if duration is None:
            logger.warning("Could not determine video duration for splitting check")
            return False
        file_size_mb = os.path.getsize(video_path) / (1024 * 1024)
        needs_split_duration = duration > self.chunk_duration_seconds
        needs_split_size = file_size_mb > max_chunk_size_mb
        needs_split = needs_split_duration or needs_split_size
        if needs_split:
            reasons = []
            if needs_split_duration:
                reasons.append(f"duration {duration/60:.2f} min > {self.chunk_duration_minutes} min")
            if needs_split_size:
                reasons.append(f"file size {file_size_mb:.1f}MB > {max_chunk_size_mb}MB")
            logger.info(f"Video needs splitting: {' AND '.join(reasons)}")
        else:
            logger.info(f"Video does not need splitting: duration {duration/60:.2f} min <= {self.chunk_duration_minutes} min, size {file_size_mb:.1f}MB <= {max_chunk_size_mb:.0f}MB")
        return needs_split

    def calculate_optimal_chunk_duration(self, video_path: str, max_chunk_size_mb: float = DEFAULT_MAX_CHUNK_SIZE_MB) -> int:
        """
        Calculate optimal chunk duration based on file size and video duration
        to ensure chunks don't exceed a maximum file size.

        IMPORTANT: Gemini API has a 1GB request payload limit.
        Conservative target of 500MB accounts for variable bitrate (VBR).
        With 30% VBR variance: 500MB x 1.3 = 650MB max
        After base64 encoding: 650MB x 1.37 = 891MB (under 1GB limit)

        The result never exceeds the configured chunk duration and is never
        shorter than 5 minutes (or the configured chunk duration, if that is
        shorter), so very high bitrate files can still exceed the size target.

        Args:
            video_path: Path to the video file
            max_chunk_size_mb: Maximum desired chunk size in MB (default: 500MB)

        Returns:
            Optimal chunk duration in seconds; the configured chunk duration
            when the video duration cannot be determined.
        """
        duration = self.get_video_duration(video_path)
        if duration is None:
            logger.warning("Could not determine duration, using default chunk duration")
            return self.chunk_duration_seconds
        return self._plan_chunk_seconds(video_path, duration, max_chunk_size_mb)

    def split_video(self, video_path: str, output_dir: Optional[str] = None) -> List[str]:
        """
        Split a video into multiple chunks based on the configured chunk duration.

        The chunk duration is shortened automatically when the file's average
        bitrate would otherwise produce chunks above the 500MB target.

        Args:
            video_path: Path to the video file to split
            output_dir: Directory to save chunks (default: a new directory
                under the system temp directory)

        Returns:
            List of paths to the generated chunk files

        Raises:
            ValueError: If the video duration cannot be determined.
            RuntimeError: If ffmpeg fails while writing a chunk. Chunks
                written so far, including the failed one, are deleted first.
        """
        duration = self.get_video_duration(video_path)
        if duration is None:
            raise ValueError("Could not determine video duration")
        # Reuse the duration probed above instead of probing the file again.
        chunk_duration = self._plan_chunk_seconds(
            video_path, duration, self.DEFAULT_MAX_CHUNK_SIZE_MB
        )
        # Use temp directory if none specified
        if output_dir is None:
            output_dir = tempfile.mkdtemp(prefix="video_chunks_")
            logger.info(f"Using temporary directory for chunks: {output_dir}")
        else:
            os.makedirs(output_dir, exist_ok=True)
        num_chunks = self._count_chunks(duration, chunk_duration)
        logger.info(f"Splitting video into {num_chunks} chunks (chunk duration: {chunk_duration/60:.1f} min)")
        chunk_paths = []
        video_basename, video_extension = os.path.splitext(os.path.basename(video_path))
        for i in range(num_chunks):
            start_time = i * chunk_duration
            chunk_output = os.path.join(
                output_dir,
                f"{video_basename}_chunk_{i+1:02d}{video_extension}"
            )
            logger.info(f"Creating chunk {i+1}/{num_chunks}: start={start_time}s, output={chunk_output}")
            # Register the path before running ffmpeg so that a partially
            # written file is removed by the cleanup in the error handlers.
            chunk_paths.append(chunk_output)
            error_context = {
                'video_path': video_path,
                'chunk_number': i+1,
                'total_chunks': num_chunks,
                'operation': 'split_video'
            }
            try:
                # Split the video using ffmpeg
                # Using -t to specify duration of this chunk
                # Using -c copy for fast processing (no re-encoding)
                stream = ffmpeg.input(video_path, ss=start_time, t=chunk_duration)
                stream = ffmpeg.output(
                    stream,
                    chunk_output,
                    c='copy',  # Copy streams without re-encoding for speed
                    map='0',  # Include all streams from input
                    avoid_negative_ts='make_zero'  # Handle timestamp issues
                )
                ffmpeg.run(stream, capture_stdout=True, capture_stderr=True, overwrite_output=True)
                # Log chunk size for monitoring
                chunk_size_bytes = os.path.getsize(chunk_output)
                chunk_size_mb = chunk_size_bytes / (1024 * 1024)
                chunk_size_gb = chunk_size_bytes / (1024 * 1024 * 1024)
                logger.info(f"Successfully created chunk {i+1}/{num_chunks} (size: {chunk_size_mb:.1f}MB / {chunk_size_gb:.2f}GB)")
                # Warn if chunk is approaching size limits (500MB target due to VBR variance)
                if chunk_size_mb > self.CHUNK_SIZE_WARNING_MB:
                    logger.warning(
                        f"Chunk {i+1} is {chunk_size_mb:.1f}MB ({chunk_size_gb:.2f}GB), exceeding the 500MB target. "
                        f"After base64 encoding (~37% overhead), this will be ~{chunk_size_mb * 1.37:.1f}MB. "
                        f"API limit is 1000MB (1GB). If close to limit, consider reducing video quality."
                    )
            except ffmpeg.Error as e:
                error_msg = self._stderr_text(e)
                ErrorReporter.capture_error(
                    e,
                    category=ErrorCategory.VIDEO_ERROR,
                    context=error_context
                )
                logger.error(f"FFmpeg error creating chunk {i+1}: {error_msg}")
                # Clean up any created chunks on error
                self.cleanup_chunks(chunk_paths)
                raise RuntimeError(f"Failed to create video chunk {i+1}: {error_msg}") from e
            except Exception as e:
                ErrorReporter.capture_error(
                    e,
                    category=ErrorCategory.VIDEO_ERROR,
                    context=error_context
                )
                logger.error(f"Error creating chunk {i+1}: {str(e)}")
                self.cleanup_chunks(chunk_paths)
                raise
        logger.info(f"Successfully split video into {len(chunk_paths)} chunks")
        return chunk_paths

    def cleanup_chunks(self, chunk_paths: List[str]) -> None:
        """
        Delete temporary chunk files.

        Deletion is best-effort: a file that cannot be removed is logged as a
        warning and skipped. Afterwards the directory of the first chunk is
        removed if it is empty.

        Args:
            chunk_paths: List of paths to chunk files to delete
        """
        if not chunk_paths:
            return
        logger.info(f"Cleaning up {len(chunk_paths)} chunk files")
        for chunk_path in chunk_paths:
            try:
                os.remove(chunk_path)
                logger.debug(f"Deleted chunk: {chunk_path}")
            except FileNotFoundError:
                # Already gone (or never written); nothing to do.
                continue
            except Exception as e:
                logger.warning(f"Could not delete chunk {chunk_path}: {str(e)}")
        # Try to remove the temp directory if it's empty
        chunk_dir = os.path.dirname(chunk_paths[0])
        try:
            if os.path.exists(chunk_dir) and not os.listdir(chunk_dir):
                os.rmdir(chunk_dir)
                logger.debug(f"Deleted temporary directory: {chunk_dir}")
        except Exception as e:
            logger.warning(f"Could not delete temporary directory {chunk_dir}: {str(e)}")

    def get_chunk_info(self, video_path: str) -> Tuple[int, float]:
        """
        Get information about how a video would be chunked without actually splitting it.

        The chunk count uses the same size-adjusted chunk duration as
        split_video, so it matches the number of files split_video creates.

        Args:
            video_path: Path to the video file

        Returns:
            Tuple of (number_of_chunks, total_duration_in_minutes);
            (0, 0.0) when the duration cannot be determined.
        """
        duration = self.get_video_duration(video_path)
        if duration is None:
            return (0, 0.0)
        chunk_seconds = self._plan_chunk_seconds(
            video_path, duration, self.DEFAULT_MAX_CHUNK_SIZE_MB
        )
        return (self._count_chunks(duration, chunk_seconds), duration / 60)
# Convenience functions for direct use
def get_video_duration(video_path: str) -> Optional[float]:
    """
    Get video duration in seconds.

    Thin wrapper that delegates to VideoSplitter.get_video_duration on a
    default-configured VideoSplitter.
    """
    return VideoSplitter().get_video_duration(video_path)
def split_video(video_path: str, chunk_duration_minutes: int = 54,
                output_dir: Optional[str] = None) -> List[str]:
    """
    Split a video into chunks.

    Thin wrapper that builds a VideoSplitter with the given chunk duration
    and delegates to its split_video method.

    NOTE(review): the default of 54 minutes here differs from
    VideoSplitter.DEFAULT_CHUNK_DURATION (43 minutes) - confirm which value
    is intended before relying on this default.
    """
    return VideoSplitter(chunk_duration_minutes=chunk_duration_minutes).split_video(
        video_path, output_dir=output_dir
    )
def cleanup_chunks(chunk_paths: List[str]) -> None:
    """
    Clean up chunk files.

    Thin wrapper that delegates to VideoSplitter.cleanup_chunks on a
    default-configured VideoSplitter.
    """
    VideoSplitter().cleanup_chunks(chunk_paths)