Full-stack application combining LlamaIndex vector search with Neo4j knowledge graph (GraphRAG) for answering queries about Netflix marketing materials. Flask/Hypercorn backend with custom ReAct agent, React frontend. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
339 lines
No EOL
14 KiB
Python
339 lines
No EOL
14 KiB
Python
# netflix_chatbot/document_generator.py
|
|
|
|
import io
|
|
import re
|
|
import markdown2
|
|
from bs4 import BeautifulSoup
|
|
from docx import Document
|
|
from docx.shared import Inches, Pt, RGBColor
|
|
from docx.enum.text import WD_ALIGN_PARAGRAPH, WD_BREAK
|
|
from docx.oxml.shared import OxmlElement, qn
|
|
from docx.oxml import parse_xml
|
|
|
|
from utils import log_structured
|
|
|
|
# --- Helper for Horizontal Line ---
|
|
def add_horizontal_line(paragraph):
    """
    Draw a horizontal rule by giving ``paragraph`` a bottom border.

    The border is attached to the paragraph's own properties, so an empty
    paragraph renders as a plain separator line.

    Args:
        paragraph: The python-docx paragraph whose XML is modified in place.
    """
    # Attributes of the <w:bottom> border element, applied in this order.
    border_attributes = (
        ('w:val', 'single'),   # Border style
        ('w:sz', '6'),         # Border size (in 1/8 points)
        ('w:space', '1'),      # Space between text and border
        ('w:color', 'auto'),   # Border color
    )

    bottom_edge = OxmlElement('w:bottom')
    for attr_name, attr_value in border_attributes:
        bottom_edge.set(qn(attr_name), attr_value)

    borders = OxmlElement('w:pBdr')
    borders.append(bottom_edge)

    # paragraph._p is the lxml element underneath the python-docx wrapper.
    paragraph._p.get_or_add_pPr().append(borders)
|
|
|
|
# --- Inline Markdown to DOCX Run Formatting ---
|
|
def process_inline_formatting(paragraph, text):
    """
    Add ``text`` to ``paragraph`` as runs, honouring simple inline markdown.

    Recognised markup: ``**bold**``, ``*italic*``, ``_italic_`` and
    `` `code` ``. Text outside any markup is added as unformatted runs.
    Nested markup is not interpreted; the inner markers stay in the text.

    A delimiter pair only counts as markup when it plausibly is markup:
    the content must be non-empty, ``*``/``**``/``_`` content may not start
    or end with whitespace, and ``_`` must not sit inside a word. This keeps
    identifiers such as ``snake_case_name`` and arithmetic such as
    ``2 * 3 * 4`` intact instead of italicising them and eating the markers.

    Args:
        paragraph: Object exposing ``add_run(text)`` (a python-docx paragraph).
        text: The markdown-flavoured string to render. Empty or ``None``
            adds nothing.
    """
    if not text:
        return

    # One alternative per marker; the name of the group that matched tells
    # us which formatting to apply. '**' is listed before '*' so bold wins.
    pattern = (
        r'\*\*(?=\S)(?P<bold>.+?)(?<=\S)\*\*'
        r'|`(?P<code>.+?)`'
        r'|\*(?=\S)(?P<star>.+?)(?<=\S)\*'
        r'|(?<!\w)_(?=\S)(?P<under>.+?)(?<=\S)_(?!\w)'
    )
    cursor = 0

    for match in re.finditer(pattern, text):
        start, end = match.span()

        # Plain text between the previous match and this one.
        if start > cursor:
            paragraph.add_run(text[cursor:start])

        kind = match.lastgroup
        run = paragraph.add_run(match.group(kind))
        if kind == 'bold':
            run.bold = True
        elif kind == 'code':
            run.font.name = 'Courier New'
        else:  # 'star' or 'under'
            run.italic = True

        cursor = end

    # Trailing plain text after the last match.
    if cursor < len(text):
        paragraph.add_run(text[cursor:])
