Use EMA to compute QUIC streamer load for staked connections (#34586)
* Use EMA to compute QUIC streamer load for staked connections * change min load to 25% of max load * reduce max PPS from 500K to 250K * update ema_function to account for missing intervals * replace f64 math with u128 * track streams across all connections for a peer * u128 -> u64 * replace ' as ' type conversion to from and try_from * add counter for u64 overflow * reset recent stream load on ema interval * do not use same counter for unstaked connections from a peer IP
This commit is contained in:
parent
e31a45ad0c
commit
904700cc56
|
@ -28,6 +28,7 @@ use {
|
||||||
timing,
|
timing,
|
||||||
},
|
},
|
||||||
std::{
|
std::{
|
||||||
|
cmp,
|
||||||
iter::repeat_with,
|
iter::repeat_with,
|
||||||
net::{IpAddr, SocketAddr, UdpSocket},
|
net::{IpAddr, SocketAddr, UdpSocket},
|
||||||
sync::{
|
sync::{
|
||||||
|
@ -39,10 +40,10 @@ use {
|
||||||
tokio::{task::JoinHandle, time::timeout},
|
tokio::{task::JoinHandle, time::timeout},
|
||||||
};
|
};
|
||||||
|
|
||||||
/// Limit to 500K PPS
|
/// Limit to 250K PPS
|
||||||
const MAX_STREAMS_PER_100MS: u64 = 500_000 / 10;
|
const MAX_STREAMS_PER_MS: u64 = 250;
|
||||||
const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20;
|
const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20;
|
||||||
const STREAM_THROTTLING_INTERVAL: Duration = Duration::from_millis(100);
|
const STREAM_THROTTLING_INTERVAL_MS: u64 = 100;
|
||||||
const WAIT_FOR_STREAM_TIMEOUT: Duration = Duration::from_millis(100);
|
const WAIT_FOR_STREAM_TIMEOUT: Duration = Duration::from_millis(100);
|
||||||
pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(10);
|
pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
|
|
||||||
|
@ -60,6 +61,9 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre
|
||||||
const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
|
const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4;
|
||||||
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
|
const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many";
|
||||||
const STREAM_STOP_CODE_THROTTLING: u32 = 15;
|
const STREAM_STOP_CODE_THROTTLING: u32 = 15;
|
||||||
|
const STREAM_LOAD_EMA_INTERVAL_MS: u64 = 5;
|
||||||
|
const STREAM_LOAD_EMA_INTERVAL_COUNT: u64 = 10;
|
||||||
|
const MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION: u64 = 8;
|
||||||
|
|
||||||
// A sequence of bytes that is part of a packet
|
// A sequence of bytes that is part of a packet
|
||||||
// along with where in the packet it is
|
// along with where in the packet it is
|
||||||
|
@ -85,6 +89,138 @@ struct PacketAccumulator {
|
||||||
pub chunks: Vec<PacketChunk>,
|
pub chunks: Vec<PacketChunk>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct StakedStreamLoadEMA {
|
||||||
|
current_load_ema: AtomicU64,
|
||||||
|
load_in_recent_interval: AtomicU64,
|
||||||
|
last_update: RwLock<Instant>,
|
||||||
|
stats: Arc<StreamStats>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StakedStreamLoadEMA {
|
||||||
|
fn new(stats: Arc<StreamStats>) -> Self {
|
||||||
|
Self {
|
||||||
|
current_load_ema: AtomicU64::default(),
|
||||||
|
load_in_recent_interval: AtomicU64::default(),
|
||||||
|
last_update: RwLock::new(Instant::now()),
|
||||||
|
stats,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ema_function(current_ema: u128, recent_load: u128) -> u128 {
|
||||||
|
// Using the EMA multiplier helps in avoiding the floating point math during EMA related calculations
|
||||||
|
const STREAM_LOAD_EMA_MULTIPLIER: u128 = 1024;
|
||||||
|
let multiplied_smoothing_factor: u128 =
|
||||||
|
2 * STREAM_LOAD_EMA_MULTIPLIER / (u128::from(STREAM_LOAD_EMA_INTERVAL_COUNT) + 1);
|
||||||
|
|
||||||
|
// The formula is
|
||||||
|
// updated_ema = recent_load * smoothing_factor + current_ema * (1 - smoothing_factor)
|
||||||
|
// To avoid floating point math, we are using STREAM_LOAD_EMA_MULTIPLIER
|
||||||
|
// updated_ema = (recent_load * multiplied_smoothing_factor
|
||||||
|
// + current_ema * (multiplier - multiplied_smoothing_factor)) / multiplier
|
||||||
|
(recent_load * multiplied_smoothing_factor
|
||||||
|
+ current_ema * (STREAM_LOAD_EMA_MULTIPLIER - multiplied_smoothing_factor))
|
||||||
|
/ STREAM_LOAD_EMA_MULTIPLIER
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_ema(&self, time_since_last_update_ms: u128) {
|
||||||
|
// if time_since_last_update_ms > STREAM_LOAD_EMA_INTERVAL_MS, there might be intervals where ema was not updated.
|
||||||
|
// count how many updates (1 + missed intervals) are needed.
|
||||||
|
let num_extra_updates =
|
||||||
|
time_since_last_update_ms.saturating_sub(1) / u128::from(STREAM_LOAD_EMA_INTERVAL_MS);
|
||||||
|
|
||||||
|
let load_in_recent_interval =
|
||||||
|
u128::from(self.load_in_recent_interval.swap(0, Ordering::Relaxed));
|
||||||
|
|
||||||
|
let mut updated_load_ema = Self::ema_function(
|
||||||
|
u128::from(self.current_load_ema.load(Ordering::Relaxed)),
|
||||||
|
load_in_recent_interval,
|
||||||
|
);
|
||||||
|
|
||||||
|
for _ in 0..num_extra_updates {
|
||||||
|
updated_load_ema = Self::ema_function(updated_load_ema, load_in_recent_interval);
|
||||||
|
}
|
||||||
|
|
||||||
|
let Ok(updated_load_ema) = u64::try_from(updated_load_ema) else {
|
||||||
|
error!(
|
||||||
|
"Failed to convert EMA {} to a u64. Not updating the load EMA",
|
||||||
|
updated_load_ema
|
||||||
|
);
|
||||||
|
self.stats
|
||||||
|
.stream_load_ema_overflow
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
self.current_load_ema
|
||||||
|
.store(updated_load_ema, Ordering::Relaxed);
|
||||||
|
self.stats
|
||||||
|
.stream_load_ema
|
||||||
|
.store(updated_load_ema as usize, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_ema_if_needed(&self) {
|
||||||
|
const EMA_DURATION: Duration = Duration::from_millis(STREAM_LOAD_EMA_INTERVAL_MS);
|
||||||
|
// Read lock enables multiple connection handlers to run in parallel if interval is not expired
|
||||||
|
if Instant::now().duration_since(*self.last_update.read().unwrap()) >= EMA_DURATION {
|
||||||
|
let mut last_update_w = self.last_update.write().unwrap();
|
||||||
|
// Recheck as some other thread might have updated the ema since this thread tried to acquire the write lock.
|
||||||
|
let since_last_update = Instant::now().duration_since(*last_update_w);
|
||||||
|
if since_last_update >= EMA_DURATION {
|
||||||
|
*last_update_w = Instant::now();
|
||||||
|
self.update_ema(since_last_update.as_millis());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn increment_load(&self) {
|
||||||
|
self.load_in_recent_interval.fetch_add(1, Ordering::Relaxed);
|
||||||
|
self.update_ema_if_needed();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn available_load_capacity_in_duration(
|
||||||
|
&self,
|
||||||
|
stake: u64,
|
||||||
|
total_stake: u64,
|
||||||
|
duration_ms: u64,
|
||||||
|
) -> u64 {
|
||||||
|
let ema_window_ms = STREAM_LOAD_EMA_INTERVAL_MS * STREAM_LOAD_EMA_INTERVAL_COUNT;
|
||||||
|
let max_load_in_ema_window = u128::from(
|
||||||
|
(MAX_STREAMS_PER_MS
|
||||||
|
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_MS))
|
||||||
|
* ema_window_ms,
|
||||||
|
);
|
||||||
|
|
||||||
|
// If the current load is low, cap it to 25% of max_load.
|
||||||
|
let current_load = cmp::max(
|
||||||
|
u128::from(self.current_load_ema.load(Ordering::Relaxed)),
|
||||||
|
max_load_in_ema_window / 4,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Formula is (max_load ^ 2 / current_load) * (stake / total_stake)
|
||||||
|
let capacity_in_ema_window =
|
||||||
|
(max_load_in_ema_window * max_load_in_ema_window * u128::from(stake))
|
||||||
|
/ (current_load * u128::from(total_stake));
|
||||||
|
|
||||||
|
let calculated_capacity =
|
||||||
|
capacity_in_ema_window * u128::from(duration_ms) / u128::from(ema_window_ms);
|
||||||
|
let calculated_capacity = u64::try_from(calculated_capacity).unwrap_or_else(|_| {
|
||||||
|
error!(
|
||||||
|
"Failed to convert stream capacity {} to u64. Using minimum load capacity",
|
||||||
|
calculated_capacity
|
||||||
|
);
|
||||||
|
self.stats
|
||||||
|
.stream_load_capacity_overflow
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION
|
||||||
|
});
|
||||||
|
|
||||||
|
cmp::max(
|
||||||
|
calculated_capacity,
|
||||||
|
MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[allow(clippy::too_many_arguments)]
|
#[allow(clippy::too_many_arguments)]
|
||||||
pub fn spawn_server(
|
pub fn spawn_server(
|
||||||
name: &'static str,
|
name: &'static str,
|
||||||
|
@ -147,6 +283,7 @@ async fn run_server(
|
||||||
let unstaked_connection_table: Arc<Mutex<ConnectionTable>> = Arc::new(Mutex::new(
|
let unstaked_connection_table: Arc<Mutex<ConnectionTable>> = Arc::new(Mutex::new(
|
||||||
ConnectionTable::new(ConnectionPeerType::Unstaked),
|
ConnectionTable::new(ConnectionPeerType::Unstaked),
|
||||||
));
|
));
|
||||||
|
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(stats.clone()));
|
||||||
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
|
let staked_connection_table: Arc<Mutex<ConnectionTable>> =
|
||||||
Arc::new(Mutex::new(ConnectionTable::new(ConnectionPeerType::Staked)));
|
Arc::new(Mutex::new(ConnectionTable::new(ConnectionPeerType::Staked)));
|
||||||
let (sender, receiver) = async_unbounded();
|
let (sender, receiver) = async_unbounded();
|
||||||
|
@ -178,6 +315,7 @@ async fn run_server(
|
||||||
max_unstaked_connections,
|
max_unstaked_connections,
|
||||||
stats.clone(),
|
stats.clone(),
|
||||||
wait_for_chunk_timeout,
|
wait_for_chunk_timeout,
|
||||||
|
stream_load_ema.clone(),
|
||||||
));
|
));
|
||||||
} else {
|
} else {
|
||||||
debug!("accept(): Timed out waiting for connection");
|
debug!("accept(): Timed out waiting for connection");
|
||||||
|
@ -311,6 +449,7 @@ fn handle_and_cache_new_connection(
|
||||||
connection_table: Arc<Mutex<ConnectionTable>>,
|
connection_table: Arc<Mutex<ConnectionTable>>,
|
||||||
params: &NewConnectionHandlerParams,
|
params: &NewConnectionHandlerParams,
|
||||||
wait_for_chunk_timeout: Duration,
|
wait_for_chunk_timeout: Duration,
|
||||||
|
stream_load_ema: Arc<StakedStreamLoadEMA>,
|
||||||
) -> Result<(), ConnectionHandlerError> {
|
) -> Result<(), ConnectionHandlerError> {
|
||||||
if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
|
if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams(
|
||||||
connection_table_l.peer_type,
|
connection_table_l.peer_type,
|
||||||
|
@ -342,14 +481,16 @@ fn handle_and_cache_new_connection(
|
||||||
remote_addr,
|
remote_addr,
|
||||||
);
|
);
|
||||||
|
|
||||||
if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection(
|
if let Some((last_update, stream_exit, stream_counter)) = connection_table_l
|
||||||
ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
|
.try_add_connection(
|
||||||
remote_addr.port(),
|
ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey),
|
||||||
Some(connection.clone()),
|
remote_addr.port(),
|
||||||
params.stake,
|
Some(connection.clone()),
|
||||||
timing::timestamp(),
|
params.stake,
|
||||||
params.max_connections_per_peer,
|
timing::timestamp(),
|
||||||
) {
|
params.max_connections_per_peer,
|
||||||
|
)
|
||||||
|
{
|
||||||
let peer_type = connection_table_l.peer_type;
|
let peer_type = connection_table_l.peer_type;
|
||||||
drop(connection_table_l);
|
drop(connection_table_l);
|
||||||
tokio::spawn(handle_connection(
|
tokio::spawn(handle_connection(
|
||||||
|
@ -361,6 +502,8 @@ fn handle_and_cache_new_connection(
|
||||||
params.clone(),
|
params.clone(),
|
||||||
peer_type,
|
peer_type,
|
||||||
wait_for_chunk_timeout,
|
wait_for_chunk_timeout,
|
||||||
|
stream_load_ema,
|
||||||
|
stream_counter,
|
||||||
));
|
));
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
|
@ -389,6 +532,7 @@ fn prune_unstaked_connections_and_add_new_connection(
|
||||||
max_connections: usize,
|
max_connections: usize,
|
||||||
params: &NewConnectionHandlerParams,
|
params: &NewConnectionHandlerParams,
|
||||||
wait_for_chunk_timeout: Duration,
|
wait_for_chunk_timeout: Duration,
|
||||||
|
stream_load_ema: Arc<StakedStreamLoadEMA>,
|
||||||
) -> Result<(), ConnectionHandlerError> {
|
) -> Result<(), ConnectionHandlerError> {
|
||||||
let stats = params.stats.clone();
|
let stats = params.stats.clone();
|
||||||
if max_connections > 0 {
|
if max_connections > 0 {
|
||||||
|
@ -401,6 +545,7 @@ fn prune_unstaked_connections_and_add_new_connection(
|
||||||
connection_table_clone,
|
connection_table_clone,
|
||||||
params,
|
params,
|
||||||
wait_for_chunk_timeout,
|
wait_for_chunk_timeout,
|
||||||
|
stream_load_ema,
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
connection.close(
|
connection.close(
|
||||||
|
@ -467,6 +612,7 @@ async fn setup_connection(
|
||||||
max_unstaked_connections: usize,
|
max_unstaked_connections: usize,
|
||||||
stats: Arc<StreamStats>,
|
stats: Arc<StreamStats>,
|
||||||
wait_for_chunk_timeout: Duration,
|
wait_for_chunk_timeout: Duration,
|
||||||
|
stream_load_ema: Arc<StakedStreamLoadEMA>,
|
||||||
) {
|
) {
|
||||||
const PRUNE_RANDOM_SAMPLE_SIZE: usize = 2;
|
const PRUNE_RANDOM_SAMPLE_SIZE: usize = 2;
|
||||||
let from = connecting.remote_address();
|
let from = connecting.remote_address();
|
||||||
|
@ -510,6 +656,7 @@ async fn setup_connection(
|
||||||
staked_connection_table.clone(),
|
staked_connection_table.clone(),
|
||||||
¶ms,
|
¶ms,
|
||||||
wait_for_chunk_timeout,
|
wait_for_chunk_timeout,
|
||||||
|
stream_load_ema.clone(),
|
||||||
) {
|
) {
|
||||||
stats
|
stats
|
||||||
.connection_added_from_staked_peer
|
.connection_added_from_staked_peer
|
||||||
|
@ -525,6 +672,7 @@ async fn setup_connection(
|
||||||
max_unstaked_connections,
|
max_unstaked_connections,
|
||||||
¶ms,
|
¶ms,
|
||||||
wait_for_chunk_timeout,
|
wait_for_chunk_timeout,
|
||||||
|
stream_load_ema.clone(),
|
||||||
) {
|
) {
|
||||||
stats
|
stats
|
||||||
.connection_added_from_staked_peer
|
.connection_added_from_staked_peer
|
||||||
|
@ -544,6 +692,7 @@ async fn setup_connection(
|
||||||
max_unstaked_connections,
|
max_unstaked_connections,
|
||||||
¶ms,
|
¶ms,
|
||||||
wait_for_chunk_timeout,
|
wait_for_chunk_timeout,
|
||||||
|
stream_load_ema.clone(),
|
||||||
) {
|
) {
|
||||||
stats
|
stats
|
||||||
.connection_added_from_unstaked_peer
|
.connection_added_from_unstaked_peer
|
||||||
|
@ -685,31 +834,30 @@ async fn packet_batch_sender(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn max_streams_for_connection_in_100ms(
|
fn max_streams_for_connection_in_duration(
|
||||||
connection_type: ConnectionPeerType,
|
connection_type: ConnectionPeerType,
|
||||||
stake: u64,
|
stake: u64,
|
||||||
total_stake: u64,
|
total_stake: u64,
|
||||||
|
ema_load: Arc<StakedStreamLoadEMA>,
|
||||||
|
duration_ms: u64,
|
||||||
) -> u64 {
|
) -> u64 {
|
||||||
if matches!(connection_type, ConnectionPeerType::Unstaked) || stake == 0 {
|
if matches!(connection_type, ConnectionPeerType::Unstaked) || stake == 0 {
|
||||||
|
let max_num_connections = u64::try_from(MAX_UNSTAKED_CONNECTIONS).unwrap_or_else(|_| {
|
||||||
|
error!(
|
||||||
|
"Failed to convert maximum number of unstaked connections {} to u64.",
|
||||||
|
MAX_UNSTAKED_CONNECTIONS
|
||||||
|
);
|
||||||
|
500
|
||||||
|
});
|
||||||
Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
|
Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT)
|
||||||
.apply_to(MAX_STREAMS_PER_100MS)
|
.apply_to(MAX_STREAMS_PER_MS * duration_ms)
|
||||||
.saturating_div(MAX_UNSTAKED_CONNECTIONS as u64)
|
.saturating_div(max_num_connections)
|
||||||
} else {
|
} else {
|
||||||
let max_total_staked_streams: u64 = MAX_STREAMS_PER_100MS
|
ema_load.available_load_capacity_in_duration(stake, total_stake, duration_ms)
|
||||||
- Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_100MS);
|
|
||||||
((max_total_staked_streams as f64 / total_stake as f64) * stake as f64) as u64
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn reset_throttling_params_if_needed(last_instant: &mut tokio::time::Instant) -> bool {
|
|
||||||
if tokio::time::Instant::now().duration_since(*last_instant) > STREAM_THROTTLING_INTERVAL {
|
|
||||||
*last_instant = tokio::time::Instant::now();
|
|
||||||
true
|
|
||||||
} else {
|
|
||||||
false
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[allow(clippy::too_many_arguments)]
|
||||||
async fn handle_connection(
|
async fn handle_connection(
|
||||||
connection: Connection,
|
connection: Connection,
|
||||||
remote_addr: SocketAddr,
|
remote_addr: SocketAddr,
|
||||||
|
@ -719,6 +867,8 @@ async fn handle_connection(
|
||||||
params: NewConnectionHandlerParams,
|
params: NewConnectionHandlerParams,
|
||||||
peer_type: ConnectionPeerType,
|
peer_type: ConnectionPeerType,
|
||||||
wait_for_chunk_timeout: Duration,
|
wait_for_chunk_timeout: Duration,
|
||||||
|
stream_load_ema: Arc<StakedStreamLoadEMA>,
|
||||||
|
stream_counter: Arc<ConnectionStreamCounter>,
|
||||||
) {
|
) {
|
||||||
let stats = params.stats;
|
let stats = params.stats;
|
||||||
debug!(
|
debug!(
|
||||||
|
@ -729,30 +879,48 @@ async fn handle_connection(
|
||||||
);
|
);
|
||||||
let stable_id = connection.stable_id();
|
let stable_id = connection.stable_id();
|
||||||
stats.total_connections.fetch_add(1, Ordering::Relaxed);
|
stats.total_connections.fetch_add(1, Ordering::Relaxed);
|
||||||
let max_streams_per_100ms =
|
let mut max_streams_per_throttling_interval = max_streams_for_connection_in_duration(
|
||||||
max_streams_for_connection_in_100ms(peer_type, params.stake, params.total_stake);
|
peer_type,
|
||||||
let mut last_throttling_instant = tokio::time::Instant::now();
|
params.stake,
|
||||||
let mut streams_in_current_interval = 0;
|
params.total_stake,
|
||||||
|
stream_load_ema.clone(),
|
||||||
|
STREAM_THROTTLING_INTERVAL_MS,
|
||||||
|
);
|
||||||
|
let staked_stream = matches!(peer_type, ConnectionPeerType::Staked) && params.stake > 0;
|
||||||
while !stream_exit.load(Ordering::Relaxed) {
|
while !stream_exit.load(Ordering::Relaxed) {
|
||||||
if let Ok(stream) =
|
if let Ok(stream) =
|
||||||
tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await
|
tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, connection.accept_uni()).await
|
||||||
{
|
{
|
||||||
match stream {
|
match stream {
|
||||||
Ok(mut stream) => {
|
Ok(mut stream) => {
|
||||||
if reset_throttling_params_if_needed(&mut last_throttling_instant) {
|
if staked_stream {
|
||||||
streams_in_current_interval = 0;
|
max_streams_per_throttling_interval = stream_load_ema
|
||||||
} else if streams_in_current_interval >= max_streams_per_100ms {
|
.available_load_capacity_in_duration(
|
||||||
|
params.stake,
|
||||||
|
params.total_stake,
|
||||||
|
STREAM_THROTTLING_INTERVAL_MS,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
stream_counter.reset_throttling_params_if_needed();
|
||||||
|
if stream_counter.stream_count.load(Ordering::Relaxed)
|
||||||
|
>= max_streams_per_throttling_interval
|
||||||
|
{
|
||||||
stats.throttled_streams.fetch_add(1, Ordering::Relaxed);
|
stats.throttled_streams.fetch_add(1, Ordering::Relaxed);
|
||||||
let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING));
|
let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
streams_in_current_interval = streams_in_current_interval.saturating_add(1);
|
if staked_stream {
|
||||||
|
stream_load_ema.increment_load();
|
||||||
|
}
|
||||||
|
stream_counter.stream_count.fetch_add(1, Ordering::Relaxed);
|
||||||
stats.total_streams.fetch_add(1, Ordering::Relaxed);
|
stats.total_streams.fetch_add(1, Ordering::Relaxed);
|
||||||
stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
|
stats.total_new_streams.fetch_add(1, Ordering::Relaxed);
|
||||||
let stream_exit = stream_exit.clone();
|
let stream_exit = stream_exit.clone();
|
||||||
let stats = stats.clone();
|
let stats = stats.clone();
|
||||||
let packet_sender = params.packet_sender.clone();
|
let packet_sender = params.packet_sender.clone();
|
||||||
let last_update = last_update.clone();
|
let last_update = last_update.clone();
|
||||||
|
let stream_load_ema = stream_load_ema.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut maybe_batch = None;
|
let mut maybe_batch = None;
|
||||||
// The min is to guard against a value too small which can wake up unnecessarily
|
// The min is to guard against a value too small which can wake up unnecessarily
|
||||||
|
@ -793,6 +961,9 @@ async fn handle_connection(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
|
stats.total_streams.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
if staked_stream {
|
||||||
|
stream_load_ema.update_ema_if_needed();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -931,6 +1102,37 @@ async fn handle_chunk(
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
struct ConnectionStreamCounter {
|
||||||
|
stream_count: AtomicU64,
|
||||||
|
last_throttling_instant: RwLock<tokio::time::Instant>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnectionStreamCounter {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
stream_count: AtomicU64::default(),
|
||||||
|
last_throttling_instant: RwLock::new(tokio::time::Instant::now()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reset_throttling_params_if_needed(&self) {
|
||||||
|
const THROTTLING_INTERVAL: Duration = Duration::from_millis(STREAM_THROTTLING_INTERVAL_MS);
|
||||||
|
if tokio::time::Instant::now().duration_since(*self.last_throttling_instant.read().unwrap())
|
||||||
|
> THROTTLING_INTERVAL
|
||||||
|
{
|
||||||
|
let mut last_throttling_instant = self.last_throttling_instant.write().unwrap();
|
||||||
|
// Recheck as some other thread might have done throttling since this thread tried to acquire the write lock.
|
||||||
|
if tokio::time::Instant::now().duration_since(*last_throttling_instant)
|
||||||
|
> THROTTLING_INTERVAL
|
||||||
|
{
|
||||||
|
*last_throttling_instant = tokio::time::Instant::now();
|
||||||
|
self.stream_count.store(0, Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct ConnectionEntry {
|
struct ConnectionEntry {
|
||||||
exit: Arc<AtomicBool>,
|
exit: Arc<AtomicBool>,
|
||||||
|
@ -938,6 +1140,7 @@ struct ConnectionEntry {
|
||||||
last_update: Arc<AtomicU64>,
|
last_update: Arc<AtomicU64>,
|
||||||
port: u16,
|
port: u16,
|
||||||
connection: Option<Connection>,
|
connection: Option<Connection>,
|
||||||
|
stream_counter: Arc<ConnectionStreamCounter>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ConnectionEntry {
|
impl ConnectionEntry {
|
||||||
|
@ -947,6 +1150,7 @@ impl ConnectionEntry {
|
||||||
last_update: Arc<AtomicU64>,
|
last_update: Arc<AtomicU64>,
|
||||||
port: u16,
|
port: u16,
|
||||||
connection: Option<Connection>,
|
connection: Option<Connection>,
|
||||||
|
stream_counter: Arc<ConnectionStreamCounter>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
exit,
|
exit,
|
||||||
|
@ -954,6 +1158,7 @@ impl ConnectionEntry {
|
||||||
last_update,
|
last_update,
|
||||||
port,
|
port,
|
||||||
connection,
|
connection,
|
||||||
|
stream_counter,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1064,7 +1269,11 @@ impl ConnectionTable {
|
||||||
stake: u64,
|
stake: u64,
|
||||||
last_update: u64,
|
last_update: u64,
|
||||||
max_connections_per_peer: usize,
|
max_connections_per_peer: usize,
|
||||||
) -> Option<(Arc<AtomicU64>, Arc<AtomicBool>)> {
|
) -> Option<(
|
||||||
|
Arc<AtomicU64>,
|
||||||
|
Arc<AtomicBool>,
|
||||||
|
Arc<ConnectionStreamCounter>,
|
||||||
|
)> {
|
||||||
let connection_entry = self.table.entry(key).or_default();
|
let connection_entry = self.table.entry(key).or_default();
|
||||||
let has_connection_capacity = connection_entry
|
let has_connection_capacity = connection_entry
|
||||||
.len()
|
.len()
|
||||||
|
@ -1074,15 +1283,26 @@ impl ConnectionTable {
|
||||||
if has_connection_capacity {
|
if has_connection_capacity {
|
||||||
let exit = Arc::new(AtomicBool::new(false));
|
let exit = Arc::new(AtomicBool::new(false));
|
||||||
let last_update = Arc::new(AtomicU64::new(last_update));
|
let last_update = Arc::new(AtomicU64::new(last_update));
|
||||||
|
let stream_counter = if stake > 0 {
|
||||||
|
connection_entry
|
||||||
|
.first()
|
||||||
|
.map(|entry| entry.stream_counter.clone())
|
||||||
|
.unwrap_or(Arc::new(ConnectionStreamCounter::new()))
|
||||||
|
} else {
|
||||||
|
// Unstaked connections are tracked using peer IP address. It's possible that different clients
|
||||||
|
// use the same IP due to NAT. So counting all the streams from a given IP could be too restrictive.
|
||||||
|
Arc::new(ConnectionStreamCounter::new())
|
||||||
|
};
|
||||||
connection_entry.push(ConnectionEntry::new(
|
connection_entry.push(ConnectionEntry::new(
|
||||||
exit.clone(),
|
exit.clone(),
|
||||||
stake,
|
stake,
|
||||||
last_update.clone(),
|
last_update.clone(),
|
||||||
port,
|
port,
|
||||||
connection,
|
connection,
|
||||||
|
stream_counter.clone(),
|
||||||
));
|
));
|
||||||
self.total_size += 1;
|
self.total_size += 1;
|
||||||
Some((last_update, exit))
|
Some((last_update, exit, stream_counter))
|
||||||
} else {
|
} else {
|
||||||
if let Some(connection) = connection {
|
if let Some(connection) = connection {
|
||||||
connection.close(
|
connection.close(
|
||||||
|
@ -2030,38 +2250,219 @@ pub mod test {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_max_streams_for_connection_in_100ms() {
|
fn test_max_streams_for_unstaked_connection_in_100ms() {
|
||||||
// 50K packets per ms * 20% / 500 max unstaked connections
|
let load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default())));
|
||||||
|
// 25K packets per ms * 20% / 500 max unstaked connections
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
max_streams_for_connection_in_100ms(ConnectionPeerType::Unstaked, 0, 10000),
|
max_streams_for_connection_in_duration(
|
||||||
20
|
ConnectionPeerType::Unstaked,
|
||||||
|
0,
|
||||||
|
10000,
|
||||||
|
load_ema.clone(),
|
||||||
|
100
|
||||||
|
),
|
||||||
|
10
|
||||||
);
|
);
|
||||||
|
|
||||||
// 50K packets per ms * 20% / 500 max unstaked connections
|
// 25K packets per ms * 20% / 500 max unstaked connections
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
max_streams_for_connection_in_100ms(ConnectionPeerType::Unstaked, 10, 10000),
|
max_streams_for_connection_in_duration(
|
||||||
20
|
ConnectionPeerType::Unstaked,
|
||||||
|
10,
|
||||||
|
10000,
|
||||||
|
load_ema.clone(),
|
||||||
|
100
|
||||||
|
),
|
||||||
|
10
|
||||||
);
|
);
|
||||||
|
|
||||||
// If stake is 0, same limits as unstaked connections will apply.
|
// If stake is 0, same limits as unstaked connections will apply.
|
||||||
// 50K packets per ms * 20% / 500 max unstaked connections
|
// 25K packets per ms * 20% / 500 max unstaked connections
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 0, 10000),
|
max_streams_for_connection_in_duration(
|
||||||
20
|
ConnectionPeerType::Staked,
|
||||||
);
|
0,
|
||||||
|
10000,
|
||||||
// max staked streams = 50K packets per ms * 80% = 40K
|
load_ema.clone(),
|
||||||
// function = 40K * stake / total_stake
|
100
|
||||||
assert_eq!(
|
),
|
||||||
max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 15, 10000),
|
10
|
||||||
60
|
|
||||||
);
|
|
||||||
|
|
||||||
// max staked streams = 50K packets per ms * 80% = 40K
|
|
||||||
// function = 40K * stake / total_stake
|
|
||||||
assert_eq!(
|
|
||||||
max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 1000, 10000),
|
|
||||||
4000
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
#[test]
|
||||||
|
fn test_max_streams_for_staked_connection_in_100ms() {
|
||||||
|
let load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default())));
|
||||||
|
|
||||||
|
// EMA load is used for staked connections to calculate max number of allowed streams.
|
||||||
|
// EMA window = 5ms interval * 10 intervals = 50ms
|
||||||
|
// max streams per window = 250K streams/sec * 80% = 200K/sec = 10K per 50ms
|
||||||
|
// max_streams in 50ms = ((10K * 10K) / ema_load) * stake / total_stake
|
||||||
|
//
|
||||||
|
// Stream throttling window is 100ms. So it'll double the amount of max streams.
|
||||||
|
// max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / ema_load) * stake / total_stake
|
||||||
|
|
||||||
|
load_ema.current_load_ema.store(10000, Ordering::Relaxed);
|
||||||
|
// ema_load = 10K, stake = 15, total_stake = 10K
|
||||||
|
// max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 15 / 10K = 30
|
||||||
|
assert_eq!(
|
||||||
|
max_streams_for_connection_in_duration(
|
||||||
|
ConnectionPeerType::Staked,
|
||||||
|
15,
|
||||||
|
10000,
|
||||||
|
load_ema.clone(),
|
||||||
|
100
|
||||||
|
),
|
||||||
|
30
|
||||||
|
);
|
||||||
|
|
||||||
|
// ema_load = 10K, stake = 1K, total_stake = 10K
|
||||||
|
// max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 1K / 10K = 2K
|
||||||
|
assert_eq!(
|
||||||
|
max_streams_for_connection_in_duration(
|
||||||
|
ConnectionPeerType::Staked,
|
||||||
|
1000,
|
||||||
|
10000,
|
||||||
|
load_ema.clone(),
|
||||||
|
100
|
||||||
|
),
|
||||||
|
2000
|
||||||
|
);
|
||||||
|
|
||||||
|
load_ema.current_load_ema.store(2500, Ordering::Relaxed);
|
||||||
|
// ema_load = 2.5K, stake = 15, total_stake = 10K
|
||||||
|
// max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 15 / 10K = 120
|
||||||
|
assert_eq!(
|
||||||
|
max_streams_for_connection_in_duration(
|
||||||
|
ConnectionPeerType::Staked,
|
||||||
|
15,
|
||||||
|
10000,
|
||||||
|
load_ema.clone(),
|
||||||
|
100
|
||||||
|
),
|
||||||
|
120
|
||||||
|
);
|
||||||
|
|
||||||
|
// ema_load = 2.5K, stake = 1K, total_stake = 10K
|
||||||
|
// max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 1K / 10K = 8000
|
||||||
|
assert_eq!(
|
||||||
|
max_streams_for_connection_in_duration(
|
||||||
|
ConnectionPeerType::Staked,
|
||||||
|
1000,
|
||||||
|
10000,
|
||||||
|
load_ema.clone(),
|
||||||
|
100
|
||||||
|
),
|
||||||
|
8000
|
||||||
|
);
|
||||||
|
|
||||||
|
// At 2000, the load is less than 25% of max_load (10K).
|
||||||
|
// Test that we cap it to 25%, yielding the same result as if load was 2500.
|
||||||
|
load_ema.current_load_ema.store(2000, Ordering::Relaxed);
|
||||||
|
// function = ((10K * 10K) / 25% of 10K) * stake / total_stake
|
||||||
|
assert_eq!(
|
||||||
|
max_streams_for_connection_in_duration(
|
||||||
|
ConnectionPeerType::Staked,
|
||||||
|
15,
|
||||||
|
10000,
|
||||||
|
load_ema.clone(),
|
||||||
|
100
|
||||||
|
),
|
||||||
|
120
|
||||||
|
);
|
||||||
|
|
||||||
|
// function = ((10K * 10K) / 25% of 10K) * stake / total_stake
|
||||||
|
assert_eq!(
|
||||||
|
max_streams_for_connection_in_duration(
|
||||||
|
ConnectionPeerType::Staked,
|
||||||
|
1000,
|
||||||
|
10000,
|
||||||
|
load_ema.clone(),
|
||||||
|
100
|
||||||
|
),
|
||||||
|
8000
|
||||||
|
);
|
||||||
|
|
||||||
|
// At 1/40000 stake weight, and minimum load, it should still allow
|
||||||
|
// MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION of streams.
|
||||||
|
assert_eq!(
|
||||||
|
max_streams_for_connection_in_duration(
|
||||||
|
ConnectionPeerType::Staked,
|
||||||
|
1,
|
||||||
|
40000,
|
||||||
|
load_ema.clone(),
|
||||||
|
100
|
||||||
|
),
|
||||||
|
MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_update_ema() {
|
||||||
|
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default())));
|
||||||
|
stream_load_ema
|
||||||
|
.load_in_recent_interval
|
||||||
|
.store(2500, Ordering::Relaxed);
|
||||||
|
stream_load_ema
|
||||||
|
.current_load_ema
|
||||||
|
.store(2000, Ordering::Relaxed);
|
||||||
|
|
||||||
|
stream_load_ema.update_ema(5);
|
||||||
|
|
||||||
|
let updated_ema = stream_load_ema.current_load_ema.load(Ordering::Relaxed);
|
||||||
|
assert_eq!(updated_ema, 2090);
|
||||||
|
|
||||||
|
stream_load_ema
|
||||||
|
.load_in_recent_interval
|
||||||
|
.store(2500, Ordering::Relaxed);
|
||||||
|
|
||||||
|
stream_load_ema.update_ema(5);
|
||||||
|
|
||||||
|
let updated_ema = stream_load_ema.current_load_ema.load(Ordering::Relaxed);
|
||||||
|
assert_eq!(updated_ema, 2164);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_update_ema_missing_interval() {
|
||||||
|
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default())));
|
||||||
|
stream_load_ema
|
||||||
|
.load_in_recent_interval
|
||||||
|
.store(2500, Ordering::Relaxed);
|
||||||
|
stream_load_ema
|
||||||
|
.current_load_ema
|
||||||
|
.store(2000, Ordering::Relaxed);
|
||||||
|
|
||||||
|
stream_load_ema.update_ema(8);
|
||||||
|
|
||||||
|
let updated_ema = stream_load_ema.current_load_ema.load(Ordering::Relaxed);
|
||||||
|
assert_eq!(updated_ema, 2164);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_update_ema_if_needed() {
|
||||||
|
let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default())));
|
||||||
|
stream_load_ema
|
||||||
|
.load_in_recent_interval
|
||||||
|
.store(2500, Ordering::Relaxed);
|
||||||
|
stream_load_ema
|
||||||
|
.current_load_ema
|
||||||
|
.store(2000, Ordering::Relaxed);
|
||||||
|
|
||||||
|
stream_load_ema.update_ema_if_needed();
|
||||||
|
|
||||||
|
let updated_ema = stream_load_ema.current_load_ema.load(Ordering::Relaxed);
|
||||||
|
assert_eq!(updated_ema, 2000);
|
||||||
|
|
||||||
|
let ema_interval = Duration::from_millis(STREAM_LOAD_EMA_INTERVAL_MS);
|
||||||
|
*stream_load_ema.last_update.write().unwrap() =
|
||||||
|
Instant::now().checked_sub(ema_interval).unwrap();
|
||||||
|
|
||||||
|
stream_load_ema.update_ema_if_needed();
|
||||||
|
assert!(
|
||||||
|
Instant::now().duration_since(*stream_load_ema.last_update.read().unwrap())
|
||||||
|
< ema_interval
|
||||||
|
);
|
||||||
|
|
||||||
|
let updated_ema = stream_load_ema.current_load_ema.load(Ordering::Relaxed);
|
||||||
|
assert_eq!(updated_ema, 2090);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -176,6 +176,9 @@ pub struct StreamStats {
|
||||||
pub(crate) connection_removed: AtomicUsize,
|
pub(crate) connection_removed: AtomicUsize,
|
||||||
pub(crate) connection_remove_failed: AtomicUsize,
|
pub(crate) connection_remove_failed: AtomicUsize,
|
||||||
pub(crate) throttled_streams: AtomicUsize,
|
pub(crate) throttled_streams: AtomicUsize,
|
||||||
|
pub(crate) stream_load_ema: AtomicUsize,
|
||||||
|
pub(crate) stream_load_ema_overflow: AtomicUsize,
|
||||||
|
pub(crate) stream_load_capacity_overflow: AtomicUsize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl StreamStats {
|
impl StreamStats {
|
||||||
|
@ -411,6 +414,21 @@ impl StreamStats {
|
||||||
self.throttled_streams.swap(0, Ordering::Relaxed),
|
self.throttled_streams.swap(0, Ordering::Relaxed),
|
||||||
i64
|
i64
|
||||||
),
|
),
|
||||||
|
(
|
||||||
|
"stream_load_ema",
|
||||||
|
self.stream_load_ema.load(Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"stream_load_ema_overflow",
|
||||||
|
self.stream_load_ema_overflow.load(Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
|
(
|
||||||
|
"stream_load_capacity_overflow",
|
||||||
|
self.stream_load_capacity_overflow.load(Ordering::Relaxed),
|
||||||
|
i64
|
||||||
|
),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue