gmal-scope-builder/backend/app/services/ai_matching.py
DJP ecaa5012d9 P1: Frontend profile/tool selector + match feedback loop
- Team Shape tab: profile selector (Conservative/Moderate/Aggressive)
- BTG tool toggles (Pencil, OMG, Creative X, Cortex, Semblance, Share of Model)
- Per-discipline rates shown inline with combined profile+tool percentages
- Efficiency % column in table showing rate per role
- Flat rate fallback still available (10/25/50/75/90%)
- Match feedback endpoint: POST /matches/{id}/feedback (confirm/reject)
- Feedback learning: confirmed matches stored, checked before AI calls
- Known matches applied instantly (no API call, $0 cost)
- Remaining unknowns sent to Claude as before

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-09 14:04:49 -04:00

324 lines
14 KiB
Python

"""AI-powered matching of client assets to GMAL catalog using Claude."""
import asyncio
import logging
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from sqlalchemy import select, text, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.gmal import GmalAsset
from app.models.project import ClientAsset, Match, MatchConfidence
from app.utils.claude_client import call_claude, extract_tool_result
logger = logging.getLogger(__name__)
# Cancel flag - set project_id to cancel
_cancel_lock = threading.Lock()
_cancelled_projects: set[int] = set()
BATCH_SIZE = 10
def cancel_matching(project_id: int):
with _cancel_lock:
_cancelled_projects.add(project_id)
def _is_cancelled(project_id: int) -> bool:
with _cancel_lock:
return project_id in _cancelled_projects
def _clear_cancel(project_id: int):
with _cancel_lock:
_cancelled_projects.discard(project_id)
MATCH_TOOLS = [
{
"name": "submit_matches",
"description": "Submit the best GMAL matches for a client asset.",
"input_schema": {
"type": "object",
"properties": {
"matches": {
"type": "array",
"items": {
"type": "object",
"properties": {
"gmal_id": {
"type": "string",
"description": "The GMAL ID of the matched asset (e.g., 'GMAL101')"
},
"confidence": {
"type": "string",
"enum": ["exact", "close", "multiple", "none"],
"description": "exact=direct match, close=similar but differences exist, multiple=one of several candidates, none=no reasonable match"
},
"confidence_score": {
"type": "number",
"description": "Confidence score from 0.0 to 1.0"
},
"reasoning": {
"type": "string",
"description": "Why this GMAL was matched - what makes it similar"
},
"caveats": {
"type": "string",
"description": "Important differences between what the client asked for and what the GMAL asset covers. Include scope gaps, complexity mismatches, format differences."
},
},
"required": ["gmal_id", "confidence", "confidence_score", "reasoning", "caveats"],
},
"minItems": 1,
"maxItems": 3,
"description": "Return your single best match. Only include a 2nd or 3rd match if they score within 5% of the best match.",
},
},
"required": ["matches"],
},
}
]
SYSTEM_PROMPT = """You are a GMAL asset matching specialist for a creative production agency.
Your job is to match client-described assets/deliverables to the closest equivalent(s) in the GMAL catalog.
You are given the FULL GMAL catalog. Each entry has: GMAL ID | Asset Name | Complexity | Category.
Guidelines:
- Match based on the TYPE of deliverable first, then complexity level.
- Clients use different terminology than GMAL. Use your understanding of creative production to bridge the gap:
- "Key Visual" / "KV" = Photography/Key Visual GMALs
- "PDP copy" / "product listing" = Copywriting/eCommerce GMALs
- "Launch video" / "hero video" = Campaign Video/TVC GMALs
- "Presentation deck" / "toolbox" = Presentation GMALs
- "Display banner" / "digital ad" = Standard Banner/Display GMALs
- "Social post" / "social content" = Social Content/Social Video GMALs
- "BTS" / "behind the scenes" = Behind The Scenes GMALs
- Return your SINGLE BEST match. Only include additional matches if they score within 5% of the best.
- If the client asset maps clearly to one GMAL, set confidence="exact" with score 0.9-1.0.
- If similar but with notable differences, set confidence="close" with score 0.6-0.89.
- If nothing matches well, return the closest option with confidence="none" and score below 0.3.
- Always explain caveats: what the GMAL includes/excludes vs what the client described.
- Pay attention to complexity: a "simple banner" should match a Simple complexity GMAL, not Complex.
- Be generous with scoring when the match is semantically correct even if the naming differs."""
def _match_single_asset(client_asset_name, client_asset_desc, volume, catalog_text, num_assets):
"""Run a single match call to Claude (synchronous, for use in thread pool)."""
user_msg = f"""Match this client asset to the best GMAL equivalent(s):
CLIENT ASSET:
Name: {client_asset_name}
Description: {client_asset_desc or 'No description provided'}
Volume: {volume}
FULL GMAL CATALOG ({num_assets} assets):
{catalog_text}"""
response = call_claude(
system=SYSTEM_PROMPT,
user_message=user_msg,
tools=MATCH_TOOLS,
tool_choice={"type": "tool", "name": "submit_matches"},
max_tokens=2048,
)
usage = getattr(response, '_usage_info', {"input_tokens": 0, "output_tokens": 0, "cost_usd": 0})
return extract_tool_result(response), usage
async def match_client_assets(
db: AsyncSession,
project_id: int,
client_assets: list[ClientAsset],
) -> list[Match]:
"""Match all client assets against the GMAL catalog.
Runs in parallel batches of BATCH_SIZE. Commits after each batch
so the frontend can poll for progress. Supports cancellation.
"""
_clear_cancel(project_id)
# Snapshot client asset data before any commits (ORM objects expire after commit)
asset_snapshots = [
{"id": ca.id, "raw_name": ca.raw_name, "raw_description": ca.raw_description, "volume": ca.volume}
for ca in client_assets
]
# Load all GMAL assets - send full compact catalog to Claude (only ~3k tokens)
result = await db.execute(
select(GmalAsset).where(GmalAsset.has_hour_routes == True).order_by(GmalAsset.gmal_id)
)
all_gmals = result.scalars().all()
gmal_by_id = {g.gmal_id: g for g in all_gmals}
# Build compact catalog once - reused for every match call
catalog_text = _format_compact_catalog(all_gmals)
logger.info(f"Full GMAL catalog: {len(all_gmals)} assets, ~{len(catalog_text)} chars")
# Load confirmed feedback for instant matching (learning loop)
from app.models.feedback import MatchFeedback
feedback_result = await db.execute(
select(MatchFeedback).where(MatchFeedback.confirmed == True)
)
all_feedback = feedback_result.scalars().all()
# Build lookup: normalized client_term -> gmal_asset_id
feedback_map: dict[str, int] = {}
for fb in all_feedback:
if fb.client_term:
feedback_map[fb.client_term] = fb.gmal_asset_id
logger.info(f"Loaded {len(feedback_map)} confirmed feedback mappings")
# Check feedback for instant matches (no AI needed)
gmal_by_db_id = {g.id: g for g in all_gmals}
all_matches = []
remaining_snapshots = []
for snap in asset_snapshots:
normalized = (snap["raw_name"] or "").strip().lower()
if normalized in feedback_map:
gmal_db_id = feedback_map[normalized]
gmal = gmal_by_db_id.get(gmal_db_id)
if gmal:
match = Match(
client_asset_id=snap["id"],
gmal_asset_id=gmal_db_id,
confidence=MatchConfidence.EXACT,
confidence_score=0.95,
ai_reasoning=f"Matched from confirmed feedback (previously verified match to {gmal.gmal_id})",
caveat_text="Auto-matched from learning system - verify if context differs from previous use.",
is_selected=True,
rank=1,
)
db.add(match)
all_matches.append(match)
logger.info(f"Feedback match: '{snap['raw_name']}' -> {gmal.gmal_id}")
continue
remaining_snapshots.append(snap)
if all_matches:
await db.commit()
logger.info(f"Instant feedback matches: {len(all_matches)}, remaining for AI: {len(remaining_snapshots)}")
total = len(remaining_snapshots)
# Process in batches
for batch_start in range(0, total, BATCH_SIZE):
if _is_cancelled(project_id):
logger.info(f"Matching cancelled for project {project_id} at {batch_start}/{total}")
break
batch = remaining_snapshots[batch_start:batch_start + BATCH_SIZE]
batch_num = batch_start // BATCH_SIZE + 1
logger.info(f"Matching batch {batch_num} ({batch_start+1}-{min(batch_start+BATCH_SIZE, total)} of {total})")
# Run batch in parallel using thread pool
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=BATCH_SIZE) as executor:
futures = []
for snap in batch:
if _is_cancelled(project_id):
break
future = loop.run_in_executor(
executor,
_match_single_asset,
snap["raw_name"],
snap["raw_description"],
snap["volume"],
catalog_text,
len(all_gmals),
)
futures.append((snap, future))
# Collect results and accumulate costs
batch_input = 0
batch_output = 0
batch_cost = 0.0
for snap, future in futures:
try:
tool_result, usage = await future
batch_input += usage.get("input_tokens", 0)
batch_output += usage.get("output_tokens", 0)
batch_cost += usage.get("cost_usd", 0)
if tool_result and "matches" in tool_result:
raw_matches = tool_result["matches"]
top_score = raw_matches[0].get("confidence_score", 0) if raw_matches else 0
auto_select = top_score >= 0.8
# Only keep alternatives within 5% of top score
filtered = [raw_matches[0]] if raw_matches else []
for m in raw_matches[1:]:
if abs((m.get("confidence_score", 0) - top_score)) <= 0.05:
filtered.append(m)
for rank, m in enumerate(filtered, 1):
gmal = gmal_by_id.get(m["gmal_id"])
if not gmal:
logger.warning(f"Claude returned unknown GMAL ID: {m['gmal_id']}")
continue
match = Match(
client_asset_id=snap["id"],
gmal_asset_id=gmal.id,
confidence=MatchConfidence(m["confidence"]),
confidence_score=m.get("confidence_score"),
ai_reasoning=m.get("reasoning"),
caveat_text=m.get("caveats"),
is_selected=(rank == 1 and auto_select),
rank=rank,
)
db.add(match)
all_matches.append(match)
else:
logger.warning(f"No match result for: {snap['raw_name']}")
except Exception as e:
logger.error(f"Error matching '{snap['raw_name']}': {e}")
# Save batch costs to project
from app.models.project import Project
proj_result = await db.execute(select(Project).where(Project.id == project_id))
project = proj_result.scalar_one_or_none()
if project:
project.ai_input_tokens = (project.ai_input_tokens or 0) + batch_input
project.ai_output_tokens = (project.ai_output_tokens or 0) + batch_output
project.ai_cost_usd = float(project.ai_cost_usd or 0) + batch_cost
project.ai_call_count = (project.ai_call_count or 0) + len(batch)
# Commit after each batch so frontend can see progress
await db.commit()
logger.info(f"Batch {batch_num} committed, {len(all_matches)} total matches so far")
_clear_cancel(project_id)
return all_matches
def _format_compact_catalog(all_gmals: list[GmalAsset]) -> str:
"""Format the full GMAL catalog for Claude with AI-enhanced descriptions where available.
Without AI descriptions: ~3k tokens (just names)
With AI descriptions: ~15-20k tokens (names + condensed descriptions)
Still much cheaper and more accurate than pre-filtering.
"""
lines = []
current_cat = None
for g in sorted(all_gmals, key=lambda x: (x.sub_category or '', x.gmal_id)):
if g.sub_category != current_cat:
current_cat = g.sub_category
lines.append(f"\n[{current_cat}]")
complexity = g.complexity_name or f"L{g.complexity_level}"
lines.append(f" {g.gmal_id}: {g.unique_name or g.asset_name} ({complexity})")
# Include AI-enhanced description if available (condensed to ~200 chars)
if g.ai_enhanced_description:
desc = g.ai_enhanced_description
if len(desc) > 250:
desc = desc[:250] + "..."
lines.append(f" > {desc}")
return "\n".join(lines)