feat: Enhance LiteParse runner and document processing
- Updated the LiteParse runner to support two output formats, raw text and JSON, improving compatibility and flexibility.
- Introduced error handling for a missing file argument and file-existence checks, enhancing robustness.
- Added functions to clean and extract text from LiteParse JSON output, handling malformed JSON gracefully.
- Updated the DocumentsLoader to use the new text-cleaning functionality, ensuring cleaner document output.
- Implemented tests for the new text extraction and cleaning features, ensuring reliability and correctness.
This commit is contained in:
parent
4c271170b5
commit
35f784379b
7 changed files with 418 additions and 6 deletions
|
|
@ -1,6 +1,7 @@
|
|||
#!/usr/bin/env node
|
||||
/**
|
||||
* CLI bridge for Python: one JSON line on stdout for LiteParse extraction.
|
||||
* CLI bridge for Python: with --python-bridge plain, raw extracted text on stdout;
|
||||
* with --python-bridge json (also the fallback when the flag is omitted or unrecognized), one JSON line for backward compatibility.
|
||||
*
|
||||
* OCR follows LlamaIndex LiteParse guidance (built-in Tesseract by default):
|
||||
* https://developers.llamaindex.ai/liteparse/guides/ocr/
|
||||
|
|
@ -56,14 +57,31 @@ function emit(result, exitCode = 0) {
|
|||
process.exit(exitCode);
|
||||
}
|
||||
|
||||
/** "plain" = success: UTF-8 text on stdout only. "json" = one JSON line (legacy, huge payloads can break). */
|
||||
// Resolve the stdout format once: only an explicit "plain" (case-insensitive,
// surrounding whitespace ignored) selects plain text; a missing, empty, or any
// other value selects the JSON line format.
const pyBridgeArg = readArg("--python-bridge");
const normalizedBridge = String(pyBridgeArg ?? "").trim().toLowerCase();
const pyBridge = normalizedBridge === "plain" ? "plain" : "json";
|
||||
|
||||
/**
 * Report a fatal error in the currently selected bridge format.
 *
 * In "plain" mode the message is written to stderr and the process exits with
 * `exitCode`; otherwise the error is handed to `emit` as an
 * `{ ok: false, error }` payload together with the same exit code.
 *
 * @param {string} message Human-readable error description.
 * @param {number} exitCode Process exit code to use.
 */
