Merge branch 'feat/improved-document-parsing' of github.com:presenton/presenton into feat/improved-document-parsing

This commit is contained in:
shiva raj badu 2025-08-04 13:29:25 +05:45
commit a73b0814f6
No known key found for this signature in database
40 changed files with 2106 additions and 216 deletions

View file

@ -5,6 +5,9 @@ out
build
.git
.gitignore
tmp
debug
.fastembed_cache
servers/fastapi/tmp
servers/fastapi/debug
servers/nextjs/node_modules

View file

@ -17,13 +17,17 @@ WORKDIR /app
# Set environment variables
ENV APP_DATA_DIRECTORY=/app_data
ENV TEMP_DIRECTORY=/tmp/presenton
ENV PYTHONPATH="${PYTHONPATH}:/app/servers/fastapi"
# Install ollama
RUN curl -fsSL https://ollama.com/install.sh | sh
# Install dependencies for FastAPI
COPY servers/fastapi/requirements.txt ./
RUN pip install -r requirements.txt
RUN pip install aiohttp aiomysql asyncpg fastapi[standard] \
pathvalidate pdfplumber nltk chromadb sqlmodel redis \
anthropic google-genai openai fastmcp
RUN pip install docling --extra-index-url https://download.pytorch.org/whl/cpu
# Install dependencies for Next.js
WORKDIR /app/servers/nextjs

View file

@ -20,18 +20,21 @@ RUN ls -a
# Set environment variables
ENV APP_DATA_DIRECTORY=/app_data
ENV TEMP_DIRECTORY=/tmp/presenton
ENV PYTHONPATH="${PYTHONPATH}:/app/servers/fastapi"
# Install ollama
RUN curl -fsSL https://ollama.com/install.sh | sh
RUN curl -fsSL http://ollama.com/install.sh | sh
# Install dependencies for FastAPI
COPY servers/fastapi/requirements.txt ./
RUN pip install -r requirements.txt
RUN pip install aiohttp aiomysql asyncpg fastapi[standard] \
pathvalidate pdfplumber nltk chromadb sqlmodel redis \
anthropic google-genai openai fastmcp
RUN pip install docling --extra-index-url https://download.pytorch.org/whl/cpu
# Install dependencies for Next.js
WORKDIR /node_dependencies
COPY servers/nextjs/package.json servers/nextjs/package-lock.json ./
RUN npm install
RUN npm install
# Install chrome for puppeteer
RUN npx puppeteer browsers install chrome@138.0.7204.94 --install-deps
@ -45,4 +48,4 @@ COPY nginx.conf /etc/nginx/nginx.conf
EXPOSE 80
# Start the servers
CMD ["node", "/app/start.js", "--dev"]
CMD ["node", "/app/start.js", "--dev"]

View file

