sandbox-notebookllamalm/src/notebookllama/background_tasks.py
DJP 030cd2ef30 Add Write+Share permission and voice selection
Permission System Upgrade:
- Added SHARE permission level to database enum
- Three permission tiers now:
  * READ: View and chat only
  * WRITE: View, chat, add documents, generate podcasts
  * SHARE (Write+Share): All of WRITE + can share with others
- Only owners can grant SHARE permission
- Users with SHARE can add more collaborators

Voice Selection for Podcasts:
- Added 8 popular ElevenLabs voices to choose from
- Speaker 1 (Analytical): Brian, Adam, Charlie, Daniel
- Speaker 2 (Explanatory): Sarah, Bella, Charlotte, Emily
- Voice IDs stored in task parameters
- Passed through to audio generation
- Default: Brian + Sarah (original voices)

Voice Options:
- Brian (Default Male) - nPczCjzI2devNBz1zQrb
- Sarah (Default Female) - Xb7hH8MSUJpSbSDYk0k2
- Adam (Deep Male) - pNInz6obpgDQGcFmaJgB
- Bella (Friendly Female) - EXAVITQu4vr4xnSDxMaL
- Charlie (Casual Male) - IKne3meq5aSn9XLyUdCD
- Charlotte (Professional Female) - XB0fDUnXU5powFXDhCwa
- Daniel (British Male) - onwK4e9ZLuTAKqWW03F9
- Emily (Warm Female) - LcfcDJNUP1GQjkzn1xUU

UI Updates:
- Voice selection dropdowns in podcast generation
- Helpful descriptions for each speaker role
- Permission level shown as "Write + Share" in UI
- Help text explains each permission tier
- Share button visible for users with SHARE permission

Technical:
- Updated audio.py to accept voice parameters
- Updated background_tasks.py to pass voices
- Database enum extended
- Permission checks updated throughout

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-01 18:15:26 -04:00

345 lines
12 KiB
Python

"""
Simple background task system using database as queue
No external dependencies like Celery needed
"""
from sqlalchemy import Column, Integer, String, Text, DateTime, Enum as SQLEnum
from database import Base, engine, get_db
from datetime import datetime
import enum
import json
import threading
import time
import asyncio
from typing import Optional
class TaskStatus(enum.Enum):
    """Lifecycle states of a background task; the values are lowercase strings."""

    # Queued, not yet picked up
    PENDING = "pending"
    # Currently being executed
    IN_PROGRESS = "in_progress"
    # Finished successfully
    COMPLETED = "completed"
    # Finished with an error
    FAILED = "failed"
class BackgroundTask(Base):
    """ORM model for one row of the ``background_tasks`` table (the task queue)."""

    __tablename__ = "background_tasks"

    id = Column(Integer, primary_key=True, index=True)

    # Kind of work to perform: 'podcast_generation', etc.
    task_type = Column(String(100), nullable=False)
    status = Column(SQLEnum(TaskStatus), nullable=False, default=TaskStatus.PENDING)

    # Notebook the task relates to (optional) and the user it belongs to (required)
    notebook_id = Column(Integer, nullable=True)
    user_id = Column(Integer, nullable=False)

    # Task input and output, both stored as JSON text
    parameters = Column(Text, nullable=True)
    result = Column(Text, nullable=True)
    error_message = Column(Text, nullable=True)

    # Timestamps; created_at defaults to the (naive) UTC time of insertion
    created_at = Column(DateTime, default=datetime.utcnow)
    started_at = Column(DateTime, nullable=True)
    completed_at = Column(DateTime, nullable=True)
# Import-time side effect: create the background_tasks table if it doesn't exist.
# Passing tables=[...] limits create_all to this model's table instead of
# every table registered on Base.
Base.metadata.create_all(bind=engine, tables=[BackgroundTask.__table__])
def create_task(task_type: str, user_id: int, notebook_id: Optional[int] = None, parameters: dict = None) -> BackgroundTask:
    """Insert a new PENDING task row and return it.

    ``parameters`` is serialized to a JSON string; a missing or empty dict is
    stored as NULL. The session is closed before returning.
    """
    session = get_db()
    try:
        serialized_params = None
        if parameters:
            serialized_params = json.dumps(parameters)

        new_task = BackgroundTask(
            task_type=task_type,
            user_id=user_id,
            notebook_id=notebook_id,
            parameters=serialized_params,
            status=TaskStatus.PENDING,
        )
        session.add(new_task)
        session.commit()
        # Reload the row's state after the commit, before the session is closed
        session.refresh(new_task)
        return new_task
    finally:
        session.close()
