netflix/json_utils.py
michael 236d1ddbd8 Initial commit: Netflix GraphRAG marketing chatbot
Full-stack application combining LlamaIndex vector search with Neo4j
knowledge graph (GraphRAG) for answering queries about Netflix marketing
materials. Flask/Hypercorn backend with custom ReAct agent, React frontend.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-23 10:28:33 -06:00

133 lines
No EOL
5.9 KiB
Python

# netflix_chatbot/json_utils.py
import json
import llama_index
from llama_index.core.tools import ToolOutput
from llama_index.core.agent.react.types import (
ActionReasoningStep,
ObservationReasoningStep,
ResponseReasoningStep,
BaseReasoningStep,
)
from llama_index.core.llms import ChatMessage, LLM, ChatResponse as LlamaResponse
from llama_index.core.base.response.schema import Response
from flask.json.provider import JSONProvider
from bson import ObjectId # Import ObjectId if used in responses/data
from datetime import datetime
class CustomJSONEncoder(json.JSONEncoder):
    """
    Custom JSON Encoder to handle LlamaIndex objects, BSON ObjectId, and other types.

    ``default`` is deliberately best-effort: any object it cannot convert is
    replaced by a short placeholder string instead of raising, so a single
    unexpected value does not fail the whole serialization.
    """

    @staticmethod
    def _text_or_empty(value):
        """Return ``str(value)``, or ``""`` when *value* is ``None``.

        Prevents a missing value from being emitted as the literal text "None".
        """
        return "" if value is None else str(value)

    @staticmethod
    def _text_preview(text, limit=100):
        """Return at most *limit* leading characters of *text*.

        "..." is appended only when the text was actually truncated; empty or
        missing text yields "".
        """
        if not text:
            return ""
        if len(text) <= limit:
            return text
        return text[:limit] + "..."

    def default(self, obj):
        """Convert *obj* into something the standard JSON encoder can emit.

        Args:
            obj: A value the base encoder could not serialize on its own.

        Returns:
            A dict or string representation of *obj*. When no branch applies,
            or a branch fails, a placeholder string naming the type of *obj*
            is returned; this method does not raise.
        """
        try:
            # Common Python Types. Checked first: they are cheap and cannot
            # overlap with the library types handled below.
            if isinstance(obj, datetime):
                return obj.isoformat()
            if isinstance(obj, bytes):
                return "<bytes>"  # Or encode to base64 if needed
            if isinstance(obj, ObjectId):
                return str(obj)

            # Specific LlamaIndex Types
            if isinstance(obj, ToolOutput):
                return {
                    'content': self._text_or_empty(obj.content),
                    'tool_name': getattr(obj, 'tool_name', None),
                    # raw_output may be an arbitrary object, so it is stringified.
                    'raw_output': str(getattr(obj, 'raw_output', None)),
                    'type': 'tool_output',
                    'metadata': getattr(obj, 'metadata', {}),
                }
            if isinstance(obj, ChatMessage):
                return {
                    # NOTE(review): if role is an Enum member, str() may yield
                    # the "ClassName.MEMBER" form rather than the bare value -
                    # confirm which form the client expects.
                    'role': str(obj.role),
                    # None content becomes "" (same rule as ToolOutput above).
                    'content': self._text_or_empty(obj.content),
                    'additional_kwargs': getattr(obj, 'additional_kwargs', {}),
                }
            if isinstance(obj, (LlamaResponse, Response)):
                return {
                    # Prefer `.response`; fall back to `.message` when absent.
                    'content': str(getattr(obj, 'response', getattr(obj, 'message', None))),
                    'metadata': getattr(obj, 'metadata', {}),
                    'type': 'llm_response',
                }
            if isinstance(obj, ActionReasoningStep):
                return {
                    'type': 'action_step',
                    'action': obj.action,
                    'action_input': obj.action_input,  # Should be serializable dict
                    'thought': getattr(obj, 'thought', None),
                }
            if isinstance(obj, ObservationReasoningStep):
                return {
                    'type': 'observation_step',
                    'observation': str(obj.observation),  # Ensure observation is string
                    'thought': getattr(obj, 'thought', None),
                }
            if isinstance(obj, ResponseReasoningStep):
                return {
                    'type': 'response_step',
                    'response': str(obj.response),  # Ensure response is string
                    'is_streaming': getattr(obj, 'is_streaming', False),
                    'thought': getattr(obj, 'thought', None),
                }
            if isinstance(obj, BaseReasoningStep):  # Catch-all for other steps
                return {
                    'type': 'base_reasoning_step',
                    'thought': getattr(obj, 'thought', None),
                    'is_done': getattr(obj, 'is_done', False),
                }

            # LlamaIndex Document/Node related objects.
            # NOTE(review): Document is tested before TextNode on purpose; in
            # some llama_index versions Document may subclass TextNode, so
            # keep this order - confirm against the installed version.
            if isinstance(obj, llama_index.core.schema.Document):
                return {
                    'doc_id': obj.id_,
                    'text_preview': self._text_preview(obj.text),
                    'metadata': obj.metadata,  # Metadata should be serializable
                    'type': 'llama_document',
                }
            if isinstance(obj, llama_index.core.schema.NodeWithScore):
                return {
                    'node': self.default(obj.node),  # Recursively serialize the node
                    'score': obj.score,
                    'type': 'node_with_score',
                }
            if isinstance(obj, llama_index.core.schema.TextNode):
                return {
                    'node_id': obj.id_,
                    'text_preview': self._text_preview(obj.text),
                    'metadata': obj.metadata,
                    'type': 'text_node',
                }

            # General Fallback for objects with __dict__
            if hasattr(obj, '__dict__'):
                # Keep only public, non-callable attributes.
                try:
                    d = {k: v for k, v in obj.__dict__.items()
                         if not k.startswith('_') and not callable(v)}
                    # Very wide objects are summarised instead of expanded.
                    if len(d) > 50:  # Arbitrary limit
                        return f"<Complex object type {type(obj).__name__} with keys: {list(d.keys())[:5]}>"
                    return d
                except Exception:
                    return f"<Unserializable object type {type(obj).__name__} with __dict__>"

            # Final fallback using standard JSON encoding (raises TypeError,
            # which the handler below turns into a placeholder string).
            return super().default(obj)
        except Exception:
            # Best-effort by design: report only the type name, never the
            # value, to avoid leaking sensitive data.
            return f"<Unserializable object of type {type(obj).__name__}>"
class CustomJSONProvider(JSONProvider):
    """
    Flask JSON provider that serializes through CustomJSONEncoder.
    """

    def dumps(self, obj, **kwargs):
        """Serialize *obj* to a JSON string.

        Keyword arguments supplied by the caller take precedence over the
        defaults set here and are forwarded to ``json.dumps``.
        """
        options = {
            'cls': CustomJSONEncoder,
            'ensure_ascii': False,  # Emit non-ASCII text unescaped
            'indent': None,  # Compact output for production APIs
            **kwargs,  # Caller-supplied values override the defaults above
        }
        return json.dumps(obj, **options)

    def loads(self, s, **kwargs):
        """Parse the JSON document *s*, forwarding **kwargs to ``json.loads``."""
        decoded = json.loads(s, **kwargs)
        return decoded