Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 130 additions & 158 deletions examples/streamable_http_client.rb
Original file line number Diff line number Diff line change
@@ -1,49 +1,25 @@
# frozen_string_literal: true

$LOAD_PATH.unshift(File.expand_path("../lib", __dir__))
require "mcp"
require "net/http"
require "uri"
require "json"
require "logger"
require "event_stream_parser"

# Logger for client operations
logger = Logger.new($stdout)
logger.formatter = proc do |severity, datetime, _progname, msg|
"[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n"
end

# Server configuration
SERVER_URL = "http://localhost:9393/mcp"
PROTOCOL_VERSION = "2024-11-05"
SERVER_URL = "http://localhost:9393"

# Helper method to make JSON-RPC requests
def make_request(session_id, method, params = {}, id = nil)
uri = URI(SERVER_URL)
http = Net::HTTP.new(uri.host, uri.port)

request = Net::HTTP::Post.new(uri)
request["Content-Type"] = "application/json"
request["Mcp-Session-Id"] = session_id if session_id

body = {
jsonrpc: "2.0",
method: method,
params: params,
id: id || SecureRandom.uuid,
}

request.body = body.to_json
response = http.request(request)

{
status: response.code,
headers: response.to_hash,
body: JSON.parse(response.body),
}
rescue => e
{ error: e.message }
def create_logger
logger = Logger.new($stdout)
logger.formatter = proc do |severity, datetime, _progname, msg|
"[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n"
end
logger
end

# Connect to SSE stream
# The SDK does not yet implement the optional GET SSE stream, so this example
# uses MCP::Client for JSON-RPC requests and raw Net::HTTP for the event stream.
def connect_sse(session_id, logger)
uri = URI(SERVER_URL)

Expand All @@ -59,17 +35,13 @@ def connect_sse(session_id, logger)
if response.code == "200"
logger.info("SSE stream connected successfully")

parser = EventStreamParser::Parser.new
response.read_body do |chunk|
chunk.split("\n").each do |line|
if line.start_with?("data: ")
data = line[6..-1]
begin
logger.info("SSE data: #{data}")
rescue JSON::ParserError
logger.debug("Non-JSON SSE data: #{data}")
end
elsif line.start_with?(": ")
logger.debug("SSE keepalive received: #{line}")
parser.feed(chunk) do |type, data, _id|
if type.empty?
logger.info("SSE event: #{data}")
else
logger.info("SSE event (#{type}): #{data}")
end
end
end
Expand All @@ -79,129 +51,129 @@ def connect_sse(session_id, logger)
end
end
rescue Interrupt
logger.info("SSE connection interrupted by user")
logger.info("SSE connection interrupted")
rescue => e
logger.error("SSE connection error: #{e.message}")
end

# Main client flow
def main
logger = Logger.new($stdout)
logger.formatter = proc do |severity, datetime, _progname, msg|
"[CLIENT] #{severity} #{datetime.strftime("%H:%M:%S.%L")} - #{msg}\n"
end

puts "=== MCP SSE Test Client ==="

# Step 1: Initialize session
logger.info("Initializing session...")

init_response = make_request(
nil,
"initialize",
{
protocolVersion: PROTOCOL_VERSION,
capabilities: {},
clientInfo: {
name: "sse-test-client",
version: "1.0",
},
},
"init-1",
)

if init_response[:error]
logger.error("Failed to initialize: #{init_response[:error]}")
exit(1)
end

session_id = init_response[:headers]["mcp-session-id"]&.first

if session_id.nil?
logger.error("No session ID received")
exit(1)
end

if init_response[:body].dig("result", "capabilities", "logging")
make_request(session_id, "logging/setLevel", { level: "info" })
def print_response(response)
if response.nil?
puts "Response accepted; watch the SSE stream for the server response."
else
puts "Response: #{JSON.pretty_generate(response)}"
end
end

logger.info("Session initialized: #{session_id}")
logger.info("Server info: #{init_response[:body]["result"]["serverInfo"]}")

# Step 2: Start SSE connection in a separate thread
sse_thread = Thread.new { connect_sse(session_id, logger) }

# Give SSE time to connect
sleep(1)

# Step 3: Interactive menu
loop do
puts <<~MESSAGE.chomp