|
|
|
|
|
|
# --- HTML to DOCX Conversion ---
|
|
def convert_html_to_docx(doc: Document, html_content: str):
    """
    Append the content of an HTML fragment to ``doc`` as Word elements.

    Intended for the HTML produced by the markdown conversion. Handled tags:
    paragraphs/divs, headings h1-h6, bullet and numbered lists, ``<pre>``
    code blocks, blockquotes, horizontal rules, line breaks, tables,
    bold/italic/inline code, and links (rendered as blue underlined text,
    not as real hyperlinks). ``script``/``style``/``head``/``meta``/``title``
    are ignored. Any other tag is logged as a warning and its children are
    still processed. Text that is not inside any paragraph is dropped.

    Whitespace rules:
      * ``<pre>`` content is kept verbatim apart from outer newlines.
      * Elsewhere a line break inside running text is a soft break and
        becomes a single space; whitespace that merely separates block
        elements is discarded.

    The paragraph styles 'List Bullet', 'List Number', 'Quote', 'CodeStyle'
    and the table style 'Table Grid' must exist in ``doc``.

    Args:
        doc: The python-docx document to append to (modified in place).
        html_content: The HTML fragment to convert.
    """
    # Tags that start a new line or paragraph. Whitespace touching one of
    # these is layout only and must not become a visible space.
    block_tags = frozenset([
        'body', 'p', 'div', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6',
        'ul', 'ol', 'li', 'pre', 'blockquote', 'hr', 'br',
        'table', 'thead', 'tbody', 'tfoot', 'tr', 'th', 'td',
    ])

    # No whitespace rewriting on the raw HTML: doing so would also flatten
    # the indentation and blank lines inside <pre>. Text nodes are
    # normalised one by one in node_text() instead.
    soup = BeautifulSoup(f"<body>{html_content.strip()}</body>", 'html.parser')

    def is_inline(node):
        """True when ``node`` is a sibling that continues the same line of text."""
        return node is not None and getattr(node, 'name', None) not in block_tags

    def inherit(style):
        """Return a private copy of the active inline-style set."""
        return set(style) if style else set()

    def apply_style(run, style):
        """Apply the accumulated inline styles to ``run``."""
        if 'bold' in style:
            run.bold = True
        if 'italic' in style:
            run.italic = True
        if 'code' in style:
            run.font.name = 'Courier New'

    def node_text(node):
        """Return the visible text of a text node outside <pre>."""
        collapsed = re.sub(r'\s*\n\s*', '\n', str(node))
        # A break at the edge of the node only matters when inline content
        # follows/precedes it; next to a block tag it is pure layout.
        lead = collapsed.startswith('\n') and is_inline(node.previous_sibling)
        trail = collapsed.endswith('\n') and is_inline(node.next_sibling)
        core = collapsed.strip('\n').replace('\n', ' ')
        if not core:
            return ' ' if (lead and trail) else ''
        return (' ' if lead else '') + core + (' ' if trail else '')

    def process_children(parent, paragraph, style, in_list):
        """Run process_element over every child of ``parent``."""
        for child in parent.children:
            process_element(child, paragraph, style, in_list)

    def process_element(element, current_paragraph=None, current_style=None, in_list=False):
        """Convert one node; inline content goes into ``current_paragraph``."""
        # --- Text nodes (NavigableString is a str subclass) ---
        if isinstance(element, str):
            text = node_text(element)
            # Text with no paragraph to live in is treated as stray layout
            # whitespace or malformed input and is dropped.
            if text and current_paragraph is not None:
                apply_style(current_paragraph.add_run(text), current_style or ())
            return

        tag = element.name

        # --- Block Level Elements ---
        if tag in ('p', 'div'):
            # Skip empty containers unless they carry an explicit <br>.
            if element.get_text(strip=True) or element.find('br'):
                if current_paragraph is not None and not current_paragraph.runs:
                    # First paragraph of a <li>/<blockquote>/cell: fill the
                    # styled paragraph the parent already created instead of
                    # leaving it empty.
                    para = current_paragraph
                elif current_paragraph is not None and not in_list:
                    # Further paragraphs of the same container keep its style.
                    para = doc.add_paragraph(style=current_paragraph.style)
                else:
                    # Top level, or a continuation inside a list item (which
                    # must not produce a second bullet/number).
                    para = doc.add_paragraph()
                process_children(element, para, inherit(current_style), in_list)

        elif tag in ('h1', 'h2', 'h3', 'h4', 'h5', 'h6'):
            heading = doc.add_heading(level=int(tag[1]))
            process_children(element, heading, inherit(current_style), in_list)
            # Nothing was added by the children: fall back to the raw text.
            if not heading.runs:
                heading.add_run(element.get_text(strip=True))

        elif tag in ('ul', 'ol'):
            # Bullets/numbering come from the built-in list styles.
            list_style = 'List Bullet' if tag == 'ul' else 'List Number'
            for li in element.find_all('li', recursive=False):
                para = doc.add_paragraph(style=list_style)
                process_children(li, para, inherit(current_style), True)
                if not para.text.strip():
                    para.text = ""  # Keep an empty list item rather than stray whitespace

        # Note: 'li' itself is handled inside the 'ul'/'ol' branch.

        elif tag == 'pre':
            # Usually wraps a <code> element; prefer its text when present.
            code_tag = element.find('code')
            content = code_tag.get_text() if code_tag else element.get_text()
            if content.strip():
                para = doc.add_paragraph(style='CodeStyle')  # Requires 'CodeStyle' to be defined
                run = para.add_run(content.strip('\n'))  # Strip outer newlines only
                run.font.name = 'Courier New'

        elif tag == 'blockquote':
            para = doc.add_paragraph(style='Quote')  # Requires 'Quote' style
            process_children(element, para, inherit(current_style), in_list)

        elif tag == 'hr':
            add_horizontal_line(doc.add_paragraph())

        elif tag == 'br':
            if current_paragraph is not None:
                current_paragraph.add_run().add_break()  # Line break within the paragraph

        elif tag == 'table':
            # Only direct cells of each row, so nested tables do not leak in.
            rows = [tr.find_all(['th', 'td'], recursive=False) for tr in element.find_all('tr')]
            rows = [cells for cells in rows if cells]
            if rows:
                table = doc.add_table(rows=len(rows), cols=max(len(cells) for cells in rows))
                table.style = 'Table Grid'  # Requires 'Table Grid' style
                for row_idx, cells in enumerate(rows):
                    for col_idx, cell in enumerate(cells):
                        cell_para = table.cell(row_idx, col_idx).paragraphs[0]
                        cell_style = inherit(current_style)
                        if cell.name == 'th':
                            cell_style.add('bold')  # Header cells are bold
                        process_children(cell, cell_para, cell_style, in_list)

        # --- Inline Elements ---
        elif tag in ('strong', 'b'):
            new_style = inherit(current_style)
            new_style.add('bold')
            process_children(element, current_paragraph, new_style, in_list)

        elif tag in ('em', 'i'):
            new_style = inherit(current_style)
            new_style.add('italic')
            process_children(element, current_paragraph, new_style, in_list)

        elif tag == 'code':
            if current_paragraph is not None:
                text = element.get_text()
                if text:
                    current_paragraph.add_run(text).font.name = 'Courier New'
            else:
                # Code tag outside any paragraph: give it its own code paragraph.
                para = doc.add_paragraph(style='CodeStyle')
                para.add_run(element.get_text()).font.name = 'Courier New'

        elif tag == 'a':
            # Rendered as link-looking text only; a real hyperlink needs
            # relationship/XML manipulation that is not done here.
            text = element.get_text(strip=True)
            if current_paragraph is not None and text:
                run = current_paragraph.add_run(text)
                run.underline = True
                run.font.color.rgb = RGBColor(0x05, 0x63, 0xC1)  # Standard link blue

        # --- Pure containers: process children ---
        elif tag in ('body', 'span'):
            process_children(element, current_paragraph, inherit(current_style), in_list)

        # --- Ignored Tags ---
        elif tag in ('script', 'style', 'head', 'meta', 'title'):
            pass  # Ignore these tags and their content

        else:
            # Unknown tag: report it, then still process its children so
            # inline content is not lost.
            log_structured('warning', f"Unhandled HTML tag encountered: <{element.name}>", {'content_preview': element.get_text(strip=True)[:50]})
            process_children(element, current_paragraph, inherit(current_style), in_list)

    # Start processing from the top-level elements within the parsed body
    body = soup.find('body')
    if body is not None:
        for element in body.children:
            process_element(element)
