Parameterize max streams per ms (#707)

Make PPS a parameter instead of the hard coded
This commit is contained in:
Lijun Wang 2024-04-15 15:58:10 -07:00 committed by GitHub
parent f16b5cf1e6
commit f2aa4f0741
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 55 additions and 16 deletions

View File

@ -227,7 +227,8 @@ mod tests {
crossbeam_channel::unbounded,
solana_sdk::{net::DEFAULT_TPU_COALESCE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::SpawnServerResult,
streamer::StakedNodes,
},
std::{
@ -270,6 +271,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)

View File

@ -33,7 +33,7 @@ use {
solana_runtime::{bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache},
solana_sdk::{clock::Slot, pubkey::Pubkey, quic::NotifyKeyUpdate, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::{spawn_server, SpawnServerResult, MAX_STAKED_CONNECTIONS, MAX_UNSTAKED_CONNECTIONS},
streamer::StakedNodes,
},
@ -163,6 +163,7 @@ impl Tpu {
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)
@ -183,6 +184,7 @@ impl Tpu {
staked_nodes.clone(),
MAX_STAKED_CONNECTIONS.saturating_add(MAX_UNSTAKED_CONNECTIONS),
0, // Prevent unstaked nodes from forwarding transactions
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
tpu_coalesce,
)

View File

@ -10,8 +10,10 @@ mod tests {
},
solana_sdk::{net::DEFAULT_TPU_COALESCE, packet::PACKET_DATA_SIZE, signature::Keypair},
solana_streamer::{
nonblocking::quic::DEFAULT_WAIT_FOR_CHUNK_TIMEOUT, quic::SpawnServerResult,
streamer::StakedNodes, tls_certificates::new_dummy_x509_certificate,
nonblocking::quic::{DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
quic::SpawnServerResult,
streamer::StakedNodes,
tls_certificates::new_dummy_x509_certificate,
},
std::{
net::{SocketAddr, UdpSocket},
@ -82,6 +84,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
@ -161,6 +164,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
Duration::from_secs(1), // wait_for_chunk_timeout
DEFAULT_TPU_COALESCE,
)
@ -223,6 +227,7 @@ mod tests {
staked_nodes.clone(),
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
@ -251,6 +256,7 @@ mod tests {
staked_nodes,
10,
10,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)

View File

@ -1,8 +1,8 @@
use {
crate::{
nonblocking::stream_throttle::{
ConnectionStreamCounter, StakedStreamLoadEMA, MAX_STREAMS_PER_MS,
STREAM_STOP_CODE_THROTTLING, STREAM_THROTTLING_INTERVAL_MS,
ConnectionStreamCounter, StakedStreamLoadEMA, STREAM_STOP_CODE_THROTTLING,
STREAM_THROTTLING_INTERVAL_MS,
},
quic::{configure_server, QuicServerError, StreamStats},
streamer::StakedNodes,
@ -76,6 +76,9 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre
const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
/// Limit to 250K PPS
pub const DEFAULT_MAX_STREAMS_PER_MS: u64 = 250;
// A sequence of bytes that is part of a packet
// along with where in the packet it is
struct PacketChunk {
@ -124,6 +127,7 @@ pub fn spawn_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
) -> Result<(Endpoint, Arc<StreamStats>, JoinHandle<()>), QuicServerError> {
@ -147,6 +151,7 @@ pub fn spawn_server(
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
stats.clone(),
wait_for_chunk_timeout,
coalesce,
@ -164,6 +169,7 @@ async fn run_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
stats: Arc<StreamStats>,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
@ -176,6 +182,7 @@ async fn run_server(
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
stats.clone(),
max_unstaked_connections,
max_streams_per_ms,
));
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
Arc::new(Mutex::new(ConnectionTable::new()));
@ -206,6 +213,7 @@ async fn run_server(
staked_nodes.clone(),
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
stats.clone(),
wait_for_chunk_timeout,
stream_load_ema.clone(),
@ -484,6 +492,7 @@ async fn setup_connection(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
stats: Arc<StreamStats>,
wait_for_chunk_timeout: Duration,
stream_load_ema: Arc<StakedStreamLoadEMA>,
@ -505,7 +514,7 @@ async fn setup_connection(
// The heuristic is that the stake should be large engouh to have 1 stream pass throuh within one throttle
// interval during which we allow max (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) streams.
let min_stake_ratio =
1_f64 / (MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS) as f64;
1_f64 / (max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS) as f64;
let stake_ratio = stake as f64 / total_stake as f64;
let peer_type = if stake_ratio < min_stake_ratio {
// If it is a staked connection with ultra low stake ratio, treat it as unstaked.
@ -1327,6 +1336,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
Duration::from_secs(2),
DEFAULT_TPU_COALESCE,
)
@ -1762,6 +1772,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
@ -1791,6 +1802,7 @@ pub mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)

View File

@ -11,8 +11,6 @@ use {
},
};
/// Limit to 250K PPS
pub const MAX_STREAMS_PER_MS: u64 = 250;
const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20;
pub const STREAM_THROTTLING_INTERVAL_MS: u64 = 100;
pub const STREAM_STOP_CODE_THROTTLING: u32 = 15;
@ -35,14 +33,18 @@ pub(crate) struct StakedStreamLoadEMA {
}
impl StakedStreamLoadEMA {
pub(crate) fn new(stats: Arc<StreamStats>, max_unstaked_connections: usize) -> Self {
pub(crate) fn new(
stats: Arc<StreamStats>,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
) -> Self {
let allow_unstaked_streams = max_unstaked_connections > 0;
let max_staked_load_in_ema_window = if allow_unstaked_streams {
(MAX_STREAMS_PER_MS
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_MS))
(max_streams_per_ms
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(max_streams_per_ms))
* EMA_WINDOW_MS
} else {
MAX_STREAMS_PER_MS * EMA_WINDOW_MS
max_streams_per_ms * EMA_WINDOW_MS
};
let max_num_unstaked_connections =
@ -56,7 +58,7 @@ impl StakedStreamLoadEMA {
let max_unstaked_load_in_throttling_window = if allow_unstaked_streams {
Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
.apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS)
.apply_to(max_streams_per_ms * STREAM_THROTTLING_INTERVAL_MS)
.saturating_div(max_num_unstaked_connections)
} else {
0
@ -228,7 +230,9 @@ pub mod test {
use {
super::*,
crate::{
nonblocking::stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
nonblocking::{
quic::DEFAULT_MAX_STREAMS_PER_MS, stream_throttle::STREAM_LOAD_EMA_INTERVAL_MS,
},
quic::{StreamStats, MAX_UNSTAKED_CONNECTIONS},
},
std::{
@ -242,6 +246,7 @@ pub mod test {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
// 25K packets per ms * 20% / 500 max unstaked connections
assert_eq!(
@ -258,6 +263,7 @@ pub mod test {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
// EMA load is used for staked connections to calculate max number of allowed streams.
@ -349,6 +355,7 @@ pub mod test {
let load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
0,
DEFAULT_MAX_STREAMS_PER_MS,
));
// EMA load is used for staked connections to calculate max number of allowed streams.
@ -436,6 +443,7 @@ pub mod test {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
.load_in_recent_interval
@ -464,6 +472,7 @@ pub mod test {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
.load_in_recent_interval
@ -483,6 +492,7 @@ pub mod test {
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(
Arc::new(StreamStats::default()),
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
));
stream_load_ema
.load_in_recent_interval

View File

@ -508,6 +508,7 @@ pub fn spawn_server(
staked_nodes: Arc<RwLock<StakedNodes>>,
max_staked_connections: usize,
max_unstaked_connections: usize,
max_streams_per_ms: u64,
wait_for_chunk_timeout: Duration,
coalesce: Duration,
) -> Result<SpawnServerResult, QuicServerError> {
@ -524,6 +525,7 @@ pub fn spawn_server(
staked_nodes,
max_staked_connections,
max_unstaked_connections,
max_streams_per_ms,
wait_for_chunk_timeout,
coalesce,
)
@ -550,7 +552,9 @@ pub fn spawn_server(
mod test {
use {
super::*,
crate::nonblocking::quic::{test::*, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT},
crate::nonblocking::quic::{
test::*, DEFAULT_MAX_STREAMS_PER_MS, DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
},
crossbeam_channel::unbounded,
solana_sdk::net::DEFAULT_TPU_COALESCE,
std::net::SocketAddr,
@ -583,6 +587,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
@ -642,6 +647,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
MAX_UNSTAKED_CONNECTIONS,
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)
@ -688,6 +694,7 @@ mod test {
staked_nodes,
MAX_STAKED_CONNECTIONS,
0, // Do not allow any connection from unstaked clients/nodes
DEFAULT_MAX_STREAMS_PER_MS,
DEFAULT_WAIT_FOR_CHUNK_TIMEOUT,
DEFAULT_TPU_COALESCE,
)