From 3350ca968fe353e771f492ec8e36ce5d7645f0a7 Mon Sep 17 00:00:00 2001 From: Andrey Barchenkov Date: Mon, 27 Apr 2026 19:17:24 -0400 Subject: [PATCH 1/2] fix: prevent infinite reconnection loop when SSE stream drops without response When the server accepts SSE connections but closes the stream without delivering a complete JSON-RPC response, the client retried forever because _handle_reconnection reset the attempt counter to 0 on each reconnection. Now the attempt counter only resets when real data (not just priming events) was received, indicating the server made progress. When the server only sends empty priming events and drops, the counter increments and the client gives up after MAX_RECONNECTION_ATTEMPTS. Also report a JSONRPCError back to the caller when max attempts are exceeded, so call_tool returns an error instead of hanging forever. Fixes #2393 --- src/mcp/client/streamable_http.py | 24 +++++++-- tests/shared/test_streamable_http.py | 74 ++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+), 4 deletions(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index 9a119c633..e5ea0a2de 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -380,8 +380,17 @@ async def _handle_reconnection( ) -> None: """Reconnect with Last-Event-ID to resume stream after server disconnect.""" # Bail if max retries exceeded - if attempt >= MAX_RECONNECTION_ATTEMPTS: # pragma: no cover - logger.debug(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded") + if attempt >= MAX_RECONNECTION_ATTEMPTS: + logger.warning(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded") + if isinstance(ctx.session_message.message, JSONRPCRequest): + error_data = ErrorData( + code=INTERNAL_ERROR, + message=f"SSE stream disconnected after {MAX_RECONNECTION_ATTEMPTS} reconnection attempts", + ) + error_msg = SessionMessage( + JSONRPCError(jsonrpc="2.0", id=ctx.session_message.message.id, error=error_data) + ) + await ctx.read_stream_writer.send(error_msg) return # Always wait - use server value or default @@ -404,12 +413,15 @@ async def _handle_reconnection( # Track for potential further reconnection reconnect_last_event_id: str = last_event_id reconnect_retry_ms = retry_interval_ms + received_data = False async for sse in event_source.aiter_sse(): if sse.id: # pragma: no branch reconnect_last_event_id = sse.id if sse.retry is not None: reconnect_retry_ms = sse.retry + if sse.data: + received_data = True is_complete = await self._handle_sse_event( sse, @@ -421,9 +433,13 @@ async def _handle_reconnection( await event_source.response.aclose() return - # Stream ended again without response - reconnect again (reset attempt counter) + # Stream ended without response — reconnect. + # Reset attempt counter only when real data was received + # (server made progress). Otherwise increment to prevent + # infinite loops when server only sends priming events. + next_attempt = 0 if received_data else attempt + 1 logger.info("SSE stream disconnected, reconnecting...") - await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, 0) + await self._handle_reconnection(ctx, reconnect_last_event_id, reconnect_retry_ms, next_attempt) except Exception as e: # pragma: no cover logger.debug(f"Reconnection failed: {e}") # Try to reconnect again if we still have an event ID diff --git a/tests/shared/test_streamable_http.py b/tests/shared/test_streamable_http.py index 3d5770fb6..8c11c2380 100644 --- a/tests/shared/test_streamable_http.py +++ b/tests/shared/test_streamable_http.py @@ -57,6 +57,7 @@ CallToolRequestParams, CallToolResult, InitializeResult, + JSONRPCError, JSONRPCRequest, ListToolsResult, PaginatedRequestParams, @@ -2318,3 +2319,76 @@ async def test_streamable_http_client_preserves_custom_with_mcp_headers( assert "content-type" in headers_data assert headers_data["content-type"] == "application/json" + + +@pytest.mark.anyio +async def test_handle_reconnection_stops_after_max_attempts() -> None: + """_handle_reconnection must not reset attempt counter on stream drop. + + Regression test for https://github.com/modelcontextprotocol/python-sdk/issues/2393. + When the server accepts the SSE connection but closes the stream without + sending a complete JSON-RPC response, the client must give up after + MAX_RECONNECTION_ATTEMPTS total attempts and report an error — not retry + forever. + """ + from unittest.mock import AsyncMock, MagicMock + + from mcp.client.streamable_http import MAX_RECONNECTION_ATTEMPTS, RequestContext + + transport = StreamableHTTPTransport("http://test/mcp") + connect_count = 0 + + @asynccontextmanager + async def fake_aconnect_sse(*_args: object, **_kwargs: object): + nonlocal connect_count + connect_count += 1 + + response = MagicMock() + response.raise_for_status = MagicMock() + response.aclose = AsyncMock() + + event_source = MagicMock() + event_source.response = response + + async def aiter_sse(): + yield ServerSentEvent(event="message", data="", id=f"evt-{connect_count}", retry=None) + + event_source.aiter_sse = aiter_sse + yield event_source + + write_stream, read_stream = create_context_streams[SessionMessage | Exception](1) + + request = JSONRPCRequest( + jsonrpc="2.0", + id="req-1", + method="tools/call", + params={"name": "test_tool", "arguments": {}}, + ) + ctx = RequestContext( + client=MagicMock(), + session_id="test-session", + session_message=SessionMessage(request), + metadata=None, + read_stream_writer=write_stream, + ) + + import mcp.client.streamable_http as _mod + + original = _mod.aconnect_sse + _mod.aconnect_sse = fake_aconnect_sse # type: ignore[assignment] + try: + await transport._handle_reconnection(ctx, "evt-0", 0) + finally: + _mod.aconnect_sse = original + + assert connect_count == MAX_RECONNECTION_ATTEMPTS + + with anyio.fail_after(1): + msg = await read_stream.receive() + assert isinstance(msg, SessionMessage) + assert isinstance(msg.message, JSONRPCError) + assert "reconnection attempts" in msg.message.error.message.lower() + assert msg.message.id == "req-1" + + await write_stream.aclose() + await read_stream.aclose() From f4d2ea2e9dbf2ca6e78c1e8c57ea865f7261de07 Mon Sep 17 00:00:00 2001 From: Andrey Barchenkov Date: Mon, 27 Apr 2026 20:10:44 -0400 Subject: [PATCH 2/2] fix: add pragma no branch for coverage on reconnection error path The isinstance check for JSONRPCRequest always evaluates to True because _handle_reconnection is only called for request messages. Mark the branch with pragma: no branch to satisfy 100% branch coverage. --- src/mcp/client/streamable_http.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mcp/client/streamable_http.py b/src/mcp/client/streamable_http.py index e5ea0a2de..5443aca48 100644 --- a/src/mcp/client/streamable_http.py +++ b/src/mcp/client/streamable_http.py @@ -382,7 +382,7 @@ async def _handle_reconnection( # Bail if max retries exceeded if attempt >= MAX_RECONNECTION_ATTEMPTS: logger.warning(f"Max reconnection attempts ({MAX_RECONNECTION_ATTEMPTS}) exceeded") - if isinstance(ctx.session_message.message, JSONRPCRequest): + if isinstance(ctx.session_message.message, JSONRPCRequest): # pragma: no branch error_data = ErrorData( code=INTERNAL_ERROR, message=f"SSE stream disconnected after {MAX_RECONNECTION_ATTEMPTS} reconnection attempts",