|
|
|
|
|
|
# --- Main Markdown to DOCX Function ---
|
|
def create_brief_docx(brief_content_markdown: str) -> io.BytesIO:
    """
    Creates a Word document (.docx) in memory from markdown content.

    The document starts with a centered 'Marketing Brief' title, today's
    date right-aligned in italics, and a horizontal rule. The markdown is
    then converted to HTML with markdown2 and appended via
    ``convert_html_to_docx``. If that conversion raises, the error is logged
    and an error notice is written into the document instead of propagating.

    Args:
        brief_content_markdown: The markdown string content.

    Returns:
        An io.BytesIO buffer containing the Word document, positioned at 0.
    """
    # Function-scope imports: this function needs both names and the
    # module's import block does not provide them.
    import traceback
    from datetime import datetime

    doc = Document()

    # --- Define Styles ---
    styles = doc.styles

    # Normal style
    normal_font = styles['Normal'].font
    normal_font.name = 'Calibri'  # Or Netflix specific font like 'Graphik' if installed
    normal_font.size = Pt(11)

    # Code style, used by convert_html_to_docx for <pre>/<code> blocks
    try:
        code_style = styles.add_style('CodeStyle', 1)  # 1 for paragraph style
        code_style.font.name = 'Courier New'
        code_style.font.size = Pt(10)
        # Pin the ASCII and high-ANSI font slots to the monospace font.
        code_style.element.rPr.rFonts.set(qn('w:ascii'), 'Courier New')
        code_style.element.rPr.rFonts.set(qn('w:hAnsi'), 'Courier New')
        p_fmt = code_style.paragraph_format
        p_fmt.space_before = Pt(6)
        p_fmt.space_after = Pt(6)
    except ValueError:
        log_structured('warning', "'CodeStyle' already exists. Using existing.")
        code_style = styles['CodeStyle']  # Use existing if it fails to add

    # Quote style (example)
    try:
        quote_style = styles.add_style('QuoteStyle', 1)
        quote_style.font.italic = True
        quote_style.paragraph_format.left_indent = Inches(0.5)
        quote_style.paragraph_format.space_before = Pt(6)
        quote_style.paragraph_format.space_after = Pt(6)
    except ValueError:
        log_structured('warning', "'QuoteStyle' already exists. Using existing.")
        quote_style = styles['QuoteStyle']

    # --- Document Header ---
    title = doc.add_heading('Marketing Brief', 0)
    title.alignment = WD_ALIGN_PARAGRAPH.CENTER

    date_para = doc.add_paragraph()
    date_para.alignment = WD_ALIGN_PARAGRAPH.RIGHT
    date_run = date_para.add_run(datetime.now().strftime("%B %d, %Y"))
    date_run.italic = True
    # Add some space after date
    date_para.paragraph_format.space_after = Pt(12)

    # Add a horizontal line separator
    hr_para = doc.add_paragraph()
    add_horizontal_line(hr_para)
    hr_para.paragraph_format.space_after = Pt(18)  # Space after the line

    # --- Convert Markdown to HTML ---
    # Using markdown2 with recommended extras for broad compatibility
    extras = [
        "tables", "fenced-code-blocks", "header-ids", "footnotes",
        "task_list", "code-friendly", "cuddled-lists", "markdown-in-html",
        "strike", "spoiler", "target-blank-links", "smarty-pants",
    ]
    html_content = markdown2.markdown(brief_content_markdown, extras=extras)

    log_structured('debug', 'Converted markdown to HTML for DOCX generation', {
        'md_preview': brief_content_markdown[:200],
        'html_preview': html_content[:300]
    })

    # --- Convert HTML to Word Document Elements ---
    # Broad catch on purpose: a partially converted brief with a visible
    # error notice is preferred over failing the whole download.
    try:
        convert_html_to_docx(doc, html_content)
    except Exception as conversion_err:
        log_structured('error', "Error during HTML to DOCX conversion", {
            'error': str(conversion_err),
            'traceback': traceback.format_exc()
        })
        # Add error message to the document itself. Formatting is applied to
        # the run: 'Emphasis' is a character style and cannot be used as a
        # paragraph style, which would make this handler raise as well.
        notice = doc.add_paragraph()
        notice.add_run("Error: Could not fully convert content from HTML to DOCX.").italic = True
        doc.add_paragraph(str(conversion_err))

    # --- Save to Buffer ---
    doc_buffer = io.BytesIO()
    doc.save(doc_buffer)
    doc_buffer.seek(0)  # Rewind so callers can read from the start

    return doc_buffer