1313 lines
58 KiB
Python
1313 lines
58 KiB
Python
from google import genai
|
|
import mimetypes
|
|
import time
|
|
import os
|
|
import logging
|
|
import requests
|
|
import json
|
|
import datetime
|
|
import base64
|
|
from typing import Dict, Any, Optional, List, Tuple
|
|
from dotenv import load_dotenv
|
|
from video_splitter import VideoSplitter
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
import threading
|
|
from system_utils import system_utils
|
|
from error_reporter import ErrorReporter, ErrorCategory
|
|
|
|
# Load environment variables from .env file so that the os.getenv() lookups
# performed later in this module (GOOGLE_API_KEY, MAX_PARALLEL_CHUNKS, model
# names, logging switches) can be satisfied from it.
load_dotenv()

# Named logger shared by everything in this module.
logger = logging.getLogger('video_query')
|
|
|
|
class VideoProcessor:
|
|
"""
|
|
Class to handle video uploads and processing with Gemini API.
|
|
"""
|
|
# Default prompts for different modes, keyed by mode name.
PROMPTS = {
    "meeting_summary": "Generate a detailed summary of the meeting in the attached video recording, including discussion points and action items with owners",
    "process_documentation": "Generate detailed process documentation suitable for reference or training based on the process illustrated in the attached video recording. Write the documentation so that a new user will be able to follow step by step and accomplish the task illustrated in the video",
    "documentation_with_charts": "Analyze this video to create comprehensive process documentation with workflow diagrams for a knowledge base article. Follow these requirements exactly:\n\n1. CONTENT REQUIREMENTS:\n - Provide a detailed step-by-step explanation of the process shown\n - Be extremely verbose and thorough - include all relevant details, context, and nuances\n - Structure as a complete knowledge base article with clear sections\n - Include overview, detailed steps, tips, and troubleshooting where applicable\n\n2. MERMAID DIAGRAM REQUIREMENTS:\n - Create workflow diagrams using valid Mermaid syntax where helpful\n - CRITICAL: Use only simple alphanumeric text in node descriptions and labels\n - CRITICAL: No special characters like quotes brackets colons semicolons or symbols in node text\n - CRITICAL: Use underscores instead of spaces in node IDs and labels\n - CRITICAL: Keep all text simple to avoid syntax errors\n - Example format: Start_Process --> Complete_Task --> End_Process\n - Use flowchart format: graph TD or graph LR\n\n3. OUTPUT STRUCTURE:\n - Title and overview section\n - Prerequisites section if applicable\n - Detailed step-by-step process\n - Mermaid workflow diagram(s) showing the process flow\n - Tips and best practices\n - Troubleshooting common issues\n\nEnsure all Mermaid diagrams use simple text without special characters to prevent parsing errors.",
    "custom": "" # Custom prompt will be provided by the user
}

# Maximum video duration in minutes (Gemini limitation)
# NOTE(review): not referenced by the methods visible in this part of the file;
# presumably consumed by the splitting logic - confirm.
MAX_VIDEO_DURATION = 55

# Threshold for chunked upload (10MB), expressed in bytes
# NOTE(review): process_video sends every file inline as base64, so this value
# looks unused in the visible code - confirm before relying on it.
CHUNKED_UPLOAD_THRESHOLD = 10 * 1024 * 1024

# Webhook URL for tracking usage (target of send_usage_webhook)
# NOTE(review): endpoint is hard-coded in source; consider moving it to configuration.
WEBHOOK_URL = "https://hook.us1.make.celonis.com/8ri1h8b2he4wudp2jku69mgcxumzxf3v"

# Parallel processing configuration
# Default max workers for parallel chunk processing; used by __init__ when neither
# the constructor argument nor the MAX_PARALLEL_CHUNKS environment variable is set.
DEFAULT_MAX_WORKERS = 4 # Default concurrent workers

# Model configuration (each can be overridden through an environment variable in __init__)
DEFAULT_PROCESSING_MODEL = "gemini-2.5-pro" # Model for individual video processing
DEFAULT_SYNTHESIS_MODEL = "gemini-2.5-pro" # Model for batch synthesis

# Retry configuration
# MAX_RETRY_ATTEMPTS bounds the loop in _make_api_request_with_retry.
MAX_RETRY_ATTEMPTS = 5 # Maximum retry attempts
# Indexed by the 0-based attempt number; attempts beyond the end of the list reuse
# the last entry, so the delay doubles up to 40s and is then capped at 60s.
RETRY_DELAYS = [5, 10, 20, 40, 60] # Exponential backoff delays in seconds
|
|
|
|
def __init__(self, api_key: Optional[str] = None, max_parallel_chunks: Optional[int] = None):
    """
    Initialize with API key from environment variable or direct setting

    Args:
        api_key: Google API key for Gemini. Falls back to the GOOGLE_API_KEY
            environment variable when omitted.
        max_parallel_chunks: Maximum number of chunks to process in parallel.
            When omitted (or not a positive number) the MAX_PARALLEL_CHUNKS
            environment variable is used, and DEFAULT_MAX_WORKERS after that.

    Raises:
        ValueError: If no API key is passed and GOOGLE_API_KEY is not set.
    """
    self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
    if not self.api_key:
        logger.error("API key not provided")
        raise ValueError("API key not provided - set GOOGLE_API_KEY environment variable or pass when initializing")

    # Initialize the Gemini client
    logger.info("Initializing Gemini API client")
    self.client = genai.Client(api_key=self.api_key)
    logger.info("Gemini API client initialized successfully")

    # Set parallel processing configuration.
    # The worker count is later handed to ThreadPoolExecutor, which rejects values
    # <= 0, so anything that is not a positive integer is discarded here instead
    # of failing in the middle of a processing run.
    workers = None
    if max_parallel_chunks:
        if max_parallel_chunks > 0:
            workers = max_parallel_chunks
        else:
            logger.warning(f"Ignoring invalid max_parallel_chunks value: {max_parallel_chunks!r}")

    if workers is None:
        # Load from environment variable or use default
        env_max_workers = os.getenv("MAX_PARALLEL_CHUNKS")
        if env_max_workers:
            try:
                parsed_workers = int(env_max_workers)
            except ValueError:
                parsed_workers = 0
            if parsed_workers > 0:
                workers = parsed_workers
            else:
                logger.warning(
                    f"Ignoring invalid MAX_PARALLEL_CHUNKS value {env_max_workers!r}; "
                    f"using default of {self.DEFAULT_MAX_WORKERS}"
                )

    self.max_parallel_chunks = workers if workers is not None else self.DEFAULT_MAX_WORKERS
    logger.info(f"Parallel processing: max {self.max_parallel_chunks} concurrent chunks")

    # Initialize video splitter
    self.video_splitter = VideoSplitter()

    # Load configuration from environment variables
    self.processing_model = os.getenv("VIDEO_PROCESSOR_MODEL", self.DEFAULT_PROCESSING_MODEL)
    self.synthesis_model = os.getenv("VIDEO_SYNTHESIS_MODEL", self.DEFAULT_SYNTHESIS_MODEL)
    # Both switches are opt-in: only the literal string "true" (any casing) enables them
    self.log_prompts = os.getenv("BATCH_PROCESSING_LOG_PROMPTS", "false").lower() == "true"
    self.log_summaries = os.getenv("BATCH_PROCESSING_LOG_SUMMARIES", "false").lower() == "true"

    logger.info(f"Configuration: processing_model={self.processing_model}, synthesis_model={self.synthesis_model}")
    logger.info(f"Logging: prompts={self.log_prompts}, summaries={self.log_summaries}")
