Refactor QUIC new connection handler function (#26855)
* Refactor QUIC new connection handler function * cleanup setup_connection * more cleanup
This commit is contained in:
parent
5bc81a6c35
commit
3c15d26840
|
@ -142,7 +142,7 @@ fn prune_unstaked_connection_table(
|
|||
fn get_connection_stake(
|
||||
connection: &Connection,
|
||||
staked_nodes: Arc<RwLock<StakedNodes>>,
|
||||
) -> Option<(Pubkey, u64)> {
|
||||
) -> Option<(Pubkey, u64, u64)> {
|
||||
connection
|
||||
.peer_identity()
|
||||
.and_then(|der_cert_any| der_cert_any.downcast::<Vec<rustls::Certificate>>().ok())
|
||||
|
@ -151,10 +151,11 @@ fn get_connection_stake(
|
|||
debug!("Peer public key is {:?}", pubkey);
|
||||
|
||||
let staked_nodes = staked_nodes.read().unwrap();
|
||||
let total_stake = staked_nodes.total_stake;
|
||||
staked_nodes
|
||||
.pubkey_stake_map
|
||||
.get(&pubkey)
|
||||
.map(|stake| (pubkey, *stake))
|
||||
.map(|stake| (pubkey, *stake, total_stake))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
@ -185,6 +186,125 @@ pub fn compute_max_allowed_uni_streams(
|
|||
}
|
||||
}
|
||||
|
||||
enum ConnectionHandlerError {
|
||||
ConnectionAddError,
|
||||
MaxStreamError,
|
||||
}
|
||||
|
||||
struct NewConnectionHandlerParams {
|
||||
packet_sender: Sender<PacketBatch>,
|
||||
remote_pubkey: Option<Pubkey>,
|
||||
stake: u64,
|
||||
total_stake: u64,
|
||||
max_connections_per_peer: usize,
|
||||
stats: Arc<StreamStats>,
|
||||
}
|
||||
|
||||
impl NewConnectionHandlerParams {
|
||||
fn new_unstaked(
|
||||
packet_sender: Sender<PacketBatch>,
|
||||
max_connections_per_peer: usize,
|
||||
stats: Arc<StreamStats>,
|
||||
) -> NewConnectionHandlerParams {
|
||||
NewConnectionHandlerParams {
|
||||
packet_sender,
|
||||
remote_pubkey: None,
|
||||
stake: 0,
|
||||
total_stake: 0,
|
||||
max_connections_per_peer,
|
||||
stats,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_and_cache_new_connection(
|
||||
new_connection: NewConnection,
|
||||
mut connection_table_l: MutexGuard<ConnectionTable>,
|
||||
connection_table: Arc<Mutex<ConnectionTable>>,
|
||||
params: &NewConnectionHandlerParams,
|
||||
) -> Result<(), ConnectionHandlerError> {
|
||||
let NewConnection {
|
||||
connection,
|
||||
uni_streams,
|
||||
..
|
||||
} = new_connection;
|
||||
|
||||
if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
|
||||
connection_table_l.peer_type,
|
||||
params.stake,
|
||||
params.total_stake,
|
||||
) as u64)
|
||||
{
|
||||
connection.set_max_concurrent_uni_streams(max_uni_streams);
|
||||
debug!(
|
||||
"Peer type: {:?}, stake {}, total stake {}, max streams {}",
|
||||
connection_table_l.peer_type,
|
||||
params.stake,
|
||||
params.total_stake,
|
||||
max_uni_streams.into_inner()
|
||||
);
|
||||
|
||||
let remote_addr = connection.remote_address();
|
||||
|
||||
if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
|
||||
ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
|
||||
remote_addr.port(),
|
||||
Some(connection),
|
||||
params.stake,
|
||||
timing::timestamp(),
|
||||
params.max_connections_per_peer,
|
||||
) {
|
||||
drop(connection_table_l);
|
||||
tokio::spawn(handle_connection(
|
||||
uni_streams,
|
||||
params.packet_sender.clone(),
|
||||
remote_addr,
|
||||
params.remote_pubkey,
|
||||
last_update,
|
||||
connection_table,
|
||||
stream_exit,
|
||||
params.stats.clone(),
|
||||
params.stake,
|
||||
));
|
||||
Ok(())
|
||||
} else {
|
||||
params
|
||||
.stats
|
||||
.connection_add_failed
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
Err(ConnectionHandlerError::ConnectionAddError)
|
||||
}
|
||||
} else {
|
||||
params
|
||||
.stats
|
||||
.connection_add_failed_invalid_stream_count
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
Err(ConnectionHandlerError::MaxStreamError)
|
||||
}
|
||||
}
|
||||
|
||||
fn prune_unstaked_connections_and_add_new_connection(
|
||||
new_connection: NewConnection,
|
||||
mut connection_table_l: MutexGuard<ConnectionTable>,
|
||||
connection_table: Arc<Mutex<ConnectionTable>>,
|
||||
max_connections: usize,
|
||||
params: &NewConnectionHandlerParams,
|
||||
) -> Result<(), ConnectionHandlerError> {
|
||||
let stats = params.stats.clone();
|
||||
if max_connections > 0 {
|
||||
prune_unstaked_connection_table(&mut connection_table_l, max_connections, stats);
|
||||
handle_and_cache_new_connection(
|
||||
new_connection,
|
||||
connection_table_l,
|
||||
connection_table,
|
||||
params,
|
||||
)
|
||||
} else {
|
||||
new_connection.connection.close(0u32.into(), &[0u8]);
|
||||
Err(ConnectionHandlerError::ConnectionAddError)
|
||||
}
|
||||
}
|
||||
|
||||
async fn setup_connection(
|
||||
connecting: Connecting,
|
||||
unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
|
||||
|
@ -205,126 +325,76 @@ async fn setup_connection(
|
|||
if let Ok(new_connection) = connecting_result {
|
||||
stats.total_connections.fetch_add(1, Ordering::Relaxed);
|
||||
stats.total_new_connections.fetch_add(1, Ordering::Relaxed);
|
||||
let NewConnection {
|
||||
connection,
|
||||
uni_streams,
|
||||
..
|
||||
} = new_connection;
|
||||
|
||||
let remote_addr = connection.remote_address();
|
||||
let mut remote_pubkey = None;
|
||||
|
||||
let table_and_stake = {
|
||||
let (some_pubkey, stake) = get_connection_stake(&connection, staked_nodes.clone())
|
||||
.map_or((None, 0), |(pubkey, stake)| (Some(pubkey), stake));
|
||||
if stake > 0 {
|
||||
remote_pubkey = some_pubkey;
|
||||
let mut connection_table_l = staked_connection_table.lock().unwrap();
|
||||
if connection_table_l.total_size >= max_staked_connections {
|
||||
let num_pruned = connection_table_l.prune_random(stake);
|
||||
if num_pruned == 0 {
|
||||
if max_unstaked_connections > 0 {
|
||||
// If we couldn't prune a connection in the staked connection table, let's
|
||||
// put this connection in the unstaked connection table. If needed, prune a
|
||||
// connection from the unstaked connection table.
|
||||
connection_table_l = unstaked_connection_table.lock().unwrap();
|
||||
prune_unstaked_connection_table(
|
||||
&mut connection_table_l,
|
||||
max_unstaked_connections,
|
||||
stats.clone(),
|
||||
);
|
||||
Some((connection_table_l, stake))
|
||||
} else {
|
||||
stats
|
||||
.connection_add_failed_on_pruning
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
None
|
||||
}
|
||||
} else {
|
||||
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
|
||||
Some((connection_table_l, stake))
|
||||
}
|
||||
} else {
|
||||
Some((connection_table_l, stake))
|
||||
}
|
||||
} else if max_unstaked_connections > 0 {
|
||||
let mut connection_table_l = unstaked_connection_table.lock().unwrap();
|
||||
prune_unstaked_connection_table(
|
||||
&mut connection_table_l,
|
||||
max_unstaked_connections,
|
||||
let params = get_connection_stake(&new_connection.connection, staked_nodes.clone())
|
||||
.map_or(
|
||||
NewConnectionHandlerParams::new_unstaked(
|
||||
packet_sender.clone(),
|
||||
max_connections_per_peer,
|
||||
stats.clone(),
|
||||
);
|
||||
Some((connection_table_l, 0))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
};
|
||||
|
||||
if let Some((mut connection_table_l, stake)) = table_and_stake {
|
||||
let table_type = connection_table_l.peer_type;
|
||||
let total_stake = staked_nodes.read().map_or(0, |stakes| stakes.total_stake);
|
||||
drop(staked_nodes);
|
||||
|
||||
let max_uni_streams =
|
||||
VarInt::from_u64(
|
||||
compute_max_allowed_uni_streams(table_type, stake, total_stake) as u64,
|
||||
);
|
||||
|
||||
debug!(
|
||||
"Peer type: {:?}, stake {}, total stake {}, max streams {}",
|
||||
table_type,
|
||||
stake,
|
||||
total_stake,
|
||||
max_uni_streams.unwrap().into_inner()
|
||||
),
|
||||
|(pubkey, stake, total_stake)| NewConnectionHandlerParams {
|
||||
packet_sender,
|
||||
remote_pubkey: Some(pubkey),
|
||||
stake,
|
||||
total_stake,
|
||||
max_connections_per_peer,
|
||||
stats: stats.clone(),
|
||||
},
|
||||
);
|
||||
|
||||
if let Ok(max_uni_streams) = max_uni_streams {
|
||||
connection.set_max_concurrent_uni_streams(max_uni_streams);
|
||||
if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
|
||||
ConnectionTableKey::new(remote_addr.ip(), remote_pubkey),
|
||||
remote_addr.port(),
|
||||
Some(connection),
|
||||
stake,
|
||||
timing::timestamp(),
|
||||
max_connections_per_peer,
|
||||
if params.stake > 0 {
|
||||
let mut connection_table_l = staked_connection_table.lock().unwrap();
|
||||
if connection_table_l.total_size >= max_staked_connections {
|
||||
let num_pruned = connection_table_l.prune_random(params.stake);
|
||||
stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
if connection_table_l.total_size < max_staked_connections {
|
||||
if let Ok(()) = handle_and_cache_new_connection(
|
||||
new_connection,
|
||||
connection_table_l,
|
||||
staked_connection_table.clone(),
|
||||
¶ms,
|
||||
) {
|
||||
drop(connection_table_l);
|
||||
let stats = stats.clone();
|
||||
let connection_table = match table_type {
|
||||
ConnectionPeerType::Unstaked => {
|
||||
stats
|
||||
.connection_added_from_unstaked_peer
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
unstaked_connection_table.clone()
|
||||
}
|
||||
ConnectionPeerType::Staked => {
|
||||
stats
|
||||
.connection_added_from_staked_peer
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
staked_connection_table.clone()
|
||||
}
|
||||
};
|
||||
tokio::spawn(handle_connection(
|
||||
uni_streams,
|
||||
packet_sender,
|
||||
remote_addr,
|
||||
remote_pubkey,
|
||||
last_update,
|
||||
connection_table,
|
||||
stream_exit,
|
||||
stats,
|
||||
stake,
|
||||
));
|
||||
} else {
|
||||
stats.connection_add_failed.fetch_add(1, Ordering::Relaxed);
|
||||
stats
|
||||
.connection_added_from_staked_peer
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
} else {
|
||||
stats
|
||||
.connection_add_failed_invalid_stream_count
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
// If we couldn't prune a connection in the staked connection table, let's
|
||||
// put this connection in the unstaked connection table. If needed, prune a
|
||||
// connection from the unstaked connection table.
|
||||
if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
|
||||
new_connection,
|
||||
unstaked_connection_table.lock().unwrap(),
|
||||
unstaked_connection_table.clone(),
|
||||
max_unstaked_connections,
|
||||
¶ms,
|
||||
) {
|
||||
stats
|
||||
.connection_added_from_staked_peer
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
} else {
|
||||
stats
|
||||
.connection_add_failed_on_pruning
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
stats
|
||||
.connection_add_failed_staked_node
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
}
|
||||
}
|
||||
} else if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
|
||||
new_connection,
|
||||
unstaked_connection_table.lock().unwrap(),
|
||||
unstaked_connection_table.clone(),
|
||||
max_unstaked_connections,
|
||||
¶ms,
|
||||
) {
|
||||
stats
|
||||
.connection_added_from_unstaked_peer
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
} else {
|
||||
connection.close(0u32.into(), &[0u8]);
|
||||
stats
|
||||
.connection_add_failed_unstaked_node
|
||||
.fetch_add(1, Ordering::Relaxed);
|
||||
|
|
|
@ -133,6 +133,7 @@ pub struct StreamStats {
|
|||
pub(crate) connection_added_from_unstaked_peer: AtomicUsize,
|
||||
pub(crate) connection_add_failed: AtomicUsize,
|
||||
pub(crate) connection_add_failed_invalid_stream_count: AtomicUsize,
|
||||
pub(crate) connection_add_failed_staked_node: AtomicUsize,
|
||||
pub(crate) connection_add_failed_unstaked_node: AtomicUsize,
|
||||
pub(crate) connection_add_failed_on_pruning: AtomicUsize,
|
||||
pub(crate) connection_setup_timeout: AtomicUsize,
|
||||
|
@ -193,6 +194,12 @@ impl StreamStats {
|
|||
.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"connection_add_failed_staked_node",
|
||||
self.connection_add_failed_staked_node
|
||||
.swap(0, Ordering::Relaxed),
|
||||
i64
|
||||
),
|
||||
(
|
||||
"connection_add_failed_unstaked_node",
|
||||
self.connection_add_failed_unstaked_node
|
||||
|
|
Loading…
Reference in New Issue