baic_dashboard/backend/app.py
Vadym Samoilenko 0165622f3d Fix date range filter ignoring messages without Timestamp
The filter was using fail-open logic (skipping the date check when
msg.Timestamp was falsy), so messages missing a Timestamp bypassed
the filter entirely. Per-User Report aggregates all filtered messages,
making the bug visible as all-time totals regardless of selected range.

Changed both the message and conversation date filter blocks to
fail-closed: when a date range is active, records without a parseable
timestamp are excluded. Also added a backend warning log to surface
any enriched messages that still lack a Timestamp after the
StartTime fallback (commit 421961a).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-10 14:58:57 +01:00

364 lines
No EOL
15 KiB
Python

import requests
import os
import json
import logging
import jwt
from functools import wraps
from flask import Flask, jsonify, request
from flask_cors import CORS
# Verbose (DEBUG) logging is switched on for now to aid troubleshooting
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)

# Flask application, with CORS enabled on every route
app = Flask(__name__)
CORS(app)

# Silence Flask's own logger as well as werkzeug's logger
app.logger.disabled = True
log = logging.getLogger('werkzeug')
log.disabled = True

# The application is configured to live at the root path
application_root = "/"
app.config.update(APPLICATION_ROOT=application_root)

# Make.com webhook that serves both conversations and messages
MAKE_WEBHOOK_URL = os.getenv(
    "MAKE_WEBHOOK_URL",
    "https://hook.eu1.make.celonis.com/h8gjldwnp4u5cvc0io474zq4u7vwb9zw",
)

# Azure AD configuration (each value can be overridden via the environment)
TENANT_ID = os.getenv("AZURE_TENANT_ID", "e519c2e6-bc6d-4fdf-8d9c-923c2f002385")
CLIENT_ID = os.getenv("AZURE_CLIENT_ID", "014547f2-21c1-4245-881f-97f49543d963")
JWKS_URL = f"https://login.microsoftonline.com/{TENANT_ID}/discovery/v2.0/keys"
def get_public_keys():
    """
    Fetch public keys from Microsoft's JWKS endpoint.

    Returns:
        The parsed JSON body of the JWKS response, or None when the request
        fails, returns an HTTP error status, or the body is not valid JSON.
    """
    try:
        response = requests.get(JWKS_URL, timeout=10)
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # response.json() signals an undecodable body with a ValueError
        # subclass, which is not guaranteed to be a RequestException on every
        # requests version, so it is caught explicitly.
        logger.error(f"Error fetching JWKS: {e}")
        return None