|
|
|
|
def send_usage_webhook(self, user_email: str, prompt: str) -> None:
    """
    Report one processing run to the usage-tracking webhook (best effort).

    Any failure is logged and swallowed so that tracking problems never
    interrupt video processing.

    Args:
        user_email: Email of the user who processed the video
        prompt: The prompt used for processing
    """
    try:
        payload = dict(
            tool="VIDEOQUERY",
            date=datetime.datetime.now().isoformat(),
            user=user_email,
            model="GEMINI",
            settings="no settings",
            subTool="no subTool",
            prompt=prompt,
            negativePrompt="no NEGATIVE_PROMPT",
            image="no image",
        )

        logger.info(f"Sending usage data to webhook for user: {user_email}")

        reply = requests.post(
            self.WEBHOOK_URL,
            data=json.dumps(payload),
            headers={"Content-Type": "application/json"},
            timeout=10,  # 10 second timeout
        )

        if reply.status_code != 200:
            logger.warning(f"Webhook request failed with status code: {reply.status_code}")
            logger.warning(f"Response: {reply.text}")
        else:
            logger.info("Successfully sent usage data to webhook")

    except Exception as e:
        # Deliberately not re-raised - webhook failure shouldn't block the main flow
        logger.error(f"Error sending usage data to webhook: {str(e)}")
|
|
|
|
def _extract_error_code(self, error_message: str) -> str:
|
|
"""
|
|
Extract HTTP error code from error message.
|
|
|
|
Args:
|
|
error_message: Error message string
|
|
|
|
Returns:
|
|
Error code (e.g., "503", "429") or "UNKNOWN"
|
|
"""
|
|
import re
|
|
match = re.search(r'(\d{3})\s+(UNAVAILABLE|TOO_MANY_REQUESTS|RESOURCE_EXHAUSTED|INVALID_ARGUMENT|INTERNAL)',
|
|
error_message, re.IGNORECASE)
|
|
if match:
|
|
return match.group(1)
|
|
return "UNKNOWN"
|
|
|
|
def _is_retryable_error(self, error_str: str, error_code: str, attempt: int) -> Tuple[bool, int]:
|
|
"""
|
|
Determine if error is retryable and calculate retry delay.
|
|
|
|
Args:
|
|
error_str: Error message (lowercase)
|
|
error_code: Extracted error code
|
|
attempt: Current attempt number (0-indexed)
|
|
|
|
Returns:
|
|
Tuple of (is_retryable: bool, retry_delay_seconds: int)
|
|
"""
|
|
# 503 UNAVAILABLE - Model overloaded (RETRYABLE with longer delays)
|
|
if '503' in error_code or 'unavailable' in error_str:
|
|
delay = self.RETRY_DELAYS[min(attempt, len(self.RETRY_DELAYS) - 1)]
|
|
logger.warning(f"503 UNAVAILABLE detected - API overloaded, will retry in {delay}s")
|
|
return (True, delay)
|
|
|
|
# 429 TOO_MANY_REQUESTS - Rate limit (RETRYABLE with longer delays)
|
|
if '429' in error_code or 'too many requests' in error_str or 'rate limit' in error_str:
|
|
delay = self.RETRY_DELAYS[min(attempt, len(self.RETRY_DELAYS) - 1)]
|
|
logger.warning(f"429 RATE LIMIT detected, will retry in {delay}s")
|
|
return (True, delay)
|
|
|
|
# 500 INTERNAL_SERVER_ERROR (RETRYABLE)
|
|
if '500' in error_code or 'internal server error' in error_str:
|
|
delay = self.RETRY_DELAYS[min(attempt, len(self.RETRY_DELAYS) - 1)]
|
|
logger.warning(f"500 INTERNAL ERROR detected, will retry in {delay}s")
|
|
return (True, delay)
|
|
|
|
# RESOURCE_EXHAUSTED (RETRYABLE)
|
|
if 'resource_exhausted' in error_str or 'quota' in error_str:
|
|
delay = self.RETRY_DELAYS[min(attempt, len(self.RETRY_DELAYS) - 1)]
|
|
logger.warning(f"Resource exhausted - quota or rate limit, will retry in {delay}s")
|
|
return (True, delay)
|
|
|
|
# Network errors (RETRYABLE with shorter delays)
|
|
if any(err in error_str for err in ['name resolution', 'connection', 'timeout', 'network']):
|
|
delay = 5 # Fixed 5s delay for network issues
|
|
logger.warning(f"Network error detected, will retry in {delay}s")
|
|
return (True, delay)
|
|
|
|
# 400 INVALID_ARGUMENT - Usually not retryable
|
|
if '400' in error_code or 'invalid_argument' in error_str:
|
|
logger.error(f"400 INVALID_ARGUMENT - not retryable")
|
|
return (False, 0)
|
|
|
|
# Default: not retryable
|
|
logger.error(f"Error not recognized as retryable: {error_str[:100]}")
|
|
return (False, 0)
|
|
|
|
def _make_api_request_with_retry(self, model: str, contents: list, context: str = "") -> any:
|
|
"""
|
|
Make API request with intelligent retry logic.
|
|
Handles 503 (overload), 429 (rate limit), 500 (server error), and network errors.
|
|
|
|
Args:
|
|
model: Model name to use
|
|
contents: Content to send to the API
|
|
context: Context description for logging (e.g., "[Video: example.mp4]")
|
|
|
|
Returns:
|
|
API response object
|
|
|
|
Raises:
|
|
Exception: If all retry attempts fail
|
|
"""
|
|
last_exception = None
|
|
|
|
for attempt in range(self.MAX_RETRY_ATTEMPTS):
|
|
try:
|
|
# Make the API call
|
|
if attempt == 0:
|
|
logger.info(f"{context} Sending request to Gemini API")
|
|
else:
|
|
logger.info(f"{context} Retry attempt {attempt + 1}/{self.MAX_RETRY_ATTEMPTS}")
|
|
|
|
response = self.client.models.generate_content(
|
|
model=model,
|
|
contents=contents
|
|
)
|
|
|
|
# Success!
|
|
if attempt > 0:
|
|
logger.info(f"{context} ✓ Request succeeded after {attempt + 1} attempts")
|
|
else:
|
|
logger.info(f"{context} ✓ Request succeeded on first attempt")
|
|
|
|
return response
|
|
|
|
except Exception as e:
|
|
last_exception = e
|
|
error_str = str(e).lower()
|
|
error_code = self._extract_error_code(str(e))
|
|
|
|
# Log detailed error information for INVALID_ARGUMENT (helps debug)
|
|
if 'invalid_argument' in error_str or '400' in error_str:
|
|
logger.error("=" * 80)
|
|
logger.error(f"{context} INVALID_ARGUMENT ERROR:")
|
|
logger.error(f" Error: {str(e)[:200]}")
|
|
logger.error(f" Model: {model}")
|
|
logger.error(f" Attempt: {attempt + 1}/{self.MAX_RETRY_ATTEMPTS}")
|
|
logger.error("=" * 80)
|
|
|
|
# Determine if retryable
|
|
is_retryable, retry_delay = self._is_retryable_error(error_str, error_code, attempt)
|
|
|
|
if not is_retryable:
|
|
logger.error(f"{context} Non-retryable error: {error_code} - {str(e)[:100]}")
|
|
raise
|
|
|
|
if attempt < self.MAX_RETRY_ATTEMPTS - 1:
|
|
logger.warning(
|
|
f"{context} Retryable error (attempt {attempt + 1}/{self.MAX_RETRY_ATTEMPTS}): "
|
|
f"{error_code} - {str(e)[:150]}"
|
|
)
|
|
logger.info(f"{context} Waiting {retry_delay}s before retry...")
|
|
time.sleep(retry_delay)
|
|
continue
|
|
else:
|
|
logger.error(
|
|
f"{context} ✗ All {self.MAX_RETRY_ATTEMPTS} attempts failed. "
|
|
f"Last error: {error_code} - {str(e)[:150]}"
|
|
)
|
|
raise
|
|
|
|
# If we get here, all retries failed
|
|
raise last_exception
|
|
|
|
def process_video(self, video_path: str, prompt: str, user_email: str = "anonymous") -> Dict[str, Any]:
    """
    Process a video with the given prompt using Gemini API

    The file is validated (existence, duration, MIME type, ffprobe readability,
    encoded size), sent inline as base64 together with the prompt, and the text
    of the response parts is returned. Errors are reported through the returned
    dictionary rather than raised.

    Args:
        video_path: Path to the video file
        prompt: Text prompt to use for video analysis
        user_email: Email of the user processing the video (for usage tracking)

    Returns:
        Dictionary with processing result or error. Always contains "success",
        "message", "content" and "processing_time_seconds"; when an unexpected
        exception is captured it also contains "error_details", "error_id" and
        "error_category".
    """
    # Function-scope imports, kept outside every try block so the names are
    # always bound by the time the except clauses below refer to them.
    import subprocess
    import traceback

    start_time = time.time()

    result = {
        "success": False,
        "message": "",
        "content": "",
        "processing_time_seconds": 0
    }

    logger.info(f"Processing video: {video_path}")
    logger.info(f"Prompt: {prompt[:100]}..." if len(prompt) > 100 else f"Prompt: {prompt}")

    if not os.path.exists(video_path):
        error_msg = f"Video file not found at '{video_path}'"
        logger.error(error_msg)
        result["message"] = error_msg
        return result

    try:
        # Get file size
        file_size = os.path.getsize(video_path)
        file_size_mb = file_size / (1024 * 1024)
        logger.info(f"File size: {file_size_mb:.2f} MB")

        # Get video duration to detect too-short videos
        try:
            video_duration = self.video_splitter.get_video_duration(video_path)
            if video_duration:
                logger.info(f"Video duration: {video_duration:.2f} seconds ({video_duration/60:.2f} minutes)")

                # Check if video is too short (< 1 second can cause INVALID_ARGUMENT)
                if video_duration < 1.0:
                    error_msg = f"Video too short: {video_duration:.2f}s. Gemini requires videos >= 1 second"
                    logger.error(error_msg)
                    result["message"] = error_msg
                    return result
            else:
                logger.warning("Could not determine video duration")
        except Exception as dur_err:
            # The duration check is advisory: failing to measure must not block processing
            logger.warning(f"Error checking video duration: {str(dur_err)}")

        # Determine MIME type for the video
        mime_type, _ = mimetypes.guess_type(video_path)
        if not mime_type:
            logger.info("Could not determine MIME type, using default: video/mp4")
            mime_type = "video/mp4"  # Fallback
        else:
            logger.info(f"MIME type: {mime_type}")

        # Validate MIME type is video
        if not mime_type.startswith('video/'):
            error_msg = f"Invalid file type: {mime_type}. Only video files are supported (mp4, avi, mov, mkv, webm, etc.)"
            logger.error(error_msg)
            result["message"] = error_msg
            return result

        # Validate video file is readable and not corrupted
        # This provides a secondary check in case validation at upload didn't happen
        try:
            ffprobe_path = system_utils.find_ffprobe()
            probe_result = subprocess.run(
                [ffprobe_path, '-v', 'error', '-show_entries', 'format=duration,format_name',
                 '-of', 'default=noprint_wrappers=1', video_path],
                capture_output=True, text=True, timeout=10
            )
            if probe_result.returncode != 0:
                error_detail = probe_result.stderr.strip()
                error_detail_lower = error_detail.lower()

                # Provide user-friendly error messages for common issues
                if "moov atom not found" in error_detail_lower:
                    error_msg = "Video file is incomplete or corrupted (missing header). The file may not have uploaded completely. Please try uploading again."
                elif "invalid data found" in error_detail_lower:
                    error_msg = "Video file contains invalid data and cannot be processed. Please check the file or try re-encoding it."
                else:
                    error_msg = f"Video file appears to be corrupted or unreadable. Technical details: {error_detail}"

                logger.error(f"Video validation failed: {error_msg}")
                result["message"] = error_msg
                return result
            logger.info(f"Video file validation successful. Format info: {probe_result.stdout[:100]}")
        except subprocess.TimeoutExpired:
            logger.warning("Video validation timed out after 10s - proceeding anyway")
        except FileNotFoundError:
            logger.warning("ffprobe not found - skipping video file validation")
        except Exception as val_err:
            logger.warning(f"Could not validate video file: {str(val_err)} - proceeding anyway")

        # SIMPLIFIED APPROACH: Use inline base64 for all files
        # File Upload API has known issues (KeyError: 'file' in SDK 1.45.0-1.49.0)
        # Gemini API REQUEST limit: 1GB (1073741824 bytes) - confirmed by testing
        # Plain base64 grows data by 4/3 (~33%); the 1.37 factor below is a
        # deliberately conservative estimate used only for this early rejection.
        # The exact encoded size is checked again after encoding.
        BASE64_OVERHEAD = 1.37
        estimated_encoded_mb = file_size_mb * BASE64_OVERHEAD
        API_LIMIT_MB = 1000  # 1GB API request limit

        # Check if encoded size would exceed API limit
        if estimated_encoded_mb > API_LIMIT_MB:
            error_msg = (
                f"Video chunk is too large: {file_size_mb:.2f}MB raw, "
                f"~{estimated_encoded_mb:.1f}MB after base64 encoding. "
                f"This exceeds the {API_LIMIT_MB}MB (1GB) API limit. "
                f"The video needs to be split into smaller chunks. "
                f"Target chunk size: 500MB (accounting for variable bitrate)."
            )
            logger.error(error_msg)
            result["message"] = error_msg
            return result

        # Warn if approaching API limit (>900MB encoded)
        if estimated_encoded_mb > 900:
            logger.warning(
                f"Chunk is large: {file_size_mb:.1f}MB raw, "
                f"~{estimated_encoded_mb:.1f}MB encoded. "
                f"Approaching {API_LIMIT_MB}MB API limit. Processing may be slower."
            )

        # Use base64 encoding for all files (reliable and works consistently)
        logger.info(f"Encoding video as base64 (file size: {file_size_mb:.2f}MB)")
        encode_start = time.time()

        with open(video_path, "rb") as video_file_obj:
            # Encode straight from the read so the raw bytes are not kept alive
            # next to their (larger) encoded copy for the rest of the call.
            video_base64 = base64.b64encode(video_file_obj.read()).decode('utf-8')

        encode_time = time.time() - encode_start
        encoded_size_mb = len(video_base64) / (1024 * 1024)
        logger.info(f"Base64 encoding complete in {encode_time:.2f}s. Encoded size: {encoded_size_mb:.2f}MB ({len(video_base64)} chars)")

        # Verify actual encoded size is within API limits (1GB)
        if encoded_size_mb > API_LIMIT_MB:
            error_msg = (
                f"Encoded video size ({encoded_size_mb:.2f}MB) exceeds Gemini API limit of {API_LIMIT_MB}MB (1GB). "
                f"Original file: {file_size_mb:.2f}MB. "
                f"This chunk needs further splitting. Target: 500MB raw chunks. "
                f"Variable bitrate caused larger-than-expected chunk size."
            )
            logger.error(error_msg)
            result["message"] = error_msg
            return result

        # Create the content parts using inline data
        prompt_parts = [
            {"text": prompt},
            {"inline_data": {
                "mime_type": mime_type,
                "data": video_base64
            }}
        ]

        # Use the new retry logic with rate limiting
        context = f"[Video: {os.path.basename(video_path)}]"
        api_start = time.time()

        response = self._make_api_request_with_retry(
            model=self.processing_model,
            contents=prompt_parts,
            context=context
        )

        api_time = time.time() - api_start
        logger.info(f"Received response from Gemini (API call took {api_time:.1f}s)")

        # Extract the response content
        text_pieces = []
        if response.parts:
            logger.info(f"Response has {len(response.parts)} parts")
            for i, part in enumerate(response.parts):
                # hasattr() alone is not enough: a part can expose a `text`
                # attribute whose value is None (non-text parts), which would
                # break both len() and string concatenation.
                part_text = getattr(part, 'text', None)
                if isinstance(part_text, str):
                    content_preview = part_text[:100] + '...' if len(part_text) > 100 else part_text
                    logger.info(f"Part {i} (text): {content_preview}")
                    text_pieces.append(part_text)
                else:
                    logger.info(f"Part {i} (no text): {type(part)}")
        else:
            logger.warning("No parts in response")
            if hasattr(response, 'prompt_feedback') and response.prompt_feedback:
                logger.warning(f"Prompt feedback: {response.prompt_feedback}")
        content = "".join(text_pieces)

        # Set success result
        result["success"] = True
        result["content"] = content
        result["processing_time_seconds"] = round(time.time() - start_time, 2)
        logger.info(f"Processed result with {len(content)} characters")
        logger.info(f"Total processing time: {result['processing_time_seconds']}s")

        # Send usage data to webhook for tracking
        self.send_usage_webhook(user_email, prompt)

        return result

    except Exception as e:
        # Top-level boundary: convert any unexpected failure into an error result
        error_report = ErrorReporter.capture_error(
            e,
            context={
                'video_path': video_path,
                'prompt_length': len(prompt),
                'operation': 'process_video'
            }
        )
        error_details = traceback.format_exc()
        logger.error(f"Error processing video: {str(e)}")
        logger.error(error_details)
        result["message"] = error_report.format_user_message()
        result["error_details"] = error_details
        result["error_id"] = error_report.error_id
        result["error_category"] = error_report.category.value
        return result
