Add background podcast generation and UX improvements
Features: - Background podcast generation (navigate away while processing!) - Podcast status tracking (pending/in_progress/completed/failed) - Database-backed task queue (no external dependencies) - Podcast length control with slider (5-30 minutes) - Fixed podcast delete to update UI immediately - Collapsible source citations in chat (dropdown with count) - Real-time status checking for background tasks Technical: - Created background_tasks.py module - Added background_tasks table to database - Threading-based background execution - Task status tracking: PENDING, IN_PROGRESS, COMPLETED, FAILED - Proper database session management in shared notebooks - Fixed SQLAlchemy DetachedInstanceError in sharing view UX Improvements: - "Generate in Background" button instead of blocking - Status indicator shows when podcast is generating - "Check Status" refresh button - Sources collapsed by default with count badge - Podcast delete shows success message before rerun - Can navigate away during long-running operations 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
parent
b7cd5a58c7
commit
ce09d57cef
4 changed files with 293 additions and 61 deletions
16
server.log
16
server.log
|
|
@ -28,3 +28,19 @@ INFO:mcp.server.lowlevel.server:Warning: PydanticDeprecatedSince20: The `parse_o
|
|||
INFO: 127.0.0.1:56552 - "DELETE /mcp HTTP/1.1" 307 Temporary Redirect
|
||||
INFO:mcp.server.streamable_http:Terminating session: a97441f5cebf4f8b9de93a1c59333123
|
||||
INFO: 127.0.0.1:56552 - "DELETE /mcp/ HTTP/1.1" 200 OK
|
||||
INFO: 127.0.0.1:58570 - "POST /mcp HTTP/1.1" 307 Temporary Redirect
|
||||
INFO:mcp.server.streamable_http_manager:Created new transport with session ID: 403d8c9b18ba43e98eb1ca532b7fbe2d
|
||||
INFO: 127.0.0.1:58570 - "POST /mcp/ HTTP/1.1" 200 OK
|
||||
INFO: 127.0.0.1:58573 - "POST /mcp HTTP/1.1" 307 Temporary Redirect
|
||||
INFO: 127.0.0.1:58574 - "GET /mcp HTTP/1.1" 307 Temporary Redirect
|
||||
INFO: 127.0.0.1:58573 - "POST /mcp/ HTTP/1.1" 202 Accepted
|
||||
INFO: 127.0.0.1:58574 - "GET /mcp/ HTTP/1.1" 200 OK
|
||||
INFO: 127.0.0.1:58576 - "POST /mcp HTTP/1.1" 307 Temporary Redirect
|
||||
INFO: 127.0.0.1:58576 - "POST /mcp/ HTTP/1.1" 200 OK
|
||||
INFO:mcp.server.lowlevel.server:Processing request of type CallToolRequest
|
||||
INFO:httpx:HTTP Request: POST https://api.cloud.llamaindex.ai/api/v1/pipelines/884e242c-86dd-4824-8347-e6dfb91d98dc/retrieve "HTTP/1.1 200 OK"
|
||||
INFO:httpx:HTTP Request: POST https://api.openai.com/v1/responses "HTTP/1.1 200 OK"
|
||||
INFO:mcp.server.lowlevel.server:Warning: PydanticDeprecatedSince20: The `parse_obj` method is deprecated; use `model_validate` instead. Deprecated in Pydantic V2.0 to be removed in V3.0. See Pydantic V2 Migration Guide at https://errors.pydantic.dev/2.11/migration/
|
||||
INFO: 127.0.0.1:58582 - "DELETE /mcp HTTP/1.1" 307 Temporary Redirect
|
||||
INFO:mcp.server.streamable_http:Terminating session: 403d8c9b18ba43e98eb1ca532b7fbe2d
|
||||
INFO: 127.0.0.1:58582 - "DELETE /mcp/ HTTP/1.1" 200 OK
|
||||
|
|
|
|||
221
src/notebookllama/background_tasks.py
Normal file
221
src/notebookllama/background_tasks.py
Normal file
|
|
@ -0,0 +1,221 @@
|
|||
"""
|
||||
Simple background task system using database as queue
|
||||
No external dependencies like Celery needed
|
||||
"""
|
||||
|
||||
from sqlalchemy import Column, Integer, String, Text, DateTime, Enum as SQLEnum
|
||||
from database import Base, engine, get_db
|
||||
from datetime import datetime
|
||||
import enum
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
import asyncio
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class TaskStatus(enum.Enum):
|
||||
PENDING = "pending"
|
||||
IN_PROGRESS = "in_progress"
|
||||
COMPLETED = "completed"
|
||||
FAILED = "failed"
|
||||
|
||||
|
||||
class BackgroundTask(Base):
|
||||
__tablename__ = "background_tasks"
|
||||
|
||||
id = Column(Integer, primary_key=True, index=True)
|
||||
task_type = Column(String(100), nullable=False) # 'podcast_generation', etc.
|
||||
status = Column(SQLEnum(TaskStatus), nullable=False, default=TaskStatus.PENDING)
|
||||
notebook_id = Column(Integer, nullable=True)
|
||||
user_id = Column(Integer, nullable=False)
|
||||
parameters = Column(Text, nullable=True) # JSON
|
||||
result = Column(Text, nullable=True) # JSON
|
||||
error_message = Column(Text, nullable=True)
|
||||
created_at = Column(DateTime, default=datetime.utcnow)
|
||||
started_at = Column(DateTime, nullable=True)
|
||||
completed_at = Column(DateTime, nullable=True)
|
||||
|
||||
|
||||
# Create table if it doesn't exist
|
||||
Base.metadata.create_all(bind=engine, tables=[BackgroundTask.__table__])
|
||||
|
||||
|
||||
def create_task(task_type: str, user_id: int, notebook_id: Optional[int] = None, parameters: dict = None) -> BackgroundTask:
|
||||
"""Create a background task"""
|
||||
db = get_db()
|
||||
try:
|
||||
task = BackgroundTask(
|
||||
task_type=task_type,
|
||||
user_id=user_id,
|
||||
notebook_id=notebook_id,
|
||||
parameters=json.dumps(parameters) if parameters else None,
|
||||
status=TaskStatus.PENDING
|
||||
)
|
||||
db.add(task)
|
||||
db.commit()
|
||||
db.refresh(task)
|
||||
return task
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def get_task_status(task_id: int) -> Optional[BackgroundTask]:
|
||||
"""Get task status"""
|
||||
db = get_db()
|
||||
try:
|
||||
task = db.query(BackgroundTask).filter(BackgroundTask.id == task_id).first()
|
||||
return task
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def update_task_status(task_id: int, status: TaskStatus, result: dict = None, error: str = None):
|
||||
"""Update task status"""
|
||||
db = get_db()
|
||||
try:
|
||||
task = db.query(BackgroundTask).filter(BackgroundTask.id == task_id).first()
|
||||
if task:
|
||||
task.status = status
|
||||
if status == TaskStatus.IN_PROGRESS:
|
||||
task.started_at = datetime.utcnow()
|
||||
elif status in [TaskStatus.COMPLETED, TaskStatus.FAILED]:
|
||||
task.completed_at = datetime.utcnow()
|
||||
if result:
|
||||
task.result = json.dumps(result)
|
||||
if error:
|
||||
task.error_message = error
|
||||
db.commit()
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def get_user_tasks(user_id: int, task_type: Optional[str] = None) -> list:
|
||||
"""Get all tasks for a user"""
|
||||
db = get_db()
|
||||
try:
|
||||
query = db.query(BackgroundTask).filter(BackgroundTask.user_id == user_id)
|
||||
if task_type:
|
||||
query = query.filter(BackgroundTask.task_type == task_type)
|
||||
tasks = query.order_by(BackgroundTask.created_at.desc()).limit(50).all()
|
||||
# Return as dicts to avoid session issues
|
||||
return [{
|
||||
'id': t.id,
|
||||
'task_type': t.task_type,
|
||||
'status': t.status,
|
||||
'notebook_id': t.notebook_id,
|
||||
'created_at': t.created_at,
|
||||
'completed_at': t.completed_at,
|
||||
'error_message': t.error_message,
|
||||
'result': json.loads(t.result) if t.result else None
|
||||
} for t in tasks]
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
def get_notebook_podcast_task(notebook_id: int) -> Optional[dict]:
|
||||
"""Get the latest podcast generation task for a notebook"""
|
||||
db = get_db()
|
||||
try:
|
||||
task = db.query(BackgroundTask).filter(
|
||||
BackgroundTask.notebook_id == notebook_id,
|
||||
BackgroundTask.task_type == 'podcast_generation'
|
||||
).order_by(BackgroundTask.created_at.desc()).first()
|
||||
|
||||
if task:
|
||||
return {
|
||||
'id': task.id,
|
||||
'status': task.status,
|
||||
'created_at': task.created_at,
|
||||
'completed_at': task.completed_at,
|
||||
'error_message': task.error_message,
|
||||
'result': json.loads(task.result) if task.result else None
|
||||
}
|
||||
return None
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
|
||||
async def execute_podcast_task(task_id: int):
|
||||
"""Execute podcast generation task in background"""
|
||||
from notebook_manager import get_notebook_by_id, get_notebook_documents, save_notebook_podcast
|
||||
from document_manager import get_latest_document_summary
|
||||
from notebook_synthesis import generate_podcast_outline, generate_podcast_script_from_outline
|
||||
from audio import PODCAST_GEN
|
||||
import json
|
||||
|
||||
update_task_status(task_id, TaskStatus.IN_PROGRESS)
|
||||
|
||||
task = get_task_status(task_id)
|
||||
if not task:
|
||||
return
|
||||
|
||||
params = json.loads(task.parameters) if task.parameters else {}
|
||||
notebook_id = task.notebook_id
|
||||
target_length = params.get('target_length', 10)
|
||||
|
||||
try:
|
||||
documents = get_notebook_documents(notebook_id)
|
||||
|
||||
# Prepare document data
|
||||
doc_data = []
|
||||
combined_content = ""
|
||||
|
||||
for doc in documents:
|
||||
summary = get_latest_document_summary(doc.id)
|
||||
if summary:
|
||||
doc_data.append({
|
||||
'filename': doc.original_filename,
|
||||
'summary': summary.summary,
|
||||
'highlights': json.loads(summary.highlights) if summary.highlights else [],
|
||||
'questions': json.loads(summary.questions) if summary.questions else [],
|
||||
'answers': json.loads(summary.answers) if summary.answers else []
|
||||
})
|
||||
if summary.md_content:
|
||||
combined_content += f"\n\n## {doc.original_filename}\n{summary.md_content}"
|
||||
|
||||
if doc_data and combined_content:
|
||||
# Generate outline
|
||||
outline = await generate_podcast_outline(doc_data, target_length)
|
||||
|
||||
if outline:
|
||||
# Generate script
|
||||
script = await generate_podcast_script_from_outline(outline, combined_content)
|
||||
|
||||
# Generate audio
|
||||
audio_file = await PODCAST_GEN.create_conversation(file_transcript=script)
|
||||
|
||||
# Save to notebook
|
||||
save_notebook_podcast(notebook_id, audio_file)
|
||||
|
||||
# Update task as completed
|
||||
update_task_status(task_id, TaskStatus.COMPLETED, result={'audio_file': audio_file})
|
||||
else:
|
||||
update_task_status(task_id, TaskStatus.FAILED, error="Failed to generate outline")
|
||||
else:
|
||||
update_task_status(task_id, TaskStatus.FAILED, error="No content available")
|
||||
|
||||
except Exception as e:
|
||||
update_task_status(task_id, TaskStatus.FAILED, error=str(e))
|
||||
|
||||
|
||||
def run_background_task(task_id: int):
|
||||
"""Run task in a separate thread"""
|
||||
asyncio.run(execute_podcast_task(task_id))
|
||||
|
||||
|
||||
def start_podcast_generation_background(notebook_id: int, user_id: int, target_length: int = 10) -> int:
|
||||
"""Start podcast generation in background and return task ID"""
|
||||
# Create task
|
||||
task = create_task(
|
||||
task_type='podcast_generation',
|
||||
user_id=user_id,
|
||||
notebook_id=notebook_id,
|
||||
parameters={'target_length': target_length}
|
||||
)
|
||||
|
||||
# Start in background thread
|
||||
thread = threading.Thread(target=run_background_task, args=(task.id,), daemon=True)
|
||||
thread.start()
|
||||
|
||||
return task.id
|
||||
|
|
@ -24,6 +24,7 @@ from database import get_db, DocumentShare, PermissionLevel
|
|||
from workflow import NotebookLMWorkflow, FileInputEvent, NotebookOutputEvent
|
||||
from audio import PODCAST_GEN
|
||||
from notebook_synthesis import generate_notebook_synthesis, generate_podcast_outline, generate_podcast_script_from_outline
|
||||
from background_tasks import start_podcast_generation_background, get_notebook_podcast_task, TaskStatus
|
||||
|
||||
require_auth()
|
||||
user = get_current_user()
|
||||
|
|
@ -204,6 +205,8 @@ if st.session_state.get("sharing_notebook_id") == notebook.id:
|
|||
st.markdown("---")
|
||||
|
||||
# Generate podcast section
|
||||
podcast_task = get_notebook_podcast_task(notebook.id)
|
||||
|
||||
if st.session_state.get("generating_podcast") == notebook.id:
|
||||
st.subheader("🎙️ Generate Podcast")
|
||||
|
||||
|
|
@ -225,50 +228,16 @@ if st.session_state.get("generating_podcast") == notebook.id:
|
|||
|
||||
col_p1, col_p2 = st.columns(2)
|
||||
with col_p1:
|
||||
if st.button("Generate Now", type="primary", key="generate_podcast_btn"):
|
||||
with st.spinner("Generating podcast... This may take several minutes."):
|
||||
try:
|
||||
# Prepare document data
|
||||
doc_data = []
|
||||
combined_content = ""
|
||||
|
||||
for doc in documents:
|
||||
summary = get_latest_document_summary(doc.id)
|
||||
if summary:
|
||||
doc_data.append({
|
||||
'filename': doc.original_filename,
|
||||
'summary': summary.summary,
|
||||
'highlights': json.loads(summary.highlights) if summary.highlights else [],
|
||||
'questions': json.loads(summary.questions) if summary.questions else [],
|
||||
'answers': json.loads(summary.answers) if summary.answers else []
|
||||
})
|
||||
if summary.md_content:
|
||||
combined_content += f"\n\n## {doc.original_filename}\n{summary.md_content}"
|
||||
|
||||
if doc_data and combined_content:
|
||||
# Generate structured outline first
|
||||
st.info("Step 1/3: Creating podcast outline...")
|
||||
outline = asyncio.run(generate_podcast_outline(doc_data, target_length))
|
||||
|
||||
if outline:
|
||||
st.info(f"Step 2/3: Generating {target_length}-minute conversation script...")
|
||||
# Generate script from outline
|
||||
script = asyncio.run(generate_podcast_script_from_outline(outline, combined_content))
|
||||
|
||||
st.info("Step 3/3: Converting to audio...")
|
||||
# Generate audio
|
||||
audio_file = asyncio.run(PODCAST_GEN.create_conversation(file_transcript=script))
|
||||
|
||||
save_notebook_podcast(notebook.id, audio_file)
|
||||
st.success(f"✅ {target_length}-minute podcast generated!")
|
||||
del st.session_state.generating_podcast
|
||||
st.rerun()
|
||||
else:
|
||||
st.error("Failed to generate podcast outline")
|
||||
else:
|
||||
st.error("No content available for podcast generation")
|
||||
except Exception as e:
|
||||
st.error(f"Error: {str(e)}")
|
||||
if st.button("Generate in Background", type="primary", key="generate_podcast_btn"):
|
||||
try:
|
||||
# Start background task
|
||||
task_id = start_podcast_generation_background(notebook.id, user.id, target_length)
|
||||
st.success(f"✅ Podcast generation started in background! (Task #{task_id})")
|
||||
st.info("💡 You can navigate away and come back later. The podcast will be ready in 3-5 minutes.")
|
||||
del st.session_state.generating_podcast
|
||||
st.rerun()
|
||||
except Exception as e:
|
||||
st.error(f"Error starting task: {str(e)}")
|
||||
|
||||
with col_p2:
|
||||
if st.button("Cancel", key="cancel_podcast_btn"):
|
||||
|
|
@ -277,6 +246,19 @@ if st.session_state.get("generating_podcast") == notebook.id:
|
|||
|
||||
st.markdown("---")
|
||||
|
||||
# Show podcast generation status if task exists
|
||||
elif podcast_task and podcast_task['status'] in [TaskStatus.PENDING, TaskStatus.IN_PROGRESS]:
|
||||
st.info(f"🎙️ Podcast generation in progress... Started at {podcast_task['created_at'].strftime('%H:%M')}")
|
||||
if st.button("🔄 Check Status", key="check_podcast_status"):
|
||||
st.rerun()
|
||||
st.markdown("---")
|
||||
elif podcast_task and podcast_task['status'] == TaskStatus.FAILED:
|
||||
st.error(f"❌ Podcast generation failed: {podcast_task.get('error_message', 'Unknown error')}")
|
||||
if st.button("Try Again", key="retry_podcast"):
|
||||
st.session_state.generating_podcast = notebook.id
|
||||
st.rerun()
|
||||
st.markdown("---")
|
||||
|
||||
# Main content
|
||||
documents = get_notebook_documents(notebook.id)
|
||||
|
||||
|
|
@ -415,10 +397,22 @@ if notebook.podcast_path and os.path.exists(notebook.podcast_path):
|
|||
audio_bytes = f.read()
|
||||
st.audio(audio_bytes, format="audio/mp3")
|
||||
|
||||
if st.button("🗑️ Delete Podcast"):
|
||||
if st.button("🗑️ Delete Podcast", key="delete_podcast_btn"):
|
||||
try:
|
||||
os.remove(notebook.podcast_path)
|
||||
save_notebook_podcast(notebook.id, None)
|
||||
# Update notebook to remove podcast path
|
||||
from notebook_manager import update_notebook
|
||||
from database import get_db, Notebook
|
||||
db = get_db()
|
||||
try:
|
||||
nb = db.query(Notebook).filter(Notebook.id == notebook.id).first()
|
||||
if nb:
|
||||
nb.podcast_path = None
|
||||
nb.podcast_generated_at = None
|
||||
db.commit()
|
||||
finally:
|
||||
db.close()
|
||||
st.success("Podcast deleted!")
|
||||
st.rerun()
|
||||
except:
|
||||
pass
|
||||
except Exception as e:
|
||||
st.error(f"Error deleting podcast: {e}")
|
||||
|
|
|
|||
|
|
@ -105,16 +105,16 @@ for message in st.session_state.messages:
|
|||
with st.chat_message(message["role"]):
|
||||
st.markdown(message["content"])
|
||||
if message["role"] == "assistant" and message.get("sources"):
|
||||
# Enhanced source display
|
||||
st.markdown("---")
|
||||
st.caption("📚 Sources cited:")
|
||||
# Parse sources and display as badges
|
||||
# Collapsible source display
|
||||
sources_text = message["sources"]
|
||||
if "## Sources" in sources_text:
|
||||
sources_list = sources_text.split("## Sources")[1].strip().split("\n- ")
|
||||
for source in sources_list[1:]: # Skip first empty element
|
||||
if source.strip():
|
||||
st.markdown(f"🔗 {source.strip()}")
|
||||
source_count = len([s for s in sources_list[1:] if s.strip()])
|
||||
|
||||
with st.expander(f"📚 View {source_count} source(s)", expanded=False):
|
||||
for source in sources_list[1:]:
|
||||
if source.strip():
|
||||
st.markdown(f"🔗 {source.strip()}")
|
||||
|
||||
# Chat input
|
||||
if prompt := st.chat_input("Ask a question about your notebook..."):
|
||||
|
|
@ -140,13 +140,14 @@ if prompt := st.chat_input("Ask a question about your notebook..."):
|
|||
st.markdown(main_response)
|
||||
|
||||
if sources:
|
||||
# Enhanced inline source display
|
||||
st.markdown("---")
|
||||
st.caption("📚 Sources cited:")
|
||||
# Collapsible source display
|
||||
sources_list = sources.split("## Sources")[1].strip().split("\n- ") if "## Sources" in sources else []
|
||||
for source in sources_list[1:]:
|
||||
if source.strip():
|
||||
st.markdown(f"🔗 {source.strip()}")
|
||||
source_count = len([s for s in sources_list[1:] if s.strip()])
|
||||
|
||||
with st.expander(f"📚 View {source_count} source(s)", expanded=False):
|
||||
for source in sources_list[1:]:
|
||||
if source.strip():
|
||||
st.markdown(f"🔗 {source.strip()}")
|
||||
|
||||
st.session_state.messages.append({
|
||||
"role": "assistant",
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue