From dbbd96a423d68d69b1c999f7d15fd6c97e07f79f Mon Sep 17 00:00:00 2001
From: Devon Peticolas
Date: Thu, 30 Apr 2026 17:12:14 -0400
Subject: [PATCH 1/6] feat(openai_agents): capture real Usage and allow opt-in prompt_cache_key
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Three related changes to the streaming Responses API path in
`TemporalStreamingModel.get_response`. All three are observability/cache
improvements; none of them changes how the API is called for callers who
don't opt in.

1. Capture real `Usage` from `ResponseCompletedEvent.response.usage`.
   Previously a zero-filled `Usage` was constructed and
   `event.response.usage` from the streaming protocol was discarded. The
   reasoning-tokens estimator (`len(''.join(reasoning_contents)) // 4`) is
   also dropped — the real value arrives in the API response. Falls back
   to zeros only when the stream ends without a `ResponseCompletedEvent`
   (error path).

2. Surface usage in the span's `output` dict. The
   `streaming_model_get_response` span now carries `{input_tokens,
   output_tokens, total_tokens, cached_input_tokens, reasoning_tokens}` so
   traces show cache-hit rate without external log scraping.

3. Plumb `prompt_cache_key` to `responses.create` as an opt-in. Callers
   set it via `model_settings.extra_args["prompt_cache_key"]`. We do not
   auto-inject a default — `prompt_cache_key` is not standard across
   OpenAI-compatible endpoints, and a non-OpenAI server that strictly
   validates request bodies could reject the field. When unset, the
   parameter resolves to `NOT_GIVEN` and is omitted from the request body
   entirely. Behavior on alternative providers is identical to today's
   unless a caller explicitly opts in.
---
 .../models/temporal_streaming_model.py | 61 +++++++++++++++----
 1 file changed, 48 insertions(+), 13 deletions(-)

diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py
index b21694e88..e7f697ad0 100644
--- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py
+++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py
@@ -559,6 +559,11 @@ async def get_response(
         if model_settings.top_logprobs is not None:
             extra_args["top_logprobs"] = model_settings.top_logprobs
 
+        # Opt-in prompt_cache_key: forwarded only when the caller supplies it via
+        # model_settings.extra_args["prompt_cache_key"]. Not all OpenAI-compatible
+        # endpoints recognize this parameter, so we don't auto-inject a default.
+ prompt_cache_key = extra_args.pop("prompt_cache_key", NOT_GIVEN) + # Create the response stream using Responses API logger.debug(f"[TemporalStreamingModel] Creating response stream with Responses API") stream = await self.client.responses.create( # type: ignore[call-overload] @@ -589,12 +594,14 @@ async def get_response( extra_headers=model_settings.extra_headers, extra_query=model_settings.extra_query, extra_body=model_settings.extra_body, + prompt_cache_key=prompt_cache_key, # Any additional parameters from extra_args **extra_args, ) # Process the stream of events from Responses API output_items = [] + captured_usage = None current_text = "" streaming_context = None reasoning_context = None @@ -802,10 +809,12 @@ async def get_response( # Response completed logger.debug(f"[TemporalStreamingModel] Response completed") response = getattr(event, 'response', None) - if response and hasattr(response, 'output'): - # Use the final output from the response - output_items = response.output - logger.debug(f"[TemporalStreamingModel] Found {len(output_items)} output items in final response") + if response is not None: + if hasattr(response, 'output'): + # Use the final output from the response + output_items = response.output + logger.debug(f"[TemporalStreamingModel] Found {len(output_items)} output items in final response") + captured_usage = getattr(response, 'usage', None) # End of event processing loop - close any open contexts if reasoning_context: @@ -844,14 +853,33 @@ async def get_response( ) response_output.append(message) - # Create usage object - usage = Usage( - input_tokens=0, - output_tokens=0, - total_tokens=0, - input_tokens_details=InputTokensDetails(cached_tokens=0), - output_tokens_details=OutputTokensDetails(reasoning_tokens=len(''.join(reasoning_contents)) // 4), # Approximate - ) + # Use the real usage from the streaming Response if available; + # fall back to zeros only when the stream ended without a + # ResponseCompletedEvent (error paths). 
+ if captured_usage is not None: + usage = Usage( + input_tokens=captured_usage.input_tokens, + output_tokens=captured_usage.output_tokens, + total_tokens=captured_usage.total_tokens, + input_tokens_details=InputTokensDetails( + cached_tokens=getattr( + captured_usage.input_tokens_details, "cached_tokens", 0 + ), + ), + output_tokens_details=OutputTokensDetails( + reasoning_tokens=getattr( + captured_usage.output_tokens_details, "reasoning_tokens", 0 + ), + ), + ) + else: + usage = Usage( + input_tokens=0, + output_tokens=0, + total_tokens=0, + input_tokens_details=InputTokensDetails(cached_tokens=0), + output_tokens_details=OutputTokensDetails(reasoning_tokens=0), + ) # Serialize response output items for span tracing new_items = [] @@ -900,6 +928,13 @@ async def get_response( output_data = { "new_items": new_items, "final_output": final_output, + "usage": { + "input_tokens": usage.input_tokens, + "output_tokens": usage.output_tokens, + "total_tokens": usage.total_tokens, + "cached_input_tokens": usage.input_tokens_details.cached_tokens, + "reasoning_tokens": usage.output_tokens_details.reasoning_tokens, + }, } # Include tool calls if any were in the input if tool_calls: @@ -907,7 +942,7 @@ async def get_response( # Include tool outputs if any were processed if tool_outputs: output_data["tool_outputs"] = tool_outputs - + span.output = output_data # Return the response From 125810a3e195a502490a66106beeb07c14b9ed48 Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Thu, 30 Apr 2026 17:12:49 -0400 Subject: [PATCH 2/6] fix(openai_agents): return real response_id from streaming model MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `TemporalStreamingModel.get_response` was synthesizing a client-side UUID for `ModelResponse.response_id`: response_id=f"resp_{uuid.uuid4().hex[:8]}" Replace with the real `response.id` captured off `ResponseCompletedEvent.response.id` (alongside the `Usage` capture already happening in the same branch). On the error path, where the stream ends without a `ResponseCompletedEvent`, we return `None` — matching the documented `str | None` contract on `ModelResponse.response_id`. ## Why this matters The OpenAI Agents SDK reads `ModelResponse.response_id` in three places: - `agents/run.py:145` — gates whether the SDK chains via `previous_response_id` on the next call (the conditional is None-tolerant: a None value just leaves the chain pointer alone). - `agents/result.py:108` — exposes the value to user code as `RunResult.last_response_id`. - `agents/tracing/span_data.py:164` — written into trace records. A client-side UUID was never issued by any server. Any caller that picks it up and tries to chain via `previous_response_id` (the documented use case for `RunResult.last_response_id`) gets a 400 "response not found" from the API, surfacing far from the actual cause. Comparable SDK providers do this correctly: - `agents/models/openai_responses.py:149`: `response_id=response.id` - `agents/models/openai_chatcompletions.py:135`: `response_id=None` - `agents/extensions/models/litellm_model.py:182`: `response_id=None` `None` is the documented sentinel for "this provider doesn't support response_id," and the SDK is built to handle it. The bug has been latent since this file was added (commit 2f2a6ed7, Oct 10) because nothing in the codebase's call paths chains `previous_response_id` yet. The first caller that does (e.g. a multi-turn stateful Responses API workflow) triggers it. 
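For concreteness, a minimal sketch of the chaining pattern the fabricated id breaks. It is illustrative only: the `Agent` setup and prompts are invented, while `Runner.run(..., previous_response_id=...)` and `RunResult.last_response_id` are the SDK call sites cited above.

    import asyncio

    from agents import Agent, Runner

    async def main() -> None:
        agent = Agent(name="assistant", instructions="Answer tersely.")

        first = await Runner.run(agent, "Summarize the incident.")
        # Before this fix, last_response_id was a client-side "resp_xxxxxxxx"
        # that no server ever issued; chaining on it returns a 400.
        second = await Runner.run(
            agent,
            "Now draft the postmortem.",
            previous_response_id=first.last_response_id,
        )
        print(second.final_output)

    asyncio.run(main())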
## Compatibility This change is invisible to callers that don't read `response_id` — and nothing in `scale-agentex-python` reads it. A repo-wide grep finds zero consumers; only the (now-fixed) write site exists. The field is serialized into Temporal event history and trace records but consumed only by the OpenAI Agents SDK, which already handles `None`. --- .../openai_agents/models/temporal_streaming_model.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py index e7f697ad0..0117d27c9 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py @@ -602,6 +602,7 @@ async def get_response( # Process the stream of events from Responses API output_items = [] captured_usage = None + captured_response_id = None current_text = "" streaming_context = None reasoning_context = None @@ -815,6 +816,7 @@ async def get_response( output_items = response.output logger.debug(f"[TemporalStreamingModel] Found {len(output_items)} output items in final response") captured_usage = getattr(response, 'usage', None) + captured_response_id = getattr(response, 'id', None) # End of event processing loop - close any open contexts if reasoning_context: @@ -945,11 +947,17 @@ async def get_response( span.output = output_data - # Return the response + # Return the response. response_id is the server-issued id from + # ResponseCompletedEvent.response.id, or None when the stream ended + # without a completed event (error path) — matching the documented + # `str | None` contract on `ModelResponse.response_id`. Returning + # None lets callers use it safely as `previous_response_id` for + # multi-turn chaining; a fabricated UUID would 400 against any real + # server. return ModelResponse( output=response_output, usage=usage, - response_id=f"resp_{uuid.uuid4().hex[:8]}", + response_id=captured_response_id, ) except Exception as e: From eb8ff68c002b1fbaee60d11d4e563380f8ca4090 Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Thu, 30 Apr 2026 17:15:28 -0400 Subject: [PATCH 3/6] test(openai_agents): cover real Usage, real response_id, opt-in cache key MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Seven new tests in `TestStreamingModelUsageResponseIdAndCacheKey`: - Usage captured from `ResponseCompletedEvent.response.usage` - Usage falls back to zeros when stream ends without a completed event - Usage emitted in span output_data["usage"] - response_id captured from `ResponseCompletedEvent.response.id` - response_id is None (NOT a fabricated UUID) when stream ends without a completed event — guards against the previous footgun where a client-side UUID would be returned and silently break downstream `previous_response_id` chaining - prompt_cache_key resolves to NOT_GIVEN by default (omitted from request body, safe for non-OpenAI endpoints) - prompt_cache_key forwarded when caller opts in via `model_settings.extra_args["prompt_cache_key"]`, and popped from extra_args so it isn't passed twice Pre-existing tests in `TestStreamingModelBasics` (test_responses_api_streaming, test_task_id_threading, test_redis_context_creation) updated to set `response.id=None` on their `MagicMock(spec=ResponseCompletedEvent)` mocks. 
Without this, the auto-generated MagicMock attribute for `response.id` flows into `ModelResponse.response_id` and trips pydantic's `str | None` validation. --- .../tests/test_streaming_model.py | 250 +++++++++++++++++- 1 file changed, 246 insertions(+), 4 deletions(-) diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py index 817e5e5b7..14c53680c 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py @@ -757,7 +757,7 @@ async def test_responses_api_streaming(self, streaming_model, mock_adk_streaming text_delta_2 = MagicMock(spec=ResponseTextDeltaEvent) text_delta_2.delta = "world!" completed = MagicMock(spec=ResponseCompletedEvent) - completed.response = MagicMock(output=[], usage=MagicMock()) + completed.response = MagicMock(output=[], usage=MagicMock(), id=None) mock_stream = AsyncMock() mock_stream.__aiter__.return_value = iter([item_added, text_delta_1, text_delta_2, completed]) streaming_model.client.responses.create.return_value = mock_stream @@ -796,7 +796,7 @@ async def test_task_id_threading(self, streaming_model, mock_adk_streaming, _str item_added.item = MagicMock(type="message") item_added.output_index = 0 completed = MagicMock(spec=ResponseCompletedEvent) - completed.response = MagicMock(output=[], usage=MagicMock()) + completed.response = MagicMock(output=[], usage=MagicMock(), id=None) mock_stream = AsyncMock() mock_stream.__aiter__.return_value = iter([item_added, completed]) streaming_model.client.responses.create.return_value = mock_stream @@ -832,7 +832,7 @@ async def test_redis_context_creation(self, streaming_model, mock_adk_streaming, reasoning_delta.delta = "Thinking..." 
reasoning_delta.summary_index = 0 completed = MagicMock(spec=ResponseCompletedEvent) - completed.response = MagicMock(output=[], usage=MagicMock()) + completed.response = MagicMock(output=[], usage=MagicMock(), id=None) mock_stream = AsyncMock() mock_stream.__aiter__.return_value = iter([item_added, reasoning_delta, completed]) streaming_model.client.responses.create.return_value = mock_stream @@ -871,4 +871,246 @@ async def test_missing_task_id_error(self, streaming_model): output_schema=None, handoffs=[], tracing=None, - ) \ No newline at end of file + ) + + +class TestStreamingModelUsageResponseIdAndCacheKey: + """Cover real-Usage capture, real response_id, span emission, and opt-in prompt_cache_key.""" + + @staticmethod + def _async_iter(events): + async def _gen(): + for event in events: + yield event + return _gen() + + @staticmethod + def _make_response_completed_event( + *, + input_tokens: int = 0, + output_tokens: int = 0, + total_tokens: int = 0, + cached_tokens: int = 0, + reasoning_tokens: int = 0, + with_usage: bool = True, + response_id: str | None = "resp_real_server_id", + ): + usage = MagicMock() + usage.input_tokens = input_tokens + usage.output_tokens = output_tokens + usage.total_tokens = total_tokens + usage.input_tokens_details = MagicMock(cached_tokens=cached_tokens) + usage.output_tokens_details = MagicMock(reasoning_tokens=reasoning_tokens) + + response = MagicMock() + response.output = [] + response.usage = usage if with_usage else None + response.id = response_id + + event = MagicMock(spec=ResponseCompletedEvent) + event.response = response + return event + + @pytest.fixture + def mock_span(self): + return MagicMock() + + @pytest.fixture + def streaming_model_with_mock_tracer(self, streaming_model, mock_span): + """A streaming_model whose tracer.trace().span(...) 
yields a captured mock span.""" + async_cm = MagicMock() + async_cm.__aenter__ = AsyncMock(return_value=mock_span) + async_cm.__aexit__ = AsyncMock(return_value=False) + trace_obj = MagicMock() + trace_obj.span = MagicMock(return_value=async_cm) + streaming_model.tracer = MagicMock() + streaming_model.tracer.trace = MagicMock(return_value=trace_obj) + return streaming_model + + @pytest.mark.asyncio + async def test_usage_captured_from_completed_event( + self, + streaming_model_with_mock_tracer, + _streaming_context_vars, # noqa: ARG002 + ): + model = streaming_model_with_mock_tracer + completed = self._make_response_completed_event( + input_tokens=1234, output_tokens=56, total_tokens=1290, + cached_tokens=987, reasoning_tokens=42, + ) + model.client.responses.create = AsyncMock(return_value=self._async_iter([completed])) + + response = await model.get_response( + system_instructions=None, + input="hi", + model_settings=ModelSettings(), + tools=[], + output_schema=None, + handoffs=[], + tracing=None, + ) + + assert response.usage.input_tokens == 1234 + assert response.usage.output_tokens == 56 + assert response.usage.total_tokens == 1290 + assert response.usage.input_tokens_details.cached_tokens == 987 + assert response.usage.output_tokens_details.reasoning_tokens == 42 + + @pytest.mark.asyncio + async def test_usage_falls_back_when_no_completed_event( + self, + streaming_model_with_mock_tracer, + _streaming_context_vars, # noqa: ARG002 + ): + """Stream ending without a ResponseCompletedEvent (error path) → zero Usage.""" + model = streaming_model_with_mock_tracer + model.client.responses.create = AsyncMock(return_value=self._async_iter([])) + + response = await model.get_response( + system_instructions=None, + input="hi", + model_settings=ModelSettings(), + tools=[], + output_schema=None, + handoffs=[], + tracing=None, + ) + + assert response.usage.input_tokens == 0 + assert response.usage.output_tokens == 0 + assert response.usage.total_tokens == 0 + assert response.usage.input_tokens_details.cached_tokens == 0 + assert response.usage.output_tokens_details.reasoning_tokens == 0 + + @pytest.mark.asyncio + async def test_usage_emitted_in_span_output( + self, + streaming_model_with_mock_tracer, + _streaming_context_vars, # noqa: ARG002 + mock_span, + ): + model = streaming_model_with_mock_tracer + completed = self._make_response_completed_event( + input_tokens=100, output_tokens=10, total_tokens=110, + cached_tokens=80, reasoning_tokens=5, + ) + model.client.responses.create = AsyncMock(return_value=self._async_iter([completed])) + + await model.get_response( + system_instructions=None, + input="hi", + model_settings=ModelSettings(), + tools=[], + output_schema=None, + handoffs=[], + tracing=None, + ) + + assert isinstance(mock_span.output, dict) + usage_block = mock_span.output["usage"] + assert usage_block == { + "input_tokens": 100, + "output_tokens": 10, + "total_tokens": 110, + "cached_input_tokens": 80, + "reasoning_tokens": 5, + } + + @pytest.mark.asyncio + async def test_response_id_captured_from_completed_event( + self, + streaming_model_with_mock_tracer, + _streaming_context_vars, # noqa: ARG002 + ): + """Real server-issued id flows back on ModelResponse.response_id.""" + model = streaming_model_with_mock_tracer + completed = self._make_response_completed_event(response_id="resp_abcdef123456") + model.client.responses.create = AsyncMock(return_value=self._async_iter([completed])) + + response = await model.get_response( + system_instructions=None, + input="hi", + 
model_settings=ModelSettings(), + tools=[], + output_schema=None, + handoffs=[], + tracing=None, + ) + + assert response.response_id == "resp_abcdef123456" + + @pytest.mark.asyncio + async def test_response_id_is_none_when_no_completed_event( + self, + streaming_model_with_mock_tracer, + _streaming_context_vars, # noqa: ARG002 + ): + """Stream ending without ResponseCompletedEvent → response_id is None. + + Critical: must NOT fabricate a UUID. Returning a fake id would cause + downstream `previous_response_id` chaining to 400 against the server. + """ + model = streaming_model_with_mock_tracer + model.client.responses.create = AsyncMock(return_value=self._async_iter([])) + + response = await model.get_response( + system_instructions=None, + input="hi", + model_settings=ModelSettings(), + tools=[], + output_schema=None, + handoffs=[], + tracing=None, + ) + + assert response.response_id is None + + @pytest.mark.asyncio + async def test_prompt_cache_key_not_sent_by_default( + self, + streaming_model_with_mock_tracer, + _streaming_context_vars, # noqa: ARG002 + ): + """Without an opt-in, prompt_cache_key resolves to NOT_GIVEN (omitted from request).""" + model = streaming_model_with_mock_tracer + completed = self._make_response_completed_event() + model.client.responses.create = AsyncMock(return_value=self._async_iter([completed])) + + await model.get_response( + system_instructions=None, + input="hi", + model_settings=ModelSettings(), + tools=[], + output_schema=None, + handoffs=[], + tracing=None, + ) + + kwargs = model.client.responses.create.call_args.kwargs + assert kwargs["prompt_cache_key"] is NOT_GIVEN + + @pytest.mark.asyncio + async def test_prompt_cache_key_forwarded_when_opted_in( + self, + streaming_model_with_mock_tracer, + _streaming_context_vars, # noqa: ARG002 + ): + """Caller opt-in via model_settings.extra_args is forwarded to responses.create.""" + model = streaming_model_with_mock_tracer + completed = self._make_response_completed_event() + model.client.responses.create = AsyncMock(return_value=self._async_iter([completed])) + + await model.get_response( + system_instructions=None, + input="hi", + model_settings=ModelSettings(extra_args={"prompt_cache_key": "my-key"}), + tools=[], + output_schema=None, + handoffs=[], + tracing=None, + ) + + kwargs = model.client.responses.create.call_args.kwargs + assert kwargs["prompt_cache_key"] == "my-key" + # Must be popped from extra_args so the SDK doesn't see it twice. + assert list(kwargs).count("prompt_cache_key") == 1 \ No newline at end of file From 3d8f266e5a4967da2c1dfe1bd7873f39bdc2ce57 Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Thu, 30 Apr 2026 17:38:26 -0400 Subject: [PATCH 4/6] feat(openai_agents): forward previous_response_id from SDK kwarg MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The OpenAI Agents SDK's `Model.get_response` abstract has three keyword-only parameters: `previous_response_id`, `conversation_id`, `prompt`. The SDK threads them down through `_ServerConversationTracker` when callers use `Runner.run(..., previous_response_id=X)` or set `RunConfig` with `auto_previous_response_id=True`. `TemporalStreamingModel.get_response` was declared with `**kwargs # noqa: ARG002`, which silently swallowed all three. Callers who used the SDK's official chaining API saw their `previous_response_id` disappear and got no stateful behavior — without an error. 
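A condensed illustration of the swallow (simplified, hypothetical signatures; the real method has many more parameters):

    from typing import Optional

    # Old shape: the SDK's keyword-only arguments are absorbed and never read.
    async def get_response_old(input: str, **kwargs) -> None:
        ...  # previous_response_id sits unread inside kwargs

    # New shape: explicit keyword-only parameters matching Model.get_response.
    async def get_response_new(
        input: str,
        *,
        previous_response_id: Optional[str] = None,
        conversation_id: Optional[str] = None,
        prompt: Optional[dict] = None,
    ) -> None:
        ...  # previous_response_id can now be forwarded to responses.create

    # Both calls run without error, but only the second can act on the argument:
    #   await get_response_old("hi", previous_response_id="resp_123")  # dropped
    #   await get_response_new("hi", previous_response_id="resp_123")  # visible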
This commit: - Replaces `**kwargs` with explicit `previous_response_id`, `conversation_id`, `prompt` params, matching the abstract. - Forwards `previous_response_id` to `responses.create` via `_non_null_or_not_given` (so `None` resolves to `NOT_GIVEN` and the field is omitted from the request body — identical behavior to today for callers that don't opt in). - Accepts `conversation_id` and `prompt` for SDK contract compliance but does not forward them yet (marked `# noqa: ARG002`); they can be wired through later if a use case appears. ## Compatibility with non-OpenAI backends Same opt-in pattern as `prompt_cache_key`. `TemporalStreamingModel` calls `responses.create`, but the underlying client can be pointed at any OpenAI-compatible server (LiteLLM proxy, Foundry, vLLM, etc.). Some of those backends don't recognize `previous_response_id`. Because we forward it only when explicitly set, callers who don't opt in see no change in the wire request — the field is filtered out by `NOT_GIVEN`. Callers who opt in are responsible for knowing whether their backend supports it. ## Test housekeeping The 27 existing tests that passed `task_id=sample_task_id` to `get_response` were relying on `**kwargs` to silently swallow it. Production reads `task_id` from a ContextVar (set by `ContextInterceptor` in real Temporal flows, set by the `_streaming_context_vars` fixture in tests), not from a function argument. The kwarg was vestigial cruft. Removed. --- .../models/temporal_streaming_model.py | 17 ++- .../tests/test_streaming_model.py | 114 +++++++++++++----- 2 files changed, 103 insertions(+), 28 deletions(-) diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py index 0117d27c9..5a2cd9b3b 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py @@ -51,6 +51,7 @@ ResponseReasoningSummaryTextDeltaEvent, ResponseFunctionCallArgumentsDeltaEvent, ) +from openai.types.responses.response_prompt_param import ResponsePromptParam # AgentEx SDK imports from agentex.lib import adk @@ -465,12 +466,25 @@ async def get_response( output_schema: Optional[AgentOutputSchemaBase], handoffs: list[Handoff], tracing: ModelTracing, # noqa: ARG002 - **kwargs, # noqa: ARG002 + *, + previous_response_id: Optional[str] = None, + conversation_id: Optional[str] = None, # noqa: ARG002 + prompt: Optional[ResponsePromptParam] = None, # noqa: ARG002 ) -> ModelResponse: """Get a non-streaming response from the model with streaming to Redis. This method is used by Temporal activities and needs to return a complete response, but we stream the response to Redis while generating it. + + ``previous_response_id`` enables stateful multi-turn chaining on the + Responses API: when set, the server retains the prior response's + chain-of-thought and only the new input items need to be sent. Forwarded + only when explicitly set — not all OpenAI-compatible backends support + this parameter, so the default is omitted from the request body via + ``NOT_GIVEN``. + + ``conversation_id`` and ``prompt`` are accepted to satisfy the + ``Model.get_response`` abstract contract but not currently forwarded. 
""" task_id = streaming_task_id.get() @@ -595,6 +609,7 @@ async def get_response( extra_query=model_settings.extra_query, extra_body=model_settings.extra_body, prompt_cache_key=prompt_cache_key, + previous_response_id=self._non_null_or_not_given(previous_response_id), # Any additional parameters from extra_args **extra_args, ) diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py index 14c53680c..fa2c5ede4 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py @@ -43,7 +43,6 @@ async def test_temperature_setting(self, streaming_model, _streaming_context_var output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) # Verify temperature was passed correctly @@ -73,7 +72,6 @@ async def test_top_p_setting(self, streaming_model, _streaming_context_vars, sam output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -101,7 +99,6 @@ async def test_max_tokens_setting(self, streaming_model, _streaming_context_vars output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -131,7 +128,6 @@ async def test_reasoning_effort_settings(self, streaming_model, _streaming_conte output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -161,7 +157,6 @@ async def test_reasoning_summary_settings(self, streaming_model, _streaming_cont output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -199,7 +194,6 @@ async def test_tool_choice_variations(self, streaming_model, _streaming_context_ output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -227,7 +221,6 @@ async def test_parallel_tool_calls(self, streaming_model, _streaming_context_var output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -255,7 +248,6 @@ async def test_truncation_strategy(self, streaming_model, _streaming_context_var output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -284,7 +276,6 @@ async def test_response_include(self, streaming_model, _streaming_context_vars, output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -314,7 +305,6 @@ async def test_verbosity(self, streaming_model, _streaming_context_vars, sample_ output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -347,7 +337,6 @@ async def test_metadata_and_store(self, streaming_model, _streaming_context_vars output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -383,7 +372,6 @@ async def test_extra_headers_and_body(self, streaming_model, _streaming_context_ output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = 
streaming_model.client.responses.create.call_args @@ -412,7 +400,6 @@ async def test_top_logprobs(self, streaming_model, _streaming_context_vars, samp output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -445,7 +432,6 @@ async def test_function_tool(self, streaming_model, _streaming_context_vars, sam output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -475,7 +461,6 @@ async def test_web_search_tool(self, streaming_model, _streaming_context_vars, s output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -502,7 +487,6 @@ async def test_file_search_tool(self, streaming_model, _streaming_context_vars, output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -531,7 +515,6 @@ async def test_computer_tool(self, streaming_model, _streaming_context_vars, sam output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -563,7 +546,6 @@ async def test_multiple_computer_tools_error(self, streaming_model, _streaming_c output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) @pytest.mark.asyncio @@ -585,7 +567,6 @@ async def test_hosted_mcp_tool(self, streaming_model, _streaming_context_vars, s output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -613,7 +594,6 @@ async def test_image_generation_tool(self, streaming_model, _streaming_context_v output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -640,7 +620,6 @@ async def test_code_interpreter_tool(self, streaming_model, _streaming_context_v output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -667,7 +646,6 @@ async def test_local_shell_tool(self, streaming_model, _streaming_context_vars, output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -695,7 +673,6 @@ async def test_handoffs(self, streaming_model, _streaming_context_vars, sample_t output_schema=None, handoffs=[sample_handoff], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -725,7 +702,6 @@ async def test_mixed_tools(self, streaming_model, _streaming_context_vars, sampl output_schema=None, handoffs=[sample_handoff], tracing=None, - task_id=sample_task_id ) create_call = streaming_model.client.responses.create.call_args @@ -770,7 +746,6 @@ async def test_responses_api_streaming(self, streaming_model, mock_adk_streaming output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) # Verify streaming context was created @@ -845,7 +820,6 @@ async def test_redis_context_creation(self, streaming_model, mock_adk_streaming, output_schema=None, handoffs=[], tracing=None, - task_id=sample_task_id ) # Should create at least one context for reasoning @@ -1113,4 +1087,90 @@ async def test_prompt_cache_key_forwarded_when_opted_in( kwargs = model.client.responses.create.call_args.kwargs assert kwargs["prompt_cache_key"] == "my-key" # Must be popped from 
extra_args so the SDK doesn't see it twice. - assert list(kwargs).count("prompt_cache_key") == 1 \ No newline at end of file + assert list(kwargs).count("prompt_cache_key") == 1 + + @pytest.mark.asyncio + async def test_previous_response_id_not_sent_by_default( + self, + streaming_model_with_mock_tracer, + _streaming_context_vars, # noqa: ARG002 + ): + """Without an opt-in, previous_response_id resolves to NOT_GIVEN. + + Critical for non-Responses-API-native backends (e.g. Claude-via-LiteLLM) + where unknown fields on the request body could be rejected. NOT_GIVEN + is filtered before serialization, so the field is omitted entirely. + """ + model = streaming_model_with_mock_tracer + completed = self._make_response_completed_event() + model.client.responses.create = AsyncMock(return_value=self._async_iter([completed])) + + await model.get_response( + system_instructions=None, + input="hi", + model_settings=ModelSettings(), + tools=[], + output_schema=None, + handoffs=[], + tracing=None, + ) + + kwargs = model.client.responses.create.call_args.kwargs + assert kwargs["previous_response_id"] is NOT_GIVEN + + @pytest.mark.asyncio + async def test_previous_response_id_forwarded_via_sdk_kwarg( + self, + streaming_model_with_mock_tracer, + _streaming_context_vars, # noqa: ARG002 + ): + """The SDK threads previous_response_id as a keyword arg per Model.get_response + abstract contract. Verify it reaches responses.create instead of being silently + swallowed (which was the prior behavior under **kwargs).""" + model = streaming_model_with_mock_tracer + completed = self._make_response_completed_event() + model.client.responses.create = AsyncMock(return_value=self._async_iter([completed])) + + await model.get_response( + system_instructions=None, + input="hi", + model_settings=ModelSettings(), + tools=[], + output_schema=None, + handoffs=[], + tracing=None, + previous_response_id="resp_prior_turn", + ) + + kwargs = model.client.responses.create.call_args.kwargs + assert kwargs["previous_response_id"] == "resp_prior_turn" + + @pytest.mark.asyncio + async def test_conversation_id_and_prompt_accepted_but_not_forwarded( + self, + streaming_model_with_mock_tracer, + _streaming_context_vars, # noqa: ARG002 + ): + """conversation_id and prompt are accepted to satisfy the SDK abstract + contract but not currently forwarded to responses.create.""" + model = streaming_model_with_mock_tracer + completed = self._make_response_completed_event() + model.client.responses.create = AsyncMock(return_value=self._async_iter([completed])) + + # Should not raise — both kwargs are accepted by the signature. + await model.get_response( + system_instructions=None, + input="hi", + model_settings=ModelSettings(), + tools=[], + output_schema=None, + handoffs=[], + tracing=None, + conversation_id="conv_test", + prompt=None, + ) + + kwargs = model.client.responses.create.call_args.kwargs + # Neither should appear in the outgoing request kwargs. + assert "conversation_id" not in kwargs + assert "prompt" not in kwargs \ No newline at end of file From a9e9d5c51f9afa922694125ddcd210426a22776e Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Thu, 30 Apr 2026 18:08:36 -0400 Subject: [PATCH 5/6] feat(openai_agents): forward conversation_id and prompt to responses.create MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The SDK's ``Model.get_response`` abstract has three Responses API server-state parameters: ``previous_response_id``, ``conversation_id``, ``prompt``. 
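All three resolve identically on the wire when unset. A sketch of the shared pattern (``_non_null_or_not_given`` is assumed here to be the trivial helper its name implies; the real implementation lives on this class):

    from openai import NOT_GIVEN

    def _non_null_or_not_given(value):
        # Presumed behavior: explicit values pass through unchanged, while
        # None becomes NOT_GIVEN, which the client strips from the request
        # body before serialization.
        return value if value is not None else NOT_GIVEN

    # responses.create(previous_response_id=NOT_GIVEN, conversation=NOT_GIVEN,
    # prompt=NOT_GIVEN, ...) therefore serializes without those keys, so
    # backends that don't recognize them never see them.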
The prior commit wired up ``previous_response_id`` and accepted the other two for SDK contract compliance but discarded them with ``# noqa: ARG002``. Accept-and-discard is a code smell: callers using the SDK's ``Runner.run(conversation_id=..., prompt=...)`` API would see their arguments silently dropped. Since both map directly to ``responses.create`` kwargs and we're already on that endpoint, the cost of forwarding is two lines and removes the smell entirely. - ``conversation_id`` (SDK abstract name) → ``conversation`` (responses.create endpoint kwarg). The ``Conversation`` type accepts ``str`` directly, so no translation is needed. - ``prompt`` is the same name on both sides. Both follow the same opt-in pattern as ``previous_response_id`` and ``prompt_cache_key``: ``None`` resolves to ``NOT_GIVEN`` and is omitted from the request body, so behavior on alternative OpenAI-compatible backends is unchanged unless a caller explicitly opts in. --- .../models/temporal_streaming_model.py | 24 ++++--- .../tests/test_streaming_model.py | 72 ++++++++++++++++--- 2 files changed, 76 insertions(+), 20 deletions(-) diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py index 5a2cd9b3b..4b2bc96bf 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/models/temporal_streaming_model.py @@ -468,23 +468,21 @@ async def get_response( tracing: ModelTracing, # noqa: ARG002 *, previous_response_id: Optional[str] = None, - conversation_id: Optional[str] = None, # noqa: ARG002 - prompt: Optional[ResponsePromptParam] = None, # noqa: ARG002 + conversation_id: Optional[str] = None, + prompt: Optional[ResponsePromptParam] = None, ) -> ModelResponse: """Get a non-streaming response from the model with streaming to Redis. This method is used by Temporal activities and needs to return a complete response, but we stream the response to Redis while generating it. - ``previous_response_id`` enables stateful multi-turn chaining on the - Responses API: when set, the server retains the prior response's - chain-of-thought and only the new input items need to be sent. Forwarded - only when explicitly set — not all OpenAI-compatible backends support - this parameter, so the default is omitted from the request body via - ``NOT_GIVEN``. - - ``conversation_id`` and ``prompt`` are accepted to satisfy the - ``Model.get_response`` abstract contract but not currently forwarded. + ``previous_response_id``, ``conversation_id``, and ``prompt`` are all + Responses API server-state parameters threaded through by the OpenAI + Agents SDK. Each is forwarded to ``responses.create`` only when + explicitly set — defaults resolve to ``NOT_GIVEN`` and are omitted from + the request body. Not all OpenAI-compatible backends recognize these + fields, so callers on alternative providers see no wire-level change + unless they opt in. """ task_id = streaming_task_id.get() @@ -610,6 +608,10 @@ async def get_response( extra_body=model_settings.extra_body, prompt_cache_key=prompt_cache_key, previous_response_id=self._non_null_or_not_given(previous_response_id), + # SDK abstract names this conversation_id; the Responses API + # endpoint kwarg is `conversation` (accepts a str id directly). 
+ conversation=self._non_null_or_not_given(conversation_id), + prompt=self._non_null_or_not_given(prompt), # Any additional parameters from extra_args **extra_args, ) diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py index fa2c5ede4..a301ac590 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py @@ -1146,18 +1146,75 @@ async def test_previous_response_id_forwarded_via_sdk_kwarg( assert kwargs["previous_response_id"] == "resp_prior_turn" @pytest.mark.asyncio - async def test_conversation_id_and_prompt_accepted_but_not_forwarded( + async def test_conversation_and_prompt_not_sent_by_default( self, streaming_model_with_mock_tracer, _streaming_context_vars, # noqa: ARG002 ): - """conversation_id and prompt are accepted to satisfy the SDK abstract - contract but not currently forwarded to responses.create.""" + """Without an opt-in, conversation/prompt resolve to NOT_GIVEN. + + Same opt-in pattern as previous_response_id and prompt_cache_key — the + wire request is unchanged for callers (and non-OpenAI backends) that + don't supply these. + """ + model = streaming_model_with_mock_tracer + completed = self._make_response_completed_event() + model.client.responses.create = AsyncMock(return_value=self._async_iter([completed])) + + await model.get_response( + system_instructions=None, + input="hi", + model_settings=ModelSettings(), + tools=[], + output_schema=None, + handoffs=[], + tracing=None, + ) + + kwargs = model.client.responses.create.call_args.kwargs + assert kwargs["conversation"] is NOT_GIVEN + assert kwargs["prompt"] is NOT_GIVEN + + @pytest.mark.asyncio + async def test_conversation_id_forwarded_via_sdk_kwarg( + self, + streaming_model_with_mock_tracer, + _streaming_context_vars, # noqa: ARG002 + ): + """The SDK abstract names this `conversation_id`; the Responses API + endpoint kwarg is `conversation`. Caller passes a string id; we forward + it as-is (the Conversation type accepts str).""" + model = streaming_model_with_mock_tracer + completed = self._make_response_completed_event() + model.client.responses.create = AsyncMock(return_value=self._async_iter([completed])) + + await model.get_response( + system_instructions=None, + input="hi", + model_settings=ModelSettings(), + tools=[], + output_schema=None, + handoffs=[], + tracing=None, + conversation_id="conv_abc123", + ) + + kwargs = model.client.responses.create.call_args.kwargs + assert kwargs["conversation"] == "conv_abc123" + + @pytest.mark.asyncio + async def test_prompt_forwarded_via_sdk_kwarg( + self, + streaming_model_with_mock_tracer, + _streaming_context_vars, # noqa: ARG002 + ): + """ResponsePromptParam (a TypedDict for pre-built prompts) is forwarded + as-is to responses.create.""" model = streaming_model_with_mock_tracer completed = self._make_response_completed_event() model.client.responses.create = AsyncMock(return_value=self._async_iter([completed])) - # Should not raise — both kwargs are accepted by the signature. 
+ prompt_param = {"id": "prompt_test_id", "version": "1"} await model.get_response( system_instructions=None, input="hi", @@ -1166,11 +1223,8 @@ async def test_conversation_id_and_prompt_accepted_but_not_forwarded( output_schema=None, handoffs=[], tracing=None, - conversation_id="conv_test", - prompt=None, + prompt=prompt_param, # type: ignore[arg-type] ) kwargs = model.client.responses.create.call_args.kwargs - # Neither should appear in the outgoing request kwargs. - assert "conversation_id" not in kwargs - assert "prompt" not in kwargs \ No newline at end of file + assert kwargs["prompt"] == prompt_param \ No newline at end of file From 55ee5a73da1e7127c46d4a420a6d50d0247c699e Mon Sep 17 00:00:00 2001 From: Devon Peticolas Date: Thu, 30 Apr 2026 18:16:55 -0400 Subject: [PATCH 6/6] test(openai_agents): drop unused sample_task_id fixtures, fix PEP 604 union Two ruff fixes for the test file: - ARG002 on 25 test method signatures: the prior commit (forward previous_response_id from SDK kwarg) stripped the vestigial ``task_id=sample_task_id`` kwargs from get_response calls, but left ``sample_task_id`` in the test method parameter lists. The contextvars fixture (``_streaming_context_vars``) already pulls ``sample_task_id`` transitively, so the explicit param is redundant. Removed from the 25 flagged signatures; preserved on ``test_responses_api_streaming`` where it's still used inside the body to assert against the streaming context. - FA102 on _make_response_completed_event: the new test helper used a PEP 604 union (``str | None``) without ``from __future__ import annotations``. Switched to ``Optional[str]`` to keep the change local to the helper rather than retrofitting future annotations across the file. --- .../tests/test_streaming_model.py | 53 ++++++++++--------- 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py b/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py index a301ac590..830f70da7 100644 --- a/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py +++ b/src/agentex/lib/core/temporal/plugins/openai_agents/tests/test_streaming_model.py @@ -2,6 +2,7 @@ Comprehensive tests for StreamingModel with all configurations and tool types. 
""" +from typing import Optional from unittest.mock import AsyncMock, MagicMock import pytest @@ -20,7 +21,7 @@ class TestStreamingModelSettings: """Test that all ModelSettings parameters work with Responses API""" @pytest.mark.asyncio - async def test_temperature_setting(self, streaming_model, _streaming_context_vars, sample_task_id): + async def test_temperature_setting(self, streaming_model, _streaming_context_vars): """Test that temperature parameter is properly passed to Responses API""" streaming_model.client.responses.create = AsyncMock() @@ -50,7 +51,7 @@ async def test_temperature_setting(self, streaming_model, _streaming_context_var assert create_call.kwargs['temperature'] == temp @pytest.mark.asyncio - async def test_top_p_setting(self, streaming_model, _streaming_context_vars, sample_task_id): + async def test_top_p_setting(self, streaming_model, _streaming_context_vars): """Test that top_p parameter is properly passed to Responses API""" streaming_model.client.responses.create = AsyncMock() @@ -79,7 +80,7 @@ async def test_top_p_setting(self, streaming_model, _streaming_context_vars, sam assert create_call.kwargs['top_p'] == expected @pytest.mark.asyncio - async def test_max_tokens_setting(self, streaming_model, _streaming_context_vars, sample_task_id): + async def test_max_tokens_setting(self, streaming_model, _streaming_context_vars): """Test that max_tokens is properly mapped to max_output_tokens""" streaming_model.client.responses.create = AsyncMock() @@ -105,7 +106,7 @@ async def test_max_tokens_setting(self, streaming_model, _streaming_context_vars assert create_call.kwargs['max_output_tokens'] == 2000 @pytest.mark.asyncio - async def test_reasoning_effort_settings(self, streaming_model, _streaming_context_vars, sample_task_id): + async def test_reasoning_effort_settings(self, streaming_model, _streaming_context_vars): """Test reasoning effort levels (low/medium/high)""" streaming_model.client.responses.create = AsyncMock() @@ -134,7 +135,7 @@ async def test_reasoning_effort_settings(self, streaming_model, _streaming_conte assert create_call.kwargs['reasoning'] == {"effort": effort} @pytest.mark.asyncio - async def test_reasoning_summary_settings(self, streaming_model, _streaming_context_vars, sample_task_id): + async def test_reasoning_summary_settings(self, streaming_model, _streaming_context_vars): """Test reasoning summary settings (auto/none)""" streaming_model.client.responses.create = AsyncMock() @@ -163,7 +164,7 @@ async def test_reasoning_summary_settings(self, streaming_model, _streaming_cont assert create_call.kwargs['reasoning'] == {"effort": "medium", "summary": summary} @pytest.mark.asyncio - async def test_tool_choice_variations(self, streaming_model, _streaming_context_vars, sample_task_id, sample_function_tool): + async def test_tool_choice_variations(self, streaming_model, _streaming_context_vars, sample_function_tool): """Test various tool_choice settings""" streaming_model.client.responses.create = AsyncMock() @@ -200,7 +201,7 @@ async def test_tool_choice_variations(self, streaming_model, _streaming_context_ assert create_call.kwargs['tool_choice'] == expected @pytest.mark.asyncio - async def test_parallel_tool_calls(self, streaming_model, _streaming_context_vars, sample_task_id, sample_function_tool): + async def test_parallel_tool_calls(self, streaming_model, _streaming_context_vars, sample_function_tool): """Test parallel tool calls setting""" streaming_model.client.responses.create = AsyncMock() @@ -227,7 +228,7 @@ async def 
test_parallel_tool_calls(self, streaming_model, _streaming_context_var assert create_call.kwargs['parallel_tool_calls'] == parallel @pytest.mark.asyncio - async def test_truncation_strategy(self, streaming_model, _streaming_context_vars, sample_task_id): + async def test_truncation_strategy(self, streaming_model, _streaming_context_vars): """Test truncation parameter""" streaming_model.client.responses.create = AsyncMock() @@ -254,7 +255,7 @@ async def test_truncation_strategy(self, streaming_model, _streaming_context_var assert create_call.kwargs['truncation'] == "auto" @pytest.mark.asyncio - async def test_response_include(self, streaming_model, _streaming_context_vars, sample_task_id, sample_file_search_tool): + async def test_response_include(self, streaming_model, _streaming_context_vars, sample_file_search_tool): """Test response include parameter""" streaming_model.client.responses.create = AsyncMock() @@ -285,7 +286,7 @@ async def test_response_include(self, streaming_model, _streaming_context_vars, assert "file_search_call.results" in include_list # Added by file search tool @pytest.mark.asyncio - async def test_verbosity(self, streaming_model, _streaming_context_vars, sample_task_id): + async def test_verbosity(self, streaming_model, _streaming_context_vars): """Test verbosity settings""" streaming_model.client.responses.create = AsyncMock() @@ -311,7 +312,7 @@ async def test_verbosity(self, streaming_model, _streaming_context_vars, sample_ assert create_call.kwargs['text'] == {"verbosity": "high"} @pytest.mark.asyncio - async def test_metadata_and_store(self, streaming_model, _streaming_context_vars, sample_task_id): + async def test_metadata_and_store(self, streaming_model, _streaming_context_vars): """Test metadata and store parameters""" streaming_model.client.responses.create = AsyncMock() @@ -344,7 +345,7 @@ async def test_metadata_and_store(self, streaming_model, _streaming_context_vars assert create_call.kwargs['store'] == store @pytest.mark.asyncio - async def test_extra_headers_and_body(self, streaming_model, _streaming_context_vars, sample_task_id): + async def test_extra_headers_and_body(self, streaming_model, _streaming_context_vars): """Test extra customization parameters""" streaming_model.client.responses.create = AsyncMock() @@ -380,7 +381,7 @@ async def test_extra_headers_and_body(self, streaming_model, _streaming_context_ assert create_call.kwargs['extra_query'] == extra_query @pytest.mark.asyncio - async def test_top_logprobs(self, streaming_model, _streaming_context_vars, sample_task_id): + async def test_top_logprobs(self, streaming_model, _streaming_context_vars): """Test top_logprobs parameter""" streaming_model.client.responses.create = AsyncMock() @@ -414,7 +415,7 @@ class TestStreamingModelTools: """Test that all tool types work with streaming""" @pytest.mark.asyncio - async def test_function_tool(self, streaming_model, _streaming_context_vars, sample_task_id, sample_function_tool): + async def test_function_tool(self, streaming_model, _streaming_context_vars, sample_function_tool): """Test FunctionTool conversion and streaming""" streaming_model.client.responses.create = AsyncMock() @@ -443,7 +444,7 @@ async def test_function_tool(self, streaming_model, _streaming_context_vars, sam assert 'parameters' in tools[0] @pytest.mark.asyncio - async def test_web_search_tool(self, streaming_model, _streaming_context_vars, sample_task_id, sample_web_search_tool): + async def test_web_search_tool(self, streaming_model, _streaming_context_vars, 
sample_web_search_tool): """Test WebSearchTool conversion""" streaming_model.client.responses.create = AsyncMock() @@ -469,7 +470,7 @@ async def test_web_search_tool(self, streaming_model, _streaming_context_vars, s assert tools[0]['type'] == 'web_search' @pytest.mark.asyncio - async def test_file_search_tool(self, streaming_model, _streaming_context_vars, sample_task_id, sample_file_search_tool): + async def test_file_search_tool(self, streaming_model, _streaming_context_vars, sample_file_search_tool): """Test FileSearchTool conversion""" streaming_model.client.responses.create = AsyncMock() @@ -497,7 +498,7 @@ async def test_file_search_tool(self, streaming_model, _streaming_context_vars, assert tools[0]['max_num_results'] == 10 @pytest.mark.asyncio - async def test_computer_tool(self, streaming_model, _streaming_context_vars, sample_task_id, sample_computer_tool): + async def test_computer_tool(self, streaming_model, _streaming_context_vars, sample_computer_tool): """Test ComputerTool conversion""" streaming_model.client.responses.create = AsyncMock() @@ -526,7 +527,7 @@ async def test_computer_tool(self, streaming_model, _streaming_context_vars, sam assert tools[0]['display_height'] == 1080 @pytest.mark.asyncio - async def test_multiple_computer_tools_error(self, streaming_model, _streaming_context_vars, sample_task_id, sample_computer_tool): + async def test_multiple_computer_tools_error(self, streaming_model, _streaming_context_vars, sample_computer_tool): """Test that multiple computer tools raise an error""" streaming_model.client.responses.create = AsyncMock() @@ -549,7 +550,7 @@ async def test_multiple_computer_tools_error(self, streaming_model, _streaming_c ) @pytest.mark.asyncio - async def test_hosted_mcp_tool(self, streaming_model, _streaming_context_vars, sample_task_id, sample_hosted_mcp_tool): + async def test_hosted_mcp_tool(self, streaming_model, _streaming_context_vars, sample_hosted_mcp_tool): """Test HostedMCPTool conversion""" streaming_model.client.responses.create = AsyncMock() @@ -576,7 +577,7 @@ async def test_hosted_mcp_tool(self, streaming_model, _streaming_context_vars, s assert tools[0]['server_label'] == 'test_server' @pytest.mark.asyncio - async def test_image_generation_tool(self, streaming_model, _streaming_context_vars, sample_task_id, sample_image_generation_tool): + async def test_image_generation_tool(self, streaming_model, _streaming_context_vars, sample_image_generation_tool): """Test ImageGenerationTool conversion""" streaming_model.client.responses.create = AsyncMock() @@ -602,7 +603,7 @@ async def test_image_generation_tool(self, streaming_model, _streaming_context_v assert tools[0]['type'] == 'image_generation' @pytest.mark.asyncio - async def test_code_interpreter_tool(self, streaming_model, _streaming_context_vars, sample_task_id, sample_code_interpreter_tool): + async def test_code_interpreter_tool(self, streaming_model, _streaming_context_vars, sample_code_interpreter_tool): """Test CodeInterpreterTool conversion""" streaming_model.client.responses.create = AsyncMock() @@ -628,7 +629,7 @@ async def test_code_interpreter_tool(self, streaming_model, _streaming_context_v assert tools[0]['type'] == 'code_interpreter' @pytest.mark.asyncio - async def test_local_shell_tool(self, streaming_model, _streaming_context_vars, sample_task_id, sample_local_shell_tool): + async def test_local_shell_tool(self, streaming_model, _streaming_context_vars, sample_local_shell_tool): """Test LocalShellTool conversion""" streaming_model.client.responses.create = 
AsyncMock() @@ -655,7 +656,7 @@ async def test_local_shell_tool(self, streaming_model, _streaming_context_vars, # working_directory no longer in API - LocalShellTool uses executor internally @pytest.mark.asyncio - async def test_handoffs(self, streaming_model, _streaming_context_vars, sample_task_id, sample_handoff): + async def test_handoffs(self, streaming_model, _streaming_context_vars, sample_handoff): """Test Handoff conversion to function tools""" streaming_model.client.responses.create = AsyncMock() @@ -683,7 +684,7 @@ async def test_handoffs(self, streaming_model, _streaming_context_vars, sample_t assert tools[0]['description'] == 'Transfer to support agent' @pytest.mark.asyncio - async def test_mixed_tools(self, streaming_model, _streaming_context_vars, sample_task_id, + async def test_mixed_tools(self, streaming_model, _streaming_context_vars, sample_function_tool, sample_web_search_tool, sample_handoff): """Test multiple tools together""" streaming_model.client.responses.create = AsyncMock() @@ -794,7 +795,7 @@ async def test_task_id_threading(self, streaming_model, mock_adk_streaming, _str assert call_args.kwargs['task_id'] == expected_task_id @pytest.mark.asyncio - async def test_redis_context_creation(self, streaming_model, mock_adk_streaming, _streaming_context_vars, sample_task_id): + async def test_redis_context_creation(self, streaming_model, mock_adk_streaming, _streaming_context_vars): """Test that Redis streaming contexts are created properly""" streaming_model.client.responses.create = AsyncMock() @@ -867,7 +868,7 @@ def _make_response_completed_event( cached_tokens: int = 0, reasoning_tokens: int = 0, with_usage: bool = True, - response_id: str | None = "resp_real_server_id", + response_id: Optional[str] = "resp_real_server_id", ): usage = MagicMock() usage.input_tokens = input_tokens