|
|
|
|
def combine_chunk_responses(self, responses: List[str], prompt: str,
                            num_chunks: int, video_name: str = "") -> str:
    """
    Combine responses from multiple chunks of a single video using AI synthesis.
    Uses universal synthesis approach - no type detection.

    If the synthesis request fails or yields no text, the segment responses are
    combined by _fallback_concatenation_single_video instead.

    Args:
        responses: List of response texts from each chunk
        prompt: Original user prompt
        num_chunks: Total number of chunks
        video_name: Name of the video file

    Returns:
        Combined response text
    """
    logger.info(f"Combining {len(responses)} chunk responses using AI synthesis")

    # Try AI synthesis first
    try:
        # Prepare chunk responses (a missing response is treated as an empty segment)
        summaries_text = "".join(
            f"\n--- Segment {i} ---\n{(segment or '').strip()}\n"
            for i, segment in enumerate(responses, 1)
        )

        # Universal synthesis prompt for single video
        synthesis_prompt = f"""You are creating a final unified response by combining multiple segment analyses from one video.

Context:
- One video was split into {num_chunks} segments for processing
- Each segment was analyzed separately
- Below are the responses from each segment

Original user request:
"{prompt}"

Segment responses:
{summaries_text}

Your task:
1. Combine these segment responses into ONE cohesive response that fulfills the user's request above
2. Do not reference segments, chunks, or parts in your output
3. Present as a unified analysis of the complete video

Provide your unified response:
"""

        logger.info("[Chunk Synthesis] Sending synthesis request to Gemini")
        synthesis_response = self._make_api_request_with_retry(
            model=self.synthesis_model,
            contents=[{"text": synthesis_prompt}],
            context="[Chunk Combination]"
        )

        if synthesis_response.parts:
            # Skip parts whose `text` is missing or None; concatenating None would
            # raise and throw away an otherwise usable synthesis.
            synthesized_content = "".join(
                part.text for part in synthesis_response.parts
                if isinstance(getattr(part, 'text', None), str)
            )

            if synthesized_content:
                logger.info("[Chunk Synthesis] Successfully synthesized chunk responses")
                return synthesized_content

        logger.warning("[Chunk Synthesis] Synthesis returned empty, falling back to concatenation")

    except Exception as e:
        # Synthesis is an enhancement; any failure degrades to plain concatenation
        logger.warning(f"[Chunk Synthesis] Synthesis failed: {e}, falling back to concatenation")

    # Fallback: simple concatenation
    return self._fallback_concatenation_single_video(responses, num_chunks, video_name)
