Issue 1 – Bar charts blank/lines only: - Silent fall-through on unsupported chart_types (donut, stacked_bar, area) now raises ValueError instead of producing axes-only output - Zero-width bars on duplicate/single dates fixed via sorted-gap calculation - Donut chart type added (ring with percentage labels) - Pie/donut routing now triggers on any() instead of all() Issue 2 – Axis controls not applying: - AxisSpec gains date_min/date_max (x-axis clamping via prompts) - y-bounds no longer silently widened when user sets min_val/max_val - Tick clamping: ticks outside user range are dropped not widened - New dual_y_axis layout with independent left/right Y-axes and y_axis_side per series - Endpoint Y-axis labels (min/max) always render even when spacing is tight Issue 3+4 – Font fallback & InDesign compatibility: - Replace CairoSVG with Playwright/headless Chromium for PNG and PDF export - Chromium honours @font-face base64 data URIs → Roboto Condensed in all exports - PDF output contains embedded TTF subsets and real text operators (selectable in InDesign/Illustrator, no path-outlining, consistent across regions) - FastAPI lifespan manages persistent Playwright browser instance Issue 5 – Stroke weight drift: - All stroke_width values now carry explicit "px" unit suffix - SVG root gets width="…px" height="…px" so 1 SVG px = 0.75 PDF pt exactly AI improvements: - Prompts document date_min/date_max, scale_kind, dual_y_axis, donut - Rule 9 softened: user-specified ranges are honoured even if they crop data - Refinement uses deep-merge so tick_interval/min_val/date_min are never accidentally reset to None when Claude modifies unrelated fields - New donut few-shot example added Library upgrades: anthropic 0.84→0.97, fastapi 0.135→0.136, pandas 3.0.1→3.0.2, pydantic 2.12→2.13, uvicorn 0.41→0.46; cairosvg removed, playwright 1.58.0 added. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
543 lines
19 KiB
Python
543 lines
19 KiB
Python
"""Main rendering engine: ChartSpec + DataFrames -> SVG string."""
|
|
|
|
from __future__ import annotations
|
|
import base64
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import drawsvg as draw
|
|
import pandas as pd
|
|
|
|
from app.config import FONTS_DIR
|
|
from app.models.chart_spec import ChartSpec, PanelSpec, SeriesSpec
|
|
from app.models.style import COLORS, LAYOUT
|
|
from app.renderer.layout import compute_layout, PanelBounds
|
|
from app.renderer.scale import LinearScale, DateScale, CategoricalScale, nice_ticks, nice_date_ticks
|
|
from app.renderer.axes import render_y_axis, render_x_axis, render_x_axis_categorical
|
|
from app.renderer.series import render_line_series, render_shaded_fill, render_bar_series
|
|
from app.renderer.legend import render_legend
|
|
from app.renderer.typography import render_title, render_subtitle
|
|
from app.renderer.annotations import render_ellipse
|
|
from app.renderer.pie import render_pie_series
|
|
|
|
|
|
def render_chart(spec: ChartSpec, data: dict[str, pd.DataFrame]) -> str:
    """Render a ChartSpec into an SVG string.

    The space reserved above the plot area is estimated from the title, the
    optional subtitle and the tallest legend among the panels. The layout is
    then computed, each panel is drawn, and the serialized SVG gets explicit
    ``px`` units on its root ``width``/``height`` attributes.

    Args:
        spec: Chart specification (layout name plus panels).
        data: DataFrames keyed by name. When a ``_panel_<i>`` key exists, panel
            ``i`` receives only that entry; otherwise it receives the whole
            mapping.

    Returns:
        The SVG document as a string.
    """
    from app.renderer.legend import legend_row_count
    from app.models.style import FONTS as font_styles

    panels = spec.panels

    # True only when there is at least one panel and every series in every
    # panel is a pie or donut.
    has_pie = bool(panels) and all(
        series.chart_type in ("pie", "donut")
        for panel in panels
        for series in panel.series
    )

    # Estimate the width available to one panel so legend wrapping can be
    # predicted before the real layout exists.
    margin_top = LAYOUT["margins"]["top"]
    layout_key = spec.layout if spec.layout in LAYOUT else "single"
    est_panel_w = LAYOUT[layout_key]["width"] - 2 * margin_top - 120 - 30
    if spec.layout == "dual_panel":
        est_panel_w = (est_panel_w - LAYOUT["panel_gap"]) / 2

    legend_rows = [legend_row_count(p.series, 0, est_panel_w) for p in panels]
    max_legend_rows = max(legend_rows) if legend_rows else 1
    has_subtitle = any(p.subtitle for p in panels)

    title_h = font_styles["title"]["size"]
    subtitle_h = font_styles["subtitle"]["size"]
    legend_row_h = font_styles["legend"]["size"] + 12
    gap = 10

    # Stack: top inset, title, (subtitle), legend rows, gap before the plot.
    pad_top = 15 + title_h + gap
    if has_subtitle:
        pad_top = pad_top + (subtitle_h + gap)
    pad_top = pad_top + max_legend_rows * legend_row_h
    pad_top = pad_top + gap

    canvas_w, canvas_h, panel_bounds = compute_layout(
        spec.layout, vertical_legend=has_pie, pad_top=pad_top
    )

    # Explicit pixel units so Chromium's SVG→PDF conversion uses 1px = 0.75pt exactly
    d = draw.Drawing(canvas_w, canvas_h, id_prefix="c")
    d.set_pixel_scale(1)

    _embed_fonts(d)
    d.append(draw.Rectangle(0, 0, canvas_w, canvas_h, fill=COLORS["background"]))

    # zip() stops at the shorter sequence, so panels without bounds are skipped.
    for index, (panel_spec, bounds) in enumerate(zip(panels, panel_bounds)):
        panel_key = f"_panel_{index}"
        if panel_key in data:
            panel_data = {panel_key: data[panel_key]}
        else:
            panel_data = data
        _render_panel(d, panel_spec, bounds, panel_data, layout=spec.layout)

    # Inject explicit width/height units into <svg> root so Chromium treats 1 unit = 1px.
    # Only the first occurrence is rewritten.
    unitless = f'width="{canvas_w}" height="{canvas_h}"'
    with_units = f'width="{canvas_w}px" height="{canvas_h}px"'
    return d.as_svg().replace(unitless, with_units, 1)
|
|
|
|
|
|
def _embed_fonts(d: draw.Drawing):
    """Embed the Roboto Condensed faces as base64 ``@font-face`` rules.

    Every font file found under ``FONTS_DIR`` is inlined as a data URI. Files
    that do not exist are skipped, and nothing is appended to the drawing when
    no font file is available.
    """
    family_name = "Roboto Condensed"
    faces = (
        ("RobotoCondensed-Light.ttf", "300"),
        ("RobotoCondensed-Regular.ttf", "normal"),
        ("RobotoCondensed-Bold.ttf", "bold"),
    )

    rules = []
    for file_name, weight in faces:
        font_path = FONTS_DIR / file_name
        if not font_path.exists():
            continue
        with open(font_path, "rb") as handle:
            b64 = base64.b64encode(handle.read()).decode("ascii")
        rules.append(
            f"@font-face {{"
            f"font-family:'{family_name}';"
            f"src:url('data:font/truetype;base64,{b64}') format('truetype');"
            f"font-weight:{weight};font-style:normal;"
            f"}}"
        )

    font_css = "".join(rules)
    if font_css:
        d.append(draw.Raw(f"<defs><style type='text/css'>{font_css}</style></defs>"))
|
|
|
|
|
|
def _render_pie_panel(
    d: draw.Drawing,
    panel: PanelSpec,
    bounds: PanelBounds,
    data: dict[str, pd.DataFrame],
):
    """Render a pie/donut panel.

    One slice is produced per series from the last numeric value of the
    series' column. Series whose column cannot be found, or whose column has
    no numeric values, are skipped. Nothing is drawn when the DataFrame is
    missing/empty or no slice remains.
    """
    df = _resolve_dataframe(data)
    if df is None or df.empty:
        return

    palette = COLORS["palette"]
    slices = []  # (label, value, color) per usable series
    for position, series in enumerate(panel.series):
        column = _find_column(df, series.data_column)
        if column is None:
            continue
        numeric = pd.to_numeric(df[column], errors="coerce").dropna()
        if numeric.empty:
            continue
        # An explicit color_index wins; otherwise colors follow series order.
        color_slot = position if series.color_index is None else series.color_index
        slices.append(
            (series.label, float(numeric.iloc[-1]), palette[color_slot % len(palette)])
        )

    if not slices:
        return

    labels = [entry[0] for entry in slices]
    values = [entry[1] for entry in slices]
    colors = [entry[2] for entry in slices]

    # A single donut series turns the whole panel into a ring.
    is_donut = any(series.chart_type == "donut" for series in panel.series)
    inner_ratio = 0.6 if is_donut else 0.0

    cx = (bounds.left + bounds.right) / 2
    cy = (bounds.top + bounds.bottom) / 2
    radius = min(bounds.right - bounds.left, bounds.bottom - bounds.top) / 2 * 0.85

    render_pie_series(d, labels, values, colors, cx, cy, radius,
                      inner_radius_ratio=inner_ratio)

    render_title(d, panel.title, bounds.left, bounds.top - 130)
    if panel.subtitle:
        render_subtitle(d, panel.subtitle, bounds.left, bounds.top - 80)

    render_legend(d, panel.series, bounds.left, bounds.right, bounds.top, bounds.bottom, mode="vertical")
|
|
|
|
|
|
def _render_panel(
    d: draw.Drawing,
    panel: PanelSpec,
    bounds: PanelBounds,
    data: dict[str, pd.DataFrame],
    layout: str = "single",
):
    """Render a single chart panel.

    Panels whose series are all pie/donut go to the pie renderer. Otherwise an
    x-scale (categorical or date) is built, each series' numeric values are
    collected, and the body is drawn by either the dual-Y or the standard
    renderer. The function returns without drawing when there is no usable
    DataFrame, no x-scale, or no numeric Y value.
    """
    # Route pie/donut to dedicated renderer
    if panel.series and all(s.chart_type in ("pie", "donut") for s in panel.series):
        _render_pie_panel(d, panel, bounds, data)
        return

    df = _resolve_dataframe(data)
    if df is None or df.empty:
        return

    use_categorical = panel.x_axis.scale_kind == "category"

    # Build x-axis scale
    if use_categorical:
        x_scale, x_categories, date_min, date_max = _build_categorical_scale(df, panel, bounds)
    else:
        x_scale, date_min, date_max = _build_date_scale(df, panel, bounds)
        x_categories = None
    if x_scale is None:
        return

    # Every series in the panel shares the same x values.
    if use_categorical:
        shared_x = x_categories
    else:
        date_col = _find_date_column(df)
        shared_x = pd.to_datetime(df[date_col]).tolist() if date_col else []

    # Collect Y values
    palette = COLORS["palette"]
    series_data: dict[str, dict] = {}
    all_y_values: list[float] = []
    for position, series in enumerate(panel.series):
        column = _find_column(df, series.data_column)
        if column is None:
            continue
        y_vals = pd.to_numeric(df[column], errors="coerce")
        color_slot = series.color_index if series.color_index is not None else position
        series_data[series.label] = {
            "x": shared_x,
            "y": y_vals.tolist(),
            "spec": series,
            "color": palette[color_slot % len(palette)],
        }
        # Only non-NaN values take part in the Y-range computation.
        all_y_values.extend(y_vals.dropna().tolist())

    if not all_y_values:
        return

    if layout == "dual_y_axis":
        _render_dual_y_panel_body(d, panel, bounds, series_data, x_scale, date_min, date_max,
                                  use_categorical, x_categories)
    else:
        _render_standard_panel_body(d, panel, bounds, series_data, all_y_values, x_scale,
                                    date_min, date_max, use_categorical)
