feat: Initiate presentation MCP server setup

This commit is contained in:
sudipnext 2025-08-02 09:19:42 +05:45
parent bcc3fbf099
commit 14774e26eb
32 changed files with 1660 additions and 28 deletions

View file

@ -8,4 +8,5 @@ build
.gitignore
tmp
debug
.fastembed_cache
.fastembed_cache
servers/nextjs/node_modules

View file

@ -2,7 +2,6 @@ FROM python:3.11-slim-bookworm
# Install Node.js and npm
RUN apt-get update && apt-get install -y \
nginx \
curl \
redis-server
@ -18,6 +17,8 @@ WORKDIR /app
# Set environment variables
ENV APP_DATA_DIRECTORY=/app_data
ENV TEMP_DIRECTORY=/tmp/presenton
ENV PYTHONPATH="${PYTHONPATH}:/app/servers/fastapi"
# Install ollama
RUN curl -fsSL https://ollama.com/install.sh | sh
@ -25,6 +26,7 @@ RUN curl -fsSL https://ollama.com/install.sh | sh
# Install dependencies for FastAPI
COPY servers/fastapi/requirements.txt ./
RUN pip install -r requirements.txt
RUN pip install fastmcp
# Install dependencies for Next.js
WORKDIR /app/servers/nextjs

View file

@ -20,18 +20,20 @@ RUN ls -a
# Set environment variables
ENV APP_DATA_DIRECTORY=/app_data
ENV TEMP_DIRECTORY=/tmp/presenton
ENV PYTHONPATH="${PYTHONPATH}:/app/servers/fastapi"
# Install ollama
RUN curl -fsSL https://ollama.com/install.sh | sh
RUN curl -fsSL http://ollama.com/install.sh | sh
# Install dependencies for FastAPI
COPY servers/fastapi/requirements.txt ./
RUN pip install -r requirements.txt
RUN pip install fastmcp
# Install dependencies for Next.js
WORKDIR /node_dependencies
COPY servers/nextjs/package.json servers/nextjs/package-lock.json ./
RUN npm install
RUN npm install
# Install chrome for puppeteer
RUN npx puppeteer browsers install chrome@138.0.7204.94 --install-deps
@ -45,4 +47,4 @@ COPY nginx.conf /etc/nginx/nginx.conf
EXPOSE 80
# Start the servers
CMD ["/bin/bash", "/app/docker-dev-start.sh"]
CMD ["/bin/bash", "/app/docker-dev-start.sh"]

View file

@ -30,6 +30,12 @@ http {
proxy_connect_timeout 30m;
}
location /mcp/ {
proxy_pass http://localhost:8001;
proxy_read_timeout 30m;
proxy_connect_timeout 30m;
}
location /docs {
proxy_pass http://localhost:8000/docs;
proxy_read_timeout 30m;

View file

@ -0,0 +1,27 @@
from fastapi import APIRouter, HTTPException
import aiohttp
from typing import List, Any
from services.get_layout_by_name import get_layout_by_name
from models.presentation_layout import PresentationLayoutModel
LAYOUTS_ROUTER = APIRouter(prefix="/layouts", tags=["Layouts"])
@LAYOUTS_ROUTER.get("/", summary="Get available layouts")
async def get_layouts():
url = "http://localhost:3000/api/layouts" # Adjust port if needed
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status != 200:
error_text = await response.text()
raise HTTPException(
status_code=response.status,
detail=f"Failed to fetch layouts: {error_text}"
)
layouts_json = await response.json()
# Optionally, parse into a Pydantic model if you have one matching the structure
return layouts_json
@LAYOUTS_ROUTER.get("/{layout_name}", summary="Get layout details by ID")
async def get_layout_detail(layout_name: str) -> PresentationLayoutModel:
return await get_layout_by_name(layout_name)

View file

@ -0,0 +1 @@
# This file marks the mcp directory as a Python package.

View file

@ -0,0 +1,13 @@
from fastmcp import FastMCP
from app_mcp.tools import register_tools
from app_mcp.services.workflow_orchestrator import WorkflowOrchestrator
def create_mcp_server():
mcp = FastMCP("PresentonMCP")
orchestrator = WorkflowOrchestrator()
register_tools(mcp, orchestrator)
return mcp
uvicorn_config = {
"reload": True,
}

View file

@ -0,0 +1,143 @@
from app_mcp.services.state_machine.states import PresentationState
TRANSITIONS = {
PresentationState.INIT: {
PresentationState.FILES_UPLOADED,
PresentationState.OUTLINE_REQUESTED
},
# Upload and summary flow
PresentationState.FILES_UPLOADED: {
PresentationState.SUMMARY_GENERATED,
PresentationState.UPLOAD_FAILED
},
PresentationState.SUMMARY_GENERATED: {
PresentationState.OUTLINE_REQUESTED,
PresentationState.SUMMARY_FAILED
},
# Outline generation flow
PresentationState.OUTLINE_REQUESTED: {
PresentationState.OUTLINE_GENERATED,
PresentationState.OUTLINE_FAILED
},
PresentationState.OUTLINE_GENERATED: {
PresentationState.OUTLINE_APPROVED,
PresentationState.OUTLINE_REQUESTED,
PresentationState.OUTLINE_FAILED
},
PresentationState.OUTLINE_APPROVED: {
PresentationState.LAYOUT_REQUESTED
},
# Layout selection flow
PresentationState.LAYOUT_REQUESTED: {
PresentationState.LAYOUT_SELECTED
},
PresentationState.LAYOUT_SELECTED: {
PresentationState.GENERATION_IN_PROGRESS,
PresentationState.LAYOUT_REQUESTED
},
# Presentation generation flow
PresentationState.GENERATION_IN_PROGRESS: {
PresentationState.PRESENTATION_READY,
PresentationState.GENERATION_FAILED
},
PresentationState.PRESENTATION_READY: {
PresentationState.EXPORT_REQUESTED,
PresentationState.EDIT_REQUESTED,
PresentationState.OUTLINE_REQUESTED
},
# Export flow
PresentationState.EXPORT_REQUESTED: {
PresentationState.EXPORT_IN_PROGRESS
},
PresentationState.EXPORT_IN_PROGRESS: {
PresentationState.EXPORT_COMPLETE,
PresentationState.EXPORT_FAILED
},
PresentationState.EXPORT_COMPLETE: {
PresentationState.EDIT_REQUESTED,
PresentationState.EXPORT_REQUESTED,
PresentationState.INIT
},
# Edit and revision flow
PresentationState.EDIT_REQUESTED: {
PresentationState.TEMPLATE_EDITING
},
PresentationState.TEMPLATE_EDITING: {
PresentationState.PRESENTATION_READY,
PresentationState.EDIT_FAILED
},
# Error recovery transitions
PresentationState.UPLOAD_FAILED: {
PresentationState.INIT,
PresentationState.FILES_UPLOADED
},
PresentationState.SUMMARY_FAILED: {
PresentationState.FILES_UPLOADED,
PresentationState.OUTLINE_REQUESTED
},
PresentationState.OUTLINE_FAILED: {
PresentationState.OUTLINE_REQUESTED,
PresentationState.INIT
},
PresentationState.GENERATION_FAILED: {
PresentationState.LAYOUT_SELECTED,
PresentationState.OUTLINE_APPROVED
},
PresentationState.EXPORT_FAILED: {
PresentationState.EXPORT_REQUESTED,
PresentationState.PRESENTATION_READY
},
PresentationState.EDIT_FAILED: {
PresentationState.EDIT_REQUESTED,
PresentationState.PRESENTATION_READY
}
}
SUGGESTIONS = {
PresentationState.INIT: "Upload files or start with outline generation",
PresentationState.FILES_UPLOADED: "Generate summary from uploaded files",
PresentationState.SUMMARY_GENERATED: "Generate presentation outline",
PresentationState.OUTLINE_GENERATED: "Review and approve outline, or regenerate",
PresentationState.OUTLINE_APPROVED: "Select presentation layout",
PresentationState.LAYOUT_SELECTED: "Generate presentation",
PresentationState.PRESENTATION_READY: "Export presentation or request edits",
PresentationState.EXPORT_REQUESTED: "Choose export format and generate",
PresentationState.EXPORT_COMPLETE: "Download presentation or start new one",
PresentationState.EDIT_REQUESTED: "Make template-based edits",
}
PROGRESS_WEIGHTS = {
PresentationState.INIT: 0,
PresentationState.FILES_UPLOADED: 10,
PresentationState.SUMMARY_GENERATED: 20,
PresentationState.OUTLINE_REQUESTED: 25,
PresentationState.OUTLINE_GENERATED: 35,
PresentationState.OUTLINE_APPROVED: 40,
PresentationState.LAYOUT_REQUESTED: 45,
PresentationState.LAYOUT_SELECTED: 50,
PresentationState.GENERATION_IN_PROGRESS: 70,
PresentationState.PRESENTATION_READY: 85,
PresentationState.EXPORT_REQUESTED: 90,
PresentationState.EXPORT_IN_PROGRESS: 95,
PresentationState.EXPORT_COMPLETE: 100,
PresentationState.TEMPLATE_EDITING: 60,
}
ERROR_STATES = {
PresentationState.UPLOAD_FAILED,
PresentationState.SUMMARY_FAILED,
PresentationState.OUTLINE_FAILED,
PresentationState.GENERATION_FAILED,
PresentationState.EXPORT_FAILED,
PresentationState.EDIT_FAILED
}

View file

@ -0,0 +1,20 @@
from typing import Dict, Set, Optional, Any
from dataclasses import dataclass
@dataclass
class StateContext:
"""Context data that travels with the state machine"""
presentation_id: Optional[str] = None
summary: Optional[str] = None
title: Optional[str] = None
outlines: Optional[list] = None
layout: Optional[str] = None
file_paths: Optional[list] = None
export_format: Optional[str] = None
export_path: Optional[str] = None
error_message: Optional[str] = None
metadata: Dict[str, Any] = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}

