video-query/backend/video_processor.py
2025-11-15 05:27:08 +05:30

1313 lines
58 KiB
Python

from google import genai
import mimetypes
import time
import os
import logging
import requests
import json
import datetime
import base64
from typing import Dict, Any, Optional, List, Tuple
from dotenv import load_dotenv
from video_splitter import VideoSplitter
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
from system_utils import system_utils
from error_reporter import ErrorReporter, ErrorCategory
# Load environment variables from .env file before any os.getenv() lookup below
load_dotenv()
# Module-wide logger; every log line in this file is emitted through it
logger = logging.getLogger('video_query')
class VideoProcessor:
    """
    Class to handle video uploads and processing with Gemini API.

    The class-level constants below hold the built-in prompt templates, the
    usage-tracking webhook endpoint, the default model names and the retry
    schedule used by the request helpers.
    """
    # Default prompts for different modes (keyed by mode name)
    PROMPTS = {
        "meeting_summary": "Generate a detailed summary of the meeting in the attached video recording, including discussion points and action items with owners",
        "process_documentation": "Generate detailed process documentation suitable for reference or training based on the process illustrated in the attached video recording. Write the documentation so that a new user will be able to follow step by step and accomplish the task illustrated in the video",
        "documentation_with_charts": "Analyze this video to create comprehensive process documentation with workflow diagrams for a knowledge base article. Follow these requirements exactly:\n\n1. CONTENT REQUIREMENTS:\n - Provide a detailed step-by-step explanation of the process shown\n - Be extremely verbose and thorough - include all relevant details, context, and nuances\n - Structure as a complete knowledge base article with clear sections\n - Include overview, detailed steps, tips, and troubleshooting where applicable\n\n2. MERMAID DIAGRAM REQUIREMENTS:\n - Create workflow diagrams using valid Mermaid syntax where helpful\n - CRITICAL: Use only simple alphanumeric text in node descriptions and labels\n - CRITICAL: No special characters like quotes brackets colons semicolons or symbols in node text\n - CRITICAL: Use underscores instead of spaces in node IDs and labels\n - CRITICAL: Keep all text simple to avoid syntax errors\n - Example format: Start_Process --> Complete_Task --> End_Process\n - Use flowchart format: graph TD or graph LR\n\n3. OUTPUT STRUCTURE:\n - Title and overview section\n - Prerequisites section if applicable\n - Detailed step-by-step process\n - Mermaid workflow diagram(s) showing the process flow\n - Tips and best practices\n - Troubleshooting common issues\n\nEnsure all Mermaid diagrams use simple text without special characters to prevent parsing errors.",
        "custom": "" # Custom prompt will be provided by the user
    }
    # Maximum video duration in minutes (Gemini limitation)
    # NOTE(review): not referenced in the visible part of this file - confirm it is still used
    MAX_VIDEO_DURATION = 55
    # Threshold for chunked upload (10MB)
    # NOTE(review): not referenced in the visible part of this file - confirm it is still used
    CHUNKED_UPLOAD_THRESHOLD = 10 * 1024 * 1024
    # Webhook URL for tracking usage (target of send_usage_webhook)
    WEBHOOK_URL = "https://hook.us1.make.celonis.com/8ri1h8b2he4wudp2jku69mgcxumzxf3v"
    # Parallel processing configuration
    # Default max workers for parallel chunk processing; used when neither the
    # constructor argument nor the MAX_PARALLEL_CHUNKS env var supplies a value
    DEFAULT_MAX_WORKERS = 4 # Default concurrent workers
    # Model configuration (overridable via VIDEO_PROCESSOR_MODEL / VIDEO_SYNTHESIS_MODEL)
    DEFAULT_PROCESSING_MODEL = "gemini-2.5-pro" # Model for individual video processing
    DEFAULT_SYNTHESIS_MODEL = "gemini-2.5-pro" # Model for batch synthesis
    # Retry configuration
    MAX_RETRY_ATTEMPTS = 5 # Maximum retry attempts
    # Backoff delays in seconds, indexed by attempt number; doubles each time
    # and is capped at 60s (the last entry is reused for later attempts)
    RETRY_DELAYS = [5, 10, 20, 40, 60]
def __init__(self, api_key: Optional[str] = None, max_parallel_chunks: int = None):
"""
Initialize with API key from environment variable or direct setting
Args:
api_key: Google API key for Gemini
max_parallel_chunks: Maximum number of chunks to process in parallel
(default: from MAX_PARALLEL_CHUNKS env var or 4)
"""
self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
if not self.api_key:
logger.error("API key not provided")
raise ValueError("API key not provided - set GOOGLE_API_KEY environment variable or pass when initializing")
# Initialize the Gemini client
logger.info("Initializing Gemini API client")
self.client = genai.Client(api_key=self.api_key)
logger.info("Gemini API client initialized successfully")
# Set parallel processing configuration
if max_parallel_chunks:
self.max_parallel_chunks = max_parallel_chunks
else:
# Load from environment variable or use default
env_max_workers = os.getenv("MAX_PARALLEL_CHUNKS")
if env_max_workers:
self.max_parallel_chunks = int(env_max_workers)
else:
self.max_parallel_chunks = self.DEFAULT_MAX_WORKERS
logger.info(f"Parallel processing: max {self.max_parallel_chunks} concurrent chunks")
# Initialize video splitter
self.video_splitter = VideoSplitter()
# Load configuration from environment variables
self.processing_model = os.getenv("VIDEO_PROCESSOR_MODEL", self.DEFAULT_PROCESSING_MODEL)
self.synthesis_model = os.getenv("VIDEO_SYNTHESIS_MODEL", self.DEFAULT_SYNTHESIS_MODEL)
self.log_prompts = os.getenv("BATCH_PROCESSING_LOG_PROMPTS", "false").lower() == "true"
self.log_summaries = os.getenv("BATCH_PROCESSING_LOG_SUMMARIES", "false").lower() == "true"
logger.info(f"Configuration: processing_model={self.processing_model}, synthesis_model={self.synthesis_model}")
logger.info(f"Logging: prompts={self.log_prompts}, summaries={self.log_summaries}")
def send_usage_webhook(self, user_email: str, prompt: str) -> None:
    """
    Post a usage-tracking record to WEBHOOK_URL (best effort).

    Any failure is logged and swallowed so that tracking problems never
    interrupt video processing.

    Args:
        user_email: Email of the user who processed the video
        prompt: The prompt used for processing
    """
    try:
        payload = {
            "tool": "VIDEOQUERY",
            "date": datetime.datetime.now().isoformat(),
            "user": user_email,
            "model": "GEMINI",
            "settings": "no settings",
            "subTool": "no subTool",
            "prompt": prompt,
            "negativePrompt": "no NEGATIVE_PROMPT",
            "image": "no image",
        }
        logger.info(f"Sending usage data to webhook for user: {user_email}")
        reply = requests.post(
            self.WEBHOOK_URL,
            headers={"Content-Type": "application/json"},
            data=json.dumps(payload),
            timeout=10,  # seconds
        )
        if reply.status_code != 200:
            logger.warning(f"Webhook request failed with status code: {reply.status_code}")
            logger.warning(f"Response: {reply.text}")
            return
        logger.info("Successfully sent usage data to webhook")
    except Exception as exc:
        # Deliberately not re-raised: webhook failure shouldn't block the main flow
        logger.error(f"Error sending usage data to webhook: {str(exc)}")
def _extract_error_code(self, error_message: str) -> str:
"""
Extract HTTP error code from error message.
Args:
error_message: Error message string
Returns:
Error code (e.g., "503", "429") or "UNKNOWN"
"""
import re
match = re.search(r'(\d{3})\s+(UNAVAILABLE|TOO_MANY_REQUESTS|RESOURCE_EXHAUSTED|INVALID_ARGUMENT|INTERNAL)',
error_message, re.IGNORECASE)
if match:
return match.group(1)
return "UNKNOWN"
def _is_retryable_error(self, error_str: str, error_code: str, attempt: int) -> Tuple[bool, int]:
"""
Determine if error is retryable and calculate retry delay.
Args:
error_str: Error message (lowercase)
error_code: Extracted error code
attempt: Current attempt number (0-indexed)
Returns:
Tuple of (is_retryable: bool, retry_delay_seconds: int)
"""
# 503 UNAVAILABLE - Model overloaded (RETRYABLE with longer delays)
if '503' in error_code or 'unavailable' in error_str:
delay = self.RETRY_DELAYS[min(attempt, len(self.RETRY_DELAYS) - 1)]
logger.warning(f"503 UNAVAILABLE detected - API overloaded, will retry in {delay}s")
return (True, delay)
# 429 TOO_MANY_REQUESTS - Rate limit (RETRYABLE with longer delays)
if '429' in error_code or 'too many requests' in error_str or 'rate limit' in error_str:
delay = self.RETRY_DELAYS[min(attempt, len(self.RETRY_DELAYS) - 1)]
logger.warning(f"429 RATE LIMIT detected, will retry in {delay}s")
return (True, delay)
# 500 INTERNAL_SERVER_ERROR (RETRYABLE)
if '500' in error_code or 'internal server error' in error_str:
delay = self.RETRY_DELAYS[min(attempt, len(self.RETRY_DELAYS) - 1)]
logger.warning(f"500 INTERNAL ERROR detected, will retry in {delay}s")
return (True, delay)
# RESOURCE_EXHAUSTED (RETRYABLE)
if 'resource_exhausted' in error_str or 'quota' in error_str:
delay = self.RETRY_DELAYS[min(attempt, len(self.RETRY_DELAYS) - 1)]
logger.warning(f"Resource exhausted - quota or rate limit, will retry in {delay}s")
return (True, delay)
# Network errors (RETRYABLE with shorter delays)
if any(err in error_str for err in ['name resolution', 'connection', 'timeout', 'network']):
delay = 5 # Fixed 5s delay for network issues
logger.warning(f"Network error detected, will retry in {delay}s")
return (True, delay)
# 400 INVALID_ARGUMENT - Usually not retryable
if '400' in error_code or 'invalid_argument' in error_str:
logger.error(f"400 INVALID_ARGUMENT - not retryable")
return (False, 0)
# Default: not retryable
logger.error(f"Error not recognized as retryable: {error_str[:100]}")
return (False, 0)
def _make_api_request_with_retry(self, model: str, contents: list, context: str = "") -> any:
"""
Make API request with intelligent retry logic.
Handles 503 (overload), 429 (rate limit), 500 (server error), and network errors.
Args:
model: Model name to use
contents: Content to send to the API
context: Context description for logging (e.g., "[Video: example.mp4]")
Returns:
API response object
Raises:
Exception: If all retry attempts fail
"""
last_exception = None
for attempt in range(self.MAX_RETRY_ATTEMPTS):
try:
# Make the API call
if attempt == 0:
logger.info(f"{context} Sending request to Gemini API")
else:
logger.info(f"{context} Retry attempt {attempt + 1}/{self.MAX_RETRY_ATTEMPTS}")
response = self.client.models.generate_content(
model=model,
contents=contents
)
# Success!
if attempt > 0:
logger.info(f"{context} ✓ Request succeeded after {attempt + 1} attempts")
else:
logger.info(f"{context} ✓ Request succeeded on first attempt")
return response
except Exception as e:
last_exception = e
error_str = str(e).lower()
error_code = self._extract_error_code(str(e))
# Log detailed error information for INVALID_ARGUMENT (helps debug)
if 'invalid_argument' in error_str or '400' in error_str:
logger.error("=" * 80)
logger.error(f"{context} INVALID_ARGUMENT ERROR:")
logger.error(f" Error: {str(e)[:200]}")
logger.error(f" Model: {model}")
logger.error(f" Attempt: {attempt + 1}/{self.MAX_RETRY_ATTEMPTS}")
logger.error("=" * 80)
# Determine if retryable
is_retryable, retry_delay = self._is_retryable_error(error_str, error_code, attempt)
if not is_retryable:
logger.error(f"{context} Non-retryable error: {error_code} - {str(e)[:100]}")
raise
if attempt < self.MAX_RETRY_ATTEMPTS - 1:
logger.warning(
f"{context} Retryable error (attempt {attempt + 1}/{self.MAX_RETRY_ATTEMPTS}): "
f"{error_code} - {str(e)[:150]}"
)
logger.info(f"{context} Waiting {retry_delay}s before retry...")
time.sleep(retry_delay)
continue
else:
logger.error(
f"{context} ✗ All {self.MAX_RETRY_ATTEMPTS} attempts failed. "
f"Last error: {error_code} - {str(e)[:150]}"
)
raise
# If we get here, all retries failed
raise last_exception
def process_video(self, video_path: str, prompt: str, user_email: str = "anonymous") -> Dict[str, Any]:
"""
Process a video with the given prompt using Gemini API
Args:
video_path: Path to the video file
prompt: Text prompt to use for video analysis
user_email: Email of the user processing the video (for usage tracking)
Returns:
Dictionary with processing result or error
"""
start_time = time.time()
result = {
"success": False,
"message": "",
"content": "",
"processing_time_seconds": 0
}
logger.info(f"Processing video: {video_path}")
logger.info(f"Prompt: {prompt[:100]}..." if len(prompt) > 100 else f"Prompt: {prompt}")
if not os.path.exists(video_path):
error_msg = f"Video file not found at '{video_path}'"
logger.error(error_msg)
result["message"] = error_msg
return result
try:
# Get file size
file_size = os.path.getsize(video_path)
file_size_mb = file_size / (1024 * 1024)
logger.info(f"File size: {file_size_mb:.2f} MB")
# Get video duration to detect too-short videos
try:
video_duration = self.video_splitter.get_video_duration(video_path)
if video_duration:
logger.info(f"Video duration: {video_duration:.2f} seconds ({video_duration/60:.2f} minutes)")
# Check if video is too short (< 1 second can cause INVALID_ARGUMENT)
if video_duration < 1.0:
error_msg = f"Video too short: {video_duration:.2f}s. Gemini requires videos >= 1 second"
logger.error(error_msg)
result["message"] = error_msg
return result
else:
logger.warning("Could not determine video duration")
except Exception as dur_err:
logger.warning(f"Error checking video duration: {str(dur_err)}")
# Determine MIME type for the video
mime_type, _ = mimetypes.guess_type(video_path)
if not mime_type:
logger.info(f"Could not determine MIME type, using default: video/mp4")
mime_type = "video/mp4" # Fallback
else:
logger.info(f"MIME type: {mime_type}")
# Validate MIME type is video
if not mime_type.startswith('video/'):
error_msg = f"Invalid file type: {mime_type}. Only video files are supported (mp4, avi, mov, mkv, webm, etc.)"
logger.error(error_msg)
result["message"] = error_msg
return result
# Validate video file is readable and not corrupted
# This provides a secondary check in case validation at upload didn't happen
try:
import subprocess
ffprobe_path = system_utils.find_ffprobe()
probe_result = subprocess.run(
[ffprobe_path, '-v', 'error', '-show_entries', 'format=duration,format_name',
'-of', 'default=noprint_wrappers=1', video_path],
capture_output=True, text=True, timeout=10
)
if probe_result.returncode != 0:
error_detail = probe_result.stderr.strip()
# Provide user-friendly error messages for common issues
if "moov atom not found" in error_detail.lower():
error_msg = "Video file is incomplete or corrupted (missing header). The file may not have uploaded completely. Please try uploading again."
elif "invalid data found" in error_detail.lower():
error_msg = "Video file contains invalid data and cannot be processed. Please check the file or try re-encoding it."
else:
error_msg = f"Video file appears to be corrupted or unreadable. Technical details: {error_detail}"
logger.error(f"Video validation failed: {error_msg}")
result["message"] = error_msg
return result
logger.info(f"Video file validation successful. Format info: {probe_result.stdout[:100]}")
except subprocess.TimeoutExpired:
logger.warning("Video validation timed out after 10s - proceeding anyway")
except FileNotFoundError:
logger.warning("ffprobe not found - skipping video file validation")
except Exception as val_err:
logger.warning(f"Could not validate video file: {str(val_err)} - proceeding anyway")
# SIMPLIFIED APPROACH: Use inline base64 for all files
# File Upload API has known issues (KeyError: 'file' in SDK 1.45.0-1.49.0)
# Gemini API REQUEST limit: 1GB (1073741824 bytes) - confirmed by testing
# Base64 adds ~37% overhead, so we validate encoded size
# Calculate estimated encoded size (base64 overhead ~37%)
BASE64_OVERHEAD = 1.37
estimated_encoded_mb = file_size_mb * BASE64_OVERHEAD
API_LIMIT_MB = 1000 # 1GB API request limit
# Check if encoded size would exceed API limit
if estimated_encoded_mb > API_LIMIT_MB:
error_msg = (
f"Video chunk is too large: {file_size_mb:.2f}MB raw, "
f"~{estimated_encoded_mb:.1f}MB after base64 encoding. "
f"This exceeds the {API_LIMIT_MB}MB (1GB) API limit. "
f"The video needs to be split into smaller chunks. "
f"Target chunk size: 500MB (accounting for variable bitrate)."
)
logger.error(error_msg)
result["message"] = error_msg
return result
# Warn if approaching API limit (>900MB encoded)
if estimated_encoded_mb > 900:
logger.warning(
f"Chunk is large: {file_size_mb:.1f}MB raw, "
f"~{estimated_encoded_mb:.1f}MB encoded. "
f"Approaching {API_LIMIT_MB}MB API limit. Processing may be slower."
)
# Use base64 encoding for all files (reliable and works consistently)
logger.info(f"Encoding video as base64 (file size: {file_size_mb:.2f}MB)")
encode_start = time.time()
with open(video_path, "rb") as video_file_obj:
video_data = video_file_obj.read()
video_base64 = base64.b64encode(video_data).decode('utf-8')
encode_time = time.time() - encode_start
encoded_size_mb = len(video_base64) / (1024 * 1024)
logger.info(f"Base64 encoding complete in {encode_time:.2f}s. Encoded size: {encoded_size_mb:.2f}MB ({len(video_base64)} chars)")
# Verify actual encoded size is within API limits (1GB)
if encoded_size_mb > API_LIMIT_MB:
error_msg = (
f"Encoded video size ({encoded_size_mb:.2f}MB) exceeds Gemini API limit of {API_LIMIT_MB}MB (1GB). "
f"Original file: {file_size_mb:.2f}MB. "
f"This chunk needs further splitting. Target: 500MB raw chunks. "
f"Variable bitrate caused larger-than-expected chunk size."
)
logger.error(error_msg)
result["message"] = error_msg
return result
# Create the content parts using inline data
prompt_parts = [
{"text": prompt},
{"inline_data": {
"mime_type": mime_type,
"data": video_base64
}}
]
uploaded_file = None # Not using File Upload API
# Use the new retry logic with rate limiting
context = f"[Video: {os.path.basename(video_path)}]"
api_start = time.time()
response = self._make_api_request_with_retry(
model=self.processing_model,
contents=prompt_parts,
context=context
)
api_time = time.time() - api_start
logger.info(f"Received response from Gemini (API call took {api_time:.1f}s)")
# Extract the response content
content = ""
if response.parts:
logger.info(f"Response has {len(response.parts)} parts")
for i, part in enumerate(response.parts):
if hasattr(part, 'text'):
part_text = part.text
content_preview = part_text[:100] + '...' if len(part_text) > 100 else part_text
logger.info(f"Part {i} (text): {content_preview}")
content += part_text
else:
logger.info(f"Part {i} (no text): {type(part)}")
else:
logger.warning("No parts in response")
if hasattr(response, 'prompt_feedback') and response.prompt_feedback:
logger.warning(f"Prompt feedback: {response.prompt_feedback}")
# Set success result
result["success"] = True
result["content"] = content
result["processing_time_seconds"] = round(time.time() - start_time, 2)
logger.info(f"Processed result with {len(content)} characters")
logger.info(f"Total processing time: {result['processing_time_seconds']}s")
# Send usage data to webhook for tracking
self.send_usage_webhook(user_email, prompt)
# Clean up uploaded file if it was used
if uploaded_file:
try:
logger.info(f"Deleting uploaded file: {uploaded_file.name}")
self.client.files.delete(name=uploaded_file.name)
logger.info("File deleted successfully from Gemini storage")
except Exception as del_err:
logger.warning(f"Could not delete file from Gemini storage: {str(del_err)}")
return result
except Exception as e:
import traceback
error_report = ErrorReporter.capture_error(
e,
context={
'video_path': video_path,
'prompt_length': len(prompt),
'operation': 'process_video'
}
)
error_details = traceback.format_exc()
logger.error(f"Error processing video: {str(e)}")
logger.error(error_details)
result["message"] = error_report.format_user_message()
result["error_details"] = error_details
result["error_id"] = error_report.error_id
result["error_category"] = error_report.category.value
return result
def combine_chunk_responses(self, responses: List[str], prompt: str,
num_chunks: int, video_name: str = "") -> str:
"""
Combine responses from multiple chunks of a single video using AI synthesis.
Uses universal synthesis approach - no type detection.
Args:
responses: List of response texts from each chunk
prompt: Original user prompt
num_chunks: Total number of chunks
video_name: Name of the video file
Returns:
Combined response text
"""
logger.info(f"Combining {len(responses)} chunk responses using AI synthesis")
# Try AI synthesis first
try:
# Prepare chunk responses
summaries_text = ""
for i, response in enumerate(responses, 1):
summaries_text += f"\n--- Segment {i} ---\n{response.strip()}\n"
# Universal synthesis prompt for single video
synthesis_prompt = f"""You are creating a final unified response by combining multiple segment analyses from one video.
Context:
- One video was split into {num_chunks} segments for processing
- Each segment was analyzed separately
- Below are the responses from each segment
Original user request:
"{prompt}"
Segment responses:
{summaries_text}
Your task:
1. Combine these segment responses into ONE cohesive response that fulfills the user's request above
2. Do not reference segments, chunks, or parts in your output
3. Present as a unified analysis of the complete video
Provide your unified response:
"""
logger.info("[Chunk Synthesis] Sending synthesis request to Gemini")
response = self._make_api_request_with_retry(
model=self.synthesis_model,
contents=[{"text": synthesis_prompt}],
context="[Chunk Combination]"
)
if response.parts:
synthesized_content = ""
for part in response.parts:
if hasattr(part, 'text'):
synthesized_content += part.text
if synthesized_content:
logger.info("[Chunk Synthesis] Successfully synthesized chunk responses")
return synthesized_content
logger.warning("[Chunk Synthesis] Synthesis returned empty, falling back to concatenation")
except Exception as e:
logger.warning(f"[Chunk Synthesis] Synthesis failed: {e}, falling back to concatenation")
# Fallback: simple concatenation
return self._fallback_concatenation_single_video(responses, num_chunks, video_name)
def _process_single_chunk(self, chunk_info: Tuple[int, str, str, int, str]) -> Tuple[int, Dict[str, Any]]:
"""
Process a single video chunk. Used for parallel processing.
Args:
chunk_info: Tuple of (chunk_index, chunk_path, chunk_prompt, total_chunks, user_email)
Returns:
Tuple of (chunk_index, result_dict)
"""
chunk_index, chunk_path, chunk_prompt, total_chunks, user_email = chunk_info
logger.info(f"[Parallel] Processing chunk {chunk_index + 1}/{total_chunks}: {chunk_path}")
try:
chunk_result = self.process_video(chunk_path, chunk_prompt, user_email)
logger.info(f"[Parallel] Completed chunk {chunk_index + 1}/{total_chunks}")
return (chunk_index, chunk_result)
except Exception as e:
logger.error(f"[Parallel] Error processing chunk {chunk_index + 1}/{total_chunks}: {str(e)}")
return (chunk_index, {
"success": False,
"message": f"Error processing chunk {chunk_index + 1}: {str(e)}",
"content": ""
})
def _process_chunks_parallel(self, chunk_paths: List[str], prompt: str,
user_email: str) -> List[Dict[str, Any]]:
"""
Process multiple video chunks in parallel using ThreadPoolExecutor.
Args:
chunk_paths: List of paths to video chunk files
prompt: Original prompt for video analysis
user_email: User email for tracking
Returns:
List of result dictionaries in order of chunks
"""
num_chunks = len(chunk_paths)
logger.info(f"Starting parallel processing of {num_chunks} chunks with {self.max_parallel_chunks} workers")
# Prepare chunk information for parallel processing
chunk_infos = []
for i, chunk_path in enumerate(chunk_paths):
# Extract video name from chunk path (remove _chunk_XX suffix)
video_name = os.path.basename(chunk_path).rsplit('_chunk_', 1)[0] if '_chunk_' in chunk_path else os.path.basename(chunk_path)
chunk_prompt = self._create_chunk_prompt(prompt, i + 1, num_chunks, video_name)
chunk_infos.append((i, chunk_path, chunk_prompt, num_chunks, user_email))
# Process chunks in parallel
results = [None] * num_chunks # Pre-allocate results list to maintain order
with ThreadPoolExecutor(max_workers=self.max_parallel_chunks) as executor:
# Submit all chunks for processing
future_to_chunk = {
executor.submit(self._process_single_chunk, chunk_info): chunk_info[0]
for chunk_info in chunk_infos
}
# Collect results as they complete
completed = 0
for future in as_completed(future_to_chunk):
chunk_index = future_to_chunk[future]
try:
chunk_index, result = future.result()
results[chunk_index] = result
completed += 1
logger.info(f"[Parallel] Progress: {completed}/{num_chunks} chunks completed")
except Exception as e:
logger.error(f"[Parallel] Unexpected error for chunk {chunk_index + 1}: {str(e)}")
results[chunk_index] = {
"success": False,
"message": f"Unexpected error: {str(e)}",
"content": ""
}
logger.info(f"[Parallel] All {num_chunks} chunks processed")
return results
def process_long_video(self, video_path: str, prompt: str,
user_email: str = "anonymous", use_parallel: bool = True) -> Dict[str, Any]:
"""
Process a long video by splitting it into chunks and combining the results.
Supports both parallel and sequential processing.
Args:
video_path: Path to the video file
prompt: Text prompt to use for video analysis
user_email: Email of the user processing the video (for usage tracking)
use_parallel: If True, process chunks in parallel; if False, process sequentially
Returns:
Dictionary with processing result or error
"""
start_time = time.time()
result = {
"success": False,
"message": "",
"content": "",
"chunks_processed": 0,
"processing_mode": "parallel" if use_parallel else "sequential",
"processing_time_seconds": 0
}
chunk_paths = []
try:
# Check if video needs splitting
num_chunks, duration_minutes = self.video_splitter.get_chunk_info(video_path)
if num_chunks <= 1:
logger.info("Video does not need splitting, processing normally")
return self.process_video(video_path, prompt, user_email)
logger.info(f"Long video detected: {duration_minutes:.2f} minutes, will be split into {num_chunks} chunks")
logger.info(f"Processing mode: {'PARALLEL' if use_parallel else 'SEQUENTIAL'}")
# Split the video
logger.info("Starting video splitting...")
chunk_paths = self.video_splitter.split_video(video_path)
logger.info(f"Video split into {len(chunk_paths)} chunks successfully")
# Process chunks (parallel or sequential)
if use_parallel:
# Parallel processing using ThreadPoolExecutor
chunk_results = self._process_chunks_parallel(chunk_paths, prompt, user_email)
else:
# Sequential processing (original logic)
chunk_results = []
for i, chunk_path in enumerate(chunk_paths, 1):
logger.info(f"[Sequential] Processing chunk {i}/{len(chunk_paths)}: {chunk_path}")
# Extract video name from chunk path (remove _chunk_XX suffix)
video_name = os.path.basename(chunk_path).rsplit('_chunk_', 1)[0] if '_chunk_' in chunk_path else os.path.basename(chunk_path)
# Modify prompt to indicate this is part of a multi-part video
chunk_prompt = self._create_chunk_prompt(prompt, i, len(chunk_paths), video_name)
# Process this chunk
chunk_result = self.process_video(chunk_path, chunk_prompt, user_email)
chunk_results.append(chunk_result)
logger.info(f"[Sequential] Completed chunk {i}/{len(chunk_paths)}")
# Check for failures in any chunk
chunk_responses = []
for i, chunk_result in enumerate(chunk_results, 1):
if not chunk_result["success"]:
error_msg = f"Failed to process chunk {i}/{len(chunk_paths)}: {chunk_result.get('message', 'Unknown error')}"
logger.error(error_msg)
result["message"] = error_msg
result["chunks_processed"] = i - 1
return result
chunk_responses.append(chunk_result["content"])
# Combine all responses
logger.info("Combining responses from all chunks...")
combined_content = self.combine_chunk_responses(
chunk_responses,
prompt,
len(chunk_paths),
video_name=os.path.basename(video_path)
)
result["success"] = True
result["content"] = combined_content
result["chunks_processed"] = len(chunk_paths)
result["processing_time_seconds"] = round(time.time() - start_time, 2)
result["message"] = f"Successfully processed video in {len(chunk_paths)} chunks"
logger.info(f"Long video processing completed successfully: {len(chunk_paths)} chunks")
logger.info(f"Total processing time: {result['processing_time_seconds']}s")
return result
except Exception as e:
import traceback
error_report = ErrorReporter.capture_error(
e,
context={
'video_path': video_path,
'prompt_length': len(prompt),
'operation': 'process_long_video',
'chunks_processed': result.get('chunks_processed', 0)
}
)
error_details = traceback.format_exc()
logger.error(f"Error processing long video: {str(e)}")
logger.error(error_details)
result["message"] = error_report.format_user_message()
result["error_details"] = error_details
result["error_id"] = error_report.error_id
result["error_category"] = error_report.category.value
return result
finally:
# Always clean up chunk files
if chunk_paths:
logger.info("Cleaning up temporary chunk files...")
self.video_splitter.cleanup_chunks(chunk_paths)
def _create_chunk_prompt(self, original_prompt: str, chunk_num: int,
total_chunks: int, video_name: str = "") -> str:
"""
Create a prompt for a video chunk that provides minimal context about its position.
Args:
original_prompt: The original user prompt
chunk_num: Current chunk number (1-indexed)
total_chunks: Total number of chunks
video_name: Name of the video file (for context)
Returns:
Prompt with minimal system context, keeping user's prompt as primary instruction
"""
context = f"""This is segment {chunk_num} of {total_chunks} from video "{video_name}".
Your response will be combined with responses from other segments to create the final result.
{original_prompt}"""
return context
def process_video_auto(self, video_path: str, prompt: str,
user_email: str = "anonymous") -> Dict[str, Any]:
"""
Automatically process a video, handling both short and long videos.
This method detects if the video needs splitting and processes accordingly.
Args:
video_path: Path to the video file
prompt: Text prompt to use for video analysis
user_email: Email of the user processing the video (for usage tracking)
Returns:
Dictionary with processing result or error
"""
logger.info(f"Auto-processing video: {video_path}")
# Check if video needs splitting
if self.video_splitter.needs_splitting(video_path):
logger.info("Video requires splitting, using long video processing")
return self.process_long_video(video_path, prompt, user_email)
else:
logger.info("Video is within single-chunk limit, using standard processing")
return self.process_video(video_path, prompt, user_email)
def process_video_batch(self, video_paths: List[str], filenames: List[str],
prompt: str, user_email: str, batch_id: str) -> Dict[str, Any]:
"""
Process multiple videos as a continuous batch.
Videos are treated as sequential segments of one long video.
Args:
video_paths: List of paths to video files (in order)
filenames: List of original filenames
prompt: Text prompt to use for analysis
user_email: Email of the user
batch_id: Unique identifier for this batch
Returns:
Dictionary with unified result
"""
logger.info(f"Batch {batch_id}: Processing {len(video_paths)} videos")
try:
# Get durations and calculate total
video_infos = []
total_duration_seconds = 0
for video_path, filename in zip(video_paths, filenames):
try:
duration = self.video_splitter.get_video_duration(video_path)
video_infos.append({
'path': video_path,
'filename': filename,
'duration': duration
})
total_duration_seconds += duration
logger.info(f"Batch {batch_id}: {filename} - {duration/60:.1f} minutes")
except Exception as e:
logger.error(f"Batch {batch_id}: Failed to get duration for {filename}: {e}")
raise
total_duration_minutes = total_duration_seconds / 60
chunk_duration_minutes = self.video_splitter.chunk_duration_seconds / 60
logger.info(f"Batch {batch_id}: Total duration: {total_duration_minutes:.1f} minutes")
# Decide processing strategy
needs_chunking = (
total_duration_minutes > chunk_duration_minutes or
any(v['duration'] > self.video_splitter.chunk_duration_seconds
for v in video_infos)
)
if needs_chunking:
logger.info(f"Batch {batch_id}: Using chunked processing")
return self._process_batch_with_chunking(
video_infos, prompt, user_email, batch_id
)
else:
logger.info(f"Batch {batch_id}: Processing directly (no chunking needed)")
return self._process_batch_direct(
video_infos, prompt, user_email, batch_id
)
except Exception as e:
logger.error(f"Batch {batch_id}: Processing error: {str(e)}")
raise
def _process_batch_direct(self, video_infos: List[Dict], prompt: str,
                          user_email: str, batch_id: str) -> Dict[str, Any]:
    """
    Process a batch directly, without splitting any video into chunks.

    Uses two-stage synthesis even for short batches:
    Stage 1 runs process_video on each video with a per-video summary
    prompt; Stage 2 hands all summaries to _synthesize_final_result.

    A video that fails (an exception, or a result whose 'success' flag is
    explicitly false) is represented by a bracketed error placeholder so
    the remaining videos are still synthesized.

    Args:
        video_infos: One dict per video with 'path' and 'filename' keys
        prompt: The user's original prompt
        user_email: Email of the user
        batch_id: Unique identifier for this batch (used in log messages)

    Returns:
        Dictionary with 'content' (final text), 'total_chunks' (number of
        videos) and 'synthesis_stages' (always 2)

    Raises:
        ValueError: If video_infos is empty.
    """
    if not video_infos:
        # Nothing to summarize; fail clearly instead of with an incidental
        # IndexError / ZeroDivisionError further down.
        raise ValueError("No videos to process in batch")

    total_videos = len(video_infos)
    logger.info(f"Batch {batch_id}: [Stage 1] Direct processing of {total_videos} videos")
    stage1_start = time.time()

    # Process each video separately first (stage 1: summaries)
    summaries = []
    for i, video_info in enumerate(video_infos, 1):
        logger.info(f"Batch {batch_id}: [Stage 1] Processing video {i}/{total_videos}: {video_info['filename']}")
        summary_prompt = self._create_chunk_summary_prompt(
            original_prompt=prompt,
            chunk_number=i,
            total_chunks=total_videos,
            video_name=video_info['filename']
        )
        # Log prompt if configured
        if self.log_prompts:
            logger.debug(f"Batch {batch_id}: [Stage 1] Prompt for video {i}:\n{summary_prompt[:300]}...")
        try:
            video_start = time.time()
            result = self.process_video(video_info['path'], summary_prompt, user_email)
            video_time = time.time() - video_start
            # Treat an explicit success=False like a raised error so it is
            # logged and marked, matching the parallel chunk path. A result
            # without a 'success' key is still accepted as before.
            if not result.get('success', True):
                raise RuntimeError(result.get('message', 'Unknown error'))
            # 'content' may be present but None; normalize to a string.
            summary = result.get('content') or ''
            summaries.append(summary)
            logger.info(f"Batch {batch_id}: [Stage 1] Video {i} complete: {len(summary)} chars in {video_time:.2f}s")
            # Log summary preview if configured
            if self.log_summaries:
                logger.debug(f"Batch {batch_id}: [Stage 1] Video {i} summary preview:\n{summary[:300]}...")
        except Exception as e:
            logger.error(f"Batch {batch_id}: [Stage 1] Failed to process {video_info['filename']}: {e}")
            summaries.append(f"[Error processing {video_info['filename']}: {str(e)}]")

    stage1_time = time.time() - stage1_start
    logger.info(f"Batch {batch_id}: [Stage 1] Complete - {len(summaries)} summaries in {stage1_time:.2f}s")

    # Log traceability
    logger.info(f"Batch {batch_id}: [Traceability] Video-to-summary mapping:")
    for i, video_info in enumerate(video_infos, 1):
        logger.info(f"Batch {batch_id}: - Video {i}: {video_info['filename']} → Summary {i}")

    # Stage 2: Synthesize all summaries
    stage2_start = time.time()
    logger.info(f"Batch {batch_id}: [Stage 2] Synthesizing {len(summaries)} summaries")
    chunk_metadata = [{'video_name': v['filename'], 'video_idx': i}
                      for i, v in enumerate(video_infos)]
    final_content = self._synthesize_final_result(
        summaries=summaries,
        chunk_metadata=chunk_metadata,
        original_prompt=prompt,
        user_email=user_email
    )
    stage2_time = time.time() - stage2_start
    total_time = stage1_time + stage2_time

    # Log performance metrics
    logger.info(f"Batch {batch_id}: [Metrics] Stage 1: {stage1_time:.2f}s, Stage 2: {stage2_time:.2f}s, Total: {total_time:.2f}s")
    logger.info(f"Batch {batch_id}: [Metrics] Avg time per video: {stage1_time/total_videos:.2f}s")
    return {
        'content': final_content,
        'total_chunks': total_videos,
        'synthesis_stages': 2
    }
