diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 5f866d30e0..b0aefbef8d 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -142,7 +142,7 @@ fn prune_unstaked_connection_table( fn get_connection_stake( connection: &Connection, staked_nodes: Arc>, -) -> Option<(Pubkey, u64)> { +) -> Option<(Pubkey, u64, u64)> { connection .peer_identity() .and_then(|der_cert_any| der_cert_any.downcast::>().ok()) @@ -151,10 +151,11 @@ fn get_connection_stake( debug!("Peer public key is {:?}", pubkey); let staked_nodes = staked_nodes.read().unwrap(); + let total_stake = staked_nodes.total_stake; staked_nodes .pubkey_stake_map .get(&pubkey) - .map(|stake| (pubkey, *stake)) + .map(|stake| (pubkey, *stake, total_stake)) }) }) } @@ -185,6 +186,125 @@ pub fn compute_max_allowed_uni_streams( } } +enum ConnectionHandlerError { + ConnectionAddError, + MaxStreamError, +} + +struct NewConnectionHandlerParams { + packet_sender: Sender, + remote_pubkey: Option, + stake: u64, + total_stake: u64, + max_connections_per_peer: usize, + stats: Arc, +} + +impl NewConnectionHandlerParams { + fn new_unstaked( + packet_sender: Sender, + max_connections_per_peer: usize, + stats: Arc, + ) -> NewConnectionHandlerParams { + NewConnectionHandlerParams { + packet_sender, + remote_pubkey: None, + stake: 0, + total_stake: 0, + max_connections_per_peer, + stats, + } + } +} + +fn handle_and_cache_new_connection( + new_connection: NewConnection, + mut connection_table_l: MutexGuard, + connection_table: Arc>, + params: &NewConnectionHandlerParams, +) -> Result<(), ConnectionHandlerError> { + let NewConnection { + connection, + uni_streams, + .. + } = new_connection; + + if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( + connection_table_l.peer_type, + params.stake, + params.total_stake, + ) as u64) + { + connection.set_max_concurrent_uni_streams(max_uni_streams); + debug!( + "Peer type: {:?}, stake {}, total stake {}, max streams {}", + connection_table_l.peer_type, + params.stake, + params.total_stake, + max_uni_streams.into_inner() + ); + + let remote_addr = connection.remote_address(); + + if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection( + ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), + remote_addr.port(), + Some(connection), + params.stake, + timing::timestamp(), + params.max_connections_per_peer, + ) { + drop(connection_table_l); + tokio::spawn(handle_connection( + uni_streams, + params.packet_sender.clone(), + remote_addr, + params.remote_pubkey, + last_update, + connection_table, + stream_exit, + params.stats.clone(), + params.stake, + )); + Ok(()) + } else { + params + .stats + .connection_add_failed + .fetch_add(1, Ordering::Relaxed); + Err(ConnectionHandlerError::ConnectionAddError) + } + } else { + params + .stats + .connection_add_failed_invalid_stream_count + .fetch_add(1, Ordering::Relaxed); + Err(ConnectionHandlerError::MaxStreamError) + } +} + +fn prune_unstaked_connections_and_add_new_connection( + new_connection: NewConnection, + mut connection_table_l: MutexGuard, + connection_table: Arc>, + max_connections: usize, + params: &NewConnectionHandlerParams, +) -> Result<(), ConnectionHandlerError> { + let stats = params.stats.clone(); + if max_connections > 0 { + prune_unstaked_connection_table(&mut connection_table_l, max_connections, stats); + handle_and_cache_new_connection( + new_connection, + connection_table_l, + connection_table, + params, + ) + } else { + new_connection.connection.close(0u32.into(), &[0u8]); + Err(ConnectionHandlerError::ConnectionAddError) + } +} + async fn setup_connection( connecting: Connecting, unstaked_connection_table: Arc>, @@ -205,126 +325,76 @@ async fn setup_connection( if let Ok(new_connection) = connecting_result { stats.total_connections.fetch_add(1, Ordering::Relaxed); stats.total_new_connections.fetch_add(1, Ordering::Relaxed); - let NewConnection { - connection, - uni_streams, - .. - } = new_connection; - let remote_addr = connection.remote_address(); - let mut remote_pubkey = None; - - let table_and_stake = { - let (some_pubkey, stake) = get_connection_stake(&connection, staked_nodes.clone()) - .map_or((None, 0), |(pubkey, stake)| (Some(pubkey), stake)); - if stake > 0 { - remote_pubkey = some_pubkey; - let mut connection_table_l = staked_connection_table.lock().unwrap(); - if connection_table_l.total_size >= max_staked_connections { - let num_pruned = connection_table_l.prune_random(stake); - if num_pruned == 0 { - if max_unstaked_connections > 0 { - // If we couldn't prune a connection in the staked connection table, let's - // put this connection in the unstaked connection table. If needed, prune a - // connection from the unstaked connection table. - connection_table_l = unstaked_connection_table.lock().unwrap(); - prune_unstaked_connection_table( - &mut connection_table_l, - max_unstaked_connections, - stats.clone(), - ); - Some((connection_table_l, stake)) - } else { - stats - .connection_add_failed_on_pruning - .fetch_add(1, Ordering::Relaxed); - None - } - } else { - stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); - Some((connection_table_l, stake)) - } - } else { - Some((connection_table_l, stake)) - } - } else if max_unstaked_connections > 0 { - let mut connection_table_l = unstaked_connection_table.lock().unwrap(); - prune_unstaked_connection_table( - &mut connection_table_l, - max_unstaked_connections, + let params = get_connection_stake(&new_connection.connection, staked_nodes.clone()) + .map_or( + NewConnectionHandlerParams::new_unstaked( + packet_sender.clone(), + max_connections_per_peer, stats.clone(), - ); - Some((connection_table_l, 0)) - } else { - None - } - }; - - if let Some((mut connection_table_l, stake)) = table_and_stake { - let table_type = connection_table_l.peer_type; - let total_stake = staked_nodes.read().map_or(0, |stakes| stakes.total_stake); - drop(staked_nodes); - - let max_uni_streams = - VarInt::from_u64( - compute_max_allowed_uni_streams(table_type, stake, total_stake) as u64, - ); - - debug!( - "Peer type: {:?}, stake {}, total stake {}, max streams {}", - table_type, - stake, - total_stake, - max_uni_streams.unwrap().into_inner() + ), + |(pubkey, stake, total_stake)| NewConnectionHandlerParams { + packet_sender, + remote_pubkey: Some(pubkey), + stake, + total_stake, + max_connections_per_peer, + stats: stats.clone(), + }, ); - if let Ok(max_uni_streams) = max_uni_streams { - connection.set_max_concurrent_uni_streams(max_uni_streams); - if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection( - ConnectionTableKey::new(remote_addr.ip(), remote_pubkey), - remote_addr.port(), - Some(connection), - stake, - timing::timestamp(), - max_connections_per_peer, + if params.stake > 0 { + let mut connection_table_l = staked_connection_table.lock().unwrap(); + if connection_table_l.total_size >= max_staked_connections { + let num_pruned = connection_table_l.prune_random(params.stake); + stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed); + } + + if connection_table_l.total_size < max_staked_connections { + if let Ok(()) = handle_and_cache_new_connection( + new_connection, + connection_table_l, + staked_connection_table.clone(), + ¶ms, ) { - drop(connection_table_l); - let stats = stats.clone(); - let connection_table = match table_type { - ConnectionPeerType::Unstaked => { - stats - .connection_added_from_unstaked_peer - .fetch_add(1, Ordering::Relaxed); - unstaked_connection_table.clone() - } - ConnectionPeerType::Staked => { - stats - .connection_added_from_staked_peer - .fetch_add(1, Ordering::Relaxed); - staked_connection_table.clone() - } - }; - tokio::spawn(handle_connection( - uni_streams, - packet_sender, - remote_addr, - remote_pubkey, - last_update, - connection_table, - stream_exit, - stats, - stake, - )); - } else { - stats.connection_add_failed.fetch_add(1, Ordering::Relaxed); + stats + .connection_added_from_staked_peer + .fetch_add(1, Ordering::Relaxed); } } else { - stats - .connection_add_failed_invalid_stream_count - .fetch_add(1, Ordering::Relaxed); + // If we couldn't prune a connection in the staked connection table, let's + // put this connection in the unstaked connection table. If needed, prune a + // connection from the unstaked connection table. + if let Ok(()) = prune_unstaked_connections_and_add_new_connection( + new_connection, + unstaked_connection_table.lock().unwrap(), + unstaked_connection_table.clone(), + max_unstaked_connections, + ¶ms, + ) { + stats + .connection_added_from_staked_peer + .fetch_add(1, Ordering::Relaxed); + } else { + stats + .connection_add_failed_on_pruning + .fetch_add(1, Ordering::Relaxed); + stats + .connection_add_failed_staked_node + .fetch_add(1, Ordering::Relaxed); + } } + } else if let Ok(()) = prune_unstaked_connections_and_add_new_connection( + new_connection, + unstaked_connection_table.lock().unwrap(), + unstaked_connection_table.clone(), + max_unstaked_connections, + ¶ms, + ) { + stats + .connection_added_from_unstaked_peer + .fetch_add(1, Ordering::Relaxed); } else { - connection.close(0u32.into(), &[0u8]); stats .connection_add_failed_unstaked_node .fetch_add(1, Ordering::Relaxed); diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index bfc445444a..c350fc2430 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -133,6 +133,7 @@ pub struct StreamStats { pub(crate) connection_added_from_unstaked_peer: AtomicUsize, pub(crate) connection_add_failed: AtomicUsize, pub(crate) connection_add_failed_invalid_stream_count: AtomicUsize, + pub(crate) connection_add_failed_staked_node: AtomicUsize, pub(crate) connection_add_failed_unstaked_node: AtomicUsize, pub(crate) connection_add_failed_on_pruning: AtomicUsize, pub(crate) connection_setup_timeout: AtomicUsize, @@ -193,6 +194,12 @@ impl StreamStats { .swap(0, Ordering::Relaxed), i64 ), + ( + "connection_add_failed_staked_node", + self.connection_add_failed_staked_node + .swap(0, Ordering::Relaxed), + i64 + ), ( "connection_add_failed_unstaked_node", self.connection_add_failed_unstaked_node