Set receive_window per quic connection (#26936)

This change sets the receive_window for non-staked nodes to 1 * PACKET_DATA_SIZE, and maps each staked node's connection receive_window to a value between 2 * PACKET_DATA_SIZE and 10 * PACKET_DATA_SIZE based on its stake.

This change is based on a Quinn library change that supports tweaking the per-connection receive_window on the server side: quinn-rs/quinn#1393
This commit is contained in:
Lijun Wang 2022-08-09 10:02:47 -07:00 committed by GitHub
parent f7c6901191
commit a69470fd45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 263 additions and 32 deletions

63
Cargo.lock generated
View File

@ -3562,8 +3562,26 @@ dependencies = [
"futures-channel",
"futures-util",
"fxhash",
"quinn-proto",
"quinn-udp",
"quinn-proto 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)",
"quinn-udp 0.1.0",
"rustls 0.20.6",
"thiserror",
"tokio",
"tracing",
"webpki 0.22.0",
]
[[package]]
name = "quinn"
version = "0.8.3"
source = "git+https://github.com/quinn-rs/quinn.git?branch=0.8.x#37c19743cc881cf71369946d572849d5d2ffc3fd"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"fxhash",
"quinn-proto 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)",
"quinn-udp 0.1.3",
"rustls 0.20.6",
"thiserror",
"tokio",
@ -3591,6 +3609,25 @@ dependencies = [
"webpki 0.22.0",
]
[[package]]
name = "quinn-proto"
version = "0.8.3"
source = "git+https://github.com/quinn-rs/quinn.git?branch=0.8.x#37c19743cc881cf71369946d572849d5d2ffc3fd"
dependencies = [
"bytes",
"fxhash",
"rand 0.8.5",
"ring",
"rustls 0.20.6",
"rustls-native-certs",
"rustls-pemfile 0.2.1",
"slab",
"thiserror",
"tinyvec",
"tracing",
"webpki 0.22.0",
]
[[package]]
name = "quinn-udp"
version = "0.1.0"
@ -3600,7 +3637,20 @@ dependencies = [
"futures-util",
"libc",
"mio",
"quinn-proto",
"quinn-proto 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)",
"socket2",
"tokio",
"tracing",
]
[[package]]
name = "quinn-udp"
version = "0.1.3"
source = "git+https://github.com/quinn-rs/quinn.git?branch=0.8.x#37c19743cc881cf71369946d572849d5d2ffc3fd"
dependencies = [
"futures-util",
"libc",
"quinn-proto 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)",
"socket2",
"tokio",
"tracing",
@ -4939,8 +4989,8 @@ dependencies = [
"jsonrpc-http-server",
"lazy_static",
"log",
"quinn",
"quinn-proto",
"quinn 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)",
"quinn-proto 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",
@ -6291,7 +6341,8 @@ dependencies = [
"pem",
"percentage",
"pkcs8",
"quinn",
"quinn 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)",
"quinn-proto 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)",
"rand 0.7.3",
"rcgen",
"rustls 0.20.6",

View File

@ -36,11 +36,15 @@ impl StakedNodesUpdaterService {
let mut new_ip_to_stake = HashMap::new();
let mut new_id_to_stake = HashMap::new();
let mut total_stake = 0;
let mut max_stake: u64 = 0;
let mut min_stake: u64 = u64::MAX;
if Self::try_refresh_stake_maps(
&mut last_stakes,
&mut new_ip_to_stake,
&mut new_id_to_stake,
&mut total_stake,
&mut max_stake,
&mut min_stake,
&bank_forks,
&cluster_info,
) {
@ -61,16 +65,21 @@ impl StakedNodesUpdaterService {
ip_to_stake: &mut HashMap<IpAddr, u64>,
id_to_stake: &mut HashMap<Pubkey, u64>,
total_stake: &mut u64,
max_stake: &mut u64,
min_stake: &mut u64,
bank_forks: &RwLock<BankForks>,
cluster_info: &ClusterInfo,
) -> bool {
if last_stakes.elapsed() > IP_TO_STAKE_REFRESH_DURATION {
let root_bank = bank_forks.read().unwrap().root_bank();
let staked_nodes = root_bank.staked_nodes();
*total_stake = staked_nodes
.iter()
.map(|(_pubkey, stake)| stake)
.sum::<u64>();
for stake in staked_nodes.values() {
*total_stake += stake;
*max_stake = *stake.max(max_stake);
*min_stake = *stake.min(min_stake);
}
*id_to_stake = cluster_info
.tvu_peers()
.into_iter()

View File

@ -3273,8 +3273,26 @@ dependencies = [
"futures-channel",
"futures-util",
"fxhash",
"quinn-proto",
"quinn-udp",
"quinn-proto 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)",
"quinn-udp 0.1.1",
"rustls 0.20.6",
"thiserror",
"tokio",
"tracing",
"webpki 0.22.0",
]
[[package]]
name = "quinn"
version = "0.8.3"
source = "git+https://github.com/quinn-rs/quinn.git?branch=0.8.x#37c19743cc881cf71369946d572849d5d2ffc3fd"
dependencies = [
"bytes",
"futures-channel",
"futures-util",
"fxhash",
"quinn-proto 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)",
"quinn-udp 0.1.3",
"rustls 0.20.6",
"thiserror",
"tokio",
@ -3302,6 +3320,25 @@ dependencies = [
"webpki 0.22.0",
]
[[package]]
name = "quinn-proto"
version = "0.8.3"
source = "git+https://github.com/quinn-rs/quinn.git?branch=0.8.x#37c19743cc881cf71369946d572849d5d2ffc3fd"
dependencies = [
"bytes",
"fxhash",
"rand 0.8.5",
"ring",
"rustls 0.20.6",
"rustls-native-certs",
"rustls-pemfile 0.2.1",
"slab",
"thiserror",
"tinyvec",
"tracing",
"webpki 0.22.0",
]
[[package]]
name = "quinn-udp"
version = "0.1.1"
@ -3311,7 +3348,20 @@ dependencies = [
"futures-util",
"libc",
"mio",
"quinn-proto",
"quinn-proto 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)",
"socket2",
"tokio",
"tracing",
]
[[package]]
name = "quinn-udp"
version = "0.1.3"
source = "git+https://github.com/quinn-rs/quinn.git?branch=0.8.x#37c19743cc881cf71369946d572849d5d2ffc3fd"
dependencies = [
"futures-util",
"libc",
"quinn-proto 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)",
"socket2",
"tokio",
"tracing",
@ -4650,8 +4700,8 @@ dependencies = [
"jsonrpc-core",
"lazy_static",
"log",
"quinn",
"quinn-proto",
"quinn 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)",
"quinn-proto 0.8.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.3",
"rand_chacha 0.2.2",
"rayon",
@ -5630,7 +5680,8 @@ dependencies = [
"pem",
"percentage",
"pkcs8",
"quinn",
"quinn 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)",
"quinn-proto 0.8.3 (git+https://github.com/quinn-rs/quinn.git?branch=0.8.x)",
"rand 0.7.3",
"rcgen",
"rustls 0.20.6",

View File

@ -12,3 +12,15 @@ pub const QUIC_KEEP_ALIVE_MS: u64 = 1_000;
// applications. Different applications vary, but most seem to
// be in the 30-60 second range
pub const QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS: u64 = 60_000;
/// The receive window for QUIC connections from unstaked nodes is
/// set to this ratio times [`solana_sdk::packet::PACKET_DATA_SIZE`].
pub const QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO: u64 = 1;
/// The receive window for QUIC connections from the staked nodes with
/// the minimum stake is set to this ratio times
/// [`solana_sdk::packet::PACKET_DATA_SIZE`]. This is the lower end of the
/// ratio range used for staked nodes.
pub const QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO: u64 = 2;
/// The receive window for QUIC connections from the staked nodes with
/// the maximum stake is set to this ratio times
/// [`solana_sdk::packet::PACKET_DATA_SIZE`]. This is the upper end of the
/// ratio range used for staked nodes.
pub const QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO: u64 = 10;

View File

@ -21,7 +21,9 @@ nix = "0.24.2"
pem = "1.0.2"
percentage = "0.1.0"
pkcs8 = { version = "0.8.0", features = ["alloc"] }
quinn = "0.8.3"
quinn = {git = "https://github.com/quinn-rs/quinn.git", branch = "0.8.x", commit = "37c19743cc881cf71369946d572849d5d2ffc3fd"}
quinn-proto = {git = "https://github.com/quinn-rs/quinn.git", branch = "0.8.x", commit = "37c19743cc881cf71369946d572849d5d2ffc3fd"}
rand = "0.7.0"
rcgen = "0.9.2"
rustls = { version = "0.20.6", features = ["dangerous_configuration"] }

View File

@ -12,14 +12,16 @@ use {
Connecting, Connection, Endpoint, EndpointConfig, Incoming, IncomingUniStreams,
NewConnection, VarInt,
},
quinn_proto::VarIntBoundsExceeded,
rand::{thread_rng, Rng},
solana_perf::packet::PacketBatch,
solana_sdk::{
packet::{Packet, PACKET_DATA_SIZE},
pubkey::Pubkey,
quic::{
QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
QUIC_MIN_STAKED_CONCURRENT_STREAMS,
QUIC_CONNECTION_HANDSHAKE_TIMEOUT_MS, QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO,
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS, QUIC_MIN_STAKED_CONCURRENT_STREAMS,
QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO, QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO,
},
signature::Keypair,
timing,
@ -142,7 +144,7 @@ fn prune_unstaked_connection_table(
fn get_connection_stake(
connection: &Connection,
staked_nodes: Arc<RwLock<StakedNodes>>,
) -> Option<(Pubkey, u64, u64)> {
) -> Option<(Pubkey, u64, u64, u64, u64)> {
connection
.peer_identity()
.and_then(|der_cert_any| der_cert_any.downcast::<Vec<rustls::Certificate>>().ok())
@ -152,10 +154,12 @@ fn get_connection_stake(
let staked_nodes = staked_nodes.read().unwrap();
let total_stake = staked_nodes.total_stake;
let max_stake = staked_nodes.max_stake;
let min_stake = staked_nodes.min_stake;
staked_nodes
.pubkey_stake_map
.get(&pubkey)
.map(|stake| (pubkey, *stake, total_stake))
.map(|stake| (pubkey, *stake, total_stake, max_stake, min_stake))
})
})
}
@ -198,6 +202,8 @@ struct NewConnectionHandlerParams {
total_stake: u64,
max_connections_per_peer: usize,
stats: Arc<StreamStats>,
max_stake: u64,
min_stake: u64,
}
impl NewConnectionHandlerParams {
@ -213,6 +219,8 @@ impl NewConnectionHandlerParams {
total_stake: 0,
max_connections_per_peer,
stats,
max_stake: 0,
min_stake: 0,
}
}
}
@ -236,16 +244,29 @@ fn handle_and_cache_new_connection(
) as u64)
{
connection.set_max_concurrent_uni_streams(max_uni_streams);
let receive_window = compute_recieve_window(
params.max_stake,
params.min_stake,
connection_table_l.peer_type,
params.stake,
);
if let Ok(receive_window) = receive_window {
connection.set_receive_window(receive_window);
}
let remote_addr = connection.remote_address();
debug!(
"Peer type: {:?}, stake {}, total stake {}, max streams {}",
"Peer type: {:?}, stake {}, total stake {}, max streams {} receive_window {:?} from peer {}",
connection_table_l.peer_type,
params.stake,
params.total_stake,
max_uni_streams.into_inner()
max_uni_streams.into_inner(),
receive_window,
remote_addr,
);
let remote_addr = connection.remote_address();
if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
remote_addr.port(),
@ -305,6 +326,50 @@ fn prune_unstaked_connections_and_add_new_connection(
}
}
/// Calculate the ratio for the per-connection receive window of a staked peer.
///
/// The peer's `stake` is mapped linearly onto the ratio range
/// `QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO..=QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO`,
/// where `min_stake` maps to the minimum ratio and `max_stake` to the maximum.
///
/// Edge cases:
/// * `stake > max_stake` yields the maximum ratio.
/// * `max_stake <= min_stake` (degenerate range, e.g. both zero) yields the
///   maximum ratio.
/// * `stake <= min_stake` yields the minimum ratio. Without this guard the
///   linear formula would extrapolate below the minimum and could even
///   produce a ratio of 0, i.e. a zero-sized receive window.
fn compute_receive_window_ratio_for_staked_node(max_stake: u64, min_stake: u64, stake: u64) -> u64 {
    // Testing shows the maximum throughput from a connection is achieved at receive_window =
    // PACKET_DATA_SIZE * 10. Beyond that, there is not much gain. We linearly map the
    // stake to the ratio range from QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO to
    // QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO. Where the linear algebra of finding the ratio 'r'
    // for stake 's' is,
    // r(s) = a * s + b. Given the max_stake, min_stake, max_ratio, min_ratio, we can find
    // a and b.
    let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
    let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;

    if stake > max_stake || max_stake <= min_stake {
        return max_ratio;
    }
    if stake <= min_stake {
        // Below (or at) the known minimum: never go under the minimum staked ratio.
        return min_ratio;
    }

    let a = (max_ratio - min_ratio) as f64 / (max_stake - min_stake) as f64;
    let b = max_ratio as f64 - ((max_stake as f64) * a);
    let ratio = (a * stake as f64) + b;
    // Defensive bound against floating-point drift; mathematically the value is
    // already within [min_ratio, max_ratio] here.
    (ratio.round() as u64).max(min_ratio).min(max_ratio)
}
/// Compute the QUIC receive window for a new connection.
///
/// The window is `PACKET_DATA_SIZE` multiplied by a ratio chosen from the
/// peer type: unstaked peers get `QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO`, staked
/// peers get the ratio derived from `peer_stake` relative to `min_stake` and
/// `max_stake`.
///
/// Returns `Err(VarIntBoundsExceeded)` if the resulting size does not fit in
/// a `VarInt`.
fn compute_recieve_window(
    max_stake: u64,
    min_stake: u64,
    peer_type: ConnectionPeerType,
    peer_stake: u64,
) -> Result<VarInt, VarIntBoundsExceeded> {
    // Pick the multiplier first, then build the VarInt in a single place.
    let window_ratio = match peer_type {
        ConnectionPeerType::Unstaked => QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO,
        ConnectionPeerType::Staked => {
            compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake)
        }
    };
    VarInt::from_u64(PACKET_DATA_SIZE as u64 * window_ratio)
}
async fn setup_connection(
connecting: Connecting,
unstaked_connection_table: Arc<Mutex<ConnectionTable>>,
@ -333,13 +398,17 @@ async fn setup_connection(
max_connections_per_peer,
stats.clone(),
),
|(pubkey, stake, total_stake)| NewConnectionHandlerParams {
packet_sender,
remote_pubkey: Some(pubkey),
stake,
total_stake,
max_connections_per_peer,
stats: stats.clone(),
|(pubkey, stake, total_stake, max_stake, min_stake)| {
NewConnectionHandlerParams {
packet_sender,
remote_pubkey: Some(pubkey),
stake,
total_stake,
max_connections_per_peer,
stats: stats.clone(),
max_stake,
min_stake,
}
},
);
@ -1475,6 +1544,7 @@ pub mod test {
}
#[test]
fn test_max_allowed_uni_streams() {
assert_eq!(
compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, 0),
@ -1525,4 +1595,38 @@ pub mod test {
QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
);
}
#[test]
fn test_cacluate_receive_window_ratio_for_staked_node() {
    let max_ratio = QUIC_MAX_STAKED_RECEIVE_WINDOW_RATIO;
    let min_ratio = QUIC_MIN_STAKED_RECEIVE_WINDOW_RATIO;
    let average_ratio = (max_ratio + min_ratio) / 2;

    // Each entry is (max_stake, min_stake, stake, expected_ratio).
    let cases: [(u64, u64, u64, u64); 6] = [
        // Lowest stake in the range maps to the minimum ratio.
        (10000, 0, 0, min_ratio),
        // Highest stake in the range maps to the maximum ratio.
        (10000, 0, 10000, max_ratio),
        // The midpoint of the stake range maps to the midpoint of the ratio range.
        (10000, 0, 10000 / 2, average_ratio),
        // Degenerate range where max_stake == min_stake.
        (10000, 10000, 10000, max_ratio),
        // Degenerate range where everything is zero.
        (0, 0, 0, max_ratio),
        // Stake above the known maximum.
        (1000, 10, 1000 + 10, max_ratio),
    ];

    for &(max_stake, min_stake, stake, expected) in cases.iter() {
        let ratio = compute_receive_window_ratio_for_staked_node(max_stake, min_stake, stake);
        assert_eq!(ratio, expected);
    }
}
}

View File

@ -28,6 +28,8 @@ use {
/// Stake information consulted when classifying incoming QUIC connections.
///
/// All fields default to zero / empty maps via `Default`.
#[derive(Default)]
pub struct StakedNodes {
// NOTE(review): presumably the sum of the stakes of all staked nodes — confirm against the updater.
pub total_stake: u64,
// NOTE(review): presumably the largest single-node stake — confirm against the updater.
pub max_stake: u64,
// NOTE(review): presumably the smallest single-node stake — confirm against the updater.
pub min_stake: u64,
// Stake amounts keyed by IP address.
pub ip_stake_map: HashMap<IpAddr, u64>,
// Stake amounts keyed by node public key; looked up with a connecting peer's pubkey.
pub pubkey_stake_map: HashMap<Pubkey, u64>,
}