|
|
|
|
|
|
def _build_date_scale(df, panel, bounds):
    """Build a DateScale honouring the spec's ``date_min``/``date_max``.

    The x-range defaults to the earliest and latest valid date in the detected
    date column. Each end is replaced by ``panel.x_axis.date_min`` /
    ``panel.x_axis.date_max`` when that value is set and parses to a real date.

    Returns:
        ``(scale, date_min, date_max)``, or ``(None, None, None)`` when no date
        column is found or the column holds no valid dates.
    """
    date_col = _find_date_column(df)
    if date_col is None:
        return None, None, None

    valid_dates = pd.to_datetime(df[date_col]).dropna()
    if valid_dates.empty:
        # min()/max() of an empty series is NaT; a scale built from NaT
        # endpoints cannot position anything, so report "no scale" instead.
        return None, None, None

    def _parse_override(raw):
        """Return *raw* as a datetime, or None when unset or unparseable."""
        if not raw:
            return None
        try:
            parsed = pd.to_datetime(raw)
            if pd.isna(parsed):
                return None
            return parsed.to_pydatetime()
        except Exception:
            # Deliberate best-effort: a bad user-supplied bound falls back to
            # the data range rather than aborting the render.
            return None

    date_min = valid_dates.min().to_pydatetime()
    date_max = valid_dates.max().to_pydatetime()

    override_min = _parse_override(panel.x_axis.date_min)
    if override_min is not None:
        date_min = override_min
    override_max = _parse_override(panel.x_axis.date_max)
    if override_max is not None:
        date_max = override_max

    x_scale = DateScale(date_min, date_max, bounds.left, bounds.right)
    return x_scale, date_min, date_max
|
|
|
|
|
|
def _build_categorical_scale(df, panel, bounds):
    """Build a CategoricalScale from the first non-date column.

    The category column is the first column that is not the detected date
    column, falling back to the very first column. Its non-null values,
    converted to strings, become the categories.

    Returns:
        ``(scale, categories, None, None)``, or four ``None`` values when the
        category column has no non-null entries.
    """
    date_col = _find_date_column(df)

    cat_col = next((col for col in df.columns if col != date_col), None)
    if cat_col is None:
        cat_col = df.columns[0]

    categories = list(map(str, df[cat_col].dropna().tolist()))
    if not categories:
        return None, None, None, None

    return CategoricalScale(categories, bounds.left, bounds.right), categories, None, None
