From 1eaf71b5b49baa7aad9b0d8b216dfebaab972b4a Mon Sep 17 00:00:00 2001 From: Michael Vines Date: Mon, 2 Dec 2019 10:01:25 -0700 Subject: [PATCH] Improve error handling when the user mixes up gossip (8001) and RPC (8899) ports (#7158) automerge --- client/src/rpc_client_request.rs | 4 + net-utils/src/ip_echo_server.rs | 173 ++++++++++++++++++++----------- net-utils/src/lib.rs | 40 ++++++- 3 files changed, 154 insertions(+), 63 deletions(-) diff --git a/client/src/rpc_client_request.rs b/client/src/rpc_client_request.rs index 5345ce01ec..3593865e67 100644 --- a/client/src/rpc_client_request.rs +++ b/client/src/rpc_client_request.rs @@ -56,6 +56,10 @@ impl GenericRpcClientRequest for RpcClientRequest { .send() { Ok(mut response) => { + if !response.status().is_success() { + return Err(response.error_for_status().unwrap_err().into()); + } + let json: serde_json::Value = serde_json::from_str(&response.text()?)?; if json["error"].is_object() { return Err(RpcError::RpcRequestError(format!( diff --git a/net-utils/src/ip_echo_server.rs b/net-utils/src/ip_echo_server.rs index 73564c9e23..2205586634 100644 --- a/net-utils/src/ip_echo_server.rs +++ b/net-utils/src/ip_echo_server.rs @@ -42,83 +42,132 @@ pub fn ip_echo_server(tcp: std::net::TcpListener) -> IpEchoServer { .incoming() .map_err(|err| warn!("accept failed: {:?}", err)) .for_each(move |socket| { - let ip = socket.peer_addr().expect("Expect peer_addr()").ip(); - info!("connection from {:?}", ip); + let peer_addr = socket.peer_addr().expect("Expect peer_addr()"); + info!("connection from {:?}", peer_addr); let framed = BytesCodec::new().framed(socket); let (writer, reader) = framed.split(); let processor = reader - .and_then(move |bytes| { - bincode::deserialize::(&bytes).or_else(|err| { - Err(io::Error::new( + .and_then(move |data| { + if data.len() < 4 { + return Err(io::Error::new( io::ErrorKind::Other, - format!("Failed to deserialize IpEchoServerMessage: {:?}", err), - )) - }) + format!("Request too short, received {} bytes", data.len()), + )); + } + let request_header: String = data[0..4].iter().map(|b| *b as char).collect(); + if request_header != "\0\0\0\0" { + // Explicitly check for HTTP GET/POST requests to more gracefully handle + // the case where a user accidentally tried to use a gossip entrypoint in + // place of a JSON RPC URL: + if request_header == "GET " || request_header == "POST" { + return Ok(None); // None -> Send HTTP error response + } + return Err(io::Error::new( + io::ErrorKind::Other, + format!("Bad request header: {}", request_header), + )); + } + + bincode::deserialize::(&data[4..]) + .map(Some) + .or_else(|err| { + Err(io::Error::new( + io::ErrorKind::Other, + format!("Failed to deserialize IpEchoServerMessage: {:?}", err), + )) + }) }) - .and_then(move |msg| { - // Fire a datagram at each non-zero UDP port - if !msg.udp_ports.is_empty() { - match std::net::UdpSocket::bind("0.0.0.0:0") { - Ok(udp_socket) => { - for udp_port in &msg.udp_ports { - if *udp_port != 0 { - match udp_socket - .send_to(&[0], SocketAddr::from((ip, *udp_port))) - { - Ok(_) => debug!("Successful send_to udp/{}", udp_port), - Err(err) => { - info!("Failed to send_to udp/{}: {}", udp_port, err) + .and_then(move |maybe_msg| { + match maybe_msg { + None => None, // Send HTTP error response + Some(msg) => { + // Fire a datagram at each non-zero UDP port + if !msg.udp_ports.is_empty() { + match std::net::UdpSocket::bind("0.0.0.0:0") { + Ok(udp_socket) => { + for udp_port in &msg.udp_ports { + if *udp_port != 0 { + match udp_socket.send_to( + &[0], + SocketAddr::from((peer_addr.ip(), *udp_port)), + ) { + Ok(_) => debug!( + "Successful send_to udp/{}", + udp_port + ), + Err(err) => info!( + "Failed to send_to udp/{}: {}", + udp_port, err + ), + } } } } + Err(err) => { + warn!("Failed to bind local udp socket: {}", err); + } } } - Err(err) => { - warn!("Failed to bind local udp socket: {}", err); - } + + // Try to connect to each non-zero TCP port + let tcp_futures: Vec<_> = + msg.tcp_ports + .iter() + .filter_map(|tcp_port| { + let tcp_port = *tcp_port; + if tcp_port == 0 { + None + } else { + Some( + tokio::net::TcpStream::connect(&SocketAddr::new( + peer_addr.ip(), + tcp_port, + )) + .and_then(move |tcp_stream| { + debug!( + "Connection established to tcp/{}", + tcp_port + ); + let _ = tcp_stream + .shutdown(std::net::Shutdown::Both); + Ok(()) + }) + .timeout(Duration::from_secs(5)) + .or_else(move |err| { + Err(io::Error::new( + io::ErrorKind::Other, + format!( + "Connection timeout to {}: {:?}", + tcp_port, err + ), + )) + }), + ) + } + }) + .collect(); + Some(future::join_all(tcp_futures)) } } - - // Try to connect to each non-zero TCP port - let tcp_futures: Vec<_> = msg - .tcp_ports - .iter() - .filter_map(|tcp_port| { - let tcp_port = *tcp_port; - if tcp_port == 0 { - None - } else { - Some( - tokio::net::TcpStream::connect(&SocketAddr::new(ip, tcp_port)) - .and_then(move |tcp_stream| { - debug!("Connection established to tcp/{}", tcp_port); - let _ = tcp_stream.shutdown(std::net::Shutdown::Both); - Ok(()) - }) - .timeout(Duration::from_secs(5)) - .or_else(move |err| { - Err(io::Error::new( - io::ErrorKind::Other, - format!( - "Connection timeout to {}: {:?}", - tcp_port, err - ), - )) - }), - ) - } - }) - .collect(); - future::join_all(tcp_futures) }) - .and_then(move |_| { - let ip = bincode::serialize(&ip).unwrap_or_else(|err| { - warn!("Failed to serialize: {:?}", err); - vec![] - }); - Ok(Bytes::from(ip)) + .and_then(move |valid_request| { + if valid_request.is_none() { + Ok(Bytes::from( + "HTTP/1.1 400 Bad Request\nContent-length: 0\n\n", + )) + } else { + // "\0\0\0\0" header is added to ensure a valid response will never + // conflict with the first four bytes of a valid HTTP response. + let mut bytes = vec![ + 0; + 4 + bincode::serialized_size(&peer_addr.ip()).unwrap() + as usize + ]; + bincode::serialize_into(&mut bytes[4..], &peer_addr.ip()).unwrap(); + Ok(Bytes::from(bytes)) + } }); let connection = writer diff --git a/net-utils/src/lib.rs b/net-utils/src/lib.rs index f789f35257..295b456967 100644 --- a/net-utils/src/lib.rs +++ b/net-utils/src/lib.rs @@ -30,7 +30,13 @@ fn ip_echo_server_request( TcpStream::connect_timeout(ip_echo_server_addr, timeout) .and_then(|mut stream| { let msg = bincode::serialize(&msg).expect("serialize IpEchoServerMessage"); + // Start with 4 null bytes to avoid looking like an HTTP GET/POST request + stream.write_all(&[0; 4])?; + stream.write_all(&msg)?; + + // Send a '\n' to make this request look HTTP-ish and tickle an error response back from an HTTP server + stream.write_all(b"\n")?; stream.shutdown(std::net::Shutdown::Write)?; stream .set_read_timeout(Some(Duration::new(10, 0))) @@ -38,7 +44,38 @@ fn ip_echo_server_request( stream.read_to_end(&mut data) }) .and_then(|_| { - bincode::deserialize(&data).map_err(|err| { + // It's common for users to accidentally confuse the validator's gossip port and JSON + // RPC port. Attempt to detect when this occurs by looking for the standard HTTP + // response header and provide the user with a helpful error message + if data.len() < 4 { + return Err(io::Error::new( + io::ErrorKind::Other, + format!("Response too short, received {} bytes", data.len()), + )); + } + + let response_header: String = data[0..4].iter().map(|b| *b as char).collect(); + if response_header != "\0\0\0\0" { + if response_header == "HTTP" { + let http_response = data.iter().map(|b| *b as char).collect::(); + return Err(io::Error::new( + io::ErrorKind::Other, + format!( + "Invalid gossip entrypoint. {} looks to be an HTTP port: {}", + ip_echo_server_addr, http_response + ), + )); + } + return Err(io::Error::new( + io::ErrorKind::Other, + format!( + "Invalid gossip entrypoint. {} provided an invalid response header: '{}'", + ip_echo_server_addr, response_header + ), + )); + } + + bincode::deserialize(&data[3..]).map_err(|err| { io::Error::new( io::ErrorKind::Other, format!("Failed to deserialize: {:?}", err), @@ -435,6 +472,7 @@ mod tests { #[test] fn test_get_public_ip_addr() { + solana_logger::setup(); let (_server_port, (server_udp_socket, server_tcp_listener)) = bind_common_in_range((3200, 3250)).unwrap(); let (client_port, (client_udp_socket, client_tcp_listener)) =