from google import genai import mimetypes import time import os import logging import requests import json import datetime import base64 from typing import Dict, Any, Optional, List, Tuple from dotenv import load_dotenv from video_splitter import VideoSplitter from concurrent.futures import ThreadPoolExecutor, as_completed import threading from system_utils import system_utils from error_reporter import ErrorReporter, ErrorCategory # Load environment variables from .env file load_dotenv() logger = logging.getLogger('video_query') class VideoProcessor: """ Class to handle video uploads and processing with Gemini API. """ # Default prompts for different modes PROMPTS = { "meeting_summary": "Generate a detailed summary of the meeting in the attached video recording, including discussion points and action items with owners", "process_documentation": "Generate detailed process documentation suitable for reference or training based on the process illustrated in the attached video recording. Write the documentation so that a new user will be able to follow step by step and accomplish the task illustrated in the video", "documentation_with_charts": "Analyze this video to create comprehensive process documentation with workflow diagrams for a knowledge base article. Follow these requirements exactly:\n\n1. CONTENT REQUIREMENTS:\n - Provide a detailed step-by-step explanation of the process shown\n - Be extremely verbose and thorough - include all relevant details, context, and nuances\n - Structure as a complete knowledge base article with clear sections\n - Include overview, detailed steps, tips, and troubleshooting where applicable\n\n2. MERMAID DIAGRAM REQUIREMENTS:\n - Create workflow diagrams using valid Mermaid syntax where helpful\n - CRITICAL: Use only simple alphanumeric text in node descriptions and labels\n - CRITICAL: No special characters like quotes brackets colons semicolons or symbols in node text\n - CRITICAL: Use underscores instead of spaces in node IDs and labels\n - CRITICAL: Keep all text simple to avoid syntax errors\n - Example format: Start_Process --> Complete_Task --> End_Process\n - Use flowchart format: graph TD or graph LR\n\n3. OUTPUT STRUCTURE:\n - Title and overview section\n - Prerequisites section if applicable\n - Detailed step-by-step process\n - Mermaid workflow diagram(s) showing the process flow\n - Tips and best practices\n - Troubleshooting common issues\n\nEnsure all Mermaid diagrams use simple text without special characters to prevent parsing errors.", "custom": "" # Custom prompt will be provided by the user } # Maximum video duration in minutes (Gemini limitation) MAX_VIDEO_DURATION = 55 # Threshold for chunked upload (10MB) CHUNKED_UPLOAD_THRESHOLD = 10 * 1024 * 1024 # Webhook URL for tracking usage WEBHOOK_URL = "https://hook.us1.make.celonis.com/8ri1h8b2he4wudp2jku69mgcxumzxf3v" # Parallel processing configuration # Default max workers for parallel chunk processing DEFAULT_MAX_WORKERS = 4 # Default concurrent workers # Model configuration DEFAULT_PROCESSING_MODEL = "gemini-2.5-pro" # Model for individual video processing DEFAULT_SYNTHESIS_MODEL = "gemini-2.5-pro" # Model for batch synthesis # Retry configuration MAX_RETRY_ATTEMPTS = 5 # Maximum retry attempts RETRY_DELAYS = [5, 10, 20, 40, 60] # Exponential backoff delays in seconds def __init__(self, api_key: Optional[str] = None, max_parallel_chunks: int = None): """ Initialize with API key from environment variable or direct setting Args: api_key: Google API key for Gemini max_parallel_chunks: Maximum number of chunks to process in parallel (default: from MAX_PARALLEL_CHUNKS env var or 4) """ self.api_key = api_key or os.getenv("GOOGLE_API_KEY") if not self.api_key: logger.error("API key not provided") raise ValueError("API key not provided - set GOOGLE_API_KEY environment variable or pass when initializing") # Initialize the Gemini client logger.info("Initializing Gemini API client") self.client = genai.Client(api_key=self.api_key) logger.info("Gemini API client initialized successfully") # Set parallel processing configuration if max_parallel_chunks: self.max_parallel_chunks = max_parallel_chunks else: # Load from environment variable or use default env_max_workers = os.getenv("MAX_PARALLEL_CHUNKS") if env_max_workers: self.max_parallel_chunks = int(env_max_workers) else: self.max_parallel_chunks = self.DEFAULT_MAX_WORKERS logger.info(f"Parallel processing: max {self.max_parallel_chunks} concurrent chunks") # Initialize video splitter self.video_splitter = VideoSplitter() # Load configuration from environment variables self.processing_model = os.getenv("VIDEO_PROCESSOR_MODEL", self.DEFAULT_PROCESSING_MODEL) self.synthesis_model = os.getenv("VIDEO_SYNTHESIS_MODEL", self.DEFAULT_SYNTHESIS_MODEL) self.log_prompts = os.getenv("BATCH_PROCESSING_LOG_PROMPTS", "false").lower() == "true" self.log_summaries = os.getenv("BATCH_PROCESSING_LOG_SUMMARIES", "false").lower() == "true" logger.info(f"Configuration: processing_model={self.processing_model}, synthesis_model={self.synthesis_model}") logger.info(f"Logging: prompts={self.log_prompts}, summaries={self.log_summaries}") def send_usage_webhook(self, user_email: str, prompt: str) -> None: """ Send usage data to webhook for tracking purposes Args: user_email: Email of the user who processed the video prompt: The prompt used for processing """ try: current_datetime = datetime.datetime.now().isoformat() webhook_data = { "tool": "VIDEOQUERY", "date": current_datetime, "user": user_email, "model": "GEMINI", "settings": "no settings", "subTool": "no subTool", "prompt": prompt, "negativePrompt": "no NEGATIVE_PROMPT", "image": "no image" } logger.info(f"Sending usage data to webhook for user: {user_email}") response = requests.post( self.WEBHOOK_URL, headers={"Content-Type": "application/json"}, data=json.dumps(webhook_data), timeout=10 # 10 second timeout ) if response.status_code == 200: logger.info("Successfully sent usage data to webhook") else: logger.warning(f"Webhook request failed with status code: {response.status_code}") logger.warning(f"Response: {response.text}") except Exception as e: logger.error(f"Error sending usage data to webhook: {str(e)}") # Don't raise the exception - webhook failure shouldn't block the main flow def _extract_error_code(self, error_message: str) -> str: """ Extract HTTP error code from error message. Args: error_message: Error message string Returns: Error code (e.g., "503", "429") or "UNKNOWN" """ import re match = re.search(r'(\d{3})\s+(UNAVAILABLE|TOO_MANY_REQUESTS|RESOURCE_EXHAUSTED|INVALID_ARGUMENT|INTERNAL)', error_message, re.IGNORECASE) if match: return match.group(1) return "UNKNOWN" def _is_retryable_error(self, error_str: str, error_code: str, attempt: int) -> Tuple[bool, int]: """ Determine if error is retryable and calculate retry delay. Args: error_str: Error message (lowercase) error_code: Extracted error code attempt: Current attempt number (0-indexed) Returns: Tuple of (is_retryable: bool, retry_delay_seconds: int) """ # 503 UNAVAILABLE - Model overloaded (RETRYABLE with longer delays) if '503' in error_code or 'unavailable' in error_str: delay = self.RETRY_DELAYS[min(attempt, len(self.RETRY_DELAYS) - 1)] logger.warning(f"503 UNAVAILABLE detected - API overloaded, will retry in {delay}s") return (True, delay) # 429 TOO_MANY_REQUESTS - Rate limit (RETRYABLE with longer delays) if '429' in error_code or 'too many requests' in error_str or 'rate limit' in error_str: delay = self.RETRY_DELAYS[min(attempt, len(self.RETRY_DELAYS) - 1)] logger.warning(f"429 RATE LIMIT detected, will retry in {delay}s") return (True, delay) # 500 INTERNAL_SERVER_ERROR (RETRYABLE) if '500' in error_code or 'internal server error' in error_str: delay = self.RETRY_DELAYS[min(attempt, len(self.RETRY_DELAYS) - 1)] logger.warning(f"500 INTERNAL ERROR detected, will retry in {delay}s") return (True, delay) # RESOURCE_EXHAUSTED (RETRYABLE) if 'resource_exhausted' in error_str or 'quota' in error_str: delay = self.RETRY_DELAYS[min(attempt, len(self.RETRY_DELAYS) - 1)] logger.warning(f"Resource exhausted - quota or rate limit, will retry in {delay}s") return (True, delay) # Network errors (RETRYABLE with shorter delays) if any(err in error_str for err in ['name resolution', 'connection', 'timeout', 'network']): delay = 5 # Fixed 5s delay for network issues logger.warning(f"Network error detected, will retry in {delay}s") return (True, delay) # 400 INVALID_ARGUMENT - Usually not retryable if '400' in error_code or 'invalid_argument' in error_str: logger.error(f"400 INVALID_ARGUMENT - not retryable") return (False, 0) # Default: not retryable logger.error(f"Error not recognized as retryable: {error_str[:100]}") return (False, 0) def _make_api_request_with_retry(self, model: str, contents: list, context: str = "") -> any: """ Make API request with intelligent retry logic. Handles 503 (overload), 429 (rate limit), 500 (server error), and network errors. Args: model: Model name to use contents: Content to send to the API context: Context description for logging (e.g., "[Video: example.mp4]") Returns: API response object Raises: Exception: If all retry attempts fail """ last_exception = None for attempt in range(self.MAX_RETRY_ATTEMPTS): try: # Make the API call if attempt == 0: logger.info(f"{context} Sending request to Gemini API") else: logger.info(f"{context} Retry attempt {attempt + 1}/{self.MAX_RETRY_ATTEMPTS}") response = self.client.models.generate_content( model=model, contents=contents ) # Success! if attempt > 0: logger.info(f"{context} ✓ Request succeeded after {attempt + 1} attempts") else: logger.info(f"{context} ✓ Request succeeded on first attempt") return response except Exception as e: last_exception = e error_str = str(e).lower() error_code = self._extract_error_code(str(e)) # Log detailed error information for INVALID_ARGUMENT (helps debug) if 'invalid_argument' in error_str or '400' in error_str: logger.error("=" * 80) logger.error(f"{context} INVALID_ARGUMENT ERROR:") logger.error(f" Error: {str(e)[:200]}") logger.error(f" Model: {model}") logger.error(f" Attempt: {attempt + 1}/{self.MAX_RETRY_ATTEMPTS}") logger.error("=" * 80) # Determine if retryable is_retryable, retry_delay = self._is_retryable_error(error_str, error_code, attempt) if not is_retryable: logger.error(f"{context} Non-retryable error: {error_code} - {str(e)[:100]}") raise if attempt < self.MAX_RETRY_ATTEMPTS - 1: logger.warning( f"{context} Retryable error (attempt {attempt + 1}/{self.MAX_RETRY_ATTEMPTS}): " f"{error_code} - {str(e)[:150]}" ) logger.info(f"{context} Waiting {retry_delay}s before retry...") time.sleep(retry_delay) continue else: logger.error( f"{context} ✗ All {self.MAX_RETRY_ATTEMPTS} attempts failed. " f"Last error: {error_code} - {str(e)[:150]}" ) raise # If we get here, all retries failed raise last_exception def process_video(self, video_path: str, prompt: str, user_email: str = "anonymous") -> Dict[str, Any]: """ Process a video with the given prompt using Gemini API Args: video_path: Path to the video file prompt: Text prompt to use for video analysis user_email: Email of the user processing the video (for usage tracking) Returns: Dictionary with processing result or error """ start_time = time.time() result = { "success": False, "message": "", "content": "", "processing_time_seconds": 0 } logger.info(f"Processing video: {video_path}") logger.info(f"Prompt: {prompt[:100]}..." if len(prompt) > 100 else f"Prompt: {prompt}") if not os.path.exists(video_path): error_msg = f"Video file not found at '{video_path}'" logger.error(error_msg) result["message"] = error_msg return result try: # Get file size file_size = os.path.getsize(video_path) file_size_mb = file_size / (1024 * 1024) logger.info(f"File size: {file_size_mb:.2f} MB") # Get video duration to detect too-short videos try: video_duration = self.video_splitter.get_video_duration(video_path) if video_duration: logger.info(f"Video duration: {video_duration:.2f} seconds ({video_duration/60:.2f} minutes)") # Check if video is too short (< 1 second can cause INVALID_ARGUMENT) if video_duration < 1.0: error_msg = f"Video too short: {video_duration:.2f}s. Gemini requires videos >= 1 second" logger.error(error_msg) result["message"] = error_msg return result else: logger.warning("Could not determine video duration") except Exception as dur_err: logger.warning(f"Error checking video duration: {str(dur_err)}") # Determine MIME type for the video mime_type, _ = mimetypes.guess_type(video_path) if not mime_type: logger.info(f"Could not determine MIME type, using default: video/mp4") mime_type = "video/mp4" # Fallback else: logger.info(f"MIME type: {mime_type}") # Validate MIME type is video if not mime_type.startswith('video/'): error_msg = f"Invalid file type: {mime_type}. Only video files are supported (mp4, avi, mov, mkv, webm, etc.)" logger.error(error_msg) result["message"] = error_msg return result # Validate video file is readable and not corrupted # This provides a secondary check in case validation at upload didn't happen try: import subprocess ffprobe_path = system_utils.find_ffprobe() probe_result = subprocess.run( [ffprobe_path, '-v', 'error', '-show_entries', 'format=duration,format_name', '-of', 'default=noprint_wrappers=1', video_path], capture_output=True, text=True, timeout=10 ) if probe_result.returncode != 0: error_detail = probe_result.stderr.strip() # Provide user-friendly error messages for common issues if "moov atom not found" in error_detail.lower(): error_msg = "Video file is incomplete or corrupted (missing header). The file may not have uploaded completely. Please try uploading again." elif "invalid data found" in error_detail.lower(): error_msg = "Video file contains invalid data and cannot be processed. Please check the file or try re-encoding it." else: error_msg = f"Video file appears to be corrupted or unreadable. Technical details: {error_detail}" logger.error(f"Video validation failed: {error_msg}") result["message"] = error_msg return result logger.info(f"Video file validation successful. Format info: {probe_result.stdout[:100]}") except subprocess.TimeoutExpired: logger.warning("Video validation timed out after 10s - proceeding anyway") except FileNotFoundError: logger.warning("ffprobe not found - skipping video file validation") except Exception as val_err: logger.warning(f"Could not validate video file: {str(val_err)} - proceeding anyway") # SIMPLIFIED APPROACH: Use inline base64 for all files # File Upload API has known issues (KeyError: 'file' in SDK 1.45.0-1.49.0) # Gemini API REQUEST limit: 1GB (1073741824 bytes) - confirmed by testing # Base64 adds ~37% overhead, so we validate encoded size # Calculate estimated encoded size (base64 overhead ~37%) BASE64_OVERHEAD = 1.37 estimated_encoded_mb = file_size_mb * BASE64_OVERHEAD API_LIMIT_MB = 1000 # 1GB API request limit # Check if encoded size would exceed API limit if estimated_encoded_mb > API_LIMIT_MB: error_msg = ( f"Video chunk is too large: {file_size_mb:.2f}MB raw, " f"~{estimated_encoded_mb:.1f}MB after base64 encoding. " f"This exceeds the {API_LIMIT_MB}MB (1GB) API limit. " f"The video needs to be split into smaller chunks. " f"Target chunk size: 500MB (accounting for variable bitrate)." ) logger.error(error_msg) result["message"] = error_msg return result # Warn if approaching API limit (>900MB encoded) if estimated_encoded_mb > 900: logger.warning( f"Chunk is large: {file_size_mb:.1f}MB raw, " f"~{estimated_encoded_mb:.1f}MB encoded. " f"Approaching {API_LIMIT_MB}MB API limit. Processing may be slower." ) # Use base64 encoding for all files (reliable and works consistently) logger.info(f"Encoding video as base64 (file size: {file_size_mb:.2f}MB)") encode_start = time.time() with open(video_path, "rb") as video_file_obj: video_data = video_file_obj.read() video_base64 = base64.b64encode(video_data).decode('utf-8') encode_time = time.time() - encode_start encoded_size_mb = len(video_base64) / (1024 * 1024) logger.info(f"Base64 encoding complete in {encode_time:.2f}s. Encoded size: {encoded_size_mb:.2f}MB ({len(video_base64)} chars)") # Verify actual encoded size is within API limits (1GB) if encoded_size_mb > API_LIMIT_MB: error_msg = ( f"Encoded video size ({encoded_size_mb:.2f}MB) exceeds Gemini API limit of {API_LIMIT_MB}MB (1GB). " f"Original file: {file_size_mb:.2f}MB. " f"This chunk needs further splitting. Target: 500MB raw chunks. " f"Variable bitrate caused larger-than-expected chunk size." ) logger.error(error_msg) result["message"] = error_msg return result # Create the content parts using inline data prompt_parts = [ {"text": prompt}, {"inline_data": { "mime_type": mime_type, "data": video_base64 }} ] uploaded_file = None # Not using File Upload API # Use the new retry logic with rate limiting context = f"[Video: {os.path.basename(video_path)}]" api_start = time.time() response = self._make_api_request_with_retry( model=self.processing_model, contents=prompt_parts, context=context ) api_time = time.time() - api_start logger.info(f"Received response from Gemini (API call took {api_time:.1f}s)") # Extract the response content content = "" if response.parts: logger.info(f"Response has {len(response.parts)} parts") for i, part in enumerate(response.parts): if hasattr(part, 'text'): part_text = part.text content_preview = part_text[:100] + '...' if len(part_text) > 100 else part_text logger.info(f"Part {i} (text): {content_preview}") content += part_text else: logger.info(f"Part {i} (no text): {type(part)}") else: logger.warning("No parts in response") if hasattr(response, 'prompt_feedback') and response.prompt_feedback: logger.warning(f"Prompt feedback: {response.prompt_feedback}") # Set success result result["success"] = True result["content"] = content result["processing_time_seconds"] = round(time.time() - start_time, 2) logger.info(f"Processed result with {len(content)} characters") logger.info(f"Total processing time: {result['processing_time_seconds']}s") # Send usage data to webhook for tracking self.send_usage_webhook(user_email, prompt) # Clean up uploaded file if it was used if uploaded_file: try: logger.info(f"Deleting uploaded file: {uploaded_file.name}") self.client.files.delete(name=uploaded_file.name) logger.info("File deleted successfully from Gemini storage") except Exception as del_err: logger.warning(f"Could not delete file from Gemini storage: {str(del_err)}") return result except Exception as e: import traceback error_report = ErrorReporter.capture_error( e, context={ 'video_path': video_path, 'prompt_length': len(prompt), 'operation': 'process_video' } ) error_details = traceback.format_exc() logger.error(f"Error processing video: {str(e)}") logger.error(error_details) result["message"] = error_report.format_user_message() result["error_details"] = error_details result["error_id"] = error_report.error_id result["error_category"] = error_report.category.value return result def combine_chunk_responses(self, responses: List[str], prompt: str, num_chunks: int, video_name: str = "") -> str: """ Combine responses from multiple chunks of a single video using AI synthesis. Uses universal synthesis approach - no type detection. Args: responses: List of response texts from each chunk prompt: Original user prompt num_chunks: Total number of chunks video_name: Name of the video file Returns: Combined response text """ logger.info(f"Combining {len(responses)} chunk responses using AI synthesis") # Try AI synthesis first try: # Prepare chunk responses summaries_text = "" for i, response in enumerate(responses, 1): summaries_text += f"\n--- Segment {i} ---\n{response.strip()}\n" # Universal synthesis prompt for single video synthesis_prompt = f"""You are creating a final unified response by combining multiple segment analyses from one video. Context: - One video was split into {num_chunks} segments for processing - Each segment was analyzed separately - Below are the responses from each segment Original user request: "{prompt}" Segment responses: {summaries_text} Your task: 1. Combine these segment responses into ONE cohesive response that fulfills the user's request above 2. Do not reference segments, chunks, or parts in your output 3. Present as a unified analysis of the complete video Provide your unified response: """ logger.info("[Chunk Synthesis] Sending synthesis request to Gemini") response = self._make_api_request_with_retry( model=self.synthesis_model, contents=[{"text": synthesis_prompt}], context="[Chunk Combination]" ) if response.parts: synthesized_content = "" for part in response.parts: if hasattr(part, 'text'): synthesized_content += part.text if synthesized_content: logger.info("[Chunk Synthesis] Successfully synthesized chunk responses") return synthesized_content logger.warning("[Chunk Synthesis] Synthesis returned empty, falling back to concatenation") except Exception as e: logger.warning(f"[Chunk Synthesis] Synthesis failed: {e}, falling back to concatenation") # Fallback: simple concatenation return self._fallback_concatenation_single_video(responses, num_chunks, video_name) def _process_single_chunk(self, chunk_info: Tuple[int, str, str, int, str]) -> Tuple[int, Dict[str, Any]]: """ Process a single video chunk. Used for parallel processing. Args: chunk_info: Tuple of (chunk_index, chunk_path, chunk_prompt, total_chunks, user_email) Returns: Tuple of (chunk_index, result_dict) """ chunk_index, chunk_path, chunk_prompt, total_chunks, user_email = chunk_info logger.info(f"[Parallel] Processing chunk {chunk_index + 1}/{total_chunks}: {chunk_path}") try: chunk_result = self.process_video(chunk_path, chunk_prompt, user_email) logger.info(f"[Parallel] Completed chunk {chunk_index + 1}/{total_chunks}") return (chunk_index, chunk_result) except Exception as e: logger.error(f"[Parallel] Error processing chunk {chunk_index + 1}/{total_chunks}: {str(e)}") return (chunk_index, { "success": False, "message": f"Error processing chunk {chunk_index + 1}: {str(e)}", "content": "" }) def _process_chunks_parallel(self, chunk_paths: List[str], prompt: str, user_email: str) -> List[Dict[str, Any]]: """ Process multiple video chunks in parallel using ThreadPoolExecutor. Args: chunk_paths: List of paths to video chunk files prompt: Original prompt for video analysis user_email: User email for tracking Returns: List of result dictionaries in order of chunks """ num_chunks = len(chunk_paths) logger.info(f"Starting parallel processing of {num_chunks} chunks with {self.max_parallel_chunks} workers") # Prepare chunk information for parallel processing chunk_infos = [] for i, chunk_path in enumerate(chunk_paths): # Extract video name from chunk path (remove _chunk_XX suffix) video_name = os.path.basename(chunk_path).rsplit('_chunk_', 1)[0] if '_chunk_' in chunk_path else os.path.basename(chunk_path) chunk_prompt = self._create_chunk_prompt(prompt, i + 1, num_chunks, video_name) chunk_infos.append((i, chunk_path, chunk_prompt, num_chunks, user_email)) # Process chunks in parallel results = [None] * num_chunks # Pre-allocate results list to maintain order with ThreadPoolExecutor(max_workers=self.max_parallel_chunks) as executor: # Submit all chunks for processing future_to_chunk = { executor.submit(self._process_single_chunk, chunk_info): chunk_info[0] for chunk_info in chunk_infos } # Collect results as they complete completed = 0 for future in as_completed(future_to_chunk): chunk_index = future_to_chunk[future] try: chunk_index, result = future.result() results[chunk_index] = result completed += 1 logger.info(f"[Parallel] Progress: {completed}/{num_chunks} chunks completed") except Exception as e: logger.error(f"[Parallel] Unexpected error for chunk {chunk_index + 1}: {str(e)}") results[chunk_index] = { "success": False, "message": f"Unexpected error: {str(e)}", "content": "" } logger.info(f"[Parallel] All {num_chunks} chunks processed") return results def process_long_video(self, video_path: str, prompt: str, user_email: str = "anonymous", use_parallel: bool = True) -> Dict[str, Any]: """ Process a long video by splitting it into chunks and combining the results. Supports both parallel and sequential processing. Args: video_path: Path to the video file prompt: Text prompt to use for video analysis user_email: Email of the user processing the video (for usage tracking) use_parallel: If True, process chunks in parallel; if False, process sequentially Returns: Dictionary with processing result or error """ start_time = time.time() result = { "success": False, "message": "", "content": "", "chunks_processed": 0, "processing_mode": "parallel" if use_parallel else "sequential", "processing_time_seconds": 0 } chunk_paths = [] try: # Check if video needs splitting num_chunks, duration_minutes = self.video_splitter.get_chunk_info(video_path) if num_chunks <= 1: logger.info("Video does not need splitting, processing normally") return self.process_video(video_path, prompt, user_email) logger.info(f"Long video detected: {duration_minutes:.2f} minutes, will be split into {num_chunks} chunks") logger.info(f"Processing mode: {'PARALLEL' if use_parallel else 'SEQUENTIAL'}") # Split the video logger.info("Starting video splitting...") chunk_paths = self.video_splitter.split_video(video_path) logger.info(f"Video split into {len(chunk_paths)} chunks successfully") # Process chunks (parallel or sequential) if use_parallel: # Parallel processing using ThreadPoolExecutor chunk_results = self._process_chunks_parallel(chunk_paths, prompt, user_email) else: # Sequential processing (original logic) chunk_results = [] for i, chunk_path in enumerate(chunk_paths, 1): logger.info(f"[Sequential] Processing chunk {i}/{len(chunk_paths)}: {chunk_path}") # Extract video name from chunk path (remove _chunk_XX suffix) video_name = os.path.basename(chunk_path).rsplit('_chunk_', 1)[0] if '_chunk_' in chunk_path else os.path.basename(chunk_path) # Modify prompt to indicate this is part of a multi-part video chunk_prompt = self._create_chunk_prompt(prompt, i, len(chunk_paths), video_name) # Process this chunk chunk_result = self.process_video(chunk_path, chunk_prompt, user_email) chunk_results.append(chunk_result) logger.info(f"[Sequential] Completed chunk {i}/{len(chunk_paths)}") # Check for failures in any chunk chunk_responses = [] for i, chunk_result in enumerate(chunk_results, 1): if not chunk_result["success"]: error_msg = f"Failed to process chunk {i}/{len(chunk_paths)}: {chunk_result.get('message', 'Unknown error')}" logger.error(error_msg) result["message"] = error_msg result["chunks_processed"] = i - 1 return result chunk_responses.append(chunk_result["content"]) # Combine all responses logger.info("Combining responses from all chunks...") combined_content = self.combine_chunk_responses( chunk_responses, prompt, len(chunk_paths), video_name=os.path.basename(video_path) ) result["success"] = True result["content"] = combined_content result["chunks_processed"] = len(chunk_paths) result["processing_time_seconds"] = round(time.time() - start_time, 2) result["message"] = f"Successfully processed video in {len(chunk_paths)} chunks" logger.info(f"Long video processing completed successfully: {len(chunk_paths)} chunks") logger.info(f"Total processing time: {result['processing_time_seconds']}s") return result except Exception as e: import traceback error_report = ErrorReporter.capture_error( e, context={ 'video_path': video_path, 'prompt_length': len(prompt), 'operation': 'process_long_video', 'chunks_processed': result.get('chunks_processed', 0) } ) error_details = traceback.format_exc() logger.error(f"Error processing long video: {str(e)}") logger.error(error_details) result["message"] = error_report.format_user_message() result["error_details"] = error_details result["error_id"] = error_report.error_id result["error_category"] = error_report.category.value return result finally: # Always clean up chunk files if chunk_paths: logger.info("Cleaning up temporary chunk files...") self.video_splitter.cleanup_chunks(chunk_paths) def _create_chunk_prompt(self, original_prompt: str, chunk_num: int, total_chunks: int, video_name: str = "") -> str: """ Create a prompt for a video chunk that provides minimal context about its position. Args: original_prompt: The original user prompt chunk_num: Current chunk number (1-indexed) total_chunks: Total number of chunks video_name: Name of the video file (for context) Returns: Prompt with minimal system context, keeping user's prompt as primary instruction """ context = f"""This is segment {chunk_num} of {total_chunks} from video "{video_name}". Your response will be combined with responses from other segments to create the final result. {original_prompt}""" return context def process_video_auto(self, video_path: str, prompt: str, user_email: str = "anonymous") -> Dict[str, Any]: """ Automatically process a video, handling both short and long videos. This method detects if the video needs splitting and processes accordingly. Args: video_path: Path to the video file prompt: Text prompt to use for video analysis user_email: Email of the user processing the video (for usage tracking) Returns: Dictionary with processing result or error """ logger.info(f"Auto-processing video: {video_path}") # Check if video needs splitting if self.video_splitter.needs_splitting(video_path): logger.info("Video requires splitting, using long video processing") return self.process_long_video(video_path, prompt, user_email) else: logger.info("Video is within single-chunk limit, using standard processing") return self.process_video(video_path, prompt, user_email) def process_video_batch(self, video_paths: List[str], filenames: List[str], prompt: str, user_email: str, batch_id: str) -> Dict[str, Any]: """ Process multiple videos as a continuous batch. Videos are treated as sequential segments of one long video. Args: video_paths: List of paths to video files (in order) filenames: List of original filenames prompt: Text prompt to use for analysis user_email: Email of the user batch_id: Unique identifier for this batch Returns: Dictionary with unified result """ logger.info(f"Batch {batch_id}: Processing {len(video_paths)} videos") try: # Get durations and calculate total video_infos = [] total_duration_seconds = 0 for video_path, filename in zip(video_paths, filenames): try: duration = self.video_splitter.get_video_duration(video_path) video_infos.append({ 'path': video_path, 'filename': filename, 'duration': duration }) total_duration_seconds += duration logger.info(f"Batch {batch_id}: {filename} - {duration/60:.1f} minutes") except Exception as e: logger.error(f"Batch {batch_id}: Failed to get duration for {filename}: {e}") raise total_duration_minutes = total_duration_seconds / 60 chunk_duration_minutes = self.video_splitter.chunk_duration_seconds / 60 logger.info(f"Batch {batch_id}: Total duration: {total_duration_minutes:.1f} minutes") # Decide processing strategy needs_chunking = ( total_duration_minutes > chunk_duration_minutes or any(v['duration'] > self.video_splitter.chunk_duration_seconds for v in video_infos) ) if needs_chunking: logger.info(f"Batch {batch_id}: Using chunked processing") return self._process_batch_with_chunking( video_infos, prompt, user_email, batch_id ) else: logger.info(f"Batch {batch_id}: Processing directly (no chunking needed)") return self._process_batch_direct( video_infos, prompt, user_email, batch_id ) except Exception as e: logger.error(f"Batch {batch_id}: Processing error: {str(e)}") raise def _process_batch_direct(self, video_infos: List[Dict], prompt: str, user_email: str, batch_id: str) -> Dict[str, Any]: """ Process batch directly without chunking (total duration < 54 minutes). Uses two-stage synthesis even for short batches. """ logger.info(f"Batch {batch_id}: [Stage 1] Direct processing of {len(video_infos)} videos") stage1_start = time.time() # Process each video separately first (stage 1: summaries) summaries = [] for i, video_info in enumerate(video_infos, 1): logger.info(f"Batch {batch_id}: [Stage 1] Processing video {i}/{len(video_infos)}: {video_info['filename']}") summary_prompt = self._create_chunk_summary_prompt( original_prompt=prompt, chunk_number=i, total_chunks=len(video_infos), video_name=video_info['filename'] ) # Log prompt if configured if self.log_prompts: logger.debug(f"Batch {batch_id}: [Stage 1] Prompt for video {i}:\n{summary_prompt[:300]}...") try: video_start = time.time() result = self.process_video(video_info['path'], summary_prompt, user_email) video_time = time.time() - video_start summary = result.get('content', '') summaries.append(summary) logger.info(f"Batch {batch_id}: [Stage 1] Video {i} complete: {len(summary)} chars in {video_time:.2f}s") # Log summary preview if configured if self.log_summaries: logger.debug(f"Batch {batch_id}: [Stage 1] Video {i} summary preview:\n{summary[:300]}...") except Exception as e: logger.error(f"Batch {batch_id}: [Stage 1] Failed to process {video_info['filename']}: {e}") summaries.append(f"[Error processing {video_info['filename']}: {str(e)}]") stage1_time = time.time() - stage1_start logger.info(f"Batch {batch_id}: [Stage 1] Complete - {len(summaries)} summaries in {stage1_time:.2f}s") # Log traceability logger.info(f"Batch {batch_id}: [Traceability] Video-to-summary mapping:") for i, video_info in enumerate(video_infos, 1): logger.info(f"Batch {batch_id}: - Video {i}: {video_info['filename']} → Summary {i}") # Stage 2: Synthesize all summaries stage2_start = time.time() logger.info(f"Batch {batch_id}: [Stage 2] Synthesizing {len(summaries)} summaries") chunk_metadata = [{'video_name': v['filename'], 'video_idx': i} for i, v in enumerate(video_infos)] final_content = self._synthesize_final_result( summaries=summaries, chunk_metadata=chunk_metadata, original_prompt=prompt, user_email=user_email ) stage2_time = time.time() - stage2_start total_time = stage1_time + stage2_time # Log performance metrics logger.info(f"Batch {batch_id}: [Metrics] Stage 1: {stage1_time:.2f}s, Stage 2: {stage2_time:.2f}s, Total: {total_time:.2f}s") logger.info(f"Batch {batch_id}: [Metrics] Avg time per video: {stage1_time/len(video_infos):.2f}s") return { 'content': final_content, 'total_chunks': len(video_infos), 'synthesis_stages': 2 } def _process_batch_with_chunking(self, video_infos: List[Dict], prompt: str, user_email: str, batch_id: str) -> Dict[str, Any]: """ Split batch videos into 54-min chunks and process with two-stage synthesis. Treats all videos as one continuous sequence. """ logger.info(f"Batch {batch_id}: Starting chunked processing") all_chunks = [] chunk_metadata = [] # Step 1: Split all videos into chunks for video_idx, video_info in enumerate(video_infos): video_path = video_info['path'] filename = video_info['filename'] duration = video_info['duration'] if duration > self.video_splitter.chunk_duration_seconds: # Split long video logger.info(f"Batch {batch_id}: Splitting {filename} ({duration/60:.1f} min)") try: chunks = self.video_splitter.split_video(video_path) for chunk_idx, chunk_path in enumerate(chunks): all_chunks.append(chunk_path) chunk_metadata.append({ 'video_idx': video_idx, 'video_name': filename, 'chunk_idx': chunk_idx, 'is_split': True }) except Exception as e: logger.error(f"Batch {batch_id}: Failed to split {filename}: {e}") raise else: # Use whole video as one chunk all_chunks.append(video_path) chunk_metadata.append({ 'video_idx': video_idx, 'video_name': filename, 'chunk_idx': 0, 'is_split': False }) logger.info(f"Batch {batch_id}: Split into {len(all_chunks)} total chunks") # Step 2: Process chunks with two-stage synthesis try: result = self._process_chunks_two_stage( all_chunks, chunk_metadata, prompt, user_email, batch_id ) finally: # Step 3: Cleanup split chunks (not original videos) for chunk_path, metadata in zip(all_chunks, chunk_metadata): if metadata['is_split']: try: if os.path.exists(chunk_path): os.remove(chunk_path) logger.info(f"Batch {batch_id}: Cleaned up chunk: {os.path.basename(chunk_path)}") except Exception as e: logger.warning(f"Batch {batch_id}: Cleanup failed for {chunk_path}: {e}") return result def _process_chunks_two_stage(self, chunk_paths: List[str], chunk_metadata: List[Dict], prompt: str, user_email: str, batch_id: str) -> Dict[str, Any]: """ Two-stage synthesis: Stage 1: Each chunk → concise summary Stage 2: All summaries → final unified result """ logger.info(f"Batch {batch_id}: [Stage 1] Generating summaries for {len(chunk_paths)} chunks") stage1_start = time.time() chunk_summaries = [] if self.max_parallel_chunks > 1: # Parallel processing logger.info(f"Batch {batch_id}: [Stage 1] Using parallel processing with {self.max_parallel_chunks} workers") with ThreadPoolExecutor(max_workers=self.max_parallel_chunks) as executor: futures = [] for i, (chunk_path, metadata) in enumerate(zip(chunk_paths, chunk_metadata)): summary_prompt = self._create_chunk_summary_prompt( original_prompt=prompt, chunk_number=i + 1, total_chunks=len(chunk_paths), video_name=metadata['video_name'] ) # Log prompt if configured if self.log_prompts: logger.debug(f"Batch {batch_id}: [Stage 1] Prompt for chunk {i+1} ({metadata['video_name']}):\n{summary_prompt[:300]}...") future = executor.submit( self._process_single_chunk, (i, chunk_path, summary_prompt, len(chunk_paths), user_email) ) futures.append(future) # Collect results completed_count = 0 for future in as_completed(futures): try: chunk_idx, result = future.result() # Extract content from result dict if result.get('success'): summary = result.get('content', '') else: summary = f"[Error: {result.get('message', 'Unknown error')}]" chunk_summaries.append((chunk_idx, summary)) completed_count += 1 logger.info(f"Batch {batch_id}: [Stage 1] Chunk {chunk_idx + 1}/{len(chunk_paths)} complete ({completed_count}/{len(chunk_paths)} total)") # Log summary preview if configured if self.log_summaries and isinstance(summary, str) and not summary.startswith('[Error'): logger.debug(f"Batch {batch_id}: [Stage 1] Chunk {chunk_idx + 1} summary preview:\n{summary[:300]}...") except Exception as e: logger.error(f"Batch {batch_id}: [Stage 1] Failed to process chunk: {e}") chunk_summaries.append((len(chunk_summaries), f"[Error: {str(e)}]")) else: # Sequential processing logger.info(f"Batch {batch_id}: [Stage 1] Using sequential processing") for i, (chunk_path, metadata) in enumerate(zip(chunk_paths, chunk_metadata)): logger.info(f"Batch {batch_id}: [Stage 1] Processing chunk {i+1}/{len(chunk_paths)} from {metadata['video_name']}") summary_prompt = self._create_chunk_summary_prompt( original_prompt=prompt, chunk_number=i + 1, total_chunks=len(chunk_paths), video_name=metadata['video_name'] ) # Log prompt if configured if self.log_prompts: logger.debug(f"Batch {batch_id}: [Stage 1] Prompt for chunk {i+1}:\n{summary_prompt[:300]}...") try: chunk_start = time.time() result = self.process_video(chunk_path, summary_prompt, user_email) chunk_time = time.time() - chunk_start summary = result.get('content', '') chunk_summaries.append((i, summary)) logger.info(f"Batch {batch_id}: [Stage 1] Chunk {i + 1} complete: {len(summary)} chars in {chunk_time:.2f}s") # Log summary preview if configured if self.log_summaries: logger.debug(f"Batch {batch_id}: [Stage 1] Chunk {i+1} summary preview:\n{summary[:300]}...") except Exception as e: logger.error(f"Batch {batch_id}: [Stage 1] Failed to process chunk {i + 1}: {e}") chunk_summaries.append((i, f"[Error: {str(e)}]")) # Sort by chunk index chunk_summaries.sort(key=lambda x: x[0]) summaries = [s[1] for s in chunk_summaries] stage1_time = time.time() - stage1_start logger.info(f"Batch {batch_id}: [Stage 1] Complete - {len(summaries)} summaries generated in {stage1_time:.2f}s") # Log traceability logger.info(f"Batch {batch_id}: [Traceability] Chunk-to-summary mapping:") for i, metadata in enumerate(chunk_metadata): chunk_info = f"chunk {metadata.get('chunk_idx', 0)+1}" if metadata.get('is_split') else "whole video" logger.info(f"Batch {batch_id}: - Chunk {i+1}: {metadata['video_name']} ({chunk_info}) → Summary {i+1}") # Stage 2: Synthesize all summaries stage2_start = time.time() logger.info(f"Batch {batch_id}: [Stage 2] Synthesizing {len(summaries)} summaries into final result") final_content = self._synthesize_final_result( summaries=summaries, chunk_metadata=chunk_metadata, original_prompt=prompt, user_email=user_email ) stage2_time = time.time() - stage2_start total_time = stage1_time + stage2_time # Log performance metrics logger.info(f"Batch {batch_id}: [Metrics] Stage 1: {stage1_time:.2f}s, Stage 2: {stage2_time:.2f}s, Total: {total_time:.2f}s") logger.info(f"Batch {batch_id}: [Metrics] Avg time per chunk: {stage1_time/len(chunk_paths):.2f}s") logger.info(f"Batch {batch_id}: [Metrics] Total API calls: {len(chunk_paths) + 1}") # +1 for synthesis return { 'content': final_content, 'total_chunks': len(chunk_paths), 'synthesis_stages': 2 } def _create_chunk_summary_prompt(self, original_prompt: str, chunk_number: int, total_chunks: int, video_name: str) -> str: """ Create prompt for individual video/chunk in batch processing. User prompt is the primary instruction with minimal synthesis hint. """ prompt = f"""You are analyzing video {chunk_number} of {total_chunks}: "{video_name}". Your response will be synthesized with responses from other videos to create a unified final result. {original_prompt}""" return prompt def _synthesize_final_result(self, summaries: List[str], chunk_metadata: List[Dict], original_prompt: str, user_email: str) -> str: """ Synthesize all chunk summaries into single cohesive result using Gemini. Uses a universal template that makes the user's prompt the primary instruction. """ # Extract video names for context video_names = list(set(m['video_name'] for m in chunk_metadata)) num_videos = len(video_names) # Prepare summaries text summaries_text = "" total_summary_chars = 0 for i, summary in enumerate(summaries, 1): video_name = chunk_metadata[i-1]['video_name'] summaries_text += f"\n\n--- Summary {i} (from {video_name}) ---\n{summary.strip()}\n" total_summary_chars += len(summary) logger.info(f"[Stage 2] Combined summaries: {len(summaries)} summaries, {total_summary_chars} total chars") # Create universal synthesis prompt if num_videos > 1: video_context = f"{num_videos} videos: {', '.join(video_names)}" else: video_context = f"video: {video_names[0]}" synthesis_prompt = f"""You are creating a final unified response by combining multiple segment summaries from {video_context}. Here are the segment summaries: {summaries_text} Original user request: {original_prompt} Your task: Create ONE cohesive response that fulfills the user's request. Integrate information from all summaries naturally, without mentioning segments or chunks. """ # Log synthesis prompt if configured if self.log_prompts: logger.debug(f"[Stage 2] Synthesis prompt preview:\n{synthesis_prompt[:500]}...") # Send to Gemini for final synthesis logger.info(f"[Stage 2] Sending synthesis request to Gemini API (model: {self.synthesis_model})") synthesis_start = time.time() try: response = self._make_api_request_with_retry( model=self.synthesis_model, contents=[{"text": synthesis_prompt}], context="[Batch Synthesis]" ) synthesis_time = time.time() - synthesis_start synthesized_content = "" if response.parts: for part in response.parts: if hasattr(part, 'text'): synthesized_content += part.text if not synthesized_content: logger.warning("[Stage 2] Synthesis returned empty, falling back to concatenation") return self._fallback_concatenation(summaries, chunk_metadata) logger.info(f"[Stage 2] Synthesis complete: {len(synthesized_content)} chars in {synthesis_time:.2f}s") # Log synthesis result preview if configured if self.log_summaries: logger.debug(f"[Stage 2] Synthesized result preview:\n{synthesized_content[:500]}...") return synthesized_content except Exception as e: logger.error(f"[Stage 2] Synthesis failed: {str(e)}, using fallback") return self._fallback_concatenation(summaries, chunk_metadata) def _fallback_concatenation(self, summaries: List[str], chunk_metadata: List[Dict]) -> str: """ Fallback method when AI synthesis fails. Simple concatenation with section markers. """ logger.info("Using fallback concatenation method") result = "# Combined Analysis\n\n" result += "*Note: This is a combined analysis from multiple video segments.*\n\n" for i, summary in enumerate(summaries, 1): video_name = chunk_metadata[i-1]['video_name'] result += f"## Segment {i} - {video_name}\n\n" result += summary.strip() + "\n\n" return result def _fallback_concatenation_single_video(self, responses: List[str], num_chunks: int, video_name: str = "") -> str: """ Fallback method when AI synthesis fails for single video chunks. Simple concatenation with minimal formatting. Args: responses: List of chunk responses num_chunks: Number of chunks video_name: Video filename Returns: Concatenated response """ logger.info("Using fallback concatenation for single video") result = f"# Complete Video Analysis" if video_name: result += f": {video_name}" result += "\n\n" result += f"*Note: This video was analyzed in {num_chunks} segments.*\n\n" for i, response in enumerate(responses, 1): result += f"## Segment {i} of {num_chunks}\n\n" result += response.strip() + "\n\n" return result