forge/backend/app/utils/video.py

166 lines
5.1 KiB
Python

"""Video utility functions"""
import subprocess
import json
from typing import Optional, Dict, Any
def extract_video_metadata(file_path: str) -> Dict[str, Any]:
"""Extract video metadata using ffprobe
Args:
file_path: Path to the video file
Returns:
Dictionary containing:
- duration_seconds: Video duration in seconds
- width: Video width in pixels
- height: Video height in pixels
- fps: Frames per second
- codec: Video codec name
- bitrate: Video bitrate
Returns empty dict if extraction fails.
"""
try:
result = subprocess.run([
'ffprobe',
'-v', 'quiet',
'-print_format', 'json',
'-show_format',
'-show_streams',
file_path
], capture_output=True, text=True, timeout=30)
if result.returncode != 0:
print(f"ffprobe failed with return code {result.returncode}")
return {}
data = json.loads(result.stdout)
# Initialize metadata dict
metadata = {
'duration_seconds': None,
'width': None,
'height': None,
'fps': None,
'codec': None,
'bitrate': None
}
# Get duration from format
if 'format' in data and 'duration' in data['format']:
metadata['duration_seconds'] = float(data['format']['duration'])
if 'format' in data and 'bit_rate' in data['format']:
metadata['bitrate'] = int(data['format']['bit_rate'])
# Get video stream info
for stream in data.get('streams', []):
if stream.get('codec_type') == 'video':
metadata['width'] = stream.get('width')
metadata['height'] = stream.get('height')
metadata['codec'] = stream.get('codec_name')
# Calculate FPS from r_frame_rate
if 'r_frame_rate' in stream:
try:
num, den = map(int, stream['r_frame_rate'].split('/'))
if den > 0:
metadata['fps'] = num / den
except (ValueError, ZeroDivisionError):
pass
# Some videos have avg_frame_rate instead
if not metadata['fps'] and 'avg_frame_rate' in stream:
try:
num, den = map(int, stream['avg_frame_rate'].split('/'))
if den > 0:
metadata['fps'] = num / den
except (ValueError, ZeroDivisionError):
pass
break # Use first video stream
return metadata
except subprocess.TimeoutExpired:
print(f"ffprobe timed out for file: {file_path}")
return {}
except FileNotFoundError:
print("ffprobe not found. Please ensure ffmpeg is installed.")
return {}
except json.JSONDecodeError:
print(f"Failed to parse ffprobe output for file: {file_path}")
return {}
except Exception as e:
print(f"Failed to extract video metadata: {e}")
return {}
def format_duration(seconds: float) -> str:
"""Format duration in seconds to HH:MM:SS
Args:
seconds: Duration in seconds
Returns:
Formatted duration string
"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
if hours > 0:
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
else:
return f"{minutes:02d}:{secs:02d}"
def generate_video_thumbnail(video_path: str, output_path: str, timestamp: float = 1.0) -> bool:
"""Generate a thumbnail from a video file at specified timestamp
Args:
video_path: Path to input video
output_path: Path to save thumbnail (should end in .jpg or .png)
timestamp: Time in seconds to extract frame from (default: 1.0)
Returns:
True if successful, False otherwise
"""
import os
import logging
logger = logging.getLogger(__name__)
try:
# Ensure output directory exists
os.makedirs(os.path.dirname(output_path), exist_ok=True)
cmd = [
'ffmpeg',
'-y', # Overwrite output file
'-ss', str(timestamp), # Seek to timestamp
'-i', video_path,
'-vframes', '1', # Extract 1 frame
'-vf', 'scale=320:-1', # Scale to 320px width, maintain aspect ratio
'-q:v', '2', # High quality
output_path
]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
if result.returncode == 0 and os.path.exists(output_path):
logger.info(f"Generated thumbnail: {output_path}")
return True
else:
logger.error(f"FFmpeg thumbnail generation failed: {result.stderr}")
return False
except Exception as e:
logger.error(f"Failed to generate video thumbnail: {e}")
return False