View file

@ -0,0 +1,101 @@
from typing import Dict, Set, Any
from app_mcp.services.state_machine.context import StateContext
from app_mcp.services.state_machine.states import PresentationState
from app_mcp.services.state_machine.constants import TRANSITIONS, SUGGESTIONS, PROGRESS_WEIGHTS, ERROR_STATES
class PresentationStateMachine:
def __init__(self):
self.state = PresentationState.INIT
self.context = StateContext()
self._state_history = [PresentationState.INIT]
self._transitions = TRANSITIONS
self._error_states = ERROR_STATES
self._suggestions = SUGGESTIONS
self._progress_weights = PROGRESS_WEIGHTS
def transition(self, new_state: PresentationState, context_updates: Dict[str, Any] = None):
"""
Transition to new state with optional context updates
Args:
new_state (PresentationState): The state to transition to
context_updates (Dict[str, Any], optional): Context data to update during transition
Raises:
ValueError: If the transition is not valid
"""
if not self.is_valid_transition(new_state):
raise ValueError(f"Invalid transition from {self.state} to {new_state}")
# Update context if provided
if context_updates:
for key, value in context_updates.items():
if hasattr(self.context, key):
setattr(self.context, key, value)
else:
self.context.metadata[key] = value
# Record state history
self._state_history.append(new_state)
self.state = new_state
def is_valid_transition(self, new_state: PresentationState) -> bool:
"""Check if transition to new state is valid"""
return new_state in self._transitions.get(self.state, set())
def get_available_transitions(self) -> Set[PresentationState]:
"""Get all valid transitions from current state"""
return self._transitions.get(self.state, set())
def can_transition_to(self, target_state: PresentationState) -> bool:
"""Check if can transition to target state"""
return target_state in self.get_available_transitions()
def is_terminal_state(self) -> bool:
"""Check if current state is terminal (no outgoing transitions)"""
return len(self.get_available_transitions()) == 0
def is_error_state(self) -> bool:
"""Check if current state is an error state"""
return self.state in self._error_states
def get_workflow_progress(self) -> float:
"""Calculate workflow progress as percentage"""
return self._progress_weights.get(self.state, 0)
def get_next_suggested_action(self) -> str:
"""Get suggested next action based on current state"""
return self._suggestions.get(self.state, "No suggestions available")
def reset(self):
"""Reset state machine to initial state"""
self.state = PresentationState.INIT
self.context = StateContext()
self._state_history = [PresentationState.INIT]
def get_state_history(self) -> list:
"""Get history of states visited"""
return self._state_history.copy()
def rollback_to_previous_state(self) -> bool:
"""Rollback to previous state if possible"""
if len(self._state_history) < 2:
return False
# Remove current state from history
self._state_history.pop()
previous_state = self._state_history[-1]
if self.is_valid_transition(previous_state):
self.state = previous_state
return True
else:
self._state_history.append(self.state)
return False
def __str__(self):
return f"PresentationStateMachine(state={self.state.name}, progress={self.get_workflow_progress()}%)"
def __repr__(self):
return (f"PresentationStateMachine(state={self.state.name}, "
f"context={self.context}, "
f"history_length={len(self._state_history)})")

View file

@ -0,0 +1,40 @@
from enum import Enum, auto
class PresentationState(Enum):
"""
Represents the various states in the presentation workflow.
"""
INIT = auto()
# Upload and summary phase
FILES_UPLOADED = auto()
SUMMARY_GENERATED = auto()
# Outline generation phase
OUTLINE_REQUESTED = auto()
OUTLINE_GENERATED = auto()
OUTLINE_APPROVED = auto()
# Layout selection phase
LAYOUT_REQUESTED = auto()
LAYOUT_SELECTED = auto()
# Presentation generation phase
GENERATION_IN_PROGRESS = auto()
PRESENTATION_READY = auto()
# Export phase
EXPORT_REQUESTED = auto()
EXPORT_IN_PROGRESS = auto()
EXPORT_COMPLETE = auto()
# Edit and revision loops
EDIT_REQUESTED = auto()
TEMPLATE_EDITING = auto()
# Error states
UPLOAD_FAILED = auto()
SUMMARY_FAILED = auto()
OUTLINE_FAILED = auto()
GENERATION_FAILED = auto()
EXPORT_FAILED = auto()
EDIT_FAILED = auto()

