From 100eddbc21c0e7bd876a3fccc7b8196aee45e93e Mon Sep 17 00:00:00 2001 From: DJP Date: Wed, 6 May 2026 12:20:16 -0400 Subject: [PATCH] Switch LLM calls to streaming + tighten batch sizes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Anthropic SDK refuses non-streaming calls expected to take >10 minutes ("Streaming is required..."). Long-output batches (32k tokens of densely-formatted markdown) hit this on real 172-line briefs. Both LLMClient.create_message and create_message_cached now use the streaming context manager (client.messages.stream(...)) and accumulate text chunks; final usage + stop_reason come from get_final_message(). No timeout on streaming requests. Tightened the batch tier so individual streams stay well under any ceiling and progress / failure recovery is more granular: - ≤50 lines: single call - 51-120: batches of 30 (max_tokens=16k each) - 121+: batches of 25 (max_tokens=16k each) Verified with the 172-line case: 7 batches of 25, 172 drafts produced. Live streaming call confirmed end-to-end (haiku returned, usage and stop_reason populated correctly). Co-Authored-By: Claude Opus 4.7 (1M context) --- backend/app/llm/client.py | 54 ++++++++++----------- backend/app/pipeline/agents/agent_single.py | 31 +++++++----- 2 files changed, 45 insertions(+), 40 deletions(-) diff --git a/backend/app/llm/client.py b/backend/app/llm/client.py index ebd8a3c..4f91aa1 100644 --- a/backend/app/llm/client.py +++ b/backend/app/llm/client.py @@ -69,23 +69,23 @@ class LLMClient: for attempt in range(1, self.max_retries + 1): try: - response = self.client.messages.create( + # Streaming required for long requests (>10 min cap on + # non-streaming calls). All call paths use streaming for + # consistency. + response_text = "" + with self.client.messages.stream( model=self.model, max_tokens=max_tokens, system=system_prompt, messages=[{"role": "user", "content": user_message}], temperature=temperature, - ) + ) as stream: + for chunk in stream.text_stream: + response_text += chunk + final = stream.get_final_message() - # Extract text - response_text = "" - for block in response.content: - if hasattr(block, "text"): - response_text += block.text - - # Track usage - input_tokens = response.usage.input_tokens - output_tokens = response.usage.output_tokens + input_tokens = final.usage.input_tokens + output_tokens = final.usage.output_tokens total_tokens = input_tokens + output_tokens input_rate, output_rate = MODEL_COSTS.get( self.model, (COST_PER_INPUT_TOKEN, COST_PER_OUTPUT_TOKEN) @@ -95,7 +95,7 @@ class LLMClient: + output_tokens * output_rate ) - stop_reason = getattr(response, "stop_reason", None) + stop_reason = getattr(final, "stop_reason", None) if stop_reason == "max_tokens": logger.warning( "LLM hit max_tokens (%d) — response was truncated. " @@ -206,32 +206,32 @@ class LLMClient: for attempt in range(1, self.max_retries + 1): try: - response = self.client.messages.create( + # Streaming required: a single non-streaming request is + # capped by the SDK at 10 minutes. Long batches (32k+ + # output tokens) routinely exceed that. Streaming keeps + # the connection alive event-by-event and has no time cap. + response_text = "" + with self.client.messages.stream( model=self.model, max_tokens=max_tokens, system=system_blocks, messages=[{"role": "user", "content": user_content}], temperature=temperature, - ) + ) as stream: + for chunk in stream.text_stream: + response_text += chunk + final = stream.get_final_message() - response_text = "" - for block in response.content: - if hasattr(block, "text"): - response_text += block.text - - input_tokens = response.usage.input_tokens - output_tokens = response.usage.output_tokens - cache_read = getattr(response.usage, "cache_read_input_tokens", 0) or 0 - cache_creation = getattr(response.usage, "cache_creation_input_tokens", 0) or 0 + input_tokens = final.usage.input_tokens + output_tokens = final.usage.output_tokens + cache_read = getattr(final.usage, "cache_read_input_tokens", 0) or 0 + cache_creation = getattr(final.usage, "cache_creation_input_tokens", 0) or 0 total_tokens = input_tokens + output_tokens input_rate, output_rate = MODEL_COSTS.get( self.model, (COST_PER_INPUT_TOKEN, COST_PER_OUTPUT_TOKEN) ) # Cache writes cost 1.25x input rate, cache reads 0.1x input rate. - # The reported `input_tokens` excludes cached tokens, so we - # pay full rate on it plus the cache-write/read multipliers - # on the cached portions. estimated_cost = ( input_tokens * input_rate + cache_creation * input_rate * 1.25 @@ -239,7 +239,7 @@ class LLMClient: + output_tokens * output_rate ) - stop_reason = getattr(response, "stop_reason", None) + stop_reason = getattr(final, "stop_reason", None) if stop_reason == "max_tokens": logger.warning( "LLM hit max_tokens (%d) — response was truncated. " diff --git a/backend/app/pipeline/agents/agent_single.py b/backend/app/pipeline/agents/agent_single.py index 06ba489..39dde3b 100644 --- a/backend/app/pipeline/agents/agent_single.py +++ b/backend/app/pipeline/agents/agent_single.py @@ -512,24 +512,29 @@ class AgentSingle(BaseAgent): cached_user_content = "\n".join(static_parts) # ── Decide batch strategy ───────────────────────────────────── - # Tier the work by line count. Above ~70 lines we always batch: - # Sonnet 4.6's 64k output cap empirically fits ~150 rows max - # (avg ~400 output tokens per row) so we slice into ~50-line - # chunks. The first call writes the cache (~30k system + variable - # static content); subsequent calls hit it. + # Tier by line count. Smaller batches = shorter individual + # streams, faster failure recovery, and more granular progress. + # All calls stream (the SDK 10-minute non-streaming cap doesn't + # apply) but staying well under that ceiling per batch is still + # the right default — it limits blast radius if one batch fails. line_count = len(context.source_lines) - if line_count <= 70: + if line_count <= 50: batch_size = line_count # single call - elif line_count <= 150: - batch_size = 50 + elif line_count <= 120: + batch_size = 30 else: - batch_size = 40 + batch_size = 25 - max_tokens_per_batch = 32768 if batch_size <= 50 else 64000 - # Cap small jobs at a smaller max_tokens to avoid waste - if line_count <= 25: + # Each row averages ~400 output tokens. Add 30% headroom and + # round to the next 4k boundary; cap small jobs to save cost. + if line_count <= 15: max_tokens_per_batch = 8192 - elif line_count <= 60: + elif line_count <= 30: + max_tokens_per_batch = 16384 + elif line_count <= 50: + max_tokens_per_batch = 32768 + else: + # Batched: each batch fits comfortably under 16k output max_tokens_per_batch = 16384 n_batches = max(1, (line_count + batch_size - 1) // batch_size) if batch_size else 1