Fix determination of staked QUIC connections (#34760)

* Fix determination of staked QUIC connections

* address review comments

* review comments

* treat connections with zero stake as unstaked
Pankaj Garg 2024-01-13 18:38:31 -08:00 committed by GitHub
parent 9468996c99
commit f92275bcaa
3 changed files with 183 additions and 240 deletions
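
The substance of the change, distilled from the three diffs below: the peer's stake now travels inside the ConnectionPeerType enum instead of as a separate u64 threaded through every call, and a stake of zero is normalized to Unstaked when a connection is set up. A minimal standalone sketch of that rule (classify_peer is a hypothetical helper name; the enum mirrors the one added in this commit):

#[derive(Copy, Clone, Debug)]
pub enum ConnectionPeerType {
    Unstaked,
    // The stake rides with the variant, so "staked with zero stake" can no
    // longer be represented at all.
    Staked(u64),
}

// Hypothetical helper showing the classification rule from setup_connection:
// zero stake is treated as unstaked.
fn classify_peer(stake: u64) -> ConnectionPeerType {
    if stake > 0 {
        ConnectionPeerType::Staked(stake)
    } else {
        ConnectionPeerType::Unstaked
    }
}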

View File

@@ -133,21 +133,21 @@ impl QuicConfig {
     }
 
     fn compute_max_parallel_streams(&self) -> usize {
-        let (client_type, stake, total_stake) =
+        let (client_type, total_stake) =
             self.maybe_client_pubkey
-                .map_or((ConnectionPeerType::Unstaked, 0, 0), |pubkey| {
+                .map_or((ConnectionPeerType::Unstaked, 0), |pubkey| {
                     self.maybe_staked_nodes.as_ref().map_or(
-                        (ConnectionPeerType::Unstaked, 0, 0),
+                        (ConnectionPeerType::Unstaked, 0),
                         |stakes| {
                             let rstakes = stakes.read().unwrap();
                             rstakes.get_node_stake(&pubkey).map_or(
-                                (ConnectionPeerType::Unstaked, 0, rstakes.total_stake()),
-                                |stake| (ConnectionPeerType::Staked, stake, rstakes.total_stake()),
+                                (ConnectionPeerType::Unstaked, rstakes.total_stake()),
+                                |stake| (ConnectionPeerType::Staked(stake), rstakes.total_stake()),
                             )
                         },
                     )
                 });
-        compute_max_allowed_uni_streams(client_type, stake, total_stake)
+        compute_max_allowed_uni_streams(client_type, total_stake)
     }
 
     pub fn update_client_certificate(
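
The nested map_or chain above performs the client-side classification. For readability, here is a flattened restatement as a hedged sketch; the import paths and the helper name classify are my assumptions, while get_node_stake and total_stake are the accessors used in the diff:

use {
    solana_sdk::pubkey::Pubkey,
    solana_streamer::{nonblocking::quic::ConnectionPeerType, streamer::StakedNodes},
    std::sync::RwLock,
};

// Flattened, hedged restatement of the nested map_or chain above.
fn classify(
    maybe_pubkey: Option<Pubkey>,
    maybe_staked_nodes: Option<&RwLock<StakedNodes>>,
) -> (ConnectionPeerType, u64) {
    let (Some(pubkey), Some(staked_nodes)) = (maybe_pubkey, maybe_staked_nodes) else {
        return (ConnectionPeerType::Unstaked, 0);
    };
    let stakes = staked_nodes.read().unwrap();
    match stakes.get_node_stake(&pubkey) {
        // A pubkey found in the stake map keeps its stake inside the variant.
        Some(stake) => (ConnectionPeerType::Staked(stake), stakes.total_stake()),
        // Unknown pubkey: unstaked, but the total stake is still reported.
        None => (ConnectionPeerType::Unstaked, stakes.total_stake()),
    }
}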

View File

@@ -83,6 +83,18 @@ struct PacketAccumulator {
     pub chunks: Vec<PacketChunk>,
 }
 
+#[derive(Copy, Clone, Debug)]
+pub enum ConnectionPeerType {
+    Unstaked,
+    Staked(u64),
+}
+
+impl ConnectionPeerType {
+    fn is_staked(&self) -> bool {
+        matches!(self, ConnectionPeerType::Staked(_))
+    }
+}
+
 #[allow(clippy::too_many_arguments)]
 pub fn spawn_server(
     name: &'static str,
@@ -142,12 +154,11 @@ async fn run_server(
     const WAIT_FOR_CONNECTION_TIMEOUT: Duration = Duration::from_secs(1);
     debug!("spawn quic server");
     let mut last_datapoint = Instant::now();
-    let unstaked_connection_table: Arc<Mutex<ConnectionTable>> = Arc::new(Mutex::new(
-        ConnectionTable::new(ConnectionPeerType::Unstaked),
-    ));
+    let unstaked_connection_table: Arc<Mutex<ConnectionTable>> =
+        Arc::new(Mutex::new(ConnectionTable::new()));
     let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(stats.clone()));
     let staked_connection_table: Arc<Mutex<ConnectionTable>> =
-        Arc::new(Mutex::new(ConnectionTable::new(ConnectionPeerType::Staked)));
+        Arc::new(Mutex::new(ConnectionTable::new()));
     let (sender, receiver) = async_unbounded();
     tokio::spawn(packet_batch_sender(
         packet_sender,
@@ -227,40 +238,30 @@ fn get_connection_stake(
     ))
 }
 
-pub fn compute_max_allowed_uni_streams(
-    peer_type: ConnectionPeerType,
-    peer_stake: u64,
-    total_stake: u64,
-) -> usize {
-    // Treat stake = 0 as unstaked
-    if peer_stake == 0 {
-        QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
-    } else {
-        match peer_type {
-            ConnectionPeerType::Staked => {
-                // No checked math for f64 type. So let's explicitly check for 0 here
-                if total_stake == 0 || peer_stake > total_stake {
-                    warn!(
-                        "Invalid stake values: peer_stake: {:?}, total_stake: {:?}",
-                        peer_stake, total_stake,
-                    );
+pub fn compute_max_allowed_uni_streams(peer_type: ConnectionPeerType, total_stake: u64) -> usize {
+    match peer_type {
+        ConnectionPeerType::Staked(peer_stake) => {
+            // No checked math for f64 type. So let's explicitly check for 0 here
+            if total_stake == 0 || peer_stake > total_stake {
+                warn!(
+                    "Invalid stake values: peer_stake: {:?}, total_stake: {:?}",
+                    peer_stake, total_stake,
+                );
 
-                    QUIC_MIN_STAKED_CONCURRENT_STREAMS
-                } else {
-                    let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS
-                        - QUIC_MIN_STAKED_CONCURRENT_STREAMS)
-                        as f64;
+                QUIC_MIN_STAKED_CONCURRENT_STREAMS
+            } else {
+                let delta = (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS
+                    - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;
 
-                    (((peer_stake as f64 / total_stake as f64) * delta) as usize
-                        + QUIC_MIN_STAKED_CONCURRENT_STREAMS)
-                        .clamp(
-                            QUIC_MIN_STAKED_CONCURRENT_STREAMS,
-                            QUIC_MAX_STAKED_CONCURRENT_STREAMS,
-                        )
-                }
-            }
-            _ => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
-        }
+                (((peer_stake as f64 / total_stake as f64) * delta) as usize
+                    + QUIC_MIN_STAKED_CONCURRENT_STREAMS)
+                    .clamp(
+                        QUIC_MIN_STAKED_CONCURRENT_STREAMS,
+                        QUIC_MAX_STAKED_CONCURRENT_STREAMS,
+                    )
+            }
+        }
+        ConnectionPeerType::Unstaked => QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS,
     }
 }
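
For intuition, the staked branch linearly interpolates the stream budget by stake share and then clamps it. A worked example as a standalone sketch; the constant values below are illustrative stand-ins, not necessarily the crate's real numbers:

// Illustrative stand-ins; the real constants live in the streamer crate.
const QUIC_MIN_STAKED_CONCURRENT_STREAMS: usize = 128;
const QUIC_MAX_STAKED_CONCURRENT_STREAMS: usize = 512;
const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: usize = 100_000;

fn main() {
    let (peer_stake, total_stake) = (1_000u64, 10_000u64); // peer holds 10% of stake
    let delta =
        (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;
    let streams = (((peer_stake as f64 / total_stake as f64) * delta) as usize
        + QUIC_MIN_STAKED_CONCURRENT_STREAMS)
        .clamp(
            QUIC_MIN_STAKED_CONCURRENT_STREAMS,
            QUIC_MAX_STAKED_CONCURRENT_STREAMS,
        );
    // With these stand-ins: 0.1 * 99_872 = 9_987, plus the 128 floor = 10_115,
    // which the clamp cuts to the 512 ceiling. This is consistent with the
    // Staked(1000)/10000 assertion in the tests further down.
    assert_eq!(streams, 512);
}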
@@ -278,7 +279,7 @@ struct NewConnectionHandlerParams {
     // we're sticking with an async channel
     packet_sender: AsyncSender<PacketAccumulator>,
     remote_pubkey: Option<Pubkey>,
-    stake: u64,
+    peer_type: ConnectionPeerType,
     total_stake: u64,
     max_connections_per_peer: usize,
     stats: Arc<StreamStats>,
@@ -295,7 +296,7 @@ impl NewConnectionHandlerParams {
         NewConnectionHandlerParams {
             packet_sender,
             remote_pubkey: None,
-            stake: 0,
+            peer_type: ConnectionPeerType::Unstaked,
             total_stake: 0,
             max_connections_per_peer,
             stats,
@@ -314,18 +315,13 @@ fn handle_and_cache_new_connection(
     stream_load_ema: Arc<StakedStreamLoadEMA>,
 ) -> Result<(), ConnectionHandlerError> {
     if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
-        connection_table_l.peer_type,
-        params.stake,
+        params.peer_type,
         params.total_stake,
     ) as u64)
     {
         connection.set_max_concurrent_uni_streams(max_uni_streams);
-        let receive_window = compute_recieve_window(
-            params.max_stake,
-            params.min_stake,
-            connection_table_l.peer_type,
-            params.stake,
-        );
+        let receive_window =
+            compute_recieve_window(params.max_stake, params.min_stake, params.peer_type);
 
         if let Ok(receive_window) = receive_window {
             connection.set_receive_window(receive_window);
@@ -334,9 +330,8 @@ fn handle_and_cache_new_connection(
         let remote_addr = connection.remote_address();
 
         debug!(
-            "Peer type: {:?}, stake {}, total stake {}, max streams {} receive_window {:?} from peer {}",
-            connection_table_l.peer_type,
-            params.stake,
+            "Peer type {:?}, total stake {}, max streams {} receive_window {:?} from peer {}",
+            params.peer_type,
             params.total_stake,
             max_uni_streams.into_inner(),
             receive_window,
@@ -348,12 +343,11 @@ fn handle_and_cache_new_connection(
             ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
             remote_addr.port(),
             Some(connection.clone()),
-            params.stake,
+            params.peer_type,
             timing::timestamp(),
             params.max_connections_per_peer,
         )
     {
-        let peer_type = connection_table_l.peer_type;
         drop(connection_table_l);
         tokio::spawn(handle_connection(
             connection,
@@ -362,7 +356,6 @@ fn handle_and_cache_new_connection(
             connection_table,
             stream_exit,
             params.clone(),
-            peer_type,
             wait_for_chunk_timeout,
             stream_load_ema,
             stream_counter,
@@ -448,13 +441,12 @@ fn compute_recieve_window(
     max_stake: u64,
     min_stake: u64,
     peer_type: ConnectionPeerType,
-    peer_stake: u64,
 ) -> Result<VarInt, VarIntBoundsExceeded> {
     match peer_type {
         ConnectionPeerType::Unstaked => {
             VarInt::from_u64(PACKET_DATA_SIZE as u64 * QUIC_UNSTAKED_RECEIVE_WINDOW_RATIO)
         }
-        ConnectionPeerType::Staked => {
+        ConnectionPeerType::Staked(peer_stake) => {
             let ratio =
                 compute_receive_window_ratio_for_staked_node(max_stake, min_stake, peer_stake);
             VarInt::from_u64(PACKET_DATA_SIZE as u64 * ratio)
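
compute_recieve_window (the misspelling is the function's actual name) sizes the per-connection flow-control window as PACKET_DATA_SIZE times a ratio, and the staked arm now takes its stake straight from the destructured variant. A standalone sketch of that shape; the ratio constants and the staked_ratio interpolation below are assumptions for illustration, not the crate's actual values:

// Sketch only; ratio constants and staked_ratio are assumed for illustration.
const PACKET_DATA_SIZE: u64 = 1232; // Solana packet payload size
const UNSTAKED_RECEIVE_WINDOW_RATIO: u64 = 128; // stand-in value
const MAX_STAKED_RECEIVE_WINDOW_RATIO: u64 = 512; // stand-in value

#[derive(Copy, Clone, Debug)]
enum ConnectionPeerType {
    Unstaked,
    Staked(u64),
}

// Stand-in for compute_receive_window_ratio_for_staked_node: grow the ratio
// with the peer's stake, capped at the maximum.
fn staked_ratio(max_stake: u64, min_stake: u64, stake: u64) -> u64 {
    if max_stake <= min_stake {
        return MAX_STAKED_RECEIVE_WINDOW_RATIO;
    }
    let share = stake.saturating_sub(min_stake) as f64 / (max_stake - min_stake) as f64;
    let span = (MAX_STAKED_RECEIVE_WINDOW_RATIO - UNSTAKED_RECEIVE_WINDOW_RATIO) as f64;
    UNSTAKED_RECEIVE_WINDOW_RATIO + (share * span) as u64
}

fn receive_window_bytes(peer_type: ConnectionPeerType, max_stake: u64, min_stake: u64) -> u64 {
    match peer_type {
        ConnectionPeerType::Unstaked => PACKET_DATA_SIZE * UNSTAKED_RECEIVE_WINDOW_RATIO,
        ConnectionPeerType::Staked(stake) => {
            PACKET_DATA_SIZE * staked_ratio(max_stake, min_stake, stake)
        }
    }
}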
@@ -490,10 +482,15 @@ async fn setup_connection(
                 stats.clone(),
             ),
             |(pubkey, stake, total_stake, max_stake, min_stake)| {
+                let peer_type = if stake > 0 {
+                    ConnectionPeerType::Staked(stake)
+                } else {
+                    ConnectionPeerType::Unstaked
+                };
                 NewConnectionHandlerParams {
                     packet_sender,
                     remote_pubkey: Some(pubkey),
-                    stake,
+                    peer_type,
                     total_stake,
                     max_connections_per_peer,
                     stats: stats.clone(),
@@ -503,31 +500,54 @@ async fn setup_connection(
             },
         );
 
-        if params.stake > 0 {
-            let mut connection_table_l = staked_connection_table.lock().unwrap();
-            if connection_table_l.total_size >= max_staked_connections {
-                let num_pruned =
-                    connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, params.stake);
-                stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
-            }
-
-            if connection_table_l.total_size < max_staked_connections {
-                if let Ok(()) = handle_and_cache_new_connection(
-                    new_connection,
-                    connection_table_l,
-                    staked_connection_table.clone(),
-                    &params,
-                    wait_for_chunk_timeout,
-                    stream_load_ema.clone(),
-                ) {
-                    stats
-                        .connection_added_from_staked_peer
-                        .fetch_add(1, Ordering::Relaxed);
-                }
-            } else {
-                // If we couldn't prune a connection in the staked connection table, let's
-                // put this connection in the unstaked connection table. If needed, prune a
-                // connection from the unstaked connection table.
+        match params.peer_type {
+            ConnectionPeerType::Staked(stake) => {
+                let mut connection_table_l = staked_connection_table.lock().unwrap();
+
+                if connection_table_l.total_size >= max_staked_connections {
+                    let num_pruned =
+                        connection_table_l.prune_random(PRUNE_RANDOM_SAMPLE_SIZE, stake);
+                    stats.num_evictions.fetch_add(num_pruned, Ordering::Relaxed);
+                }
+
+                if connection_table_l.total_size < max_staked_connections {
+                    if let Ok(()) = handle_and_cache_new_connection(
+                        new_connection,
+                        connection_table_l,
+                        staked_connection_table.clone(),
+                        &params,
+                        wait_for_chunk_timeout,
+                        stream_load_ema.clone(),
+                    ) {
+                        stats
+                            .connection_added_from_staked_peer
+                            .fetch_add(1, Ordering::Relaxed);
+                    }
+                } else {
+                    // If we couldn't prune a connection in the staked connection table, let's
+                    // put this connection in the unstaked connection table. If needed, prune a
+                    // connection from the unstaked connection table.
+                    if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
+                        new_connection,
+                        unstaked_connection_table.clone(),
+                        max_unstaked_connections,
+                        &params,
+                        wait_for_chunk_timeout,
+                        stream_load_ema.clone(),
+                    ) {
+                        stats
+                            .connection_added_from_staked_peer
+                            .fetch_add(1, Ordering::Relaxed);
+                    } else {
+                        stats
+                            .connection_add_failed_on_pruning
+                            .fetch_add(1, Ordering::Relaxed);
+                        stats
+                            .connection_add_failed_staked_node
+                            .fetch_add(1, Ordering::Relaxed);
+                    }
+                }
+            }
+            ConnectionPeerType::Unstaked => {
                 if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
                     new_connection,
                     unstaked_connection_table.clone(),
@@ -537,32 +557,14 @@ async fn setup_connection(
                     stream_load_ema.clone(),
                 ) {
                     stats
-                        .connection_added_from_staked_peer
+                        .connection_added_from_unstaked_peer
                         .fetch_add(1, Ordering::Relaxed);
                 } else {
                     stats
-                        .connection_add_failed_on_pruning
-                        .fetch_add(1, Ordering::Relaxed);
-                    stats
-                        .connection_add_failed_staked_node
+                        .connection_add_failed_unstaked_node
                         .fetch_add(1, Ordering::Relaxed);
                 }
             }
-        } else if let Ok(()) = prune_unstaked_connections_and_add_new_connection(
-            new_connection,
-            unstaked_connection_table.clone(),
-            max_unstaked_connections,
-            &params,
-            wait_for_chunk_timeout,
-            stream_load_ema.clone(),
-        ) {
-            stats
-                .connection_added_from_unstaked_peer
-                .fetch_add(1, Ordering::Relaxed);
-        } else {
-            stats
-                .connection_add_failed_unstaked_node
-                .fetch_add(1, Ordering::Relaxed);
-        }
+        }
         }
         Err(e) => {
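
Taken together, the two hunks above route a connection by the enum alone: staked peers try the staked table first, pruning a random low-stake sample if it is full, and fall back to the unstaked table, while unstaked peers, which now include all zero-stake peers by construction, go straight to the unstaked table. A condensed, hypothetical restatement (TablePlan and route_new_connection are invented names for illustration):

#[derive(Copy, Clone, Debug)]
enum ConnectionPeerType {
    Unstaked,
    Staked(u64),
}

enum TablePlan {
    // Try the staked table (evicting lower-staked entries if needed), then
    // fall back to the unstaked table when the staked table has no room.
    StakedThenUnstakedFallback { stake: u64 },
    // Zero-stake peers were already classified as Unstaked in setup_connection,
    // so they can only ever land in the unstaked table.
    UnstakedOnly,
}

fn route_new_connection(peer_type: ConnectionPeerType) -> TablePlan {
    match peer_type {
        ConnectionPeerType::Staked(stake) => TablePlan::StakedThenUnstakedFallback { stake },
        ConnectionPeerType::Unstaked => TablePlan::UnstakedOnly,
    }
}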
@@ -696,7 +698,6 @@ async fn packet_batch_sender(
     }
 }
 
-#[allow(clippy::too_many_arguments)]
 async fn handle_connection(
     connection: Connection,
     remote_addr: SocketAddr,
@@ -704,7 +705,6 @@ async fn handle_connection(
     connection_table: Arc<Mutex<ConnectionTable>>,
     stream_exit: Arc<AtomicBool>,
     params: NewConnectionHandlerParams,
-    peer_type: ConnectionPeerType,
     wait_for_chunk_timeout: Duration,
     stream_load_ema: Arc<StakedStreamLoadEMA>,
     stream_counter: Arc<ConnectionStreamCounter>,
@@ -720,22 +720,20 @@ async fn handle_connection(
     stats.total_connections.fetch_add(1, Ordering::Relaxed);
     let mut max_streams_per_throttling_interval =
         stream_throttle::max_streams_for_connection_in_throttling_duration(
-            peer_type,
-            params.stake,
+            params.peer_type,
             params.total_stake,
             stream_load_ema.clone(),
         );
-    let staked_stream = matches!(peer_type, ConnectionPeerType::Staked) && params.stake > 0;
     while !stream_exit.load(Ordering::Relaxed) {
         if let Ok(stream) =
             tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await
         {
             match stream {
                 Ok(mut stream) => {
-                    if staked_stream {
+                    if let ConnectionPeerType::Staked(peer_stake) = params.peer_type {
                         max_streams_per_throttling_interval = stream_load_ema
                             .available_load_capacity_in_throttling_duration(
-                                params.stake,
+                                peer_stake,
                                 params.total_stake,
                             );
                     }
@@ -748,7 +746,7 @@ async fn handle_connection(
                         let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING));
                         continue;
                     }
-                    if staked_stream {
+                    if params.peer_type.is_staked() {
                         stream_load_ema.increment_load();
                     }
                     stream_counter.stream_count.fetch_add(1, Ordering::Relaxed);
@@ -782,7 +780,7 @@ async fn handle_connection(
                                 &remote_addr,
                                 &packet_sender,
                                 stats.clone(),
-                                peer_type,
+                                params.peer_type,
                             )
                             .await
                             {
@@ -799,7 +797,7 @@ async fn handle_connection(
             }
         }
         stats.total_streams.fetch_sub(1, Ordering::Relaxed);
-        if staked_stream {
+        if params.peer_type.is_staked() {
             stream_load_ema.update_ema_if_needed();
         }
     });
@@ -884,17 +882,14 @@ async fn handle_chunk(
                 accum.meta.size = std::cmp::max(accum.meta.size, end_of_chunk);
             }
 
-            match peer_type {
-                ConnectionPeerType::Staked => {
-                    stats
-                        .total_staked_chunks_received
-                        .fetch_add(1, Ordering::Relaxed);
-                }
-                ConnectionPeerType::Unstaked => {
-                    stats
-                        .total_unstaked_chunks_received
-                        .fetch_add(1, Ordering::Relaxed);
-                }
+            if peer_type.is_staked() {
+                stats
+                    .total_staked_chunks_received
+                    .fetch_add(1, Ordering::Relaxed);
+            } else {
+                stats
+                    .total_unstaked_chunks_received
+                    .fetch_add(1, Ordering::Relaxed);
             }
         } else {
             // done receiving chunks
@@ -943,7 +938,7 @@ async fn handle_chunk(
 #[derive(Debug)]
 struct ConnectionEntry {
     exit: Arc<AtomicBool>,
-    stake: u64,
+    peer_type: ConnectionPeerType,
     last_update: Arc<AtomicU64>,
     port: u16,
     connection: Option<Connection>,
@@ -953,7 +948,7 @@ struct ConnectionEntry {
 impl ConnectionEntry {
     fn new(
         exit: Arc<AtomicBool>,
-        stake: u64,
+        peer_type: ConnectionPeerType,
         last_update: Arc<AtomicU64>,
         port: u16,
         connection: Option<Connection>,
@@ -961,7 +956,7 @@ impl ConnectionEntry {
     ) -> Self {
         Self {
             exit,
-            stake,
+            peer_type,
             last_update,
             port,
             connection,
@@ -972,6 +967,13 @@ impl ConnectionEntry {
     fn last_update(&self) -> u64 {
         self.last_update.load(Ordering::Relaxed)
     }
+
+    fn stake(&self) -> u64 {
+        match self.peer_type {
+            ConnectionPeerType::Unstaked => 0,
+            ConnectionPeerType::Staked(stake) => stake,
+        }
+    }
 }
 
 impl Drop for ConnectionEntry {
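
The new stake() accessor flattens the enum back to a u64 only where pruning needs an ordering, so an unstaked entry competes with stake 0 and is first in line for eviction. A standalone illustration:

#[derive(Copy, Clone, Debug)]
enum ConnectionPeerType {
    Unstaked,
    Staked(u64),
}

fn stake(peer_type: ConnectionPeerType) -> u64 {
    match peer_type {
        ConnectionPeerType::Unstaked => 0,
        ConnectionPeerType::Staked(stake) => stake,
    }
}

fn main() {
    let mut sample = vec![
        ConnectionPeerType::Staked(50),
        ConnectionPeerType::Unstaked,
        ConnectionPeerType::Staked(10),
    ];
    // Ordering a sampled set by stake() puts unstaked entries first, i.e.
    // ahead of every staked entry when a slot has to be reclaimed.
    sample.sort_by_key(|p| stake(*p));
    assert!(matches!(sample[0], ConnectionPeerType::Unstaked));
}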
@@ -986,12 +988,6 @@ impl Drop for ConnectionEntry {
     }
 }
 
-#[derive(Copy, Clone, Debug)]
-pub enum ConnectionPeerType {
-    Unstaked,
-    Staked,
-}
-
 #[derive(Copy, Clone, Eq, Hash, PartialEq)]
 enum ConnectionTableKey {
     IP(IpAddr),
@@ -1010,17 +1006,15 @@ impl ConnectionTableKey {
 struct ConnectionTable {
     table: IndexMap<ConnectionTableKey, Vec<ConnectionEntry>>,
     total_size: usize,
-    peer_type: ConnectionPeerType,
 }
 
 // Prune the connection which has the oldest update
 // Return number pruned
 impl ConnectionTable {
-    fn new(peer_type: ConnectionPeerType) -> Self {
+    fn new() -> Self {
         Self {
             table: IndexMap::default(),
             total_size: 0,
-            peer_type,
         }
     }
@@ -1055,7 +1049,7 @@ impl ConnectionTable {
             })
             .map(|index| {
                 let connection = self.table[index].first();
-                let stake = connection.map(|connection| connection.stake);
+                let stake = connection.map(|connection| connection.stake());
                 (index, stake)
             })
             .take(sample_size)
@@ -1073,7 +1067,7 @@ impl ConnectionTable {
         key: ConnectionTableKey,
         port: u16,
         connection: Option<Connection>,
-        stake: u64,
+        peer_type: ConnectionPeerType,
         last_update: u64,
         max_connections_per_peer: usize,
     ) -> Option<(
@@ -1090,7 +1084,7 @@ impl ConnectionTable {
         if has_connection_capacity {
             let exit = Arc::new(AtomicBool::new(false));
             let last_update = Arc::new(AtomicU64::new(last_update));
-            let stream_counter = if stake > 0 {
+            let stream_counter = if peer_type.is_staked() {
                 connection_entry
                     .first()
                     .map(|entry| entry.stream_counter.clone())
@@ -1102,7 +1096,7 @@ impl ConnectionTable {
             };
             connection_entry.push(ConnectionEntry::new(
                 exit.clone(),
-                stake,
+                peer_type,
                 last_update.clone(),
                 port,
                 connection,
@@ -1738,7 +1732,7 @@ pub mod test {
     fn test_prune_table_with_ip() {
         use std::net::Ipv4Addr;
         solana_logger::setup();
-        let mut table = ConnectionTable::new(ConnectionPeerType::Staked);
+        let mut table = ConnectionTable::new();
         let mut num_entries = 5;
         let max_connections_per_peer = 10;
         let sockets: Vec<_> = (0..num_entries)
@@ -1750,7 +1744,7 @@ pub mod test {
                 ConnectionTableKey::IP(socket.ip()),
                 socket.port(),
                 None,
-                0,
+                ConnectionPeerType::Unstaked,
                 i as u64,
                 max_connections_per_peer,
             )
@@ -1762,7 +1756,7 @@ pub mod test {
             ConnectionTableKey::IP(sockets[0].ip()),
             sockets[0].port(),
             None,
-            0,
+            ConnectionPeerType::Unstaked,
             5,
             max_connections_per_peer,
         )
@@ -1787,7 +1781,7 @@ pub mod test {
     #[test]
     fn test_prune_table_with_unique_pubkeys() {
         solana_logger::setup();
-        let mut table = ConnectionTable::new(ConnectionPeerType::Staked);
+        let mut table = ConnectionTable::new();
 
         // We should be able to add more entries than max_connections_per_peer, since each entry is
         // from a different peer pubkey.
@@ -1801,7 +1795,7 @@ pub mod test {
                 ConnectionTableKey::Pubkey(*pubkey),
                 0,
                 None,
-                0,
+                ConnectionPeerType::Unstaked,
                 i as u64,
                 max_connections_per_peer,
             )
@@ -1822,7 +1816,7 @@ pub mod test {
     #[test]
     fn test_prune_table_with_non_unique_pubkeys() {
         solana_logger::setup();
-        let mut table = ConnectionTable::new(ConnectionPeerType::Staked);
+        let mut table = ConnectionTable::new();
         let max_connections_per_peer = 10;
         let pubkey = Pubkey::new_unique();
@@ -1832,7 +1826,7 @@ pub mod test {
                 ConnectionTableKey::Pubkey(pubkey),
                 0,
                 None,
-                0,
+                ConnectionPeerType::Unstaked,
                 i as u64,
                 max_connections_per_peer,
             )
@@ -1846,7 +1840,7 @@ pub mod test {
             ConnectionTableKey::Pubkey(pubkey),
             0,
             None,
-            0,
+            ConnectionPeerType::Unstaked,
             10,
             max_connections_per_peer,
         )
@@ -1860,7 +1854,7 @@ pub mod test {
             ConnectionTableKey::Pubkey(pubkey2),
             0,
             None,
-            0,
+            ConnectionPeerType::Unstaked,
             10,
             max_connections_per_peer,
         )
@@ -1882,7 +1876,7 @@ pub mod test {
     fn test_prune_table_random() {
         use std::net::Ipv4Addr;
         solana_logger::setup();
-        let mut table = ConnectionTable::new(ConnectionPeerType::Staked);
+        let mut table = ConnectionTable::new();
         let num_entries = 5;
         let max_connections_per_peer = 10;
         let sockets: Vec<_> = (0..num_entries)
@@ -1894,7 +1888,7 @@ pub mod test {
                 ConnectionTableKey::IP(socket.ip()),
                 socket.port(),
                 None,
-                (i + 1) as u64,
+                ConnectionPeerType::Staked((i + 1) as u64),
                 i as u64,
                 max_connections_per_peer,
             )
@@ -1919,7 +1913,7 @@ pub mod test {
     fn test_remove_connections() {
         use std::net::Ipv4Addr;
         solana_logger::setup();
-        let mut table = ConnectionTable::new(ConnectionPeerType::Staked);
+        let mut table = ConnectionTable::new();
         let num_ips = 5;
         let max_connections_per_peer = 10;
         let mut sockets: Vec<_> = (0..num_ips)
@@ -1931,7 +1925,7 @@ pub mod test {
                 ConnectionTableKey::IP(socket.ip()),
                 socket.port(),
                 None,
-                0,
+                ConnectionPeerType::Unstaked,
                 (i * 2) as u64,
                 max_connections_per_peer,
             )
@@ -1942,7 +1936,7 @@ pub mod test {
                 ConnectionTableKey::IP(socket.ip()),
                 socket.port(),
                 None,
-                0,
+                ConnectionPeerType::Unstaked,
                 (i * 2 + 1) as u64,
                 max_connections_per_peer,
             )
@@ -1956,7 +1950,7 @@ pub mod test {
             ConnectionTableKey::IP(single_connection_addr.ip()),
             single_connection_addr.port(),
             None,
-            0,
+            ConnectionPeerType::Unstaked,
             (num_ips * 2) as u64,
             max_connections_per_peer,
         )
@@ -1978,46 +1972,26 @@ pub mod test {
     fn test_max_allowed_uni_streams() {
         assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, 0),
+            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0),
             QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
         );
         assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10, 0),
-            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
-        );
-        assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, 0),
-            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
-        );
-        assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 10, 0),
+            compute_max_allowed_uni_streams(ConnectionPeerType::Staked(10), 0),
             QUIC_MIN_STAKED_CONCURRENT_STREAMS
         );
         let delta =
             (QUIC_TOTAL_STAKED_CONCURRENT_STREAMS - QUIC_MIN_STAKED_CONCURRENT_STREAMS) as f64;
         assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 1000, 10000),
+            compute_max_allowed_uni_streams(ConnectionPeerType::Staked(1000), 10000),
             QUIC_MAX_STAKED_CONCURRENT_STREAMS,
         );
         assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 100, 10000),
+            compute_max_allowed_uni_streams(ConnectionPeerType::Staked(100), 10000),
             ((delta / (100_f64)) as usize + QUIC_MIN_STAKED_CONCURRENT_STREAMS)
                 .min(QUIC_MAX_STAKED_CONCURRENT_STREAMS)
         );
         assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Staked, 0, 10000),
-            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
-        );
-        assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 1000, 10000),
-            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
-        );
-        assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 1, 10000),
-            QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
-        );
-        assert_eq!(
-            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 0, 10000),
+            compute_max_allowed_uni_streams(ConnectionPeerType::Unstaked, 10000),
             QUIC_MAX_UNSTAKED_CONCURRENT_STREAMS
         );
     }

View File

@@ -156,24 +156,27 @@ impl StakedStreamLoadEMA {
 }
 
 pub(crate) fn max_streams_for_connection_in_throttling_duration(
-    connection_type: ConnectionPeerType,
-    stake: u64,
+    peer_type: ConnectionPeerType,
     total_stake: u64,
     ema_load: Arc<StakedStreamLoadEMA>,
 ) -> u64 {
-    if matches!(connection_type, ConnectionPeerType::Unstaked) || stake == 0 {
-        let max_num_connections = u64::try_from(MAX_UNSTAKED_CONNECTIONS).unwrap_or_else(|_| {
-            error!(
-                "Failed to convert maximum number of unstaked connections {} to u64.",
-                MAX_UNSTAKED_CONNECTIONS
-            );
-            500
-        });
-        Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
-            .apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS)
-            .saturating_div(max_num_connections)
-    } else {
-        ema_load.available_load_capacity_in_throttling_duration(stake, total_stake)
+    match peer_type {
+        ConnectionPeerType::Unstaked => {
+            let max_num_connections =
+                u64::try_from(MAX_UNSTAKED_CONNECTIONS).unwrap_or_else(|_| {
+                    error!(
+                        "Failed to convert maximum number of unstaked connections {} to u64.",
+                        MAX_UNSTAKED_CONNECTIONS
+                    );
+                    500
+                });
+            Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
+                .apply_to(MAX_STREAMS_PER_MS * STREAM_THROTTLING_INTERVAL_MS)
+                .saturating_div(max_num_connections)
+        }
+        ConnectionPeerType::Staked(stake) => {
+            ema_load.available_load_capacity_in_throttling_duration(stake, total_stake)
+        }
     }
 }
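
For the unstaked arm, the per-connection budget follows the arithmetic quoted in the test comments below. A standalone sketch using those quoted figures, which stand in for MAX_STREAMS_PER_MS, STREAM_THROTTLING_INTERVAL_MS, MAX_UNSTAKED_STREAMS_PERCENT and MAX_UNSTAKED_CONNECTIONS:

fn main() {
    // Figures quoted in the test comments: 25K streams per throttling
    // interval, a 20% share for unstaked peers, 500 max unstaked connections.
    let streams_per_interval: u64 = 25_000;
    let unstaked_percent: u64 = 20;
    let max_unstaked_connections: u64 = 500;

    let per_connection =
        (streams_per_interval * unstaked_percent / 100).saturating_div(max_unstaked_connections);
    // 25_000 * 20% = 5_000 shared streams; divided by 500 connections = 10
    // streams each, matching the unstaked assertions below.
    assert_eq!(per_connection, 10);
}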
@@ -213,13 +216,10 @@ pub mod test {
     use {
         super::*,
         crate::{
-            nonblocking::{
-                quic::ConnectionPeerType,
-                stream_throttle::{
-                    max_streams_for_connection_in_throttling_duration,
-                    MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION,
-                    STREAM_LOAD_EMA_INTERVAL_MS,
-                },
+            nonblocking::stream_throttle::{
+                max_streams_for_connection_in_throttling_duration,
+                MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION,
+                STREAM_LOAD_EMA_INTERVAL_MS,
             },
             quic::StreamStats,
         },
@@ -236,30 +236,6 @@ pub mod test {
         assert_eq!(
             max_streams_for_connection_in_throttling_duration(
                 ConnectionPeerType::Unstaked,
-                0,
                 10000,
                 load_ema.clone(),
             ),
             10
         );
-
-        // 25K packets per ms * 20% / 500 max unstaked connections
-        assert_eq!(
-            max_streams_for_connection_in_throttling_duration(
-                ConnectionPeerType::Unstaked,
-                10,
-                10000,
-                load_ema.clone(),
-            ),
-            10
-        );
-
-        // If stake is 0, same limits as unstaked connections will apply.
-        // 25K packets per ms * 20% / 500 max unstaked connections
-        assert_eq!(
-            max_streams_for_connection_in_throttling_duration(
-                ConnectionPeerType::Staked,
-                0,
-                10000,
-                load_ema.clone(),
-            ),
@@ -283,8 +259,7 @@ pub mod test {
         // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 15 / 10K = 30
         assert_eq!(
             max_streams_for_connection_in_throttling_duration(
-                ConnectionPeerType::Staked,
-                15,
+                ConnectionPeerType::Staked(15),
                 10000,
                 load_ema.clone(),
             ),
@@ -295,8 +270,7 @@ pub mod test {
         // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 1K / 10K = 2K
         assert_eq!(
             max_streams_for_connection_in_throttling_duration(
-                ConnectionPeerType::Staked,
-                1000,
+                ConnectionPeerType::Staked(1000),
                 10000,
                 load_ema.clone(),
             ),
@@ -308,8 +282,7 @@ pub mod test {
         // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 15 / 10K = 120
         assert_eq!(
             max_streams_for_connection_in_throttling_duration(
-                ConnectionPeerType::Staked,
-                15,
+                ConnectionPeerType::Staked(15),
                 10000,
                 load_ema.clone(),
             ),
@@ -320,8 +293,7 @@ pub mod test {
         // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 1K / 10K = 8000
         assert_eq!(
             max_streams_for_connection_in_throttling_duration(
-                ConnectionPeerType::Staked,
-                1000,
+                ConnectionPeerType::Staked(1000),
                 10000,
                 load_ema.clone(),
             ),
@@ -334,8 +306,7 @@ pub mod test {
         // function = ((10K * 10K) / 25% of 10K) * stake / total_stake
         assert_eq!(
             max_streams_for_connection_in_throttling_duration(
-                ConnectionPeerType::Staked,
-                15,
+                ConnectionPeerType::Staked(15),
                 10000,
                 load_ema.clone(),
             ),
@@ -345,8 +316,7 @@ pub mod test {
         // function = ((10K * 10K) / 25% of 10K) * stake / total_stake
         assert_eq!(
             max_streams_for_connection_in_throttling_duration(
-                ConnectionPeerType::Staked,
-                1000,
+                ConnectionPeerType::Staked(1000),
                 10000,
                 load_ema.clone(),
             ),
@@ -357,8 +327,7 @@ pub mod test {
         // MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION of streams.
         assert_eq!(
             max_streams_for_connection_in_throttling_duration(
-                ConnectionPeerType::Staked,
-                1,
+                ConnectionPeerType::Staked(1),
                 40000,
                 load_ema.clone(),
             ),