diff --git a/src/mcp/server/lowlevel/server.py b/src/mcp/server/lowlevel/server.py index 59de0ace4..df212cbdc 100644 --- a/src/mcp/server/lowlevel/server.py +++ b/src/mcp/server/lowlevel/server.py @@ -567,6 +567,7 @@ def streamable_http_app( stateless_http: bool = False, event_store: EventStore | None = None, retry_interval: int | None = None, + session_idle_timeout: float | None = None, transport_security: TransportSecuritySettings | None = None, host: str = "127.0.0.1", auth: AuthSettings | None = None, @@ -591,6 +592,7 @@ def streamable_http_app( json_response=json_response, stateless=stateless_http, security_settings=transport_security, + session_idle_timeout=session_idle_timeout, ) self._session_manager = session_manager diff --git a/src/mcp/server/mcpserver/server.py b/src/mcp/server/mcpserver/server.py index be77705da..3c21e7aed 100644 --- a/src/mcp/server/mcpserver/server.py +++ b/src/mcp/server/mcpserver/server.py @@ -1050,6 +1050,7 @@ def streamable_http_app( stateless_http: bool = False, event_store: EventStore | None = None, retry_interval: int | None = None, + session_idle_timeout: float | None = None, transport_security: TransportSecuritySettings | None = None, host: str = "127.0.0.1", ) -> Starlette: @@ -1060,6 +1061,7 @@ def streamable_http_app( stateless_http=stateless_http, event_store=event_store, retry_interval=retry_interval, + session_idle_timeout=session_idle_timeout, transport_security=transport_security, host=host, auth=self.settings.auth, diff --git a/src/mcp/server/streamable_http.py b/src/mcp/server/streamable_http.py index f14201857..52f760db0 100644 --- a/src/mcp/server/streamable_http.py +++ b/src/mcp/server/streamable_http.py @@ -173,6 +173,10 @@ def __init__( self._terminated = False # Idle timeout cancel scope; managed by the session manager. self.idle_scope: anyio.CancelScope | None = None + # Number of requests currently in flight on this session. While > 0, + # the session manager suspends the idle deadline so that a long-running + # request cannot be reaped mid-flight. + self.idle_active_requests: int = 0 @property def is_terminated(self) -> bool: diff --git a/src/mcp/server/streamable_http_manager.py b/src/mcp/server/streamable_http_manager.py index c25314eab..6e357a7f0 100644 --- a/src/mcp/server/streamable_http_manager.py +++ b/src/mcp/server/streamable_http_manager.py @@ -4,6 +4,7 @@ import contextlib import logging +import math from collections.abc import AsyncIterator from http import HTTPStatus from typing import TYPE_CHECKING, Any @@ -136,6 +137,32 @@ async def lifespan(app: Starlette) -> AsyncIterator[None]: # Clear any remaining server instances self._server_instances.clear() + @contextlib.asynccontextmanager + async def _suspend_idle_timeout(self, transport: StreamableHTTPServerTransport) -> AsyncIterator[None]: + """Suspend the idle-timeout deadline while a request is in flight. + + The idle timeout exists to reap sessions that receive no HTTP requests + for ``session_idle_timeout`` seconds; a request that is currently being + processed should not count as an idle session. While at least one + request is in flight we set the deadline to ``math.inf``; once the + last concurrent request completes we restore ``now + timeout``. + """ + active = transport.idle_scope is not None and self.session_idle_timeout is not None + if active: + transport.idle_active_requests += 1 + if transport.idle_active_requests == 1: + assert transport.idle_scope is not None + transport.idle_scope.deadline = math.inf + try: + yield + finally: + if active: + transport.idle_active_requests -= 1 + if transport.idle_active_requests == 0: + assert transport.idle_scope is not None + assert self.session_idle_timeout is not None + transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout + async def handle_request(self, scope: Scope, receive: Receive, send: Send) -> None: """Process ASGI request with proper session handling and transport setup. @@ -196,10 +223,8 @@ async def _handle_stateful_request(self, scope: Scope, receive: Receive, send: S if request_mcp_session_id is not None and request_mcp_session_id in self._server_instances: transport = self._server_instances[request_mcp_session_id] logger.debug("Session already exists, handling request directly") - # Push back idle deadline on activity - if transport.idle_scope is not None and self.session_idle_timeout is not None: - transport.idle_scope.deadline = anyio.current_time() + self.session_idle_timeout # pragma: no cover - await transport.handle_request(scope, receive, send) + async with self._suspend_idle_timeout(transport): + await transport.handle_request(scope, receive, send) return if request_mcp_session_id is None: @@ -267,7 +292,8 @@ async def run_server(*, task_status: TaskStatus[None] = anyio.TASK_STATUS_IGNORE await self._task_group.start(run_server) # Handle the HTTP request and return the response - await http_transport.handle_request(scope, receive, send) + async with self._suspend_idle_timeout(http_transport): + await http_transport.handle_request(scope, receive, send) else: # Unknown or expired session ID - return 404 per MCP spec # TODO: Align error code once spec clarifies diff --git a/tests/server/test_streamable_http_manager.py b/tests/server/test_streamable_http_manager.py index 47cfbf14a..a0e217015 100644 --- a/tests/server/test_streamable_http_manager.py +++ b/tests/server/test_streamable_http_manager.py @@ -413,3 +413,184 @@ def test_session_idle_timeout_rejects_non_positive(): def test_session_idle_timeout_rejects_stateless(): with pytest.raises(RuntimeError, match="not supported in stateless"): StreamableHTTPSessionManager(app=Server("test"), session_idle_timeout=30, stateless=True) + + +def test_session_idle_timeout_passthrough_lowlevel(): + """Server.streamable_http_app() exposes session_idle_timeout to its manager.""" + app = Server("test-passthrough") + app.streamable_http_app(session_idle_timeout=42) + assert app._session_manager is not None + assert app._session_manager.session_idle_timeout == 42 + + +def test_session_idle_timeout_passthrough_mcpserver(): + """MCPServer.streamable_http_app() forwards session_idle_timeout to the low-level wrapper.""" + from mcp.server.mcpserver.server import MCPServer + + server = MCPServer("test-passthrough-mcp") + server.streamable_http_app(session_idle_timeout=17) + assert server._lowlevel_server._session_manager is not None + assert server._lowlevel_server._session_manager.session_idle_timeout == 17 + + +@pytest.mark.anyio +async def test_suspend_idle_timeout_sets_deadline_inf_then_restores(): + """The helper must set deadline=inf for the duration of a request, then restore now+timeout.""" + import math + + app = Server("test-suspend") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=30) + transport = StreamableHTTPServerTransport(mcp_session_id="s1") + transport.idle_scope = anyio.CancelScope() + transport.idle_scope.deadline = anyio.current_time() + 30 + + async with manager._suspend_idle_timeout(transport): + assert transport.idle_active_requests == 1 + assert transport.idle_scope.deadline == math.inf + + assert transport.idle_active_requests == 0 + # Deadline restored to a finite value approximately now + timeout + assert transport.idle_scope.deadline != math.inf + assert transport.idle_scope.deadline > anyio.current_time() + + +@pytest.mark.anyio +async def test_suspend_idle_timeout_only_restores_after_last_concurrent_request(): + """With nested suspensions the deadline stays suspended until the outermost exit.""" + import math + + app = Server("test-suspend-nested") + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=30) + transport = StreamableHTTPServerTransport(mcp_session_id="s1") + transport.idle_scope = anyio.CancelScope() + transport.idle_scope.deadline = anyio.current_time() + 30 + + async with manager._suspend_idle_timeout(transport): + async with manager._suspend_idle_timeout(transport): + assert transport.idle_active_requests == 2 + assert transport.idle_scope.deadline == math.inf + # After the inner exit, one request still in flight — deadline must remain suspended. + assert transport.idle_active_requests == 1 + assert transport.idle_scope.deadline == math.inf + + assert transport.idle_active_requests == 0 + assert transport.idle_scope.deadline != math.inf + + +@pytest.mark.anyio +async def test_suspend_idle_timeout_no_op_without_timeout(): + """When session_idle_timeout is None the helper must not touch any state.""" + app = Server("test-suspend-noop") + manager = StreamableHTTPSessionManager(app=app) # no timeout + transport = StreamableHTTPServerTransport(mcp_session_id="s1") + + async with manager._suspend_idle_timeout(transport): + assert transport.idle_active_requests == 0 + assert transport.idle_scope is None + + assert transport.idle_active_requests == 0 + + +@pytest.mark.anyio +async def test_long_running_request_outlives_idle_timeout(): + """A handler running longer than session_idle_timeout must still complete successfully. + + This is the regression for the second half of #2455: previously the idle scope + fired mid-request and cancelled the in-flight tool call. + """ + from mcp.server import ServerRequestContext + from mcp.types import ( + CallToolRequestParams, + CallToolResult, + ListToolsResult, + PaginatedRequestParams, + TextContent, + Tool, + ) + + host = "testserver-long-request" + timeout = 0.1 + handler_delay = 0.4 + + async def handle_list_tools(ctx: ServerRequestContext, params: PaginatedRequestParams | None) -> ListToolsResult: + return ListToolsResult(tools=[Tool(name="slow", input_schema={"type": "object"})]) + + async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult: + await anyio.sleep(handler_delay) + return CallToolResult(content=[TextContent(type="text", text="done")]) + + app = Server("test-slow-handler", on_list_tools=handle_list_tools, on_call_tool=handle_call_tool) + mcp_app = app.streamable_http_app(host=host, session_idle_timeout=timeout) + + async with ( + mcp_app.router.lifespan_context(mcp_app), + httpx.ASGITransport(mcp_app) as transport, + httpx.AsyncClient(transport=transport) as http_client, + Client(streamable_http_client(f"http://{host}/mcp", http_client=http_client)) as client, + ): + result = await client.call_tool("slow", {}) + assert not result.is_error + assert result.content[0].text == "done" # type: ignore[union-attr] + + +@pytest.mark.anyio +async def test_idle_session_reaped_after_request_completes(): + """After a request finishes the idle deadline resumes; the session is eventually reaped.""" + app = Server("test-reap-after-request") + timeout = 0.05 + manager = StreamableHTTPSessionManager(app=app, session_idle_timeout=timeout) + + async with manager.run(): + sent_messages: list[Message] = [] + + async def mock_send(message: Message): + sent_messages.append(message) + + scope: dict[str, Any] = { + "type": "http", + "method": "POST", + "path": "/mcp", + "headers": [(b"content-type", b"application/json")], + } + + async def mock_receive(): # pragma: no cover + return {"type": "http.request", "body": b"", "more_body": False} + + await manager.handle_request(scope, mock_receive, mock_send) + + session_id: str | None = None + for msg in sent_messages: # pragma: no branch + if msg["type"] == "http.response.start": # pragma: no branch + for header_name, header_value in msg.get("headers", []): # pragma: no branch + if header_name.decode().lower() == MCP_SESSION_ID_HEADER.lower(): + session_id = header_value.decode() + break + if session_id: # pragma: no branch + break + assert session_id is not None + + # Once the request completed the idle deadline should have resumed; wait it out. + await anyio.sleep(timeout * 4) + + response_messages: list[Message] = [] + + async def capture_send(message: Message): + response_messages.append(message) + + scope_with_session: dict[str, Any] = { + "type": "http", + "method": "POST", + "path": "/mcp", + "headers": [ + (b"content-type", b"application/json"), + (b"mcp-session-id", session_id.encode()), + ], + } + await manager.handle_request(scope_with_session, mock_receive, capture_send) + + response_start = next( + (msg for msg in response_messages if msg["type"] == "http.response.start"), + None, + ) + assert response_start is not None + assert response_start["status"] == 404