|
|
|
|
def _process_single_chunk(self, chunk_info: Tuple[int, str, str, int, str]) -> Tuple[int, Dict[str, Any]]:
|
|
"""
|
|
Process a single video chunk. Used for parallel processing.
|
|
|
|
Args:
|
|
chunk_info: Tuple of (chunk_index, chunk_path, chunk_prompt, total_chunks, user_email)
|
|
|
|
Returns:
|
|
Tuple of (chunk_index, result_dict)
|
|
"""
|
|
chunk_index, chunk_path, chunk_prompt, total_chunks, user_email = chunk_info
|
|
|
|
logger.info(f"[Parallel] Processing chunk {chunk_index + 1}/{total_chunks}: {chunk_path}")
|
|
|
|
try:
|
|
chunk_result = self.process_video(chunk_path, chunk_prompt, user_email)
|
|
logger.info(f"[Parallel] Completed chunk {chunk_index + 1}/{total_chunks}")
|
|
return (chunk_index, chunk_result)
|
|
except Exception as e:
|
|
logger.error(f"[Parallel] Error processing chunk {chunk_index + 1}/{total_chunks}: {str(e)}")
|
|
return (chunk_index, {
|
|
"success": False,
|
|
"message": f"Error processing chunk {chunk_index + 1}: {str(e)}",
|
|
"content": ""
|
|
})
|
|
|
|
def _process_chunks_parallel(self, chunk_paths: List[str], prompt: str,
|
|
user_email: str) -> List[Dict[str, Any]]:
|
|
"""
|
|
Process multiple video chunks in parallel using ThreadPoolExecutor.
|
|
|
|
Args:
|
|
chunk_paths: List of paths to video chunk files
|
|
prompt: Original prompt for video analysis
|
|
user_email: User email for tracking
|
|
|
|
Returns:
|
|
List of result dictionaries in order of chunks
|
|
"""
|
|
num_chunks = len(chunk_paths)
|
|
logger.info(f"Starting parallel processing of {num_chunks} chunks with {self.max_parallel_chunks} workers")
|
|
|
|
# Prepare chunk information for parallel processing
|
|
chunk_infos = []
|
|
for i, chunk_path in enumerate(chunk_paths):
|
|
# Extract video name from chunk path (remove _chunk_XX suffix)
|
|
video_name = os.path.basename(chunk_path).rsplit('_chunk_', 1)[0] if '_chunk_' in chunk_path else os.path.basename(chunk_path)
|
|
chunk_prompt = self._create_chunk_prompt(prompt, i + 1, num_chunks, video_name)
|
|
chunk_infos.append((i, chunk_path, chunk_prompt, num_chunks, user_email))
|
|
|
|
# Process chunks in parallel
|
|
results = [None] * num_chunks # Pre-allocate results list to maintain order
|
|
|
|
with ThreadPoolExecutor(max_workers=self.max_parallel_chunks) as executor:
|
|
# Submit all chunks for processing
|
|
future_to_chunk = {
|
|
executor.submit(self._process_single_chunk, chunk_info): chunk_info[0]
|
|
for chunk_info in chunk_infos
|
|
}
|
|
|
|
# Collect results as they complete
|
|
completed = 0
|
|
for future in as_completed(future_to_chunk):
|
|
chunk_index = future_to_chunk[future]
|
|
try:
|
|
chunk_index, result = future.result()
|
|
results[chunk_index] = result
|
|
completed += 1
|
|
logger.info(f"[Parallel] Progress: {completed}/{num_chunks} chunks completed")
|
|
except Exception as e:
|
|
logger.error(f"[Parallel] Unexpected error for chunk {chunk_index + 1}: {str(e)}")
|
|
results[chunk_index] = {
|
|
"success": False,
|
|
"message": f"Unexpected error: {str(e)}",
|
|
"content": ""
|
|
}
|
|
|
|
logger.info(f"[Parallel] All {num_chunks} chunks processed")
|
|
return results
|
|
|
|
def process_long_video(self, video_path: str, prompt: str,
                       user_email: str = "anonymous", use_parallel: bool = True) -> Dict[str, Any]:
    """
    Process a long video by splitting it into chunks and combining the results.
    Supports both parallel and sequential processing.

    Videos that need only one chunk are passed straight to process_video. Chunk
    files produced by the splitter are always cleaned up before returning.

    Args:
        video_path: Path to the video file
        prompt: Text prompt to use for video analysis
        user_email: Email of the user processing the video (for usage tracking)
        use_parallel: If True, process chunks in parallel; if False, process sequentially

    Returns:
        Dictionary with processing result or error. On a chunk failure,
        "message" names the first failed chunk and "chunks_processed" counts
        the chunks before it.
    """
    import traceback

    start_time = time.time()

    result = {
        "success": False,
        "message": "",
        "content": "",
        "chunks_processed": 0,
        "processing_mode": "parallel" if use_parallel else "sequential",
        "processing_time_seconds": 0
    }

    chunk_paths = []

    try:
        # Check if video needs splitting
        num_chunks, duration_minutes = self.video_splitter.get_chunk_info(video_path)

        if num_chunks <= 1:
            logger.info("Video does not need splitting, processing normally")
            return self.process_video(video_path, prompt, user_email)

        logger.info(f"Long video detected: {duration_minutes:.2f} minutes, will be split into {num_chunks} chunks")
        logger.info(f"Processing mode: {'PARALLEL' if use_parallel else 'SEQUENTIAL'}")

        # Split the video
        logger.info("Starting video splitting...")
        chunk_paths = self.video_splitter.split_video(video_path)
        if not chunk_paths:
            # Nothing to analyse: report it instead of "succeeding" with no content
            error_msg = "Video splitting produced no chunks"
            logger.error(error_msg)
            result["message"] = error_msg
            result["processing_time_seconds"] = round(time.time() - start_time, 2)
            return result
        logger.info(f"Video split into {len(chunk_paths)} chunks successfully")

        # Process chunks (parallel or sequential)
        if use_parallel:
            # Parallel processing using ThreadPoolExecutor
            chunk_results = self._process_chunks_parallel(chunk_paths, prompt, user_email)
        else:
            # Sequential processing (original logic)
            chunk_results = []
            for i, chunk_path in enumerate(chunk_paths, 1):
                logger.info(f"[Sequential] Processing chunk {i}/{len(chunk_paths)}: {chunk_path}")

                # Extract video name from chunk path (remove _chunk_XX suffix)
                base_name = os.path.basename(chunk_path)
                video_name = base_name.rsplit('_chunk_', 1)[0] if '_chunk_' in chunk_path else base_name

                # Modify prompt to indicate this is part of a multi-part video
                chunk_prompt = self._create_chunk_prompt(prompt, i, len(chunk_paths), video_name)

                # Process this chunk
                chunk_result = self.process_video(chunk_path, chunk_prompt, user_email)
                chunk_results.append(chunk_result)

                if not chunk_result["success"]:
                    # The whole run is reported as failed at the first failing chunk
                    # (see the scan below), so sending the remaining chunks to the
                    # API would only spend time and quota.
                    break

                logger.info(f"[Sequential] Completed chunk {i}/{len(chunk_paths)}")

        # Check for failures in any chunk
        chunk_responses = []
        for i, chunk_result in enumerate(chunk_results, 1):
            if not chunk_result["success"]:
                error_msg = f"Failed to process chunk {i}/{len(chunk_paths)}: {chunk_result.get('message', 'Unknown error')}"
                logger.error(error_msg)
                result["message"] = error_msg
                result["chunks_processed"] = i - 1
                result["processing_time_seconds"] = round(time.time() - start_time, 2)
                return result

            chunk_responses.append(chunk_result["content"])

        # Combine all responses
        logger.info("Combining responses from all chunks...")
        combined_content = self.combine_chunk_responses(
            chunk_responses,
            prompt,
            len(chunk_paths),
            video_name=os.path.basename(video_path)
        )

        result["success"] = True
        result["content"] = combined_content
        result["chunks_processed"] = len(chunk_paths)
        result["processing_time_seconds"] = round(time.time() - start_time, 2)
        result["message"] = f"Successfully processed video in {len(chunk_paths)} chunks"

        logger.info(f"Long video processing completed successfully: {len(chunk_paths)} chunks")
        logger.info(f"Total processing time: {result['processing_time_seconds']}s")

        return result

    except Exception as e:
        # Top-level boundary: convert any unexpected failure into an error result
        error_report = ErrorReporter.capture_error(
            e,
            context={
                'video_path': video_path,
                'prompt_length': len(prompt),
                'operation': 'process_long_video',
                'chunks_processed': result.get('chunks_processed', 0)
            }
        )
        error_details = traceback.format_exc()
        logger.error(f"Error processing long video: {str(e)}")
        logger.error(error_details)
        result["message"] = error_report.format_user_message()
        result["error_details"] = error_details
        result["error_id"] = error_report.error_id
        result["error_category"] = error_report.category.value
        return result

    finally:
        # Always clean up chunk files
        if chunk_paths:
            logger.info("Cleaning up temporary chunk files...")
            self.video_splitter.cleanup_chunks(chunk_paths)
|
|
|
|
def _create_chunk_prompt(self, original_prompt: str, chunk_num: int,
|
|
total_chunks: int, video_name: str = "") -> str:
|
|
"""
|
|
Create a prompt for a video chunk that provides minimal context about its position.
|
|
|
|
Args:
|
|
original_prompt: The original user prompt
|
|
chunk_num: Current chunk number (1-indexed)
|
|
total_chunks: Total number of chunks
|
|
video_name: Name of the video file (for context)
|
|
|
|
Returns:
|
|
Prompt with minimal system context, keeping user's prompt as primary instruction
|
|
"""
|
|
context = f"""This is segment {chunk_num} of {total_chunks} from video "{video_name}".
|
|
Your response will be combined with responses from other segments to create the final result.
|
|
|
|
{original_prompt}"""
|
|
return context
|
|
|
|
def process_video_auto(self, video_path: str, prompt: str,
                       user_email: str = "anonymous") -> Dict[str, Any]:
    """
    Process a video, choosing the single-pass or the split pipeline.

    The video splitter decides whether the file needs splitting; the call is
    then delegated to process_long_video or process_video accordingly.

    Args:
        video_path: Path to the video file
        prompt: Text prompt to use for video analysis
        user_email: Email of the user processing the video (for usage tracking)

    Returns:
        Dictionary with processing result or error, as produced by the
        method the call was delegated to
    """
    logger.info(f"Auto-processing video: {video_path}")

    # Common case first: the video fits in one chunk and goes straight through.
    if not self.video_splitter.needs_splitting(video_path):
        logger.info("Video is within single-chunk limit, using standard processing")
        return self.process_video(video_path, prompt, user_email)

    logger.info("Video requires splitting, using long video processing")
    return self.process_long_video(video_path, prompt, user_email)
