video-query/backend/video_processor.py
2025-09-18 14:25:24 -05:00

224 lines
No EOL
11 KiB
Python

import google.generativeai as genai
import mimetypes
import time
import os
import logging
import requests
import json
import datetime
from typing import Dict, Any, Optional
from dotenv import load_dotenv
# Load environment variables from the .env file at import time, before any
# code reads them (VideoProcessor.__init__ falls back to
# os.getenv("GOOGLE_API_KEY") when no key is passed explicitly).
load_dotenv()
# Module-wide logger; it is registered under the fixed name 'video_query'
# rather than this module's __name__.
logger = logging.getLogger('video_query')
class VideoProcessor:
    """
    Class to handle video uploads and processing with Gemini API.

    Typical flow (see ``process_video``): upload the local file with
    ``genai.upload_file``, poll until the uploaded file leaves the
    ``PROCESSING`` state, send the prompt together with a reference to the
    file to a ``GenerativeModel``, concatenate the text parts of the reply,
    report usage to ``WEBHOOK_URL`` and finally delete the uploaded file.
    """

    # Default prompts for different modes
    PROMPTS = {
        "meeting_summary": "Generate a detailed summary of the meeting in the attached video recording, including discussion points and action items with owners",
        "process_documentation": "Generate detailed process documentation suitable for reference or training based on the process illustrated in the attached video recording. Write the documentation so that a new user will be able to follow step by step and accomplish the task illustrated in the video",
        "documentation_with_charts": "Analyze this video to create comprehensive process documentation with workflow diagrams for a knowledge base article. Follow these requirements exactly:\n\n1. CONTENT REQUIREMENTS:\n - Provide a detailed step-by-step explanation of the process shown\n - Be extremely verbose and thorough - include all relevant details, context, and nuances\n - Structure as a complete knowledge base article with clear sections\n - Include overview, detailed steps, tips, and troubleshooting where applicable\n\n2. MERMAID DIAGRAM REQUIREMENTS:\n - Create workflow diagrams using valid Mermaid syntax where helpful\n - CRITICAL: Use only simple alphanumeric text in node descriptions and labels\n - CRITICAL: No special characters like quotes brackets colons semicolons or symbols in node text\n - CRITICAL: Use underscores instead of spaces in node IDs and labels\n - CRITICAL: Keep all text simple to avoid syntax errors\n - Example format: Start_Process --> Complete_Task --> End_Process\n - Use flowchart format: graph TD or graph LR\n\n3. OUTPUT STRUCTURE:\n - Title and overview section\n - Prerequisites section if applicable\n - Detailed step-by-step process\n - Mermaid workflow diagram(s) showing the process flow\n - Tips and best practices\n - Troubleshooting common issues\n\nEnsure all Mermaid diagrams use simple text without special characters to prevent parsing errors.",
        "custom": "",  # Custom prompt will be provided by the user
    }

    # Maximum video duration in minutes (Gemini limitation).
    # NOTE(review): not enforced anywhere in this class - confirm callers check it.
    MAX_VIDEO_DURATION = 55

    # Threshold for chunked upload (10MB). Only used for an informational log
    # line here; every upload goes through the same genai.upload_file call.
    CHUNKED_UPLOAD_THRESHOLD = 10 * 1024 * 1024

    # Webhook URL for tracking usage.
    # NOTE(review): this endpoint is committed to source; consider moving it
    # to configuration.
    WEBHOOK_URL = "https://hook.us1.make.celonis.com/8ri1h8b2he4wudp2jku69mgcxumzxf3v"

    # Model used to answer the prompt.
    MODEL_NAME = "gemini-2.5-pro"

    # Seconds to sleep between two checks of the uploaded file's state.
    PROCESSING_POLL_SECONDS = 2

    # Upper bound (seconds) on how long to wait for the uploaded file to leave
    # the PROCESSING state, so a stuck upload cannot block the caller forever.
    PROCESSING_TIMEOUT_SECONDS = 15 * 60

    # Number of characters of a prompt / response part echoed to the log.
    LOG_PREVIEW_CHARS = 100

    def __init__(self, api_key: Optional[str] = None):
        """Initialize with API key from environment variable or direct setting.

        Args:
            api_key: Gemini API key. When omitted (or empty) the value of the
                GOOGLE_API_KEY environment variable is used instead.

        Raises:
            ValueError: If no API key is available from either source.
        """
        self.api_key = api_key or os.getenv("GOOGLE_API_KEY")
        if not self.api_key:
            logger.error("API key not provided")
            raise ValueError("API key not provided - set GOOGLE_API_KEY environment variable or pass when initializing")

        # Configure the Gemini client
        logger.info("Initializing Gemini API client")
        genai.configure(api_key=self.api_key)
        logger.info("Gemini API client initialized successfully")

    @staticmethod
    def _preview(text: str, limit: int = LOG_PREVIEW_CHARS) -> str:
        """Return *text* shortened to *limit* characters plus '...' for logging."""
        if len(text) > limit:
            return text[:limit] + '...'
        return text

    @staticmethod
    def _build_usage_payload(user_email: str, prompt: str) -> Dict[str, Any]:
        """Build the JSON-serializable body posted to the usage webhook."""
        return {
            "tool": "VIDEOQUERY",
            # Local, timezone-naive timestamp in ISO 8601 format.
            "date": datetime.datetime.now().isoformat(),
            "user": user_email,
            "model": "GEMINI",
            "settings": "no settings",
            "subTool": "no subTool",
            "prompt": prompt,
            "negativePrompt": "no NEGATIVE_PROMPT",
            "image": "no image",
        }

    def send_usage_webhook(self, user_email: str, prompt: str) -> None:
        """
        Send usage data to webhook for tracking purposes.

        This is best-effort: any failure is logged and swallowed so that usage
        tracking can never break video processing.

        Args:
            user_email: Email of the user who processed the video
            prompt: The prompt used for processing
        """
        try:
            webhook_data = self._build_usage_payload(user_email, prompt)
            logger.info(f"Sending usage data to webhook for user: {user_email}")
            response = requests.post(
                self.WEBHOOK_URL,
                headers={"Content-Type": "application/json"},
                data=json.dumps(webhook_data),
                timeout=10,  # 10 second timeout
            )
            if response.status_code == 200:
                logger.info("Successfully sent usage data to webhook")
            else:
                logger.warning(f"Webhook request failed with status code: {response.status_code}")
                logger.warning(f"Response: {response.text}")
        except Exception as e:
            # Don't raise the exception - webhook failure shouldn't block the main flow
            logger.error(f"Error sending usage data to webhook: {str(e)}")

    def _wait_until_processed(self, video_file):
        """Poll the uploaded file until it leaves PROCESSING or the wait times out.

        Returns the most recently fetched file object; the caller is expected
        to check its state (it is still "PROCESSING" after a timeout).
        """
        deadline = time.monotonic() + self.PROCESSING_TIMEOUT_SECONDS
        processing_wait_count = 0
        while video_file.state.name == "PROCESSING":
            if time.monotonic() >= deadline:
                logger.error(
                    f"Timed out after {self.PROCESSING_TIMEOUT_SECONDS} seconds waiting for file processing"
                )
                break
            processing_wait_count += 1
            logger.info(f"File is still processing. Wait count: {processing_wait_count}")
            time.sleep(self.PROCESSING_POLL_SECONDS)  # Wait before checking again
            video_file = genai.get_file(name=video_file.name)  # Re-fetch file state
            logger.info(f"Updated file state: {video_file.state.name}")
        return video_file

    @staticmethod
    def _guess_mime_type(video_path: str) -> str:
        """Return the MIME type guessed from the file name, or video/mp4 as fallback."""
        mime_type, _ = mimetypes.guess_type(video_path)
        if not mime_type:
            logger.info("Could not determine MIME type, using default: video/mp4")
            return "video/mp4"  # Fallback
        logger.info(f"MIME type: {mime_type}")
        return mime_type

    @classmethod
    def _extract_text(cls, response) -> str:
        """Concatenate the text of every response part, logging a short preview of each."""
        parts = response.parts
        if not parts:
            logger.warning("No parts in response")
            if hasattr(response, 'prompt_feedback') and response.prompt_feedback:
                logger.warning(f"Prompt feedback: {response.prompt_feedback}")
            return ""

        logger.info(f"Response has {len(parts)} parts")
        chunks = []
        for i, part in enumerate(parts):
            if hasattr(part, 'text'):
                part_text = part.text
                logger.info(f"Part {i} (text): {cls._preview(part_text)}")
                chunks.append(part_text)
            else:
                logger.info(f"Part {i} (no text): {type(part)}")
        return "".join(chunks)

    @staticmethod
    def _delete_remote_file(video_file) -> None:
        """Best-effort removal of the uploaded file from Gemini storage."""
        try:
            logger.info(f"Deleting file from Gemini storage: {video_file.name}")
            genai.delete_file(name=video_file.name)
            logger.info("File deleted successfully from Gemini storage")
        except Exception as del_err:
            # Cleanup failure must not change the outcome reported to the caller.
            logger.warning(f"Could not delete file from Gemini storage: {str(del_err)}")

    def process_video(self, video_path: str, prompt: str, user_email: str = "anonymous") -> Dict[str, Any]:
        """
        Process a video with the given prompt using Gemini API.

        Errors are reported through the returned dictionary rather than raised.
        Once the upload has succeeded, the uploaded file is deleted from Gemini
        storage on every exit path (success, inactive file, or exception).

        Args:
            video_path: Path to the video file
            prompt: Text prompt to use for video analysis
            user_email: Email of the user processing the video (for usage tracking)

        Returns:
            Dictionary with keys "success" (bool), "message" (error text, empty
            on success) and "content" (generated text). When an exception was
            caught, "error_details" additionally holds the formatted traceback.
        """
        result = {
            "success": False,
            "message": "",
            "content": "",
        }
        logger.info(f"Processing video: {video_path}")
        logger.info(f"Prompt: {self._preview(prompt)}")

        if not os.path.exists(video_path):
            error_msg = f"Video file not found at '{video_path}'"
            logger.error(error_msg)
            result["message"] = error_msg
            return result

        # Stays None until the upload succeeds, so the cleanup in ``finally``
        # only runs when there is actually something to delete.
        video_file = None
        try:
            # Get file size
            file_size = os.path.getsize(video_path)
            logger.info(f"File size: {file_size / (1024 * 1024):.2f} MB")

            # Upload the video file
            logger.info("Uploading video to Gemini API...")
            # Log the file size in relation to our threshold (for informational purposes only)
            if file_size > self.CHUNKED_UPLOAD_THRESHOLD:
                logger.info(f"File size exceeds {self.CHUNKED_UPLOAD_THRESHOLD/(1024*1024):.2f} MB threshold")
            else:
                logger.info(f"File size below {self.CHUNKED_UPLOAD_THRESHOLD/(1024*1024):.2f} MB threshold")

            # All uploads use the same method (our chunking happens in the frontend)
            # Google API may handle large files internally in their own way
            video_file = genai.upload_file(
                path=video_path,
                display_name=os.path.basename(video_path),
            )
            logger.info(f"Upload successful. File URI: {video_file.uri}")
            logger.info(f"Initial file state: {video_file.state.name}")

            # Wait for processing if needed
            video_file = self._wait_until_processed(video_file)
            if video_file.state.name != "ACTIVE":
                error_msg = f"Error: File did not become active. Current state: {video_file.state.name}"
                logger.error(error_msg)
                result["message"] = error_msg
                return result

            # Create the content parts for the prompt
            prompt_parts = [
                {"text": prompt},
                {"file_data": {
                    "file_uri": video_file.uri,
                    "mime_type": self._guess_mime_type(video_path),
                }},
            ]

            # Initialize the model and generate content
            logger.info("Initializing GenerativeModel...")
            model = genai.GenerativeModel(model_name=self.MODEL_NAME)
            logger.info("Sending prompt to Gemini for processing...")
            response = model.generate_content(prompt_parts)
            logger.info("Received response from Gemini")

            # Extract the response content
            content = self._extract_text(response)

            # Set success result
            result["success"] = True
            result["content"] = content
            logger.info(f"Processed result with {len(content)} characters")

            # Send usage data to webhook for tracking
            self.send_usage_webhook(user_email, prompt)
            return result
        except Exception as e:
            import traceback
            error_details = traceback.format_exc()
            logger.error(f"Error processing video: {str(e)}")
            logger.error(error_details)
            result["message"] = f"Error processing video: {str(e)}"
            result["error_details"] = error_details
            return result
        finally:
            # Always release the remote copy, including on the error paths above.
            if video_file is not None:
                self._delete_remote_file(video_file)