ac-tool/backend/server/api/ai_command.py
Vadym Samoilenko 8da149b84e Migrate storage from JSON files to PostgreSQL (asyncpg)
- Add asyncpg connection pool (db/pool.py) with JSONB codec registration
- Add schema.sql with users, clients, dropdown_categories, export_templates, sheets tables
- Add migrate_json.py one-time migration script for existing JSON data
- Rewrite user_store, sheets/manager, api/clients, api/dropdowns, api/export as async DB-backed
- Update all callers (auth, sheets, admin, ai_command, export) to await async functions
- Add postgres:16-alpine service to docker-compose with named volume and health check
- App container depends_on postgres; DATABASE_URL injected via env
- Schema applied automatically on startup; global categories seeded if DB is empty

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-23 19:51:37 +00:00

186 lines
5.9 KiB
Python

"""
AI command API — processes natural language commands against a sheet.
"""
import json
import logging
import os
import re
import aiohttp
from datetime import date
from quart import Blueprint, jsonify, request
from ..auth.middleware import auth_required, get_user_id
from ..sheets.manager import load_sheet_data, update_sheet, generate_next_id, get_sheet_client_id
from ..api.dropdowns import _load_dropdowns
from ..config_runtime import server_config
logger = logging.getLogger(__name__)
ai_bp = Blueprint('ai', __name__, url_prefix='/api/sheets')
SPEECH_CORRECTIONS = {
'delivery balls': 'deliverables',
'delivery ball': 'deliverable',
'delivery': 'deliverables',
'liver': 'deliverables',
'rose': 'rows',
'oh oh h': 'OOH',
'out of home': 'OOH',
}
NUMBER_WORDS = {
'one': '1', 'two': '2', 'three': '3', 'four': '4', 'five': '5',
'six': '6', 'seven': '7', 'eight': '8', 'nine': '9', 'ten': '10',
'eleven': '11', 'twelve': '12', 'twenty': '20', 'thirty': '30',
}
_PROMPT_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(__file__))), 'prompts', 'ac_command.txt')
def _load_prompt_template() -> str:
try:
with open(_PROMPT_PATH, 'r') as f:
return f.read()
except Exception:
return ""
def _preprocess(command: str) -> str:
cmd = command.lower()
for wrong, right in SPEECH_CORRECTIONS.items():
cmd = cmd.replace(wrong, right)
for word, digit in NUMBER_WORDS.items():
cmd = re.sub(r'\b' + word + r'\b', digit, cmd)
return cmd
async def _build_hierarchy_rules(client_id: str = None) -> str:
categories = await _load_dropdowns(client_id)
lines = []
for cat in categories:
if cat.get('status') != 'Active':
continue
media_str = ', '.join(cat.get('mediaTypes', []))
lines.append(f"- {cat['name']}: {media_str}")
return '\n'.join(lines)
async def _call_gemini(prompt: str) -> dict:
api_key = server_config.GEMINI_API_KEY
model = server_config.GEMINI_MODEL
url = f"https://generativelanguage.googleapis.com/v1beta/models/{model}:generateContent?key={api_key}"
payload = {"contents": [{"parts": [{"text": prompt}]}]}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload) as resp:
return await resp.json()
def _extract_json(text: str) -> dict:
start = text.find('{')
end = text.rfind('}')
if start == -1 or end == -1:
raise ValueError("No JSON object found in response")
return json.loads(text[start:end + 1])
@ai_bp.route('/<sheet_id>/command', methods=['POST'])
@auth_required
async def run_command(sheet_id: str):
user_id = get_user_id()
body = await request.get_json() or {}
raw_command = body.get('command', '').strip()
yolo_mode = bool(body.get('yolo_mode', False))
history = body.get('history', '')
if not raw_command:
return jsonify({'error': 'empty_command'}), 400
data = await load_sheet_data(user_id, sheet_id)
if data is None:
return jsonify({'error': 'sheet_not_found'}), 404
command = _preprocess(raw_command)
template = _load_prompt_template()
client_id = await get_sheet_client_id(user_id, sheet_id)
hierarchy = await _build_hierarchy_rules(client_id)
prompt = template.format(
current_date=date.today().isoformat(),
yolo_mode='TRUE' if yolo_mode else 'FALSE',
conversation_history=history or '(none)',
data_context=json.dumps(data),
hierarchy_rules=hierarchy,
command=command,
)
try:
gemini_resp = await _call_gemini(prompt)
except Exception as e:
logger.error(f"Gemini API error: {e}")
return jsonify({'error': 'ai_error', 'message': str(e)}), 502
if 'error' in gemini_resp:
msg = gemini_resp['error'].get('message', 'Unknown error')
return jsonify({'error': 'gemini_error', 'message': msg}), 502
llm_text = (
gemini_resp.get('candidates', [{}])[0]
.get('content', {})
.get('parts', [{}])[0]
.get('text', '')
)
if not llm_text:
return jsonify({'error': 'empty_ai_response'}), 502
try:
action = _extract_json(llm_text)
except Exception:
return jsonify({'error': 'invalid_ai_json', 'debug_llm': llm_text}), 502
operation = action.get('operation')
if operation == 'create':
items = action.get('items', [])
for item in items:
item['Number'] = generate_next_id(data)
item.setdefault('Status', 'Booked')
item.setdefault('Quantity', 1)
data.append(item)
await update_sheet(user_id, sheet_id, data)
return jsonify({'success': True, 'operation': 'create', 'count': len(items), 'data': data})
elif operation == 'update':
values = action.get('values', {})
target_ids = action.get('target_ids', [])
count = 0
for row in data:
if not target_ids or row.get('Number') in target_ids:
row.update(values)
count += 1
await update_sheet(user_id, sheet_id, data)
return jsonify({'success': True, 'operation': 'update', 'count': count, 'data': data})
elif operation == 'batch_update':
updates = action.get('updates', [])
count = 0
for upd in updates:
num = upd.get('Number')
vals = upd.get('values', {})
for row in data:
if row.get('Number') == num:
row.update(vals)
count += 1
break
await update_sheet(user_id, sheet_id, data)
return jsonify({'success': True, 'operation': 'batch_update', 'count': count, 'data': data})
elif operation == 'question':
return jsonify({'success': True, 'operation': 'question', 'question': action.get('text', '')})
return jsonify({'error': 'unknown_operation', 'operation': operation}), 400