|
|
|
|
def process_video_batch(self, video_paths: List[str], filenames: List[str],
                        prompt: str, user_email: str, batch_id: str) -> Dict[str, Any]:
    """
    Process multiple videos as a continuous batch.
    Videos are treated as sequential segments of one long video.

    Each video's duration is probed first. The batch goes through chunked
    processing when the combined duration, or any single video, exceeds the
    splitter's chunk duration; otherwise every video is processed whole.

    Args:
        video_paths: List of paths to video files (in order)
        filenames: List of original filenames, parallel to video_paths
        prompt: Text prompt to use for analysis
        user_email: Email of the user
        batch_id: Unique identifier for this batch

    Returns:
        Dictionary with unified result

    Raises:
        ValueError: If the batch is empty or the two lists differ in length.
        Exception: Errors from duration probing or from the processing stage
            are logged and re-raised unchanged.
    """
    logger.info(f"Batch {batch_id}: Processing {len(video_paths)} videos")

    try:
        # zip() below stops at the shorter list, which would silently drop
        # videos from the batch; reject mismatched input instead.
        if len(video_paths) != len(filenames):
            raise ValueError(
                f"video_paths and filenames must have the same length "
                f"(got {len(video_paths)} and {len(filenames)})"
            )
        # An empty batch has nothing to analyse and would only fail later
        # with an unrelated error deep inside synthesis.
        if not video_paths:
            raise ValueError("Cannot process an empty batch: no videos were provided")

        # Get durations and calculate total
        video_infos = []
        total_duration_seconds = 0

        for video_path, filename in zip(video_paths, filenames):
            try:
                duration = self.video_splitter.get_video_duration(video_path)
                video_infos.append({
                    'path': video_path,
                    'filename': filename,
                    'duration': duration,
                })
                total_duration_seconds += duration
                logger.info(f"Batch {batch_id}: {filename} - {duration/60:.1f} minutes")
            except Exception as e:
                logger.error(f"Batch {batch_id}: Failed to get duration for {filename}: {e}")
                raise

        chunk_limit_seconds = self.video_splitter.chunk_duration_seconds
        total_duration_minutes = total_duration_seconds / 60
        chunk_duration_minutes = chunk_limit_seconds / 60

        logger.info(f"Batch {batch_id}: Total duration: {total_duration_minutes:.1f} minutes")

        # Decide processing strategy
        needs_chunking = (
            total_duration_minutes > chunk_duration_minutes or
            any(v['duration'] > chunk_limit_seconds for v in video_infos)
        )

        if needs_chunking:
            logger.info(f"Batch {batch_id}: Using chunked processing")
            return self._process_batch_with_chunking(
                video_infos, prompt, user_email, batch_id
            )

        logger.info(f"Batch {batch_id}: Processing directly (no chunking needed)")
        return self._process_batch_direct(
            video_infos, prompt, user_email, batch_id
        )

    except Exception as e:
        logger.error(f"Batch {batch_id}: Processing error: {str(e)}")
        raise
