Full-stack application combining LlamaIndex vector search with Neo4j knowledge graph (GraphRAG) for answering queries about Netflix marketing materials. Flask/Hypercorn backend with custom ReAct agent, React frontend. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
133 lines
No EOL
5.9 KiB
Python
133 lines
No EOL
5.9 KiB
Python
# netflix_chatbot/json_utils.py
|
|
import json
|
|
import llama_index
|
|
from llama_index.core.tools import ToolOutput
|
|
from llama_index.core.agent.react.types import (
|
|
ActionReasoningStep,
|
|
ObservationReasoningStep,
|
|
ResponseReasoningStep,
|
|
BaseReasoningStep,
|
|
)
|
|
from llama_index.core.llms import ChatMessage, LLM, ChatResponse as LlamaResponse
|
|
from llama_index.core.base.response.schema import Response
|
|
from flask.json.provider import JSONProvider
|
|
from bson import ObjectId # Import ObjectId if used in responses/data
|
|
from datetime import datetime
|
|
|
|
class CustomJSONEncoder(json.JSONEncoder):
    """
    JSON encoder for LlamaIndex objects, BSON ObjectId, and other non-JSON types.

    ``default`` is deliberately best-effort: any ``Exception`` raised while
    converting an object is swallowed and the object is replaced by a
    placeholder string naming its type, so a single unexpected value does not
    abort serialization of the whole payload.
    """

    # Longest text excerpt included when previewing a document or node.
    _PREVIEW_LENGTH = 100
    # Objects exposing more public attributes than this are summarized rather
    # than expanded by the generic ``__dict__`` fallback.
    _MAX_FALLBACK_KEYS = 50

    @staticmethod
    def _text_preview(text, limit=_PREVIEW_LENGTH):
        """
        Return at most ``limit`` characters of ``text`` for display.

        The "..." suffix is added only when the text was actually cut, so a
        short text is returned unchanged. Empty or ``None`` text yields "".
        """
        if not text:
            return ""
        if len(text) <= limit:
            return text
        return text[:limit] + "..."

    def default(self, obj):
        """
        Convert ``obj`` into something the standard JSON encoder can handle.

        Called by ``json`` only for objects it cannot serialize natively.

        Args:
            obj: The object that needs a JSON-compatible representation.

        Returns:
            A dict or string representation of ``obj``. If no branch applies
            or any step raises, the string
            ``"<Unserializable object of type X>"`` is returned instead.
        """
        try:
            # Specific LlamaIndex types
            if isinstance(obj, ToolOutput):
                return {
                    'content': str(obj.content) if obj.content is not None else "",
                    'tool_name': getattr(obj, 'tool_name', None),
                    # raw_output may be an arbitrary object, so stringify it.
                    'raw_output': str(getattr(obj, 'raw_output', None)),
                    'type': 'tool_output',
                    'metadata': getattr(obj, 'metadata', {}),
                }
            elif isinstance(obj, ChatMessage):
                return {
                    # NOTE(review): if role is an Enum member, str() presumably
                    # yields the member name (e.g. "MessageRole.USER") rather
                    # than its value - confirm which form consumers expect.
                    'role': str(obj.role),
                    'content': str(obj.content),
                    'additional_kwargs': getattr(obj, 'additional_kwargs', {}),
                }
            elif isinstance(obj, (LlamaResponse, Response)):
                return {
                    # Use ``response`` when present, otherwise ``message``.
                    'content': str(getattr(obj, 'response', getattr(obj, 'message', None))),
                    'metadata': getattr(obj, 'metadata', {}),
                    'type': 'llm_response',
                }
            elif isinstance(obj, ActionReasoningStep):
                return {
                    'type': 'action_step',
                    'action': obj.action,
                    # Passed through as-is; expected to be a serializable dict.
                    'action_input': obj.action_input,
                    'thought': getattr(obj, 'thought', None),
                }
            elif isinstance(obj, ObservationReasoningStep):
                return {
                    'type': 'observation_step',
                    'observation': str(obj.observation),
                    'thought': getattr(obj, 'thought', None),
                }
            elif isinstance(obj, ResponseReasoningStep):
                return {
                    'type': 'response_step',
                    'response': str(obj.response),
                    'is_streaming': getattr(obj, 'is_streaming', False),
                    'thought': getattr(obj, 'thought', None),
                }
            elif isinstance(obj, BaseReasoningStep):
                # Catch-all for reasoning steps not matched above.
                return {
                    'type': 'base_reasoning_step',
                    'thought': getattr(obj, 'thought', None),
                    'is_done': getattr(obj, 'is_done', False),
                }

            # LlamaIndex Document/Node related objects. Document is tested
            # before TextNode so it keeps its own 'llama_document' shape.
            elif isinstance(obj, llama_index.core.schema.Document):
                return {
                    'doc_id': obj.id_,
                    'text_preview': self._text_preview(obj.text),
                    'metadata': obj.metadata,
                    'type': 'llama_document',
                }
            elif isinstance(obj, llama_index.core.schema.NodeWithScore):
                return {
                    # Serialize the wrapped node with this same method.
                    'node': self.default(obj.node),
                    'score': obj.score,
                    'type': 'node_with_score',
                }
            elif isinstance(obj, llama_index.core.schema.TextNode):
                return {
                    'node_id': obj.id_,
                    'text_preview': self._text_preview(obj.text),
                    'metadata': obj.metadata,
                    'type': 'text_node',
                }

            # Common Python types
            elif isinstance(obj, datetime):
                return obj.isoformat()
            elif isinstance(obj, ObjectId):
                return str(obj)
            elif isinstance(obj, bytes):
                # Payload is intentionally not emitted; only a marker is.
                return "<bytes>"

            # General fallback for objects with __dict__
            elif hasattr(obj, '__dict__'):
                try:
                    # Keep only public, non-callable attributes.
                    d = {k: v for k, v in obj.__dict__.items()
                         if not k.startswith('_') and not callable(v)}
                    # Summarize very wide objects instead of expanding them.
                    if len(d) > self._MAX_FALLBACK_KEYS:
                        return f"<Complex object type {type(obj).__name__} with keys: {list(d.keys())[:5]}>"
                    return d
                except Exception:
                    return f"<Unserializable object type {type(obj).__name__} with __dict__>"

            # Final fallback: the base implementation raises TypeError, which
            # the handler below converts into the placeholder string.
            return super().default(obj)

        except Exception:
            # Only the type name is reported so that potentially sensitive
            # object contents never leak into the response.
            return f"<Unserializable object of type {type(obj).__name__}>"
|
|
|
|
|
|
class CustomJSONProvider(JSONProvider):
    """
    Flask JSON provider that serializes through CustomJSONEncoder.
    """

    def dumps(self, obj, **kwargs):
        """
        Serialize ``obj`` to a JSON string.

        Defaults: CustomJSONEncoder as the encoder class, non-ASCII
        characters emitted as-is, and no indentation. Any keyword argument
        supplied by the caller takes precedence over these defaults.
        """
        options = {
            'cls': CustomJSONEncoder,
            'ensure_ascii': False,
            'indent': None,
        }
        # Caller-supplied values win over the defaults above.
        options.update(kwargs)
        return json.dumps(obj, **options)

    def loads(self, s, **kwargs):
        """Parse the JSON document ``s``, forwarding ``kwargs`` to json.loads."""
        return json.loads(s, **kwargs)