def _process_batch_with_chunking(self, video_infos: List[Dict], prompt: str,
                                 user_email: str, batch_id: str) -> Dict[str, Any]:
    """
    Split over-long batch videos into chunks and run two-stage synthesis.

    Treats all videos as one continuous sequence: videos longer than the
    splitter's chunk duration are split, shorter ones are used whole, and
    the resulting ordered chunk list is passed to _process_chunks_two_stage.

    Chunk files produced by splitting are deleted afterwards, whether the
    run succeeds or fails; the original videos are never deleted here.

    Args:
        video_infos: One dict per video with 'path', 'filename' and
            'duration' keys (duration is compared against the splitter's
            chunk_duration_seconds)
        prompt: The user's original prompt
        user_email: Email of the user
        batch_id: Unique identifier for this batch (used in log messages)

    Returns:
        The dictionary returned by _process_chunks_two_stage

    Raises:
        Exception: A failure to split a video is logged and re-raised;
            errors from chunk processing propagate unchanged.
    """
    logger.info(f"Batch {batch_id}: Starting chunked processing")
    all_chunks = []
    chunk_metadata = []
    # The try covers splitting as well as processing: if splitting a later
    # video fails, chunk files already written for earlier videos must still
    # be removed.
    try:
        # Step 1: Split all videos into chunks
        for video_idx, video_info in enumerate(video_infos):
            video_path = video_info['path']
            filename = video_info['filename']
            duration = video_info['duration']
            if duration > self.video_splitter.chunk_duration_seconds:
                # Split long video
                logger.info(f"Batch {batch_id}: Splitting {filename} ({duration/60:.1f} min)")
                try:
                    chunks = self.video_splitter.split_video(video_path)
                except Exception as e:
                    logger.error(f"Batch {batch_id}: Failed to split {filename}: {e}")
                    raise
                for chunk_idx, chunk_path in enumerate(chunks):
                    all_chunks.append(chunk_path)
                    chunk_metadata.append({
                        'video_idx': video_idx,
                        'video_name': filename,
                        'chunk_idx': chunk_idx,
                        'is_split': True
                    })
            else:
                # Use whole video as one chunk
                all_chunks.append(video_path)
                chunk_metadata.append({
                    'video_idx': video_idx,
                    'video_name': filename,
                    'chunk_idx': 0,
                    'is_split': False
                })
        logger.info(f"Batch {batch_id}: Split into {len(all_chunks)} total chunks")

        # Step 2: Process chunks with two-stage synthesis
        return self._process_chunks_two_stage(
            all_chunks,
            chunk_metadata,
            prompt,
            user_email,
            batch_id
        )
    finally:
        # Step 3: Cleanup split chunks (not original videos)
        for chunk_path, metadata in zip(all_chunks, chunk_metadata):
            if not metadata['is_split']:
                continue
            try:
                if os.path.exists(chunk_path):
                    os.remove(chunk_path)
                    logger.info(f"Batch {batch_id}: Cleaned up chunk: {os.path.basename(chunk_path)}")
            except Exception as e:
                # Best-effort cleanup: a leftover file must not mask the
                # processing result or the original error.
                logger.warning(f"Batch {batch_id}: Cleanup failed for {chunk_path}: {e}")