|
|
|
|
def _process_batch_direct(self, video_infos: List[Dict], prompt: str,
|
|
user_email: str, batch_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Process batch directly without chunking (total duration < 54 minutes).
|
|
Uses two-stage synthesis even for short batches.
|
|
"""
|
|
logger.info(f"Batch {batch_id}: [Stage 1] Direct processing of {len(video_infos)} videos")
|
|
stage1_start = time.time()
|
|
|
|
# Process each video separately first (stage 1: summaries)
|
|
summaries = []
|
|
for i, video_info in enumerate(video_infos, 1):
|
|
logger.info(f"Batch {batch_id}: [Stage 1] Processing video {i}/{len(video_infos)}: {video_info['filename']}")
|
|
|
|
summary_prompt = self._create_chunk_summary_prompt(
|
|
original_prompt=prompt,
|
|
chunk_number=i,
|
|
total_chunks=len(video_infos),
|
|
video_name=video_info['filename']
|
|
)
|
|
|
|
# Log prompt if configured
|
|
if self.log_prompts:
|
|
logger.debug(f"Batch {batch_id}: [Stage 1] Prompt for video {i}:\n{summary_prompt[:300]}...")
|
|
|
|
try:
|
|
video_start = time.time()
|
|
result = self.process_video(video_info['path'], summary_prompt, user_email)
|
|
video_time = time.time() - video_start
|
|
summary = result.get('content', '')
|
|
summaries.append(summary)
|
|
|
|
logger.info(f"Batch {batch_id}: [Stage 1] Video {i} complete: {len(summary)} chars in {video_time:.2f}s")
|
|
|
|
# Log summary preview if configured
|
|
if self.log_summaries:
|
|
logger.debug(f"Batch {batch_id}: [Stage 1] Video {i} summary preview:\n{summary[:300]}...")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Batch {batch_id}: [Stage 1] Failed to process {video_info['filename']}: {e}")
|
|
summaries.append(f"[Error processing {video_info['filename']}: {str(e)}]")
|
|
|
|
stage1_time = time.time() - stage1_start
|
|
logger.info(f"Batch {batch_id}: [Stage 1] Complete - {len(summaries)} summaries in {stage1_time:.2f}s")
|
|
|
|
# Log traceability
|
|
logger.info(f"Batch {batch_id}: [Traceability] Video-to-summary mapping:")
|
|
for i, video_info in enumerate(video_infos, 1):
|
|
logger.info(f"Batch {batch_id}: - Video {i}: {video_info['filename']} → Summary {i}")
|
|
|
|
# Stage 2: Synthesize all summaries
|
|
stage2_start = time.time()
|
|
logger.info(f"Batch {batch_id}: [Stage 2] Synthesizing {len(summaries)} summaries")
|
|
chunk_metadata = [{'video_name': v['filename'], 'video_idx': i}
|
|
for i, v in enumerate(video_infos)]
|
|
|
|
final_content = self._synthesize_final_result(
|
|
summaries=summaries,
|
|
chunk_metadata=chunk_metadata,
|
|
original_prompt=prompt,
|
|
user_email=user_email
|
|
)
|
|
|
|
stage2_time = time.time() - stage2_start
|
|
total_time = stage1_time + stage2_time
|
|
|
|
# Log performance metrics
|
|
logger.info(f"Batch {batch_id}: [Metrics] Stage 1: {stage1_time:.2f}s, Stage 2: {stage2_time:.2f}s, Total: {total_time:.2f}s")
|
|
logger.info(f"Batch {batch_id}: [Metrics] Avg time per video: {stage1_time/len(video_infos):.2f}s")
|
|
|
|
return {
|
|
'content': final_content,
|
|
'total_chunks': len(video_infos),
|
|
'synthesis_stages': 2
|
|
}
|
|
def _process_batch_with_chunking(self, video_infos: List[Dict], prompt: str,
|
|
user_email: str, batch_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Split batch videos into 54-min chunks and process with two-stage synthesis.
|
|
Treats all videos as one continuous sequence.
|
|
"""
|
|
logger.info(f"Batch {batch_id}: Starting chunked processing")
|
|
|
|
all_chunks = []
|
|
chunk_metadata = []
|
|
|
|
# Step 1: Split all videos into chunks
|
|
for video_idx, video_info in enumerate(video_infos):
|
|
video_path = video_info['path']
|
|
filename = video_info['filename']
|
|
duration = video_info['duration']
|
|
|
|
if duration > self.video_splitter.chunk_duration_seconds:
|
|
# Split long video
|
|
logger.info(f"Batch {batch_id}: Splitting {filename} ({duration/60:.1f} min)")
|
|
try:
|
|
chunks = self.video_splitter.split_video(video_path)
|
|
for chunk_idx, chunk_path in enumerate(chunks):
|
|
all_chunks.append(chunk_path)
|
|
chunk_metadata.append({
|
|
'video_idx': video_idx,
|
|
'video_name': filename,
|
|
'chunk_idx': chunk_idx,
|
|
'is_split': True
|
|
})
|
|
except Exception as e:
|
|
logger.error(f"Batch {batch_id}: Failed to split {filename}: {e}")
|
|
raise
|
|
else:
|
|
# Use whole video as one chunk
|
|
all_chunks.append(video_path)
|
|
chunk_metadata.append({
|
|
'video_idx': video_idx,
|
|
'video_name': filename,
|
|
'chunk_idx': 0,
|
|
'is_split': False
|
|
})
|
|
|
|
logger.info(f"Batch {batch_id}: Split into {len(all_chunks)} total chunks")
|
|
|
|
# Step 2: Process chunks with two-stage synthesis
|
|
try:
|
|
result = self._process_chunks_two_stage(
|
|
all_chunks,
|
|
chunk_metadata,
|
|
prompt,
|
|
user_email,
|
|
batch_id
|
|
)
|
|
finally:
|
|
# Step 3: Cleanup split chunks (not original videos)
|
|
for chunk_path, metadata in zip(all_chunks, chunk_metadata):
|
|
if metadata['is_split']:
|
|
try:
|
|
if os.path.exists(chunk_path):
|
|
os.remove(chunk_path)
|
|
logger.info(f"Batch {batch_id}: Cleaned up chunk: {os.path.basename(chunk_path)}")
|
|
except Exception as e:
|
|
logger.warning(f"Batch {batch_id}: Cleanup failed for {chunk_path}: {e}")
|
|
|
|
return result
|
|
|
|
def _process_chunks_two_stage(self, chunk_paths: List[str], chunk_metadata: List[Dict],
|
|
prompt: str, user_email: str, batch_id: str) -> Dict[str, Any]:
|
|
"""
|
|
Two-stage synthesis:
|
|
Stage 1: Each chunk → concise summary
|
|
Stage 2: All summaries → final unified result
|
|
"""
|
|
logger.info(f"Batch {batch_id}: [Stage 1] Generating summaries for {len(chunk_paths)} chunks")
|
|
stage1_start = time.time()
|
|
|
|
chunk_summaries = []
|
|
|
|
if self.max_parallel_chunks > 1:
|
|
# Parallel processing
|
|
logger.info(f"Batch {batch_id}: [Stage 1] Using parallel processing with {self.max_parallel_chunks} workers")
|
|
with ThreadPoolExecutor(max_workers=self.max_parallel_chunks) as executor:
|
|
futures = []
|
|
|
|
for i, (chunk_path, metadata) in enumerate(zip(chunk_paths, chunk_metadata)):
|
|
summary_prompt = self._create_chunk_summary_prompt(
|
|
original_prompt=prompt,
|
|
chunk_number=i + 1,
|
|
total_chunks=len(chunk_paths),
|
|
video_name=metadata['video_name']
|
|
)
|
|
|
|
# Log prompt if configured
|
|
if self.log_prompts:
|
|
logger.debug(f"Batch {batch_id}: [Stage 1] Prompt for chunk {i+1} ({metadata['video_name']}):\n{summary_prompt[:300]}...")
|
|
|
|
future = executor.submit(
|
|
self._process_single_chunk,
|
|
(i, chunk_path, summary_prompt, len(chunk_paths), user_email)
|
|
)
|
|
futures.append(future)
|
|
|
|
# Collect results
|
|
completed_count = 0
|
|
for future in as_completed(futures):
|
|
try:
|
|
chunk_idx, result = future.result()
|
|
|
|
# Extract content from result dict
|
|
if result.get('success'):
|
|
summary = result.get('content', '')
|
|
else:
|
|
summary = f"[Error: {result.get('message', 'Unknown error')}]"
|
|
|
|
chunk_summaries.append((chunk_idx, summary))
|
|
completed_count += 1
|
|
|
|
logger.info(f"Batch {batch_id}: [Stage 1] Chunk {chunk_idx + 1}/{len(chunk_paths)} complete ({completed_count}/{len(chunk_paths)} total)")
|
|
|
|
# Log summary preview if configured
|
|
if self.log_summaries and isinstance(summary, str) and not summary.startswith('[Error'):
|
|
logger.debug(f"Batch {batch_id}: [Stage 1] Chunk {chunk_idx + 1} summary preview:\n{summary[:300]}...")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Batch {batch_id}: [Stage 1] Failed to process chunk: {e}")
|
|
chunk_summaries.append((len(chunk_summaries), f"[Error: {str(e)}]"))
|
|
|
|
else:
|
|
# Sequential processing
|
|
logger.info(f"Batch {batch_id}: [Stage 1] Using sequential processing")
|
|
for i, (chunk_path, metadata) in enumerate(zip(chunk_paths, chunk_metadata)):
|
|
logger.info(f"Batch {batch_id}: [Stage 1] Processing chunk {i+1}/{len(chunk_paths)} from {metadata['video_name']}")
|
|
|
|
summary_prompt = self._create_chunk_summary_prompt(
|
|
original_prompt=prompt,
|
|
chunk_number=i + 1,
|
|
total_chunks=len(chunk_paths),
|
|
video_name=metadata['video_name']
|
|
)
|
|
|
|
# Log prompt if configured
|
|
if self.log_prompts:
|
|
logger.debug(f"Batch {batch_id}: [Stage 1] Prompt for chunk {i+1}:\n{summary_prompt[:300]}...")
|
|
|
|
try:
|
|
chunk_start = time.time()
|
|
result = self.process_video(chunk_path, summary_prompt, user_email)
|
|
chunk_time = time.time() - chunk_start
|
|
summary = result.get('content', '')
|
|
chunk_summaries.append((i, summary))
|
|
|
|
logger.info(f"Batch {batch_id}: [Stage 1] Chunk {i + 1} complete: {len(summary)} chars in {chunk_time:.2f}s")
|
|
|
|
# Log summary preview if configured
|
|
if self.log_summaries:
|
|
logger.debug(f"Batch {batch_id}: [Stage 1] Chunk {i+1} summary preview:\n{summary[:300]}...")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Batch {batch_id}: [Stage 1] Failed to process chunk {i + 1}: {e}")
|
|
chunk_summaries.append((i, f"[Error: {str(e)}]"))
|
|
|
|
# Sort by chunk index
|
|
chunk_summaries.sort(key=lambda x: x[0])
|
|
summaries = [s[1] for s in chunk_summaries]
|
|
|
|
stage1_time = time.time() - stage1_start
|
|
logger.info(f"Batch {batch_id}: [Stage 1] Complete - {len(summaries)} summaries generated in {stage1_time:.2f}s")
|
|
|
|
# Log traceability
|
|
logger.info(f"Batch {batch_id}: [Traceability] Chunk-to-summary mapping:")
|
|
for i, metadata in enumerate(chunk_metadata):
|
|
chunk_info = f"chunk {metadata.get('chunk_idx', 0)+1}" if metadata.get('is_split') else "whole video"
|
|
logger.info(f"Batch {batch_id}: - Chunk {i+1}: {metadata['video_name']} ({chunk_info}) → Summary {i+1}")
|
|
|
|
# Stage 2: Synthesize all summaries
|
|
stage2_start = time.time()
|
|
logger.info(f"Batch {batch_id}: [Stage 2] Synthesizing {len(summaries)} summaries into final result")
|
|
|
|
final_content = self._synthesize_final_result(
|
|
summaries=summaries,
|
|
chunk_metadata=chunk_metadata,
|
|
original_prompt=prompt,
|
|
user_email=user_email
|
|
)
|
|
|
|
stage2_time = time.time() - stage2_start
|
|
total_time = stage1_time + stage2_time
|
|
|
|
# Log performance metrics
|
|
logger.info(f"Batch {batch_id}: [Metrics] Stage 1: {stage1_time:.2f}s, Stage 2: {stage2_time:.2f}s, Total: {total_time:.2f}s")
|
|
logger.info(f"Batch {batch_id}: [Metrics] Avg time per chunk: {stage1_time/len(chunk_paths):.2f}s")
|
|
logger.info(f"Batch {batch_id}: [Metrics] Total API calls: {len(chunk_paths) + 1}") # +1 for synthesis
|
|
|
|
return {
|
|
'content': final_content,
|
|
'total_chunks': len(chunk_paths),
|
|
'synthesis_stages': 2
|
|
}
|
|
|
|
def _create_chunk_summary_prompt(self, original_prompt: str, chunk_number: int,
|
|
total_chunks: int, video_name: str) -> str:
|
|
"""
|
|
Create prompt for individual video/chunk in batch processing.
|
|
User prompt is the primary instruction with minimal synthesis hint.
|
|
"""
|
|
prompt = f"""You are analyzing video {chunk_number} of {total_chunks}: "{video_name}".
|
|
Your response will be synthesized with responses from other videos to create a unified final result.
|
|
|
|
{original_prompt}"""
|
|
return prompt
|
|
|
|
def _synthesize_final_result(self, summaries: List[str], chunk_metadata: List[Dict],
|
|
original_prompt: str, user_email: str) -> str:
|
|
"""
|
|
Synthesize all chunk summaries into single cohesive result using Gemini.
|
|
Uses a universal template that makes the user's prompt the primary instruction.
|
|
"""
|
|
# Extract video names for context
|
|
video_names = list(set(m['video_name'] for m in chunk_metadata))
|
|
num_videos = len(video_names)
|
|
|
|
# Prepare summaries text
|
|
summaries_text = ""
|
|
total_summary_chars = 0
|
|
for i, summary in enumerate(summaries, 1):
|
|
video_name = chunk_metadata[i-1]['video_name']
|
|
summaries_text += f"\n\n--- Summary {i} (from {video_name}) ---\n{summary.strip()}\n"
|
|
total_summary_chars += len(summary)
|
|
|
|
logger.info(f"[Stage 2] Combined summaries: {len(summaries)} summaries, {total_summary_chars} total chars")
|
|
|
|
# Create universal synthesis prompt
|
|
if num_videos > 1:
|
|
video_context = f"{num_videos} videos: {', '.join(video_names)}"
|
|
else:
|
|
video_context = f"video: {video_names[0]}"
|
|
|
|
synthesis_prompt = f"""You are creating a final unified response by combining multiple segment summaries from {video_context}.
|
|
|
|
Here are the segment summaries:
|
|
{summaries_text}
|
|
|
|
Original user request:
|
|
{original_prompt}
|
|
|
|
Your task: Create ONE cohesive response that fulfills the user's request. Integrate information from all summaries naturally, without mentioning segments or chunks.
|
|
"""
|
|
|
|
# Log synthesis prompt if configured
|
|
if self.log_prompts:
|
|
logger.debug(f"[Stage 2] Synthesis prompt preview:\n{synthesis_prompt[:500]}...")
|
|
|
|
# Send to Gemini for final synthesis
|
|
logger.info(f"[Stage 2] Sending synthesis request to Gemini API (model: {self.synthesis_model})")
|
|
|
|
synthesis_start = time.time()
|
|
try:
|
|
response = self._make_api_request_with_retry(
|
|
model=self.synthesis_model,
|
|
contents=[{"text": synthesis_prompt}],
|
|
context="[Batch Synthesis]"
|
|
)
|
|
|
|
synthesis_time = time.time() - synthesis_start
|
|
|
|
synthesized_content = ""
|
|
if response.parts:
|
|
for part in response.parts:
|
|
if hasattr(part, 'text'):
|
|
synthesized_content += part.text
|
|
|
|
if not synthesized_content:
|
|
logger.warning("[Stage 2] Synthesis returned empty, falling back to concatenation")
|
|
return self._fallback_concatenation(summaries, chunk_metadata)
|
|
|
|
logger.info(f"[Stage 2] Synthesis complete: {len(synthesized_content)} chars in {synthesis_time:.2f}s")
|
|
|
|
# Log synthesis result preview if configured
|
|
if self.log_summaries:
|
|
logger.debug(f"[Stage 2] Synthesized result preview:\n{synthesized_content[:500]}...")
|
|
|
|
return synthesized_content
|
|
|
|
except Exception as e:
|
|
logger.error(f"[Stage 2] Synthesis failed: {str(e)}, using fallback")
|
|
return self._fallback_concatenation(summaries, chunk_metadata)
|
|
|
|
def _fallback_concatenation(self, summaries: List[str], chunk_metadata: List[Dict]) -> str:
|
|
"""
|
|
Fallback method when AI synthesis fails.
|
|
Simple concatenation with section markers.
|
|
"""
|
|
logger.info("Using fallback concatenation method")
|
|
|
|
result = "# Combined Analysis\n\n"
|
|
result += "*Note: This is a combined analysis from multiple video segments.*\n\n"
|
|
|
|
for i, summary in enumerate(summaries, 1):
|
|
video_name = chunk_metadata[i-1]['video_name']
|
|
result += f"## Segment {i} - {video_name}\n\n"
|
|
result += summary.strip() + "\n\n"
|
|
|
|
return result
|
|
|
|
def _fallback_concatenation_single_video(self, responses: List[str], num_chunks: int,
|
|
video_name: str = "") -> str:
|
|
"""
|
|
Fallback method when AI synthesis fails for single video chunks.
|
|
Simple concatenation with minimal formatting.
|
|
|
|
Args:
|
|
responses: List of chunk responses
|
|
num_chunks: Number of chunks
|
|
video_name: Video filename
|
|
|
|
Returns:
|
|
Concatenated response
|
|
"""
|
|
logger.info("Using fallback concatenation for single video")
|
|
|
|
result = f"# Complete Video Analysis"
|
|
if video_name:
|
|
result += f": {video_name}"
|
|
result += "\n\n"
|
|
result += f"*Note: This video was analyzed in {num_chunks} segments.*\n\n"
|
|
|
|
for i, response in enumerate(responses, 1):
|
|
result += f"## Segment {i} of {num_chunks}\n\n"
|
|
result += response.strip() + "\n\n"
|
|
|
|
return result
|