=== Available Actions ===
1. Send custom notification
2. Test echo
3. List tools
0. Exit

Choose an action:#{" "}
def main
logger = create_logger

puts <<~MESSAGE
MCP Streamable HTTP Client
Make sure the server is running (ruby examples/streamable_http_server.rb)
#{"=" * 60}
MESSAGE

http_transport = MCP::Client::HTTP.new(url: SERVER_URL)
client = MCP::Client.new(transport: http_transport)
sse_thread = nil

begin
puts "=== Initializing session ==="
server_info = client.connect(
client_info: { name: "streamable-http-client", version: "1.0" },
)

puts <<~MESSAGE
ID: #{http_transport.session_id}
Version: #{http_transport.protocol_version}
Server: #{server_info["serverInfo"]}
MESSAGE

choice = gets.chomp

case choice
when "1"
print("Enter notification message: ")
message = gets.chomp
print("Enter delay in seconds (0 for immediate): ")
delay = gets.chomp.to_f

response = make_request(
session_id,
"tools/call",
{
name: "notification_tool",
arguments: {
message: message,
delay: delay,
},
},
)
if response[:body]["accepted"]
logger.info("Notification sent successfully")
unless http_transport.session_id
logger.error("No session ID received; this example requires a stateful Streamable HTTP session.")
return
end

puts "=== Listing tools ==="
tools = client.tools
tools.each { |tool| puts " - #{tool.name}: #{tool.description}" }

echo_tool = tools.find { |tool| tool.name == "echo" }
notification_tool = tools.find { |tool| tool.name == "notification_tool" }

sse_thread = Thread.new { connect_sse(http_transport.session_id, logger) }
sleep(1)

# Once the optional SSE stream is active, POST requests may receive only a
# 202 ACK while the actual JSON-RPC response is delivered over SSE.
loop do
puts <<~MENU.chomp

=== Available Actions ===
1. Send notification (triggers SSE event)
2. Echo message
3. Show cached tools
0. Exit

Choose an action:#{" "}
MENU

case gets.chomp
when "1"
if notification_tool
print("Enter notification message: ")
message = gets.chomp
print("Enter delay in seconds (0 for immediate): ")
delay = gets.chomp.to_f

puts "=== Calling tool: notification_tool ==="
response = client.call_tool(
tool: notification_tool,
arguments: { message: message, delay: delay },
)
print_response(response)
else
puts "notification_tool not available"
end
when "2"
if echo_tool
print("Enter message to echo: ")
message = gets.chomp

puts "=== Calling tool: echo ==="
response = client.call_tool(tool: echo_tool, arguments: { message: message })
print_response(response)
else
puts "echo tool not available"
end
when "3"
puts "=== Cached tools ==="
tools.each { |tool| puts " - #{tool.name}: #{tool.description}" }
when "0"
logger.info("Exiting...")
break
else
logger.error("Error: #{response[:body]["error"]}")
puts "Invalid choice"
end
when "2"
print("Enter message to echo: ")
message = gets.chomp
make_request(session_id, "tools/call", { name: "echo", arguments: { message: message } })
when "3"
make_request(session_id, "tools/list")
when "0"
logger.info("Exiting...")
break
else
puts "Invalid choice"
end
rescue MCP::Client::SessionExpiredError => e
logger.error("Session expired: #{e.message}")
rescue MCP::Client::RequestHandlerError => e
logger.error("Request error: #{e.message}")
rescue Interrupt
logger.info("Client interrupted")
rescue => e
logger.error("Error: #{e.message}")
logger.error(e.backtrace.first(5).join("\n"))
ensure
sse_thread.kill if sse_thread&.alive?

if http_transport.connected?
puts "=== Closing session ==="
http_transport.close
puts "Session closed"
end
end

# Clean up
sse_thread.kill if sse_thread.alive?

# Close session
logger.info("Closing session...")
make_request(session_id, "close")
logger.info("Session closed")
rescue Interrupt
logger.info("Client interrupted by user")
rescue => e
logger.error("Client error: #{e.message}")
logger.error(e.backtrace.join("\n"))
end

# Run the client
if __FILE__ == $PROGRAM_NAME
main
end
main if __FILE__ == $PROGRAM_NAME