function bridgeError(message, exitCode) {
  if (pyBridge !== "plain") {
    emit({ ok: false, error: message }, exitCode);
    return;
  }
  process.stderr.write(`${message}\n`);
  process.exit(exitCode);
}
|
||||
|
||||
const filePath = readArg("--file");
|
||||
if (!filePath) {
|
||||
emit({ ok: false, error: "Missing required --file argument" }, 2);
|
||||
bridgeError("Missing required --file argument", 2);
|
||||
}
|
||||
|
||||
const resolvedPath = path.resolve(filePath);
|
||||
if (!fs.existsSync(resolvedPath)) {
|
||||
emit({ ok: false, error: `File not found: ${resolvedPath}` }, 2);
|
||||
bridgeError(`File not found: ${resolvedPath}`, 2);
|
||||
}
|
||||
|
||||
const ocrEnabled = parseBool(readArg("--ocr-enabled"), true);
|
||||
|
|
@ -117,6 +135,10 @@ try {
|
|||
|
||||
const result = await parser.parse(resolvedPath, true);
|
||||
const text = result?.text ?? "";
|
||||
if (pyBridge === "plain") {
|
||||
process.stdout.write(text);
|
||||
process.exit(0);
|
||||
}
|
||||
emit({
|
||||
ok: true,
|
||||
filePath: resolvedPath,
|
||||
|
|
@ -133,6 +155,13 @@ try {
|
|||
} catch (error) {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
const stack = error instanceof Error ? error.stack : undefined;
|
||||
if (pyBridge === "plain") {
|
||||
if (stack) {
|
||||
process.stderr.write(`${stack}\n`);
|
||||
}
|
||||
process.stderr.write(`${message}\n`);
|
||||
process.exit(1);
|
||||
}
|
||||
if (stack) {
|
||||
process.stderr.write(`${stack}\n`);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any, List, Optional, Tuple
|
||||
|
|
@ -30,6 +32,129 @@ except Exception:
|
|||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _unwrap_liteparse_json_line_if_stored(text: str) -> str:
|
||||
"""If the whole JSON line from the LiteParse runner was stored as the document, keep only the text field."""
|
||||
if not text:
|
||||
return text
|
||||
s = text.lstrip()
|
||||
if not s.startswith("{"):
|
||||
return text
|
||||
try:
|
||||
payload = json.loads(s)
|
||||
except (json.JSONDecodeError, TypeError, ValueError):
|
||||
return text
|
||||
if not isinstance(payload, dict):
|
||||
return text
|
||||
if (
|
||||
payload.get("ok") is True
|
||||
and "filePath" in payload
|
||||
and isinstance(payload.get("text"), str)
|
||||
):
|
||||
return payload["text"]
|
||||
return text
|
||||
|
||||
|
||||
# Matches the start of a JSON "text" member: the quoted key, a colon with
# optional surrounding whitespace, and the opening quote of the value. The end
# of a match is therefore the index of the first character inside the value.
_RE_TEXT_KEY = re.compile(r'"text"\s*:\s*"')
|
||||
|
||||
|
||||
def _json_unescape_quoted_value(s: str, content_start: int) -> str:
|
||||
"""
|
||||
Unescape a JSON string value. `content_start` is the index of the first character
|
||||
*inside* the value (immediately after the opening quote of the "text" field).
|
||||
If the closing quote is missing (truncated), returns the unescaped rest of the string.
|
||||
"""
|
||||
out: list[str] = []
|
||||
i = content_start
|
||||
n = len(s)
|
||||
while i < n:
|
||||
c = s[i]
|
||||
if c == "\\" and i + 1 < n:
|
||||
e = s[i + 1]
|
||||
if e in '"\\':
|
||||
out.append(e)
|
||||
i += 2
|
||||
elif e == "/":
|
||||
out.append("/")
|
||||
i += 2
|
||||
elif e == "b":
|
||||
out.append("\b")
|
||||
i += 2
|
||||
elif e == "f":
|
||||
out.append("\f")
|
||||
i += 2
|
||||
elif e == "n":
|
||||
out.append("\n")
|
||||
i += 2
|
||||
elif e == "r":
|
||||
out.append("\r")
|
||||
i += 2
|
||||
elif e == "t":
|
||||
out.append("\t")
|
||||
i += 2
|
||||
elif e == "u" and i + 5 < n:
|
||||
try:
|
||||
out.append(chr(int(s[i + 2 : i + 6], 16)))
|
||||
except (ValueError, OverflowError):
|
||||
out.append(s[i : i + 6])
|
||||
i += 6
|
||||
else:
|
||||
out.append(e)
|
||||
i += 2
|
||||
elif c == '"':
|
||||
return "".join(out)
|
||||
else:
|
||||
out.append(c)
|
||||
i += 1
|
||||
return "".join(out)
|
||||
|
||||
|
||||
def _try_extract_liteparse_text_value_from_malformed_json(s: str) -> Optional[str]:
    """
    Recover the "text" value from a LiteParse-shaped object that is not valid JSON.

    Intended for input where ``json.loads`` already failed (truncated or
    corrupt). Returns ``None`` unless ``s`` starts with ``{``, its first
    10000 characters mention both ``ok`` and ``filePath``, and a ``"text": "``
    key is found; otherwise returns the unescaped string body that follows
    that key.
    """
    if not s.startswith("{"):
        return None
    # Slicing already copes with inputs shorter than the window.
    prefix = s[:10000]
    if "ok" not in prefix or "filePath" not in prefix:
        return None
    found = _RE_TEXT_KEY.search(s)
    if found is None:
        return None
    return _json_unescape_quoted_value(s, found.end())
|
||||
|
||||
|
||||
def _clean_extracted_one_pass(t: str) -> str:
    """
    Run one cleaning pass over ``t``.

    First peels up to three layers of well-formed LiteParse JSON wrapping.
    If what remains still starts with ``{`` (ignoring leading whitespace),
    falls back to the malformed-JSON extractor and returns its result when it
    finds a text value; otherwise returns the peeled text.
    """
    attempts_left = 3
    while attempts_left:
        unwrapped = _unwrap_liteparse_json_line_if_stored(t)
        if unwrapped == t:
            break
        t = unwrapped
        attempts_left -= 1

    stripped = t.lstrip()
    if not stripped.startswith("{"):
        return t
    recovered = _try_extract_liteparse_text_value_from_malformed_json(stripped)
    return t if recovered is None else recovered
|
||||
|
||||
|
||||
def clean_extracted_document_text(text: str) -> str:
    """
    Reduce ``text`` to the document body.

    Applies ``_clean_extracted_one_pass`` repeatedly (at most four times),
    stopping as soon as a pass leaves the text unchanged, so that a body which
    is itself JSON-shaped gets unwrapped as well. Empty input is returned
    as-is.
    """
    if not text:
        return text

    current = text
    passes_left = 4
    while passes_left > 0:
        cleaned = _clean_extracted_one_pass(current)
        if cleaned == current:
            break
        current = cleaned
        passes_left -= 1
    return current
|
||||
|
||||
|
||||
class DocumentsLoader:
|
||||
DECOMPOSE_TIMEOUT_SECONDS = 600
|
||||
|
||||
|
|
@ -107,6 +232,7 @@ class DocumentsLoader:
|
|||
else:
|
||||
document = await asyncio.to_thread(self._parse_with_liteparse, file_path)
|
||||
|
||||
document = clean_extracted_document_text(document)
|
||||
documents.append(document)
|
||||
images.append(imgs)
|
||||
|
||||
|
|
|
|||
|
|
@ -193,6 +193,11 @@ class LiteParseService:
|
|||
|
||||
return True, "ok"
|
||||
|
||||
@staticmethod
|
||||
def _use_json_runner_output() -> bool:
|
||||
"""If true, expect one JSON line on stdout (legacy). Default is plain UTF-8 text (better for large PDFs)."""
|
||||
return (os.getenv("LITEPARSE_RUNNER_OUTPUT") or "").strip().lower() == "json"
|
||||
|
||||
def parse_to_markdown(
|
||||
self,
|
||||
file_path: str,
|
||||
|
|
@ -233,6 +238,9 @@ class LiteParseService:
|
|||
if tessdata:
|
||||
command.extend(["--tessdata-path", tessdata])
|
||||
|
||||
use_json = self._use_json_runner_output()
|
||||
command.extend(["--python-bridge", "json" if use_json else "plain"])
|
||||
|
||||
LOGGER.info(
|
||||
"[LiteParse] Parsing file=%s ocr_enabled=%s ocr_language=%s",
|
||||
file_path,
|
||||
|
|
@ -254,6 +262,20 @@ class LiteParseService:
|
|||
_command_str(command),
|
||||
)
|
||||
|
||||
if not use_json:
|
||||
if process.returncode != 0:
|
||||
err = (process.stderr or "").strip() or "LiteParse failed"
|
||||
raise LiteParseError(
|
||||
f"{err}; returncode={process.returncode}; "
|
||||
f"stderr={_snippet(process.stderr)}; stdout={_snippet(process.stdout)}"
|
||||
)
|
||||
return {
|
||||
"ok": True,
|
||||
"text": (process.stdout or "").lstrip("\ufeff"),
|
||||
"filePath": file_path,
|
||||
"pageCount": 0,
|
||||
}
|
||||
|
||||
payload: Dict[str, Any]
|
||||
try:
|
||||
payload = self._decode_runner_output(process.stdout)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Any, List, Optional, Tuple
|
||||
|
|
@ -30,6 +32,129 @@ except Exception:
|
|||
LOGGER = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _unwrap_liteparse_json_line_if_stored(text: str) -> str:
|
||||
"""If the whole JSON line from the LiteParse runner was stored as the document, keep only the text field."""
|
||||
if not text:
|
||||
return text
|
||||
s = text.lstrip()
|
||||
if not s.startswith("{"):
|
||||
return text
|
||||
try:
|
||||
payload = json.loads(s)
|
||||
except (json.JSONDecodeError, TypeError, ValueError):
|
||||
return text
|
||||
if not isinstance(payload, dict):
|
||||
return text
|
||||
if (
|
||||
payload.get("ok") is True
|
||||
and "filePath" in payload
|
||||
and isinstance(payload.get("text"), str)
|
||||
):
|
||||
return payload["text"]
|
||||
return text
|
||||
|
||||
|
||||
# Matches the start of a JSON "text" member: the quoted key, a colon with
# optional surrounding whitespace, and the opening quote of the value. The end
# of a match is therefore the index of the first character inside the value.
_RE_TEXT_KEY = re.compile(r'"text"\s*:\s*"')
|
||||
|
||||
|
||||
def _json_unescape_quoted_value(s: str, content_start: int) -> str:
|
||||
"""
|
||||
Unescape a JSON string value. `content_start` is the index of the first character
|
||||
*inside* the value (immediately after the opening quote of the "text" field).
|
||||
If the closing quote is missing (truncated), returns the unescaped rest of the string.
|
||||
"""
|
||||
out: list[str] = []
|
||||
i = content_start
|
||||
n = len(s)
|
||||
while i < n:
|
||||
c = s[i]
|
||||
if c == "\\" and i + 1 < n:
|
||||
e = s[i + 1]
|
||||
if e in '"\\':
|
||||
out.append(e)
|
||||
i += 2
|
||||
elif e == "/":
|
||||
out.append("/")
|
||||
i += 2
|
||||
elif e == "b":
|
||||
out.append("\b")
|
||||
i += 2
|
||||
elif e == "f":
|
||||
out.append("\f")
|
||||
i += 2
|
||||
elif e == "n":
|
||||
out.append("\n")
|
||||
i += 2
|
||||
elif e == "r":
|
||||
out.append("\r")
|
||||
i += 2
|
||||
elif e == "t":
|
||||
out.append("\t")
|
||||
i += 2
|
||||
elif e == "u" and i + 5 < n:
|
||||
try:
|
||||
out.append(chr(int(s[i + 2 : i + 6], 16)))
|
||||
except (ValueError, OverflowError):
|
||||
out.append(s[i : i + 6])
|
||||
i += 6
|
||||
else:
|
||||
out.append(e)
|
||||
i += 2
|
||||
elif c == '"':
|
||||
return "".join(out)
|
||||
else:
|
||||
out.append(c)
|
||||
i += 1
|
||||
return "".join(out)
|
||||
|
||||
|
||||
def _try_extract_liteparse_text_value_from_malformed_json(s: str) -> Optional[str]:
    """
    Recover the "text" value from a LiteParse-shaped object that is not valid JSON.

    Intended for input where ``json.loads`` already failed (truncated or
    corrupt). Returns ``None`` unless ``s`` starts with ``{``, its first
    10000 characters mention both ``ok`` and ``filePath``, and a ``"text": "``
    key is found; otherwise returns the unescaped string body that follows
    that key.
    """
    if not s.startswith("{"):
        return None
    # Slicing already copes with inputs shorter than the window.
    prefix = s[:10000]
    if "ok" not in prefix or "filePath" not in prefix:
        return None
    found = _RE_TEXT_KEY.search(s)
    if found is None:
        return None
    return _json_unescape_quoted_value(s, found.end())
|
||||
|
||||
|
||||
def _clean_extracted_one_pass(t: str) -> str:
    """
    Run one cleaning pass over ``t``.

    First peels up to three layers of well-formed LiteParse JSON wrapping.
    If what remains still starts with ``{`` (ignoring leading whitespace),
    falls back to the malformed-JSON extractor and returns its result when it
    finds a text value; otherwise returns the peeled text.
    """
    attempts_left = 3
    while attempts_left:
        unwrapped = _unwrap_liteparse_json_line_if_stored(t)
        if unwrapped == t:
            break
        t = unwrapped
        attempts_left -= 1

    stripped = t.lstrip()
    if not stripped.startswith("{"):
        return t
    recovered = _try_extract_liteparse_text_value_from_malformed_json(stripped)
    return t if recovered is None else recovered
|
||||
|
||||
|
||||
def clean_extracted_document_text(text: str) -> str:
    """
    Reduce ``text`` to the document body.

    Applies ``_clean_extracted_one_pass`` repeatedly (at most four times),
    stopping as soon as a pass leaves the text unchanged, so that a body which
    is itself JSON-shaped gets unwrapped as well. Empty input is returned
    as-is.
    """
    if not text:
        return text

    current = text
    passes_left = 4
    while passes_left > 0:
        cleaned = _clean_extracted_one_pass(current)
        if cleaned == current:
            break
        current = cleaned
        passes_left -= 1
    return current
|
||||
|
||||
|
||||
class DocumentsLoader:
|
||||
DECOMPOSE_TIMEOUT_SECONDS = 600
|
||||
|
||||
|
|
@ -107,6 +232,7 @@ class DocumentsLoader:
|
|||
else:
|
||||
document = await asyncio.to_thread(self._parse_with_liteparse, file_path)
|
||||
|
||||
document = clean_extracted_document_text(document)
|
||||
documents.append(document)
|
||||
images.append(imgs)
|
||||
|
||||
|
|
|
|||
|
|
@ -227,6 +227,11 @@ class LiteParseService:
|
|||
|
||||
return True, "ok"
|
||||
|
||||
@staticmethod
|
||||
def _use_json_runner_output() -> bool:
|
||||
"""If true, expect one JSON line on stdout (legacy). Default is plain UTF-8 text (better for large PDFs)."""
|
||||
return (os.getenv("LITEPARSE_RUNNER_OUTPUT") or "").strip().lower() == "json"
|
||||
|
||||
def parse_to_markdown(
|
||||
self,
|
||||
file_path: str,
|
||||
|
|
@ -271,6 +276,9 @@ class LiteParseService:
|
|||
if tessdata:
|
||||
command.extend(["--tessdata-path", tessdata])
|
||||
|
||||
use_json = self._use_json_runner_output()
|
||||
command.extend(["--python-bridge", "json" if use_json else "plain"])
|
||||
|
||||
LOGGER.info(
|
||||
"[LiteParse] Parsing file=%s ocr_enabled=%s ocr_language=%s dpi=%s num_workers=%s",
|
||||
file_path,
|
||||
|
|
@ -294,6 +302,20 @@ class LiteParseService:
|
|||
_command_str(command),
|
||||
)
|
||||
|
||||
if not use_json:
|
||||
if process.returncode != 0:
|
||||
err = (process.stderr or "").strip() or "LiteParse failed"
|
||||
raise LiteParseError(
|
||||
f"{err}; returncode={process.returncode}; "
|
||||
f"stderr={_snippet(process.stderr)}; stdout={_snippet(process.stdout)}"
|
||||
)
|
||||
return {
|
||||
"ok": True,
|
||||
"text": (process.stdout or "").lstrip("\ufeff"),
|
||||
"filePath": file_path,
|
||||
"pageCount": 0,
|
||||
}
|
||||
|
||||
payload: Dict[str, Any]
|
||||
try:
|
||||
payload = self._decode_runner_output(process.stdout)
|
||||
|
|
|
|||
60
servers/fastapi/tests/test_documents_loader_unwrap.py
Normal file
60
servers/fastapi/tests/test_documents_loader_unwrap.py
Normal file
|
|
@ -0,0 +1,60 @@
|
|||
import json
|
||||
|
||||
from services.documents_loader import (
|
||||
_unwrap_liteparse_json_line_if_stored,
|
||||
clean_extracted_document_text,
|
||||
)
|
||||
|
||||
|
||||
def test_unwrap_strips_liteparse_json_line():
    """A well-formed runner line unwraps to its text, with or without leading whitespace."""
    inner = "Title\n\nBody with \"quotes\" and\nnewlines."
    payload = {"ok": True, "filePath": "/tmp/x.pdf", "text": inner}
    line = json.dumps(payload, ensure_ascii=False)
    for candidate in (line, " \n" + line):
        assert _unwrap_liteparse_json_line_if_stored(candidate) == inner
|
||||
|
||||
|
||||
def test_unwrap_leaves_plain_text():
    """Prose that merely contains braces is handed back as the very same object."""
    prose = "Not JSON. {Braces} in prose."
    result = _unwrap_liteparse_json_line_if_stored(prose)
    assert result is prose
|
||||
|
||||
|
||||
def test_unwrap_rejects_malformed_json():
    """Input that starts with a brace but does not parse is handed back untouched."""
    broken = "{not valid json"
    result = _unwrap_liteparse_json_line_if_stored(broken)
    assert result is broken
|
||||
|
||||
|
||||
def test_clean_extracts_text_when_json_truncated():
    """Drops everything before the "text" value and unescapes, even if JSON is not closed."""
    blob = (
        '{"ok": true, "filePath": "/tmp/x.pdf", "text": " similarweb | HypeAuditor\\n\\n2024" '
    )
    # The closing brace is missing, so json.loads fails; the fallback path
    # must still return the body of the "text" value.
    out = clean_extracted_document_text(blob)
    # Pin the exact recovered body: escapes decoded, nothing past the closing quote.
    assert out == " similarweb | HypeAuditor\n\n2024"
    assert "similarweb" in out
    assert "ok" not in out
    assert "filePath" not in out
|
||||
|
||||
|
||||
def test_clean_same_as_unwrap_for_valid_line():
    """For a valid runner line, the full cleaner returns just the text field."""
    inner = "Prose only."
    payload = {"ok": True, "filePath": "/tmp/x.pdf", "text": inner}
    line = json.dumps(payload, ensure_ascii=False)
    cleaned = clean_extracted_document_text(line)
    assert cleaned == inner
|
||||
|
||||
|
||||
def test_clean_double_json_embedded_in_text_field():
    """A runner line nested inside another runner line's text is unwrapped twice."""
    inner2 = "Final body."
    inner_payload = {"ok": True, "filePath": "/a.pdf", "text": inner2}
    inner1 = json.dumps(inner_payload, ensure_ascii=False)
    outer_payload = {"ok": True, "filePath": "/b.pdf", "text": inner1}
    outer = json.dumps(outer_payload, ensure_ascii=False)
    cleaned = clean_extracted_document_text(outer)
    assert cleaned == inner2
|
||||
|
|
@ -4,8 +4,12 @@ from unittest.mock import patch
|
|||
from services.liteparse_service import LiteParseService
|
||||
|
||||
|
||||
def _ok_process(stdout: str = '{"ok": true, "text": "ok"}'):
|
||||
return SimpleNamespace(returncode=0, stdout=stdout, stderr="")
|
||||
def _ok_process(
|
||||
stdout: str = "ok",
|
||||
returncode: int = 0,
|
||||
stderr: str = "",
|
||||
):
|
||||
return SimpleNamespace(returncode=returncode, stdout=stdout, stderr=stderr)
|
||||
|
||||
|
||||
class TestLiteParseService:
|
||||
|
|
@ -26,13 +30,16 @@ class TestLiteParseService:
|
|||
return_value=_ok_process(),
|
||||
) as mock_run:
|
||||
service = LiteParseService(timeout_seconds=30)
|
||||
service.parse("/tmp/sample.pdf", ocr_enabled=True, ocr_language="eng")
|
||||
r = service.parse("/tmp/sample.pdf", ocr_enabled=True, ocr_language="eng")
|
||||
assert r["ok"] is True
|
||||
assert r["text"] == "ok"
|
||||
|
||||
command = mock_run.call_args.args[0]
|
||||
assert "--dpi" in command
|
||||
assert command[command.index("--dpi") + 1] == "120"
|
||||
assert "--num-workers" in command
|
||||
assert command[command.index("--num-workers") + 1] == "1"
|
||||
assert command[command.index("--python-bridge") + 1] == "plain"
|
||||
|
||||
def test_parse_uses_env_overrides(self):
|
||||
with patch.dict(
|
||||
|
|
@ -79,3 +86,23 @@ class TestLiteParseService:
|
|||
command = mock_run.call_args.args[0]
|
||||
assert command[command.index("--dpi") + 1] == "72"
|
||||
assert command[command.index("--num-workers") + 1] == "1"
|
||||
|
||||
def test_parse_json_bridge_env(self):
    """With LITEPARSE_RUNNER_OUTPUT=json the service requests and decodes the JSON bridge."""
    env_patch = patch.dict(
        "os.environ",
        {"LITEPARSE_RUNNER_OUTPUT": "json"},
        clear=False,
    )
    ready_patch = patch.object(
        LiteParseService,
        "check_runtime_ready",
        return_value=(True, "ok"),
    )
    run_patch = patch(
        "services.liteparse_service.subprocess.run",
        return_value=_ok_process(stdout='{"ok": true, "text": "legacy"}\n'),
    )
    with env_patch:
        with ready_patch:
            with run_patch as mock_run:
                service = LiteParseService(timeout_seconds=30)
                r = service.parse("/tmp/sample.pdf", ocr_enabled=True, ocr_language="eng")
                assert r["text"] == "legacy"

                command = mock_run.call_args.args[0]
                bridge_value = command[command.index("--python-bridge") + 1]
                assert bridge_value == "json"
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue