diff --git a/examples/streamable_http_client.rb b/examples/streamable_http_client.rb index de83b231..7cca9ac1 100644 --- a/examples/streamable_http_client.rb +++ b/examples/streamable_http_client.rb @@ -1,49 +1,25 @@ # frozen_string_literal: true +$LOAD_PATH.unshift(File.expand_path("../lib", __dir__)) +require "mcp" require "net/http" require "uri" require "json" require "logger" +require "event_stream_parser" -# Logger for client operations -logger = Logger.new($stdout) -logger.formatter = proc do |severity, datetime, _progname, msg| - "[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" -end - -# Server configuration -SERVER_URL = "http://localhost:9393/mcp" -PROTOCOL_VERSION = "2024-11-05" +SERVER_URL = "http://localhost:9393" -# Helper method to make JSON-RPC requests -def make_request(session_id, method, params = {}, id = nil) - uri = URI(SERVER_URL) - http = Net::HTTP.new(uri.host, uri.port) - - request = Net::HTTP::Post.new(uri) - request["Content-Type"] = "application/json" - request["Mcp-Session-Id"] = session_id if session_id - - body = { - jsonrpc: "2.0", - method: method, - params: params, - id: id || SecureRandom.uuid, - } - - request.body = body.to_json - response = http.request(request) - - { - status: response.code, - headers: response.to_hash, - body: JSON.parse(response.body), - } -rescue => e - { error: e.message } +def create_logger + logger = Logger.new($stdout) + logger.formatter = proc do |severity, datetime, _progname, msg| + "[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" + end + logger end -# Connect to SSE stream +# The SDK does not yet implement the optional GET SSE stream, so this example +# uses MCP::Client for JSON-RPC requests and raw Net::HTTP for the event stream. def connect_sse(session_id, logger) uri = URI(SERVER_URL) @@ -59,17 +35,13 @@ def connect_sse(session_id, logger) if response.code == "200" logger.info("SSE stream connected successfully") + parser = EventStreamParser::Parser.new response.read_body do |chunk| - chunk.split("\n").each do |line| - if line.start_with?("data: ") - data = line[6..-1] - begin - logger.info("SSE data: #{data}") - rescue JSON::ParserError - logger.debug("Non-JSON SSE data: #{data}") - end - elsif line.start_with?(": ") - logger.debug("SSE keepalive received: #{line}") + parser.feed(chunk) do |type, data, _id| + if type.empty? + logger.info("SSE event: #{data}") + else + logger.info("SSE event (#{type}): #{data}") end end end @@ -79,129 +51,129 @@ def connect_sse(session_id, logger) end end rescue Interrupt - logger.info("SSE connection interrupted by user") + logger.info("SSE connection interrupted") rescue => e logger.error("SSE connection error: #{e.message}") end -# Main client flow -def main - logger = Logger.new($stdout) - logger.formatter = proc do |severity, datetime, _progname, msg| - "[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n" - end - - puts "=== MCP SSE Test Client ===" - - # Step 1: Initialize session - logger.info("Initializing session...") - - init_response = make_request( - nil, - "initialize", - { - protocolVersion: PROTOCOL_VERSION, - capabilities: {}, - clientInfo: { - name: "sse-test-client", - version: "1.0", - }, - }, - "init-1", - ) - - if init_response[:error] - logger.error("Failed to initialize: #{init_response[:error]}") - exit(1) - end - - session_id = init_response[:headers]["mcp-session-id"]&.first - - if session_id.nil? - logger.error("No session ID received") - exit(1) - end - - if init_response[:body].dig("result", "capabilities", "logging") - make_request(session_id, "logging/setLevel", { level: "info" }) +def print_response(response) + if response.nil? + puts "Response accepted; watch the SSE stream for the server response." + else + puts "Response: #{JSON.pretty_generate(response)}" end +end - logger.info("Session initialized: #{session_id}") - logger.info("Server info: #{init_response[:body]["result"]["serverInfo"]}") - - # Step 2: Start SSE connection in a separate thread - sse_thread = Thread.new { connect_sse(session_id, logger) } - - # Give SSE time to connect - sleep(1) - - # Step 3: Interactive menu - loop do - puts <<~MESSAGE.chomp - - === Available Actions === - 1. Send custom notification - 2. Test echo - 3. List tools - 0. Exit - - Choose an action:#{" "} +def main + logger = create_logger + + puts <<~MESSAGE + MCP Streamable HTTP Client + Make sure the server is running (ruby examples/streamable_http_server.rb) + #{"=" * 60} + MESSAGE + + http_transport = MCP::Client::HTTP.new(url: SERVER_URL) + client = MCP::Client.new(transport: http_transport) + sse_thread = nil + + begin + puts "=== Initializing session ===" + server_info = client.connect( + client_info: { name: "streamable-http-client", version: "1.0" }, + ) + + puts <<~MESSAGE + ID: #{http_transport.session_id} + Version: #{http_transport.protocol_version} + Server: #{server_info["serverInfo"]} MESSAGE - choice = gets.chomp - - case choice - when "1" - print("Enter notification message: ") - message = gets.chomp - print("Enter delay in seconds (0 for immediate): ") - delay = gets.chomp.to_f - - response = make_request( - session_id, - "tools/call", - { - name: "notification_tool", - arguments: { - message: message, - delay: delay, - }, - }, - ) - if response[:body]["accepted"] - logger.info("Notification sent successfully") + unless http_transport.session_id + logger.error("No session ID received; this example requires a stateful Streamable HTTP session.") + return + end + + puts "=== Listing tools ===" + tools = client.tools + tools.each { |tool| puts " - #{tool.name}: #{tool.description}" } + + echo_tool = tools.find { |tool| tool.name == "echo" } + notification_tool = tools.find { |tool| tool.name == "notification_tool" } + + sse_thread = Thread.new { connect_sse(http_transport.session_id, logger) } + sleep(1) + + # Once the optional SSE stream is active, POST requests may receive only a + # 202 ACK while the actual JSON-RPC response is delivered over SSE. + loop do + puts <<~MENU.chomp + + === Available Actions === + 1. Send notification (triggers SSE event) + 2. Echo message + 3. Show cached tools + 0. Exit + + Choose an action:#{" "} + MENU + + case gets.chomp + when "1" + if notification_tool + print("Enter notification message: ") + message = gets.chomp + print("Enter delay in seconds (0 for immediate): ") + delay = gets.chomp.to_f + + puts "=== Calling tool: notification_tool ===" + response = client.call_tool( + tool: notification_tool, + arguments: { message: message, delay: delay }, + ) + print_response(response) + else + puts "notification_tool not available" + end + when "2" + if echo_tool + print("Enter message to echo: ") + message = gets.chomp + + puts "=== Calling tool: echo ===" + response = client.call_tool(tool: echo_tool, arguments: { message: message }) + print_response(response) + else + puts "echo tool not available" + end + when "3" + puts "=== Cached tools ===" + tools.each { |tool| puts " - #{tool.name}: #{tool.description}" } + when "0" + logger.info("Exiting...") + break else - logger.error("Error: #{response[:body]["error"]}") + puts "Invalid choice" end - when "2" - print("Enter message to echo: ") - message = gets.chomp - make_request(session_id, "tools/call", { name: "echo", arguments: { message: message } }) - when "3" - make_request(session_id, "tools/list") - when "0" - logger.info("Exiting...") - break - else - puts "Invalid choice" + end + rescue MCP::Client::SessionExpiredError => e + logger.error("Session expired: #{e.message}") + rescue MCP::Client::RequestHandlerError => e + logger.error("Request error: #{e.message}") + rescue Interrupt + logger.info("Client interrupted") + rescue => e + logger.error("Error: #{e.message}") + logger.error(e.backtrace.first(5).join("\n")) + ensure + sse_thread.kill if sse_thread&.alive? + + if http_transport.connected? + puts "=== Closing session ===" + http_transport.close + puts "Session closed" end end - - # Clean up - sse_thread.kill if sse_thread.alive? - - # Close session - logger.info("Closing session...") - make_request(session_id, "close") - logger.info("Session closed") -rescue Interrupt - logger.info("Client interrupted by user") -rescue => e - logger.error("Client error: #{e.message}") - logger.error(e.backtrace.join("\n")) end -# Run the client -if __FILE__ == $PROGRAM_NAME - main -end +main if __FILE__ == $PROGRAM_NAME