def verify_jwt_token(token):
    """
    Verify a JWT token from Azure AD.

    The token's ``kid`` header selects a key from the JWKS document returned
    by ``get_public_keys()``. The token is then decoded with RS256 only, with
    signature, audience and issuer checks all enabled. A token whose
    signature cannot be verified is always rejected.

    Args:
        token (str): Encoded JWT, as extracted from the Authorization header.

    Returns:
        tuple: ``(True, decoded_claims)`` when the token is valid, otherwise
        ``(False, error_message)``.
    """
    try:
        # Get public keys from Microsoft
        jwks = get_public_keys()
        if not jwks:
            logger.error("Unable to fetch JWKS public keys")
            return False, "Unable to fetch public keys"

        # Decode token header to get kid
        unverified_header = jwt.get_unverified_header(token)
        kid = unverified_header.get('kid')
        algorithm = unverified_header.get('alg')
        logger.debug(f"Token header - kid: {kid}, algorithm: {algorithm}")

        # Log available key IDs for debugging
        available_kids = [key.get('kid') for key in jwks.get('keys', [])]
        logger.debug(f"Available key IDs in JWKS: {available_kids}")

        # Find matching key
        public_key = None
        for key in jwks.get('keys', []):
            if key.get('kid') != kid:
                continue
            try:
                public_key = jwt.algorithms.RSAAlgorithm.from_jwk(key)
            except Exception as key_error:
                logger.error(f"Failed to convert JWK to RSA key for kid {kid}: {key_error}")
                logger.debug(f"Problematic key data: {key}")
                continue
            logger.debug(f"Successfully converted JWK to RSA public key for kid: {kid}")
            break

        if public_key is None:
            logger.error(f"Public key not found for kid: {kid}")
            return False, "Public key not found"

        # Diagnostics only: these claims are read WITHOUT verification and
        # must never influence the accept/reject decision.
        unverified_payload = jwt.decode(token, options={"verify_signature": False})
        logger.debug(f"Token audience: {unverified_payload.get('aud')}")
        logger.debug(f"Token issuer: {unverified_payload.get('iss')}")
        logger.debug(f"Expected audience: {CLIENT_ID}")
        logger.debug(f"Expected issuer: https://login.microsoftonline.com/{TENANT_ID}/v2.0")

        # Single, strict verification. There is deliberately no fallback that
        # decodes with verify_signature=False: accepting a token on its
        # audience claim alone would let anyone forge a token and bypass
        # authentication.
        # NOTE(review): the Microsoft Graph audience is still accepted, as
        # before - confirm this API should accept tokens minted for Graph.
        decoded = jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            audience=["00000003-0000-0000-c000-000000000000", CLIENT_ID],  # Accept both Graph API and app audiences
            issuer=[f"https://sts.windows.net/{TENANT_ID}/", f"https://login.microsoftonline.com/{TENANT_ID}/v2.0"],  # Accept both issuer formats
            options={"verify_signature": True, "verify_aud": True, "verify_iss": True}
        )
        logger.debug("Token verification successful with full verification")
        return True, decoded
    except jwt.InvalidAudienceError as e:
        logger.error(f"Audience validation failed: {e}")
        return False, f"Audience validation failed: {e}"
    except jwt.InvalidIssuerError as e:
        logger.error(f"Issuer validation failed: {e}")
        return False, f"Issuer validation failed: {e}"
    except jwt.InvalidSignatureError as e:
        logger.error(f"Signature validation failed: {e}")
        logger.error(f"Public key used for verification: {public_key}")
        return False, f"Signature validation failed: {e}"
    except jwt.ExpiredSignatureError as e:
        logger.error(f"Token expired: {e}")
        return False, f"Token expired: {e}"
    except jwt.InvalidKeyError as e:
        logger.error(f"Invalid key error: {e}")
        return False, f"Invalid key error: {e}"
    except jwt.InvalidTokenError as e:
        # Catch-all for the remaining token errors (e.g. malformed tokens)
        logger.warning(f"Invalid token: {e}")
        logger.warning(f"Token verification failed with CLIENT_ID: {CLIENT_ID}")
        return False, str(e)
    except Exception as e:
        # Boundary handler: any unexpected failure rejects the token
        logger.error(f"Token verification error: {e}")
        return False, str(e)
