fix(net): Fix handshake timing and error handling (#4772)

* Actually wait between initial peer connections

* Add a missing span to initial handshake tasks

* Forward handshake panics to the calling task

* Clarify a handshake comment

* Wrap the entire handshake in a timeout, not just some messages

* Actually delay spawning initial connections, so we don't flood the network

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
teor 2022-08-01 07:05:52 +10:00 committed by GitHub
parent 61f363947e
commit e9c9ea91bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 35 deletions

View File

@@ -6,6 +6,7 @@ use std::{
fmt, fmt,
future::Future, future::Future,
net::{IpAddr, Ipv4Addr, SocketAddr}, net::{IpAddr, Ipv4Addr, SocketAddr},
panic,
pin::Pin, pin::Pin,
sync::Arc, sync::Arc,
task::{Context, Poll}, task::{Context, Poll},
@@ -17,7 +18,7 @@ use tokio::{
io::{AsyncRead, AsyncWrite}, io::{AsyncRead, AsyncWrite},
sync::broadcast, sync::broadcast,
task::JoinError, task::JoinError,
time::{timeout, Instant}, time::{error, timeout, Instant},
}; };
use tokio_stream::wrappers::IntervalStream; use tokio_stream::wrappers::IntervalStream;
use tokio_util::codec::Framed; use tokio_util::codec::Framed;
@@ -807,10 +808,6 @@ where
"negotiating protocol version with remote peer" "negotiating protocol version with remote peer"
); );
// CORRECTNESS
//
// As a defence-in-depth against hangs, every send() or next() on peer_conn
// should be wrapped in a timeout.
let mut peer_conn = Framed::new( let mut peer_conn = Framed::new(
data_stream, data_stream,
Codec::builder() Codec::builder()
@@ -820,20 +817,17 @@ where
); );
// Wrap the entire initial connection setup in a timeout. // Wrap the entire initial connection setup in a timeout.
let (remote_version, remote_services, remote_canonical_addr) = timeout( let (remote_version, remote_services, remote_canonical_addr) = negotiate_version(
constants::HANDSHAKE_TIMEOUT, &mut peer_conn,
negotiate_version( &connected_addr,
&mut peer_conn, config,
&connected_addr, nonces,
config, user_agent,
nonces, our_services,
user_agent, relay,
our_services, minimum_peer_version,
relay,
minimum_peer_version,
),
) )
.await??; .await?;
// If we've learned potential peer addresses from an inbound // If we've learned potential peer addresses from an inbound
// connection or handshake, add those addresses to our address book. // connection or handshake, add those addresses to our address book.
@@ -874,8 +868,9 @@ where
debug!("constructing client, spawning server"); debug!("constructing client, spawning server");
// These channels should not be cloned more than they are // These channels communicate between the inbound and outbound halves of the connection,
// in this block, see constants.rs for more. // and between the different connection tasks. We create separate tasks and channels
// for each new connection.
let (server_tx, server_rx) = futures::channel::mpsc::channel(0); let (server_tx, server_rx) = futures::channel::mpsc::channel(0);
let (shutdown_tx, shutdown_rx) = oneshot::channel(); let (shutdown_tx, shutdown_rx) = oneshot::channel();
let error_slot = ErrorSlot::default(); let error_slot = ErrorSlot::default();
@@ -1014,9 +1009,28 @@ where
Ok(client) Ok(client)
}; };
// Spawn a new task to drive this handshake. // Correctness: As a defence-in-depth against hangs, wrap the entire handshake in a timeout.
let fut = timeout(constants::HANDSHAKE_TIMEOUT, fut);
// Spawn a new task to drive this handshake, forwarding panics to the calling task.
tokio::spawn(fut.instrument(negotiator_span)) tokio::spawn(fut.instrument(negotiator_span))
.map(|x: Result<Result<Client, HandshakeError>, JoinError>| Ok(x??)) .map(
|join_result: Result<
Result<Result<Client, HandshakeError>, error::Elapsed>,
JoinError,
>| {
match join_result {
Ok(Ok(Ok(connection_client))) => Ok(connection_client),
Ok(Ok(Err(handshake_error))) => Err(handshake_error.into()),
Ok(Err(timeout_error)) => Err(timeout_error.into()),
Err(join_error) => match join_error.try_into_panic() {
// Forward panics to the calling task
Ok(panic_reason) => panic::resume_unwind(panic_reason),
Err(join_error) => Err(join_error.into()),
},
}
},
)
.boxed() .boxed()
} }
} }

View File

@@ -289,21 +289,24 @@ where
addr, addr,
connection_tracker, connection_tracker,
}; };
let outbound_connector = outbound_connector.clone();
// Construct a connector future but do not drive it yet ... // Spawn a new task to make the outbound connection.
let outbound_connector_future = outbound_connector tokio::spawn(
.clone() async move {
.oneshot(req) // Only spawn one outbound connector per `MIN_PEER_CONNECTION_INTERVAL`,
.map_err(move |e| (addr, e)); // sleeping for an interval according to its index in the list.
sleep(constants::MIN_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32)).await;
// ... instead, spawn a new task to handle this connector // As soon as we create the connector future,
tokio::spawn(async move { // the handshake starts running as a spawned task.
let task = outbound_connector_future.await; outbound_connector
// Only spawn one outbound connector per `MIN_PEER_CONNECTION_INTERVAL`, .oneshot(req)
// sleeping for an interval according to its index in the list. .map_err(move |e| (addr, e))
sleep(constants::MIN_PEER_CONNECTION_INTERVAL.saturating_mul(i as u32)).await; .await
task }
}) .in_current_span(),
)
}) })
.collect(); .collect();