@ -30,6 +30,12 @@ http {
proxy_connect_timeout 30m;
}
location /mcp/ {
proxy_pass http://localhost:8001;
proxy_read_timeout 30m;
proxy_connect_timeout 30m;
}
location /docs {
proxy_pass http://localhost:8000/docs;
proxy_read_timeout 30m;

View file

@ -0,0 +1,27 @@
from fastapi import APIRouter, HTTPException
import aiohttp
from typing import List, Any
from services.get_layout_by_name import get_layout_by_name
from models.presentation_layout import PresentationLayoutModel
LAYOUTS_ROUTER = APIRouter(prefix="/layouts", tags=["Layouts"])
@LAYOUTS_ROUTER.get("/", summary="Get available layouts")
async def get_layouts():
url = "http://localhost:3000/api/layouts" # Adjust port if needed
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status != 200:
error_text = await response.text()
raise HTTPException(
status_code=response.status,
detail=f"Failed to fetch layouts: {error_text}"
)
layouts_json = await response.json()
# Optionally, parse into a Pydantic model if you have one matching the structure
return layouts_json
@LAYOUTS_ROUTER.get("/{layout_name}", summary="Get layout details by ID")
async def get_layout_detail(layout_name: str) -> PresentationLayoutModel:
return await get_layout_by_name(layout_name)

View file

@ -78,6 +78,13 @@ async def stream_outlines(
]
presentation.outlines = presentation_outlines.model_dump()
presentation.title = (
presentation_outlines.slides[0][:50]
.replace("#", "")
.replace("/", "")
.replace("\\", "")
.replace("\n", "")
)
sql_session.add(presentation)
await sql_session.commit()

View file

@ -0,0 +1 @@
# This file marks the mcp directory as a Python package.

View file

@ -0,0 +1,13 @@
from fastmcp import FastMCP
from app_mcp.tools import register_tools
from app_mcp.services.workflow_orchestrator import WorkflowOrchestrator
def create_mcp_server():
mcp = FastMCP("PresentonMCP")
orchestrator = WorkflowOrchestrator()
register_tools(mcp, orchestrator)
return mcp
uvicorn_config = {
"reload": True,
}

View file

@ -0,0 +1,143 @@
from app_mcp.services.state_machine.states import PresentationState
TRANSITIONS = {
PresentationState.INIT: {
PresentationState.FILES_UPLOADED,
PresentationState.OUTLINE_REQUESTED
},
# Upload and summary flow
PresentationState.FILES_UPLOADED: {
PresentationState.SUMMARY_GENERATED,
PresentationState.UPLOAD_FAILED
},
PresentationState.SUMMARY_GENERATED: {
PresentationState.OUTLINE_REQUESTED,
PresentationState.SUMMARY_FAILED
},
# Outline generation flow
PresentationState.OUTLINE_REQUESTED: {
PresentationState.OUTLINE_GENERATED,
PresentationState.OUTLINE_FAILED
},
PresentationState.OUTLINE_GENERATED: {
PresentationState.OUTLINE_APPROVED,
PresentationState.OUTLINE_REQUESTED,
PresentationState.OUTLINE_FAILED
},
PresentationState.OUTLINE_APPROVED: {
PresentationState.LAYOUT_REQUESTED
},
# Layout selection flow
PresentationState.LAYOUT_REQUESTED: {
PresentationState.LAYOUT_SELECTED
},
PresentationState.LAYOUT_SELECTED: {
PresentationState.GENERATION_IN_PROGRESS,
PresentationState.LAYOUT_REQUESTED
},
# Presentation generation flow
PresentationState.GENERATION_IN_PROGRESS: {
PresentationState.PRESENTATION_READY,
PresentationState.GENERATION_FAILED
},
PresentationState.PRESENTATION_READY: {
PresentationState.EXPORT_REQUESTED,
PresentationState.EDIT_REQUESTED,
PresentationState.OUTLINE_REQUESTED
},
# Export flow
PresentationState.EXPORT_REQUESTED: {
PresentationState.EXPORT_IN_PROGRESS
},
PresentationState.EXPORT_IN_PROGRESS: {
PresentationState.EXPORT_COMPLETE,
PresentationState.EXPORT_FAILED
},
PresentationState.EXPORT_COMPLETE: {
PresentationState.EDIT_REQUESTED,
PresentationState.EXPORT_REQUESTED,
PresentationState.INIT
},
# Edit and revision flow
PresentationState.EDIT_REQUESTED: {
PresentationState.TEMPLATE_EDITING
},
PresentationState.TEMPLATE_EDITING: {
PresentationState.PRESENTATION_READY,
PresentationState.EDIT_FAILED
},
# Error recovery transitions
PresentationState.UPLOAD_FAILED: {
PresentationState.INIT,
PresentationState.FILES_UPLOADED
},
PresentationState.SUMMARY_FAILED: {
PresentationState.FILES_UPLOADED,
PresentationState.OUTLINE_REQUESTED
},
PresentationState.OUTLINE_FAILED: {
PresentationState.OUTLINE_REQUESTED,
PresentationState.INIT
},
PresentationState.GENERATION_FAILED: {
PresentationState.LAYOUT_SELECTED,
PresentationState.OUTLINE_APPROVED
},
PresentationState.EXPORT_FAILED: {
PresentationState.EXPORT_REQUESTED,
PresentationState.PRESENTATION_READY
},
PresentationState.EDIT_FAILED: {
PresentationState.EDIT_REQUESTED,
PresentationState.PRESENTATION_READY
}
}
SUGGESTIONS = {
PresentationState.INIT: "Upload files or start with outline generation",
PresentationState.FILES_UPLOADED: "Generate summary from uploaded files",
PresentationState.SUMMARY_GENERATED: "Generate presentation outline",
PresentationState.OUTLINE_GENERATED: "Review and approve outline, or regenerate",
PresentationState.OUTLINE_APPROVED: "Select presentation layout",
PresentationState.LAYOUT_SELECTED: "Generate presentation",
PresentationState.PRESENTATION_READY: "Export presentation or request edits",
PresentationState.EXPORT_REQUESTED: "Choose export format and generate",
PresentationState.EXPORT_COMPLETE: "Download presentation or start new one",
PresentationState.EDIT_REQUESTED: "Make template-based edits",
}
PROGRESS_WEIGHTS = {
PresentationState.INIT: 0,
PresentationState.FILES_UPLOADED: 10,
PresentationState.SUMMARY_GENERATED: 20,
PresentationState.OUTLINE_REQUESTED: 25,
PresentationState.OUTLINE_GENERATED: 35,
PresentationState.OUTLINE_APPROVED: 40,
PresentationState.LAYOUT_REQUESTED: 45,
PresentationState.LAYOUT_SELECTED: 50,
PresentationState.GENERATION_IN_PROGRESS: 70,
PresentationState.PRESENTATION_READY: 85,
PresentationState.EXPORT_REQUESTED: 90,
PresentationState.EXPORT_IN_PROGRESS: 95,
PresentationState.EXPORT_COMPLETE: 100,
PresentationState.TEMPLATE_EDITING: 60,
}
ERROR_STATES = {
PresentationState.UPLOAD_FAILED,
PresentationState.SUMMARY_FAILED,
PresentationState.OUTLINE_FAILED,
PresentationState.GENERATION_FAILED,
PresentationState.EXPORT_FAILED,
PresentationState.EDIT_FAILED
}

View file

@ -0,0 +1,20 @@
from typing import Dict, Set, Optional, Any
from dataclasses import dataclass
@dataclass
class StateContext:
"""Context data that travels with the state machine"""
presentation_id: Optional[str] = None
summary: Optional[str] = None
title: Optional[str] = None
outlines: Optional[list] = None
layout: Optional[str] = None
file_paths: Optional[list] = None
export_format: Optional[str] = None
export_path: Optional[str] = None
error_message: Optional[str] = None
metadata: Dict[str, Any] = None
def __post_init__(self):
if self.metadata is None:
self.metadata = {}

View file

@ -0,0 +1,101 @@
from typing import Dict, Set, Any
from app_mcp.services.state_machine.context import StateContext
from app_mcp.services.state_machine.states import PresentationState
from app_mcp.services.state_machine.constants import TRANSITIONS, SUGGESTIONS, PROGRESS_WEIGHTS, ERROR_STATES
class PresentationStateMachine:
def __init__(self):
self.state = PresentationState.INIT
self.context = StateContext()
self._state_history = [PresentationState.INIT]
self._transitions = TRANSITIONS
self._error_states = ERROR_STATES
self._suggestions = SUGGESTIONS
self._progress_weights = PROGRESS_WEIGHTS
def transition(self, new_state: PresentationState, context_updates: Dict[str, Any] = None):
"""
Transition to new state with optional context updates
Args:
new_state (PresentationState): The state to transition to
context_updates (Dict[str, Any], optional): Context data to update during transition
Raises:
ValueError: If the transition is not valid
"""
if not self.is_valid_transition(new_state):
raise ValueError(f"Invalid transition from {self.state} to {new_state}")
# Update context if provided
if context_updates:
for key, value in context_updates.items():
if hasattr(self.context, key):
setattr(self.context, key, value)
else:
self.context.metadata[key] = value
# Record state history
self._state_history.append(new_state)
self.state = new_state
def is_valid_transition(self, new_state: PresentationState) -> bool:
"""Check if transition to new state is valid"""
return new_state in self._transitions.get(self.state, set())
def get_available_transitions(self) -> Set[PresentationState]:
"""Get all valid transitions from current state"""
return self._transitions.get(self.state, set())
def can_transition_to(self, target_state: PresentationState) -> bool:
"""Check if can transition to target state"""
return target_state in self.get_available_transitions()
def is_terminal_state(self) -> bool:
"""Check if current state is terminal (no outgoing transitions)"""
return len(self.get_available_transitions()) == 0
def is_error_state(self) -> bool:
"""Check if current state is an error state"""
return self.state in self._error_states
def get_workflow_progress(self) -> float:
"""Calculate workflow progress as percentage"""
return self._progress_weights.get(self.state, 0)
def get_next_suggested_action(self) -> str:
"""Get suggested next action based on current state"""
return self._suggestions.get(self.state, "No suggestions available")
def reset(self):
"""Reset state machine to initial state"""
self.state = PresentationState.INIT
self.context = StateContext()
self._state_history = [PresentationState.INIT]
def get_state_history(self) -> list:
"""Get history of states visited"""
return self._state_history.copy()
def rollback_to_previous_state(self) -> bool:
"""Rollback to previous state if possible"""
if len(self._state_history) < 2:
return False
# Remove current state from history
self._state_history.pop()
previous_state = self._state_history[-1]
if self.is_valid_transition(previous_state):
self.state = previous_state
return True
else:
self._state_history.append(self.state)
return False
def __str__(self):
return f"PresentationStateMachine(state={self.state.name}, progress={self.get_workflow_progress()}%)"
def __repr__(self):
return (f"PresentationStateMachine(state={self.state.name}, "
f"context={self.context}, "
f"history_length={len(self._state_history)})")

View file

@ -0,0 +1,40 @@
from enum import Enum, auto
class PresentationState(Enum):
"""
Represents the various states in the presentation workflow.
"""
INIT = auto()
# Upload and summary phase
FILES_UPLOADED = auto()
SUMMARY_GENERATED = auto()
# Outline generation phase
OUTLINE_REQUESTED = auto()
OUTLINE_GENERATED = auto()
OUTLINE_APPROVED = auto()
# Layout selection phase
LAYOUT_REQUESTED = auto()
LAYOUT_SELECTED = auto()
# Presentation generation phase
GENERATION_IN_PROGRESS = auto()
PRESENTATION_READY = auto()
# Export phase
EXPORT_REQUESTED = auto()
EXPORT_IN_PROGRESS = auto()
EXPORT_COMPLETE = auto()
# Edit and revision loops
EDIT_REQUESTED = auto()
TEMPLATE_EDITING = auto()
# Error states
UPLOAD_FAILED = auto()
SUMMARY_FAILED = auto()
OUTLINE_FAILED = auto()
GENERATION_FAILED = auto()
EXPORT_FAILED = auto()
EDIT_FAILED = auto()

View file

@ -0,0 +1,354 @@
from typing import Dict, Any, Optional, List
from dataclasses import asdict
from app_mcp.services.state_machine.machine import PresentationStateMachine
from app_mcp.services.state_machine.states import PresentationState
from utils.user_config import update_env_with_user_config
from app_mcp.wrapper.upload_and_generate_summary import upload_and_summarize_files
from app_mcp.wrapper.generate_outline import generate_outline
from app_mcp.wrapper.presentation_generation import process_post_outline_workflow
from app_mcp.wrapper.presentation_export import export_presentation_and_get_path
from app_mcp.wrapper.list_layout import list_layouts
class WorkflowOrchestrator:
"""
Orchestrates the presentation generation workflow using FSM
- Handles session management
- Executes
- file uploads
- summary generation
- outline generation
- layout selection
- presentation generation
- export
- Provides status and context management
- Allows for session-based operations
- Supports error handling and recovery
"""
def __init__(self):
"""
Initiating:
- The environment with user configuration from the user config file.
- The Finite State Machine (FSM) for presentation workflow.
- Active sessions dictionary to manage multiple workflows.
"""
try:
update_env_with_user_config()
except Exception as e:
print(f"Error updating environment with user config: {e}")
self.fsm = PresentationStateMachine()
self._active_sessions: Dict[str, PresentationStateMachine] = {}
def create_session(self, session_id: str) -> PresentationStateMachine:
"""
Create a new workflow session with the given session ID.
If a session with the same ID already exists, it will be replaced.
Session will Remain for the lifetime of the application.
Args:
session_id (str): Unique identifier for the session.
"""
if not session_id or not isinstance(session_id, str):
raise ValueError("Session ID must be a non-empty string")
session_id = session_id.strip()
if not session_id:
raise ValueError("Session ID cannot be empty")
if session_id in self._active_sessions:
self.remove_session(session_id)
print(f"Session {session_id} already exists, replacing it.")
self._active_sessions[session_id] = PresentationStateMachine()
return self._active_sessions[session_id]
def get_session(self, session_id: str) -> Optional[PresentationStateMachine]:
"""Get existing workflow session"""
if not session_id or not isinstance(session_id, str):
return None
return self._active_sessions.get(session_id.strip())
def remove_session(self, session_id: str) -> bool:
"""Remove workflow session"""
return self._active_sessions.pop(session_id, None) is not None
async def execute_upload_and_summarize(self, session_id: str, files: List[Any]) -> Dict[str, Any]:
"""
Execute file upload and summary generation workflow step.
Args:
session_id (str): Unique identifier for the session.
files (List[Any]): List of files to be uploaded and summarized.
Returns:
Dict[str, Any]: Result containing status, state, progress, next action, and any
"""
fsm = self.get_session(session_id)
if not fsm:
raise ValueError(f"Session {session_id} not found")
try:
fsm.transition(PresentationState.FILES_UPLOADED)
result = await upload_and_summarize_files(files)
# Update context and transition to summary generated
context_updates = {
"summary": result["summary"],
"file_paths": result["file_paths"]
}
fsm.transition(PresentationState.SUMMARY_GENERATED, context_updates)
return {
"status": "success",
"state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": fsm.get_next_suggested_action(),
"result": result
}
except Exception as e:
fsm.transition(PresentationState.UPLOAD_FAILED, {"error_message": str(e)})
print(f"There was an error uploading and summarizing files: {e}")
return {
"status": "error",
"state": fsm.state.name,
"error": str(e),
"next_action": fsm.get_next_suggested_action()
}
async def execute_generate_outline(self, session_id: str, prompt: str, **kwargs) -> Dict[str, Any]:
"""
Execute outline generation workflow step
Args:
session_id (str): Unique identifier for the session.
prompt (str): The prompt to generate the outline.
**kwargs: Additional parameters for outline generation.
Returns:
Dict[str, Any]: Result containing status, state, progress, next action, and generated outline.
"""
fsm = self.get_session(session_id)
if not fsm:
raise ValueError(f"Session {session_id} not found")
try:
fsm.transition(PresentationState.OUTLINE_REQUESTED)
result = await generate_outline(prompt, summary=fsm.context.summary, **kwargs)
# Update the Context and transition to outline generated
context_updates = {
"title": result["title"],
"outlines": result["outlines"]
}
fsm.transition(PresentationState.OUTLINE_GENERATED, context_updates)
return {
"status": "success",
"state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": "Review outline and approve, or request regeneration",
"result": result,
"can_approve": True,
"can_regenerate": True
}
except Exception as e:
fsm.transition(PresentationState.OUTLINE_FAILED, {"error_message": str(e)})
print(f"Error generating outline for session {session_id}: {e}")
return {
"status": "error",
"state": fsm.state.name,
"error": str(e),
"next_action": fsm.get_next_suggested_action()
}
async def approve_outline(self, session_id: str) -> Dict[str, Any]:
"""
Approve the generated outline
Args:
session_id (str): Unique identifier for the session.
Returns:
Dict[str, Any]: Result containing status, state, progress, next action.
"""
fsm = self.get_session(session_id)
if not fsm:
raise ValueError(f"Session {session_id} not found")
if fsm.state != PresentationState.OUTLINE_GENERATED:
raise ValueError(f"Cannot approve outline in state {fsm.state.name}")
fsm.transition(PresentationState.OUTLINE_APPROVED)
return {
"status": "success",
"state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": fsm.get_next_suggested_action()
}
async def execute_layout_selection(self, session_id: str, layout: str) -> Dict[str, Any]:
"""
Execute layout selection workflow step
Args:
session_id (str): Unique identifier for the session.
layout (str): Selected layout for the presentation.
Returns:
Dict[str, Any]: Result containing status, state, progress, next action, and selected layout.
"""
fsm = self.get_session(session_id)
if not fsm:
raise ValueError(f"Session {session_id} not found")
try:
fsm.transition(PresentationState.LAYOUT_REQUESTED)
#Updating the context and transitioning to LAYOUT_SELECTED
context_updates = {"layout": layout}
fsm.transition(PresentationState.LAYOUT_SELECTED, context_updates)
return {
"status": "success",
"state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": fsm.get_next_suggested_action(),
"selected_layout": layout
}
except Exception as e:
print(f"Error selecting layout for session {session_id}: {e}")
return {
"status": "error",
"error": str(e),
"next_action": "Please select a valid layout"
}
async def execute_presentation_generation(self, session_id: str, **kwargs) -> Dict[str, Any]:
"""
Execute presentation generation workflow step
Args:
session_id (str): Unique identifier for the session.
**kwargs: Additional parameters for presentation generation.
Returns:
Dict[str, Any]: Result containing status, state, progress, next action, and generated presentation.
"""
fsm = self.get_session(session_id)
if not fsm:
raise ValueError(f"Session {session_id} not found")
try:
fsm.transition(PresentationState.GENERATION_IN_PROGRESS)
notes = kwargs.get('notes', [])
result = await process_post_outline_workflow(
title=fsm.context.title,
outlines=fsm.context.outlines,
notes=notes,
layout=fsm.context.layout,
prompt=fsm.context.metadata.get('original_prompt', ""),
sql_session=None,
**kwargs
)
#Updating the Context and transitioning to PRESENTATION_READY
context_updates = {"presentation_id": result["presentation_id"]}
fsm.transition(PresentationState.PRESENTATION_READY, context_updates)
return {
"status": "success",
"state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": fsm.get_next_suggested_action(),
"result": result
}
except Exception as e:
fsm.transition(PresentationState.GENERATION_FAILED, {"error_message": str(e)})
print(f"Error generating presentation for session {session_id}: {e}")
return {
"status": "error",
"state": fsm.state.name,
"error": str(e),
"next_action": fsm.get_next_suggested_action()
}
async def execute_export(self, session_id: str, export_format: str = "pptx") -> Dict[str, Any]:
"""
Execute presentation export workflow step
Args:
session_id (str): Unique identifier for the session.
export_format (str): Format to export the presentation (e.g., "pptx", "pdf").
Returns:
Dict[str, Any]: Result containing status, state, progress, next action, and export
"""
fsm = self.get_session(session_id)
if not fsm:
raise ValueError(f"Session {session_id} not found")
try:
# Transition to EXPORT_REQUESTED state
fsm.transition(PresentationState.EXPORT_REQUESTED, {"export_format": export_format})
fsm.transition(PresentationState.EXPORT_IN_PROGRESS)
result = await export_presentation_and_get_path(
presentation_id=fsm.context.presentation_id,
title=fsm.context.title,
export_as=export_format
)
print("RResult of export:", result)
#Updating the Context and transitioning to EXPORT_COMPLETE
context_updates = {"export_path": result["path"]}
fsm.transition(PresentationState.EXPORT_COMPLETE, context_updates)
return {
"status": "success",
"state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": "Download your presentation or start a new one",
"result": result
}
except Exception as e:
fsm.transition(PresentationState.EXPORT_FAILED, {"error_message": str(e)})
print(f"Error exporting presentation for session {session_id}: {e}")
return {
"status": "error",
"state": fsm.state.name,
"error": str(e),
"next_action": fsm.get_next_suggested_action()
}
async def get_available_layouts(self) -> List[Any]:
"""
Get available presentation layouts
"""
return await list_layouts()
def get_workflow_status(self, session_id: str) -> Dict[str, Any]:
"""Get current workflow status"""
fsm = self.get_session(session_id)
if not fsm:
return {"error": "Session not found"}
return {
"session_id": session_id,
"current_state": fsm.state.name,
"progress": fsm.get_workflow_progress(),
"next_action": fsm.get_next_suggested_action(),
"available_transitions": [s.name for s in fsm.get_available_transitions()],
"is_error_state": fsm.is_error_state(),
"context": asdict(fsm.context),
"state_history": [s.name for s in fsm.get_state_history()]
}
def get_all_sessions(self) -> Dict[str, Dict[str, Any]]:
"""
Get status of all active sessions
"""
return {
session_id: self.get_workflow_status(session_id)
for session_id in self._active_sessions.keys()
}

View file

@ -0,0 +1,38 @@
"""MCP Tools package for presentation generation."""
from app_mcp.tools.choose_layout import register_choose_layout
from app_mcp.tools.export_presentation import register_export_presentation
from app_mcp.tools.regenerate_outline import register_regenerate_outline
from app_mcp.tools.get_status import register_get_status
from app_mcp.tools.show_layouts import register_show_layouts
from app_mcp.tools.start_presentation import register_start_presentation
from app_mcp.tools.help_me import register_help_me
from app_mcp.tools.continue_workflow import register_continue_workflow
__all__ = [
'register_choose_layout',
'register_export_presentation',
'register_regenerate_outline',
'register_get_status',
'register_show_layouts',
'register_start_presentation',
'register_help_me',
'register_continue_workflow',
'register_tools',
]
def register_tools(mcp, orchestrator):
"""Register all MCP tools in a fancy way."""
tools = [
register_choose_layout,
register_export_presentation,
register_regenerate_outline,
register_get_status,
register_show_layouts,
register_start_presentation,
register_help_me,
register_continue_workflow
]
for tool in tools:
tool(mcp, orchestrator)

View file

@ -0,0 +1,46 @@
from typing import Dict, Any
def register_choose_layout(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("choose_layout")
async def choose_layout(session_id: str, layout_name: str) -> Dict[str, Any]:
"""
🎨 Select a visual style and theme for your presentation.
Choose from available professional layouts that determine:
- Color scheme and visual design
- Slide structure and layout patterns
- Font choices and styling
- Overall presentation aesthetic
Use 'show_layouts' first to see all available options. Only show the layout name and short description.
Args:
session_id: Your presentation session ID
layout_name: Name of the layout you want to use
"""
try:
result = await orchestrator.execute_layout_selection(session_id, layout_name)
if result["status"] == "success":
return {
"status": "success",
"session_id": session_id,
"message": f"Perfect! I've selected the '{layout_name}' layout for your presentation.",
"suggestion": "Now I'll generate all the slides with content, images, and styling. This might take a minute or two.",
"available_actions": {
"continue": "Start generating the presentation",
"change_layout": "Actually, let me pick a different layout"
}
}
return result
except Exception as e:
return {
"status": "error",
"error": str(e),
"session_id": session_id
}
return choose_layout

View file

@ -0,0 +1,122 @@
from typing import Dict, Any
def register_continue_workflow(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("continue_workflow")
async def continue_workflow(
session_id: str,
action: str = "continue"
) -> Dict[str, Any]:
"""
Move to the next step in creating your presentation.
This tool automatically determines what should happen next based on where
you are in the process:
- After starting: Generates your presentation outline
- After outline: Shows available layouts to choose from
- After layout: Creates your complete presentation
Just call this when you're ready to proceed to the next step!
Args:
session_id: Your presentation session ID
action: What to do next (usually just "continue")
"""
try:
# Validate session_id
if not session_id or not isinstance(session_id, str):
return {
"status": "error",
"error": "Valid session_id is required",
"suggestion": "Use the same session_id from start_presentation"
}
session_id = session_id.strip()
fsm = orchestrator.get_session(session_id)
if not fsm:
return {
"status": "error",
"error": "Session not found. Please start a new presentation first.",
"suggestion": "Call start_presentation to begin"
}
current_state = fsm.state.name
if current_state in ["FILES_UPLOADED", "SUMMARY_GENERATED", "INIT"]:
# Generate outline
prompt = fsm.context.metadata.get("original_prompt", "")
n_slides = fsm.context.metadata.get("n_slides", 8)
language = fsm.context.metadata.get("language", "English")
if not prompt:
return {
"status": "error",
"error": "No prompt found in session. Please start over.",
"suggestion": "Call start_presentation with a valid prompt"
}
result = await orchestrator.execute_generate_outline(
session_id, prompt, n_slides=n_slides, language=language
)
if result["status"] == "success":
return {
"status": "success",
"session_id": session_id,
"message": "Here's your presentation outline:",
"title": result["result"]["title"],
"outlines": result["result"]["outlines"],
"suggestion": "Take a look at the outline. If it looks good, use 'continue_workflow' again to proceed to layout selection.",
"next_step": "Call continue_workflow again to choose layouts, or use regenerate_outline to try different approach"
}
return result
elif current_state == "OUTLINE_GENERATED":
# Auto-approve and move to layouts
await orchestrator.approve_outline(session_id)
layouts = await orchestrator.get_available_layouts()
return {
"status": "success",
"session_id": session_id,
"message": "Great! Now let's choose a visual style for your presentation.",
"available_layouts": layouts,
"suggestion": "Choose a layout that fits your content and audience. Use 'choose_layout' with the layout name.",
"next_step": "Call choose_layout with your preferred layout name"
}
elif current_state == "LAYOUT_SELECTED":
# Generate presentation
result = await orchestrator.execute_presentation_generation(session_id)
if result["status"] == "success":
return {
"status": "success",
"session_id": session_id,
"message": "🎉 Your presentation is ready!",
"title": result["result"]["title"],
"presentation_id": result["result"]["presentation_id"],
"suggestion": "Your presentation has been generated successfully! Use 'export_presentation' to download it.",
"next_step": "Call export_presentation with format 'pptx' or 'pdf'"
}
return result
else:
return {
"status": "info",
"message": f"Currently in {current_state} state.",
"suggestion": "Use get_status to see what actions are available.",
"next_step": "Call get_status for guidance"
}
except Exception as e:
return {
"status": "error",
"error": f"Workflow error: {str(e)}",
"session_id": session_id if 'session_id' in locals() else "unknown",
"suggestion": "Use get_status to check your current progress"
}
return continue_workflow

View file

@ -0,0 +1,56 @@
from typing import Dict, Any
def register_export_presentation(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("export_presentation")
async def export_presentation(
session_id: str,
format: str = "pptx",
export_path: str = None
) -> Dict[str, Any]:
"""
📁 Download your finished presentation in your preferred format.
Export your completed presentation as:
- "pptx" - PowerPoint format (editable, best for sharing and presenting)
- "pdf" - PDF format (read-only, best for viewing and printing)
The exported file will be ready for download immediately.
Args:
session_id: Your presentation session ID
format: Export format - either "pptx" or "pdf"
"""
try:
if format.lower() not in ["pdf", "pptx"]:
return {
"status": "error",
"error": "Please choose either 'pdf' or 'pptx' format",
"session_id": session_id
}
result = await orchestrator.execute_export(session_id, format.lower())
print("Export result:", result)
if result["status"] == "success":
return {
"status": "success",
"session_id": session_id,
"message": f"🎉 Your presentation has been exported as {format.upper()}!",
"path": result["result"]["path"],
"suggestion": "You can download it now, or start creating another presentation.",
"available_actions": {
"download": "Download the presentation",
"new_presentation": "Create a new presentation",
"edit": "Make edits to this presentation"
}
}
return result
except Exception as e:
return {
"status": "error",
"error": str(e),
"session_id": session_id
}
return export_presentation

View file

@ -0,0 +1,83 @@
from typing import Dict, Any, Optional, List
def register_get_status(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("get_status")
def get_status(session_id: str) -> Dict[str, Any]:
"""
📊 Check your presentation creation progress.
See exactly where you are in the process:
- What step you're currently on
- How much progress you've made
- What you can do next
- Any issues that need attention
Perfect for checking in if you're unsure what to do next!
Args:
session_id: Your presentation session ID
"""
try:
if not session_id or not isinstance(session_id, str):
return {
"status": "error",
"error": "Valid session_id is required"
}
session_id = session_id.strip()
status = orchestrator.get_workflow_status(session_id)
if "error" in status:
return {
"status": "error",
"error": "Session not found. Start a new presentation with 'start_presentation'.",
"available_sessions": list(orchestrator._active_sessions.keys())
}
state = status["current_state"]
# Provide user-friendly status messages
friendly_messages = {
"INIT": "Ready to start! Use 'start_presentation' to begin.",
"SUMMARY_GENERATED": "Files processed. Use 'continue_workflow' to generate outline.",
"OUTLINE_GENERATED": "Outline created. Use 'continue_workflow' to proceed to layouts.",
"OUTLINE_APPROVED": "Outline approved. Use 'choose_layout' to select a theme.",
"LAYOUT_SELECTED": "Layout chosen. Use 'continue_workflow' to generate presentation.",
"PRESENTATION_READY": "Presentation generated! Use 'export_presentation' to download.",
"EXPORT_COMPLETE": "All done! Presentation exported successfully."
}
next_actions = {
"INIT": "start_presentation",
"SUMMARY_GENERATED": "continue_workflow",
"OUTLINE_GENERATED": "continue_workflow (or regenerate_outline)",
"OUTLINE_APPROVED": "choose_layout",
"LAYOUT_SELECTED": "continue_workflow",
"PRESENTATION_READY": "export_presentation",
"EXPORT_COMPLETE": "Download file or start_presentation for new one"
}
return {
"status": "success",
"session_id": session_id,
"current_step": state,
"progress": f"{status['progress']:.0f}%",
"message": friendly_messages.get(state, f"Currently in {state} state"),
"next_action": next_actions.get(state, status["next_action"]),
"context": {
"prompt": status["context"].get("metadata", {}).get("original_prompt"),
"n_slides": status["context"].get("metadata", {}).get("n_slides"),
"language": status["context"].get("metadata", {}).get("language")
}
}
except Exception as e:
return {
"status": "error",
"error": f"Status check failed: {str(e)}",
"suggestion": "Try start_presentation to begin a new session"
}
return get_status

View file

@ -0,0 +1,49 @@
from typing import Dict, Any, Optional, List
def register_help_me(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("help")
def help() -> Dict[str, Any]:
"""
Get help and guidance for creating presentations.
Shows you:
- Step-by-step workflow guide
- Available commands and what they do
- Example usage to get you started
- Tips for best results
Perfect for first-time users or when you need a refresher!
"""
return {
"status": "info",
"message": "🎯 Complete Guide to Creating Presentations",
"workflow": {
"step_1": "🚀 start_presentation - Begin with your topic and optional files",
"step_2": "📋 continue_workflow - Generate and review your outline",
"step_3": "🎨 choose_layout - Pick a visual style that fits your content",
"step_4": "⚡ continue_workflow - Generate your complete presentation",
"step_5": "📁 export_presentation - Download as PowerPoint or PDF"
},
"helpful_commands": {
"get_status": "📊 Check your current progress anytime",
"show_layouts": "👀 Browse available themes and styles",
"regenerate_outline": "🔄 Try a different outline approach",
"help": "❓ Show this helpful guide"
},
"quick_start": {
"with_files": "start_presentation(session_id='my-session', prompt='Your topic', files=[uploaded_files])",
"text_only": "start_presentation(session_id='my-session', prompt='Create a presentation about sustainable energy')",
"custom": "start_presentation(session_id='my-session', prompt='Your topic', n_slides=10, language='Spanish')"
},
"tips": [
"💡 Be specific in your prompt for better results",
"📎 Upload relevant files to enhance your content",
"🎨 Choose layouts that match your audience and purpose",
"📊 Use get_status anytime to see what's next"
]
}
return help

View file

@ -0,0 +1,63 @@
from typing import Dict, Any, Optional, List
from app_mcp.tools.continue_workflow import register_continue_workflow
from app_mcp.services.state_machine.states import PresentationState
def register_regenerate_outline(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("regenerate_outline")
async def regenerate_outline(
session_id: str,
new_prompt: Optional[str] = None,
n_slides: Optional[int] = None,
language: Optional[str] = None
) -> Dict[str, Any]:
"""
🔄 Create a new outline with different requirements.
Not happy with the generated outline? Use this to:
- Try a different angle or focus for your topic
- Change the number of slides
- Adjust the language or tone
- Incorporate new requirements
Args:
session_id: Your presentation session ID
new_prompt: New description of what you want (optional)
n_slides: Different number of slides (optional)
language: Different language (optional)
"""
try:
fsm = orchestrator.get_session(session_id)
if not fsm:
return {"status": "error", "error": "Session not found"}
# Update parameters if provided
if new_prompt:
fsm.context.metadata["original_prompt"] = new_prompt
if n_slides:
fsm.context.metadata["n_slides"] = n_slides
if language:
fsm.context.metadata["language"] = language
# Reset to outline generation
if fsm.can_transition_to(PresentationState.OUTLINE_REQUESTED):
fsm.transition(PresentationState.OUTLINE_REQUESTED)
# Generate new outline
continue_workflow = register_continue_workflow(mcp, orchestrator)
result = await continue_workflow(session_id=session_id, action="continue")
if result["status"] == "success":
result["message"] = "I've created a new outline for you:"
return result
except Exception as e:
return {
"status": "error",
"error": str(e),
"session_id": session_id
}
return regenerate_outline

View file

@ -0,0 +1,39 @@
from typing import Dict, Any
def register_show_layouts(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("show_layouts")
async def show_layouts(session_id: str) -> Dict[str, Any]:
"""
👀 Browse all available presentation themes and layouts.
See the complete list of professional layouts including:
- Business and corporate themes
- Creative and modern designs
- Academic and educational styles
- Technical and data-focused layouts
Each layout comes with its own color scheme, fonts, and slide structures.
Args:
session_id: Your presentation session ID
"""
try:
layouts = await orchestrator.get_available_layouts()
return {
"status": "success",
"session_id": session_id,
"message": "Here are all the available presentation layouts:",
"layouts": layouts,
"suggestion": "Choose one using 'choose_layout' with the layout name."
}
except Exception as e:
return {
"status": "error",
"error": str(e),
"session_id": session_id
}
return show_layouts

View file

@ -0,0 +1,110 @@
from typing import List, Dict, Any, Optional
def register_start_presentation(mcp, orchestrator):
"""Register all workflow-related tools for chat-based interaction"""
@mcp.tool("start_presentation")
async def start_presentation(
session_id: str,
prompt: str,
files: Optional[List] = None,
n_slides: int = 8,
language: str = "English"
) -> Dict[str, Any]:
"""
🚀 Start creating a new presentation with your idea!
This is your entry point to create presentations. You can:
- Start with just a text prompt describing what you want
- Upload files (PDFs, docs, etc.) to base your presentation on
- Specify how many slides you want (default: 8)
- Choose the language for your presentation
Examples:
- "Create a presentation about climate change solutions"
- "Make slides about our Q4 financial results" (with uploaded files)
- "Build a training deck for new employees"
Args:
session_id: Unique identifier for your presentation session
prompt: Describe what your presentation should be about
files: Optional list of files to analyze and include
n_slides: Number of slides to generate (default: 8)
language: Presentation language (default: English)
"""
try:
if not session_id or not isinstance(session_id, str) or len(session_id.strip()) == 0:
return {
"status": "error",
"error": "Session ID is required and must be a non-empty string",
"example": "Use something like: session_id='my_presentation_123'"
}
if not prompt or not isinstance(prompt, str) or len(prompt.strip()) == 0:
return {
"status": "error",
"error": "Prompt is required and must be a non-empty string",
"example": "prompt='Create a presentation about AI in healthcare'"
}
# Clean session_id
session_id = session_id.strip()
# Create session
orchestrator.create_session(session_id)
# Store initial parameters
fsm = orchestrator.get_session(session_id)
if not fsm:
return {
"status": "error",
"error": "Failed to create session",
"session_id": session_id
}
fsm.context.metadata.update({
"original_prompt": prompt.strip(),
"n_slides": max(1, min(50, n_slides)), # Validate slide count
"language": language.strip() if language else "English"
})
# Debug log to verify metadata update
print("DEBUG: Metadata after update:", fsm.context.metadata)
# Handle files if provided
if files and len(files) > 0:
result = await orchestrator.execute_upload_and_summarize(session_id, files)
if result["status"] == "error":
return result
return {
"status": "success",
"session_id": session_id,
"message": "Great! I've uploaded and analyzed your files. Here's a summary:",
"summary": result["result"]["summary"],
"prompt": prompt,
"suggestion": f"Now I can create a presentation outline based on your prompt '{prompt}' and the file content. Use 'continue_workflow' to proceed.",
"next_step": "Call continue_workflow to generate the outline"
}
else:
# Direct outline generation without files
return {
"status": "success",
"session_id": session_id,
"message": f"Perfect! Let's create a presentation about: '{prompt}'",
"suggestion": "I'll generate an outline with the key topics and structure. Use 'continue_workflow' to proceed.",
"next_step": "Call continue_workflow to generate the outline",
"parameters": {
"n_slides": fsm.context.metadata.get("n_slides", 8), # Ensure n_slides is retrieved correctly
"language": fsm.context.metadata.get("language", "English") # Ensure language is retrieved correctly
}
}
except Exception as e:
return {
"status": "error",
"error": f"Unexpected error: {str(e)}",
"session_id": session_id if 'session_id' in locals() else "unknown",
"suggestion": "Please try again with a valid session_id and prompt"
}
return start_presentation

View file

@ -0,0 +1,52 @@
from typing import Dict, Any
from models.sql.presentation import PresentationModel
from models.sql.slide import SlideModel
from models.presentation_from_template import GetPresentationUsingTemplateRequest
from utils.dict_utils import deep_update
from utils.export_utils import export_presentation
from sqlmodel import select
from fastapi import HTTPException
class EditFromTemplateTools:
def __init__(self):
pass
def register(self, mcp):
@mcp.tool("edit_from_template")
async def edit_from_template(
data: GetPresentationUsingTemplateRequest,
sql_session
) -> Dict[str, Any]:
"""
Create a new presentation from a template and updated slide data, then export.
"""
presentation = await sql_session.get(PresentationModel, data.presentation_id)
if not presentation:
raise HTTPException(status_code=404, detail="Presentation not found")
slides = await sql_session.scalars(
select(SlideModel).where(SlideModel.presentation == data.presentation_id)
)
new_presentation = presentation.get_new_presentation()
new_slides = []
for each_slide in slides:
updated_content = None
new_slide_data = list(filter(lambda x: x.index == each_slide.index, data.data))
if new_slide_data:
updated_content = deep_update(each_slide.content, new_slide_data[0].content)
new_slides.append(
each_slide.get_new_slide(new_presentation.id, updated_content)
)
sql_session.add(new_presentation)
sql_session.add_all(new_slides)
await sql_session.commit()
presentation_and_path = await export_presentation(
new_presentation.id, new_presentation.title, data.export_as
)
return {
**presentation_and_path.model_dump(),
"edit_path": f"/presentation?id={new_presentation.id}",
}

View file

@ -0,0 +1,35 @@
import json
from typing import Dict, Any, Optional
from models.presentation_outline_model import PresentationOutlineModel
from utils.llm_calls.generate_presentation_outlines import generate_ppt_outline
async def generate_outline(
prompt: str,
n_slides: int = 8,
language: str = "English",
summary: Optional[str] = None,
) -> Dict[str, Any]:
"""
Generate presentation outlines given a prompt, number of slides, language, and optional summary.
Returns the parsed outline data.
"""
presentation_content_text = ""
async for chunk in generate_ppt_outline(
prompt,
n_slides,
language,
summary,
):
presentation_content_text += chunk
presentation_content_json = json.loads(presentation_content_text)
presentation_content = PresentationOutlineModel(
**presentation_content_json)
outlines = [slide.model_dump()
for slide in presentation_content.slides[:n_slides]]
return {
"title": presentation_content.title,
"outlines": outlines,
"notes": presentation_content.notes
}

View file

@ -0,0 +1,8 @@
from typing import List, Any
from api.v1.ppt.endpoints.layouts import get_layouts
async def list_layouts() -> List[Any]:
"""
Retrieve and return a list of all available presentation layouts.
"""
return await get_layouts()

View file

@ -0,0 +1,25 @@
from typing import Literal, Dict, Any
from utils.export_utils import export_presentation
# Standalone function for workflow orchestrator
async def export_presentation_and_get_path(
presentation_id: str,
title: str,
export_as: Literal["pptx", "pdf"] = "pptx"
) -> Dict[str, Any]:
"""
Export the presentation and return the export path and edit path.
"""
presentation_and_path = await export_presentation(
presentation_id, title, export_as
)
# model_dump() is assumed to return a dict with the export path and related info
data = presentation_and_path.model_dump()
print("Exported presentation data:", data)
# Map export_path to path if needed
return {
**data,
"edit_path": f"/presentation?id={presentation_id}",
"export_path": data["path"],
}

View file

@ -0,0 +1,128 @@
import random
from typing import List, Dict, Any, Optional
from models.presentation_outline_model import SlideOutlineModel
from models.presentation_layout import PresentationLayoutModel
from models.presentation_structure_model import PresentationStructureModel
from models.sql.presentation import PresentationModel
from models.sql.slide import SlideModel
from utils.get_layout_by_name import get_layout_by_name
from utils.llm_calls.generate_presentation_structure import generate_presentation_structure
from utils.llm_calls.generate_slide_content import get_slide_content_from_type_and_outline
from services.image_generation_service import ImageGenerationService
from services.icon_finder_service import IconFinderService
from utils.asset_directory_utils import get_images_directory
from utils.process_slides import process_slide_and_fetch_assets
from models.presentation_outline_model import PresentationOutlineModel
from utils.randomizers import get_random_uuid
import asyncio
from services.database import get_async_session
from sqlalchemy.ext.asyncio import AsyncSession
# Standalone function for workflow orchestrator
async def process_post_outline_workflow(
title: str,
outlines: List[Dict[str, Any]],
notes: Optional[str]=[],
layout: str = "general",
language: str = "English",
prompt: str = "",
n_slides: int = 8,
sql_session: Optional[AsyncSession] = None,
) -> Dict[str, Any]:
"""
Process the workflow after outlines are generated: layout, structure, slides, assets, save, and ask for export.
"""
# 1. Parse Layout
layout_model: PresentationLayoutModel = await get_layout_by_name(layout)
total_slide_layouts = len(layout_model.slides)
# 2. Generate Structure
if layout_model.ordered:
presentation_structure = layout_model.to_presentation_structure()
else:
presentation_structure: PresentationStructureModel = (
await generate_presentation_structure(
presentation_outline=PresentationOutlineModel(
title=title,
slides=outlines,
notes=notes,
),
presentation_layout=layout_model,
)
)
presentation_structure.slides = presentation_structure.slides[:n_slides]
for index in range(n_slides):
random_slide_index = random.randint(0, total_slide_layouts - 1)
if index >= n_slides:
presentation_structure.slides.append(random_slide_index)
continue
if presentation_structure.slides[index] >= total_slide_layouts:
presentation_structure.slides[index] = random_slide_index
# 3. Create PresentationModel
presentation_id = get_random_uuid()
presentation = PresentationModel(
id=presentation_id,
title=title,
n_slides=n_slides,
language=language,
outlines=outlines,
prompt=prompt,
notes=notes,
layout=layout_model.model_dump(),
structure=presentation_structure.model_dump(),
)
image_generation_service = ImageGenerationService(get_images_directory())
icon_finder_service = IconFinderService()
async_asset_generation_tasks = []
# 4. Generate slide content and save slides
slides: List[SlideModel] = []
for i, slide_layout_index in enumerate(presentation_structure.slides):
slide_layout = layout_model.slides[slide_layout_index]
slide_content = await get_slide_content_from_type_and_outline(
slide_layout, SlideOutlineModel(**outlines[i]), language
)
slide = SlideModel(
presentation=presentation_id,
layout_group=layout_model.name,
layout=slide_layout.id,
index=i,
content=slide_content,
)
async_asset_generation_tasks.append(
process_slide_and_fetch_assets(
image_generation_service, icon_finder_service, slide
)
)
slides.append(slide)
generated_assets_lists = await asyncio.gather(*async_asset_generation_tasks)
generated_assets = []
for assets_list in generated_assets_lists:
generated_assets.extend(assets_list)
# 5. Save PresentationModel and Slides
if sql_session is None:
from services.database import get_async_session
async for session in get_async_session():
session.add(presentation)
session.add_all(slides)
session.add_all(generated_assets)
await session.commit()
else:
sql_session.add(presentation)
sql_session.add_all(slides)
sql_session.add_all(generated_assets)
await sql_session.commit()
# 6. Ask user if they want to export and in which format
return {
"presentation_id": presentation_id,
"title": title,
"message": "Presentation is ready. Would you like to export? (pdf or pptx)",
"export_options": ["pdf", "pptx"]
}

View file

@ -0,0 +1,31 @@
from typing import List, Dict, Any
from fastapi import UploadFile
from services import TEMP_FILE_SERVICE
from services.documents_loader import DocumentsLoader
from utils.randomizers import get_random_uuid
from utils.llm_calls.generate_document_summary import generate_document_summary
# Standalone function for workflow orchestrator
async def upload_and_summarize_files(
files: List[UploadFile]
) -> Dict[str, Any]:
"""
Upload files, generate a document summary, and return both summary and file paths.
"""
if not files:
raise ValueError("No files provided")
temp_dir = TEMP_FILE_SERVICE.create_temp_dir(get_random_uuid())
file_paths = []
for upload in files:
temp_path = TEMP_FILE_SERVICE.create_temp_file_path(upload.filename, temp_dir)
with open(temp_path, "wb") as f:
f.write(await upload.read())
file_paths.append(temp_path)
documents_loader = DocumentsLoader(file_paths=file_paths)
await documents_loader.load_documents(temp_dir)
summary = await generate_document_summary(documents_loader.documents)
return {
"summary": summary,
"file_paths": file_paths,
}

View file

@ -0,0 +1,24 @@
import sys
import os
import argparse
import asyncio
from app_mcp.server import create_mcp_server, uvicorn_config
async def main():
parser = argparse.ArgumentParser(description="Run the FastAPI server")
parser.add_argument(
"--port", type=int, default=8001, help="Port number to run the server on"
)
args = parser.parse_args()
mcp = create_mcp_server()
await mcp.run_async(
transport="http",
host="0.0.0.0",
port=args.port,
uvicorn_config=uvicorn_config
)
if __name__ == "__main__":
asyncio.run(main())

View file

@ -1,176 +0,0 @@
accelerate==1.9.0
aiohappyeyeballs==2.6.1
aiohttp==3.12.15
aiomysql==0.2.0
aiosignal==1.4.0
aiosqlite==0.21.0
annotated-types==0.7.0
anthropic==0.60.0
anyio==4.9.0
async-timeout==5.0.1
asyncpg==0.30.0
attrs==25.3.0
backoff==2.2.1
bcrypt==4.3.0
beautifulsoup4==4.13.4
build==1.3.0
cachetools==5.5.2
certifi==2025.8.3
cffi==1.17.1
charset-normalizer==3.4.2
chromadb==1.0.15
click==8.2.2
coloredlogs==15.0.1
cryptography==45.0.5
dill==0.4.0
distro==1.9.0
dnspython==2.7.0
docling==2.43.0
docling-core==2.44.1
docling-ibm-models==3.9.0
docling-parse==4.1.0
durationpy==0.10
easyocr==1.7.2
email-validator==2.2.0
et-xmlfile==2.0.0
fastapi==0.116.1
fastapi-cli==0.0.8
fastapi-cloud-cli==0.1.5
filelock==3.18.0
filetype==1.2.0
flatbuffers==25.2.10
frozenlist==1.7.0
fsspec==2025.7.0
google-auth==2.40.3
google-genai==1.28.0
googleapis-common-protos==1.70.0
greenlet==3.2.3
grpcio==1.74.0
h11==0.16.0
hf-xet==1.1.5
httpcore==1.0.9
httptools==0.6.4
httpx==0.28.1
huggingface-hub==0.34.3
humanfriendly==10.0
idna==3.10
imageio==2.37.0
importlib-metadata==8.7.0
importlib-resources==6.5.2
jinja2==3.1.6
jiter==0.10.0
joblib==1.5.1
jsonlines==3.1.0
jsonref==1.1.0
jsonschema==4.25.0
jsonschema-specifications==2025.4.1
kubernetes==33.1.0
latex2mathml==3.78.0
lazy-loader==0.4
lxml==5.4.0
markdown-it-py==3.0.0
marko==2.1.4
markupsafe==3.0.1
mdurl==0.1.2
mmh3==5.2.0
mpire==2.10.2
mpmath==1.3.0
multidict==6.6.3
multiprocess==0.70.18
networkx==3.5
ninja==1.11.1.4
nltk==3.9.1
numpy==2.3.2
oauthlib==3.3.1
onnxruntime==1.22.1
openai==1.98.0
opencv-python-headless==4.11.0.86
openpyxl==3.1.5
opentelemetry-api==1.36.0
opentelemetry-exporter-otlp-proto-common==1.36.0
opentelemetry-exporter-otlp-proto-grpc==1.36.0
opentelemetry-proto==1.36.0
opentelemetry-sdk==1.36.0
opentelemetry-semantic-conventions==0.57b0
orjson==3.11.1
overrides==7.7.0
packaging==25.0
pandas==2.3.1
pathvalidate==3.3.1
pdfminer-six==20250506
pdfplumber==0.11.7
pillow==11.3.0
pluggy==1.6.0
posthog==5.4.0
propcache==0.3.2
protobuf==6.31.1
psutil==7.0.0
pyasn1==0.6.1
pyasn1-modules==0.4.2
pybase64==1.4.2
pyclipper==1.3.0.post6
pycparser==2.22
pydantic==2.11.7
pydantic-core==2.33.2
pydantic-settings==2.10.1
pygments==2.19.2
pylatexenc==2.10
pymysql==1.1.1
pypdfium2==4.30.0
pypika==0.48.9
pyproject-hooks==1.2.0
python-bidi==0.6.6
python-dateutil==2.9.0.post0
python-docx==1.2.0
python-dotenv==1.1.1
python-multipart==0.0.20
python-pptx==1.0.2
pytz==2025.2
pyyaml==6.0.2
redis==6.2.0
referencing==0.36.2
regex==2025.7.34
requests==2.32.4
requests-oauthlib==2.0.0
rich==14.1.0
rich-toolkit==0.14.9
rignore==0.6.4
rpds-py==0.26.0
rsa==4.9.1
rtree==1.4.0
safetensors==0.5.3
scikit-image==0.25.2
scipy==1.16.1
semchunk==2.2.2
sentry-sdk==2.34.1
shapely==2.1.1
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
soupsieve==2.7
sqlalchemy==2.0.42
sqlmodel==0.0.24
starlette==0.47.2
sympy==1.14.0
tabulate==0.9.0
tenacity==8.5.0
tifffile==2025.6.11
tokenizers==0.21.4
--extra-index-url https://download.pytorch.org/whl/cpu
torch==2.7.1+cpu
torchvision==0.22.1+cpu
tqdm==4.67.1
transformers==4.54.1
typer==0.16.0
typing-extensions==4.14.1
typing-inspection==0.4.1
tzdata==2025.2
urllib3==2.5.0
uvicorn==0.35.0
uvloop==0.21.0
watchfiles==1.1.0
websocket-client==1.8.0
websockets==15.0.1
xlsxwriter==3.2.5
yarl==1.20.1
zipp==3.23.0

View file

@ -0,0 +1,421 @@
import asyncio
import pytest
from fastmcp import FastMCP, Client
from app_mcp.tools.start_presentation import register_start_presentation
from app_mcp.tools.help_me import register_help_me
from app_mcp.tools.continue_workflow import register_continue_workflow
from app_mcp.tools.regenerate_outline import register_regenerate_outline
from app_mcp.tools.export_presentation import register_export_presentation
from app_mcp.tools.show_layouts import register_show_layouts
from app_mcp.tools.get_status import register_get_status
from app_mcp.tools.choose_layout import register_choose_layout
from app_mcp.services.state_machine.machine import PresentationStateMachine
from app_mcp.services.state_machine.context import StateContext
from unittest.mock import patch, MagicMock
@pytest.fixture
def mcp_server():
with patch("app_mcp.services.workflow_orchestrator.WorkflowOrchestrator") as MockOrchestrator:
mock_orchestrator = MockOrchestrator.return_value
mcp = FastMCP("TestServer")
#Mocking the StateContext Too
mock_context = StateContext()
mock_context.metadata = {}
mock_fsm = MagicMock(spec=PresentationStateMachine)
mock_fsm.context = mock_context
mock_orchestrator.get_session.return_value = mock_fsm
# Register all tool functions with the mocked orchestrator
register_start_presentation(mcp=mcp, orchestrator=mock_orchestrator)
register_help_me(mcp=mcp, orchestrator=mock_orchestrator)
register_continue_workflow(mcp=mcp, orchestrator=mock_orchestrator)
register_regenerate_outline(mcp=mcp, orchestrator=mock_orchestrator)
register_export_presentation(mcp=mcp, orchestrator=mock_orchestrator)
register_show_layouts(mcp=mcp, orchestrator=mock_orchestrator)
register_get_status(mcp=mcp, orchestrator=mock_orchestrator)
register_choose_layout(mcp=mcp, orchestrator=mock_orchestrator)
return mcp
# Grouped test classes for each tool
class TestStartPresentation:
"""
Tests for the start_presentation tool
"""
def test_success(self, mcp_server):
"""
Test successful start_presentation call with all required parameters.
Checks for correct status, session_id, and parameter values in response.
"""
async def run():
async with Client(mcp_server) as client:
params = {
"session_id": "test_session",
"prompt": "Test Presentation",
"files": None,
"n_slides": 5,
"language": "English"
}
result = await client.call_tool("start_presentation", params)
assert result.data["status"] == "success"
assert result.data["session_id"] == "test_session"
assert "message" in result.data
assert "suggestion" in result.data
assert "next_step" in result.data
assert "parameters" in result.data
assert result.data["parameters"]["n_slides"] == 5
assert result.data["parameters"]["language"] == "English"
asyncio.run(run())
def test_missing_session_id(self, mcp_server):
"""
Test start_presentation with missing session_id.
Expects error status and appropriate error message.
"""
async def run():
async with Client(mcp_server) as client:
params = {"prompt": "Test Presentation", "session_id": ""}
result = await client.call_tool("start_presentation", params)
assert result.data["status"] == "error"
assert "Session ID is required" in result.data["error"]
asyncio.run(run())
def test_missing_prompt(self, mcp_server):
"""
Test start_presentation with missing prompt.
Expects error status and appropriate error message.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": "test_session", "prompt": ""}
result = await client.call_tool("start_presentation", params)
assert result.data["status"] == "error"
assert "Prompt is required" in result.data["error"]
asyncio.run(run())
def test_invalid_prompt_type(self, mcp_server):
"""
Test start_presentation with invalid prompt type (None).
Expects error status and appropriate error message.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": "test_session",
"prompt": ""}
result = await client.call_tool("start_presentation", params)
assert result.data["status"] == "error"
assert "Prompt is required" in result.data["error"]
asyncio.run(run())
class TestHelp:
"""
Tests for the help tool
"""
def test_help(self, mcp_server):
"""
Test help tool with no parameters.
Checks for info status and presence of help fields in response.
"""
async def run():
async with Client(mcp_server) as client:
result = await client.call_tool("help", {})
data = result.data
assert data["status"] == "info"
assert "message" in data
assert "workflow" in data
assert "helpful_commands" in data
assert "quick_start" in data
assert "tips" in data
assert "step_1" in data["workflow"]
assert "get_status" in data["helpful_commands"]
assert isinstance(data["tips"], list)
asyncio.run(run())
class TestContinueWorkflow:
"""
Tests for the continue_workflow tool
"""
def test_success(self, mcp_server):
"""
Test continue_workflow with valid session_id.
Checks for correct status and required fields in response.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": "test_session"}
result = await client.call_tool("continue_workflow", params)
data = result.data
assert "status" in data
assert data["status"] in ["success", "error", "info"]
if data["status"] == "success":
assert data["session_id"] == "test_session"
assert "next_step" in data
if data["status"] == "error":
assert "error" in data
asyncio.run(run())
def test_missing_session_id(self, mcp_server):
"""
Test continue_workflow with missing session_id.
Expects error status and appropriate error message.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": ""}
result = await client.call_tool("continue_workflow", params)
data = result.data
assert data["status"] == "error"
assert "Valid session_id is required" in data["error"]
asyncio.run(run())
class TestRegenerateOutline:
"""
Tests for the regenerate_outline tool
"""
def test_success(self, mcp_server):
"""
Test regenerate_outline with valid session_id.
Checks for correct status and required fields in response.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": "test_session"}
result = await client.call_tool("regenerate_outline", params)
data = result.data
assert "status" in data
assert data["status"] in ["success", "error"]
if data["status"] == "success":
assert "message" in data
assert "session_id" in data
if data["status"] == "error":
assert "error" in data
asyncio.run(run())
class TestExportPresentation:
"""
Tests for the export_presentation tool
"""
def test_success_pptx(self, mcp_server):
"""
Test export_presentation with format 'pptx'.
Checks for success status, correct session_id, and pptx path in response.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": "test_session", "format": "pptx"}
result = await client.call_tool("export_presentation", params)
data = result.data
assert "status" in data
if data["status"] == "success":
assert data["session_id"] == "test_session"
assert data["message"].endswith("PPTX!")
assert "path" in data
assert "suggestion" in data
assert "available_actions" in data
if data["status"] == "error":
assert "error" in data
asyncio.run(run())
def test_success_pdf(self, mcp_server):
"""
Test export_presentation with format 'pdf'.
Checks for success status, correct session_id, and pdf path in response.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": "test_session", "format": "pdf"}
result = await client.call_tool("export_presentation", params)
data = result.data
assert "status" in data
if data["status"] == "success":
assert data["session_id"] == "test_session"
assert data["message"].endswith("PDF!")
assert "path" in data
if data["status"] == "error":
assert "error" in data
asyncio.run(run())
def test_invalid_format(self, mcp_server):
"""
Test export_presentation with invalid format (not 'pdf' or 'pptx').
Expects error status and appropriate error message.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": "test_session", "format": "docx"}
result = await client.call_tool("export_presentation", params)
data = result.data
assert data["status"] == "error"
assert "Please choose either 'pdf' or 'pptx' format" in data["error"]
asyncio.run(run())
def test_missing_session_id(self, mcp_server):
"""
Test export_presentation with missing session_id.
Expects error status and session_id error in response.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": "", "format": "pptx"}
result = await client.call_tool("export_presentation", params)
data = result.data
assert data["status"] == "error"
assert "session_id" in data
asyncio.run(run())
class TestShowLayouts:
"""
Tests for the show_layouts tool
"""
def test_success(self, mcp_server):
"""
Test show_layouts with valid session_id.
Checks for success status, layouts list, and suggestion in response.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": "test_session"}
result = await client.call_tool("show_layouts", params)
data = result.data
assert "status" in data
if data["status"] == "success":
assert data["session_id"] == "test_session"
assert "layouts" in data
assert isinstance(
data["layouts"], list) or data["layouts"] is not None
assert "message" in data
assert "suggestion" in data
if data["status"] == "error":
assert "error" in data
asyncio.run(run())
def test_missing_session_id(self, mcp_server):
"""
Test show_layouts with missing session_id.
Expects error status and session_id error in response.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": ""}
result = await client.call_tool("show_layouts", params)
data = result.data
assert data["status"] == "error"
assert "session_id" in data
asyncio.run(run())
class TestGetStatus:
"""
Tests for the get_status tool
"""
def test_success(self, mcp_server):
"""
Test get_status with valid session_id.
Checks for success status, progress, and context in response.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": "test_session"}
result = await client.call_tool("get_status", params)
data = result.data
assert "status" in data
if data["status"] == "success":
assert data["session_id"] == "test_session"
assert "current_step" in data
assert "progress" in data
assert "message" in data
assert "next_action" in data
assert "context" in data
if data["status"] == "error":
assert "error" in data
asyncio.run(run())
def test_missing_session_id(self, mcp_server):
"""
Test get_status with missing session_id.
Expects error status and appropriate error message.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": ""}
result = await client.call_tool("get_status", params)
data = result.data
assert data["status"] == "error"
assert "Valid session_id is required" in data["error"]
asyncio.run(run())
class TestChooseLayout:
"""
Tests for the choose_layout tool
"""
def test_success(self, mcp_server):
"""
Test choose_layout with valid session_id and layout_name.
Checks for success status, available actions, and suggestion in response.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": "test_session",
"layout_name": "default"}
result = await client.call_tool("choose_layout", params)
data = result.data
assert "status" in data
if data["status"] == "success":
assert data["session_id"] == "test_session"
assert "message" in data
assert "suggestion" in data
assert "available_actions" in data
if data["status"] == "error":
assert "error" in data
asyncio.run(run())
def test_missing_session_id(self, mcp_server):
"""
Test choose_layout with missing session_id.
Expects error status and session_id error in response.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": "", "layout_name": "default"}
result = await client.call_tool("choose_layout", params)
data = result.data
assert data["status"] == "error"
assert "session_id" in data
asyncio.run(run())
def test_missing_layout_name(self, mcp_server):
"""
Test choose_layout with missing layout_name.
Checks for error status if layout_name is required.
"""
async def run():
async with Client(mcp_server) as client:
params = {"session_id": "test_session", "layout_name": ""}
result = await client.call_tool("choose_layout", params)
data = result.data
assert "status" in data
if data["status"] == "error":
assert "error" in data
asyncio.run(run())

View file

@ -26,8 +26,8 @@ export async function GET(request: Request) {
waitUntil: "networkidle0",
timeout: 80000,
});
await page.waitForSelector("[data-layouts]", { timeout: 10000 });
await page.waitForSelector("[data-layouts]", { timeout: 30000 });
// Extract both data-layouts and data-group-settings attributes
const { dataLayouts, dataGroupSettings } = await page.$eval(

View file

@ -8,15 +8,15 @@ export const layoutName = 'Classic Dark Pie Chart and Metrics'
export const layoutDescription = 'A modern slide with dark background, metrics on the left, and pie chart visualization on the right.'
const chartDataSchema = z.object({
name: z.string().meta({ description: "Data point name" }),
name: z.string().min(2).max(30).meta({ description: "Data point name" }),
value: z.number().meta({ description: "Data point value" }),
});
const pieChartAndMetricsSchema = z.object({
title: z.string().min(3).max(100).default('Introduction to Nepal\'s Trade').meta({
title: z.string().min(3).max(80).default('Introduction to Nepal\'s Trade').meta({
description: "Main title of the slide",
}),
description: z.string().min(10).max(200).default('Nepal\'s landlocked geography heavily influences its trade, fostering reliance on India and China.').meta({
description: z.string().min(10).max(100).default('Nepal\'s landlocked geography heavily influences its trade, fostering reliance on India and China.').meta({
description: "Description text",
}),
metrics: z.array(z.object({
@ -37,13 +37,7 @@ const pieChartAndMetricsSchema = z.object({
{ name: 'Other GDP', value: 50.6 },
]).meta({
description: "Pie chart data",
}),
showLegend: z.boolean().default(true).meta({
description: "Whether to show chart legend",
}),
showTooltip: z.boolean().default(true).meta({
description: "Whether to show chart tooltip",
}),
})
})
const chartConfig = {
@ -70,7 +64,7 @@ interface PieChartAndMetricsLayoutProps {
}
const PieChartAndMetricsLayout: React.FC<PieChartAndMetricsLayoutProps> = ({ data: slideData }) => {
const { title, description, metrics, chartData, showLegend = true, showTooltip = true } = slideData;
const { title, description, metrics, chartData } = slideData;
const CustomLegend = () => (
<div className="flex justify-center space-x-8 mt-4">
@ -89,7 +83,7 @@ const PieChartAndMetricsLayout: React.FC<PieChartAndMetricsLayoutProps> = ({ dat
const renderPieChart = () => {
return (
<PieChart>
{showTooltip && <ChartTooltip content={<ChartTooltipContent />} />}
<ChartTooltip content={<ChartTooltipContent />} />
<Pie
data={chartData}
fill="#8b5cf6"
@ -149,7 +143,7 @@ const PieChartAndMetricsLayout: React.FC<PieChartAndMetricsLayoutProps> = ({ dat
<ChartContainer config={chartConfig} className="h-[500px] w-[500px]">
{renderPieChart()}
</ChartContainer>
{showLegend && <CustomLegend />}
<CustomLegend />
</div>
</div>
</div>

View file

@ -8,15 +8,15 @@ export const layoutName = 'Classic Dark Bar Graph'
export const layoutDescription = 'A modern slide with dark background, gradient title, bar chart visualization, and footer text.'
const barDataSchema = z.object({
name: z.string().meta({ description: "Product name" }),
name: z.string().min(2).max(30).meta({ description: "Product name" }),
value: z.number().meta({ description: "Export value in millions" }),
});
const barGraphSchema = z.object({
title: z.string().min(3).max(100).default('Export Overview: Key Products').meta({
title: z.string().min(3).max(80).default('Export Overview: Key Products').meta({
description: "Main title of the slide",
}),
description: z.string().min(10).max(150).default('Nepal\'s total exports were $1.3 billion in 2022, a 21% decrease from 2021, but showed a 47.5% YoY increase by Nov 2024.').meta({
description: z.string().min(10).max(120).default('Nepal\'s total exports were $1.3 billion in 2022, a 21% decrease from 2021, but showed a 47.5% YoY increase by Nov 2024.').meta({
description: "Description text",
}),
chartData: z.array(barDataSchema).min(2).max(6).default([

View file

@ -19,7 +19,7 @@ const comparisonSectionSchema = z.object({
});
const comparisonSchema = z.object({
title: z.string().min(3).max(100).default('Key Commodities in Focus').meta({
title: z.string().min(3).max(80).default('Key Commodities in Focus').meta({
description: "Main title of the slide",
}),
comparisonSections: z.array(comparisonSectionSchema).min(2).max(2).default([

View file

@ -14,10 +14,10 @@ const metricItemSchema = z.object({
});
const metricsSchema = z.object({
title: z.string().min(3).max(100).default('Top Export Destinations').meta({
title: z.string().min(3).max(80).default('Top Export Destinations').meta({
description: "Main title of the slide",
}),
description: z.string().min(10).max(200).default('Nepal exports 760 products to 132 countries, with a strong focus on regional trade.').meta({
description: z.string().min(10).max(120).default('Nepal exports 760 products to 132 countries, with a strong focus on regional trade.').meta({
description: "Description text",
}),
metrics: z.array(metricItemSchema).min(2).max(6).default([

View file

@ -7,12 +7,12 @@ export const layoutName = 'Classic Dark Bullet Point with Description'
export const layoutDescription = 'A modern slide with dark background, image on the left (2/5), and bullet points with descriptions in boxes on the right (3/5).'
const bulletPointSchema = z.object({
title: z.string().min(3).max(80).meta({ description: "Bullet point title" }),
content: z.string().min(10).max(150).meta({ description: "Bullet point content (max 150 characters)" }),
title: z.string().min(3).max(60).meta({ description: "Bullet point title" }),
content: z.string().min(10).max(120).meta({ description: "Bullet point content (max 150 characters)" }),
});
const bulletPointWithDescriptionSchema = z.object({
title: z.string().min(3).max(100).default('Trade Policies and Challenges').meta({
title: z.string().min(3).max(80).default('Trade Policies and Challenges').meta({
description: "Main title of the slide",
}),
bulletPoints: z.array(bulletPointSchema).min(2).max(3).default([

View file

@ -18,6 +18,8 @@ const canChangeKeys = process.env.CAN_CHANGE_KEYS !== 'false';
const fastapiPort = 8000;
const nextjsPort = 3000;
const appmcpPort = 8001;
const userConfigPath = join(process.env.APP_DATA_DIRECTORY, 'userConfig.json');
const userDataDir = dirname(userConfigPath);
@ -48,13 +50,13 @@ process.env.USER_CONFIG_PATH = userConfigPath;
//? UserConfig is only setup if API Keys can be changed
const setupUserConfigFromEnv = () => {
let existingConfig = {};
if (existsSync(userConfigPath)) {
existingConfig = JSON.parse(readFileSync(userConfigPath, 'utf8'));
}
if (!['ollama', 'openai', 'google'].includes(existingConfig.LLM)) {
if (!["ollama", "openai", "google"].includes(existingConfig.LLM)) {
existingConfig.LLM = undefined;
}
@ -69,19 +71,22 @@ const setupUserConfigFromEnv = () => {
ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY || existingConfig.ANTHROPIC_API_KEY,
ANTHROPIC_MODEL: process.env.ANTHROPIC_MODEL || existingConfig.ANTHROPIC_MODEL,
CUSTOM_LLM_URL: process.env.CUSTOM_LLM_URL || existingConfig.CUSTOM_LLM_URL,
CUSTOM_LLM_API_KEY: process.env.CUSTOM_LLM_API_KEY || existingConfig.CUSTOM_LLM_API_KEY,
CUSTOM_LLM_API_KEY:
process.env.CUSTOM_LLM_API_KEY || existingConfig.CUSTOM_LLM_API_KEY,
CUSTOM_MODEL: process.env.CUSTOM_MODEL || existingConfig.CUSTOM_MODEL,
PEXELS_API_KEY: process.env.PEXELS_API_KEY || existingConfig.PEXELS_API_KEY,
PIXABAY_API_KEY: process.env.PIXABAY_API_KEY || existingConfig.PIXABAY_API_KEY,
PIXABAY_API_KEY:
process.env.PIXABAY_API_KEY || existingConfig.PIXABAY_API_KEY,
IMAGE_PROVIDER: process.env.IMAGE_PROVIDER || existingConfig.IMAGE_PROVIDER,
EXTENDED_REASONING: process.env.EXTENDED_REASONING || existingConfig.EXTENDED_REASONING,
USE_CUSTOM_URL: process.env.USE_CUSTOM_URL || existingConfig.USE_CUSTOM_URL,
};
writeFileSync(userConfigPath, JSON.stringify(userConfig));
}
const startServers = async () => {
const startServers = async () => {
const fastApiProcess = spawn(
"python",
["server.py", "--port", fastapiPort.toString(), "--reload", isDev],
@ -89,13 +94,27 @@ const startServers = async () => {
cwd: fastapiDir,
stdio: "inherit",
env: process.env,
}
},
);
fastApiProcess.on("error", err => {
fastApiProcess.on("error", (err) => {
console.error("FastAPI process failed to start:", err);
});
const appmcpProcess = spawn(
"python",
["mcp_server.py", "--port", appmcpPort.toString()],
{
cwd: fastapiDir,
stdio: "inherit",
env: process.env,
},
);
appmcpProcess.on("error", (err) => {
console.error("App MCP process failed to start:", err);
});
const nextjsProcess = spawn(
"npm",
["run", isDev ? "dev" : "start", "--", "-p", nextjsPort.toString()],
@ -103,10 +122,10 @@ const startServers = async () => {
cwd: nextjsDir,
stdio: "inherit",
env: process.env,
}
},
);
nextjsProcess.on("error", err => {
nextjsProcess.on("error", (err) => {
console.error("Next.js process failed to start:", err);
});
@ -140,6 +159,7 @@ const startServers = async () => {
// Keep the Node process alive until both servers exit
const exitCode = await Promise.race([
new Promise(resolve => fastApiProcess.on("exit", resolve)),
new Promise(resolve => nextjsProcess.on("exit", resolve)),
new Promise(resolve => ollamaProcess.on("exit", resolve)),