def _process_chunks_two_stage(self, chunk_paths: List[str], chunk_metadata: List[Dict],
                              prompt: str, user_email: str, batch_id: str) -> Dict[str, Any]:
    """
    Two-stage synthesis:
    Stage 1: Each chunk → concise summary
    Stage 2: All summaries → final unified result

    Stage 1 runs in a thread pool when self.max_parallel_chunks > 1 (each
    task calls _process_single_chunk with a single tuple argument and is
    expected to return a (chunk_idx, result_dict) pair), and sequentially
    through process_video otherwise. A chunk that fails is represented by
    an "[Error: ...]" placeholder so the other chunks are still synthesized.
    Summaries are ordered by chunk index so they line up with
    chunk_metadata regardless of completion order.

    Args:
        chunk_paths: Ordered list of chunk/video file paths
        chunk_metadata: One dict per chunk, parallel to chunk_paths, with at
            least a 'video_name' key
        prompt: The user's original prompt
        user_email: Email of the user
        batch_id: Unique identifier for this batch (used in log messages)

    Returns:
        Dictionary with 'content' (final text), 'total_chunks' and
        'synthesis_stages' (always 2)
    """
    total_chunks = len(chunk_paths)
    logger.info(f"Batch {batch_id}: [Stage 1] Generating summaries for {total_chunks} chunks")
    stage1_start = time.time()

    # Summaries keyed by the chunk's position in chunk_paths.
    summaries_by_idx = {}

    if self.max_parallel_chunks > 1:
        # Parallel processing
        logger.info(f"Batch {batch_id}: [Stage 1] Using parallel processing with {self.max_parallel_chunks} workers")
        with ThreadPoolExecutor(max_workers=self.max_parallel_chunks) as executor:
            # Remember which chunk each future belongs to: as_completed
            # yields in completion order, and a future that raised carries
            # no index of its own.
            future_to_idx = {}
            for i, (chunk_path, metadata) in enumerate(zip(chunk_paths, chunk_metadata)):
                summary_prompt = self._create_chunk_summary_prompt(
                    original_prompt=prompt,
                    chunk_number=i + 1,
                    total_chunks=total_chunks,
                    video_name=metadata['video_name']
                )
                # Log prompt if configured
                if self.log_prompts:
                    logger.debug(f"Batch {batch_id}: [Stage 1] Prompt for chunk {i+1} ({metadata['video_name']}):\n{summary_prompt[:300]}...")
                future = executor.submit(
                    self._process_single_chunk,
                    (i, chunk_path, summary_prompt, total_chunks, user_email)
                )
                future_to_idx[future] = i

            # Collect results
            completed_count = 0
            for future in as_completed(future_to_idx):
                chunk_idx = future_to_idx[future]
                try:
                    _, result = future.result()
                    # Extract content from result dict
                    if result.get('success'):
                        summary = result.get('content') or ''
                    else:
                        summary = f"[Error: {result.get('message', 'Unknown error')}]"
                except Exception as e:
                    logger.error(f"Batch {batch_id}: [Stage 1] Failed to process chunk {chunk_idx + 1}: {e}")
                    summaries_by_idx[chunk_idx] = f"[Error: {str(e)}]"
                    continue
                summaries_by_idx[chunk_idx] = summary
                completed_count += 1
                logger.info(f"Batch {batch_id}: [Stage 1] Chunk {chunk_idx + 1}/{total_chunks} complete ({completed_count}/{total_chunks} total)")
                # Log summary preview if configured
                if self.log_summaries and isinstance(summary, str) and not summary.startswith('[Error'):
                    logger.debug(f"Batch {batch_id}: [Stage 1] Chunk {chunk_idx + 1} summary preview:\n{summary[:300]}...")
    else:
        # Sequential processing
        logger.info(f"Batch {batch_id}: [Stage 1] Using sequential processing")
        for i, (chunk_path, metadata) in enumerate(zip(chunk_paths, chunk_metadata)):
            logger.info(f"Batch {batch_id}: [Stage 1] Processing chunk {i+1}/{total_chunks} from {metadata['video_name']}")
            summary_prompt = self._create_chunk_summary_prompt(
                original_prompt=prompt,
                chunk_number=i + 1,
                total_chunks=total_chunks,
                video_name=metadata['video_name']
            )
            # Log prompt if configured
            if self.log_prompts:
                logger.debug(f"Batch {batch_id}: [Stage 1] Prompt for chunk {i+1}:\n{summary_prompt[:300]}...")
            try:
                chunk_start = time.time()
                result = self.process_video(chunk_path, summary_prompt, user_email)
                chunk_time = time.time() - chunk_start
                # Mirror the parallel branch: an explicit success=False is a
                # failure, not an empty summary. A result without a
                # 'success' key is still accepted as before.
                if not result.get('success', True):
                    raise RuntimeError(result.get('message', 'Unknown error'))
                summary = result.get('content') or ''
                summaries_by_idx[i] = summary
                logger.info(f"Batch {batch_id}: [Stage 1] Chunk {i + 1} complete: {len(summary)} chars in {chunk_time:.2f}s")
                # Log summary preview if configured
                if self.log_summaries:
                    logger.debug(f"Batch {batch_id}: [Stage 1] Chunk {i+1} summary preview:\n{summary[:300]}...")
            except Exception as e:
                logger.error(f"Batch {batch_id}: [Stage 1] Failed to process chunk {i + 1}: {e}")
                summaries_by_idx[i] = f"[Error: {str(e)}]"

    # Order by chunk index
    summaries = [summaries_by_idx[idx] for idx in sorted(summaries_by_idx)]
    stage1_time = time.time() - stage1_start
    logger.info(f"Batch {batch_id}: [Stage 1] Complete - {len(summaries)} summaries generated in {stage1_time:.2f}s")

    # Log traceability
    logger.info(f"Batch {batch_id}: [Traceability] Chunk-to-summary mapping:")
    for i, metadata in enumerate(chunk_metadata):
        chunk_info = f"chunk {metadata.get('chunk_idx', 0)+1}" if metadata.get('is_split') else "whole video"
        logger.info(f"Batch {batch_id}: - Chunk {i+1}: {metadata['video_name']} ({chunk_info}) → Summary {i+1}")

    # Stage 2: Synthesize all summaries
    stage2_start = time.time()
    logger.info(f"Batch {batch_id}: [Stage 2] Synthesizing {len(summaries)} summaries into final result")
    final_content = self._synthesize_final_result(
        summaries=summaries,
        chunk_metadata=chunk_metadata,
        original_prompt=prompt,
        user_email=user_email
    )
    stage2_time = time.time() - stage2_start
    total_time = stage1_time + stage2_time

    # Log performance metrics
    logger.info(f"Batch {batch_id}: [Metrics] Stage 1: {stage1_time:.2f}s, Stage 2: {stage2_time:.2f}s, Total: {total_time:.2f}s")
    if total_chunks:
        # Guarded so a metrics line can never raise ZeroDivisionError.
        logger.info(f"Batch {batch_id}: [Metrics] Avg time per chunk: {stage1_time/total_chunks:.2f}s")
    logger.info(f"Batch {batch_id}: [Metrics] Total API calls: {total_chunks + 1}")  # +1 for synthesis
    return {
        'content': final_content,
        'total_chunks': total_chunks,
        'synthesis_stages': 2
    }
