Implement revision-aware proof analysis pipeline

When a subsequent revision of a proof is uploaded, the analysis now takes
place in context of the previous version's results. The system identifies:
- Resolved issues: fixed in the new revision
- Outstanding issues: still present from previous version
- New issues: introduced in the new revision

Key changes:
- Add resolvedIssues, outstandingIssues, newIssues fields to SubReview
- Add PreviousReviewContext model for passing previous review data
- Update all specialist agents to accept previous_review context
- Extend GeminiService with include_revision_fields parameter
- Add get_latest_version_review() repository method
- Update LeadAgent to synthesize cross-version context in summary
- Fetch previous analysis in WebSocket handler for revisions

First version analysis continues to work exactly as before with revision
fields set to null.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
michael 2026-01-25 10:04:16 -06:00
parent f13fa2f7e8
commit 3a5c3bcde3
12 changed files with 536 additions and 55 deletions

View file

@ -1,7 +1,7 @@
from abc import ABC, abstractmethod
from typing import List, Tuple
from typing import List, Optional, Tuple
from app.models.schemas import SubReview
from app.models.schemas import PreviousReviewContext, SubReview
class BaseAgent(ABC):
@ -10,7 +10,11 @@ class BaseAgent(ABC):
name: str = "Base Agent"
@abstractmethod
async def analyze(self, images: List[Tuple[bytes, str]]) -> SubReview:
async def analyze(
self,
images: List[Tuple[bytes, str]],
previous_review: Optional[PreviousReviewContext] = None,
) -> SubReview:
"""
Analyze the proof and return a SubReview.
@ -18,8 +22,12 @@ class BaseAgent(ABC):
images: List of (file_data, mime_type) tuples representing the proof.
For single images/videos, this will contain one tuple.
For multi-page PDFs, this will contain one tuple per page.
previous_review: Optional context from the previous version's review
for revision-aware analysis.
Returns:
SubReview containing ragStatus, feedback, and issues
SubReview containing ragStatus, feedback, and issues.
When previous_review is provided, also includes resolvedIssues,
outstandingIssues, and newIssues.
"""
pass

View file

