pahvalentines/backend/video_generator/create_video.py
michael fa58a15e58 video: fix flush of closed file error
Use wait() instead of communicate() after manually closing stdin.
communicate() tries to flush stdin which fails if already closed.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 08:42:06 -06:00

282 lines
9.9 KiB
Python

"""
Video Creation Script - Vinyl Record Animation
Creates an animated video with a rotating vinyl record effect by layering:
- Base background image (1080x1080)
- Pet/cover art image (centered in vinyl, rotates with record)
- Vinyl record template (transparent PNG, rotates)
- Needle overlay (stationary)
The script generates ONE complete rotation cycle and streams frames directly to
FFmpeg, which seamlessly loops the rotation to match the audio duration. This
approach is significantly faster and more efficient than generating all frames.
The vinyl rotates at a configurable RPM (default: 20 RPM for smooth animation).
Audio duration must be at least as long as one full rotation.
Dependency:
- ffmpeg should be installed on the OS and be present on the system $PATH
Usage:
main(pet_img_path="path/to/image.jpg", audio_track_path="path/to/audio.mp3", output_path="path/to/output.mp4")
"""
import logging
import shutil
import subprocess
import time
from datetime import datetime as dt
from math import ceil
from pathlib import Path
from mutagen.mp3 import MP3
from PIL import Image
logger = logging.getLogger(__name__)
# Asset paths (relative to this file's directory)
SCRIPT_DIR = Path(__file__).parent
ASSET_DIR = SCRIPT_DIR / "assets"
BASE_IMG = ASSET_DIR / "1080x1080-bg.png"
VINYL_TEMPLATE = ASSET_DIR / "736-x-736-record.png"
NEEDLE_IMG = ASSET_DIR / "needle.png"
# Video settings
VIDEO_FORMAT = "mp4"
FINAL_VIDEO_WIDTH_PX = 720
FINAL_VIDEO_HEIGHT_PX = 720
FRAME_RATE = 15
# Rotation settings
VINYL_RPM = 20
# Image positioning offsets (centered on base image)
ORIGINAL_RESOLUTION = 1080
SCALE_FACTOR = FINAL_VIDEO_WIDTH_PX / ORIGINAL_RESOLUTION
BASE_WIDTH = FINAL_VIDEO_WIDTH_PX
BASE_HEIGHT = FINAL_VIDEO_HEIGHT_PX
PET_SIZE_PX = int(360 * SCALE_FACTOR)
VINYL_SIZE_PX = int(736 * SCALE_FACTOR)
VINYL_OFFSET_X = (BASE_WIDTH - VINYL_SIZE_PX) // 2
VINYL_OFFSET_Y = (BASE_HEIGHT - VINYL_SIZE_PX) // 2 + int(100 * SCALE_FACTOR)
PET_OFFSET_X = VINYL_OFFSET_X + (VINYL_SIZE_PX - PET_SIZE_PX) // 2
PET_OFFSET_Y = VINYL_OFFSET_Y + (VINYL_SIZE_PX - PET_SIZE_PX) // 2
NEEDLE_OFFSET_X = 0
NEEDLE_OFFSET_Y = 0
def get_audio_duration(audio_track_path: str) -> int:
"""Read the duration of the audio track."""
logger.info("Reading audio track duration...")
audio = MP3(audio_track_path)
duration = ceil(audio.info.length)
logger.info(f"Audio duration: {duration} seconds ({duration / 60:.2f} minutes)")
return duration
def load_and_resize_images(pet_img_path: str) -> tuple:
"""Load all images and resize them to required dimensions."""
logger.info("Loading and resizing images...")
resample_mode = Image.Resampling.BILINEAR
# Load base image
base_img = Image.open(BASE_IMG).convert("RGBA")
if base_img.size != (BASE_WIDTH, BASE_HEIGHT):
base_img = base_img.resize((BASE_WIDTH, BASE_HEIGHT), resample_mode)
# Load and resize pet image
pet_img = Image.open(pet_img_path).convert("RGBA")
pet_img = pet_img.resize((PET_SIZE_PX, PET_SIZE_PX), resample_mode)
# Load vinyl template
vinyl_img = Image.open(VINYL_TEMPLATE).convert("RGBA")
if vinyl_img.size != (VINYL_SIZE_PX, VINYL_SIZE_PX):
vinyl_img = vinyl_img.resize((VINYL_SIZE_PX, VINYL_SIZE_PX), resample_mode)
# Load needle image
needle_img = Image.open(NEEDLE_IMG).convert("RGBA")
if needle_img.size != (BASE_WIDTH, BASE_HEIGHT):
needle_img = needle_img.resize((BASE_WIDTH, BASE_HEIGHT), resample_mode)
logger.info(f"Images loaded - Base: {base_img.size}, Pet: {pet_img.size}, Vinyl: {vinyl_img.size}")
return base_img, pet_img, vinyl_img, needle_img
def create_composite_frame(base_img, pet_img, vinyl_img, needle_img, rotation_angle: float = 0):
"""Composite all layers into a single frame."""
frame = base_img.copy()
# Layer 2: Rotate and paste pet image
if rotation_angle != 0:
rotated_pet = pet_img.rotate(-rotation_angle, resample=Image.Resampling.BICUBIC, expand=False)
frame.paste(rotated_pet, (PET_OFFSET_X, PET_OFFSET_Y), rotated_pet)
else:
frame.paste(pet_img, (PET_OFFSET_X, PET_OFFSET_Y), pet_img)
# Layer 3: Rotate and paste vinyl template
if rotation_angle != 0:
rotated_vinyl = vinyl_img.rotate(-rotation_angle, resample=Image.Resampling.BICUBIC, expand=False)
frame.paste(rotated_vinyl, (VINYL_OFFSET_X, VINYL_OFFSET_Y), rotated_vinyl)
else:
frame.paste(vinyl_img, (VINYL_OFFSET_X, VINYL_OFFSET_Y), vinyl_img)
# Layer 4: Paste needle on top
frame.paste(needle_img, (NEEDLE_OFFSET_X, NEEDLE_OFFSET_Y), needle_img)
return frame
def calculate_rotation_duration() -> float:
"""Calculate duration in seconds for one full 360 rotation based on RPM."""
return 60.0 / VINYL_RPM
def calculate_rotation_angle(frame_num: int, total_frames: int) -> float:
"""Calculate rotation angle for a given frame."""
progress = frame_num / total_frames
return (progress * 360) % 360
def generate_and_stream_frames(base_img, pet_img, vinyl_img, needle_img, audio_duration: int, audio_track_path: str, output_path: str):
"""Generate frames for one full rotation and stream to FFmpeg via stdin."""
step_start = time.time()
rotation_duration = calculate_rotation_duration()
frames_per_rotation = int(rotation_duration * FRAME_RATE)
num_loops = int(audio_duration / rotation_duration)
logger.info(f"Generating 1 rotation cycle ({frames_per_rotation} frames at {FRAME_RATE} fps)...")
logger.info(f"Rotation duration: {rotation_duration:.2f}s at {VINYL_RPM} RPM")
logger.info(f"Video will loop {num_loops} times to match audio duration")
ffmpeg_cmd = [
"ffmpeg",
"-y",
"-f", "image2pipe",
"-framerate", str(FRAME_RATE),
"-i", "pipe:0",
"-i", audio_track_path,
"-filter_complex", f"[0:v]loop=loop={num_loops}:size={frames_per_rotation}:start=0[outv]",
"-map", "[outv]",
"-map", "1:a",
"-c:v", "libx264",
"-crf", "28",
"-pix_fmt", "yuv420p",
"-c:a", "aac",
"-b:a", "192k",
"-r", str(FRAME_RATE),
"-shortest",
output_path,
]
try:
ffmpeg_process = subprocess.Popen(
ffmpeg_cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=10**8,
)
for frame_num in range(frames_per_rotation):
rotation_angle = calculate_rotation_angle(frame_num, frames_per_rotation)
composite_frame = create_composite_frame(base_img, pet_img, vinyl_img, needle_img, rotation_angle)
composite_frame.save(ffmpeg_process.stdin, format="PNG")
if (frame_num + 1) % 50 == 0 or frame_num == frames_per_rotation - 1:
logger.info(f"Frame progress: {frame_num + 1}/{frames_per_rotation}")
ffmpeg_process.stdin.close()
ffmpeg_process.wait()
stderr = ffmpeg_process.stderr.read()
if ffmpeg_process.returncode != 0:
logger.error(f"FFmpeg error: {stderr.decode()}")
raise subprocess.CalledProcessError(ffmpeg_process.returncode, ffmpeg_cmd)
step_time = time.time() - step_start
logger.info(f"Video created successfully in {step_time:.2f} seconds")
except BrokenPipeError:
logger.error("FFmpeg process terminated unexpectedly")
raise
def check_dependencies():
"""Verify that required system tools are installed."""
if shutil.which("ffmpeg") is None:
raise RuntimeError(
"ffmpeg was not found on your system PATH. "
"Please install ffmpeg: brew install ffmpeg (macOS) or apt-get install ffmpeg (Linux)"
)
logger.info("Dependency check passed (ffmpeg found)")
def validate_audio_duration(audio_duration: int):
"""Validate that audio duration is sufficient for at least one rotation."""
rotation_duration = calculate_rotation_duration()
if audio_duration < rotation_duration:
raise ValueError(
f"Audio duration ({audio_duration}s) is shorter than one full rotation "
f"({rotation_duration:.2f}s at {VINYL_RPM} RPM)"
)
def main(pet_img_path: str, audio_track_path: str, output_path: str | None = None):
"""
Main execution function.
Args:
pet_img_path: Path to the pet image
audio_track_path: Path to the audio MP3 file
output_path: Optional custom output path (generates timestamped name if not provided)
"""
start_time = time.time()
logger.info("=" * 60)
logger.info(f"Video generation started at {dt.now().strftime('%Y-%m-%d %H:%M:%S')}")
logger.info(f"Pet image: {pet_img_path}")
logger.info(f"Audio track: {audio_track_path}")
logger.info(f"Output path: {output_path}")
logger.info("=" * 60)
check_dependencies()
audio_duration = get_audio_duration(audio_track_path)
validate_audio_duration(audio_duration)
# Use provided output path or generate default
if output_path is None:
output_dir = Path("./output")
output_dir.mkdir(exist_ok=True)
output_path = str(output_dir / f"final_video-{dt.now().strftime('%Y%b%d_%H%M%S')}.{VIDEO_FORMAT}")
base_img, pet_img, vinyl_img, needle_img = load_and_resize_images(pet_img_path)
generate_and_stream_frames(
base_img, pet_img, vinyl_img, needle_img, audio_duration, audio_track_path, output_path
)
total_time = time.time() - start_time
output_file = Path(output_path)
logger.info("=" * 60)
logger.info(f"Video saved to: {output_path}")
logger.info(f"Total time: {total_time:.2f} seconds ({total_time / 60:.2f} minutes)")
if output_file.exists():
logger.info(f"File size: {output_file.stat().st_size / (1024 * 1024):.1f} MB")
logger.info("=" * 60)
if __name__ == "__main__":
# Default parameters for testing
PET_IMG = str(ASSET_DIR / "dog_upload.jpg")
AUDIO_TRACK = str(ASSET_DIR / "my-track.mp3")
main(pet_img_path=PET_IMG, audio_track_path=AUDIO_TRACK)