# video-query/backend/video_splitter.py
# 2025-10-22 14:59:39 +05:30
#
# 238 lines
# 9.1 KiB
# Python

"""
Video Splitter Module
This module provides functionality to detect video duration and split long videos
into smaller chunks for processing with APIs that have duration limitations.
"""
import ffmpeg
import os
import tempfile
import logging
from typing import List, Tuple, Optional
# Module-level logger used by every function and method in this file. It is
# looked up under the fixed name 'video_query' rather than this module's __name__.
logger = logging.getLogger('video_query')
class VideoSplitter:
    """
    Handles video duration detection and splitting operations.

    Durations are read with ``ffmpeg.probe`` and chunks are produced with
    ``ffmpeg.run`` using stream copy (no re-encoding).
    """

    # Default chunk duration in minutes (54 min to stay under 55 min Gemini API limit)
    DEFAULT_CHUNK_DURATION = 54

    # Preferred ffprobe binary. It is pinned to an absolute path to avoid PATH
    # issues; when this file does not exist the bare command name is used instead.
    FFPROBE_PATH = '/usr/bin/ffprobe'

    def __init__(self, chunk_duration_minutes: int = DEFAULT_CHUNK_DURATION):
        """
        Initialize VideoSplitter with specified chunk duration.

        Args:
            chunk_duration_minutes: Duration of each chunk in minutes (default: 54)

        Raises:
            ValueError: If chunk_duration_minutes is not greater than zero.
        """
        # A zero duration would divide by zero when counting chunks and a
        # negative one would yield nonsensical counts, so reject both up front.
        if chunk_duration_minutes <= 0:
            raise ValueError(
                f"chunk_duration_minutes must be positive, got {chunk_duration_minutes}"
            )
        self.chunk_duration_minutes = chunk_duration_minutes
        self.chunk_duration_seconds = chunk_duration_minutes * 60
        logger.info(f"VideoSplitter initialized with chunk duration: {chunk_duration_minutes} minutes")

    def _ffprobe_command(self) -> str:
        """Return the ffprobe command to run: the pinned path if present, else 'ffprobe'."""
        if os.path.isfile(self.FFPROBE_PATH):
            return self.FFPROBE_PATH
        return 'ffprobe'

    @staticmethod
    def _parse_duration(value) -> Optional[float]:
        """Convert a probed duration value to float, or return None if it is missing or unparsable."""
        if value is None:
            return None
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    @staticmethod
    def _ffmpeg_error_text(error) -> str:
        """Return readable text for an ffmpeg error, preferring its captured stderr."""
        stderr = getattr(error, 'stderr', None)
        if not stderr:
            return str(error)
        if isinstance(stderr, bytes):
            # errors='replace' keeps undecodable tool output from raising a
            # second exception inside an except handler.
            return stderr.decode(errors='replace')
        return str(stderr)

    def _count_chunks(self, duration: float) -> int:
        """Return how many chunks of the configured length are needed to cover duration seconds."""
        full_chunks = int(duration / self.chunk_duration_seconds)
        has_partial_chunk = duration % self.chunk_duration_seconds > 0
        return full_chunks + (1 if has_partial_chunk else 0)

    def get_video_duration(self, video_path: str) -> Optional[float]:
        """
        Get the duration of a video file in seconds.

        The duration of the first video stream is preferred; if it is missing
        or cannot be parsed, the container-level (format) duration is used.

        Args:
            video_path: Path to the video file

        Returns:
            Duration in seconds, or None if unable to determine
        """
        try:
            logger.info(f"Detecting duration for video: {video_path}")
            probe = ffmpeg.probe(video_path, cmd=self._ffprobe_command())

            # Get duration from the first video stream, if there is one
            video_info = next(
                (
                    stream for stream in probe.get('streams', [])
                    if stream.get('codec_type') == 'video'
                ),
                None
            )
            duration = self._parse_duration(video_info.get('duration')) if video_info else None
            if duration is None:
                # Fall back to the container-level duration
                duration = self._parse_duration(probe.get('format', {}).get('duration'))
            if duration is None:
                logger.error("Could not find duration in video metadata")
                return None

            logger.info(f"Video duration: {duration:.2f} seconds ({duration/60:.2f} minutes)")
            return duration
        except ffmpeg.Error as e:
            logger.error(f"FFmpeg error while detecting duration: {self._ffmpeg_error_text(e)}")
            return None
        except Exception as e:
            # Deliberately broad: callers rely on None (not an exception) for
            # any probing failure, e.g. a missing ffprobe binary.
            logger.error(f"Error detecting video duration: {str(e)}")
            return None

    def needs_splitting(self, video_path: str) -> bool:
        """
        Check if a video needs to be split based on its duration.

        Args:
            video_path: Path to the video file

        Returns:
            True if video duration exceeds chunk duration, False otherwise
            (including when the duration cannot be determined)
        """
        duration = self.get_video_duration(video_path)
        if duration is None:
            logger.warning("Could not determine if video needs splitting")
            return False

        needs_split = duration > self.chunk_duration_seconds
        if needs_split:
            logger.info(f"Video needs splitting: {duration/60:.2f} min > {self.chunk_duration_minutes} min")
        else:
            logger.info(f"Video does not need splitting: {duration/60:.2f} min <= {self.chunk_duration_minutes} min")
        return needs_split

    def split_video(self, video_path: str, output_dir: Optional[str] = None) -> List[str]:
        """
        Split a video into multiple chunks based on the configured chunk duration.

        Args:
            video_path: Path to the video file to split
            output_dir: Directory to save chunks (default: a new temporary directory)

        Returns:
            List of paths to the generated chunk files

        Raises:
            ValueError: If the video duration cannot be determined.
            RuntimeError: If ffmpeg fails while creating a chunk. Chunks written
                so far, including the failing one, are removed first.
        """
        duration = self.get_video_duration(video_path)
        if duration is None:
            raise ValueError("Could not determine video duration")

        # Use temp directory if none specified
        if output_dir is None:
            output_dir = tempfile.mkdtemp(prefix="video_chunks_")
            logger.info(f"Using temporary directory for chunks: {output_dir}")
        else:
            os.makedirs(output_dir, exist_ok=True)

        num_chunks = self._count_chunks(duration)
        logger.info(f"Splitting video into {num_chunks} chunks")

        chunk_paths = []
        video_basename, video_extension = os.path.splitext(os.path.basename(video_path))

        for i in range(num_chunks):
            start_time = i * self.chunk_duration_seconds
            chunk_output = os.path.join(
                output_dir,
                f"{video_basename}_chunk_{i+1:02d}{video_extension}"
            )
            logger.info(f"Creating chunk {i+1}/{num_chunks}: start={start_time}s, output={chunk_output}")
            try:
                # ss/t select this chunk's window on the input side;
                # c='copy' avoids re-encoding so the split is fast.
                stream = ffmpeg.input(video_path, ss=start_time, t=self.chunk_duration_seconds)
                stream = ffmpeg.output(
                    stream,
                    chunk_output,
                    c='copy',  # Copy streams without re-encoding for speed
                    map='0',  # Include all streams from input
                    avoid_negative_ts='make_zero'  # Handle timestamp issues
                )
                ffmpeg.run(stream, capture_stdout=True, capture_stderr=True, overwrite_output=True)
                chunk_paths.append(chunk_output)
                logger.info(f"Successfully created chunk {i+1}/{num_chunks}")
            except ffmpeg.Error as e:
                error_msg = self._ffmpeg_error_text(e)
                logger.error(f"FFmpeg error creating chunk {i+1}: {error_msg}")
                # Also remove the chunk that was being written: it may exist
                # as a partial file and is not in chunk_paths yet.
                self.cleanup_chunks(chunk_paths + [chunk_output])
                raise RuntimeError(f"Failed to create video chunk {i+1}: {error_msg}") from e
            except Exception as e:
                logger.error(f"Error creating chunk {i+1}: {str(e)}")
                self.cleanup_chunks(chunk_paths + [chunk_output])
                raise

        logger.info(f"Successfully split video into {len(chunk_paths)} chunks")
        return chunk_paths

    def cleanup_chunks(self, chunk_paths: List[str]) -> None:
        """
        Delete chunk files and, if it ends up empty, the directory holding them.

        Cleanup is best-effort: failures are logged as warnings, never raised.
        Note that the directory of the first chunk is removed whenever it is
        empty afterwards, whether or not it was a temporary directory.

        Args:
            chunk_paths: List of paths to chunk files to delete
        """
        if not chunk_paths:
            return

        logger.info(f"Cleaning up {len(chunk_paths)} chunk files")
        for chunk_path in chunk_paths:
            try:
                if os.path.exists(chunk_path):
                    os.remove(chunk_path)
                    logger.debug(f"Deleted chunk: {chunk_path}")
            except Exception as e:
                logger.warning(f"Could not delete chunk {chunk_path}: {str(e)}")

        # Try to remove the chunk directory if it's empty
        chunk_dir = os.path.dirname(chunk_paths[0])
        try:
            if os.path.exists(chunk_dir) and not os.listdir(chunk_dir):
                os.rmdir(chunk_dir)
                logger.debug(f"Deleted temporary directory: {chunk_dir}")
        except Exception as e:
            logger.warning(f"Could not delete temporary directory {chunk_dir}: {str(e)}")

    def get_chunk_info(self, video_path: str) -> Tuple[int, float]:
        """
        Get information about how a video would be chunked without actually splitting it.

        Args:
            video_path: Path to the video file

        Returns:
            Tuple of (number_of_chunks, total_duration_in_minutes);
            (0, 0.0) when the duration cannot be determined
        """
        duration = self.get_video_duration(video_path)
        if duration is None:
            return (0, 0.0)
        return (self._count_chunks(duration), duration / 60)