|
|
|
|
|
|
def _compute_y_axis(panel, all_y_values):
    """Compute ``y_min``, ``y_max`` and ``y_ticks`` respecting user-set bounds.

    Bounds:
        A bound set on ``panel.y_axis`` (``min_val`` / ``max_val``) is used as
        is. A missing bound is taken from the data and padded by 5% of the
        range (for a zero range: 5% of ``abs(y_min)``, or of 1).

    Ticks:
        With a positive ``tick_interval``, ticks are spaced by that interval.
        They start at the user minimum when one is set; otherwise they start at
        the first multiple of the interval inside the padded range, so labels
        land on round values (0, 20, 40 ...) rather than on the padded minimum.
        Each tick is computed as ``start + i * interval`` to avoid accumulating
        floating-point drift. If the interval would yield 1000 or more steps,
        or no interval is set, ``nice_ticks`` picks the ticks instead.

    Auto bounds may be widened to include the outermost ticks; user bounds are
    never changed, and ticks outside the final range are dropped.

    Args:
        panel: Object exposing ``y_axis.min_val``, ``y_axis.max_val`` and
            ``y_axis.tick_interval``.
        all_y_values: Numeric values to fit; must be non-empty unless both
            bounds are user-set.

    Returns:
        ``(y_min, y_max, y_ticks)``.
    """
    import math

    max_steps = 1000  # guard against a tiny interval producing a runaway tick list

    user_min = panel.y_axis.min_val
    user_max = panel.y_axis.max_val

    y_min = user_min if user_min is not None else min(all_y_values)
    y_max = user_max if user_max is not None else max(all_y_values)

    if user_min is None or user_max is None:
        y_range = y_max - y_min
        if y_range == 0:
            y_range = abs(y_min) or 1
        if user_min is None:
            y_min -= y_range * 0.05
        if user_max is None:
            y_max += y_range * 0.05

    interval = panel.y_axis.tick_interval
    y_ticks = None
    if interval and interval > 0:
        if user_min is None:
            # Snap to the first multiple of the interval at or above y_min; the
            # small tolerance keeps a y_min that already sits on a multiple.
            start = math.ceil(y_min / interval - 1e-9) * interval
        else:
            start = user_min
        # The 1% slack lets a last tick that lands (almost) on y_max through.
        steps = math.floor((y_max - start) / interval + 0.01)
        if steps < max_steps:
            y_ticks = [round(start + i * interval, 10) for i in range(steps + 1)]
    if y_ticks is None:
        y_ticks = nice_ticks(y_min, y_max)

    # Only widen auto bounds, never override user-set bounds
    if y_ticks:
        if user_min is None:
            y_min = min(y_min, y_ticks[0])
        if user_max is None:
            y_max = max(y_max, y_ticks[-1])

    # Clamp ticks to [y_min, y_max] so no tick plots outside the user range
    y_ticks = [t for t in y_ticks if y_min - 1e-9 <= t <= y_max + 1e-9]

    return y_min, y_max, y_ticks
|
|
|
|
|
|
def _render_standard_panel_body(
    d, panel, bounds, series_data, all_y_values, x_scale, date_min, date_max, use_categorical
):
    """Draw the axes, clipped series and labels of a single-Y-axis panel.

    The Y range and ticks come from ``_compute_y_axis``. The X axis is drawn
    either as a categorical axis or as a date axis with ticks chosen by
    ``nice_date_ticks``.
    """
    y_min, y_max, y_ticks = _compute_y_axis(panel, all_y_values)
    # The scale maps y_min to the bottom edge and y_max to the top edge.
    y_scale = LinearScale(y_min, y_max, bounds.bottom, bounds.top)

    y_axis = panel.y_axis
    render_y_axis(
        d,
        y_scale,
        plot_left=bounds.left,
        plot_right=bounds.right,
        ticks=y_ticks,
        suffix=y_axis.suffix or "",
        label=y_axis.label,
    )

    if not use_categorical:
        render_x_axis(
            d,
            x_scale,
            bounds.bottom,
            ticks=nice_date_ticks(date_min, date_max),
            date_format=panel.x_axis.date_format,
        )
    else:
        render_x_axis_categorical(d, x_scale, bounds.bottom)

    _render_clip_and_series(d, panel, bounds, series_data, x_scale, y_scale, date_min, date_max, y_min, y_max)
    _render_labels(d, panel, bounds)
|
|
|
|
|
|
def _render_dual_y_panel_body(
    d, panel, bounds, series_data, x_scale, date_min, date_max, use_categorical, x_categories
):
    """Render a panel with two independent Y-axes (left primary, right secondary).

    Series are assigned to an axis by ``spec.y_axis_side``. Each axis gets its
    own range and ticks from ``_compute_y_axis``. The secondary axis uses
    ``panel.secondary_y_axis`` when set, otherwise the primary axis spec.

    Fallbacks when a side has no usable values:
        * no primary values   -> every series' values are used for the left axis;
        * no secondary values -> the right axis reuses the primary values.

    ``x_categories`` is accepted for signature parity with the caller and is
    not used here.
    """
    sec_spec = panel.secondary_y_axis or panel.y_axis

    def _usable(value):
        """True for a real number; rejects None and NaN (NaN != NaN)."""
        return value is not None and value == value

    def _values_for(side):
        """Collect usable Y values, for one axis side or all when *side* is None."""
        return [
            v
            for sd in series_data.values()
            if side is None or sd["spec"].y_axis_side == side
            for v in sd["y"]
            if _usable(v)
        ]

    primary_values = _values_for("primary")
    secondary_values = _values_for("secondary")

    if not primary_values:
        # The fallback must drop NaN as well: a NaN reaching min()/max() would
        # make the computed axis bounds unreliable.
        primary_values = _values_for(None)
    if not secondary_values:
        secondary_values = primary_values

    # _compute_y_axis reads panel.y_axis, so the secondary axis needs a copy of
    # the panel carrying the secondary spec; the primary can use the panel as is.
    secondary_panel = panel.model_copy(update={"y_axis": sec_spec})

    y_min_p, y_max_p, y_ticks_p = _compute_y_axis(panel, primary_values)
    y_min_s, y_max_s, y_ticks_s = _compute_y_axis(secondary_panel, secondary_values)

    y_scale_primary = LinearScale(y_min_p, y_max_p, bounds.bottom, bounds.top)
    y_scale_secondary = LinearScale(y_min_s, y_max_s, bounds.bottom, bounds.top)

    # Primary (left) axis draws gridlines; secondary (right) does not
    render_y_axis(
        d, y_scale_primary,
        plot_left=bounds.left, plot_right=bounds.right,
        ticks=y_ticks_p, suffix=panel.y_axis.suffix or "", label=panel.y_axis.label,
        side="left",
    )
    render_y_axis(
        d, y_scale_secondary,
        plot_left=bounds.left, plot_right=bounds.right,
        ticks=y_ticks_s, suffix=sec_spec.suffix or "", label=sec_spec.label,
        side="right",
    )

    if use_categorical:
        render_x_axis_categorical(d, x_scale, bounds.bottom)
    else:
        date_ticks = nice_date_ticks(date_min, date_max)
        render_x_axis(d, x_scale, bounds.bottom, ticks=date_ticks, date_format=panel.x_axis.date_format)

    # Clip series to the plot rectangle; the id is derived from the panel's
    # top-left corner.
    clip_id = f"plot-clip-{int(bounds.left)}-{int(bounds.top)}"
    d.append(draw.Raw(
        f'<defs><clipPath id="{clip_id}">'
        f'<rect x="{bounds.left}" y="{bounds.top}" '
        f'width="{bounds.width}" height="{bounds.height}"/>'
        f'</clipPath></defs>'
    ))
    clipped = draw.Group(clip_path=f"url(#{clip_id})")

    for sd in series_data.values():
        s = sd["spec"]
        y_scale = y_scale_secondary if s.y_axis_side == "secondary" else y_scale_primary
        _dispatch_series(clipped, s, sd, x_scale, y_scale)

    d.append(clipped)
    _render_labels(d, panel, bounds)
|
|
|
|
|
|
def _render_clip_and_series(d, panel, bounds, series_data, x_scale, y_scale, date_min, date_max, y_min, y_max):
    """Draw shaded fills, ellipse annotations and series inside a clipped group.

    Everything is added to one group clipped to the plot rectangle, in
    back-to-front order: shaded fills, then annotations, then the series.
    """
    clip_id = f"plot-clip-{int(bounds.left)}-{int(bounds.top)}"
    clip_markup = (
        f'<defs><clipPath id="{clip_id}">'
        f'<rect x="{bounds.left}" y="{bounds.top}" '
        f'width="{bounds.width}" height="{bounds.height}"/>'
        f'</clipPath></defs>'
    )
    d.append(draw.Raw(clip_markup))
    clipped = draw.Group(clip_path=f"url(#{clip_id})")

    # Shaded fills first (behind lines)
    for sd in series_data.values():
        fill = sd["spec"].shaded_fill
        if not fill:
            continue
        if fill.reference_series not in series_data:
            continue
        ref_sd = series_data[fill.reference_series]
        # Truncate all three sequences to the shortest so they stay aligned.
        n = min(len(sd["x"]), len(sd["y"]), len(ref_sd["y"]))
        render_shaded_fill(
            clipped,
            sd["x"][:n],
            sd["y"][:n],
            ref_sd["y"][:n],
            x_scale,
            y_scale,
            above_color=fill.above_color,
            below_color=fill.below_color,
        )

    # Annotations (behind series)
    for ann in panel.annotations:
        if ann.type != "ellipse" or date_min is None:
            continue
        try:
            # Missing annotation extents default to the full axis range.
            x_start = pd.to_datetime(ann.x_start).to_pydatetime() if ann.x_start else date_min
            x_end = pd.to_datetime(ann.x_end).to_pydatetime() if ann.x_end else date_max
            y_start = y_min if ann.y_start is None else ann.y_start
            y_end = y_max if ann.y_end is None else ann.y_end
            render_ellipse(clipped, x_scale, y_scale, x_start, x_end, y_start, y_end)
        except Exception:
            # An annotation that fails to parse or draw is skipped silently.
            pass

    # Series
    for sd in series_data.values():
        _dispatch_series(clipped, sd["spec"], sd, x_scale, y_scale)

    d.append(clipped)
|
|
|
|
|
|
def _dispatch_series(d, s: SeriesSpec, sd: dict, x_scale, y_scale):
    """Dispatch a single series to the correct renderer.

    Args:
        d: Drawing or group the series is added to.
        s: Series specification; ``chart_type`` selects the renderer.
        sd: Prepared series data with ``"x"``, ``"y"`` and ``"color"`` entries.
        x_scale: Scale used for x positions.
        y_scale: Scale used for y positions.

    Raises:
        ValueError: For any chart type other than ``"line"`` and ``"bar"``.
            The message says whether the type needs a dedicated panel
            (pie/donut), is not implemented (stacked_bar/area), or is unknown.
    """
    chart_type = s.chart_type
    if chart_type == "line":
        render_line_series(
            d, sd["x"], sd["y"], x_scale, y_scale,
            color=sd["color"], line_style=s.line_style, line_weight=s.line_weight,
        )
    elif chart_type == "bar":
        render_bar_series(d, sd["x"], sd["y"], x_scale, y_scale, color=sd["color"])
    elif chart_type in ("pie", "donut"):
        # Pie/donut panels are routed before series dispatch, so reaching this
        # branch means the series shares a panel with other chart types.
        raise ValueError(
            f"chart_type='{s.chart_type}' for series '{s.label}' cannot be rendered in a standard panel. "
            f"Use a dedicated panel with all series of the same type."
        )
    elif chart_type in ("stacked_bar", "area"):
        # No renderer exists for these types, so a dedicated panel would not
        # help; say so instead of suggesting one.
        raise ValueError(
            f"chart_type='{s.chart_type}' for series '{s.label}' is not implemented yet "
            f"and cannot be rendered."
        )
    else:
        raise ValueError(f"Unsupported chart_type '{s.chart_type}' for series '{s.label}'.")
|
|
|
|
|
|
def _render_labels(d, panel, bounds):
    """Render title, subtitle, and horizontal legend above the plot area.

    Positions are stacked upward from the top of the plot: the legend block
    sits just above the plot, the optional subtitle above the legend, and the
    title above whichever of the two is topmost.
    """
    from app.renderer.legend import legend_row_count
    from app.models.style import FONTS as _FL

    title_h = _FL["title"]["size"]
    subtitle_h = _FL["subtitle"]["size"]
    legend_size = _FL["legend"]["size"]
    legend_row_h = legend_size + 12
    gap = 10

    n_rows = legend_row_count(panel.series, bounds.left, bounds.right)
    text_half = legend_size // 2

    # Vertical center of the lowest legend row, and the top edge of the block.
    legend_bottom_center_y = bounds.top - gap - text_half
    legend_top_y = legend_bottom_center_y - (n_rows - 1) * legend_row_h - text_half

    # The title hangs above the subtitle when there is one, else above the legend.
    subtitle_y = None
    anchor_y = legend_top_y
    if panel.subtitle:
        subtitle_y = legend_top_y - gap - subtitle_h
        anchor_y = subtitle_y
    title_y = anchor_y - gap - title_h

    render_title(d, panel.title, bounds.left, title_y)
    if panel.subtitle:
        render_subtitle(d, panel.subtitle, bounds.left, subtitle_y)

    render_legend(
        d,
        panel.series,
        bounds.left,
        bounds.right,
        bounds.top,
        bounds.bottom,
        mode="horizontal",
        legend_bottom_y=legend_bottom_center_y,
    )
