The date filter used fail-open logic (it skipped the date check when
msg.Timestamp was falsy), so messages without a Timestamp bypassed
the filter entirely. Because the Per-User Report aggregates all filtered
messages, the bug showed up as all-time totals regardless of the selected range.
Changed both the message and conversation date filter blocks to
fail-closed: when a date range is active, records without a parseable
timestamp are excluded. Also added a backend warning log to surface
any enriched messages that still lack a Timestamp after the
StartTime fallback (commit 421961a).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
364 lines
No EOL
15 KiB
Python
import requests
|
|
import os
|
|
import json
|
|
import logging
|
|
import jwt
|
|
from functools import wraps
|
|
from flask import Flask, jsonify, request
|
|
from flask_cors import CORS
|
|
|
|
# Configure logging. The level comes from the LOG_LEVEL environment variable
# (a level name such as "INFO", or a number) and defaults to DEBUG, which was
# the previous hard-coded setting. DEBUG output in this module includes token
# audience/issuer claims and truncated sample message content, so deployments
# that should not log those can set LOG_LEVEL=INFO or higher.
def _parse_log_level(value, default=logging.DEBUG):
    """
    Convert a log-level setting into a numeric ``logging`` level.

    Args:
        value (str | None): A level name (case-insensitive, surrounding
            whitespace ignored) or a non-negative integer string.
        default (int): Level returned when ``value`` is empty or unrecognised.

    Returns:
        int: The numeric logging level.
    """
    name = (value or "").strip().upper()
    if not name:
        return default
    if name.isdigit():
        return int(name)
    level = logging.getLevelName(name)
    # getLevelName maps a known level name to its number and returns the
    # string "Level <name>" for anything it does not recognise.
    return level if isinstance(level, int) else default


LOG_LEVEL = _parse_log_level(os.environ.get("LOG_LEVEL"))

