from google import genai import mimetypes import time import os import logging import requests import json import datetime import base64 from typing import Dict, Any, Optional, List, Tuple from dotenv import load_dotenv from video_splitter import VideoSplitter from concurrent.futures import ThreadPoolExecutor, as_completed import threading # Load environment variables from .env file load_dotenv() logger = logging.getLogger('video_query') class VideoProcessor: """ Class to handle video uploads and processing with Gemini API. """ # Default prompts for different modes PROMPTS = { "meeting_summary": "Generate a detailed summary of the meeting in the attached video recording, including discussion points and action items with owners", "process_documentation": "Generate detailed process documentation suitable for reference or training based on the process illustrated in the attached video recording. Write the documentation so that a new user will be able to follow step by step and accomplish the task illustrated in the video", "documentation_with_charts": "Analyze this video to create comprehensive process documentation with workflow diagrams for a knowledge base article. Follow these requirements exactly:\n\n1. CONTENT REQUIREMENTS:\n - Provide a detailed step-by-step explanation of the process shown\n - Be extremely verbose and thorough - include all relevant details, context, and nuances\n - Structure as a complete knowledge base article with clear sections\n - Include overview, detailed steps, tips, and troubleshooting where applicable\n\n2. MERMAID DIAGRAM REQUIREMENTS:\n - Create workflow diagrams using valid Mermaid syntax where helpful\n - CRITICAL: Use only simple alphanumeric text in node descriptions and labels\n - CRITICAL: No special characters like quotes brackets colons semicolons or symbols in node text\n - CRITICAL: Use underscores instead of spaces in node IDs and labels\n - CRITICAL: Keep all text simple to avoid syntax errors\n - Example format: Start_Process --> Complete_Task --> End_Process\n - Use flowchart format: graph TD or graph LR\n\n3. OUTPUT STRUCTURE:\n - Title and overview section\n - Prerequisites section if applicable\n - Detailed step-by-step process\n - Mermaid workflow diagram(s) showing the process flow\n - Tips and best practices\n - Troubleshooting common issues\n\nEnsure all Mermaid diagrams use simple text without special characters to prevent parsing errors.", "custom": "" # Custom prompt will be provided by the user } # Maximum video duration in minutes (Gemini limitation) MAX_VIDEO_DURATION = 55 # Threshold for chunked upload (10MB) CHUNKED_UPLOAD_THRESHOLD = 10 * 1024 * 1024 # Webhook URL for tracking usage WEBHOOK_URL = "https://hook.us1.make.celonis.com/8ri1h8b2he4wudp2jku69mgcxumzxf3v" # Parallel processing configuration # Default max workers for parallel chunk processing # Free tier: 5 RPM (use 3-4 workers to be safe) # Paid tier: 150 RPM (can use more workers) DEFAULT_MAX_WORKERS = 4 # Conservative default for free tier def __init__(self, api_key: Optional[str] = None, max_parallel_chunks: int = None): """ Initialize with API key from environment variable or direct setting Args: api_key: Google API key for Gemini max_parallel_chunks: Maximum number of chunks to process in parallel (default: 4, recommended 3-4 for free tier, up to 10+ for paid tier) """ self.api_key = api_key or os.getenv("GOOGLE_API_KEY") if not self.api_key: logger.error("API key not provided") raise ValueError("API key not provided - set GOOGLE_API_KEY environment variable or pass when initializing") # Initialize the Gemini client logger.info("Initializing Gemini API client") self.client = genai.Client(api_key=self.api_key) logger.info("Gemini API client initialized successfully") # Set parallel processing configuration self.max_parallel_chunks = max_parallel_chunks or self.DEFAULT_MAX_WORKERS logger.info(f"Parallel processing enabled with max {self.max_parallel_chunks} concurrent chunks") # Initialize video splitter self.video_splitter = VideoSplitter() # Thread lock for rate limiting self._rate_limit_lock = threading.Lock() def send_usage_webhook(self, user_email: str, prompt: str) -> None: """ Send usage data to webhook for tracking purposes Args: user_email: Email of the user who processed the video prompt: The prompt used for processing """ try: current_datetime = datetime.datetime.now().isoformat() webhook_data = { "tool": "VIDEOQUERY", "date": current_datetime, "user": user_email, "model": "GEMINI", "settings": "no settings", "subTool": "no subTool", "prompt": prompt, "negativePrompt": "no NEGATIVE_PROMPT", "image": "no image" } logger.info(f"Sending usage data to webhook for user: {user_email}") response = requests.post( self.WEBHOOK_URL, headers={"Content-Type": "application/json"}, data=json.dumps(webhook_data), timeout=10 # 10 second timeout ) if response.status_code == 200: logger.info("Successfully sent usage data to webhook") else: logger.warning(f"Webhook request failed with status code: {response.status_code}") logger.warning(f"Response: {response.text}") except Exception as e: logger.error(f"Error sending usage data to webhook: {str(e)}") # Don't raise the exception - webhook failure shouldn't block the main flow def process_video(self, video_path: str, prompt: str, user_email: str = "anonymous") -> Dict[str, Any]: """ Process a video with the given prompt using Gemini API Args: video_path: Path to the video file prompt: Text prompt to use for video analysis user_email: Email of the user processing the video (for usage tracking) Returns: Dictionary with processing result or error """ start_time = time.time() result = { "success": False, "message": "", "content": "", "processing_time_seconds": 0 } logger.info(f"Processing video: {video_path}") logger.info(f"Prompt: {prompt[:100]}..." if len(prompt) > 100 else f"Prompt: {prompt}") if not os.path.exists(video_path): error_msg = f"Video file not found at '{video_path}'" logger.error(error_msg) result["message"] = error_msg return result try: # Get file size file_size = os.path.getsize(video_path) file_size_mb = file_size / (1024 * 1024) logger.info(f"File size: {file_size_mb:.2f} MB") # Determine MIME type for the video mime_type, _ = mimetypes.guess_type(video_path) if not mime_type: logger.info(f"Could not determine MIME type, using default: video/mp4") mime_type = "video/mp4" # Fallback else: logger.info(f"MIME type: {mime_type}") # Use different approach based on file size # Small files (< 10MB): use inline base64 data (faster, no upload wait) # Large files (>= 10MB): use file upload API (handles larger files) # Note: Base64 adds ~37% overhead, so 10MB file = ~13.7MB base64 SIZE_THRESHOLD_MB = 10 uploaded_file = None if file_size_mb < SIZE_THRESHOLD_MB: # Small file: Use base64 encoding for inline data logger.info(f"File < {SIZE_THRESHOLD_MB}MB, using inline base64 data") with open(video_path, "rb") as video_file_obj: video_data = video_file_obj.read() video_base64 = base64.b64encode(video_data).decode('utf-8') logger.info(f"Base64 encoding complete. Size: {len(video_base64)} characters") # Create the content parts using inline data prompt_parts = [ {"text": prompt}, {"inline_data": { "mime_type": mime_type, "data": video_base64 }} ] else: # Large file: Use file upload API logger.info(f"File >= {SIZE_THRESHOLD_MB}MB, using file upload API") upload_start = time.time() uploaded_file = self.client.files.upload( file=video_path ) logger.info(f"Upload complete in {time.time() - upload_start:.1f}s. File URI: {uploaded_file.uri}") logger.info(f"Initial file state: {uploaded_file.state}") # Wait for file to be processed while uploaded_file.state == "PROCESSING": logger.info("File is still processing, waiting...") time.sleep(2) uploaded_file = self.client.files.get(name=uploaded_file.name) logger.info(f"Updated file state: {uploaded_file.state}") if uploaded_file.state != "ACTIVE": error_msg = f"File upload failed. State: {uploaded_file.state}" logger.error(error_msg) result["message"] = error_msg return result logger.info("File is ACTIVE and ready for processing") # Create content parts using file reference prompt_parts = [ {"text": prompt}, {"file_data": { "file_uri": uploaded_file.uri, "mime_type": mime_type }} ] # Rate limiting: Wait to avoid hitting API limits # Free tier: 5 RPM, so minimum 12 seconds between requests with self._rate_limit_lock: time.sleep(2) # 2 second delay between API calls # Use the client to generate content with the new SDK API logger.info("Sending prompt to Gemini for processing...") api_start = time.time() response = self.client.models.generate_content( model="gemini-2.5-pro", contents=prompt_parts ) api_time = time.time() - api_start logger.info(f"Received response from Gemini (API call took {api_time:.1f}s)") # Extract the response content content = "" if response.parts: logger.info(f"Response has {len(response.parts)} parts") for i, part in enumerate(response.parts): if hasattr(part, 'text'): part_text = part.text content_preview = part_text[:100] + '...' if len(part_text) > 100 else part_text logger.info(f"Part {i} (text): {content_preview}") content += part_text else: logger.info(f"Part {i} (no text): {type(part)}") else: logger.warning("No parts in response") if hasattr(response, 'prompt_feedback') and response.prompt_feedback: logger.warning(f"Prompt feedback: {response.prompt_feedback}") # Set success result result["success"] = True result["content"] = content result["processing_time_seconds"] = round(time.time() - start_time, 2) logger.info(f"Processed result with {len(content)} characters") logger.info(f"Total processing time: {result['processing_time_seconds']}s") # Send usage data to webhook for tracking self.send_usage_webhook(user_email, prompt) # Clean up uploaded file if it was used if uploaded_file: try: logger.info(f"Deleting uploaded file: {uploaded_file.name}") self.client.files.delete(name=uploaded_file.name) logger.info("File deleted successfully from Gemini storage") except Exception as del_err: logger.warning(f"Could not delete file from Gemini storage: {str(del_err)}") return result except Exception as e: import traceback error_details = traceback.format_exc() logger.error(f"Error processing video: {str(e)}") logger.error(error_details) result["message"] = f"Error processing video: {str(e)}" result["error_details"] = error_details return result def combine_chunk_responses(self, responses: List[str], prompt: str, num_chunks: int) -> str: """ Intelligently combine responses from multiple video chunks. Args: responses: List of response texts from each chunk prompt: Original prompt used for processing num_chunks: Total number of chunks processed Returns: Combined response text """ logger.info(f"Combining {len(responses)} chunk responses") # Detect the prompt type to determine combination strategy prompt_lower = prompt.lower() is_meeting_summary = "meeting" in prompt_lower or "summary" in prompt_lower is_documentation = "documentation" in prompt_lower or "process" in prompt_lower is_with_charts = "mermaid" in prompt_lower or "diagram" in prompt_lower or "chart" in prompt_lower if is_with_charts: return self._combine_with_charts(responses, num_chunks) elif is_meeting_summary: return self._combine_meeting_summary(responses, num_chunks) elif is_documentation: return self._combine_documentation(responses, num_chunks) else: return self._combine_generic(responses, num_chunks) def _combine_generic(self, responses: List[str], num_chunks: int) -> str: """Generic combination: simple sequential joining with section headers.""" logger.info("Using generic combination strategy") combined = [] combined.append(f"# Complete Video Analysis\n") combined.append(f"*This video was processed in {num_chunks} parts due to its length.*\n") for i, response in enumerate(responses, 1): combined.append(f"\n## Part {i} of {num_chunks}\n") combined.append(response.strip()) return "\n".join(combined) def _combine_meeting_summary(self, responses: List[str], num_chunks: int) -> str: """Combination strategy optimized for meeting summaries.""" logger.info("Using meeting summary combination strategy") # First, try to synthesize the segments into a unified summary try: logger.info("Attempting to synthesize segments into unified meeting summary") synthesized = self._synthesize_meeting_segments(responses, num_chunks) if synthesized: return synthesized else: logger.warning("Synthesis failed, falling back to segment concatenation") except Exception as e: logger.warning(f"Error during synthesis: {e}, falling back to segment concatenation") # Fallback: simple concatenation with formatting combined = [] combined.append(f"# Complete Meeting Recording Summary\n") combined.append(f"*This recording was analyzed in {num_chunks} segments.*\n") combined.append(f"\n---\n") # Combine all discussion points with clear time markers for i, response in enumerate(responses, 1): time_range = self._format_time_range(i, num_chunks) combined.append(f"\n## Segment {i}: {time_range}\n") combined.append(response.strip()) combined.append(f"\n---\n") # Add consolidated note combined.append(f"\n### Notes") combined.append(f"- Review all segments above for discussion points and action items") combined.append(f"- Total recording duration: ~{num_chunks * 50} minutes") combined.append(f"- Recording was split into {num_chunks} segments for analysis") return "\n".join(combined) def _synthesize_meeting_segments(self, responses: List[str], num_chunks: int) -> Optional[str]: """ Use AI to synthesize multiple segment summaries into one unified meeting summary. Args: responses: List of segment summaries num_chunks: Number of segments Returns: Unified meeting summary or None if synthesis fails """ try: # Prepare the segments for synthesis segments_text = "" for i, response in enumerate(responses, 1): time_range = self._format_time_range(i, num_chunks) segments_text += f"\n\n### Segment {i} ({time_range}):\n{response.strip()}\n" # Create synthesis prompt synthesis_prompt = f"""You are analyzing a meeting recording that was split into {num_chunks} segments due to its length. Below are the summaries from each segment. Your task is to create ONE unified, comprehensive meeting summary that integrates all the information. SEGMENT SUMMARIES: {segments_text} Please provide a SINGLE, UNIFIED meeting summary that: 1. Combines all discussion points into one cohesive narrative (not separated by segments) 2. Consolidates all action items into one master list (removing duplicates if any) 3. Identifies main themes and outcomes across the entire meeting 4. Maintains chronological flow where relevant 5. Uses clear sections: Meeting Summary, Discussion Points, Action Items (with owners) Format the output as a professional meeting summary document. Do not reference the segments in your output - write as if this was analyzed as one continuous meeting.""" logger.info("Sending synthesis request to Gemini") synthesis_response = self.client.models.generate_content( model="gemini-2.5-pro", contents=synthesis_prompt ) if synthesis_response.parts: synthesized_content = "" for part in synthesis_response.parts: if hasattr(part, 'text'): synthesized_content += part.text if synthesized_content: logger.info("Successfully synthesized unified meeting summary") # Add header noting this was synthesized final_output = "# Meeting Summary\n\n" final_output += f"*Synthesized from {num_chunks}-segment analysis*\n\n" final_output += "---\n\n" final_output += synthesized_content return final_output logger.warning("No content in synthesis response") return None except Exception as e: logger.error(f"Error during synthesis: {str(e)}") return None def _combine_documentation(self, responses: List[str], num_chunks: int) -> str: """Combination strategy optimized for process documentation.""" logger.info("Using documentation combination strategy") combined = [] combined.append(f"# Complete Process Documentation\n") combined.append(f"*This process was documented from a {num_chunks}-part video recording.*\n") combined.append(f"\n## Overview\n") combined.append(f"This documentation covers the complete process shown in the video. " f"The content has been organized sequentially across all segments.\n") for i, response in enumerate(responses, 1): combined.append(f"\n## Section {i}: {self._format_time_range(i, num_chunks)}\n") combined.append(response.strip()) combined.append(f"\n\n---\n*End of documentation*") return "\n".join(combined) def _combine_with_charts(self, responses: List[str], num_chunks: int) -> str: """Combination strategy for documentation with Mermaid diagrams.""" logger.info("Using documentation with charts combination strategy") combined = [] combined.append(f"# Complete Process Documentation with Workflow Diagrams\n") combined.append(f"*This analysis spans {num_chunks} video segments.*\n") # First, add all text content combined.append(f"\n## Overview and Detailed Steps\n") for i, response in enumerate(responses, 1): combined.append(f"\n### Part {i}: {self._format_time_range(i, num_chunks)}\n") # Separate mermaid diagrams from text content parts = response.split("```mermaid") text_part = parts[0].strip() combined.append(text_part) # Add mermaid diagrams in a dedicated section if len(parts) > 1: for j, diagram_part in enumerate(parts[1:], 1): if "```" in diagram_part: diagram_code = diagram_part.split("```")[0] combined.append(f"\n**Workflow Diagram {i}.{j}:**\n") combined.append(f"```mermaid{diagram_code}```\n") # Add any remaining text after the diagram remaining_text = "```".join(diagram_part.split("```")[1:]).strip() if remaining_text: combined.append(remaining_text) combined.append(f"\n\n---\n*Complete documentation generated from {num_chunks}-part video analysis*") return "\n".join(combined) def _format_time_range(self, part_num: int, total_parts: int, chunk_duration: int = 50) -> str: """Format time range for a video part.""" start_min = (part_num - 1) * chunk_duration end_min = part_num * chunk_duration if part_num < total_parts else "End" if isinstance(end_min, int): return f"{start_min}-{end_min} minutes" else: return f"{start_min}+ minutes" def _process_single_chunk(self, chunk_info: Tuple[int, str, str, int, str]) -> Tuple[int, Dict[str, Any]]: """ Process a single video chunk. Used for parallel processing. Args: chunk_info: Tuple of (chunk_index, chunk_path, chunk_prompt, total_chunks, user_email) Returns: Tuple of (chunk_index, result_dict) """ chunk_index, chunk_path, chunk_prompt, total_chunks, user_email = chunk_info logger.info(f"[Parallel] Processing chunk {chunk_index + 1}/{total_chunks}: {chunk_path}") try: chunk_result = self.process_video(chunk_path, chunk_prompt, user_email) logger.info(f"[Parallel] Completed chunk {chunk_index + 1}/{total_chunks}") return (chunk_index, chunk_result) except Exception as e: logger.error(f"[Parallel] Error processing chunk {chunk_index + 1}/{total_chunks}: {str(e)}") return (chunk_index, { "success": False, "message": f"Error processing chunk {chunk_index + 1}: {str(e)}", "content": "" }) def _process_chunks_parallel(self, chunk_paths: List[str], prompt: str, user_email: str) -> List[Dict[str, Any]]: """ Process multiple video chunks in parallel using ThreadPoolExecutor. Args: chunk_paths: List of paths to video chunk files prompt: Original prompt for video analysis user_email: User email for tracking Returns: List of result dictionaries in order of chunks """ num_chunks = len(chunk_paths) logger.info(f"Starting parallel processing of {num_chunks} chunks with {self.max_parallel_chunks} workers") # Prepare chunk information for parallel processing chunk_infos = [] for i, chunk_path in enumerate(chunk_paths): chunk_prompt = self._create_chunk_prompt(prompt, i + 1, num_chunks) chunk_infos.append((i, chunk_path, chunk_prompt, num_chunks, user_email)) # Process chunks in parallel results = [None] * num_chunks # Pre-allocate results list to maintain order with ThreadPoolExecutor(max_workers=self.max_parallel_chunks) as executor: # Submit all chunks for processing future_to_chunk = { executor.submit(self._process_single_chunk, chunk_info): chunk_info[0] for chunk_info in chunk_infos } # Collect results as they complete completed = 0 for future in as_completed(future_to_chunk): chunk_index = future_to_chunk[future] try: chunk_index, result = future.result() results[chunk_index] = result completed += 1 logger.info(f"[Parallel] Progress: {completed}/{num_chunks} chunks completed") except Exception as e: logger.error(f"[Parallel] Unexpected error for chunk {chunk_index + 1}: {str(e)}") results[chunk_index] = { "success": False, "message": f"Unexpected error: {str(e)}", "content": "" } logger.info(f"[Parallel] All {num_chunks} chunks processed") return results def process_long_video(self, video_path: str, prompt: str, user_email: str = "anonymous", use_parallel: bool = True) -> Dict[str, Any]: """ Process a long video by splitting it into chunks and combining the results. Supports both parallel and sequential processing. Args: video_path: Path to the video file prompt: Text prompt to use for video analysis user_email: Email of the user processing the video (for usage tracking) use_parallel: If True, process chunks in parallel; if False, process sequentially Returns: Dictionary with processing result or error """ start_time = time.time() result = { "success": False, "message": "", "content": "", "chunks_processed": 0, "processing_mode": "parallel" if use_parallel else "sequential", "processing_time_seconds": 0 } chunk_paths = [] try: # Check if video needs splitting num_chunks, duration_minutes = self.video_splitter.get_chunk_info(video_path) if num_chunks <= 1: logger.info("Video does not need splitting, processing normally") return self.process_video(video_path, prompt, user_email) logger.info(f"Long video detected: {duration_minutes:.2f} minutes, will be split into {num_chunks} chunks") logger.info(f"Processing mode: {'PARALLEL' if use_parallel else 'SEQUENTIAL'}") # Split the video logger.info("Starting video splitting...") chunk_paths = self.video_splitter.split_video(video_path) logger.info(f"Video split into {len(chunk_paths)} chunks successfully") # Process chunks (parallel or sequential) if use_parallel: # Parallel processing using ThreadPoolExecutor chunk_results = self._process_chunks_parallel(chunk_paths, prompt, user_email) else: # Sequential processing (original logic) chunk_results = [] for i, chunk_path in enumerate(chunk_paths, 1): logger.info(f"[Sequential] Processing chunk {i}/{len(chunk_paths)}: {chunk_path}") # Modify prompt to indicate this is part of a multi-part video chunk_prompt = self._create_chunk_prompt(prompt, i, len(chunk_paths)) # Process this chunk chunk_result = self.process_video(chunk_path, chunk_prompt, user_email) chunk_results.append(chunk_result) logger.info(f"[Sequential] Completed chunk {i}/{len(chunk_paths)}") # Check for failures in any chunk chunk_responses = [] for i, chunk_result in enumerate(chunk_results, 1): if not chunk_result["success"]: error_msg = f"Failed to process chunk {i}/{len(chunk_paths)}: {chunk_result.get('message', 'Unknown error')}" logger.error(error_msg) result["message"] = error_msg result["chunks_processed"] = i - 1 return result chunk_responses.append(chunk_result["content"]) # Combine all responses logger.info("Combining responses from all chunks...") combined_content = self.combine_chunk_responses( chunk_responses, prompt, len(chunk_paths) ) result["success"] = True result["content"] = combined_content result["chunks_processed"] = len(chunk_paths) result["processing_time_seconds"] = round(time.time() - start_time, 2) result["message"] = f"Successfully processed video in {len(chunk_paths)} chunks" logger.info(f"Long video processing completed successfully: {len(chunk_paths)} chunks") logger.info(f"Total processing time: {result['processing_time_seconds']}s") return result except Exception as e: import traceback error_details = traceback.format_exc() logger.error(f"Error processing long video: {str(e)}") logger.error(error_details) result["message"] = f"Error processing long video: {str(e)}" result["error_details"] = error_details return result finally: # Always clean up chunk files if chunk_paths: logger.info("Cleaning up temporary chunk files...") self.video_splitter.cleanup_chunks(chunk_paths) def _create_chunk_prompt(self, original_prompt: str, chunk_num: int, total_chunks: int) -> str: """ Create a prompt for a video chunk that provides context about its position. Args: original_prompt: The original user prompt chunk_num: Current chunk number (1-indexed) total_chunks: Total number of chunks Returns: Modified prompt for the chunk """ # For meeting summaries, modify the prompt to focus on just summarizing what's in this segment prompt_lower = original_prompt.lower() is_meeting = "meeting" in prompt_lower if is_meeting: # For meetings, ask for a partial summary of this segment only if chunk_num == 1: context = f"[SEGMENT {chunk_num} of {total_chunks} - First 50 minutes] " context += "Provide a summary of the discussion points and any action items covered in THIS segment only. " context += "Do not try to provide a complete meeting summary - just summarize what happens in this part. " elif chunk_num == total_chunks: context = f"[SEGMENT {chunk_num} of {total_chunks} - Final segment] " context += "Provide a summary of the discussion points and any action items covered in THIS final segment only. " context += "This continues from previous segments, but only summarize what happens in this part. " else: context = f"[SEGMENT {chunk_num} of {total_chunks} - Middle segment] " context += "Provide a summary of the discussion points and any action items covered in THIS segment only. " context += "This is a middle portion of a longer recording - only summarize what happens in this part. " return context + original_prompt else: # For other types, use the original approach context = f"[PART {chunk_num} of {total_chunks}] " if chunk_num == 1: context += "This is the first segment of a longer video. " elif chunk_num == total_chunks: context += "This is the final segment continuing from previous parts. " else: context += "This is a middle segment continuing from previous parts. " return context + original_prompt def process_video_auto(self, video_path: str, prompt: str, user_email: str = "anonymous") -> Dict[str, Any]: """ Automatically process a video, handling both short and long videos. This method detects if the video needs splitting and processes accordingly. Args: video_path: Path to the video file prompt: Text prompt to use for video analysis user_email: Email of the user processing the video (for usage tracking) Returns: Dictionary with processing result or error """ logger.info(f"Auto-processing video: {video_path}") # Check if video needs splitting if self.video_splitter.needs_splitting(video_path): logger.info("Video requires splitting, using long video processing") return self.process_long_video(video_path, prompt, user_email) else: logger.info("Video is within single-chunk limit, using standard processing") return self.process_video(video_path, prompt, user_email)