"""Service for parsing non-document attachments: Excel, CSV, images, URLs.""" import csv import io import mimetypes import os from typing import Any, List, Optional from pydantic import BaseModel class TableData(BaseModel): title: Optional[str] = None headers: List[str] rows: List[List[Any]] sheet_name: Optional[str] = None class ImageInfo(BaseModel): file_path: str filename: str mime_type: str width: Optional[int] = None height: Optional[int] = None def parse_excel(file_path: str) -> List[TableData]: """Parse an Excel (.xlsx/.xls) file and return one TableData per sheet.""" from openpyxl import load_workbook wb = load_workbook(file_path, read_only=True, data_only=True) results: List[TableData] = [] for sheet_name in wb.sheetnames: ws = wb[sheet_name] rows_raw = list(ws.iter_rows(values_only=True)) if not rows_raw: continue # First non-empty row is treated as headers headers = [str(c) if c is not None else "" for c in rows_raw[0]] data_rows = [] for row in rows_raw[1:]: # Skip completely empty rows if all(c is None for c in row): continue data_rows.append([_serialize_cell(c) for c in row]) if not data_rows and not any(h for h in headers): continue results.append( TableData( title=sheet_name if len(wb.sheetnames) > 1 else None, headers=headers, rows=data_rows, sheet_name=sheet_name, ) ) wb.close() return results def parse_csv(file_path: str) -> TableData: """Parse a CSV file and return a single TableData.""" with open(file_path, "r", encoding="utf-8-sig") as f: # Sniff delimiter sample = f.read(4096) f.seek(0) try: dialect = csv.Sniffer().sniff(sample, delimiters=",;\t|") except csv.Error: dialect = csv.excel reader = csv.reader(f, dialect) all_rows = list(reader) if not all_rows: return TableData(headers=[], rows=[]) headers = all_rows[0] data_rows = [[_serialize_cell(c) for c in row] for row in all_rows[1:] if any(c.strip() for c in row)] return TableData( title=os.path.splitext(os.path.basename(file_path))[0], headers=headers, rows=data_rows, ) def extract_images_metadata(file_path: str) -> ImageInfo: """Extract metadata from an image file (dimensions, MIME type).""" filename = os.path.basename(file_path) mime_type = mimetypes.guess_type(file_path)[0] or "application/octet-stream" width, height = None, None try: # Use python-pptx's image reader or basic header parsing # to avoid adding PIL as a dependency width, height = _read_image_dimensions(file_path) except Exception: pass return ImageInfo( file_path=file_path, filename=filename, mime_type=mime_type, width=width, height=height, ) async def parse_url(url: str) -> str: """Fetch a URL and extract its article content as markdown.""" import trafilatura downloaded = trafilatura.fetch_url(url) if not downloaded: return "" text = trafilatura.extract( downloaded, output_format="txt", include_tables=True, include_links=False, include_images=False, ) return text or "" # --- Helpers --- def _serialize_cell(value: Any) -> Any: """Convert cell value to JSON-safe type.""" if value is None: return None if isinstance(value, (int, float, bool)): return value return str(value) def _read_image_dimensions(file_path: str) -> tuple: """Read image dimensions from file header (PNG/JPEG/GIF/WEBP).""" with open(file_path, "rb") as f: header = f.read(32) # PNG if header[:8] == b"\x89PNG\r\n\x1a\n": import struct w, h = struct.unpack(">II", header[16:24]) return w, h # JPEG if header[:2] == b"\xff\xd8": with open(file_path, "rb") as f: f.seek(2) while True: marker = f.read(2) if len(marker) < 2: break if marker[0] != 0xFF: break if marker[1] in (0xC0, 0xC1, 0xC2): f.read(3) # length + precision import struct h, w = struct.unpack(">HH", f.read(4)) return w, h else: length = int.from_bytes(f.read(2), "big") f.seek(length - 2, 1) return None, None # GIF if header[:6] in (b"GIF87a", b"GIF89a"): import struct w, h = struct.unpack("