|
|
|
|
|
|
def _resolve_dataframe(data: dict[str, pd.DataFrame]) -> pd.DataFrame | None:
    """Pick the DataFrame a panel should draw from.

    The ``"_default"`` entry wins when present; otherwise the first value in
    the mapping is used. Returns ``None`` for an empty mapping.
    """
    if "_default" not in data:
        return next(iter(data.values()), None)
    return data["_default"]
|
|
|
|
|
|
def _find_date_column(df: pd.DataFrame) -> str | None:
    """Return the label of the column most likely to hold dates, or ``None``.

    Detection order, first match wins:
      1. a column whose lower-cased name is a known date-like name;
      2. a column whose lower-cased name contains ``"date"`` or ``"time"``;
      3. a column with a datetime dtype, whatever its resolution or timezone;
      4. the first column, if at least 5 of its first 10 values parse as dates.

    A frame without columns yields ``None``.
    """
    _DATE_EXACT = {
        "date", "dates", "time", "timestamp", "period", "month", "quarter",
        "year", "as of", "as_of", "report_date", "reporting_date",
        "fiscal_year", "observation_date", "obs_date",
    }

    columns = list(df.columns)
    if not columns:
        return None

    lowered = [(col, str(col).lower()) for col in columns]

    for col, name in lowered:
        if name in _DATE_EXACT:
            return col

    for col, name in lowered:
        if "date" in name or "time" in name:
            return col

    # Match on dtype kind rather than the literal "datetime64[ns]" so tz-aware
    # columns and other resolutions (s/ms/us) are recognised too. Iterating
    # df.dtypes also copes with duplicated column labels.
    for col, dtype in zip(columns, df.dtypes):
        if pd.api.types.is_datetime64_any_dtype(dtype):
            return col

    first_col = columns[0]
    try:
        # NOTE(review): pd.to_datetime treats plain numbers as epoch offsets,
        # so a numeric first column with >= 5 rows also passes this check -
        # confirm whether that is intended.
        parsed = pd.to_datetime(df[first_col].head(10), errors="coerce")
        if parsed.notna().sum() >= 5:
            return first_col
    except Exception:
        # Best-effort heuristic: any parsing problem means "not a date column".
        pass
    return None
|
|
|
|
|
|
def _find_column(df: pd.DataFrame, name: str) -> str | None:
    """Resolve *name* to an actual column label of *df*, or ``None``.

    Matching is tried in this order:
      1. exact label match;
      2. case-insensitive match;
      3. closest fuzzy match (``difflib``, cutoff 0.6) on lower-cased labels.

    Labels are compared through ``str()`` so frames with non-string column
    labels (e.g. integer years) do not raise. ``None`` as *name* returns
    ``None``.
    """
    import difflib

    if name is None:
        return None
    if name in df.columns:
        return name

    wanted = str(name).lower()
    # When several labels share a lower-cased form, the last one wins.
    lower_map = {str(col).lower(): col for col in df.columns}
    if wanted in lower_map:
        return lower_map[wanted]

    matches = difflib.get_close_matches(wanted, list(lower_map), n=1, cutoff=0.6)
    return lower_map[matches[0]] if matches else None
|