
perf(streaming): coalesce per-token publishes to Redis (50ms / 128-char window) #333

Merged

smoreinis merged 6 commits into next from perf/streaming-coalesce on Apr 30, 2026
Conversation

@smoreinis (Contributor) commented Apr 30, 2026

Summary

Per-token Redis publishes from `TemporalStreamingModel` were adding ~45s (56-62%) of overhead to agent response latency. Root cause: each `await streaming_context.stream_update(...)` inside the OpenAI stream `async for` blocked token consumption until the Redis publish round-trip completed (head-of-line blocking via TCP backpressure on the SSE connection).

This PR replaces per-token publishes with a coalescing buffer that runs on a background ticker driven by `asyncio.Event`, so the producer's event loop never awaits on Redis, even on size-threshold flushes.

Performance context (from the original report)

| Metric | No-Streaming Baseline | With Per-Token Streaming | Overhead |
| --- | --- | --- | --- |
| First Callback P50 | 56.2s | 103.1s | +47s (+84%) |
| Final Response P50 | 73.1s | 118.4s | +45s (+62%) |
| Final Response P95 | 121.9s | 178.4s | +56s (+46%) |

Cost model with 1000 tokens at the chosen thresholds:

  • ~10–22 Redis publishes per response (down from ~1000)
  • ~50–110ms total Redis time, off the critical path
  • Expected to land within ~5–10% of the no-streaming P50 (~73s)

These figures are mutually consistent assuming roughly 5ms per publish round-trip: 10 × 5ms = 50ms and 22 × 5ms = 110ms.

Design

```python
StreamingMode = Literal["off", "per_token", "coalesced"]
```

Single source of truth in streaming.py. Every layer takes it as a parameter (model, provider, service, adk module, context).

| Mode | Semantics |
| --- | --- |
| off | Feed the accumulator (so the persisted message body is correct), no per-delta publishes. Consumers see start → done only. |
| per_token | Publish every delta immediately. Legacy behavior. |
| coalesced (default) | Buffer 50ms / 128-char windowed batches; flush the first delta immediately for perceived responsiveness. |
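To make the three modes concrete, here is a minimal dispatch sketch. It is illustrative only: the class, field, and callback names are assumptions, not the actual StreamingTaskMessageContext implementation; only the mode names and the accumulator-always-feeds rule come from this PR.

```python
from typing import Awaitable, Callable, Literal

StreamingMode = Literal["off", "per_token", "coalesced"]

class ModeDispatchSketch:
    """Illustrative only: how the three modes differ at the publish site."""

    def __init__(
        self,
        mode: StreamingMode,
        publish: Callable[[str], Awaitable[None]],  # stand-in for the Redis publish
        buffer: "CoalescingBuffer | None" = None,   # used only in "coalesced" mode
    ) -> None:
        self._mode = mode
        self._publish = publish
        self._buffer = buffer
        self._accumulated: list[str] = []  # source of the persisted message body

    async def stream_update(self, delta: str) -> None:
        self._accumulated.append(delta)  # every mode feeds the accumulator
        if self._mode == "off":
            return  # no per-delta publishes; consumers see start -> done only
        if self._mode == "per_token":
            await self._publish(delta)  # legacy: blocks on each Redis round-trip
        else:
            assert self._buffer is not None
            await self._buffer.add(delta)  # coalesced: add() returns immediately
```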

CoalescingBuffer

  • 50ms time window (timer-driven)
  • 128-char size threshold (signal-driven via asyncio.Event)
  • First-delta-immediate optimization (~1 extra publish per stream, big perceived-latency win)
  • Background ticker decoupled from add() — producer never awaits on Redis
  • Consecutive-only merge: only merges adjacent same-channel deltas, so character order within every (type, index) channel is preserved exactly. Cross-channel order is preserved too.
  • close() drains remaining buffer before StreamTaskMessageDone so consumers see the full sequence.
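A minimal, self-contained sketch of this buffer follows, under simplifying assumptions: deltas are plain strings (the real buffer merges typed StreamTaskMessageDelta channels), and the cancel-during-flush re-enqueue discussed under the review findings below is elided. The two constants come from this PR; everything else is illustrative.

```python
import asyncio
from typing import Awaitable, Callable

class CoalescingBuffer:
    FLUSH_INTERVAL_S = 0.05   # 50ms time window
    MAX_BUFFERED_CHARS = 128  # size threshold

    def __init__(self, on_flush: Callable[[str], Awaitable[None]]) -> None:
        self._on_flush = on_flush  # the (slow) Redis publish lives behind this
        self._buf: list[str] = []
        self._buf_chars = 0
        self._first_flushed = False
        self._closed = False
        self._lock = asyncio.Lock()
        self._flush_signal = asyncio.Event()
        self._ticker = asyncio.create_task(self._run())  # background ticker

    async def add(self, chunk: str) -> None:
        """Producer path: appends and returns; never awaits on the publish."""
        async with self._lock:
            if self._closed:
                return
            self._buf.append(chunk)
            self._buf_chars += len(chunk)
            # The first delta and the size threshold wake the ticker immediately.
            if not self._first_flushed or self._buf_chars >= self.MAX_BUFFERED_CHARS:
                self._first_flushed = True
                self._flush_signal.set()

    async def _run(self) -> None:
        while True:
            try:  # wake on signal (first delta / size) or on the 50ms timer
                await asyncio.wait_for(self._flush_signal.wait(), self.FLUSH_INTERVAL_S)
            except asyncio.TimeoutError:
                pass
            self._flush_signal.clear()
            async with self._lock:
                drained, self._buf, self._buf_chars = self._buf, [], 0
            if drained:
                await self._on_flush("".join(drained))  # consecutive merge, reduced

    async def close(self) -> None:
        """Drain whatever remains before the DONE event is published."""
        async with self._lock:
            self._closed = True
        self._ticker.cancel()
        if self._buf:  # safe here: the producer is sequential and add() is now a no-op
            await self._on_flush("".join(self._buf))
```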

Default flip

The default for streaming_mode is "coalesced" at every layer. All 13+ existing callers of streaming_task_message_context() (claude_agents, langgraph, litellm provider, openai sync provider, etc.) benefit automatically without code changes.

Risks / caveats

  1. Behavior change for non-OpenAI streaming consumers. claude_agents, langgraph, litellm provider, and the openai sync provider all flip from per-token to coalesced silently. Downstream UIs that animate per-delta will animate per-50ms-chunk instead. At human reading speeds for prose, this should be indistinguishable, but it's a contract change worth flagging.
  2. First-delta-immediate adds one extra publish per stream. Tiny absolute cost, big UX win (response appears "alive" within ~20ms). Always on.
  3. Granularity, not order, changes. Coalescing never reorders characters within a channel — it only emits fewer events with bigger payloads. Consumers that assume "each delta == one OpenAI token" or use inter-delta timing as a signal will see different timing characteristics; consumers that just concatenate (the standard pattern) see identical final output.
  4. Opt-out is one line. Anyone needing strict per-token streaming sets streaming_mode="per_token" on the model/provider/context constructor — no other code changes required.
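A toy illustration of point 3: coalescing changes chunk boundaries, never content, so the standard concatenate-the-deltas consumer is unaffected.

```python
# Per-token deltas vs. a plausible coalesced emission of the same stream:
per_token = ["Hel", "lo", ", ", "wor", "ld", "!"]
coalesced = ["Hel", "lo, world!"]  # first delta flushed immediately, rest merged

# Same final message either way; only granularity and timing differ.
assert "".join(per_token) == "".join(coalesced) == "Hello, world!"
```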

Test plan

  • pyright clean on all 3 modified files
  • Top-level tests/test_streaming.py (20/20 pass)
  • End-to-end smoke tests for the 6 key invariants:
    • Pure-text consecutive merge collapses to single delta with full content
    • Cross-channel reasoning deltas preserve order; per-channel content matches per-token semantics
    • First delta flushes within ~20ms (well under the 50ms window)
    • 200-char delta triggers size-flush before the timer fires
    • close() drains buffer; persisted message body is the full assembled content
    • "off" mode produces zero per-delta publishes but still persists complete content
  • Re-run the original benchmark on a representative workload to confirm P50 final-response landed within ~5–10% of the no-streaming baseline (~73s)
  • Manual smoke: deploy to a dev environment, exercise an OpenAI agent end-to-end, verify the chat UI still streams (just in chunkier deltas)
  • Manual smoke: exercise a claude_agents and a langgraph path to verify the silent default-flip doesn't break anything
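As a flavor of the timing tests, here is a hedged sketch of the first-delta-immediate check, written against the simplified buffer sketched in the Design section rather than the real test file:

```python
import asyncio
import time

async def first_delta_flushes_fast() -> None:
    published: list[tuple[str, float]] = []
    t0 = time.monotonic()

    async def on_flush(text: str) -> None:
        published.append((text, time.monotonic() - t0))

    buf = CoalescingBuffer(on_flush)  # the simplified sketch from the Design section
    await buf.add("Hi")               # first delta sets the flush signal
    await asyncio.sleep(0.03)         # well under the 50ms window
    assert published and published[0][1] < 0.05
    await buf.close()

asyncio.run(first_delta_flushes_fast())
```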

Greptile Summary

This PR replaces per-token Redis publishes in TemporalStreamingModel with a CoalescingBuffer that merges consecutive same-channel deltas in 50ms / 128-char windows on a background asyncio.Task, eliminating the head-of-line blocking that was adding ~45s to agent response latency. A new StreamingMode literal ("off" / "per_token" / "coalesced") threads through every layer; the default flips to "coalesced" at all 13+ call sites automatically.

The previously-flagged silent data-loss on CancelledError mid-flush has been addressed: items are now re-enqueued before re-raising so close()'s final drain recovers them. Two minor style points remain (see inline comments) but neither affects correctness.

Confidence Score: 5/5

Safe to merge; the core correctness concern from the prior review thread is resolved, and all remaining findings are P2 style suggestions.

The critical silent-drop bug (CancelledError during flush leaving items unrecovered) is fixed with the re-enqueue pattern. The two remaining comments are defensive coding suggestions — a TOCTOU on _closed that only matters for concurrent callers (not the sequential streaming model), and a _buf_chars accounting gap that is benign given _closed=True gates any threshold checks. Neither affects observable behavior. Test coverage is thorough (20 unit tests across helpers, buffer windowing, cancellation recovery, and mode dispatch).

See src/agentex/lib/core/services/adk/streaming.py for the two P2 style items in CoalescingBuffer._run and add().

Important Files Changed

| Filename | Overview |
| --- | --- |
| src/agentex/lib/core/services/adk/streaming.py | Core of the PR: adds the StreamingMode type alias, the CoalescingBuffer class, and mode dispatch in StreamingTaskMessageContext. The CancelledError re-enqueue fix from the previous review thread is applied; one minor race on the _closed guard and a _buf_chars accounting gap after re-enqueue remain. |
| src/agentex/lib/adk/_modules/streaming.py | Thin pass-through: adds a streaming_mode parameter to StreamingModule.streaming_task_message_context() and forwards it to the service. No logic here. |
| src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py | Adds a streaming_mode parameter to TemporalStreamingModel and TemporalStreamingModelProvider, switches the logger from raw logging.getLogger to make_logger, and passes streaming_mode through to context construction at both the reasoning and text streaming sites. |
| tests/lib/core/services/adk/test_streaming.py | New 497-line test file covering CoalescingBuffer time/size windows, close drain, cancel-during-flush recovery, merge helpers, and StreamingTaskMessageContext mode dispatch. Comprehensive coverage of all three modes. |
| src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py | Loosened one assertion to not strict-match full kwargs (since streaming_mode is now passed), keeping the task_id check. Minor test hygiene improvement. |

Sequence Diagram

```mermaid
sequenceDiagram
    participant P as Producer (OpenAI stream)
    participant C as StreamingTaskMessageContext
    participant B as CoalescingBuffer
    participant R as Redis (StreamingService)

    P->>C: stream_update(delta1) [first]
    C->>B: add(delta1)
    Note over B: _first_flushed=False → set _flush_signal
    B-->>R: on_flush(delta1) [immediate via ticker]

    P->>C: stream_update(delta2)
    P->>C: stream_update(delta3)
    Note over B: buffering in _buf...

    loop Every 50ms or 128 chars
        B->>B: _drain_locked() → _merge_consecutive()
        B-->>R: on_flush(merged delta2+delta3)
    end

    P->>C: close()
    C->>B: close()
    B->>B: cancel background ticker
    B->>B: final _drain_locked()
    B-->>R: on_flush(remaining merged)
    C-->>R: stream_update(StreamTaskMessageDone)
```

Inline review comments

---

### Issue 1 of 2
src/agentex/lib/core/services/adk/streaming.py:175-183
**`_closed` guard is outside the lock — concurrent `close()` can silently drop items**

`_closed` is read before acquiring the lock, so a concurrent coroutine calling `close()` can interleave as follows:

1. `add()` reads `self._closed` as `False`, passes the guard
2. `close()` sets `self._closed = True`, drains and flushes the buffer, exits
3. `add()` acquires the lock and appends to `self._buf` — item is never drained

In asyncio this requires two coroutines concurrently calling `add()` and `close()`, which the current streaming model doesn't do (tokens are produced sequentially before `close()` is called). Still, moving the check inside the lock makes the invariant obvious and prevents silent loss for any future concurrent caller:

```python
async def add(self, update: StreamTaskMessageDelta) -> None:
    async with self._lock:
        if self._closed:
            return
        self._buf.append(update)
        self._buf_chars += _delta_char_len(update.delta)
        if not self._first_flushed or self._buf_chars >= self.MAX_BUFFERED_CHARS:
            self._first_flushed = True
            self._flush_signal.set()
```

### Issue 2 of 2
src/agentex/lib/core/services/adk/streaming.py:198-204
**`_buf_chars` not restored after re-enqueue on `CancelledError`**

`_drain_locked()` resets `self._buf_chars = 0`. When the cancel handler re-enqueues `drained[idx:]` back into `self._buf`, the char counter stays at 0 even though `self._buf` is no longer empty. This is benign today — `self._closed = True` prevents any new `add()` calls that rely on the threshold check, and the final drain in `close()` reads `self._buf` directly — but it leaves the object in an internally inconsistent state that could confuse future readers or be misused if the buffer were ever reused.

Consider updating the counter alongside the re-enqueue:
```python
async with self._lock:
    self._buf = drained[idx:] + self._buf
    self._buf_chars = sum(_delta_char_len(u.delta) for u in self._buf)
```


perf(streaming): coalesce per-token publishes to Redis (50ms / 128-char window)

Per-token Redis publishes from TemporalStreamingModel were adding ~45s
(56-62%) overhead to agent response latency, mostly from head-of-line
blocking on the model's event loop: each `await streaming_context.stream_update(...)`
inside the OpenAI stream `async for` paused token consumption until the
publish round-trip completed.

This change introduces a `CoalescingBuffer` driven by an `asyncio.Event`,
so the producer never awaits on Redis. Deltas are merged consecutive-only
(preserving character order in every (type, index) channel) and flushed
on a 50ms timer, on a 128-char size threshold, or immediately for the
first delta to keep perceived responsiveness high. The buffer's `close()`
drains remaining deltas before the DONE event, so consumers see the full
sequence in order.

A new `StreamingMode = Literal["off", "per_token", "coalesced"]` lives
in `streaming.py` as the single source of truth and is plumbed through
the adk streaming module, `StreamingService.streaming_task_message_context`,
and `StreamingTaskMessageContext`. Default is `"coalesced"` everywhere,
so all 13+ existing context callers (claude_agents, langgraph, litellm
provider, openai sync provider, etc.) benefit automatically.
fix(streaming): address greptile review findings

- _run: when CancelledError is raised mid-flush in the for-loop, re-enqueue
  the in-flight item plus any remaining items in the local `drained` list
  back into self._buf so close()'s final drain can recover them. Previously
  the local `drained` list was unreachable after CancelledError exited the
  for-loop, causing the last coalesced batch to be silently dropped on
  close-during-flush races. Trade-off: the in-flight item may be duplicated
  on the consumer side (Redis pub may have completed before cancel was
  delivered), which is preferable to silent loss for streaming UX.

- _merge_pair: replace `return b` fallback with AssertionError. All six
  current TaskMessageDelta variants have explicit isinstance branches, so
  the fallback is unreachable today. But _can_merge returns True for any
  same-type pair, so adding a 7th delta variant without updating
  _merge_pair would silently drop `a`'s accumulated content. Asserting
  turns a future silent data-loss into an immediate, diagnosable crash.
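
A standalone reduction of the re-enqueue pattern this commit describes, with string deltas standing in for StreamTaskMessageDelta; the lock and buffer are passed explicitly here, whereas the real code works on self:

```python
import asyncio

async def flush_with_reenqueue(
    buf: list[str],      # shared buffer (self._buf in the real code)
    lock: asyncio.Lock,  # guards the shared buffer
    drained: list[str],  # the locally drained batch being flushed
    on_flush,            # async publish callback
) -> None:
    idx = 0
    try:
        for idx, item in enumerate(drained):
            await on_flush(item)  # a cancel can land inside this await
    except asyncio.CancelledError:
        # Put the in-flight item and everything after it back so close()'s
        # final drain recovers them. The in-flight item may already have been
        # published (duplicate on the consumer side), which beats silent loss.
        async with lock:
            buf[:0] = drained[idx:]
        raise
```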
@smoreinis (Contributor, Author)

Addressed both Greptile findings in 0258aa5:

  • P1 (inline thread, line 198 — buffered deltas dropped on cancel-during-flush): _run now catches CancelledError inside the flush loop and re-enqueues drained[idx:] (in-flight item + remaining) back into self._buf before re-raising. close()'s final drain recovers them. Trade-off documented in code: the in-flight item may be duplicated on the consumer side (Redis pub may have completed before the cancel was delivered), which is preferable to silent loss for streaming UX. Verified with a focused smoke test that drives a slow _on_flush, calls close() mid-flush, and asserts all queued chunks land in the published deltas.

  • P2 (summary — _merge_pair return b fallback): Replaced with AssertionError so a future TaskMessageDelta variant added without a corresponding _merge_pair branch fails loudly instead of silently dropping a's content. No behavior change today (all six variants have explicit branches).

test(streaming): add coalescing-layer tests; loosen one model assertion

After merging the test-suite repair from main (#334) into this branch, one
model test (test_responses_api_streaming) regressed because its
assert_called_with strict-matched all kwargs of streaming_task_message_context
and didn't tolerate the new `streaming_mode='coalesced'` kwarg this PR
adds. Switched to assert_called() + targeted kwarg checks so the test
verifies what it cares about (task_id threading) without locking in
implementation details.

Replaced the ad-hoc smoke scripts that lived in conversation with a real
pytest module at tests/lib/core/services/adk/test_streaming.py covering:

- _delta_char_len, _can_merge, _merge_pair: per-channel correctness +
  None-handling
- _merge_consecutive: pure-text collapse, cross-channel order preservation,
  per-channel reconstruction matches per-token semantics
- CoalescingBuffer: first-delta-immediate flush within ~20ms,
  size-threshold flush before timer fires, multi-delta coalescing within
  one window, idle close, add-after-close no-op
- CoalescingBuffer cancel-during-flush regression test for the P1 fix:
  five queued chunks must all surface across publishes when close()
  cancels mid-flush (asserts substring presence rather than exact
  ordering, since the documented trade-off allows duplicates of the
  in-flight item)
- StreamingTaskMessageContext mode dispatch: "off" suppresses publishes
  but persists full content, "per_token" publishes each delta synchronously,
  "coalesced" batches and persists full content
chore(streaming): route TemporalStreamingModel logger through make_logger

The model file used raw ``logging.getLogger("agentex.temporal.streaming")``,
which returns a logger with no handler attached and no level configured —
so the existing ``[TemporalStreamingModel] Initialized ... streaming_mode=...``
INFO log was silently dropped, making it impossible to verify at runtime
that a coalesced (or any) streaming mode was actually wired.

Switch to the SDK's ``make_logger`` helper (level=INFO, RichHandler in
local mode, StreamHandler otherwise) used everywhere else in the SDK.
The explicit logger name ``agentex.temporal.streaming`` is preserved so
any external logging configuration targeting that name keeps working.
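
For reference, the shape of the change, with the import path assumed (the commit names the helper but the diff is not shown here):

```python
# Assumption: the module path below is illustrative; only the helper name
# `make_logger` and the logger name come from the commit message.
from agentex.lib.utils.logging import make_logger

logger = make_logger("agentex.temporal.streaming")  # explicit name preserved
```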
@declan-scale declan-scale changed the base branch from main to next April 30, 2026 19:43
@smoreinis smoreinis merged commit e6f11c4 into next Apr 30, 2026
32 checks passed
@smoreinis smoreinis deleted the perf/streaming-coalesce branch April 30, 2026 20:06
@stainless-app (bot) mentioned this pull request Apr 30, 2026
declan-scale added a commit that referenced this pull request Apr 30, 2026
* feat(api): api update

* chore(internal): more robust bootstrap script

* fix: use correct field name format for multipart file arrays

* feat: support setting headers via env

* fix: allow litellm security patch (#336)

* fix(adk): Always inject headers on execute activity (#337)

* perf(streaming): coalesce per-token publishes to Redis (50ms / 128-char window) (#333)

* release: 0.10.3

Co-authored-by: stainless-app[bot] <142633134+stainless-app[bot]@users.noreply.github.com>
Co-authored-by: Brandon Allen <brandon.allen@scale.com>
Co-authored-by: Declan Brady <declan.brady@scale.com>
Co-authored-by: Stas Moreinis <stas.moreinis@scale.com>