@ -1,7 +1,7 @@
from typing import List, Tuple
from typing import List, Optional, Tuple
from app.agents.base_agent import BaseAgent
from app.models.schemas import SubReview
from app.models.schemas import PreviousReviewContext, SubReview
from app.services.gemini_service import GeminiService
from app.services.reference_docs import ReferenceDocsService
@ -38,12 +38,46 @@ class BrandAgent(BaseAgent):
# Default to Barclaycard
return self.reference_docs.get_barclaycard_brand_spec()
async def analyze(self, images: List[Tuple[bytes, str]], brand: str = "Barclaycard") -> SubReview:
def _build_revision_context(self, previous_review: PreviousReviewContext) -> str:
"""Build prompt section for revision-aware analysis."""
issues_list = "\n".join(f" - {issue}" for issue in previous_review.issues) if previous_review.issues else " (No issues)"
return f"""
---
**REVISION CONTEXT**
This is a revision of a previously reviewed proof. The previous version (Version {previous_review.version}) had the following brand review:
- RAG Status: {previous_review.ragStatus}
- Feedback: {previous_review.feedback}
- Issues identified:
{issues_list}
When analyzing this revision, you MUST:
1. Compare against the previous issues and determine which have been RESOLVED
2. Identify which previous issues are still OUTSTANDING (not fixed)
3. Identify any NEW issues introduced in this revision
Your response MUST include:
- resolvedIssues: Array of issues from the previous version that have been fixed
- outstandingIssues: Array of issues from the previous version that remain unfixed
- newIssues: Array of new issues not present in the previous version
---
"""
async def analyze(
self,
images: List[Tuple[bytes, str]],
previous_review: Optional[PreviousReviewContext] = None,
brand: str = "Barclaycard",
) -> SubReview:
"""
Analyze the proof for brand guideline adherence.
Args:
images: List of (file_data, mime_type) tuples representing the proof
previous_review: Optional context from previous version for revision-aware analysis
brand: Brand to analyze against ('Barclays' or 'Barclaycard')
Returns:
@ -52,12 +86,17 @@ class BrandAgent(BaseAgent):
# Get the appropriate brand specification
brand_context = self._get_brand_context(brand)
# Build revision context if available
revision_context = ""
if previous_review:
revision_context = self._build_revision_context(previous_review)
prompt = f"""You are a brand expert for {brand}. Your role is to analyze marketing proofs for adherence to {brand} brand guidelines.
Here is the {brand} brand specification to use for your analysis:
{brand_context}
{revision_context}
---
Analyze the uploaded proof against the {brand} brand specification above, checking for:
@ -93,9 +132,16 @@ If the proof is nonsensical, not a marketing material, or cannot be analyzed, se
- Example: "Logo placement incorrect (bottom-left) - move to top-right corner per guidelines"
"""
# Determine if revision fields should be included
include_revision_fields = previous_review is not None
# Use single-image or multi-image analysis depending on input
if len(images) == 1:
file_data, file_type = images[0]
return await self.gemini.analyze_with_image(prompt, file_data, file_type)
return await self.gemini.analyze_with_image(
prompt, file_data, file_type, include_revision_fields=include_revision_fields
)
else:
return await self.gemini.analyze_with_images(prompt, images)
return await self.gemini.analyze_with_images(
prompt, images, include_revision_fields=include_revision_fields
)

View file

@ -1,7 +1,7 @@
from typing import List, Tuple
from typing import List, Optional, Tuple
from app.agents.base_agent import BaseAgent
from app.models.schemas import SubReview
from app.models.schemas import PreviousReviewContext, SubReview
from app.services.gemini_service import GeminiService
from app.services.reference_docs import ReferenceDocsService
@ -22,12 +22,45 @@ class ChannelBestPracticesAgent(BaseAgent):
self.gemini = gemini_service
self.reference_docs = reference_docs
async def analyze(self, images: List[Tuple[bytes, str]]) -> SubReview:
def _build_revision_context(self, previous_review: PreviousReviewContext) -> str:
"""Build prompt section for revision-aware analysis."""
issues_list = "\n".join(f" - {issue}" for issue in previous_review.issues) if previous_review.issues else " (No issues)"
return f"""
---
**REVISION CONTEXT**
This is a revision of a previously reviewed proof. The previous version (Version {previous_review.version}) had the following channel best practices review:
- RAG Status: {previous_review.ragStatus}
- Feedback: {previous_review.feedback}
- Issues identified:
{issues_list}
When analyzing this revision, you MUST:
1. Compare against the previous issues and determine which have been RESOLVED
2. Identify which previous issues are still OUTSTANDING (not fixed)
3. Identify any NEW issues introduced in this revision
Your response MUST include:
- resolvedIssues: Array of issues from the previous version that have been fixed
- outstandingIssues: Array of issues from the previous version that remain unfixed
- newIssues: Array of new issues not present in the previous version
---
"""
async def analyze(
self,
images: List[Tuple[bytes, str]],
previous_review: Optional[PreviousReviewContext] = None,
) -> SubReview:
"""
Analyze the proof for channel best practices and content strategy.
Args:
images: List of (file_data, mime_type) tuples representing the proof
previous_review: Optional context from previous version for revision-aware analysis
Returns:
SubReview with channel best practices assessment
@ -35,12 +68,17 @@ class ChannelBestPracticesAgent(BaseAgent):
# Get the channel best practices specification
best_practices_context = self.reference_docs.get_channel_best_practices_spec()
# Build revision context if available
revision_context = ""
if previous_review:
revision_context = self._build_revision_context(previous_review)
prompt = f"""You are a digital channel best practices specialist for Barclays Bank. Your role is to analyze marketing proofs for creative best practices, content strategy, and platform optimization.
Here are the channel best practices guidelines to use for your analysis:
{best_practices_context}
{revision_context}
---
Analyze the uploaded proof for adherence to channel best practices, checking:
@ -87,9 +125,16 @@ If the proof is nonsensical, not a marketing material, or cannot be analyzed, se
- Example: "CTA placement below fold - move above fold for better visibility"
"""
# Determine if revision fields should be included
include_revision_fields = previous_review is not None
# Use single-image or multi-image analysis depending on input
if len(images) == 1:
file_data, file_type = images[0]
return await self.gemini.analyze_with_image(prompt, file_data, file_type)
return await self.gemini.analyze_with_image(
prompt, file_data, file_type, include_revision_fields=include_revision_fields
)
else:
return await self.gemini.analyze_with_images(prompt, images)
return await self.gemini.analyze_with_images(
prompt, images, include_revision_fields=include_revision_fields
)

View file

