Fix transaction chunking on QUIC batch send (#26642)
* Fix chunking of transaction at batch transmit via QUIC * clippy fixes
This commit is contained in:
parent
36cfa78fa0
commit
27866aeab4
|
@ -437,6 +437,21 @@ impl QuicClient {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn compute_chunk_length(num_buffers_to_chunk: usize, num_chunks: usize) -> usize {
|
||||
// The function is equivalent to checked div_ceil()
|
||||
// Also, if num_chunks == 0 || num_buffers_per_chunk == 0, return 1
|
||||
num_buffers_to_chunk
|
||||
.checked_div(num_chunks)
|
||||
.map_or(1, |value| {
|
||||
if num_buffers_to_chunk.checked_rem(num_chunks).unwrap_or(0) != 0 {
|
||||
value.saturating_add(1)
|
||||
} else {
|
||||
value
|
||||
}
|
||||
})
|
||||
.max(1)
|
||||
}
|
||||
|
||||
pub async fn send_batch<T>(
|
||||
&self,
|
||||
buffers: &[T],
|
||||
|
@ -468,9 +483,9 @@ impl QuicClient {
|
|||
// by just getting a reference to the NewConnection once
|
||||
let connection_ref: &NewConnection = &connection;
|
||||
|
||||
let chunks = buffers[1..buffers.len()]
|
||||
.iter()
|
||||
.chunks(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS);
|
||||
let chunk_len =
|
||||
Self::compute_chunk_length(buffers.len() - 1, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS);
|
||||
let chunks = buffers[1..buffers.len()].iter().chunks(chunk_len);
|
||||
|
||||
let futures: Vec<_> = chunks
|
||||
.into_iter()
|
||||
|
@ -572,3 +587,29 @@ impl TpuConnection for QuicTpuConnection {
|
|||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use crate::nonblocking::quic_client::QuicClient;
|
||||
|
||||
#[test]
|
||||
fn test_transaction_batch_chunking() {
|
||||
assert_eq!(QuicClient::compute_chunk_length(0, 0), 1);
|
||||
assert_eq!(QuicClient::compute_chunk_length(10, 0), 1);
|
||||
assert_eq!(QuicClient::compute_chunk_length(0, 10), 1);
|
||||
assert_eq!(QuicClient::compute_chunk_length(usize::MAX, usize::MAX), 1);
|
||||
assert_eq!(QuicClient::compute_chunk_length(10, usize::MAX), 1);
|
||||
assert!(QuicClient::compute_chunk_length(usize::MAX, 10) == (usize::MAX / 10) + 1);
|
||||
assert_eq!(QuicClient::compute_chunk_length(10, 1), 10);
|
||||
assert_eq!(QuicClient::compute_chunk_length(10, 2), 5);
|
||||
assert_eq!(QuicClient::compute_chunk_length(10, 3), 4);
|
||||
assert_eq!(QuicClient::compute_chunk_length(10, 4), 3);
|
||||
assert_eq!(QuicClient::compute_chunk_length(10, 5), 2);
|
||||
assert_eq!(QuicClient::compute_chunk_length(10, 6), 2);
|
||||
assert_eq!(QuicClient::compute_chunk_length(10, 7), 2);
|
||||
assert_eq!(QuicClient::compute_chunk_length(10, 8), 2);
|
||||
assert_eq!(QuicClient::compute_chunk_length(10, 9), 2);
|
||||
assert_eq!(QuicClient::compute_chunk_length(10, 10), 1);
|
||||
assert_eq!(QuicClient::compute_chunk_length(10, 11), 1);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ pub const QUIC_PORT_OFFSET: u16 = 6;
|
|||
// that seems to maximize TPS on GCE (higher values don't seem to
|
||||
// give significant improvement or seem to impact stability)
|
||||
pub const QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS: usize = 128;
|
||||
pub const QUIC_MIN_STAKED_CONCURRENT_STREAMS: usize = 128;
|
||||
|
||||
pub const QUIC_MAX_TIMEOUT_MS: u32 = 2_000;
|
||||
pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000;
|
||||
|
|
|
@ -17,7 +17,10 @@ use {
|
|||
solana_sdk::{
|
||||
packet::{Packet, PACKET_DATA_SIZE},
|
||||
pubkey::Pubkey,
|
||||
quic::{QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS},
|
||||
quic::{
|
||||
QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
|
||||
QUIC_MIN_STAKED_CONCURRENT_STREAMS,
|
||||
},
|
||||
signature::Keypair,
|
||||
timing,
|
||||
},
|
||||
|
@ -156,6 +159,34 @@ fn get_connection_stake(
|
|||
})
|
||||
}
|
||||
|
||||
fn compute_max_allowed_uni_streams(
|
||||
peer_type: ConnectionPeerType,
|
||||
peer_stake: u64,
|
||||
staked_nodes: Arc<RwLock<StakedNodes>>,
|
||||
) -> usize {
|
||||
if peer_stake == 0 {
|
||||
// Treat stake = 0 as unstaked
|
||||
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
|
||||
} else {
|
||||
match peer_type {
|
||||
ConnectionPeerType::Staked => {
|
||||
let staked_nodes = staked_nodes.read().unwrap();
|
||||
|
||||
// No checked math for f64 type. So let's explicitly check for 0 here
|
||||
if staked_nodes.total_stake == 0 {
|
||||
QUIC_MIN_STAKED_CONCURRENT_STREAMS
|
||||
} else {
|
||||
(((peer_stake as f64 / staked_nodes.total_stake as f64)
|
||||
* QUIC_TOTAL_STAKED_CONCURRENT_STREAMS as f64)
|
||||
as usize)
|
||||
.max(QUIC_MIN_STAKED_CONCURRENT_STREAMS)
|
||||
}
|
||||
}
|
||||
_ => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn setup_connection(
|
||||
connecting: Connecting,
|
||||
unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
|
||||
|
@ -233,19 +264,19 @@ async fn setup_connection(
|
|||
|
||||
if let Some((mut connection_table_l, stake)) = table_and_stake {
|
||||
let table_type = connection_table_l.peer_type;
|
||||
let max_uni_streams = match table_type {
|
||||
ConnectionPeerType::Unstaked => {
|
||||
VarInt::from_u64(QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS as u64)
|
||||
}
|
||||
ConnectionPeerType::Staked => {
|
||||
let staked_nodes = staked_nodes.read().unwrap();
|
||||
VarInt::from_u64(
|
||||
((stake as f64 / staked_nodes.total_stake as f64)
|
||||
* QUIC_TOTAL_STAKED_CONCURRENT_STREAMS)
|
||||
as u64,
|
||||
)
|
||||
}
|
||||
};
|
||||
let max_uni_streams = VarInt::from_u64(compute_max_allowed_uni_streams(
|
||||
table_type,
|
||||
stake,
|
||||
staked_nodes.clone(),
|
||||
) as u64);
|
||||
|
||||
debug!(
|
||||
"Peer type: {:?}, stake {}, total stake {}, max streams {}",
|
||||
table_type,
|
||||
stake,
|
||||
staked_nodes.read().unwrap().total_stake,
|
||||
max_uni_streams.unwrap().into_inner()
|
||||
);
|
||||
|
||||
if let Ok(max_uni_streams) = max_uni_streams {
|
||||
connection.set_max_concurrent_uni_streams(max_uni_streams);
|
||||
|
@ -526,7 +557,7 @@ impl Drop for ConnectionEntry {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
#[derive(Copy, Clone, Debug)]
|
||||
enum ConnectionPeerType {
|
||||
Unstaked,
|
||||
Staked,
|
||||
|
@ -684,6 +715,7 @@ pub mod test {
|
|||
use {
|
||||
super::*,
|
||||
crate::{
|
||||
nonblocking::quic::compute_max_allowed_uni_streams,
|
||||
quic::{MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
|
||||
tls_certificates::new_self_signed_tls_certificate_chain,
|
||||
},
|
||||
|
@ -1371,4 +1403,62 @@ pub mod test {
|
|||
}
|
||||
assert_eq!(table.total_size, 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_max_allowed_uni_streams() {
|
||||
let staked_nodes = Arc::new(RwLock::new(StakedNodes::default()));
|
||||
assert_eq!(
|
||||
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, staked_nodes.clone()),
|
||||
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
|
||||
);
|
||||
assert_eq!(
|
||||
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10, staked_nodes.clone()),
|
||||
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
|
||||
);
|
||||
assert_eq!(
|
||||
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, staked_nodes.clone()),
|
||||
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
|
||||
);
|
||||
assert_eq!(
|
||||
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, staked_nodes.clone()),
|
||||
QUIC_MIN_STAKED_CONCURRENT_STREAMS
|
||||
);
|
||||
staked_nodes.write().unwrap().total_stake = 10000;
|
||||
assert_eq!(
|
||||
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1000, staked_nodes.clone()),
|
||||
(QUIC_TOTAL_STAKED_CONCURRENT_STREAMS / (10_f64)) as usize
|
||||
);
|
||||
assert_eq!(
|
||||
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 100, staked_nodes.clone()),
|
||||
(QUIC_TOTAL_STAKED_CONCURRENT_STREAMS / (100_f64)) as usize
|
||||
);
|
||||
assert_eq!(
|
||||
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, staked_nodes.clone()),
|
||||
QUIC_MIN_STAKED_CONCURRENT_STREAMS
|
||||
);
|
||||
assert_eq!(
|
||||
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1, staked_nodes.clone()),
|
||||
QUIC_MIN_STAKED_CONCURRENT_STREAMS
|
||||
);
|
||||
assert_eq!(
|
||||
compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, staked_nodes.clone()),
|
||||
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
|
||||
);
|
||||
assert_eq!(
|
||||
compute_max_allowed_uni_streams(
|
||||
ConnectionPeerType::Unstaked,
|
||||
1000,
|
||||
staked_nodes.clone()
|
||||
),
|
||||
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
|
||||
);
|
||||
assert_eq!(
|
||||
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 1, staked_nodes.clone()),
|
||||
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
|
||||
);
|
||||
assert_eq!(
|
||||
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, staked_nodes),
|
||||
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue