diff --git a/core/src/ancestor_hashes_service.rs b/core/src/ancestor_hashes_service.rs
index db06cec05b..f4e0b90515 100644
--- a/core/src/ancestor_hashes_service.rs
+++ b/core/src/ancestor_hashes_service.rs
@@ -3,6 +3,7 @@ use {
         cluster_slots::ClusterSlots,
         duplicate_repair_status::{DeadSlotAncestorRequestStatus, DuplicateAncestorDecision},
         outstanding_requests::OutstandingRequests,
+        packet_threshold::DynamicPacketToProcessThreshold,
         repair_response::{self},
         repair_service::{DuplicateSlotsResetSender, RepairInfo, RepairStatsGroup},
         replay_stage::DUPLICATE_THRESHOLD,
@@ -12,7 +13,6 @@ use {
     crossbeam_channel::{unbounded, Receiver, Sender},
     dashmap::{mapref::entry::Entry::Occupied, DashMap},
     solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE},
-    solana_measure::measure::Measure,
     solana_perf::{
         packet::{limited_deserialize, Packet, PacketBatch},
         recycler::Recycler,
@@ -208,7 +208,7 @@ impl AncestorHashesService {
             .spawn(move || {
                 let mut last_stats_report = Instant::now();
                 let mut stats = AncestorHashesResponsesStats::default();
-                let mut max_packets = 1024;
+                let mut packet_threshold = DynamicPacketToProcessThreshold::default();
                 loop {
                     let result = Self::process_new_packets_from_channel(
                         &ancestor_hashes_request_statuses,
@@ -216,13 +216,13 @@ impl AncestorHashesService {
                         &blockstore,
                         &outstanding_requests,
                         &mut stats,
-                        &mut max_packets,
+                        &mut packet_threshold,
                         &duplicate_slots_reset_sender,
                         &retryable_slots_sender,
                     );
                     match result {
                         Err(Error::RecvTimeout(_)) | Ok(_) => {}
-                        Err(err) => info!("ancestors hashes reponses listener error: {:?}", err),
+                        Err(err) => info!("ancestors hashes responses listener error: {:?}", err),
                     };
                     if exit.load(Ordering::Relaxed) {
                         return;
@@ -243,7 +243,7 @@ impl AncestorHashesService {
         blockstore: &Blockstore,
         outstanding_requests: &RwLock,
         stats: &mut AncestorHashesResponsesStats,
-        max_packets: &mut usize,
+        packet_threshold: &mut DynamicPacketToProcessThreshold,
         duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
         retryable_slots_sender: &RetryableSlotsSender,
     ) -> Result<()> {
@@ -254,18 +254,17 @@ impl AncestorHashesService {
         let mut dropped_packets = 0;
         while let Ok(batch) = response_receiver.try_recv() {
             total_packets += batch.packets.len();
-            if total_packets < *max_packets {
-                // Drop the rest in the channel in case of DOS
-                packet_batches.push(batch);
-            } else {
+            if packet_threshold.should_drop(total_packets) {
                 dropped_packets += batch.packets.len();
+            } else {
+                packet_batches.push(batch);
             }
         }
         stats.dropped_packets += dropped_packets;
         stats.total_packets += total_packets;

-        let mut time = Measure::start("ancestor_hashes::handle_packets");
+        let timer = Instant::now();
         for packet_batch in packet_batches {
             Self::process_packet_batch(
                 ancestor_hashes_request_statuses,
@@ -277,14 +276,7 @@ impl AncestorHashesService {
                 retryable_slots_sender,
             );
         }
-        time.stop();
-        if total_packets >= *max_packets {
-            if time.as_ms() > 1000 {
-                *max_packets = (*max_packets * 9) / 10;
-            } else {
-                *max_packets = (*max_packets * 10) / 9;
-            }
-        }
+        packet_threshold.update(total_packets, timer.elapsed());
         Ok(())
     }
diff --git a/core/src/lib.rs b/core/src/lib.rs
index 24383ccd95..54453bd6c4 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -34,6 +34,7 @@ pub mod ledger_cleanup_service;
 pub mod optimistic_confirmation_verifier;
 pub mod outstanding_requests;
 pub mod packet_hasher;
+pub mod packet_threshold;
 pub mod progress_map;
 pub mod qos_service;
 pub mod repair_generic_traversal;
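The change above replaces the hand-rolled `max_packets` counter and `Measure` timing with a single `DynamicPacketToProcessThreshold` value that the listener thread carries across iterations. For reference, below is a minimal, self-contained sketch of the drain-then-process shape the new call site follows; `PacketBatch` here is a simplified stand-in (not the real `solana_perf` type), and `drain_and_process` is a hypothetical name, not a function in this patch:

```rust
use std::time::Instant;

use crossbeam_channel::Receiver;

use crate::packet_threshold::DynamicPacketToProcessThreshold;

// Simplified stand-in for solana_perf's PacketBatch, for illustration only.
struct PacketBatch {
    packets: Vec<()>,
}

// Drain the channel, dropping everything past the adaptive threshold
// (the DoS guard), then feed the observed load back into the threshold.
// Returns (total, dropped) packet counts for the caller's stats.
fn drain_and_process(
    receiver: &Receiver<PacketBatch>,
    threshold: &mut DynamicPacketToProcessThreshold,
) -> (usize, usize) {
    let mut batches = Vec::new();
    let mut total_packets = 0;
    let mut dropped_packets = 0;

    while let Ok(batch) = receiver.try_recv() {
        total_packets += batch.packets.len();
        if threshold.should_drop(total_packets) {
            dropped_packets += batch.packets.len();
        } else {
            batches.push(batch);
        }
    }

    let timer = Instant::now();
    for _batch in batches {
        // ... process the batch here ...
    }
    // Slow iterations (over one second) shrink the cap; fast ones grow it.
    threshold.update(total_packets, timer.elapsed());

    (total_packets, dropped_packets)
}
```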
diff --git a/core/src/packet_threshold.rs b/core/src/packet_threshold.rs
new file mode 100644
index 0000000000..f1d534284b
--- /dev/null
+++ b/core/src/packet_threshold.rs
@@ -0,0 +1,85 @@
+use std::time::Duration;
+
+enum PacketThresholdUpdate {
+    Increase,
+    Decrease,
+}
+
+impl PacketThresholdUpdate {
+    const PERCENTAGE: usize = 90;
+
+    fn calculate(&self, current: usize) -> usize {
+        match *self {
+            PacketThresholdUpdate::Increase => {
+                current.saturating_mul(100).saturating_div(Self::PERCENTAGE)
+            }
+            PacketThresholdUpdate::Decrease => {
+                current.saturating_mul(Self::PERCENTAGE).saturating_div(100)
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct DynamicPacketToProcessThreshold {
+    max_packets: usize,
+}
+
+impl Default for DynamicPacketToProcessThreshold {
+    fn default() -> Self {
+        Self {
+            max_packets: Self::DEFAULT_MAX_PACKETS,
+        }
+    }
+}
+
+impl DynamicPacketToProcessThreshold {
+    const DEFAULT_MAX_PACKETS: usize = 1024;
+    const TIME_THRESHOLD: Duration = Duration::from_secs(1);
+
+    pub fn update(&mut self, total_packets: usize, compute_time: Duration) {
+        if total_packets >= self.max_packets {
+            let threshold_update = if compute_time > Self::TIME_THRESHOLD {
+                PacketThresholdUpdate::Decrease
+            } else {
+                PacketThresholdUpdate::Increase
+            };
+            self.max_packets = threshold_update.calculate(self.max_packets);
+        }
+    }
+
+    pub fn should_drop(&self, total: usize) -> bool {
+        total >= self.max_packets
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::DynamicPacketToProcessThreshold;
+    use std::time::Duration;
+
+    #[test]
+    fn test_dynamic_packet_threshold() {
+        let mut threshold = DynamicPacketToProcessThreshold::default();
+        assert_eq!(
+            threshold.max_packets,
+            DynamicPacketToProcessThreshold::DEFAULT_MAX_PACKETS
+        );
+
+        assert!(!threshold.should_drop(10));
+        assert!(threshold.should_drop(2000));
+
+        let old = threshold.max_packets;
+
+        // Increase
+        let total = 2000;
+        let compute_time = Duration::from_millis(500);
+        threshold.update(total, compute_time);
+        assert!(threshold.max_packets > old);
+
+        // Decrease
+        let compute_time = Duration::from_millis(2000);
+        threshold.update(total, compute_time);
+        assert_eq!(threshold.max_packets, old - 1); // due to rounding error, there is a difference of 1
+    }
+}
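The increase and decrease steps are deliberately asymmetric: an increase multiplies by 100/90 (roughly +11%) while a decrease multiplies by 90/100 (exactly -10%), and both truncate under integer division. That truncation is why the test above ends at `old - 1` after one increase/decrease round trip. A quick standalone check of the arithmetic, using the constants from the file above:

```rust
fn main() {
    let start: usize = 1024; // DEFAULT_MAX_PACKETS
    let increased = start * 100 / 90; // 102400 / 90 = 1137 (truncated)
    let decreased = increased * 90 / 100; // 102330 / 100 = 1023 (truncated)
    assert_eq!(increased, 1137);
    assert_eq!(decreased, start - 1);
    println!("{start} -> {increased} -> {decreased}");
}
```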
diff --git a/core/src/serve_repair.rs b/core/src/serve_repair.rs
index f643302737..0d36bea192 100644
--- a/core/src/serve_repair.rs
+++ b/core/src/serve_repair.rs
@@ -2,6 +2,7 @@ use {
     crate::{
         cluster_slots::ClusterSlots,
         duplicate_repair_status::ANCESTOR_HASH_REPAIR_SAMPLE_SIZE,
+        packet_threshold::DynamicPacketToProcessThreshold,
         repair_response,
         repair_service::{OutstandingShredRepairs, RepairStats},
         request_response::RequestResponse,
@@ -23,7 +24,6 @@ use {
         blockstore::Blockstore,
         shred::{Nonce, Shred, SIZE_OF_NONCE},
     },
-    solana_measure::measure::Measure,
     solana_metrics::inc_new_counter_debug,
     solana_perf::packet::{limited_deserialize, PacketBatch, PacketBatchRecycler},
     solana_sdk::{
@@ -322,7 +322,7 @@ impl ServeRepair {
         requests_receiver: &PacketBatchReceiver,
         response_sender: &PacketBatchSender,
         stats: &mut ServeRepairStats,
-        max_packets: &mut usize,
+        packet_threshold: &mut DynamicPacketToProcessThreshold,
     ) -> Result<()> {
         //TODO cache connections
         let timeout = Duration::new(1, 0);
@@ -332,29 +332,21 @@
         let mut dropped_packets = 0;
         while let Ok(more) = requests_receiver.try_recv() {
             total_packets += more.packets.len();
-            if total_packets < *max_packets {
-                // Drop the rest in the channel in case of dos
-                reqs_v.push(more);
-            } else {
+            if packet_threshold.should_drop(total_packets) {
                 dropped_packets += more.packets.len();
+            } else {
+                reqs_v.push(more);
             }
         }
         stats.dropped_packets += dropped_packets;
         stats.total_packets += total_packets;

-        let mut time = Measure::start("repair::handle_packets");
+        let timer = Instant::now();
         for reqs in reqs_v {
             Self::handle_packets(obj, recycler, blockstore, reqs, response_sender, stats);
         }
-        time.stop();
-        if total_packets >= *max_packets {
-            if time.as_ms() > 1000 {
-                *max_packets = (*max_packets * 9) / 10;
-            } else {
-                *max_packets = (*max_packets * 10) / 9;
-            }
-        }
+        packet_threshold.update(total_packets, timer.elapsed());
         Ok(())
     }
@@ -403,7 +395,7 @@ impl ServeRepair {
             .spawn(move || {
                 let mut last_print = Instant::now();
                 let mut stats = ServeRepairStats::default();
-                let mut max_packets = 1024;
+                let mut packet_threshold = DynamicPacketToProcessThreshold::default();
                 loop {
                     let result = Self::run_listen(
                         &me,
@@ -412,7 +404,7 @@ impl ServeRepair {
                         &requests_receiver,
                         &response_sender,
                         &mut stats,
-                        &mut max_packets,
+                        &mut packet_threshold,
                     );
                     match result {
                         Err(Error::RecvTimeout(_)) | Ok(_) => {}
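With `update` called every iteration in both services, sustained overload walks the cap down geometrically, about 10% per slow pass, until the work fits back under the one-second budget. A hypothetical simulation of that behavior, assuming in-crate access to the type defined above (the packet counts and timings are made up for illustration):

```rust
use std::time::Duration;

use crate::packet_threshold::DynamicPacketToProcessThreshold;

fn main() {
    let mut threshold = DynamicPacketToProcessThreshold::default();
    // Five consecutive slow iterations (2000 packets, 2s each, both over
    // their limits) decay the cap: 1024 -> 921 -> 828 -> 745 -> 670 -> 603.
    for _ in 0..5 {
        threshold.update(2000, Duration::from_secs(2));
    }
    assert!(threshold.should_drop(700));
    assert!(!threshold.should_drop(600));
}
```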