@ -1,7 +1,7 @@
from typing import List, Tuple
from typing import List, Optional, Tuple
from app.agents.base_agent import BaseAgent
from app.models.schemas import SubReview
from app.models.schemas import PreviousReviewContext, SubReview
from app.services.gemini_service import GeminiService
from app.services.reference_docs import ReferenceDocsService
@ -22,12 +22,45 @@ class ChannelTechSpecsAgent(BaseAgent):
self.gemini = gemini_service
self.reference_docs = reference_docs
async def analyze(self, images: List[Tuple[bytes, str]]) -> SubReview:
def _build_revision_context(self, previous_review: PreviousReviewContext) -> str:
"""Build prompt section for revision-aware analysis."""
issues_list = "\n".join(f" - {issue}" for issue in previous_review.issues) if previous_review.issues else " (No issues)"
return f"""
---
**REVISION CONTEXT**
This is a revision of a previously reviewed proof. The previous version (Version {previous_review.version}) had the following channel tech specs review:
- RAG Status: {previous_review.ragStatus}
- Feedback: {previous_review.feedback}
- Issues identified:
{issues_list}
When analyzing this revision, you MUST:
1. Compare against the previous issues and determine which have been RESOLVED
2. Identify which previous issues are still OUTSTANDING (not fixed)
3. Identify any NEW issues introduced in this revision
Your response MUST include:
- resolvedIssues: Array of issues from the previous version that have been fixed
- outstandingIssues: Array of issues from the previous version that remain unfixed
- newIssues: Array of new issues not present in the previous version
---
"""
async def analyze(
self,
images: List[Tuple[bytes, str]],
previous_review: Optional[PreviousReviewContext] = None,
) -> SubReview:
"""
Analyze the proof for technical specifications compliance.
Args:
images: List of (file_data, mime_type) tuples representing the proof
previous_review: Optional context from previous version for revision-aware analysis
Returns:
SubReview with technical specifications assessment
@ -35,12 +68,17 @@ class ChannelTechSpecsAgent(BaseAgent):
# Get the channel tech specs specification
tech_specs_context = self.reference_docs.get_channel_tech_specs_spec()
# Build revision context if available
revision_context = ""
if previous_review:
revision_context = self._build_revision_context(previous_review)
prompt = f"""You are a digital channel technical specifications specialist for Barclays Bank. Your role is to analyze marketing proofs for technical compliance with platform specifications, dimensions, file formats, and character limits.
Here are the channel technical specifications to use for your analysis:
{tech_specs_context}
{revision_context}
---
Analyze the uploaded proof for technical specification compliance, checking:
@ -93,9 +131,16 @@ If the proof is nonsensical, not a marketing material, or cannot be analyzed, se
- Example: "Image resolution 72dpi - increase to minimum 150dpi for print quality"
"""
# Determine if revision fields should be included
include_revision_fields = previous_review is not None
# Use single-image or multi-image analysis depending on input
if len(images) == 1:
file_data, file_type = images[0]
return await self.gemini.analyze_with_image(prompt, file_data, file_type)
return await self.gemini.analyze_with_image(
prompt, file_data, file_type, include_revision_fields=include_revision_fields
)
else:
return await self.gemini.analyze_with_images(prompt, images)
return await self.gemini.analyze_with_images(
prompt, images, include_revision_fields=include_revision_fields
)

View file

@ -1,3 +1,5 @@
from typing import Optional
from app.models.schemas import SubReview, RagStatus, OverallStatus
from app.services.gemini_service import GeminiService
@ -24,15 +26,64 @@ class LeadAgent:
"""
self.gemini = gemini_service
def _build_revision_context(self, previous_analysis: dict, reviews: dict[str, SubReview]) -> str:
"""Build revision context section for the synthesis prompt."""
previous_version = previous_analysis.get("version", "N/A")
# Aggregate resolved/outstanding/new issues across all agents
all_resolved = []
all_outstanding = []
all_new = []
for agent_name, review in reviews.items():
if review.resolvedIssues:
all_resolved.extend([f"[{agent_name}] {issue}" for issue in review.resolvedIssues])
if review.outstandingIssues:
all_outstanding.extend([f"[{agent_name}] {issue}" for issue in review.outstandingIssues])
if review.newIssues:
all_new.extend([f"[{agent_name}] {issue}" for issue in review.newIssues])
resolved_list = "\n".join(f" - {issue}" for issue in all_resolved) if all_resolved else " (None)"
outstanding_list = "\n".join(f" - {issue}" for issue in all_outstanding) if all_outstanding else " (None)"
new_list = "\n".join(f" - {issue}" for issue in all_new) if all_new else " (None)"
return f"""
---
**REVISION CONTEXT**
This is a revision (comparing against Version {previous_version}). The analysis has identified the following changes:
**RESOLVED ISSUES** (fixed in this revision):
{resolved_list}
**OUTSTANDING ISSUES** (still need attention):
{outstanding_list}
**NEW ISSUES** (introduced in this revision):
{new_list}
In your summary:
1. Acknowledge this is a revision and summarize the progress made
2. Highlight any resolved issues positively
3. Emphasize outstanding issues that still need to be addressed
4. Call out any regressions (new issues) that were introduced
---
"""
async def synthesize(
self,
reviews: dict[str, SubReview],
previous_analysis: Optional[dict] = None,
) -> tuple[OverallStatus, str, str | None]:
"""
Synthesize specialist reviews into final verdict and summary.
Args:
reviews: Dictionary mapping agent names to their SubReview results
previous_analysis: Optional dict containing the previous version's analysis
results. When provided, enables revision-aware summary.
Returns:
Tuple of (overall_status, summary, financial_promotion_reason)
@ -50,6 +101,11 @@ class LeadAgent:
else None
)
# Build revision context if previous analysis is available
revision_context = ""
if previous_analysis and previous_analysis.get("version"):
revision_context = self._build_revision_context(previous_analysis, reviews)
# Build the prompt for Gemini to generate summary
prompt = f"""
You are a Lead Agent responsible for auditing a marketing proof. You have received feedback from specialist AI agents.
@ -74,7 +130,7 @@ Your summary should:
- List key issues as bullet points (max 3-5 bullets)
- Each bullet: one sentence, actionable
- For 'Passed': briefly note any amber items in 1-2 bullets
{revision_context}
Here are the specialist reviews:
{self._format_reviews(reviews)}

View file

@ -1,7 +1,7 @@
from typing import List, Tuple
from typing import List, Optional, Tuple
from app.agents.base_agent import BaseAgent
from app.models.schemas import SubReview
from app.models.schemas import PreviousReviewContext, SubReview
from app.services.gemini_service import GeminiService
from app.services.reference_docs import ReferenceDocsService
@ -22,12 +22,45 @@ class LegalAgent(BaseAgent):
self.gemini = gemini_service
self.reference_docs = reference_docs
async def analyze(self, images: List[Tuple[bytes, str]]) -> SubReview:
def _build_revision_context(self, previous_review: PreviousReviewContext) -> str:
"""Build prompt section for revision-aware analysis."""
issues_list = "\n".join(f" - {issue}" for issue in previous_review.issues) if previous_review.issues else " (No issues)"
return f"""
---
**REVISION CONTEXT**
This is a revision of a previously reviewed proof. The previous version (Version {previous_review.version}) had the following legal review:
- RAG Status: {previous_review.ragStatus}
- Feedback: {previous_review.feedback}
- Issues identified:
{issues_list}
When analyzing this revision, you MUST:
1. Compare against the previous issues and determine which have been RESOLVED
2. Identify which previous issues are still OUTSTANDING (not fixed)
3. Identify any NEW issues introduced in this revision
Your response MUST include:
- resolvedIssues: Array of issues from the previous version that have been fixed
- outstandingIssues: Array of issues from the previous version that remain unfixed
- newIssues: Array of new issues not present in the previous version
---
"""
async def analyze(
self,
images: List[Tuple[bytes, str]],
previous_review: Optional[PreviousReviewContext] = None,
) -> SubReview:
"""
Analyze the proof for legal compliance.
Args:
images: List of (file_data, mime_type) tuples representing the proof
previous_review: Optional context from previous version for revision-aware analysis
Returns:
SubReview with legal compliance assessment
@ -35,12 +68,17 @@ class LegalAgent(BaseAgent):
# Get the legal specification
legal_context = self.reference_docs.get_legal_spec()
# Build revision context if available
revision_context = ""
if previous_review:
revision_context = self._build_revision_context(previous_review)
prompt = f"""You are a legal compliance specialist for Barclays Bank. Your role is to analyze marketing proofs for legal compliance, advertising standards, and regulatory requirements.
Here are the legal guidelines to use for your analysis:
{legal_context}
{revision_context}
---
Analyze the uploaded proof for legal compliance, checking:
@ -98,9 +136,16 @@ If the proof is nonsensical, not a marketing material, or cannot be analyzed, se
- Example: "Missing APR disclaimer - add representative APR per FCA requirements"
"""
# Determine if revision fields should be included
include_revision_fields = previous_review is not None
# Use single-image or multi-image analysis depending on input
if len(images) == 1:
file_data, file_type = images[0]
return await self.gemini.analyze_with_image(prompt, file_data, file_type)
return await self.gemini.analyze_with_image(
prompt, file_data, file_type, include_revision_fields=include_revision_fields
)
else:
return await self.gemini.analyze_with_images(prompt, images)
return await self.gemini.analyze_with_images(
prompt, images, include_revision_fields=include_revision_fields
)

View file

@ -26,6 +26,21 @@ class SubReview(BaseModel):
issues: list[str]
isFinancialPromotion: Optional[bool] = None
financialPromotionReason: Optional[str] = None
# Revision-aware fields (populated when analyzing version N > 1)
resolvedIssues: Optional[list[str]] = None
outstandingIssues: Optional[list[str]] = None
newIssues: Optional[list[str]] = None
class Config:
use_enum_values = True
class PreviousReviewContext(BaseModel):
"""Context from a previous version's review for revision-aware analysis."""
version: int
ragStatus: RagStatus
feedback: str
issues: list[str]
class Config:
use_enum_values = True