# Convenience functions for direct use
def get_video_duration(video_path: str) -> Optional[float]:
    """
    Return the duration of the video at video_path in seconds.

    Convenience wrapper: builds a VideoSplitter with default settings and
    delegates to its get_video_duration method.

    Args:
        video_path: Path to the video file

    Returns:
        Whatever VideoSplitter.get_video_duration returns for this path
    """
    return VideoSplitter().get_video_duration(video_path)
def split_video(video_path: str, chunk_duration_minutes: int = 54,
                output_dir: Optional[str] = None) -> List[str]:
    """
    Split the video at video_path into chunks.

    Convenience wrapper: builds a VideoSplitter configured with
    chunk_duration_minutes and delegates to its split_video method.

    Args:
        video_path: Path to the video file to split
        chunk_duration_minutes: Duration of each chunk in minutes (default: 54)
        output_dir: Directory to save chunks, passed through unchanged

    Returns:
        List of paths to the generated chunk files
    """
    chunker = VideoSplitter(chunk_duration_minutes=chunk_duration_minutes)
    chunk_files = chunker.split_video(video_path, output_dir=output_dir)
    return chunk_files
def cleanup_chunks(chunk_paths: List[str]) -> None:
    """
    Delete the given chunk files.

    Convenience wrapper: builds a VideoSplitter with default settings and
    delegates to its cleanup_chunks method.

    Args:
        chunk_paths: List of paths to chunk files to delete
    """
    VideoSplitter().cleanup_chunks(chunk_paths)