Switch LLM calls to streaming + tighten batch sizes

The Anthropic SDK refuses non-streaming calls expected to take >10
minutes ("Streaming is required..."). Long-output batches (32k tokens
of densely-formatted markdown) hit this on real 172-line briefs.

Both LLMClient.create_message and create_message_cached now use the
streaming context manager (client.messages.stream(...)) and accumulate
text chunks; final usage + stop_reason come from get_final_message().
No timeout on streaming requests.

Tightened the batch tier so individual streams stay well under any
ceiling and progress / failure recovery is more granular:

- ≤50 lines: single call
- 51-120: batches of 30 (max_tokens=16k each)
- 121+:   batches of 25 (max_tokens=16k each)

Verified with the 172-line case: 7 batches of 25, 172 drafts produced.
Live streaming call confirmed end-to-end (haiku returned, usage and
stop_reason populated correctly).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
DJP 2026-05-06 12:20:16 -04:00
parent bd10bc3441
commit 100eddbc21
2 changed files with 45 additions and 40 deletions

View file

@ -69,23 +69,23 @@ class LLMClient:
for attempt in range(1, self.max_retries + 1):
try:
response = self.client.messages.create(
# Streaming required for long requests (>10 min cap on
# non-streaming calls). All call paths use streaming for
# consistency.
response_text = ""
with self.client.messages.stream(
model=self.model,
max_tokens=max_tokens,
system=system_prompt,
messages=[{"role": "user", "content": user_message}],
temperature=temperature,
)
) as stream:
for chunk in stream.text_stream:
response_text += chunk
final = stream.get_final_message()
# Extract text
response_text = ""
for block in response.content:
if hasattr(block, "text"):
response_text += block.text
# Track usage
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
input_tokens = final.usage.input_tokens
output_tokens = final.usage.output_tokens
total_tokens = input_tokens + output_tokens
input_rate, output_rate = MODEL_COSTS.get(
self.model, (COST_PER_INPUT_TOKEN, COST_PER_OUTPUT_TOKEN)
@ -95,7 +95,7 @@ class LLMClient:
+ output_tokens * output_rate
)
stop_reason = getattr(response, "stop_reason", None)
stop_reason = getattr(final, "stop_reason", None)
if stop_reason == "max_tokens":
logger.warning(
"LLM hit max_tokens (%d) — response was truncated. "
@ -206,32 +206,32 @@ class LLMClient:
for attempt in range(1, self.max_retries + 1):
try:
response = self.client.messages.create(
# Streaming required: a single non-streaming request is
# capped by the SDK at 10 minutes. Long batches (32k+
# output tokens) routinely exceed that. Streaming keeps
# the connection alive event-by-event and has no time cap.
response_text = ""
with self.client.messages.stream(
model=self.model,
max_tokens=max_tokens,
system=system_blocks,
messages=[{"role": "user", "content": user_content}],
temperature=temperature,
)
) as stream:
for chunk in stream.text_stream:
response_text += chunk
final = stream.get_final_message()
response_text = ""
for block in response.content:
if hasattr(block, "text"):
response_text += block.text
input_tokens = response.usage.input_tokens
output_tokens = response.usage.output_tokens
cache_read = getattr(response.usage, "cache_read_input_tokens", 0) or 0
cache_creation = getattr(response.usage, "cache_creation_input_tokens", 0) or 0
input_tokens = final.usage.input_tokens
output_tokens = final.usage.output_tokens
cache_read = getattr(final.usage, "cache_read_input_tokens", 0) or 0
cache_creation = getattr(final.usage, "cache_creation_input_tokens", 0) or 0
total_tokens = input_tokens + output_tokens
input_rate, output_rate = MODEL_COSTS.get(
self.model, (COST_PER_INPUT_TOKEN, COST_PER_OUTPUT_TOKEN)
)
# Cache writes cost 1.25x input rate, cache reads 0.1x input rate.
# The reported `input_tokens` excludes cached tokens, so we
# pay full rate on it plus the cache-write/read multipliers
# on the cached portions.
estimated_cost = (
input_tokens * input_rate
+ cache_creation * input_rate * 1.25
@ -239,7 +239,7 @@ class LLMClient:
+ output_tokens * output_rate
)
stop_reason = getattr(response, "stop_reason", None)
stop_reason = getattr(final, "stop_reason", None)
if stop_reason == "max_tokens":
logger.warning(
"LLM hit max_tokens (%d) — response was truncated. "

View file

@ -512,24 +512,29 @@ class AgentSingle(BaseAgent):
cached_user_content = "\n".join(static_parts)
# ── Decide batch strategy ─────────────────────────────────────
# Tier the work by line count. Above ~70 lines we always batch:
# Sonnet 4.6's 64k output cap empirically fits ~150 rows max
# (avg ~400 output tokens per row) so we slice into ~50-line
# chunks. The first call writes the cache (~30k system + variable
# static content); subsequent calls hit it.
# Tier by line count. Smaller batches = shorter individual
# streams, faster failure recovery, and more granular progress.
# All calls stream (the SDK 10-minute non-streaming cap doesn't
# apply) but staying well under that ceiling per batch is still
# the right default — it limits blast radius if one batch fails.
line_count = len(context.source_lines)
if line_count <= 70:
if line_count <= 50:
batch_size = line_count # single call
elif line_count <= 150:
batch_size = 50
elif line_count <= 120:
batch_size = 30
else:
batch_size = 40
batch_size = 25
max_tokens_per_batch = 32768 if batch_size <= 50 else 64000
# Cap small jobs at a smaller max_tokens to avoid waste
if line_count <= 25:
# Each row averages ~400 output tokens. Add 30% headroom and
# round to the next 4k boundary; cap small jobs to save cost.
if line_count <= 15:
max_tokens_per_batch = 8192
elif line_count <= 60:
elif line_count <= 30:
max_tokens_per_batch = 16384
elif line_count <= 50:
max_tokens_per_batch = 32768
else:
# Batched: each batch fits comfortably under 16k output
max_tokens_per_batch = 16384
n_batches = max(1, (line_count + batch_size - 1) // batch_size) if batch_size else 1