def require_auth(f):
    """
    Decorator to require authentication for routes.

    The wrapped view runs only when the request carries an
    ``Authorization: Bearer <token>`` header whose token passes
    ``verify_jwt_token``. Otherwise a JSON error body with status 401 is
    returned. On success the decoded claims are stored on
    ``request.current_user``.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Check for Authorization header
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return jsonify({'error': 'Authorization header missing'}), 401

        # Expect exactly "<scheme> <token>". The scheme must be Bearer
        # (compared case-insensitively); splitting on any whitespace
        # tolerates repeated spaces between the two parts.
        parts = auth_header.split()
        if len(parts) != 2 or parts[0].lower() != 'bearer':
            return jsonify({'error': 'Invalid Authorization header format'}), 401
        token = parts[1]

        # Verify token
        is_valid, result = verify_jwt_token(token)
        if not is_valid:
            return jsonify({'error': f'Invalid token: {result}'}), 401

        # Add user info to request context
        request.current_user = result
        return f(*args, **kwargs)
    return decorated_function
def fetch_from_make(data_type):
    """
    Fetch data from the Make.com webhook.

    The webhook is expected to answer with a JSON list of objects, each
    carrying its record under a "data" key; the records are unwrapped and
    returned.

    Args:
        data_type (str): Either "conversations" or "messages"

    Returns:
        list: List of record dicts. Empty when the request fails, the body
        is not valid JSON, or the payload is not a list. Items that are not
        objects, or whose "data" value is not an object, are skipped.
    """
    try:
        url = MAKE_WEBHOOK_URL
        params = {"type": data_type}
        logger.debug(f"Fetching data from: {url} with params: {params}")
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()  # Raise an exception for HTTP errors
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that cannot be decoded as JSON
        logger.error(f"Error fetching {data_type}: {e}")
        return []

    if not isinstance(data, list):
        logger.error(
            f"Error fetching {data_type}: expected a JSON list, got {type(data).__name__}"
        )
        return []

    logger.debug(f"Received {len(data)} items from webhook")

    # Each item wraps its record under "data". A missing key yields an empty
    # record; malformed items are dropped so callers can rely on getting dicts.
    result = []
    skipped = 0
    for item in data:
        record = item.get('data', {}) if isinstance(item, dict) else None
        if isinstance(record, dict):
            result.append(record)
        else:
            skipped += 1
    if skipped:
        logger.warning(f"Skipped {skipped} malformed {data_type} items from webhook")
    return result
def _truncate_for_log(value, keep, force_ellipsis):
    """Shorten *value* for log output without assuming it is a string."""
    if value is None:
        return None
    text = value if isinstance(value, str) else str(value)
    if force_ellipsis or len(text) > keep:
        return text[:keep] + '...'
    return value


def _process_conversations(raw_conversations):
    """Drop deleted or ID-less conversations and keep only the API fields."""
    fields = (
        "Conversation_ID",
        "User_ID",
        "StartTime",
        "EndTime",
        "Title",
        "Brand Voice Setting",
        "Assistant_ID",  # Always present in the output, even when None
        "Thread_ID",
        "Assistant_Key",
    )
    return [
        {field: conv.get(field) for field in fields}
        for conv in raw_conversations
        if not conv.get("Deleted", False) and "Conversation_ID" in conv
    ]


def _enrich_messages(raw_messages, conversation_map):
    """Copy parent-conversation fields onto each message.

    Returns a tuple ``(enriched_messages, orphan_count)`` where orphans are
    messages whose Conversation_ID is absent or not in *conversation_map*.
    """
    enriched = []
    orphans = 0
    for msg in raw_messages:
        if "Conversation_ID" not in msg or msg["Conversation_ID"] not in conversation_map:
            orphans += 1
            continue
        parent_conv = conversation_map[msg["Conversation_ID"]]
        enriched_msg = msg.copy()  # Start with original message fields
        enriched_msg["User_ID"] = parent_conv.get("User_ID")
        enriched_msg["Assistant_ID"] = parent_conv.get("Assistant_ID")
        enriched_msg["Brand_Voice_Setting"] = parent_conv.get("Brand Voice Setting")
        # Use the message's own timestamp if available, otherwise fall back
        # to the parent conversation's StartTime
        enriched_msg["Timestamp"] = msg.get("Timestamp") or parent_conv.get("StartTime")
        enriched.append(enriched_msg)
    return enriched, orphans


@app.route("/api/data", methods=["GET"])
@require_auth
def get_all_data():
    """
    API endpoint to get all data (conversations and messages).

    Fetches both data sets from the webhook, drops deleted conversations and
    those without a Conversation_ID, and enriches every message that belongs
    to a kept conversation with that conversation's User_ID, Assistant_ID,
    Brand_Voice_Setting and (as a fallback) StartTime as Timestamp. Messages
    without a matching conversation are left out.

    Returns:
        JSON response with "conversations" (processed conversations) and
        "messages" (enriched messages)
    """
    raw_conversations = fetch_from_make("conversations")
    raw_messages = fetch_from_make("messages")
    logger.debug(f"Fetched {len(raw_conversations)} conversations and {len(raw_messages)} messages")

    # isEnabledFor honours the effective (inherited) level; logger.level is
    # NOTSET (0) unless set explicitly, so comparing it is always true.
    debug_enabled = logger.isEnabledFor(logging.DEBUG)

    # Debug sample message data logging to check for timestamp fields
    if debug_enabled and raw_messages:
        sample_msg = raw_messages[0].copy()
        # Mask sensitive data for logging
        if 'User_ID' in sample_msg:
            sample_msg['User_ID'] = _truncate_for_log(sample_msg['User_ID'], 5, True)
        if 'Content' in sample_msg:
            sample_msg['Content'] = _truncate_for_log(sample_msg['Content'], 20, False)
        logger.debug(f"Sample raw message from webhook: {json.dumps(sample_msg, indent=2)}")
        logger.debug(f"Raw message field names: {list(sample_msg.keys())}")

    # Log Assistant_ID field presence in conversations
    missing_raw = sum(1 for conv in raw_conversations if not conv.get("Assistant_ID"))
    assistant_id_counts = {
        "present": len(raw_conversations) - missing_raw,
        "missing": missing_raw
    }
    if assistant_id_counts["missing"] > 0:
        logger.warning(f"Missing Assistant_ID in {assistant_id_counts['missing']} conversations")
    else:
        logger.debug(f"Assistant_ID stats in conversations: {assistant_id_counts}")

    # Debug sample data logging
    if debug_enabled and raw_conversations:
        sample_conv = raw_conversations[0].copy()
        # Mask sensitive data for logging
        if 'User_ID' in sample_conv:
            sample_conv['User_ID'] = _truncate_for_log(sample_conv['User_ID'], 5, True)
        logger.debug(f"Sample conversation: {json.dumps(sample_conv, indent=2)}")
        logger.debug(f"Has Assistant_ID: {'Assistant_ID' in sample_conv}")

    # Ensure conversations have required fields and filter out deleted ones
    processed_conversations = _process_conversations(raw_conversations)

    # Create map for efficient message enrichment
    conversation_map = {conv["Conversation_ID"]: conv for conv in processed_conversations}

    # Log warning if processed conversations are missing Assistant_ID
    no_assistant_id = sum(1 for conv in processed_conversations if not conv.get("Assistant_ID"))
    if no_assistant_id > 0:
        logger.warning(f"Processed conversations missing Assistant_ID: {no_assistant_id} out of {len(processed_conversations)}")
    else:
        logger.debug("All processed conversations have Assistant_ID")

    enriched_messages, messages_missing_parent = _enrich_messages(raw_messages, conversation_map)

    # Log warning if messages can't be matched to conversations
    if messages_missing_parent > 0:
        logger.warning(f"Found {messages_missing_parent} messages that could not be linked to a conversation")

    # Log warning if any messages ended up without a Timestamp after enrichment
    msg_no_timestamp = sum(1 for m in enriched_messages if not m.get("Timestamp"))
    if msg_no_timestamp:
        logger.warning(f"Messages missing Timestamp after enrichment: {msg_no_timestamp} / {len(enriched_messages)}")

    # Log warning if messages are missing Assistant_ID
    msg_no_assistant_id = sum(1 for msg in enriched_messages if not msg.get("Assistant_ID"))
    if msg_no_assistant_id > 0:
        logger.warning(f"Messages missing Assistant_ID after enrichment: {msg_no_assistant_id} out of {len(enriched_messages)}")

    # Log completion information
    logger.debug(f"Returning {len(processed_conversations)} processed conversations and {len(enriched_messages)} enriched messages")
    return jsonify({
        "conversations": processed_conversations,
        "messages": enriched_messages
    })
if __name__ == "__main__":
    # Server components are only needed when this file is run as a script
    import asyncio

    from hypercorn.asyncio import serve
    from hypercorn.config import Config

    # Minimal access log: one debug line per request, favicon excluded
    @app.before_request
    def log_request_info():
        if request.path == '/favicon.ico':
            return
        logger.debug(f"Request: {request.method} {request.path}")

    # Surface error responses (status 400 and above) in the log
    @app.after_request
    def log_response_info(response):
        if response.status_code >= 400:
            logger.warning(f"Response: {response.status_code}")
        return response

    # Hypercorn settings; the port comes from BACKEND_PORT (default 5001)
    port = os.environ.get("BACKEND_PORT", "5001")
    config = Config()
    config.bind = [f"0.0.0.0:{port}"]
    config.use_reloader = False
    config.root_path = application_root

    # Hand the Flask app to Hypercorn and block until the server stops
    logger.warning(f"Starting Oliver Agency Reporting Backend with Hypercorn on port {port}")
    asyncio.run(serve(app, config))