def _create_chunk_summary_prompt(self, original_prompt: str, chunk_number: int,
total_chunks: int, video_name: str) -> str:
"""
Create prompt for individual video/chunk in batch processing.
User prompt is the primary instruction with minimal synthesis hint.
"""
prompt = f"""You are analyzing video {chunk_number} of {total_chunks}: "{video_name}".
Your response will be synthesized with responses from other videos to create a unified final result.
{original_prompt}"""
return prompt
def _synthesize_final_result(self, summaries: List[str], chunk_metadata: List[Dict],
                             original_prompt: str, user_email: str) -> str:
    """
    Synthesize all chunk summaries into single cohesive result using Gemini.

    Uses a universal template that makes the user's prompt the primary
    instruction. If the API call raises or yields no text, the result of
    _fallback_concatenation is returned instead.

    Args:
        summaries: Stage 1 summaries, in chunk order
        chunk_metadata: One dict per summary (same order) with a
            'video_name' key
        original_prompt: The user's original prompt
        user_email: Email of the user (not used by this method)

    Returns:
        The synthesized text, or the concatenation fallback
    """
    # Extract video names for context. dict.fromkeys removes duplicates while
    # keeping first-seen order, so the prompt lists videos in batch order
    # (a set would order them arbitrarily from run to run).
    video_names = list(dict.fromkeys(m['video_name'] for m in chunk_metadata))
    num_videos = len(video_names)

    # Prepare summaries text
    sections = []
    total_summary_chars = 0
    for i, summary in enumerate(summaries, 1):
        video_name = chunk_metadata[i-1]['video_name']
        summary_text = summary or ""  # tolerate a missing (None) summary
        sections.append(f"\n\n--- Summary {i} (from {video_name}) ---\n{summary_text.strip()}\n")
        total_summary_chars += len(summary_text)
    summaries_text = "".join(sections)
    logger.info(f"[Stage 2] Combined summaries: {len(summaries)} summaries, {total_summary_chars} total chars")

    # Create universal synthesis prompt
    if num_videos > 1:
        video_context = f"{num_videos} videos: {', '.join(video_names)}"
    else:
        video_context = f"video: {video_names[0]}"
    synthesis_prompt = (
        f"You are creating a final unified response by combining multiple segment summaries from {video_context}.\n"
        "Here are the segment summaries:\n"
        f"{summaries_text}\n"
        "Original user request:\n"
        f"{original_prompt}\n"
        "Your task: Create ONE cohesive response that fulfills the user's request. "
        "Integrate information from all summaries naturally, without mentioning segments or chunks.\n"
    )

    # Log synthesis prompt if configured
    if self.log_prompts:
        logger.debug(f"[Stage 2] Synthesis prompt preview:\n{synthesis_prompt[:500]}...")

    # Send to Gemini for final synthesis
    logger.info(f"[Stage 2] Sending synthesis request to Gemini API (model: {self.synthesis_model})")
    synthesis_start = time.time()
    try:
        response = self._make_api_request_with_retry(
            model=self.synthesis_model,
            contents=[{"text": synthesis_prompt}],
            context="[Batch Synthesis]"
        )
        synthesis_time = time.time() - synthesis_start

        # A part can expose a `text` attribute whose value is None (e.g. a
        # non-text part); skip those instead of failing on str + None.
        text_parts = []
        for part in response.parts or []:
            part_text = getattr(part, 'text', None)
            if part_text:
                text_parts.append(part_text)
        synthesized_content = "".join(text_parts)

        if not synthesized_content:
            logger.warning("[Stage 2] Synthesis returned empty, falling back to concatenation")
            return self._fallback_concatenation(summaries, chunk_metadata)
        logger.info(f"[Stage 2] Synthesis complete: {len(synthesized_content)} chars in {synthesis_time:.2f}s")
        # Log synthesis result preview if configured
        if self.log_summaries:
            logger.debug(f"[Stage 2] Synthesized result preview:\n{synthesized_content[:500]}...")
        return synthesized_content
    except Exception as e:
        logger.error(f"[Stage 2] Synthesis failed: {str(e)}, using fallback")
        return self._fallback_concatenation(summaries, chunk_metadata)
def _fallback_concatenation(self, summaries: List[str], chunk_metadata: List[Dict]) -> str:
    """
    Combine summaries without AI when synthesis is unavailable.

    Produces a Markdown document: a fixed title and note, followed by one
    "Segment N - <video name>" section per summary, in the given order.

    Args:
        summaries: Summary texts, in order
        chunk_metadata: One dict per summary (same order) with a
            'video_name' key

    Returns:
        The concatenated Markdown text
    """
    logger.info("Using fallback concatenation method")
    pieces = [
        "# Combined Analysis\n\n",
        "*Note: This is a combined analysis from multiple video segments.*\n\n",
    ]
    for position, text in enumerate(summaries):
        source_name = chunk_metadata[position]['video_name']
        pieces.append(f"## Segment {position + 1} - {source_name}\n\n")
        pieces.append(text.strip() + "\n\n")
    return "".join(pieces)
def _fallback_concatenation_single_video(self, responses: List[str], num_chunks: int,
                                         video_name: str = "") -> str:
    """
    Combine the chunk responses of one video without AI synthesis.

    Produces a Markdown document: a title (with the video name appended
    when one is given), a note stating the segment count, then one
    "Segment N of M" section per response.

    Args:
        responses: List of chunk responses, in order
        num_chunks: Number of chunks, as shown in the note and headings
        video_name: Video filename; omitted from the title when empty

    Returns:
        Concatenated response
    """
    logger.info("Using fallback concatenation for single video")
    title = "# Complete Video Analysis"
    if video_name:
        title = f"{title}: {video_name}"
    sections = [
        f"{title}\n\n",
        f"*Note: This video was analyzed in {num_chunks} segments.*\n\n",
    ]
    for number, text in enumerate(responses, start=1):
        sections.append(f"## Segment {number} of {num_chunks}\n\n")
        sections.append(text.strip() + "\n\n")
    return "".join(sections)