View file

@ -0,0 +1,354 @@
from typing import Dict, Any, Optional, List
from dataclasses import asdict
from app_mcp.services.state_machine.machine import PresentationStateMachine
from app_mcp.services.state_machine.states import PresentationState
from utils.user_config import update_env_with_user_config
from app_mcp.wrapper.upload_and_generate_summary import upload_and_summarize_files
from app_mcp.wrapper.generate_outline import generate_outline
from app_mcp.wrapper.presentation_generation import process_post_outline_workflow
from app_mcp.wrapper.presentation_export import export_presentation_and_get_path
from app_mcp.wrapper.list_layout import list_layouts
class WorkflowOrchestrator:
"""
Orchestrates the presentation generation workflow using FSM
- Handles session management
- Executes
- file uploads
- summary generation
- outline generation
- layout selection
- presentation generation
- export
- Provides status and context management
- Allows for session-based operations
- Supports error handling and recovery
"""
def __init__(self):
"""
Initiating:
- The environment with user configuration from the user config file.
- The Finite State Machine (FSM) for presentation workflow.
- Active sessions dictionary to manage multiple workflows.
"""
try:
update_env_with_user_config()
except Exception as e:
print(f"Error updating environment with user config: {e}")
self.fsm = PresentationStateMachine()
self._active_sessions: Dict[str, PresentationStateMachine] = {}
def create_session(self, session_id: str) -> PresentationStateMachine:
"""
Create a new workflow session with the given session ID.
If a session with the same ID already exists, it will be replaced.
Session will Remain for the lifetime of the application.
Args:
session_id (str): Unique identifier for the session.
"""
if not session_id or not isinstance(session_id, str):
raise ValueError("Session ID must be a non-empty string")
session_id = session_id.strip()
if not session_id:
raise ValueError("Session ID cannot be empty")
if session_id in self._active_sessions:
self.remove_session(session_id)
print(f"Session {session_id} already exists, replacing it.")
self._active_sessions[session_id] = PresentationStateMachine()
return self._active_sessions[session_id]
def get_session(self, session_id: str) -> Optional[PresentationStateMachine]:
"""Get existing workflow session"""
if not session_id or not isinstance(session_id, str):
return None
return self._active_sessions.get(session_id.strip())
def remove_session(self, session_id: str) -> bool:
"""Remove workflow session"""
return self._active_sessions.pop(session_id, None) is not None
async def execute_upload_and_summarize(self, session_id: str, files: List[Any]) -> Dict[str, Any]:
"""
Execute file upload and summary generation workflow step.
Args:
session_id (str): Unique identifier for the session.
files (List[Any]): List of files to be uploaded and summarized.
Returns:
Dict[str, Any]: Result containing status, state, progress, next action, and any
"""
fsm = self.get_session(session_id)
if not fsm:
raise ValueError(f"Session {session_id} not found")
try:
fsm.transition(PresentationState.FILES_UPLOADED)
result = await upload_and_summarize_files(files)
# Update context and transition to summary generated
context_updates = {
"summary": result["summary"],
"file_paths": result["file_paths"]
}
fsm.transition(PresentationState.SUMMARY_GENERATED, context_updates)
return {
"status": "success",
"state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": fsm.get_next_suggested_action(),
"result": result
}
except Exception as e:
fsm.transition(PresentationState.UPLOAD_FAILED, {"error_message": str(e)})
print(f"There was an error uploading and summarizing files: {e}")
return {
"status": "error",
"state": fsm.state.name,
"error": str(e),
"next_action": fsm.get_next_suggested_action()
}
async def execute_generate_outline(self, session_id: str, prompt: str, **kwargs) -> Dict[str, Any]:
"""
Execute outline generation workflow step
Args:
session_id (str): Unique identifier for the session.
prompt (str): The prompt to generate the outline.
**kwargs: Additional parameters for outline generation.
Returns:
Dict[str, Any]: Result containing status, state, progress, next action, and generated outline.
"""
fsm = self.get_session(session_id)
if not fsm:
raise ValueError(f"Session {session_id} not found")
try:
fsm.transition(PresentationState.OUTLINE_REQUESTED)
result = await generate_outline(prompt, summary=fsm.context.summary, **kwargs)
# Update the Context and transition to outline generated
context_updates = {
"title": result["title"],
"outlines": result["outlines"]
}
fsm.transition(PresentationState.OUTLINE_GENERATED, context_updates)
return {
"status": "success",
"state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": "Review outline and approve, or request regeneration",
"result": result,
"can_approve": True,
"can_regenerate": True
}
except Exception as e:
fsm.transition(PresentationState.OUTLINE_FAILED, {"error_message": str(e)})
print(f"Error generating outline for session {session_id}: {e}")
return {
"status": "error",
"state": fsm.state.name,
"error": str(e),
"next_action": fsm.get_next_suggested_action()
}
async def approve_outline(self, session_id: str) -> Dict[str, Any]:
"""
Approve the generated outline
Args:
session_id (str): Unique identifier for the session.
Returns:
Dict[str, Any]: Result containing status, state, progress, next action.
"""
fsm = self.get_session(session_id)
if not fsm:
raise ValueError(f"Session {session_id} not found")
if fsm.state != PresentationState.OUTLINE_GENERATED:
raise ValueError(f"Cannot approve outline in state {fsm.state.name}")
fsm.transition(PresentationState.OUTLINE_APPROVED)
return {
"status": "success",
"state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": fsm.get_next_suggested_action()
}
async def execute_layout_selection(self, session_id: str, layout: str) -> Dict[str, Any]:
"""
Execute layout selection workflow step
Args:
session_id (str): Unique identifier for the session.
layout (str): Selected layout for the presentation.
Returns:
Dict[str, Any]: Result containing status, state, progress, next action, and selected layout.
"""
fsm = self.get_session(session_id)
if not fsm:
raise ValueError(f"Session {session_id} not found")
try:
fsm.transition(PresentationState.LAYOUT_REQUESTED)
#Updating the context and transitioning to LAYOUT_SELECTED
context_updates = {"layout": layout}
fsm.transition(PresentationState.LAYOUT_SELECTED, context_updates)
return {
"status": "success",
"state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": fsm.get_next_suggested_action(),
"selected_layout": layout
}
except Exception as e:
print(f"Error selecting layout for session {session_id}: {e}")
return {
"status": "error",
"error": str(e),
"next_action": "Please select a valid layout"
}
async def execute_presentation_generation(self, session_id: str, **kwargs) -> Dict[str, Any]:
"""
Execute presentation generation workflow step
Args:
session_id (str): Unique identifier for the session.
**kwargs: Additional parameters for presentation generation.
Returns:
Dict[str, Any]: Result containing status, state, progress, next action, and generated presentation.
"""
fsm = self.get_session(session_id)
if not fsm:
raise ValueError(f"Session {session_id} not found")
try:
fsm.transition(PresentationState.GENERATION_IN_PROGRESS)
notes = kwargs.get('notes', [])
result = await process_post_outline_workflow(
title=fsm.context.title,
outlines=fsm.context.outlines,
notes=notes,
layout=fsm.context.layout,
prompt=fsm.context.metadata.get('original_prompt', ""),
sql_session=None,
**kwargs
)
#Updating the Context and transitioning to PRESENTATION_READY
context_updates = {"presentation_id": result["presentation_id"]}
fsm.transition(PresentationState.PRESENTATION_READY, context_updates)
return {
"status": "success",
"state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": fsm.get_next_suggested_action(),
"result": result
}
except Exception as e:
fsm.transition(PresentationState.GENERATION_FAILED, {"error_message": str(e)})
print(f"Error generating presentation for session {session_id}: {e}")
return {
"status": "error",
"state": fsm.state.name,
"error": str(e),
"next_action": fsm.get_next_suggested_action()
}
async def execute_export(self, session_id: str, export_format: str = "pptx") -> Dict[str, Any]:
"""
Execute presentation export workflow step
Args:
session_id (str): Unique identifier for the session.
export_format (str): Format to export the presentation (e.g., "pptx", "pdf").
Returns:
Dict[str, Any]: Result containing status, state, progress, next action, and export
"""
fsm = self.get_session(session_id)
if not fsm:
raise ValueError(f"Session {session_id} not found")
try:
# Transition to EXPORT_REQUESTED state
fsm.transition(PresentationState.EXPORT_REQUESTED, {"export_format": export_format})
fsm.transition(PresentationState.EXPORT_IN_PROGRESS)
result = await export_presentation_and_get_path(
presentation_id=fsm.context.presentation_id,
title=fsm.context.title,
export_as=export_format
)
print("RResult of export:", result)
#Updating the Context and transitioning to EXPORT_COMPLETE
context_updates = {"export_path": result["path"]}
fsm.transition(PresentationState.EXPORT_COMPLETE, context_updates)
return {
"status": "success",
"state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": "Download your presentation or start a new one",
"result": result
}
except Exception as e:
fsm.transition(PresentationState.EXPORT_FAILED, {"error_message": str(e)})
print(f"Error exporting presentation for session {session_id}: {e}")
return {
"status": "error",
"state": fsm.state.name,
"error": str(e),
"next_action": fsm.get_next_suggested_action()
}
async def get_available_layouts(self) -> List[Any]:
"""
Get available presentation layouts
"""
return await list_layouts()
def get_workflow_status(self, session_id: str) -> Dict[str, Any]:
"""Get current workflow status"""
fsm = self.get_session(session_id)
if not fsm:
return {"error": "Session not found"}
return {
"session_id": session_id,
"current_state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": fsm.get_next_suggested_action(),
"available_transitions": [s.name for s in fsm.get_available_transitions()],
"is_error_state": fsm.is_error_state(),
"context": asdict(fsm.context),
"state_history": [s.name for s in fsm.get_state_history()]
}
def get_all_sessions(self) -> Dict[str, Dict[str, Any]]:
"""
Get status of all active sessions
"""
return {
session_id: self.get_workflow_status(session_id)
for session_id in self._active_sessions.keys()
}

View file

@ -0,0 +1,38 @@
"""MCP Tools package for presentation generation."""
from app_mcp.tools.choose_layout import register_choose_layout
from app_mcp.tools.export_presentation import register_export_presentation
from app_mcp.tools.regenerate_outline import register_regenerate_outline
from app_mcp.tools.get_status import register_get_status
from app_mcp.tools.show_layouts import register_show_layouts
from app_mcp.tools.start_presentation import register_start_presentation
from app_mcp.tools.help_me import register_help_me
from app_mcp.tools.continue_workflow import register_continue_workflow
__all__ = [
'register_choose_layout',
'register_export_presentation',
'register_regenerate_outline',
'register_get_status',
'register_show_layouts',
'register_start_presentation',
'register_help_me',
'register_continue_workflow',
'register_tools',
]
def register_tools(mcp, orchestrator):
"""Register all MCP tools in a fancy way."""
tools = [
register_choose_layout,
register_export_presentation,
register_regenerate_outline,
register_get_status,
register_show_layouts,
register_start_presentation,
register_help_me,
register_continue_workflow
]
for tool in tools:
tool(mcp, orchestrator)

View file

@ -0,0 +1,46 @@
from typing import Dict, Any
def register_choose_layout(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("choose_layout")
async def choose_layout(session_id: str, layout_name: str) -> Dict[str, Any]:
"""
🎨 Select a visual style and theme for your presentation.
Choose from available professional layouts that determine:
- Color scheme and visual design
- Slide structure and layout patterns
- Font choices and styling
- Overall presentation aesthetic
Use 'show_layouts' first to see all available options.
Args:
session_id: Your presentation session ID
layout_name: Name of the layout you want to use
"""
try:
result = await orchestrator.execute_layout_selection(session_id, layout_name)
if result["status"] == "success":
return {
"status": "success",
"session_id": session_id,
"message": f"Perfect! I've selected the '{layout_name}' layout for your presentation.",
"suggestion": "Now I'll generate all the slides with content, images, and styling. This might take a minute or two.",
"available_actions": {
"continue": "Start generating the presentation",
"change_layout": "Actually, let me pick a different layout"
}
}
return result
except Exception as e:
return {
"status": "error",
"error": str(e),
"session_id": session_id
}
return choose_layout

View file

@ -0,0 +1,122 @@
from typing import Dict, Any
def register_continue_workflow(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("continue_workflow")
async def continue_workflow(
session_id: str,
action: str = "continue"
) -> Dict[str, Any]:
"""
Move to the next step in creating your presentation.
This tool automatically determines what should happen next based on where
you are in the process:
- After starting: Generates your presentation outline
- After outline: Shows available layouts to choose from
- After layout: Creates your complete presentation
Just call this when you're ready to proceed to the next step!
Args:
session_id: Your presentation session ID
action: What to do next (usually just "continue")
"""
try:
# Validate session_id
if not session_id or not isinstance(session_id, str):
return {
"status": "error",
"error": "Valid session_id is required",
"suggestion": "Use the same session_id from start_presentation"
}
session_id = session_id.strip()
fsm = orchestrator.get_session(session_id)
if not fsm:
return {
"status": "error",
"error": "Session not found. Please start a new presentation first.",
"suggestion": "Call start_presentation to begin"
}
current_state = fsm.state.name
if current_state in ["FILES_UPLOADED", "SUMMARY_GENERATED", "INIT"]:
# Generate outline
prompt = fsm.context.metadata.get("original_prompt", "")
n_slides = fsm.context.metadata.get("n_slides", 8)
language = fsm.context.metadata.get("language", "English")
if not prompt:
return {
"status": "error",
"error": "No prompt found in session. Please start over.",
"suggestion": "Call start_presentation with a valid prompt"
}
result = await orchestrator.execute_generate_outline(
session_id, prompt, n_slides=n_slides, language=language
)
if result["status"] == "success":
return {
"status": "success",
"session_id": session_id,
"message": "Here's your presentation outline:",
"title": result["result"]["title"],
"outlines": result["result"]["outlines"],
"suggestion": "Take a look at the outline. If it looks good, use 'continue_workflow' again to proceed to layout selection.",
"next_step": "Call continue_workflow again to choose layouts, or use regenerate_outline to try different approach"
}
return result
elif current_state == "OUTLINE_GENERATED":
# Auto-approve and move to layouts
await orchestrator.approve_outline(session_id)
layouts = await orchestrator.get_available_layouts()
return {
"status": "success",
"session_id": session_id,
"message": "Great! Now let's choose a visual style for your presentation.",
"available_layouts": layouts,
"suggestion": "Choose a layout that fits your content and audience. Use 'choose_layout' with the layout name.",
"next_step": "Call choose_layout with your preferred layout name"
}
elif current_state == "LAYOUT_SELECTED":
# Generate presentation
result = await orchestrator.execute_presentation_generation(session_id)
if result["status"] == "success":
return {
"status": "success",
"session_id": session_id,
"message": "🎉 Your presentation is ready!",
"title": result["result"]["title"],
"presentation_id": result["result"]["presentation_id"],
"suggestion": "Your presentation has been generated successfully! Use 'export_presentation' to download it.",
"next_step": "Call export_presentation with format 'pptx' or 'pdf'"
}
return result
else:
return {
"status": "info",
"message": f"Currently in {current_state} state.",
"suggestion": "Use get_status to see what actions are available.",
"next_step": "Call get_status for guidance"
}
except Exception as e:
return {
"status": "error",
"error": f"Workflow error: {str(e)}",
"session_id": session_id if 'session_id' in locals() else "unknown",
"suggestion": "Use get_status to check your current progress"
}
return continue_workflow

View file

@ -0,0 +1,55 @@
from typing import Dict, Any
def register_export_presentation(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("export_presentation")
async def export_presentation(
session_id: str,
format: str = "pptx"
) -> Dict[str, Any]:
"""
📁 Download your finished presentation in your preferred format.
Export your completed presentation as:
- "pptx" - PowerPoint format (editable, best for sharing and presenting)
- "pdf" - PDF format (read-only, best for viewing and printing)
The exported file will be ready for download immediately.
Args:
session_id: Your presentation session ID
format: Export format - either "pptx" or "pdf"
"""
try:
if format.lower() not in ["pdf", "pptx"]:
return {
"status": "error",
"error": "Please choose either 'pdf' or 'pptx' format",
"session_id": session_id
}
result = await orchestrator.execute_export(session_id, format.lower())
print("Export result:", result)
if result["status"] == "success":
return {
"status": "success",
"session_id": session_id,
"message": f"🎉 Your presentation has been exported as {format.upper()}!",
"path": result["result"]["path"],
"suggestion": "You can download it now, or start creating another presentation.",
"available_actions": {
"download": "Download the presentation",
"new_presentation": "Create a new presentation",
"edit": "Make edits to this presentation"
}
}
return result
except Exception as e:
return {
"status": "error",
"error": str(e),
"session_id": session_id
}
return export_presentation

View file

@ -0,0 +1,83 @@
from typing import Dict, Any, Optional, List
def register_get_status(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("get_status")
def get_status(session_id: str) -> Dict[str, Any]:
"""
📊 Check your presentation creation progress.
See exactly where you are in the process:
- What step you're currently on
- How much progress you've made
- What you can do next
- Any issues that need attention
Perfect for checking in if you're unsure what to do next!
Args:
session_id: Your presentation session ID
"""
try:
if not session_id or not isinstance(session_id, str):
return {
"status": "error",
"error": "Valid session_id is required"
}
session_id = session_id.strip()
status = orchestrator.get_workflow_status(session_id)
if "error" in status:
return {
"status": "error",
"error": "Session not found. Start a new presentation with 'start_presentation'.",
"available_sessions": list(orchestrator._active_sessions.keys())
}
state = status["current_state"]
# Provide user-friendly status messages
friendly_messages = {
"INIT": "Ready to start! Use 'start_presentation' to begin.",
"SUMMARY_GENERATED": "Files processed. Use 'continue_workflow' to generate outline.",
"OUTLINE_GENERATED": "Outline created. Use 'continue_workflow' to proceed to layouts.",
"OUTLINE_APPROVED": "Outline approved. Use 'choose_layout' to select a theme.",
"LAYOUT_SELECTED": "Layout chosen. Use 'continue_workflow' to generate presentation.",
"PRESENTATION_READY": "Presentation generated! Use 'export_presentation' to download.",
"EXPORT_COMPLETE": "All done! Presentation exported successfully."
}
next_actions = {
"INIT": "start_presentation",
"SUMMARY_GENERATED": "continue_workflow",
"OUTLINE_GENERATED": "continue_workflow (or regenerate_outline)",
"OUTLINE_APPROVED": "choose_layout",
"LAYOUT_SELECTED": "continue_workflow",
"PRESENTATION_READY": "export_presentation",
"EXPORT_COMPLETE": "Download file or start_presentation for new one"
}
return {
"status": "success",
"session_id": session_id,
"current_step": state,
"progress": f"{status['progress']:.0f}%",
"message": friendly_messages.get(state, f"Currently in {state} state"),
"next_action": next_actions.get(state, status["next_action"]),
"context": {
"prompt": status["context"].get("metadata", {}).get("original_prompt"),
"n_slides": status["context"].get("metadata", {}).get("n_slides"),
"language": status["context"].get("metadata", {}).get("language")
}
}
except Exception as e:
return {
"status": "error",
"error": f"Status check failed: {str(e)}",
"suggestion": "Try start_presentation to begin a new session"
}
return get_status

View file

@ -0,0 +1,49 @@
from typing import Dict, Any, Optional, List
def register_help_me(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("help")
def help() -> Dict[str, Any]:
"""
Get help and guidance for creating presentations.
Shows you:
- Step-by-step workflow guide
- Available commands and what they do
- Example usage to get you started
- Tips for best results
Perfect for first-time users or when you need a refresher!
"""
return {
"status": "info",
"message": "🎯 Complete Guide to Creating Presentations",
"workflow": {
"step_1": "🚀 start_presentation - Begin with your topic and optional files",
"step_2": "📋 continue_workflow - Generate and review your outline",
"step_3": "🎨 choose_layout - Pick a visual style that fits your content",
"step_4": "⚡ continue_workflow - Generate your complete presentation",
"step_5": "📁 export_presentation - Download as PowerPoint or PDF"
},
"helpful_commands": {
"get_status": "📊 Check your current progress anytime",
"show_layouts": "👀 Browse available themes and styles",
"regenerate_outline": "🔄 Try a different outline approach",
"help": "❓ Show this helpful guide"
},
"quick_start": {
"with_files": "start_presentation(session_id='my-session', prompt='Your topic', files=[uploaded_files])",
"text_only": "start_presentation(session_id='my-session', prompt='Create a presentation about sustainable energy')",
"custom": "start_presentation(session_id='my-session', prompt='Your topic', n_slides=10, language='Spanish')"
},
"tips": [
"💡 Be specific in your prompt for better results",
"📎 Upload relevant files to enhance your content",
"🎨 Choose layouts that match your audience and purpose",
"📊 Use get_status anytime to see what's next"
]
}
return help

View file

@ -0,0 +1,63 @@
from typing import Dict, Any, Optional, List
from app_mcp.tools.continue_workflow import register_continue_workflow
from app_mcp.services.state_machine.states import PresentationState
def register_regenerate_outline(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("regenerate_outline")
async def regenerate_outline(
session_id: str,
new_prompt: Optional[str] = None,
n_slides: Optional[int] = None,
language: Optional[str] = None
) -> Dict[str, Any]:
"""
🔄 Create a new outline with different requirements.
Not happy with the generated outline? Use this to:
- Try a different angle or focus for your topic
- Change the number of slides
- Adjust the language or tone
- Incorporate new requirements
Args:
session_id: Your presentation session ID
new_prompt: New description of what you want (optional)
n_slides: Different number of slides (optional)
language: Different language (optional)
"""
try:
fsm = orchestrator.get_session(session_id)
if not fsm:
return {"status": "error", "error": "Session not found"}
# Update parameters if provided
if new_prompt:
fsm.context.metadata["original_prompt"] = new_prompt
if n_slides:
fsm.context.metadata["n_slides"] = n_slides
if language:
fsm.context.metadata["language"] = language
# Reset to outline generation
if fsm.can_transition_to(PresentationState.OUTLINE_REQUESTED):
fsm.transition(PresentationState.OUTLINE_REQUESTED)
# Generate new outline
continue_workflow = await register_continue_workflow(mcp, orchestrator)
result = await continue_workflow(session_id, "continue")
if result["status"] == "success":
result["message"] = "I've created a new outline for you:"
return result
except Exception as e:
return {
"status": "error",
"error": str(e),
"session_id": session_id
}
return regenerate_outline

View file

@ -0,0 +1,39 @@
from typing import Dict, Any
def register_show_layouts(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("show_layouts")
async def show_layouts(session_id: str) -> Dict[str, Any]:
"""
👀 Browse all available presentation themes and layouts.
See the complete list of professional layouts including:
- Business and corporate themes
- Creative and modern designs
- Academic and educational styles
- Technical and data-focused layouts
Each layout comes with its own color scheme, fonts, and slide structures.
Args:
session_id: Your presentation session ID
"""
try:
layouts = await orchestrator.get_available_layouts()
return {
"status": "success",
"session_id": session_id,
"message": "Here are all the available presentation layouts:",
"layouts": layouts,
"suggestion": "Choose one using 'choose_layout' with the layout name."
}
except Exception as e:
return {
"status": "error",
"error": str(e),
"session_id": session_id
}
return show_layouts

View file

@ -0,0 +1,108 @@
from typing import List, Dict, Any, Optional
def register_start_presentation(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("start_presentation")
async def start_presentation(
session_id: str,
prompt: str,
files: Optional[List] = None,
n_slides: int = 8,
language: str = "English"
) -> Dict[str, Any]:
"""
🚀 Start creating a new presentation with your idea!
This is your entry point to create presentations. You can:
- Start with just a text prompt describing what you want
- Upload files (PDFs, docs, etc.) to base your presentation on
- Specify how many slides you want (default: 8)
- Choose the language for your presentation
Examples:
- "Create a presentation about climate change solutions"
- "Make slides about our Q4 financial results" (with uploaded files)
- "Build a training deck for new employees"
Args:
session_id: Unique identifier for your presentation session
prompt: Describe what your presentation should be about
files: Optional list of files to analyze and include
n_slides: Number of slides to generate (default: 8)
language: Presentation language (default: English)
"""
try:
if not session_id or not isinstance(session_id, str) or len(session_id.strip()) == 0:
return {
"status": "error",
"error": "Session ID is required and must be a non-empty string",
"example": "Use something like: session_id='my_presentation_123'"
}
if not prompt or not isinstance(prompt, str) or len(prompt.strip()) == 0:
return {
"status": "error",
"error": "Prompt is required and must be a non-empty string",
"example": "prompt='Create a presentation about AI in healthcare'"
}
# Clean session_id
session_id = session_id.strip()
# Create session
orchestrator.create_session(session_id)
# Store initial parameters
fsm = orchestrator.get_session(session_id)
if not fsm:
return {
"status": "error",
"error": "Failed to create session",
"session_id": session_id
}
fsm.context.metadata.update({
"original_prompt": prompt.strip(),
"n_slides": max(1, min(50, n_slides)), # Validate slide count
"language": language.strip() if language else "English"
})
# Handle files if provided
if files and len(files) > 0:
result = await orchestrator.execute_upload_and_summarize(session_id, files)
if result["status"] == "error":
return result
return {
"status": "success",
"session_id": session_id,
"message": "Great! I've uploaded and analyzed your files. Here's a summary:",
"summary": result["result"]["summary"],
"prompt": prompt,
"suggestion": f"Now I can create a presentation outline based on your prompt '{prompt}' and the file content. Use 'continue_workflow' to proceed.",
"next_step": "Call continue_workflow to generate the outline"
}
else:
# Direct outline generation without files
return {
"status": "success",
"session_id": session_id,
"message": f"Perfect! Let's create a presentation about: '{prompt}'",
"suggestion": "I'll generate an outline with the key topics and structure. Use 'continue_workflow' to proceed.",
"next_step": "Call continue_workflow to generate the outline",
"parameters": {
"n_slides": fsm.context.metadata["n_slides"],
"language": fsm.context.metadata["language"]
}
}
except Exception as e:
return {
"status": "error",
"error": f"Unexpected error: {str(e)}",
"session_id": session_id if 'session_id' in locals() else "unknown",
"suggestion": "Please try again with a valid session_id and prompt"
}
return start_presentation

View file

@ -0,0 +1,52 @@
from typing import Dict, Any
from models.sql.presentation import PresentationModel
from models.sql.slide import SlideModel
from models.presentation_from_template import GetPresentationUsingTemplateRequest
from utils.dict_utils import deep_update
from utils.export_utils import export_presentation
from sqlmodel import select
from fastapi import HTTPException
class EditFromTemplateTools:
def __init__(self):
pass
def register(self, mcp):
@mcp.tool("edit_from_template")
async def edit_from_template(
data: GetPresentationUsingTemplateRequest,
sql_session
) -> Dict[str, Any]:
"""
Create a new presentation from a template and updated slide data, then export.
"""
presentation = await sql_session.get(PresentationModel, data.presentation_id)
if not presentation:
raise HTTPException(status_code=404, detail="Presentation not found")
slides = await sql_session.scalars(
select(SlideModel).where(SlideModel.presentation == data.presentation_id)
)
new_presentation = presentation.get_new_presentation()
new_slides = []
for each_slide in slides:
updated_content = None
new_slide_data = list(filter(lambda x: x.index == each_slide.index, data.data))
if new_slide_data:
updated_content = deep_update(each_slide.content, new_slide_data[0].content)
new_slides.append(
each_slide.get_new_slide(new_presentation.id, updated_content)
)
sql_session.add(new_presentation)
sql_session.add_all(new_slides)
await sql_session.commit()
presentation_and_path = await export_presentation(
new_presentation.id, new_presentation.title, data.export_as
)
return {
**presentation_and_path.model_dump(),
"edit_path": f"/presentation?id={new_presentation.id}",
}

View file

@ -0,0 +1,35 @@
import json
from typing import Dict, Any, Optional
from models.presentation_outline_model import PresentationOutlineModel
from utils.llm_calls.generate_presentation_outlines import generate_ppt_outline
async def generate_outline(
prompt: str,
n_slides: int = 8,
language: str = "English",
summary: Optional[str] = None,
) -> Dict[str, Any]:
"""
Generate presentation outlines given a prompt, number of slides, language, and optional summary.
Returns the parsed outline data.
"""
presentation_content_text = ""
async for chunk in generate_ppt_outline(
prompt,
n_slides,
language,
summary,
):
presentation_content_text += chunk
presentation_content_json = json.loads(presentation_content_text)
presentation_content = PresentationOutlineModel(
**presentation_content_json)
outlines = [slide.model_dump()
for slide in presentation_content.slides[:n_slides]]
return {
"title": presentation_content.title,
"outlines": outlines,
"notes": presentation_content.notes
}

View file

@ -0,0 +1,8 @@
from typing import List, Any
from api.v1.ppt.endpoints.layouts import get_layouts
async def list_layouts() -> List[Any]:
"""
Retrieve and return a list of all available presentation layouts.
"""
return await get_layouts()

View file

@ -0,0 +1,23 @@
from typing import Literal, Dict, Any
from utils.export_utils import export_presentation
# Standalone function for workflow orchestrator
async def export_presentation_and_get_path(
presentation_id: str,
title: str,
export_as: Literal["pptx", "pdf"] = "pptx"
) -> Dict[str, Any]:
"""
Export the presentation and return the export path and edit path.
"""
presentation_and_path = await export_presentation(
presentation_id, title, export_as
)
# model_dump() is assumed to return a dict with the export path and related info
data = presentation_and_path.model_dump()
# Map export_path to path if needed
return {
**data,
"edit_path": f"/presentation?id={presentation_id}",
}

View file

@ -0,0 +1,128 @@
import random
from typing import List, Dict, Any, Optional
from models.presentation_outline_model import SlideOutlineModel
from models.presentation_layout import PresentationLayoutModel
from models.presentation_structure_model import PresentationStructureModel
from models.sql.presentation import PresentationModel
from models.sql.slide import SlideModel
from services.get_layout_by_name import get_layout_by_name
from utils.llm_calls.generate_presentation_structure import generate_presentation_structure
from utils.llm_calls.generate_slide_content import get_slide_content_from_type_and_outline
from services.image_generation_service import ImageGenerationService
from services.icon_finder_service import IconFinderService
from utils.asset_directory_utils import get_images_directory
from utils.process_slides import process_slide_and_fetch_assets
from models.presentation_outline_model import PresentationOutlineModel
from utils.randomizers import get_random_uuid
import asyncio
from services.database import get_async_session
from sqlalchemy.ext.asyncio import AsyncSession
# Standalone function for workflow orchestrator
async def process_post_outline_workflow(
title: str,
outlines: List[Dict[str, Any]],
notes: Optional[str]=[],
layout: str = "general",
language: str = "English",
prompt: str = "",
n_slides: int = 8,
sql_session: Optional[AsyncSession] = None,
) -> Dict[str, Any]:
"""
Process the workflow after outlines are generated: layout, structure, slides, assets, save, and ask for export.
"""
# 1. Parse Layout
layout_model: PresentationLayoutModel = await get_layout_by_name(layout)
total_slide_layouts = len(layout_model.slides)
# 2. Generate Structure
if layout_model.ordered:
presentation_structure = layout_model.to_presentation_structure()
else:
presentation_structure: PresentationStructureModel = (
await generate_presentation_structure(
presentation_outline=PresentationOutlineModel(
title=title,
slides=outlines,
notes=notes,
),
presentation_layout=layout_model,
)
)
presentation_structure.slides = presentation_structure.slides[:n_slides]
for index in range(n_slides):
random_slide_index = random.randint(0, total_slide_layouts - 1)
if index >= n_slides:
presentation_structure.slides.append(random_slide_index)
continue
if presentation_structure.slides[index] >= total_slide_layouts:
presentation_structure.slides[index] = random_slide_index
# 3. Create PresentationModel
presentation_id = get_random_uuid()
presentation = PresentationModel(
id=presentation_id,
title=title,
n_slides=n_slides,
language=language,
outlines=outlines,
prompt=prompt,
notes=notes,
layout=layout_model.model_dump(),
structure=presentation_structure.model_dump(),
)
image_generation_service = ImageGenerationService(get_images_directory())
icon_finder_service = IconFinderService()
async_asset_generation_tasks = []
# 4. Generate slide content and save slides
slides: List[SlideModel] = []
for i, slide_layout_index in enumerate(presentation_structure.slides):
slide_layout = layout_model.slides[slide_layout_index]
slide_content = await get_slide_content_from_type_and_outline(
slide_layout, SlideOutlineModel(**outlines[i]), language
)
slide = SlideModel(
presentation=presentation_id,
layout_group=layout_model.name,
layout=slide_layout.id,
index=i,
content=slide_content,
)
async_asset_generation_tasks.append(
process_slide_and_fetch_assets(
image_generation_service, icon_finder_service, slide
)
)
slides.append(slide)
generated_assets_lists = await asyncio.gather(*async_asset_generation_tasks)
generated_assets = []
for assets_list in generated_assets_lists:
generated_assets.extend(assets_list)
# 5. Save PresentationModel and Slides
if sql_session is None:
from services.database import get_async_session
async for session in get_async_session():
session.add(presentation)
session.add_all(slides)
session.add_all(generated_assets)
await session.commit()
else:
sql_session.add(presentation)
sql_session.add_all(slides)
sql_session.add_all(generated_assets)
await sql_session.commit()
# 6. Ask user if they want to export and in which format
return {
"presentation_id": presentation_id,
"title": title,
"message": "Presentation is ready. Would you like to export? (pdf or pptx)",
"export_options": ["pdf", "pptx"]
}

View file

@ -0,0 +1,31 @@
from typing import List, Dict, Any
from fastapi import UploadFile
from services import TEMP_FILE_SERVICE
from services.documents_loader import DocumentsLoader
from utils.randomizers import get_random_uuid
from utils.llm_calls.generate_document_summary import generate_document_summary
# Standalone function for workflow orchestrator
async def upload_and_summarize_files(
files: List[UploadFile]
) -> Dict[str, Any]:
"""
Upload files, generate a document summary, and return both summary and file paths.
"""
if not files:
raise ValueError("No files provided")
temp_dir = TEMP_FILE_SERVICE.create_temp_dir(get_random_uuid())
file_paths = []
for upload in files:
temp_path = TEMP_FILE_SERVICE.create_temp_file_path(upload.filename, temp_dir)
with open(temp_path, "wb") as f:
f.write(await upload.read())
file_paths.append(temp_path)
documents_loader = DocumentsLoader(file_paths=file_paths)
await documents_loader.load_documents(temp_dir)
summary = await generate_document_summary(documents_loader.documents)
return {
"summary": summary,
"file_paths": file_paths,
}

View file

@ -0,0 +1,24 @@
import sys
import os
import argparse
import asyncio
from app_mcp.server import create_mcp_server, uvicorn_config
async def main():
parser = argparse.ArgumentParser(description="Run the FastAPI server")
parser.add_argument(
"--port", type=int, default=8001, help="Port number to run the server on"
)
args = parser.parse_args()
mcp = create_mcp_server()
await mcp.run_async(
transport="http",
host="0.0.0.0",
port=args.port,
uvicorn_config=uvicorn_config
)
if __name__ == "__main__":
asyncio.run(main())

View file

@ -26,8 +26,8 @@ export async function GET(request: Request) {
waitUntil: "networkidle0",
timeout: 80000,
});
await page.waitForSelector("[data-layouts]", { timeout: 10000 });
await page.waitForSelector("[data-layouts]", { timeout: 30000 });
// Extract both data-layouts and data-group-settings attributes
const { dataLayouts, dataGroupSettings } = await page.$eval(

View file

@ -1,19 +1,23 @@
/* This script starts the FastAPI and Next.js servers, setting up user configuration if necessary. It reads environment variables to configure API keys and other settings, ensuring that the user configuration file is created if it doesn't exist. The script also handles the starting of both servers and keeps the Node.js process alive until one of the servers exits. */
const path = require('path');
const { spawn } = require('child_process');
const fs = require('fs');
const path = require("path");
const { spawn } = require("child_process");
const fs = require("fs");
const fastapiDir = path.join(__dirname, 'servers/fastapi');
const nextjsDir = path.join(__dirname, 'servers/nextjs');
const fastapiDir = path.join(__dirname, "servers/fastapi");
const nextjsDir = path.join(__dirname, "servers/nextjs");
const isDev = process.env.NODE_ENV === 'development';
const canChangeKeys = process.env.CAN_CHANGE_KEYS !== 'false';
const isDev = process.env.NODE_ENV === "development";
const canChangeKeys = process.env.CAN_CHANGE_KEYS !== "false";
const fastapiPort = 8000;
const nextjsPort = 3000;
const appmcpPort = 8001;
const userConfigPath = path.join(process.env.APP_DATA_DIRECTORY, 'userConfig.json');
const userConfigPath = path.join(
process.env.APP_DATA_DIRECTORY,
"userConfig.json",
);
const userDataDir = path.dirname(userConfigPath);
// Create user_data directory if it doesn't exist
@ -25,13 +29,12 @@ process.env.USER_CONFIG_PATH = userConfigPath;
//? UserConfig is only setup if API Keys can be changed
const setupUserConfigFromEnv = () => {
let existingConfig = {};
if (fs.existsSync(userConfigPath)) {
existingConfig = JSON.parse(fs.readFileSync(userConfigPath, 'utf8'));
existingConfig = JSON.parse(fs.readFileSync(userConfigPath, "utf8"));
}
if (!['ollama', 'openai', 'google'].includes(existingConfig.LLM)) {
if (!["ollama", "openai", "google"].includes(existingConfig.LLM)) {
existingConfig.LLM = undefined;
}
@ -42,18 +45,19 @@ const setupUserConfigFromEnv = () => {
OLLAMA_URL: process.env.OLLAMA_URL || existingConfig.OLLAMA_URL,
OLLAMA_MODEL: process.env.OLLAMA_MODEL || existingConfig.OLLAMA_MODEL,
CUSTOM_LLM_URL: process.env.CUSTOM_LLM_URL || existingConfig.CUSTOM_LLM_URL,
CUSTOM_LLM_API_KEY: process.env.CUSTOM_LLM_API_KEY || existingConfig.CUSTOM_LLM_API_KEY,
CUSTOM_LLM_API_KEY:
process.env.CUSTOM_LLM_API_KEY || existingConfig.CUSTOM_LLM_API_KEY,
CUSTOM_MODEL: process.env.CUSTOM_MODEL || existingConfig.CUSTOM_MODEL,
PEXELS_API_KEY: process.env.PEXELS_API_KEY || existingConfig.PEXELS_API_KEY,
PIXABAY_API_KEY: process.env.PIXABAY_API_KEY || existingConfig.PIXABAY_API_KEY,
PIXABAY_API_KEY:
process.env.PIXABAY_API_KEY || existingConfig.PIXABAY_API_KEY,
IMAGE_PROVIDER: process.env.IMAGE_PROVIDER || existingConfig.IMAGE_PROVIDER,
USE_CUSTOM_URL: process.env.USE_CUSTOM_URL || existingConfig.USE_CUSTOM_URL,
};
fs.writeFileSync(userConfigPath, JSON.stringify(userConfig));
}
};
const startServers = async () => {
const fastApiProcess = spawn(
"python",
["server.py", "--port", fastapiPort.toString(), "--reload", isDev],
@ -61,13 +65,27 @@ const startServers = async () => {
cwd: fastapiDir,
stdio: "inherit",
env: process.env,
}
},
);
fastApiProcess.on("error", err => {
fastApiProcess.on("error", (err) => {
console.error("FastAPI process failed to start:", err);
});
const appmcpProcess = spawn(
"python",
["mcp_server.py", "--port", appmcpPort.toString()],
{
cwd: fastapiDir,
stdio: "inherit",
env: process.env,
},
);
appmcpProcess.on("error", (err) => {
console.error("App MCP process failed to start:", err);
});
const nextjsProcess = spawn(
"npm",
["run", isDev ? "dev" : "start", "--", "-p", nextjsPort.toString()],
@ -75,17 +93,17 @@ const startServers = async () => {
cwd: nextjsDir,
stdio: "inherit",
env: process.env,
}
},
);
nextjsProcess.on("error", err => {
nextjsProcess.on("error", (err) => {
console.error("Next.js process failed to start:", err);
});
// Keep the Node process alive until both servers exit
const exitCode = await Promise.race([
new Promise(resolve => fastApiProcess.on("exit", resolve)),
new Promise(resolve => nextjsProcess.on("exit", resolve)),
new Promise((resolve) => fastApiProcess.on("exit", resolve)),
new Promise((resolve) => nextjsProcess.on("exit", resolve)),
]);
console.log(`One of the processes exited. Exit code: ${exitCode}`);