def get_task_status(task_id: int) -> Optional[BackgroundTask]:
    """Return the task row with the given id, or None when no such row exists."""
    session = get_db()
    try:
        matching = session.query(BackgroundTask).filter(BackgroundTask.id == task_id)
        return matching.first()
    finally:
        session.close()
def update_task_status(task_id: int, status: TaskStatus, result: dict = None, error: str = None):
    """Set a task's status and stamp the matching timestamp.

    IN_PROGRESS records ``started_at``; COMPLETED and FAILED record
    ``completed_at``. A truthy ``result`` is stored as JSON and a truthy
    ``error`` is stored as the error message. Does nothing when the task id
    is unknown.
    """
    session = get_db()
    try:
        row = session.query(BackgroundTask).filter(BackgroundTask.id == task_id).first()
        if not row:
            return

        row.status = status
        now_is_terminal = status == TaskStatus.COMPLETED or status == TaskStatus.FAILED
        if status == TaskStatus.IN_PROGRESS:
            row.started_at = datetime.utcnow()
        elif now_is_terminal:
            row.completed_at = datetime.utcnow()

        if result:
            row.result = json.dumps(result)
        if error:
            row.error_message = error

        session.commit()
    finally:
        session.close()
def get_user_tasks(user_id: int, task_type: Optional[str] = None) -> list:
    """Return up to 50 of a user's tasks, newest first, as plain dicts.

    When ``task_type`` is given, only tasks of that type are included. Rows
    are converted to dicts while the session is still open so callers never
    touch a detached ORM object.
    """
    session = get_db()
    try:
        selection = session.query(BackgroundTask).filter(BackgroundTask.user_id == user_id)
        if task_type:
            selection = selection.filter(BackgroundTask.task_type == task_type)
        rows = selection.order_by(BackgroundTask.created_at.desc()).limit(50).all()

        summaries = []
        for row in rows:
            summaries.append({
                'id': row.id,
                'task_type': row.task_type,
                'status': row.status,
                'notebook_id': row.notebook_id,
                'created_at': row.created_at,
                'completed_at': row.completed_at,
                'error_message': row.error_message,
                'result': json.loads(row.result) if row.result else None,
            })
        return summaries
    finally:
        session.close()
def get_notebook_podcast_task(notebook_id: int) -> Optional[dict]:
    """Return the newest 'podcast_generation' task of a notebook as a dict.

    Returns None when the notebook has no podcast task.
    """
    session = get_db()
    try:
        newest_first = session.query(BackgroundTask).filter(
            BackgroundTask.notebook_id == notebook_id,
            BackgroundTask.task_type == 'podcast_generation',
        ).order_by(BackgroundTask.created_at.desc())
        latest = newest_first.first()
        if not latest:
            return None

        decoded_result = json.loads(latest.result) if latest.result else None
        return {
            'id': latest.id,
            'status': latest.status,
            'created_at': latest.created_at,
            'completed_at': latest.completed_at,
            'error_message': latest.error_message,
            'result': decoded_result,
        }
    finally:
        session.close()
async def execute_document_processing_task(task_id: int):
    """Process an uploaded document for a queued task and record the outcome.

    Marks the task IN_PROGRESS, creates the document record, attaches it to
    the task's notebook, runs the file through ``process_file`` and stores the
    resulting summary. The task ends COMPLETED (result ``{'document_id': ...}``)
    or FAILED (with an error message).

    The uploaded file at ``file_path`` is removed once the task has finished,
    on every outcome, so failed runs do not leave files behind.

    Args:
        task_id: Primary key of the ``BackgroundTask`` row to execute.
    """
    # Project imports stay function-local, as in the original
    # (presumably to avoid import cycles at module load; confirm before hoisting).
    from notebook_manager import add_document_to_notebook
    from document_manager import create_document, create_document_summary
    from utils import process_file as direct_process_file
    import os

    update_task_status(task_id, TaskStatus.IN_PROGRESS)
    task = get_task_status(task_id)
    if not task:
        return

    notebook_id = task.notebook_id
    user_id = task.user_id
    file_path = None
    try:
        # Parsed inside the try so malformed JSON marks the task FAILED
        # instead of leaving it stuck IN_PROGRESS.
        params = json.loads(task.parameters) if task.parameters else {}
        file_path = params.get('file_path')
        original_filename = params.get('original_filename')

        # Create document record
        document = create_document(user_id, file_path, original_filename)
        if not document:
            update_task_status(task_id, TaskStatus.FAILED, error="Failed to create document record")
            return

        # Add to notebook
        add_document_to_notebook(notebook_id, document.id)

        # Process with LlamaCloud
        notebook_json, md_text = await direct_process_file(filename=file_path)
        if not notebook_json:
            update_task_status(task_id, TaskStatus.FAILED, error="Processing failed")
            return

        notebook_data = json.loads(notebook_json)
        # Save summary
        create_document_summary(
            user_id, document.id,
            notebook_data.get('summary', ''),
            notebook_data.get('highlights', []),
            notebook_data.get('questions', []),
            notebook_data.get('answers', []),
            md_text or ''
        )
        update_task_status(task_id, TaskStatus.COMPLETED, result={'document_id': document.id})
    except Exception as e:
        update_task_status(task_id, TaskStatus.FAILED, error=str(e))
    finally:
        # Best-effort cleanup of the uploaded file on every path. A failure to
        # delete must not change the task status that was already recorded.
        if file_path:
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
            except OSError:
                pass
async def execute_podcast_task(task_id: int):
    """Generate a podcast for a queued task's notebook and record the outcome.

    Marks the task IN_PROGRESS, gathers the latest summary of every notebook
    document, then generates an outline, a script and finally the audio. The
    audio file is saved to the notebook and the task ends COMPLETED (result
    ``{'audio_file': ...}``); any failure ends the task FAILED with a message.

    Task parameters read: ``target_length`` (default 10), ``custom_theme``,
    ``custom_prompt``, ``voice1_id`` and ``voice2_id`` (None when absent; they
    are passed through to the audio generator unchanged).

    Args:
        task_id: Primary key of the ``BackgroundTask`` row to execute.
    """
    # Project imports stay function-local, as in the original
    # (presumably to avoid import cycles at module load; confirm before hoisting).
    from notebook_manager import get_notebook_documents, save_notebook_podcast
    from document_manager import get_latest_document_summary
    from notebook_synthesis import generate_podcast_outline, generate_podcast_script_from_outline
    from audio import PODCAST_GEN

    update_task_status(task_id, TaskStatus.IN_PROGRESS)
    task = get_task_status(task_id)
    if not task:
        return

    notebook_id = task.notebook_id
    try:
        # Parsed inside the try so malformed JSON marks the task FAILED
        # instead of leaving it stuck IN_PROGRESS.
        params = json.loads(task.parameters) if task.parameters else {}
        target_length = params.get('target_length', 10)
        custom_theme = params.get('custom_theme')
        custom_prompt = params.get('custom_prompt')
        voice1_id = params.get('voice1_id')
        voice2_id = params.get('voice2_id')

        documents = get_notebook_documents(notebook_id)

        # Prepare document data; documents without a summary are skipped
        doc_data = []
        content_sections = []
        for doc in documents:
            summary = get_latest_document_summary(doc.id)
            if not summary:
                continue
            doc_data.append({
                'filename': doc.original_filename,
                'summary': summary.summary,
                'highlights': json.loads(summary.highlights) if summary.highlights else [],
                'questions': json.loads(summary.questions) if summary.questions else [],
                'answers': json.loads(summary.answers) if summary.answers else []
            })
            if summary.md_content:
                content_sections.append(f"\n\n## {doc.original_filename}\n{summary.md_content}")
        combined_content = "".join(content_sections)

        if not (doc_data and combined_content):
            update_task_status(task_id, TaskStatus.FAILED, error="No content available")
            return

        # Generate outline with custom theme/prompt
        outline = await generate_podcast_outline(doc_data, target_length, custom_theme, custom_prompt)
        if not outline:
            update_task_status(task_id, TaskStatus.FAILED, error="Failed to generate outline")
            return

        # Generate script; an empty result is reported explicitly rather than
        # being handed to the audio generator.
        script = await generate_podcast_script_from_outline(outline, combined_content)
        if not script:
            update_task_status(task_id, TaskStatus.FAILED, error="Failed to generate script")
            return

        # Generate audio with selected voices
        audio_file = await PODCAST_GEN.create_conversation(
            file_transcript=script,
            voice1_id=voice1_id,
            voice2_id=voice2_id
        )

        # Save to notebook, then mark the task as completed
        save_notebook_podcast(notebook_id, audio_file)
        update_task_status(task_id, TaskStatus.COMPLETED, result={'audio_file': audio_file})
    except Exception as e:
        update_task_status(task_id, TaskStatus.FAILED, error=str(e))
def run_background_task(task_id: int):
    """Thread entry point: run the executor matching the task's type.

    Looks the task up, picks the coroutine for its ``task_type`` and runs it
    to completion on a fresh event loop via ``asyncio.run``. Returns silently
    when the task does not exist.

    A task with an unrecognized type, or whose executor lets an exception
    escape (e.g. a failing lazy import), is marked FAILED so it does not stay
    PENDING / IN_PROGRESS forever.

    Args:
        task_id: Primary key of the ``BackgroundTask`` row to run.
    """
    task = get_task_status(task_id)
    if not task:
        return

    executors = {
        'podcast_generation': execute_podcast_task,
        'document_processing': execute_document_processing_task,
    }
    executor = executors.get(task.task_type)
    if executor is None:
        update_task_status(task_id, TaskStatus.FAILED, error=f"Unknown task type: {task.task_type}")
        return

    try:
        asyncio.run(executor(task_id))
    except Exception as e:
        # Top-level boundary of the worker thread: record the failure instead
        # of letting the thread die with the task left unfinished.
        update_task_status(task_id, TaskStatus.FAILED, error=str(e))
def start_podcast_generation_background(notebook_id: int, user_id: int, target_length: int = 10, extra_params: dict = None) -> int:
    """Queue a podcast generation task, start it on a daemon thread, return its id.

    ``extra_params`` entries are merged over ``{'target_length': target_length}``
    before being stored as the task's parameters.
    """
    merged_params = {'target_length': target_length}
    merged_params.update(extra_params or {})

    queued = create_task(
        task_type='podcast_generation',
        user_id=user_id,
        notebook_id=notebook_id,
        parameters=merged_params,
    )
    new_task_id = queued.id

    # Daemon thread: the worker does not keep the process alive on shutdown
    worker = threading.Thread(target=run_background_task, args=(new_task_id,), daemon=True)
    worker.start()
    return new_task_id
def start_document_processing_background(file_path: str, original_filename: str, notebook_id: int, user_id: int) -> int:
    """Queue a document processing task, start it on a daemon thread, return its id.

    The file path and the original filename are stored as the task's parameters.
    """
    upload_info = {
        'file_path': file_path,
        'original_filename': original_filename,
    }
    queued = create_task(
        task_type='document_processing',
        user_id=user_id,
        notebook_id=notebook_id,
        parameters=upload_info,
    )
    new_task_id = queued.id

    # Daemon thread: the worker does not keep the process alive on shutdown
    worker = threading.Thread(target=run_background_task, args=(new_task_id,), daemon=True)
    worker.start()
    return new_task_id
def get_notebook_processing_tasks(notebook_id: int) -> list:
    """Return all 'document_processing' tasks of a notebook, newest first.

    Each task is returned as a plain dict with the keys ``id``, ``status``,
    ``filename``, ``created_at``, ``completed_at`` and ``error_message``.
    ``filename`` is the ``original_filename`` stored in the task parameters,
    or ``'Unknown'`` whenever it cannot be determined (no parameters,
    unreadable parameters, or a missing/empty filename).

    Args:
        notebook_id: Id of the notebook whose processing tasks are listed.
    """
    db = get_db()
    try:
        tasks = db.query(BackgroundTask).filter(
            BackgroundTask.notebook_id == notebook_id,
            BackgroundTask.task_type == 'document_processing'
        ).order_by(BackgroundTask.created_at.desc()).all()

        listing = []
        for t in tasks:
            # One row with unreadable parameters must not break the whole listing
            try:
                params = json.loads(t.parameters) if t.parameters else {}
            except ValueError:
                params = {}
            if not isinstance(params, dict):
                params = {}

            listing.append({
                'id': t.id,
                'status': t.status,
                # `or` also covers a filename stored as null
                'filename': params.get('original_filename') or 'Unknown',
                'created_at': t.created_at,
                'completed_at': t.completed_at,
                'error_message': t.error_message
            })
        return listing
    finally:
        db.close()