logging.basicConfig(
    level=LOG_LEVEL,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger(__name__)
|
|
|
|
# Create Flask application
app = Flask(__name__)
CORS(app)  # Enable CORS for all routes

# Silence werkzeug's per-request access log. Flask's own app.logger is left
# enabled on purpose: Flask reports unhandled view exceptions (the traceback
# behind a 500 response) through app.logger, so disabling it made server
# errors completely invisible.
log = logging.getLogger('werkzeug')
log.disabled = True

# Configure application root to be at the root path
application_root = "/"
app.config["APPLICATION_ROOT"] = application_root
|
|
|
|
# NOTE(review): this default is a live Make.com webhook URL committed to source.
# fetch_from_make sends no other credentials, so possession of this URL alone is
# enough to pull the conversation/message data. It is effectively a secret:
# prefer supplying MAKE_WEBHOOK_URL via the environment and dropping the default.
_DEFAULT_MAKE_WEBHOOK_URL = "https://hook.eu1.make.celonis.com/h8gjldwnp4u5cvc0io474zq4u7vwb9zw"
MAKE_WEBHOOK_URL = os.environ.get("MAKE_WEBHOOK_URL", _DEFAULT_MAKE_WEBHOOK_URL)

# Azure AD configuration: directory (tenant) ID and application (client) ID,
# each overridable through the environment.
TENANT_ID = os.environ.get("AZURE_TENANT_ID", "e519c2e6-bc6d-4fdf-8d9c-923c2f002385")
CLIENT_ID = os.environ.get("AZURE_CLIENT_ID", "014547f2-21c1-4245-881f-97f49543d963")

# Tenant-specific JWKS endpoint from which token-signing public keys are fetched.
_AZURE_AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"
JWKS_URL = f"{_AZURE_AUTHORITY}/discovery/v2.0/keys"
|
|
|
|
# Process-wide cache of the last JWKS document fetched successfully, together
# with the time.monotonic() timestamp of that fetch.
_JWKS_CACHE = {"keys": None, "fetched_at": 0.0}


def get_public_keys(max_age_seconds=300):
    """
    Fetch public keys from Microsoft's JWKS endpoint, reusing a recent copy.

    Token verification calls this for every authenticated request. Without a
    cache, each request paid an extra HTTP round trip (up to the 10 s timeout)
    and failed whenever the endpoint was briefly unreachable. A successfully
    fetched key set is reused for ``max_age_seconds``; failures are never
    cached, so the next call retries.

    Args:
        max_age_seconds (float): How long a fetched key set is reused before it
            is downloaded again. ``0`` disables caching (always fetch). The
            default of 300 s matches PyJWT's ``PyJWKClient`` cache lifespan and
            keeps newly published signing keys visible within minutes.

    Returns:
        dict | None: The parsed JWKS document, or None if it could not be
        fetched or was not a JSON object.
    """
    import time

    now = time.monotonic()
    cached = _JWKS_CACHE["keys"]
    if cached is not None and now - _JWKS_CACHE["fetched_at"] < max_age_seconds:
        return cached

    try:
        response = requests.get(JWKS_URL, timeout=10)
        response.raise_for_status()
        jwks = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose JSON
        # decode error is not a RequestException subclass.
        logger.error(f"Error fetching JWKS: {e}")
        return None

    if not isinstance(jwks, dict):
        logger.error(f"Unexpected JWKS payload type: {type(jwks).__name__}")
        return None

    _JWKS_CACHE["keys"] = jwks
    _JWKS_CACHE["fetched_at"] = now
    return jwks
|
|
|
|
def verify_jwt_token(token):
    """
    Verify a JWT from Azure AD and return its claims.

    The token's ``kid`` header selects an RSA key from the tenant JWKS returned
    by ``get_public_keys``. The token is then decoded with RS256 signature,
    audience and issuer validation all enforced. A token whose signature does
    not verify is always rejected.

    Args:
        token (str): The raw JWT taken from the Authorization header.

    Returns:
        tuple: ``(True, claims)`` with the decoded claims dict on success, or
        ``(False, message)`` describing why verification failed.
    """
    # NOTE(review): "00000003-0000-0000-c000-000000000000" is the Microsoft
    # Graph resource ID. Graph access tokens are issued for Graph, not for
    # third-party APIs, and Microsoft does not support validating them; they
    # typically fail signature verification. Clients should request a token
    # whose audience is this API (CLIENT_ID). The Graph audience is kept only
    # for backward compatibility - confirm whether it can be removed.
    allowed_audiences = ["00000003-0000-0000-c000-000000000000", CLIENT_ID]
    allowed_issuers = [
        f"https://sts.windows.net/{TENANT_ID}/",  # v1.0 token issuer format
        f"https://login.microsoftonline.com/{TENANT_ID}/v2.0",  # v2.0 format
    ]
    public_key = None

    try:
        # Get public keys from Microsoft
        jwks = get_public_keys()
        if not jwks:
            logger.error("Unable to fetch JWKS public keys")
            return False, "Unable to fetch public keys"

        # The (unverified) header names the key that signed the token
        unverified_header = jwt.get_unverified_header(token)
        kid = unverified_header.get('kid')
        algorithm = unverified_header.get('alg')
        logger.debug(f"Token header - kid: {kid}, algorithm: {algorithm}")

        keys = jwks.get('keys', [])
        available_kids = [key.get('kid') for key in keys]
        logger.debug(f"Available key IDs in JWKS: {available_kids}")

        # Convert the first usable JWK with a matching kid into an RSA public key
        for key in keys:
            if key.get('kid') != kid:
                continue
            try:
                public_key = jwt.algorithms.RSAAlgorithm.from_jwk(key)
            except Exception as key_error:
                logger.error(f"Failed to convert JWK to RSA key for kid {kid}: {key_error}")
                logger.debug(f"Problematic key data: {key}")
                continue
            logger.debug(f"Successfully converted JWK to RSA public key for kid: {kid}")
            break

        if public_key is None:
            logger.error(f"Public key not found for kid: {kid}")
            return False, "Public key not found"

        # Claims are decoded without verification here purely for diagnostics;
        # nothing below trusts this unverified payload.
        unverified_payload = jwt.decode(token, options={"verify_signature": False})
        logger.debug(f"Token audience: {unverified_payload.get('aud')}")
        logger.debug(f"Token issuer: {unverified_payload.get('iss')}")
        logger.debug(f"Expected audience: {CLIENT_ID}")
        logger.debug(f"Expected issuer: https://login.microsoftonline.com/{TENANT_ID}/v2.0")

        # Full verification. There is deliberately NO fallback that retries with
        # verify_signature=False: an unsigned check proves nothing about who
        # minted the token. Accepting it whenever "aud" matched let anyone
        # forge a token with arbitrary claims and pass authentication, and it
        # also skipped the expiry check.
        decoded = jwt.decode(
            token,
            public_key,
            algorithms=['RS256'],
            audience=allowed_audiences,
            issuer=allowed_issuers,
            options={"verify_signature": True, "verify_aud": True, "verify_iss": True},
        )
        logger.debug("Token verification successful with full verification")
        return True, decoded

    except jwt.InvalidAudienceError as e:
        logger.error(f"Audience validation failed: {e}")
        return False, f"Audience validation failed: {e}"

    except jwt.InvalidIssuerError as e:
        logger.error(f"Issuer validation failed: {e}")
        return False, f"Issuer validation failed: {e}"

    except jwt.InvalidSignatureError as e:
        logger.error(f"Signature validation failed: {e}")
        logger.error(f"Public key used for verification: {public_key}")
        return False, f"Signature validation failed: {e}"

    except jwt.ExpiredSignatureError as e:
        logger.error(f"Token expired: {e}")
        return False, f"Token expired: {e}"

    except jwt.InvalidKeyError as e:
        logger.error(f"Invalid key error: {e}")
        return False, f"Invalid key error: {e}"

    except jwt.InvalidTokenError as e:
        # Catch-all for remaining PyJWT token errors (e.g. malformed tokens)
        logger.warning(f"Invalid token: {e}")
        logger.warning(f"Token verification failed with CLIENT_ID: {CLIENT_ID}")
        return False, str(e)

    except Exception as e:
        logger.error(f"Token verification error: {e}")
        return False, str(e)
|
|
|
|
def require_auth(f):
    """
    Decorator to require authentication for routes.

    The wrapped view runs only when the ``Authorization`` header has the form
    ``Bearer <token>`` and ``verify_jwt_token`` accepts the token. The decoded
    claims are then exposed to the view as ``request.current_user``. Otherwise
    a JSON error body is returned with HTTP 401.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Check for Authorization header
        auth_header = request.headers.get('Authorization')
        if not auth_header:
            return jsonify({'error': 'Authorization header missing'}), 401

        # Expect exactly "Bearer <token>"; the scheme name is case-insensitive.
        # Taking the second space-separated word without checking the scheme
        # accepted other schemes, silently dropped extra parts, and yielded an
        # empty token for "Bearer  <token>" (double space).
        parts = auth_header.split()
        if len(parts) != 2 or parts[0].lower() != 'bearer':
            return jsonify({'error': 'Invalid Authorization header format'}), 401
        token = parts[1]

        # Verify token
        is_valid, result = verify_jwt_token(token)
        if not is_valid:
            return jsonify({'error': f'Invalid token: {result}'}), 401

        # Add user info (decoded claims) to request context
        request.current_user = result
        return f(*args, **kwargs)

    return decorated_function
|
|
|
|
def fetch_from_make(data_type):
    """
    Fetch data from the Make.com webhook

    Args:
        data_type (str): Either "conversations" or "messages"; sent as the
            webhook's ``type`` query parameter.

    Returns:
        list: The ``data`` object of each item in the webhook's JSON list
        response (``{}`` for items without a ``data`` key). Items that are not
        JSON objects, or whose ``data`` is not an object, are dropped with a
        warning. An empty list is returned if the request fails, the body is
        not JSON, or the body is not a JSON list.
    """
    params = {"type": data_type}
    # The webhook URL is the only credential sent to Make (no other auth is
    # attached to the request), so it is not written to the logs.
    logger.debug(f"Fetching {data_type} from Make webhook with params: {params}")

    try:
        response = requests.get(MAKE_WEBHOOK_URL, params=params, timeout=30)
        response.raise_for_status()  # Raise an exception for HTTP errors
        data = response.json()
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching {data_type}: {e}")
        return []
    except ValueError as e:
        # Non-JSON body on requests versions whose JSON decode error is not a
        # RequestException subclass.
        logger.error(f"Invalid JSON received for {data_type}: {e}")
        return []

    if not isinstance(data, list):
        logger.error(
            f"Unexpected response for {data_type}: expected a JSON list, got {type(data).__name__}"
        )
        return []

    logger.debug(f"Received {len(data)} items from webhook")

    # Each item wraps the actual record under a "data" key. Callers index the
    # records as dicts, so anything else is dropped rather than crashing them.
    result = []
    for item in data:
        record = item.get('data', {}) if isinstance(item, dict) else None
        if isinstance(record, dict):
            result.append(record)

    dropped = len(data) - len(result)
    if dropped:
        logger.warning(f"Dropped {dropped} malformed {data_type} items from webhook")
    return result
|
|
|
|
# Fields copied from each raw conversation into the API response, in output order.
_CONVERSATION_FIELDS = (
    "Conversation_ID",
    "User_ID",
    "StartTime",
    "EndTime",
    "Title",
    "Brand Voice Setting",
    "Assistant_ID",  # always present in the output, even when None
    "Thread_ID",
    "Assistant_Key",
)


def _mask_for_log(record, truncate_content=False):
    """Return a shallow copy of ``record`` with identifying fields shortened for debug logs."""
    masked = dict(record)
    user_id = masked.get('User_ID')
    if user_id:
        # Keep only a short prefix; str() guards against non-string IDs
        masked['User_ID'] = str(user_id)[:5] + '...'
    content = masked.get('Content')
    if truncate_content and isinstance(content, str) and len(content) > 20:
        masked['Content'] = content[:20] + '...'
    return masked


def _process_conversations(raw_conversations):
    """Drop deleted or ID-less conversations and project the rest onto ``_CONVERSATION_FIELDS``."""
    return [
        {field: conv.get(field) for field in _CONVERSATION_FIELDS}
        for conv in raw_conversations
        if isinstance(conv, dict)
        and not conv.get("Deleted", False)
        and "Conversation_ID" in conv
    ]


def _enrich_messages(raw_messages, conversation_map):
    """
    Copy parent-conversation fields onto each message that has a known parent.

    Args:
        raw_messages (list): Message records from the webhook.
        conversation_map (dict): Processed conversations keyed by Conversation_ID.

    Returns:
        tuple: ``(enriched_messages, missing_parent)``. ``missing_parent``
        counts messages that have no Conversation_ID, or whose conversation is
        not in ``conversation_map`` (this includes deleted conversations).
    """
    enriched_messages = []
    missing_parent = 0
    for msg in raw_messages:
        if not isinstance(msg, dict) or "Conversation_ID" not in msg:
            missing_parent += 1
            continue
        parent_conv = conversation_map.get(msg["Conversation_ID"])
        if parent_conv is None:
            missing_parent += 1
            continue

        enriched_msg = dict(msg)  # Start with original message fields
        enriched_msg["User_ID"] = parent_conv.get("User_ID")
        enriched_msg["Assistant_ID"] = parent_conv.get("Assistant_ID")
        enriched_msg["Brand_Voice_Setting"] = parent_conv.get("Brand Voice Setting")
        # Prefer the message's own timestamp; fall back to the conversation start
        enriched_msg["Timestamp"] = msg.get("Timestamp") or parent_conv.get("StartTime")
        enriched_messages.append(enriched_msg)
    return enriched_messages, missing_parent


@app.route("/api/data", methods=["GET"])
@require_auth
def get_all_data():
    """
    API endpoint to get all data (conversations and messages)

    Conversations are filtered (deleted and ID-less ones dropped) and reduced to
    ``_CONVERSATION_FIELDS``. Messages are kept only when their conversation
    survived that filter, and are enriched with the parent's User_ID,
    Assistant_ID, Brand Voice Setting and (as a fallback) StartTime.

    Returns:
        JSON response with processed conversations and messages
    """
    raw_conversations = fetch_from_make("conversations")
    raw_messages = fetch_from_make("messages")

    logger.debug(f"Fetched {len(raw_conversations)} conversations and {len(raw_messages)} messages")

    # isEnabledFor honours the effective (inherited) level. A module logger's
    # own .level is NOTSET (0), so the previous `logger.level <= DEBUG` test was
    # always true and the sampling below ran regardless of configuration.
    debug_enabled = logger.isEnabledFor(logging.DEBUG)

    # Debug sample message data logging to check for timestamp fields
    if debug_enabled and raw_messages and isinstance(raw_messages[0], dict):
        sample_msg = _mask_for_log(raw_messages[0], truncate_content=True)
        logger.debug(f"Sample raw message from webhook: {json.dumps(sample_msg, indent=2)}")
        logger.debug(f"Raw message field names: {list(sample_msg.keys())}")

    # Assistant_ID presence across all raw conversations (deleted ones included)
    raw_with_assistant = sum(
        1 for conv in raw_conversations if isinstance(conv, dict) and conv.get("Assistant_ID")
    )
    assistant_id_counts = {
        "present": raw_with_assistant,
        "missing": len(raw_conversations) - raw_with_assistant,
    }
    if assistant_id_counts["missing"] > 0:
        logger.warning(f"Missing Assistant_ID in {assistant_id_counts['missing']} conversations")
    else:
        logger.debug(f"Assistant_ID stats in conversations: {assistant_id_counts}")

    # Debug sample data logging
    if debug_enabled and raw_conversations and isinstance(raw_conversations[0], dict):
        sample_conv = _mask_for_log(raw_conversations[0])
        logger.debug(f"Sample conversation: {json.dumps(sample_conv, indent=2)}")
        logger.debug(f"Has Assistant_ID: {'Assistant_ID' in sample_conv}")

    processed_conversations = _process_conversations(raw_conversations)

    # Create map for efficient message enrichment
    conversation_map = {conv["Conversation_ID"]: conv for conv in processed_conversations}

    no_assistant_id = sum(1 for conv in processed_conversations if not conv.get("Assistant_ID"))
    if no_assistant_id > 0:
        logger.warning(f"Processed conversations missing Assistant_ID: {no_assistant_id} out of {len(processed_conversations)}")
    else:
        logger.debug("All processed conversations have Assistant_ID")

    enriched_messages, messages_missing_parent = _enrich_messages(raw_messages, conversation_map)

    if messages_missing_parent > 0:
        logger.warning(f"Found {messages_missing_parent} messages that could not be linked to a conversation")

    # Messages still lacking a Timestamp after the StartTime fallback
    msg_no_timestamp = sum(1 for m in enriched_messages if not m.get("Timestamp"))
    if msg_no_timestamp:
        logger.warning(f"Messages missing Timestamp after enrichment: {msg_no_timestamp} / {len(enriched_messages)}")

    msg_no_assistant_id = sum(1 for msg in enriched_messages if not msg.get("Assistant_ID"))
    if msg_no_assistant_id > 0:
        logger.warning(f"Messages missing Assistant_ID after enrichment: {msg_no_assistant_id} out of {len(enriched_messages)}")

    logger.debug(f"Returning {len(processed_conversations)} processed conversations and {len(enriched_messages)} enriched messages")

    return jsonify({
        "conversations": processed_conversations,
        "messages": enriched_messages
    })
|
|
|
|
if __name__ == "__main__":
    def log_request_info():
        """Debug-log the method and path of each request, ignoring favicon fetches."""
        if request.path == '/favicon.ico':  # Skip favicon requests
            return None
        logger.debug(f"Request: {request.method} {request.path}")
        return None

    def log_response_info(response):
        """Warn about responses with status >= 400 and hand the response back unchanged."""
        status = response.status_code
        if status >= 400:
            logger.warning(f"Response: {status}")
        return response

    # These hooks are attached only when the file is executed directly.
    app.before_request(log_request_info)
    app.after_request(log_response_info)

    # Hypercorn is imported here, inside the __main__ guard, so merely
    # importing this module does not require it.
    from hypercorn.config import Config
    from hypercorn.asyncio import serve
    import asyncio

    port = os.environ.get("BACKEND_PORT", "5001")

    config = Config()
    config.bind = [f"0.0.0.0:{port}"]  # listen on every interface
    config.use_reloader = False
    config.root_path = application_root

    logger.warning(f"Starting Oliver Agency Reporting Backend with Hypercorn on port {port}")
    asyncio.run(serve(app, config))