Improve error handling when the user mixes up gossip (8001) and RPC (8899) ports (#7158)
automerge
This commit is contained in:
parent
0f872af502
commit
1eaf71b5b4
|
@ -56,6 +56,10 @@ impl GenericRpcClientRequest for RpcClientRequest {
|
||||||
.send()
|
.send()
|
||||||
{
|
{
|
||||||
Ok(mut response) => {
|
Ok(mut response) => {
|
||||||
|
if !response.status().is_success() {
|
||||||
|
return Err(response.error_for_status().unwrap_err().into());
|
||||||
|
}
|
||||||
|
|
||||||
let json: serde_json::Value = serde_json::from_str(&response.text()?)?;
|
let json: serde_json::Value = serde_json::from_str(&response.text()?)?;
|
||||||
if json["error"].is_object() {
|
if json["error"].is_object() {
|
||||||
return Err(RpcError::RpcRequestError(format!(
|
return Err(RpcError::RpcRequestError(format!(
|
||||||
|
|
|
@ -42,35 +42,65 @@ pub fn ip_echo_server(tcp: std::net::TcpListener) -> IpEchoServer {
|
||||||
.incoming()
|
.incoming()
|
||||||
.map_err(|err| warn!("accept failed: {:?}", err))
|
.map_err(|err| warn!("accept failed: {:?}", err))
|
||||||
.for_each(move |socket| {
|
.for_each(move |socket| {
|
||||||
let ip = socket.peer_addr().expect("Expect peer_addr()").ip();
|
let peer_addr = socket.peer_addr().expect("Expect peer_addr()");
|
||||||
info!("connection from {:?}", ip);
|
info!("connection from {:?}", peer_addr);
|
||||||
|
|
||||||
let framed = BytesCodec::new().framed(socket);
|
let framed = BytesCodec::new().framed(socket);
|
||||||
let (writer, reader) = framed.split();
|
let (writer, reader) = framed.split();
|
||||||
|
|
||||||
let processor = reader
|
let processor = reader
|
||||||
.and_then(move |bytes| {
|
.and_then(move |data| {
|
||||||
bincode::deserialize::<IpEchoServerMessage>(&bytes).or_else(|err| {
|
if data.len() < 4 {
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!("Request too short, received {} bytes", data.len()),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
let request_header: String = data[0..4].iter().map(|b| *b as char).collect();
|
||||||
|
if request_header != "\0\0\0\0" {
|
||||||
|
// Explicitly check for HTTP GET/POST requests to more gracefully handle
|
||||||
|
// the case where a user accidentally tried to use a gossip entrypoint in
|
||||||
|
// place of a JSON RPC URL:
|
||||||
|
if request_header == "GET " || request_header == "POST" {
|
||||||
|
return Ok(None); // None -> Send HTTP error response
|
||||||
|
}
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!("Bad request header: {}", request_header),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
bincode::deserialize::<IpEchoServerMessage>(&data[4..])
|
||||||
|
.map(Some)
|
||||||
|
.or_else(|err| {
|
||||||
Err(io::Error::new(
|
Err(io::Error::new(
|
||||||
io::ErrorKind::Other,
|
io::ErrorKind::Other,
|
||||||
format!("Failed to deserialize IpEchoServerMessage: {:?}", err),
|
format!("Failed to deserialize IpEchoServerMessage: {:?}", err),
|
||||||
))
|
))
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
.and_then(move |msg| {
|
.and_then(move |maybe_msg| {
|
||||||
|
match maybe_msg {
|
||||||
|
None => None, // Send HTTP error response
|
||||||
|
Some(msg) => {
|
||||||
// Fire a datagram at each non-zero UDP port
|
// Fire a datagram at each non-zero UDP port
|
||||||
if !msg.udp_ports.is_empty() {
|
if !msg.udp_ports.is_empty() {
|
||||||
match std::net::UdpSocket::bind("0.0.0.0:0") {
|
match std::net::UdpSocket::bind("0.0.0.0:0") {
|
||||||
Ok(udp_socket) => {
|
Ok(udp_socket) => {
|
||||||
for udp_port in &msg.udp_ports {
|
for udp_port in &msg.udp_ports {
|
||||||
if *udp_port != 0 {
|
if *udp_port != 0 {
|
||||||
match udp_socket
|
match udp_socket.send_to(
|
||||||
.send_to(&[0], SocketAddr::from((ip, *udp_port)))
|
&[0],
|
||||||
{
|
SocketAddr::from((peer_addr.ip(), *udp_port)),
|
||||||
Ok(_) => debug!("Successful send_to udp/{}", udp_port),
|
) {
|
||||||
Err(err) => {
|
Ok(_) => debug!(
|
||||||
info!("Failed to send_to udp/{}: {}", udp_port, err)
|
"Successful send_to udp/{}",
|
||||||
}
|
udp_port
|
||||||
|
),
|
||||||
|
Err(err) => info!(
|
||||||
|
"Failed to send_to udp/{}: {}",
|
||||||
|
udp_port, err
|
||||||
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -82,8 +112,8 @@ pub fn ip_echo_server(tcp: std::net::TcpListener) -> IpEchoServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to connect to each non-zero TCP port
|
// Try to connect to each non-zero TCP port
|
||||||
let tcp_futures: Vec<_> = msg
|
let tcp_futures: Vec<_> =
|
||||||
.tcp_ports
|
msg.tcp_ports
|
||||||
.iter()
|
.iter()
|
||||||
.filter_map(|tcp_port| {
|
.filter_map(|tcp_port| {
|
||||||
let tcp_port = *tcp_port;
|
let tcp_port = *tcp_port;
|
||||||
|
@ -91,10 +121,17 @@ pub fn ip_echo_server(tcp: std::net::TcpListener) -> IpEchoServer {
|
||||||
None
|
None
|
||||||
} else {
|
} else {
|
||||||
Some(
|
Some(
|
||||||
tokio::net::TcpStream::connect(&SocketAddr::new(ip, tcp_port))
|
tokio::net::TcpStream::connect(&SocketAddr::new(
|
||||||
|
peer_addr.ip(),
|
||||||
|
tcp_port,
|
||||||
|
))
|
||||||
.and_then(move |tcp_stream| {
|
.and_then(move |tcp_stream| {
|
||||||
debug!("Connection established to tcp/{}", tcp_port);
|
debug!(
|
||||||
let _ = tcp_stream.shutdown(std::net::Shutdown::Both);
|
"Connection established to tcp/{}",
|
||||||
|
tcp_port
|
||||||
|
);
|
||||||
|
let _ = tcp_stream
|
||||||
|
.shutdown(std::net::Shutdown::Both);
|
||||||
Ok(())
|
Ok(())
|
||||||
})
|
})
|
||||||
.timeout(Duration::from_secs(5))
|
.timeout(Duration::from_secs(5))
|
||||||
|
@ -111,14 +148,26 @@ pub fn ip_echo_server(tcp: std::net::TcpListener) -> IpEchoServer {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
future::join_all(tcp_futures)
|
Some(future::join_all(tcp_futures))
|
||||||
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
.and_then(move |_| {
|
.and_then(move |valid_request| {
|
||||||
let ip = bincode::serialize(&ip).unwrap_or_else(|err| {
|
if valid_request.is_none() {
|
||||||
warn!("Failed to serialize: {:?}", err);
|
Ok(Bytes::from(
|
||||||
vec![]
|
"HTTP/1.1 400 Bad Request\nContent-length: 0\n\n",
|
||||||
});
|
))
|
||||||
Ok(Bytes::from(ip))
|
} else {
|
||||||
|
// "\0\0\0\0" header is added to ensure a valid response will never
|
||||||
|
// conflict with the first four bytes of a valid HTTP response.
|
||||||
|
let mut bytes = vec![
|
||||||
|
0;
|
||||||
|
4 + bincode::serialized_size(&peer_addr.ip()).unwrap()
|
||||||
|
as usize
|
||||||
|
];
|
||||||
|
bincode::serialize_into(&mut bytes[4..], &peer_addr.ip()).unwrap();
|
||||||
|
Ok(Bytes::from(bytes))
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let connection = writer
|
let connection = writer
|
||||||
|
|
|
@ -30,7 +30,13 @@ fn ip_echo_server_request(
|
||||||
TcpStream::connect_timeout(ip_echo_server_addr, timeout)
|
TcpStream::connect_timeout(ip_echo_server_addr, timeout)
|
||||||
.and_then(|mut stream| {
|
.and_then(|mut stream| {
|
||||||
let msg = bincode::serialize(&msg).expect("serialize IpEchoServerMessage");
|
let msg = bincode::serialize(&msg).expect("serialize IpEchoServerMessage");
|
||||||
|
// Start with 4 null bytes to avoid looking like an HTTP GET/POST request
|
||||||
|
stream.write_all(&[0; 4])?;
|
||||||
|
|
||||||
stream.write_all(&msg)?;
|
stream.write_all(&msg)?;
|
||||||
|
|
||||||
|
// Send a '\n' to make this request look HTTP-ish and tickle an error response back from an HTTP server
|
||||||
|
stream.write_all(b"\n")?;
|
||||||
stream.shutdown(std::net::Shutdown::Write)?;
|
stream.shutdown(std::net::Shutdown::Write)?;
|
||||||
stream
|
stream
|
||||||
.set_read_timeout(Some(Duration::new(10, 0)))
|
.set_read_timeout(Some(Duration::new(10, 0)))
|
||||||
|
@ -38,7 +44,38 @@ fn ip_echo_server_request(
|
||||||
stream.read_to_end(&mut data)
|
stream.read_to_end(&mut data)
|
||||||
})
|
})
|
||||||
.and_then(|_| {
|
.and_then(|_| {
|
||||||
bincode::deserialize(&data).map_err(|err| {
|
// It's common for users to accidentally confuse the validator's gossip port and JSON
|
||||||
|
// RPC port. Attempt to detect when this occurs by looking for the standard HTTP
|
||||||
|
// response header and provide the user with a helpful error message
|
||||||
|
if data.len() < 4 {
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!("Response too short, received {} bytes", data.len()),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let response_header: String = data[0..4].iter().map(|b| *b as char).collect();
|
||||||
|
if response_header != "\0\0\0\0" {
|
||||||
|
if response_header == "HTTP" {
|
||||||
|
let http_response = data.iter().map(|b| *b as char).collect::<String>();
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!(
|
||||||
|
"Invalid gossip entrypoint. {} looks to be an HTTP port: {}",
|
||||||
|
ip_echo_server_addr, http_response
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::Other,
|
||||||
|
format!(
|
||||||
|
"Invalid gossip entrypoint. {} provided an invalid response header: '{}'",
|
||||||
|
ip_echo_server_addr, response_header
|
||||||
|
),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
bincode::deserialize(&data[3..]).map_err(|err| {
|
||||||
io::Error::new(
|
io::Error::new(
|
||||||
io::ErrorKind::Other,
|
io::ErrorKind::Other,
|
||||||
format!("Failed to deserialize: {:?}", err),
|
format!("Failed to deserialize: {:?}", err),
|
||||||
|
@ -435,6 +472,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_get_public_ip_addr() {
|
fn test_get_public_ip_addr() {
|
||||||
|
solana_logger::setup();
|
||||||
let (_server_port, (server_udp_socket, server_tcp_listener)) =
|
let (_server_port, (server_udp_socket, server_tcp_listener)) =
|
||||||
bind_common_in_range((3200, 3250)).unwrap();
|
bind_common_in_range((3200, 3250)).unwrap();
|
||||||
let (client_port, (client_udp_socket, client_tcp_listener)) =
|
let (client_port, (client_udp_socket, client_tcp_listener)) =
|
||||||
|
|
Loading…
Reference in New Issue