619 lines
25 KiB
Python
619 lines
25 KiB
Python
"""
|
|
Video Splitter Module
|
|
|
|
This module provides functionality to detect video duration and split long videos
|
|
into smaller chunks for processing with APIs that have duration limitations.
|
|
|
|
Key Features:
|
|
- Duration-based splitting (respects Gemini API 53-55 min limit)
|
|
- Size-based splitting (respects 1GB base64-encoded API limit)
|
|
- Variable Bitrate (VBR) handling with safety margins
|
|
- Automatic chunk validation and re-splitting for oversized chunks
|
|
"""
|
|
|
|
import ffmpeg
|
|
import os
|
|
import tempfile
|
|
import logging
|
|
import math
|
|
from typing import List, Tuple, Optional, Dict, Any
|
|
from system_utils import system_utils
|
|
from error_reporter import ErrorReporter, ErrorCategory
|
|
|
|
logger = logging.getLogger('video_query')
|
|
|
|
|
|
class VideoSplitter:
|
|
"""
|
|
Handles video duration detection and splitting operations.
|
|
|
|
Supports robust splitting with:
|
|
- Duration limits (Gemini API ~53-55 min)
|
|
- Size limits (1GB base64-encoded API request)
|
|
- VBR (Variable Bitrate) safety margins
|
|
- Automatic chunk validation and re-splitting
|
|
"""
|
|
|
|
# Duration constraint
|
|
# Default chunk duration in minutes (53 min to stay under 55 min Gemini API limit for videos with audio)
|
|
# Google Gemini 2.5 Pro limits: ~55 min with audio, ~60 min without audio
|
|
DEFAULT_CHUNK_DURATION = 53
|
|
|
|
# Size constraints (for base64 inline encoding approach)
|
|
# API limit for base64-encoded requests: 1GB (1000MB)
|
|
API_LIMIT_AFTER_ENCODING_MB = 1000
|
|
BASE64_OVERHEAD = 1.37 # Base64 encoding increases size by ~37%
|
|
|
|
# VBR (Variable Bitrate) safety margin
|
|
# Videos with VBR can have sections with significantly higher bitrate than average
|
|
# 30% margin ensures chunks stay within limits even with bitrate spikes
|
|
VBR_SAFETY_MARGIN = 1.30
|
|
|
|
# Calculate safe target chunk size
|
|
# Formula: API_LIMIT / BASE64_OVERHEAD / VBR_MARGIN
|
|
# 1000MB / 1.37 / 1.30 ≈ 560MB
|
|
SAFE_CHUNK_SIZE_MB = API_LIMIT_AFTER_ENCODING_MB / BASE64_OVERHEAD / VBR_SAFETY_MARGIN
|
|
|
|
# Hard limit (absolute maximum raw chunk size before encoding)
|
|
# Formula: API_LIMIT / BASE64_OVERHEAD
|
|
# 1000MB / 1.37 ≈ 730MB
|
|
HARD_LIMIT_MB = API_LIMIT_AFTER_ENCODING_MB / BASE64_OVERHEAD
|
|
|
|
# Minimum chunk duration to avoid creating too many tiny chunks
|
|
MIN_CHUNK_DURATION_MIN = 3
|
|
|
|
def __init__(self, chunk_duration_minutes: int = None):
|
|
"""
|
|
Initialize VideoSplitter with specified chunk duration.
|
|
|
|
Args:
|
|
chunk_duration_minutes: Duration of each chunk in minutes
|
|
(default: from CHUNK_DURATION_MINUTES env var or 53)
|
|
"""
|
|
if chunk_duration_minutes is None:
|
|
# Load from environment variable or use default
|
|
chunk_duration_minutes = int(os.getenv("CHUNK_DURATION_MINUTES", self.DEFAULT_CHUNK_DURATION))
|
|
|
|
self.chunk_duration_minutes = chunk_duration_minutes
|
|
self.chunk_duration_seconds = chunk_duration_minutes * 60
|
|
|
|
# Load configurable parameters from environment
|
|
self.vbr_safety_margin = float(os.getenv("VBR_SAFETY_MARGIN", self.VBR_SAFETY_MARGIN))
|
|
self.api_limit_mb = int(os.getenv("BASE64_API_LIMIT_MB", self.API_LIMIT_AFTER_ENCODING_MB))
|
|
|
|
# Recalculate safe chunk size if custom values provided
|
|
self.safe_chunk_size_mb = self.api_limit_mb / self.BASE64_OVERHEAD / self.vbr_safety_margin
|
|
self.hard_limit_mb = self.api_limit_mb / self.BASE64_OVERHEAD
|
|
|
|
logger.info(f"VideoSplitter initialized:")
|
|
logger.info(f" - Max chunk duration: {chunk_duration_minutes} minutes")
|
|
logger.info(f" - Safe chunk size: {self.safe_chunk_size_mb:.0f}MB (with {(self.vbr_safety_margin-1)*100:.0f}% VBR margin)")
|
|
logger.info(f" - Hard chunk limit: {self.hard_limit_mb:.0f}MB (API: {self.api_limit_mb}MB after encoding)")
|
|
|
|
def get_video_duration(self, video_path: str) -> Optional[float]:
|
|
"""
|
|
Get the duration of a video file in seconds.
|
|
|
|
Args:
|
|
video_path: Path to the video file
|
|
|
|
Returns:
|
|
Duration in seconds, or None if unable to determine
|
|
"""
|
|
try:
|
|
logger.info(f"Detecting duration for video: {video_path}")
|
|
# Use cross-platform ffprobe detection
|
|
ffprobe_path = system_utils.find_ffprobe()
|
|
probe = ffmpeg.probe(video_path, cmd=ffprobe_path)
|
|
|
|
# Get duration from video stream
|
|
video_info = next(
|
|
(stream for stream in probe['streams'] if stream['codec_type'] == 'video'),
|
|
None
|
|
)
|
|
|
|
if video_info and 'duration' in video_info:
|
|
duration = float(video_info['duration'])
|
|
elif 'format' in probe and 'duration' in probe['format']:
|
|
duration = float(probe['format']['duration'])
|
|
else:
|
|
logger.error("Could not find duration in video metadata")
|
|
return None
|
|
|
|
logger.info(f"Video duration: {duration:.2f} seconds ({duration/60:.2f} minutes)")
|
|
return duration
|
|
|
|
except ffmpeg.Error as e:
|
|
error_report = ErrorReporter.capture_error(
|
|
e,
|
|
category=ErrorCategory.VIDEO_ERROR,
|
|
context={'video_path': video_path, 'operation': 'detect_duration'}
|
|
)
|
|
logger.error(f"FFmpeg error while detecting duration: {e.stderr.decode() if e.stderr else str(e)}")
|
|
return None
|
|
except FileNotFoundError as e:
|
|
error_report = ErrorReporter.capture_error(
|
|
e,
|
|
category=ErrorCategory.SYSTEM_ERROR,
|
|
context={'video_path': video_path, 'operation': 'detect_duration'}
|
|
)
|
|
logger.error(f"ffprobe not found: {str(e)}")
|
|
return None
|
|
except Exception as e:
|
|
error_report = ErrorReporter.capture_error(
|
|
e,
|
|
category=ErrorCategory.VIDEO_ERROR,
|
|
context={'video_path': video_path, 'operation': 'detect_duration'}
|
|
)
|
|
logger.error(f"Error detecting video duration: {str(e)}")
|
|
return None
|
|
|
|
def needs_splitting(self, video_path: str, max_chunk_size_mb: float = None) -> bool:
|
|
"""
|
|
Check if a video needs to be split based on duration OR file size.
|
|
|
|
Uses robust calculation considering:
|
|
- Duration limit (default 53 minutes)
|
|
- Size limit with VBR safety margin (default ~560MB)
|
|
- Base64 encoding overhead (1.37x)
|
|
|
|
Args:
|
|
video_path: Path to the video file
|
|
max_chunk_size_mb: Maximum chunk size in MB (default: use calculated safe size)
|
|
|
|
Returns:
|
|
True if video needs splitting based on duration or size, False otherwise
|
|
"""
|
|
if max_chunk_size_mb is None:
|
|
max_chunk_size_mb = self.safe_chunk_size_mb
|
|
|
|
duration = self.get_video_duration(video_path)
|
|
if duration is None:
|
|
logger.warning("Could not determine video duration for splitting check")
|
|
return False
|
|
|
|
# Check duration
|
|
needs_split_duration = duration > self.chunk_duration_seconds
|
|
|
|
# Check file size
|
|
file_size_bytes = os.path.getsize(video_path)
|
|
file_size_mb = file_size_bytes / (1024 * 1024)
|
|
needs_split_size = file_size_mb > max_chunk_size_mb
|
|
|
|
needs_split = needs_split_duration or needs_split_size
|
|
|
|
if needs_split:
|
|
reasons = []
|
|
if needs_split_duration:
|
|
reasons.append(f"duration {duration/60:.2f} min > {self.chunk_duration_minutes} min")
|
|
if needs_split_size:
|
|
reasons.append(f"file size {file_size_mb:.1f}MB > {max_chunk_size_mb:.0f}MB")
|
|
logger.info(f"Video needs splitting: {' AND '.join(reasons)}")
|
|
else:
|
|
logger.info(f"Video does not need splitting: duration {duration/60:.2f} min <= {self.chunk_duration_minutes} min, size {file_size_mb:.1f}MB <= {max_chunk_size_mb:.0f}MB")
|
|
|
|
return needs_split
|
|
|
|
def calculate_optimal_chunks(self, video_path: str) -> Dict[str, Any]:
|
|
"""
|
|
Calculate optimal number of chunks and chunk duration considering:
|
|
1. File size with VBR safety margin
|
|
2. Duration limits
|
|
3. Base64 encoding overhead
|
|
|
|
This is the core robust algorithm that prevents oversized chunks.
|
|
|
|
Args:
|
|
video_path: Path to the video file
|
|
|
|
Returns:
|
|
Dictionary with splitting strategy:
|
|
- needs_split: bool
|
|
- num_chunks: int
|
|
- chunk_duration_seconds: float
|
|
- estimated_chunk_size_mb: float
|
|
- estimated_after_encoding_mb: float
|
|
- split_reason: str
|
|
- bitrate_mb_per_min: float
|
|
"""
|
|
# Get video properties
|
|
duration_seconds = self.get_video_duration(video_path)
|
|
if duration_seconds is None:
|
|
raise ValueError("Cannot determine video duration")
|
|
|
|
file_size_bytes = os.path.getsize(video_path)
|
|
file_size_mb = file_size_bytes / (1024 * 1024)
|
|
duration_minutes = duration_seconds / 60
|
|
|
|
# Calculate bitrate
|
|
bitrate_mb_per_min = file_size_mb / duration_minutes
|
|
|
|
logger.info(
|
|
f"Video properties: {duration_minutes:.1f} min, {file_size_mb:.1f} MB, "
|
|
f"{bitrate_mb_per_min:.2f} MB/min bitrate"
|
|
)
|
|
|
|
# Calculate chunks needed by SIZE (with VBR safety margin built-in)
|
|
chunks_by_size = math.ceil(file_size_mb / self.safe_chunk_size_mb)
|
|
|
|
# Calculate chunks needed by DURATION
|
|
chunks_by_duration = math.ceil(duration_minutes / self.chunk_duration_minutes)
|
|
|
|
# Use the MORE restrictive constraint (more chunks = safer)
|
|
num_chunks = max(chunks_by_size, chunks_by_duration, 1)
|
|
|
|
# Calculate actual chunk duration
|
|
chunk_duration_seconds = duration_seconds / num_chunks
|
|
chunk_duration_minutes = chunk_duration_seconds / 60
|
|
|
|
# Check if chunk duration is too small
|
|
if chunk_duration_minutes < self.MIN_CHUNK_DURATION_MIN and num_chunks > 1:
|
|
logger.warning(
|
|
f"Calculated chunk duration {chunk_duration_minutes:.1f} min is below "
|
|
f"minimum {self.MIN_CHUNK_DURATION_MIN} min. Video has extremely high bitrate."
|
|
)
|
|
|
|
# Estimate chunk properties
|
|
estimated_chunk_size_mb = file_size_mb / num_chunks
|
|
estimated_after_encoding_mb = estimated_chunk_size_mb * self.BASE64_OVERHEAD
|
|
|
|
# Determine split reasons
|
|
split_reasons = []
|
|
if chunks_by_size > 1:
|
|
split_reasons.append(
|
|
f"size ({file_size_mb:.1f}MB requires {chunks_by_size} chunks "
|
|
f"with {self.safe_chunk_size_mb:.0f}MB safe target)"
|
|
)
|
|
if chunks_by_duration > 1:
|
|
split_reasons.append(
|
|
f"duration ({duration_minutes:.1f}min requires {chunks_by_duration} chunks "
|
|
f"with {self.chunk_duration_minutes}min limit)"
|
|
)
|
|
|
|
split_reason = " AND ".join(split_reasons) if split_reasons else "Within limits, no split needed"
|
|
needs_split = num_chunks > 1
|
|
|
|
# Log decision details
|
|
logger.info(f"Split calculation:")
|
|
logger.info(f" - Chunks by size: {chunks_by_size}")
|
|
logger.info(f" - Chunks by duration: {chunks_by_duration}")
|
|
logger.info(f" - Final chunks: {num_chunks}")
|
|
logger.info(f" - Chunk duration: {chunk_duration_minutes:.1f} min")
|
|
logger.info(
|
|
f" - Est. chunk size: {estimated_chunk_size_mb:.1f}MB raw, "
|
|
f"{estimated_after_encoding_mb:.1f}MB encoded"
|
|
)
|
|
logger.info(f" - Decision: {split_reason}")
|
|
|
|
return {
|
|
'needs_split': needs_split,
|
|
'num_chunks': num_chunks,
|
|
'chunk_duration_seconds': chunk_duration_seconds,
|
|
'estimated_chunk_size_mb': estimated_chunk_size_mb,
|
|
'estimated_after_encoding_mb': estimated_after_encoding_mb,
|
|
'split_reason': split_reason,
|
|
'bitrate_mb_per_min': bitrate_mb_per_min,
|
|
'constraints': {
|
|
'by_size': chunks_by_size,
|
|
'by_duration': chunks_by_duration
|
|
}
|
|
}
|
|
|
|
def calculate_optimal_chunk_duration(self, video_path: str, max_chunk_size_mb: float = None) -> int:
|
|
"""
|
|
Legacy method for backward compatibility.
|
|
Calls calculate_optimal_chunks() and returns just the duration.
|
|
|
|
Args:
|
|
video_path: Path to the video file
|
|
max_chunk_size_mb: Ignored (kept for compatibility)
|
|
|
|
Returns:
|
|
Optimal chunk duration in seconds
|
|
"""
|
|
try:
|
|
split_info = self.calculate_optimal_chunks(video_path)
|
|
return int(split_info['chunk_duration_seconds'])
|
|
except Exception as e:
|
|
logger.warning(f"Could not calculate optimal chunks: {e}, using default duration")
|
|
return self.chunk_duration_seconds
|
|
|
|
def _split_by_duration(self, video_path: str, chunk_duration_seconds: float,
|
|
output_dir: str) -> List[str]:
|
|
"""
|
|
Perform the actual ffmpeg splitting by duration.
|
|
Internal helper method extracted from split_video for reusability.
|
|
|
|
Args:
|
|
video_path: Path to video file
|
|
chunk_duration_seconds: Duration of each chunk in seconds
|
|
output_dir: Output directory for chunks
|
|
|
|
Returns:
|
|
List of chunk file paths
|
|
"""
|
|
duration = self.get_video_duration(video_path)
|
|
num_chunks = math.ceil(duration / chunk_duration_seconds)
|
|
|
|
chunk_paths = []
|
|
video_basename = os.path.splitext(os.path.basename(video_path))[0]
|
|
video_extension = os.path.splitext(video_path)[1]
|
|
|
|
for i in range(num_chunks):
|
|
start_time = i * chunk_duration_seconds
|
|
chunk_output = os.path.join(
|
|
output_dir,
|
|
f"{video_basename}_chunk_{i+1:02d}{video_extension}"
|
|
)
|
|
|
|
logger.info(f"Creating chunk {i+1}/{num_chunks}: start={start_time:.1f}s")
|
|
|
|
try:
|
|
stream = ffmpeg.input(video_path, ss=start_time, t=chunk_duration_seconds)
|
|
stream = ffmpeg.output(stream, chunk_output, c='copy', map='0',
|
|
avoid_negative_ts='make_zero')
|
|
ffmpeg.run(stream, capture_stdout=True, capture_stderr=True, overwrite_output=True)
|
|
|
|
chunk_paths.append(chunk_output)
|
|
|
|
chunk_size_mb = os.path.getsize(chunk_output) / (1024 * 1024)
|
|
logger.info(f"Created chunk {i+1}/{num_chunks}: {chunk_size_mb:.1f}MB")
|
|
|
|
except ffmpeg.Error as e:
|
|
error_msg = e.stderr.decode() if e.stderr else str(e)
|
|
ErrorReporter.capture_error(
|
|
e,
|
|
category=ErrorCategory.VIDEO_ERROR,
|
|
context={
|
|
'video_path': video_path,
|
|
'chunk_number': i+1,
|
|
'total_chunks': num_chunks,
|
|
'operation': 'split_by_duration'
|
|
}
|
|
)
|
|
logger.error(f"FFmpeg error creating chunk {i+1}: {error_msg}")
|
|
self.cleanup_chunks(chunk_paths)
|
|
raise RuntimeError(f"Failed to create video chunk {i+1}: {error_msg}")
|
|
|
|
return chunk_paths
|
|
|
|
def _re_split_chunk(self, chunk_path: str, target_size_mb: float) -> List[str]:
|
|
"""
|
|
Re-split a single oversized chunk into smaller sub-chunks.
|
|
|
|
Args:
|
|
chunk_path: Path to oversized chunk
|
|
target_size_mb: Target size for sub-chunks
|
|
|
|
Returns:
|
|
List of sub-chunk paths
|
|
"""
|
|
duration = self.get_video_duration(chunk_path)
|
|
file_size_mb = os.path.getsize(chunk_path) / (1024 * 1024)
|
|
|
|
# Calculate how many sub-chunks needed
|
|
num_sub_chunks = math.ceil(file_size_mb / target_size_mb)
|
|
sub_chunk_duration = duration / num_sub_chunks
|
|
|
|
logger.info(
|
|
f"Re-splitting oversized chunk: {file_size_mb:.1f}MB into {num_sub_chunks} sub-chunks "
|
|
f"of ~{sub_chunk_duration/60:.1f} min each"
|
|
)
|
|
|
|
# Split the chunk
|
|
chunk_dir = os.path.dirname(chunk_path)
|
|
chunk_basename = os.path.splitext(os.path.basename(chunk_path))[0]
|
|
chunk_extension = os.path.splitext(chunk_path)[1]
|
|
|
|
sub_chunks = []
|
|
for j in range(num_sub_chunks):
|
|
start_time = j * sub_chunk_duration
|
|
sub_chunk_output = os.path.join(
|
|
chunk_dir,
|
|
f"{chunk_basename}_sub_{j+1:02d}{chunk_extension}"
|
|
)
|
|
|
|
stream = ffmpeg.input(chunk_path, ss=start_time, t=sub_chunk_duration)
|
|
stream = ffmpeg.output(stream, sub_chunk_output, c='copy', map='0',
|
|
avoid_negative_ts='make_zero')
|
|
ffmpeg.run(stream, capture_stdout=True, capture_stderr=True, overwrite_output=True)
|
|
|
|
sub_chunks.append(sub_chunk_output)
|
|
|
|
sub_size_mb = os.path.getsize(sub_chunk_output) / (1024 * 1024)
|
|
logger.info(f"Created sub-chunk {j+1}/{num_sub_chunks}: {sub_size_mb:.1f}MB")
|
|
|
|
return sub_chunks
|
|
|
|
def validate_and_fix_chunks(self, chunk_paths: List[str]) -> List[str]:
|
|
"""
|
|
Validate chunk sizes and re-split any chunks exceeding hard limit.
|
|
This is the safety net that prevents oversized chunks from reaching the API.
|
|
|
|
Args:
|
|
chunk_paths: List of chunk file paths
|
|
|
|
Returns:
|
|
List of valid chunk paths (may include re-split sub-chunks)
|
|
"""
|
|
valid_chunks = []
|
|
chunks_to_cleanup = []
|
|
|
|
for i, chunk_path in enumerate(chunk_paths, 1):
|
|
chunk_size_mb = os.path.getsize(chunk_path) / (1024 * 1024)
|
|
encoded_size_mb = chunk_size_mb * self.BASE64_OVERHEAD
|
|
|
|
# Check against HARD_LIMIT_MB (~730MB)
|
|
if chunk_size_mb > self.hard_limit_mb:
|
|
logger.warning(
|
|
f"Chunk {i} EXCEEDS hard limit: {chunk_size_mb:.1f}MB > "
|
|
f"{self.hard_limit_mb:.1f}MB (would be {encoded_size_mb:.1f}MB encoded)"
|
|
)
|
|
logger.info(f"Re-splitting oversized chunk {i}...")
|
|
|
|
try:
|
|
# Re-split with extra conservative target (80% of safe size)
|
|
sub_chunks = self._re_split_chunk(chunk_path, target_size_mb=self.safe_chunk_size_mb * 0.8)
|
|
|
|
logger.info(f"Successfully re-split chunk {i} into {len(sub_chunks)} sub-chunks")
|
|
valid_chunks.extend(sub_chunks)
|
|
chunks_to_cleanup.append(chunk_path) # Mark original for cleanup
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to re-split chunk {i}: {e}")
|
|
raise RuntimeError(
|
|
f"Chunk {i} is too large ({chunk_size_mb:.1f}MB) and could not be re-split. "
|
|
f"Video may have extremely high bitrate. Consider reducing video quality."
|
|
)
|
|
else:
|
|
# Chunk is valid
|
|
logger.info(
|
|
f"Chunk {i} validated: {chunk_size_mb:.1f}MB raw, "
|
|
f"{encoded_size_mb:.1f}MB encoded ✓"
|
|
)
|
|
valid_chunks.append(chunk_path)
|
|
|
|
# Cleanup original oversized chunks
|
|
for chunk_path in chunks_to_cleanup:
|
|
try:
|
|
os.remove(chunk_path)
|
|
logger.debug(f"Cleaned up oversized chunk: {chunk_path}")
|
|
except Exception as e:
|
|
logger.warning(f"Could not remove oversized chunk: {e}")
|
|
|
|
return valid_chunks
|
|
|
|
def split_video(self, video_path: str, output_dir: Optional[str] = None) -> List[str]:
|
|
"""
|
|
Split video into safe-sized chunks with validation and automatic re-splitting.
|
|
|
|
This method:
|
|
1. Calculates optimal splitting strategy using calculate_optimal_chunks()
|
|
2. Performs initial split using ffmpeg
|
|
3. Validates all chunks against hard limit
|
|
4. Automatically re-splits any oversized chunks
|
|
5. Returns list of all valid chunks ready for processing
|
|
|
|
Args:
|
|
video_path: Path to the video file to split
|
|
output_dir: Directory to save chunks (default: system temp directory)
|
|
|
|
Returns:
|
|
List of paths to validated chunk files (all guaranteed < hard limit)
|
|
"""
|
|
# Calculate optimal splitting strategy
|
|
split_info = self.calculate_optimal_chunks(video_path)
|
|
|
|
if not split_info['needs_split']:
|
|
logger.info("Video within limits, no splitting required")
|
|
return [video_path]
|
|
|
|
# Create output directory
|
|
if output_dir is None:
|
|
output_dir = tempfile.mkdtemp(prefix="video_chunks_")
|
|
logger.info(f"Using temporary directory for chunks: {output_dir}")
|
|
else:
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
logger.info(f"Splitting video into {split_info['num_chunks']} chunks...")
|
|
|
|
# Perform initial split
|
|
try:
|
|
chunk_paths = self._split_by_duration(
|
|
video_path,
|
|
split_info['chunk_duration_seconds'],
|
|
output_dir
|
|
)
|
|
logger.info(f"Initial split complete: {len(chunk_paths)} chunks created")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed during initial split: {e}")
|
|
raise
|
|
|
|
# Validate and fix any oversized chunks
|
|
logger.info("Validating chunk sizes...")
|
|
try:
|
|
valid_chunks = self.validate_and_fix_chunks(chunk_paths)
|
|
logger.info(f"Validation complete: {len(valid_chunks)} valid chunks ready for processing")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed during chunk validation: {e}")
|
|
# Cleanup all chunks on validation failure
|
|
self.cleanup_chunks(chunk_paths)
|
|
raise
|
|
|
|
return valid_chunks
|
|
|
|
def cleanup_chunks(self, chunk_paths: List[str]) -> None:
|
|
"""
|
|
Delete temporary chunk files.
|
|
|
|
Args:
|
|
chunk_paths: List of paths to chunk files to delete
|
|
"""
|
|
if not chunk_paths:
|
|
return
|
|
|
|
logger.info(f"Cleaning up {len(chunk_paths)} chunk files")
|
|
for chunk_path in chunk_paths:
|
|
try:
|
|
if os.path.exists(chunk_path):
|
|
os.remove(chunk_path)
|
|
logger.debug(f"Deleted chunk: {chunk_path}")
|
|
except Exception as e:
|
|
logger.warning(f"Could not delete chunk {chunk_path}: {str(e)}")
|
|
|
|
# Try to remove the temp directory if it's empty
|
|
if chunk_paths:
|
|
chunk_dir = os.path.dirname(chunk_paths[0])
|
|
try:
|
|
if os.path.exists(chunk_dir) and not os.listdir(chunk_dir):
|
|
os.rmdir(chunk_dir)
|
|
logger.debug(f"Deleted temporary directory: {chunk_dir}")
|
|
except Exception as e:
|
|
logger.warning(f"Could not delete temporary directory {chunk_dir}: {str(e)}")
|
|
|
|
def get_chunk_info(self, video_path: str) -> Tuple[int, float]:
|
|
"""
|
|
Get information about how a video would be chunked without actually splitting it.
|
|
Uses the robust multi-constraint algorithm (size + duration).
|
|
|
|
Args:
|
|
video_path: Path to the video file
|
|
|
|
Returns:
|
|
Tuple of (number_of_chunks, total_duration_in_minutes)
|
|
"""
|
|
try:
|
|
split_info = self.calculate_optimal_chunks(video_path)
|
|
duration_minutes = self.get_video_duration(video_path) / 60
|
|
return (split_info['num_chunks'], duration_minutes)
|
|
except Exception as e:
|
|
logger.warning(f"Could not calculate chunk info: {e}")
|
|
# Fallback to duration-only logic
|
|
duration = self.get_video_duration(video_path)
|
|
if duration is None:
|
|
return (0, 0.0)
|
|
duration_minutes = duration / 60
|
|
num_chunks = math.ceil(duration / self.chunk_duration_seconds)
|
|
return (num_chunks, duration_minutes)
|
|
|
|
|
|
# Convenience functions for direct use
|
|
def get_video_duration(video_path: str) -> Optional[float]:
|
|
"""Get video duration in seconds."""
|
|
splitter = VideoSplitter()
|
|
return splitter.get_video_duration(video_path)
|
|
|
|
|
|
def split_video(video_path: str, chunk_duration_minutes: int = 54,
|
|
output_dir: Optional[str] = None) -> List[str]:
|
|
"""Split a video into chunks."""
|
|
splitter = VideoSplitter(chunk_duration_minutes=chunk_duration_minutes)
|
|
return splitter.split_video(video_path, output_dir=output_dir)
|
|
|
|
|
|
def cleanup_chunks(chunk_paths: List[str]) -> None:
|
|
"""Clean up chunk files."""
|
|
splitter = VideoSplitter()
|
|
splitter.cleanup_chunks(chunk_paths)
|