Stop ignoring panics in inbound handshakes (#3192)
This commit is contained in:
parent
ee073c0876
commit
ba42d59f12
|
@ -483,8 +483,24 @@ where
|
||||||
{
|
{
|
||||||
let mut active_inbound_connections = ActiveConnectionCounter::new_counter();
|
let mut active_inbound_connections = ActiveConnectionCounter::new_counter();
|
||||||
|
|
||||||
|
let mut handshakes = FuturesUnordered::new();
|
||||||
|
// Keeping an unresolved future in the pool means the stream never terminates.
|
||||||
|
handshakes.push(future::pending().boxed());
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let inbound_result = listener.accept().await;
|
// Check for panics in finished tasks, before accepting new connections
|
||||||
|
let inbound_result = tokio::select! {
|
||||||
|
biased;
|
||||||
|
next_handshake_res = handshakes.next() => match next_handshake_res {
|
||||||
|
// The task has already sent the peer change to the peer set.
|
||||||
|
Some(Ok(_)) => continue,
|
||||||
|
Some(Err(task_panic)) => panic!("panic in inbound handshake task: {:?}", task_panic),
|
||||||
|
None => unreachable!("handshakes never terminates, because it contains a future that never resolves"),
|
||||||
|
},
|
||||||
|
|
||||||
|
inbound_result = listener.accept() => inbound_result,
|
||||||
|
};
|
||||||
|
|
||||||
if let Ok((tcp_stream, addr)) = inbound_result {
|
if let Ok((tcp_stream, addr)) = inbound_result {
|
||||||
if active_inbound_connections.update_count()
|
if active_inbound_connections.update_count()
|
||||||
>= config.peerset_inbound_connection_limit()
|
>= config.peerset_inbound_connection_limit()
|
||||||
|
@ -521,14 +537,21 @@ where
|
||||||
// ... instead, spawn a new task to handle this connection
|
// ... instead, spawn a new task to handle this connection
|
||||||
{
|
{
|
||||||
let mut peerset_tx = peerset_tx.clone();
|
let mut peerset_tx = peerset_tx.clone();
|
||||||
tokio::spawn(
|
|
||||||
|
let handshake_task = tokio::spawn(
|
||||||
async move {
|
async move {
|
||||||
if let Ok(client) = handshake.await {
|
let handshake_result = handshake.await;
|
||||||
|
|
||||||
|
if let Ok(client) = handshake_result {
|
||||||
let _ = peerset_tx.send(Ok((addr, client))).await;
|
let _ = peerset_tx.send(Ok((addr, client))).await;
|
||||||
|
} else {
|
||||||
|
debug!(?handshake_result, "error handshaking with inbound peer");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
.instrument(handshaker_span),
|
.instrument(handshaker_span),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
handshakes.push(Box::pin(handshake_task));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Only spawn one inbound connection handshake per `MIN_PEER_CONNECTION_INTERVAL`.
|
// Only spawn one inbound connection handshake per `MIN_PEER_CONNECTION_INTERVAL`.
|
||||||
|
|
Loading…
Reference in New Issue