View file

@ -148,6 +148,65 @@ class ProofRepository:
version = result.scalar_one_or_none()
return version if version else 0
async def get_previous_version_review(
self,
proof_id: uuid.UUID,
current_version: int,
) -> Optional[dict]:
"""
Get the agent_review from the previous version (N-1) of a proof.
Args:
proof_id: The proof ID
current_version: The version being analyzed (will fetch N-1)
Returns:
The agent_review dict from version N-1, or None if no previous version
"""
if current_version <= 1:
return None
previous_version_number = current_version - 1
result = await self.session.execute(
select(ProofVersion.agent_review, ProofVersion.version)
.where(
ProofVersion.proof_id == proof_id,
ProofVersion.version == previous_version_number,
)
)
row = result.first()
if row and row.agent_review:
review = row.agent_review
review["version"] = row.version
return review
return None
async def get_latest_version_review(
self,
proof_id: uuid.UUID,
) -> Optional[dict]:
"""
Get the agent_review from the latest version of a proof.
Args:
proof_id: The proof ID
Returns:
The agent_review dict with version number attached, or None
"""
result = await self.session.execute(
select(ProofVersion.agent_review, ProofVersion.version)
.where(ProofVersion.proof_id == proof_id)
.order_by(ProofVersion.version.desc())
.limit(1)
)
row = result.first()
if row and row.agent_review:
review = row.agent_review
review["version"] = row.version
return review
return None
async def get_or_create_proof(
self,
campaign_id: uuid.UUID,

View file

@ -2,7 +2,7 @@ import asyncio
import logging
from typing import Callable, Awaitable, List, Tuple, Optional
from app.models.schemas import SubReview, AgentReview, OverallStatus
from app.models.schemas import PreviousReviewContext, RagStatus, SubReview, AgentReview, OverallStatus
logger = logging.getLogger(__name__)
from app.agents.brand_agent import BrandAgent
@ -29,6 +29,14 @@ class AnalysisService:
# Agent execution order
AGENT_ORDER = ["Legal Agent", "Brand Agent", "Channel Best Practices Agent", "Channel Tech Specs Agent"]
# Mapping from agent name to the key in AgentReview/previous_analysis dict
AGENT_REVIEW_KEY_MAP = {
"Legal Agent": "legalAgentReview",
"Brand Agent": "brandAgentReview",
"Channel Best Practices Agent": "channelBestPracticesAgentReview",
"Channel Tech Specs Agent": "channelTechSpecsAgentReview",
}
def __init__(
self,
gemini_service: GeminiService,
@ -53,24 +61,62 @@ class AnalysisService:
}
self.lead_agent = LeadAgent(gemini_service)
def _extract_previous_review_context(
self,
agent_name: str,
previous_analysis: Optional[dict],
) -> Optional[PreviousReviewContext]:
"""
Extract the previous review context for a specific agent.
Args:
agent_name: Name of the agent
previous_analysis: Full previous analysis dict (or None)
Returns:
PreviousReviewContext for the agent, or None if not available
"""
if not previous_analysis:
return None
review_key = self.AGENT_REVIEW_KEY_MAP.get(agent_name)
if not review_key:
return None
agent_review = previous_analysis.get(review_key)
if not agent_review:
return None
version = previous_analysis.get("version", 0)
if version == 0:
return None
return PreviousReviewContext(
version=version,
ragStatus=RagStatus(agent_review.get("ragStatus", "Error")),
feedback=agent_review.get("feedback", ""),
issues=agent_review.get("issues", []),
)
async def _run_agent(
self,
agent_name: str,
images: List[Tuple[bytes, str]],
brand: str,
on_agent_update: AgentCallback | None,
previous_review: Optional[PreviousReviewContext] = None,
) -> Tuple[str, SubReview]:
"""Run a single agent with callback notifications."""
agent = self.agents[agent_name]
logger.info(f"[ANALYSIS] Starting agent: {agent_name}")
logger.info(f"[ANALYSIS] Starting agent: {agent_name}, has_previous_review: {previous_review is not None}")
if on_agent_update:
await on_agent_update(agent_name, None)
if agent_name == "Brand Agent":
review = await agent.analyze(images, brand=brand)
review = await agent.analyze(images, previous_review=previous_review, brand=brand)
else:
review = await agent.analyze(images)
review = await agent.analyze(images, previous_review=previous_review)
logger.info(f"[ANALYSIS] Agent completed: {agent_name} - ragStatus: {review.ragStatus}")
if on_agent_update:
@ -85,6 +131,7 @@ class AnalysisService:
on_agent_update: AgentCallback | None = None,
is_wip: bool = False,
brand: str = "Barclaycard",
previous_analysis: Optional[dict] = None,
) -> Tuple[AgentReview, Optional[List[Tuple[bytes, int, int]]]]:
"""
Analyze a proof using all agents in parallel.
@ -97,6 +144,9 @@ class AnalysisService:
and (agent_name, review) when agent completes.
is_wip: Whether this is a work-in-progress analysis
brand: Brand to use for brand guidelines analysis ('Barclays' or 'Barclaycard')
previous_analysis: Optional dict containing the previous version's analysis
results. When provided, enables revision-aware analysis
that identifies resolved/outstanding/new issues.
Returns:
Tuple of:
@ -104,7 +154,8 @@ class AnalysisService:
- List of rasterized PDF pages if input was PDF, else None
Each page is (png_bytes, width, height)
"""
logger.info(f"[ANALYSIS] Starting proof analysis - file_type: {file_type}, file_size: {len(file_data)} bytes, is_wip: {is_wip}, brand: {brand}")
previous_version = previous_analysis.get("version") if previous_analysis else None
logger.info(f"[ANALYSIS] Starting proof analysis - file_type: {file_type}, file_size: {len(file_data)} bytes, is_wip: {is_wip}, brand: {brand}, previous_version: {previous_version}")
reviews: dict[str, SubReview] = {}
# Prepare images for analysis
@ -139,9 +190,15 @@ class AnalysisService:
# Single image/video - wrap in list
images = [(file_data, file_type)]
# Run all agents in parallel
# Run all agents in parallel, passing previous review context to each
tasks = [
self._run_agent(agent_name, images, brand, on_agent_update)
self._run_agent(
agent_name,
images,
brand,
on_agent_update,
previous_review=self._extract_previous_review_context(agent_name, previous_analysis),
)
for agent_name in self.AGENT_ORDER
]
results = await asyncio.gather(*tasks)
@ -152,7 +209,9 @@ class AnalysisService:
if on_agent_update:
await on_agent_update("Summary", None)
overall_status, summary, financial_promotion_reason = await self.lead_agent.synthesize(reviews)
overall_status, summary, financial_promotion_reason = await self.lead_agent.synthesize(
reviews, previous_analysis=previous_analysis
)
logger.info(f"[ANALYSIS] Analysis complete - overallStatus: {overall_status}")
# Build the complete AgentReview

View file

@ -29,6 +29,7 @@ class GeminiService:
prompt: str,
file_data: bytes,
file_type: str,
include_revision_fields: bool = False,
) -> SubReview:
"""
Analyze an image/file with Gemini and return a structured SubReview.
@ -37,12 +38,15 @@ class GeminiService:
prompt: The analysis prompt including reference doc context
file_data: Raw bytes of the file
file_type: MIME type of the file (e.g., "image/png")
include_revision_fields: If True, require revision fields in response schema
Returns:
SubReview with ragStatus, feedback, and issues
SubReview with ragStatus, feedback, and issues.
When include_revision_fields is True, also includes resolvedIssues,
outstandingIssues, and newIssues.
"""
try:
logger.info(f"[GEMINI API] Starting image analysis - file_type: {file_type}, file_size: {len(file_data)} bytes")
logger.info(f"[GEMINI API] Starting image analysis - file_type: {file_type}, file_size: {len(file_data)} bytes, revision_fields: {include_revision_fields}")
# Create inline data part for the file
file_part = types.Part.from_bytes(
@ -77,6 +81,25 @@ class GeminiService:
"required": ["analysisStatus", "ragStatus", "feedback", "issues"]
}
# Add revision fields if requested
if include_revision_fields:
response_schema["properties"]["resolvedIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "Issues from the previous version that have been resolved in this revision."
}
response_schema["properties"]["outstandingIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "Issues from the previous version that remain unresolved in this revision."
}
response_schema["properties"]["newIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "New issues introduced in this revision that were not present in the previous version."
}
response_schema["required"].extend(["resolvedIssues", "outstandingIssues", "newIssues"])
# Make the API call
logger.info(f"[GEMINI API] Calling Gemini model: {self.model}")
response = await self.client.aio.models.generate_content(
@ -102,12 +125,19 @@ class GeminiService:
issues=[]
)
# Return successful analysis
return SubReview(
ragStatus=RagStatus(parsed_result["ragStatus"]),
feedback=parsed_result["feedback"],
issues=parsed_result["issues"]
)
# Build SubReview with optional revision fields
review_kwargs = {
"ragStatus": RagStatus(parsed_result["ragStatus"]),
"feedback": parsed_result["feedback"],
"issues": parsed_result["issues"],
}
if include_revision_fields:
review_kwargs["resolvedIssues"] = parsed_result.get("resolvedIssues", [])
review_kwargs["outstandingIssues"] = parsed_result.get("outstandingIssues", [])
review_kwargs["newIssues"] = parsed_result.get("newIssues", [])
return SubReview(**review_kwargs)
except json.JSONDecodeError as e:
logger.error(f"[GEMINI API] JSON parse error: {str(e)}")
@ -128,6 +158,7 @@ class GeminiService:
self,
prompt: str,
images: List[Tuple[bytes, str]],
include_revision_fields: bool = False,
) -> SubReview:
"""
Analyze multiple images with Gemini and return a structured SubReview.
@ -137,12 +168,15 @@ class GeminiService:
Args:
prompt: The analysis prompt including reference doc context
images: List of (file_data, mime_type) tuples for each image
include_revision_fields: If True, require revision fields in response schema
Returns:
SubReview with ragStatus, feedback, and issues
SubReview with ragStatus, feedback, and issues.
When include_revision_fields is True, also includes resolvedIssues,
outstandingIssues, and newIssues.
"""
try:
logger.info(f"[GEMINI API] Starting multi-image analysis - {len(images)} images")
logger.info(f"[GEMINI API] Starting multi-image analysis - {len(images)} images, revision_fields: {include_revision_fields}")
# Create inline data parts for all images
file_parts = []
@ -178,6 +212,25 @@ class GeminiService:
"required": ["analysisStatus", "ragStatus", "feedback", "issues"]
}
# Add revision fields if requested
if include_revision_fields:
response_schema["properties"]["resolvedIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "Issues from the previous version that have been resolved in this revision."
}
response_schema["properties"]["outstandingIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "Issues from the previous version that remain unresolved in this revision."
}
response_schema["properties"]["newIssues"] = {
"type": "array",
"items": {"type": "string"},
"description": "New issues introduced in this revision that were not present in the previous version."
}
response_schema["required"].extend(["resolvedIssues", "outstandingIssues", "newIssues"])
# Combine file parts with prompt
contents = file_parts + [prompt]
@ -206,12 +259,19 @@ class GeminiService:
issues=[]
)
# Return successful analysis
return SubReview(
ragStatus=RagStatus(parsed_result["ragStatus"]),
feedback=parsed_result["feedback"],
issues=parsed_result["issues"]
)
# Build SubReview with optional revision fields
review_kwargs = {
"ragStatus": RagStatus(parsed_result["ragStatus"]),
"feedback": parsed_result["feedback"],
"issues": parsed_result["issues"],
}
if include_revision_fields:
review_kwargs["resolvedIssues"] = parsed_result.get("resolvedIssues", [])
review_kwargs["outstandingIssues"] = parsed_result.get("outstandingIssues", [])
review_kwargs["newIssues"] = parsed_result.get("newIssues", [])
return SubReview(**review_kwargs)
except json.JSONDecodeError as e:
logger.error(f"[GEMINI API] JSON parse error: {str(e)}")

View file

@ -81,6 +81,9 @@ async def handle_analyze_message(
"issues": review.issues,
"isFinancialPromotion": review.isFinancialPromotion,
"financialPromotionReason": review.financialPromotionReason,
"resolvedIssues": review.resolvedIssues,
"outstandingIssues": review.outstandingIssues,
"newIssues": review.newIssues,
}
})
@ -88,6 +91,31 @@ async def handle_analyze_message(
brand = data.get("brand", "Barclaycard") # Default to Barclaycard if not specified
logger.info(f"[WEBSOCKET] Brand selection: {brand}")
# Fetch previous analysis if this is a revision
previous_analysis = None
campaign_id = data.get("campaign_id")
proof_name = data.get("proof_name")
if campaign_id and proof_name:
try:
logger.info(f"[WEBSOCKET] Checking for previous analysis - campaign: {campaign_id}, proof: {proof_name}")
async with async_session_factory() as session:
proof_repo = ProofRepository(session)
existing_proof = await proof_repo.get_by_campaign_and_name(
uuid.UUID(campaign_id), proof_name
)
if existing_proof:
previous_analysis = await proof_repo.get_latest_version_review(existing_proof.id)
if previous_analysis:
logger.info(f"[WEBSOCKET] Found previous analysis version {previous_analysis.get('version')}")
else:
logger.info("[WEBSOCKET] No previous analysis found (new proof)")
else:
logger.info("[WEBSOCKET] No existing proof found (new proof)")
except Exception as e:
logger.warning(f"[WEBSOCKET] Failed to fetch previous analysis: {str(e)}")
# Continue without previous analysis - still run the current analysis
# Run the analysis
logger.info("[WEBSOCKET] Starting analysis...")
result, pdf_pages = await analysis_service.analyze_proof(
@ -96,6 +124,7 @@ async def handle_analyze_message(
on_agent_update=on_agent_update,
is_wip=is_wip,
brand=brand,
previous_analysis=previous_analysis,
)
# Build the result dict
@ -106,21 +135,33 @@ async def handle_analyze_message(
"issues": result.legalAgentReview.issues,
"isFinancialPromotion": result.legalAgentReview.isFinancialPromotion,
"financialPromotionReason": result.legalAgentReview.financialPromotionReason,
"resolvedIssues": result.legalAgentReview.resolvedIssues,
"outstandingIssues": result.legalAgentReview.outstandingIssues,
"newIssues": result.legalAgentReview.newIssues,
},
"brandAgentReview": {
"ragStatus": result.brandAgentReview.ragStatus,
"feedback": result.brandAgentReview.feedback,
"issues": result.brandAgentReview.issues,
"resolvedIssues": result.brandAgentReview.resolvedIssues,
"outstandingIssues": result.brandAgentReview.outstandingIssues,
"newIssues": result.brandAgentReview.newIssues,
},
"channelBestPracticesAgentReview": {
"ragStatus": result.channelBestPracticesAgentReview.ragStatus,
"feedback": result.channelBestPracticesAgentReview.feedback,
"issues": result.channelBestPracticesAgentReview.issues,
"resolvedIssues": result.channelBestPracticesAgentReview.resolvedIssues,
"outstandingIssues": result.channelBestPracticesAgentReview.outstandingIssues,
"newIssues": result.channelBestPracticesAgentReview.newIssues,
},
"channelTechSpecsAgentReview": {
"ragStatus": result.channelTechSpecsAgentReview.ragStatus,
"feedback": result.channelTechSpecsAgentReview.feedback,
"issues": result.channelTechSpecsAgentReview.issues,
"resolvedIssues": result.channelTechSpecsAgentReview.resolvedIssues,
"outstandingIssues": result.channelTechSpecsAgentReview.outstandingIssues,
"newIssues": result.channelTechSpecsAgentReview.newIssues,
},
"leadAgentSummary": result.leadAgentSummary,
"overallStatus": result.overallStatus,
@ -130,8 +171,6 @@ async def handle_analyze_message(
# Persist to database if campaign info provided
proof_id: Optional[str] = None
version_id: Optional[str] = None
campaign_id = data.get("campaign_id")
proof_name = data.get("proof_name")
if campaign_id and proof_name:
try:

View file

@ -13,6 +13,10 @@ export interface SubReview {
ragStatus: RagStatus;
feedback: string;
issues: string[];
// Revision-aware fields (populated when analyzing version N > 1)
resolvedIssues?: string[];
outstandingIssues?: string[];
newIssues?: string[];
}
export type OverallStatus = 'Passed' | 'Failed' | 'Analysis Error' | 'Requires Manual Legal Review';