From 904700cc560d0514e4c4d4a5878fc4b69e253d89 Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Thu, 11 Jan 2024 10:05:38 -0800 Subject: [PATCH] Use EMA to compute QUIC streamer load for staked connections (#34586) * Use EMA to compute QUIC streamer load for staked connections * change min load to 25% of max load * reduce max PPS from 500K to 250K * update ema_function to account for missing intervals * replace f64 math with u128 * track streams across all connections for a peer * u128 -> u64 * replace ' as ' type conversion to from and try_from * add counter for u64 overflow * reset recent stream load on ema interval * do not use same counter for unstaked connections from a peer IP --- streamer/src/nonblocking/quic.rs | 521 +++++++++++++++++++++++++++---- streamer/src/quic.rs | 18 ++ 2 files changed, 479 insertions(+), 60 deletions(-) diff --git a/streamer/src/nonblocking/quic.rs b/streamer/src/nonblocking/quic.rs index 951ec6cb31..76385c4370 100644 --- a/streamer/src/nonblocking/quic.rs +++ b/streamer/src/nonblocking/quic.rs @@ -28,6 +28,7 @@ use { timing, }, std::{ + cmp, iter::repeat_with, net::{IpAddr, SocketAddr, UdpSocket}, sync::{ @@ -39,10 +40,10 @@ use { tokio::{task::JoinHandle, time::timeout}, }; -/// Limit to 500K PPS -const MAX_STREAMS_PER_100MS: u64 = 500_000 / 10; +/// Limit to 250K PPS +const MAX_STREAMS_PER_MS: u64 = 250; const MAX_UNSTAKED_STREAMS_PERCENT: u64 = 20; -const STREAM_THROTTLING_INTERVAL: Duration = Duration::from_millis(100); +const STREAM_THROTTLING_INTERVAL_MS: u64 = 100; const WAIT_FOR_STREAM_TIMEOUT: Duration = Duration::from_millis(100); pub const DEFAULT_WAIT_FOR_CHUNK_TIMEOUT: Duration = Duration::from_secs(10); @@ -60,6 +61,9 @@ const CONNECTION_CLOSE_REASON_EXCEED_MAX_STREAM_COUNT: &[u8] = b"exceed_max_stre const CONNECTION_CLOSE_CODE_TOO_MANY: u32 = 4; const CONNECTION_CLOSE_REASON_TOO_MANY: &[u8] = b"too_many"; const STREAM_STOP_CODE_THROTTLING: u32 = 15; +const STREAM_LOAD_EMA_INTERVAL_MS: u64 = 5; +const 
STREAM_LOAD_EMA_INTERVAL_COUNT: u64 = 10; +const MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION: u64 = 8; // A sequence of bytes that is part of a packet // along with where in the packet it is @@ -85,6 +89,138 @@ struct PacketAccumulator { pub chunks: Vec, } +struct StakedStreamLoadEMA { + current_load_ema: AtomicU64, + load_in_recent_interval: AtomicU64, + last_update: RwLock, + stats: Arc, +} + +impl StakedStreamLoadEMA { + fn new(stats: Arc) -> Self { + Self { + current_load_ema: AtomicU64::default(), + load_in_recent_interval: AtomicU64::default(), + last_update: RwLock::new(Instant::now()), + stats, + } + } + + fn ema_function(current_ema: u128, recent_load: u128) -> u128 { + // Using the EMA multiplier helps in avoiding the floating point math during EMA related calculations + const STREAM_LOAD_EMA_MULTIPLIER: u128 = 1024; + let multiplied_smoothing_factor: u128 = + 2 * STREAM_LOAD_EMA_MULTIPLIER / (u128::from(STREAM_LOAD_EMA_INTERVAL_COUNT) + 1); + + // The formula is + // updated_ema = recent_load * smoothing_factor + current_ema * (1 - smoothing_factor) + // To avoid floating point math, we are using STREAM_LOAD_EMA_MULTIPLIER + // updated_ema = (recent_load * multiplied_smoothing_factor + // + current_ema * (multiplier - multiplied_smoothing_factor)) / multiplier + (recent_load * multiplied_smoothing_factor + + current_ema * (STREAM_LOAD_EMA_MULTIPLIER - multiplied_smoothing_factor)) + / STREAM_LOAD_EMA_MULTIPLIER + } + + fn update_ema(&self, time_since_last_update_ms: u128) { + // if time_since_last_update_ms > STREAM_LOAD_EMA_INTERVAL_MS, there might be intervals where ema was not updated. + // count how many updates (1 + missed intervals) are needed. 
+ let num_extra_updates = + time_since_last_update_ms.saturating_sub(1) / u128::from(STREAM_LOAD_EMA_INTERVAL_MS); + + let load_in_recent_interval = + u128::from(self.load_in_recent_interval.swap(0, Ordering::Relaxed)); + + let mut updated_load_ema = Self::ema_function( + u128::from(self.current_load_ema.load(Ordering::Relaxed)), + load_in_recent_interval, + ); + + for _ in 0..num_extra_updates { + updated_load_ema = Self::ema_function(updated_load_ema, load_in_recent_interval); + } + + let Ok(updated_load_ema) = u64::try_from(updated_load_ema) else { + error!( + "Failed to convert EMA {} to a u64. Not updating the load EMA", + updated_load_ema + ); + self.stats + .stream_load_ema_overflow + .fetch_add(1, Ordering::Relaxed); + return; + }; + + self.current_load_ema + .store(updated_load_ema, Ordering::Relaxed); + self.stats + .stream_load_ema + .store(updated_load_ema as usize, Ordering::Relaxed); + } + + fn update_ema_if_needed(&self) { + const EMA_DURATION: Duration = Duration::from_millis(STREAM_LOAD_EMA_INTERVAL_MS); + // Read lock enables multiple connection handlers to run in parallel if interval is not expired + if Instant::now().duration_since(*self.last_update.read().unwrap()) >= EMA_DURATION { + let mut last_update_w = self.last_update.write().unwrap(); + // Recheck as some other thread might have updated the ema since this thread tried to acquire the write lock. 
+ let since_last_update = Instant::now().duration_since(*last_update_w); + if since_last_update >= EMA_DURATION { + *last_update_w = Instant::now(); + self.update_ema(since_last_update.as_millis()); + } + } + } + + fn increment_load(&self) { + self.load_in_recent_interval.fetch_add(1, Ordering::Relaxed); + self.update_ema_if_needed(); + } + + fn available_load_capacity_in_duration( + &self, + stake: u64, + total_stake: u64, + duration_ms: u64, + ) -> u64 { + let ema_window_ms = STREAM_LOAD_EMA_INTERVAL_MS * STREAM_LOAD_EMA_INTERVAL_COUNT; + let max_load_in_ema_window = u128::from( + (MAX_STREAMS_PER_MS + - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_MS)) + * ema_window_ms, + ); + + // If the current load is low, cap it to 25% of max_load. + let current_load = cmp::max( + u128::from(self.current_load_ema.load(Ordering::Relaxed)), + max_load_in_ema_window / 4, + ); + + // Formula is (max_load ^ 2 / current_load) * (stake / total_stake) + let capacity_in_ema_window = + (max_load_in_ema_window * max_load_in_ema_window * u128::from(stake)) + / (current_load * u128::from(total_stake)); + + let calculated_capacity = + capacity_in_ema_window * u128::from(duration_ms) / u128::from(ema_window_ms); + let calculated_capacity = u64::try_from(calculated_capacity).unwrap_or_else(|_| { + error!( + "Failed to convert stream capacity {} to u64. 
Using minimum load capacity", + calculated_capacity + ); + self.stats + .stream_load_capacity_overflow + .fetch_add(1, Ordering::Relaxed); + MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION + }); + + cmp::max( + calculated_capacity, + MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION, + ) + } +} + #[allow(clippy::too_many_arguments)] pub fn spawn_server( name: &'static str, @@ -147,6 +283,7 @@ async fn run_server( let unstaked_connection_table: Arc> = Arc::new(Mutex::new( ConnectionTable::new(ConnectionPeerType::Unstaked), )); + let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(stats.clone())); let staked_connection_table: Arc> = Arc::new(Mutex::new(ConnectionTable::new(ConnectionPeerType::Staked))); let (sender, receiver) = async_unbounded(); @@ -178,6 +315,7 @@ async fn run_server( max_unstaked_connections, stats.clone(), wait_for_chunk_timeout, + stream_load_ema.clone(), )); } else { debug!("accept(): Timed out waiting for connection"); @@ -311,6 +449,7 @@ fn handle_and_cache_new_connection( connection_table: Arc>, params: &NewConnectionHandlerParams, wait_for_chunk_timeout: Duration, + stream_load_ema: Arc, ) -> Result<(), ConnectionHandlerError> { if let Ok(max_uni_streams) = VarInt::from_u64(compute_max_allowed_uni_streams( connection_table_l.peer_type, @@ -342,14 +481,16 @@ fn handle_and_cache_new_connection( remote_addr, ); - if let Some((last_update, stream_exit)) = connection_table_l.try_add_connection( - ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), - remote_addr.port(), - Some(connection.clone()), - params.stake, - timing::timestamp(), - params.max_connections_per_peer, - ) { + if let Some((last_update, stream_exit, stream_counter)) = connection_table_l + .try_add_connection( + ConnectionTableKey::new(remote_addr.ip(), params.remote_pubkey), + remote_addr.port(), + Some(connection.clone()), + params.stake, + timing::timestamp(), + params.max_connections_per_peer, + ) + { let peer_type = 
connection_table_l.peer_type; drop(connection_table_l); tokio::spawn(handle_connection( @@ -361,6 +502,8 @@ fn handle_and_cache_new_connection( params.clone(), peer_type, wait_for_chunk_timeout, + stream_load_ema, + stream_counter, )); Ok(()) } else { @@ -389,6 +532,7 @@ fn prune_unstaked_connections_and_add_new_connection( max_connections: usize, params: &NewConnectionHandlerParams, wait_for_chunk_timeout: Duration, + stream_load_ema: Arc, ) -> Result<(), ConnectionHandlerError> { let stats = params.stats.clone(); if max_connections > 0 { @@ -401,6 +545,7 @@ fn prune_unstaked_connections_and_add_new_connection( connection_table_clone, params, wait_for_chunk_timeout, + stream_load_ema, ) } else { connection.close( @@ -467,6 +612,7 @@ async fn setup_connection( max_unstaked_connections: usize, stats: Arc, wait_for_chunk_timeout: Duration, + stream_load_ema: Arc, ) { const PRUNE_RANDOM_SAMPLE_SIZE: usize = 2; let from = connecting.remote_address(); @@ -510,6 +656,7 @@ async fn setup_connection( staked_connection_table.clone(), &params, wait_for_chunk_timeout, + stream_load_ema.clone(), ) { stats .connection_added_from_staked_peer @@ -525,6 +672,7 @@ async fn setup_connection( max_unstaked_connections, &params, wait_for_chunk_timeout, + stream_load_ema.clone(), ) { stats .connection_added_from_staked_peer @@ -544,6 +692,7 @@ async fn setup_connection( max_unstaked_connections, &params, wait_for_chunk_timeout, + stream_load_ema.clone(), ) { stats .connection_added_from_unstaked_peer @@ -685,31 +834,30 @@ async fn packet_batch_sender( } } -fn max_streams_for_connection_in_100ms( +fn max_streams_for_connection_in_duration( connection_type: ConnectionPeerType, stake: u64, total_stake: u64, + ema_load: Arc, + duration_ms: u64, ) -> u64 { if matches!(connection_type, ConnectionPeerType::Unstaked) || stake == 0 { + let max_num_connections = u64::try_from(MAX_UNSTAKED_CONNECTIONS).unwrap_or_else(|_| { + error!( + "Failed to convert maximum number of unstaked connections {} to u64.", 
MAX_UNSTAKED_CONNECTIONS + ); + 500 + }); Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT) - .apply_to(MAX_STREAMS_PER_100MS) - .saturating_div(MAX_UNSTAKED_CONNECTIONS as u64) + .apply_to(MAX_STREAMS_PER_MS * duration_ms) + .saturating_div(max_num_connections) } else { - let max_total_staked_streams: u64 = MAX_STREAMS_PER_100MS - - Percentage::from(MAX_UNSTAKED_STREAMS_PERCENT).apply_to(MAX_STREAMS_PER_100MS); - ((max_total_staked_streams as f64 / total_stake as f64) * stake as f64) as u64 - } -} - -fn reset_throttling_params_if_needed(last_instant: &mut tokio::time::Instant) -> bool { - if tokio::time::Instant::now().duration_since(*last_instant) > STREAM_THROTTLING_INTERVAL { - *last_instant = tokio::time::Instant::now(); - true - } else { - false + ema_load.available_load_capacity_in_duration(stake, total_stake, duration_ms) } } +#[allow(clippy::too_many_arguments)] async fn handle_connection( connection: Connection, remote_addr: SocketAddr, @@ -719,6 +867,8 @@ async fn handle_connection( params: NewConnectionHandlerParams, peer_type: ConnectionPeerType, wait_for_chunk_timeout: Duration, + stream_load_ema: Arc, + stream_counter: Arc, ) { let stats = params.stats; debug!( @@ -729,30 +879,48 @@ async fn handle_connection( ); let stable_id = connection.stable_id(); stats.total_connections.fetch_add(1, Ordering::Relaxed); - let max_streams_per_100ms = - max_streams_for_connection_in_100ms(peer_type, params.stake, params.total_stake); - let mut last_throttling_instant = tokio::time::Instant::now(); - let mut streams_in_current_interval = 0; + let mut max_streams_per_throttling_interval = max_streams_for_connection_in_duration( + peer_type, + params.stake, + params.total_stake, + stream_load_ema.clone(), + STREAM_THROTTLING_INTERVAL_MS, + ); + let staked_stream = matches!(peer_type, ConnectionPeerType::Staked) && params.stake > 0; while !stream_exit.load(Ordering::Relaxed) { if let Ok(stream) = tokio::time::timeout(WAIT_FOR_STREAM_TIMEOUT, 
connection.accept_uni()).await { match stream { Ok(mut stream) => { - if reset_throttling_params_if_needed(&mut last_throttling_instant) { - streams_in_current_interval = 0; - } else if streams_in_current_interval >= max_streams_per_100ms { + if staked_stream { + max_streams_per_throttling_interval = stream_load_ema + .available_load_capacity_in_duration( + params.stake, + params.total_stake, + STREAM_THROTTLING_INTERVAL_MS, + ); + } + + stream_counter.reset_throttling_params_if_needed(); + if stream_counter.stream_count.load(Ordering::Relaxed) + >= max_streams_per_throttling_interval + { stats.throttled_streams.fetch_add(1, Ordering::Relaxed); let _ = stream.stop(VarInt::from_u32(STREAM_STOP_CODE_THROTTLING)); continue; } - streams_in_current_interval = streams_in_current_interval.saturating_add(1); + if staked_stream { + stream_load_ema.increment_load(); + } + stream_counter.stream_count.fetch_add(1, Ordering::Relaxed); stats.total_streams.fetch_add(1, Ordering::Relaxed); stats.total_new_streams.fetch_add(1, Ordering::Relaxed); let stream_exit = stream_exit.clone(); let stats = stats.clone(); let packet_sender = params.packet_sender.clone(); let last_update = last_update.clone(); + let stream_load_ema = stream_load_ema.clone(); tokio::spawn(async move { let mut maybe_batch = None; // The min is to guard against a value too small which can wake up unnecessarily @@ -793,6 +961,9 @@ async fn handle_connection( } } stats.total_streams.fetch_sub(1, Ordering::Relaxed); + if staked_stream { + stream_load_ema.update_ema_if_needed(); + } }); } Err(e) => { @@ -931,6 +1102,37 @@ async fn handle_chunk( false } +#[derive(Debug)] +struct ConnectionStreamCounter { + stream_count: AtomicU64, + last_throttling_instant: RwLock, +} + +impl ConnectionStreamCounter { + fn new() -> Self { + Self { + stream_count: AtomicU64::default(), + last_throttling_instant: RwLock::new(tokio::time::Instant::now()), + } + } + + fn reset_throttling_params_if_needed(&self) { + const 
THROTTLING_INTERVAL: Duration = Duration::from_millis(STREAM_THROTTLING_INTERVAL_MS); + if tokio::time::Instant::now().duration_since(*self.last_throttling_instant.read().unwrap()) + > THROTTLING_INTERVAL + { + let mut last_throttling_instant = self.last_throttling_instant.write().unwrap(); + // Recheck as some other thread might have done throttling since this thread tried to acquire the write lock. + if tokio::time::Instant::now().duration_since(*last_throttling_instant) + > THROTTLING_INTERVAL + { + *last_throttling_instant = tokio::time::Instant::now(); + self.stream_count.store(0, Ordering::Relaxed); + } + } + } +} + #[derive(Debug)] struct ConnectionEntry { exit: Arc, @@ -938,6 +1140,7 @@ struct ConnectionEntry { last_update: Arc, port: u16, connection: Option, + stream_counter: Arc, } impl ConnectionEntry { @@ -947,6 +1150,7 @@ impl ConnectionEntry { last_update: Arc, port: u16, connection: Option, + stream_counter: Arc, ) -> Self { Self { exit, @@ -954,6 +1158,7 @@ impl ConnectionEntry { last_update, port, connection, + stream_counter, } } @@ -1064,7 +1269,11 @@ impl ConnectionTable { stake: u64, last_update: u64, max_connections_per_peer: usize, - ) -> Option<(Arc, Arc)> { + ) -> Option<( + Arc, + Arc, + Arc, + )> { let connection_entry = self.table.entry(key).or_default(); let has_connection_capacity = connection_entry .len() @@ -1074,15 +1283,26 @@ impl ConnectionTable { if has_connection_capacity { let exit = Arc::new(AtomicBool::new(false)); let last_update = Arc::new(AtomicU64::new(last_update)); + let stream_counter = if stake > 0 { + connection_entry + .first() + .map(|entry| entry.stream_counter.clone()) + .unwrap_or(Arc::new(ConnectionStreamCounter::new())) + } else { + // Unstaked connections are tracked using peer IP address. It's possible that different clients + // use the same IP due to NAT. So counting all the streams from a given IP could be too restrictive. 
+ Arc::new(ConnectionStreamCounter::new()) + }; connection_entry.push(ConnectionEntry::new( exit.clone(), stake, last_update.clone(), port, connection, + stream_counter.clone(), )); self.total_size += 1; - Some((last_update, exit)) + Some((last_update, exit, stream_counter)) } else { if let Some(connection) = connection { connection.close( @@ -2030,38 +2250,219 @@ pub mod test { } #[test] - fn test_max_streams_for_connection_in_100ms() { - // 50K packets per ms * 20% / 500 max unstaked connections + fn test_max_streams_for_unstaked_connection_in_100ms() { + let load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default()))); + // 25K packets per ms * 20% / 500 max unstaked connections assert_eq!( - max_streams_for_connection_in_100ms(ConnectionPeerType::Unstaked, 0, 10000), - 20 + max_streams_for_connection_in_duration( + ConnectionPeerType::Unstaked, + 0, + 10000, + load_ema.clone(), + 100 + ), + 10 ); - // 50K packets per ms * 20% / 500 max unstaked connections + // 25K packets per ms * 20% / 500 max unstaked connections assert_eq!( - max_streams_for_connection_in_100ms(ConnectionPeerType::Unstaked, 10, 10000), - 20 + max_streams_for_connection_in_duration( + ConnectionPeerType::Unstaked, + 10, + 10000, + load_ema.clone(), + 100 + ), + 10 ); // If stake is 0, same limits as unstaked connections will apply. 
- // 50K packets per ms * 20% / 500 max unstaked connections + // 25K packets per ms * 20% / 500 max unstaked connections assert_eq!( - max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 0, 10000), - 20 - ); - - // max staked streams = 50K packets per ms * 80% = 40K - // function = 40K * stake / total_stake - assert_eq!( - max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 15, 10000), - 60 - ); - - // max staked streams = 50K packets per ms * 80% = 40K - // function = 40K * stake / total_stake - assert_eq!( - max_streams_for_connection_in_100ms(ConnectionPeerType::Staked, 1000, 10000), - 4000 + max_streams_for_connection_in_duration( + ConnectionPeerType::Staked, + 0, + 10000, + load_ema.clone(), + 100 + ), + 10 ); } + #[test] + fn test_max_streams_for_staked_connection_in_100ms() { + let load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default()))); + + // EMA load is used for staked connections to calculate max number of allowed streams. + // EMA window = 5ms interval * 10 intervals = 50ms + // max streams per window = 250K streams/sec * 80% = 200K/sec = 10K per 50ms + // max_streams in 50ms = ((10K * 10K) / ema_load) * stake / total_stake + // + // Stream throttling window is 100ms. So it'll double the amount of max streams. 
+ // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / ema_load) * stake / total_stake + + load_ema.current_load_ema.store(10000, Ordering::Relaxed); + // ema_load = 10K, stake = 15, total_stake = 10K + // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 15 / 10K = 30 + assert_eq!( + max_streams_for_connection_in_duration( + ConnectionPeerType::Staked, + 15, + 10000, + load_ema.clone(), + 100 + ), + 30 + ); + + // ema_load = 10K, stake = 1K, total_stake = 10K + // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 10K) * 1K / 10K = 2K + assert_eq!( + max_streams_for_connection_in_duration( + ConnectionPeerType::Staked, + 1000, + 10000, + load_ema.clone(), + 100 + ), + 2000 + ); + + load_ema.current_load_ema.store(2500, Ordering::Relaxed); + // ema_load = 2.5K, stake = 15, total_stake = 10K + // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 15 / 10K = 120 + assert_eq!( + max_streams_for_connection_in_duration( + ConnectionPeerType::Staked, + 15, + 10000, + load_ema.clone(), + 100 + ), + 120 + ); + + // ema_load = 2.5K, stake = 1K, total_stake = 10K + // max_streams in 100ms (throttling window) = 2 * ((10K * 10K) / 2.5K) * 1K / 10K = 8000 + assert_eq!( + max_streams_for_connection_in_duration( + ConnectionPeerType::Staked, + 1000, + 10000, + load_ema.clone(), + 100 + ), + 8000 + ); + + // At 2000, the load is less than 25% of max_load (10K). + // Test that we cap it to 25%, yielding the same result as if load was 2500. 
+ load_ema.current_load_ema.store(2000, Ordering::Relaxed); + // function = ((10K * 10K) / 25% of 10K) * stake / total_stake + assert_eq!( + max_streams_for_connection_in_duration( + ConnectionPeerType::Staked, + 15, + 10000, + load_ema.clone(), + 100 + ), + 120 + ); + + // function = ((10K * 10K) / 25% of 10K) * stake / total_stake + assert_eq!( + max_streams_for_connection_in_duration( + ConnectionPeerType::Staked, + 1000, + 10000, + load_ema.clone(), + 100 + ), + 8000 + ); + + // At 1/40000 stake weight, and minimum load, it should still allow + // MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION of streams. + assert_eq!( + max_streams_for_connection_in_duration( + ConnectionPeerType::Staked, + 1, + 40000, + load_ema.clone(), + 100 + ), + MIN_STREAMS_PER_THROTTLING_INTERVAL_FOR_STAKED_CONNECTION + ); + } + + #[test] + fn test_update_ema() { + let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default()))); + stream_load_ema + .load_in_recent_interval + .store(2500, Ordering::Relaxed); + stream_load_ema + .current_load_ema + .store(2000, Ordering::Relaxed); + + stream_load_ema.update_ema(5); + + let updated_ema = stream_load_ema.current_load_ema.load(Ordering::Relaxed); + assert_eq!(updated_ema, 2090); + + stream_load_ema + .load_in_recent_interval + .store(2500, Ordering::Relaxed); + + stream_load_ema.update_ema(5); + + let updated_ema = stream_load_ema.current_load_ema.load(Ordering::Relaxed); + assert_eq!(updated_ema, 2164); + } + + #[test] + fn test_update_ema_missing_interval() { + let stream_load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default()))); + stream_load_ema + .load_in_recent_interval + .store(2500, Ordering::Relaxed); + stream_load_ema + .current_load_ema + .store(2000, Ordering::Relaxed); + + stream_load_ema.update_ema(8); + + let updated_ema = stream_load_ema.current_load_ema.load(Ordering::Relaxed); + assert_eq!(updated_ema, 2164); + } + + #[test] + fn test_update_ema_if_needed() { + let 
stream_load_ema = Arc::new(StakedStreamLoadEMA::new(Arc::new(StreamStats::default()))); + stream_load_ema + .load_in_recent_interval + .store(2500, Ordering::Relaxed); + stream_load_ema + .current_load_ema + .store(2000, Ordering::Relaxed); + + stream_load_ema.update_ema_if_needed(); + + let updated_ema = stream_load_ema.current_load_ema.load(Ordering::Relaxed); + assert_eq!(updated_ema, 2000); + + let ema_interval = Duration::from_millis(STREAM_LOAD_EMA_INTERVAL_MS); + *stream_load_ema.last_update.write().unwrap() = + Instant::now().checked_sub(ema_interval).unwrap(); + + stream_load_ema.update_ema_if_needed(); + assert!( + Instant::now().duration_since(*stream_load_ema.last_update.read().unwrap()) + < ema_interval + ); + + let updated_ema = stream_load_ema.current_load_ema.load(Ordering::Relaxed); + assert_eq!(updated_ema, 2090); + } } diff --git a/streamer/src/quic.rs b/streamer/src/quic.rs index 5a56c74f45..617341fd5d 100644 --- a/streamer/src/quic.rs +++ b/streamer/src/quic.rs @@ -176,6 +176,9 @@ pub struct StreamStats { pub(crate) connection_removed: AtomicUsize, pub(crate) connection_remove_failed: AtomicUsize, pub(crate) throttled_streams: AtomicUsize, + pub(crate) stream_load_ema: AtomicUsize, + pub(crate) stream_load_ema_overflow: AtomicUsize, + pub(crate) stream_load_capacity_overflow: AtomicUsize, } impl StreamStats { @@ -411,6 +414,21 @@ impl StreamStats { self.throttled_streams.swap(0, Ordering::Relaxed), i64 ), + ( + "stream_load_ema", + self.stream_load_ema.load(Ordering::Relaxed), + i64 + ), + ( + "stream_load_ema_overflow", + self.stream_load_ema_overflow.load(Ordering::Relaxed), + i64 + ), + ( + "stream_load_capacity_overflow", + self.stream_load_capacity_overflow.load(Ordering::Relaxed), + i64 + ), ); } }