Refactor packet_threshold adjustment code into its own struct (#23216)

* refactor packet_threshold adjustment code into its own struct and add a unit test for it

* fix a typo in an error message

* address code review feedback

* address another round of code review feedback

* Update core/src/ancestor_hashes_service.rs

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>

* share packet threshold with repair service (credit to Carl)

Co-authored-by: Trent Nelson <trent.a.b.nelson@gmail.com>
HaoranYi 2022-03-02 09:09:06 -06:00 committed by GitHub
parent 86e2f728c3
commit 8de88d0a55
4 changed files with 105 additions and 35 deletions
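
For orientation before the diff: the commit replaces a hand-rolled max_packets counter in two services (ancestor hashes and serve-repair) with a shared DynamicPacketToProcessThreshold. Both call sites follow the same drain/process/update pattern, sketched below with hypothetical stand-ins (Batch, process_batch, and an mpsc receiver) in place of the real PacketBatch, service logic, and crossbeam channel:

use std::time::Instant;

// Hypothetical stand-ins; DynamicPacketToProcessThreshold is the struct
// added by this commit (core/src/packet_threshold.rs) and is assumed to
// be in scope.
struct Batch {
    packets: Vec<Vec<u8>>,
}

fn process_batch(_batch: &Batch) { /* service-specific packet handling */ }

fn drain_and_process(
    receiver: &std::sync::mpsc::Receiver<Batch>,
    threshold: &mut DynamicPacketToProcessThreshold,
) {
    let mut batches = Vec::new();
    let mut total_packets = 0;
    while let Ok(batch) = receiver.try_recv() {
        total_packets += batch.packets.len();
        if threshold.should_drop(total_packets) {
            // Saturated: drop the rest of the channel's backlog (DoS guard).
            continue;
        }
        batches.push(batch);
    }
    let timer = Instant::now();
    for batch in &batches {
        process_batch(batch);
    }
    // Shrink the limit if processing took too long, grow it otherwise.
    threshold.update(total_packets, timer.elapsed());
}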

core/src/ancestor_hashes_service.rs

@@ -3,6 +3,7 @@ use {
         cluster_slots::ClusterSlots,
         duplicate_repair_status::{DeadSlotAncestorRequestStatus, DuplicateAncestorDecision},
         outstanding_requests::OutstandingRequests,
+        packet_threshold::DynamicPacketToProcessThreshold,
         repair_response::{self},
         repair_service::{DuplicateSlotsResetSender, RepairInfo, RepairStatsGroup},
         replay_stage::DUPLICATE_THRESHOLD,
@@ -12,7 +13,6 @@ use {
     crossbeam_channel::{unbounded, Receiver, Sender},
     dashmap::{mapref::entry::Entry::Occupied, DashMap},
     solana_ledger::{blockstore::Blockstore, shred::SIZE_OF_NONCE},
-    solana_measure::measure::Measure,
     solana_perf::{
         packet::{limited_deserialize, Packet, PacketBatch},
         recycler::Recycler,
@@ -208,7 +208,7 @@ impl AncestorHashesService {
             .spawn(move || {
                 let mut last_stats_report = Instant::now();
                 let mut stats = AncestorHashesResponsesStats::default();
-                let mut max_packets = 1024;
+                let mut packet_threshold = DynamicPacketToProcessThreshold::default();
                 loop {
                     let result = Self::process_new_packets_from_channel(
                         &ancestor_hashes_request_statuses,
@@ -216,13 +216,13 @@ impl AncestorHashesService {
                         &blockstore,
                         &outstanding_requests,
                         &mut stats,
-                        &mut max_packets,
+                        &mut packet_threshold,
                         &duplicate_slots_reset_sender,
                         &retryable_slots_sender,
                     );
                     match result {
                         Err(Error::RecvTimeout(_)) | Ok(_) => {}
-                        Err(err) => info!("ancestors hashes reponses listener error: {:?}", err),
+                        Err(err) => info!("ancestors hashes responses listener error: {:?}", err),
                     };
                     if exit.load(Ordering::Relaxed) {
                         return;
@@ -243,7 +243,7 @@ impl AncestorHashesService {
         blockstore: &Blockstore,
         outstanding_requests: &RwLock<OutstandingAncestorHashesRepairs>,
         stats: &mut AncestorHashesResponsesStats,
-        max_packets: &mut usize,
+        packet_threshold: &mut DynamicPacketToProcessThreshold,
         duplicate_slots_reset_sender: &DuplicateSlotsResetSender,
         retryable_slots_sender: &RetryableSlotsSender,
     ) -> Result<()> {
@@ -254,18 +254,17 @@ impl AncestorHashesService {
         let mut dropped_packets = 0;
         while let Ok(batch) = response_receiver.try_recv() {
             total_packets += batch.packets.len();
-            if total_packets < *max_packets {
-                // Drop the rest in the channel in case of DOS
-                packet_batches.push(batch);
-            } else {
+            if packet_threshold.should_drop(total_packets) {
                 dropped_packets += batch.packets.len();
+            } else {
+                packet_batches.push(batch);
             }
         }
         stats.dropped_packets += dropped_packets;
         stats.total_packets += total_packets;
-        let mut time = Measure::start("ancestor_hashes::handle_packets");
+        let timer = Instant::now();
         for packet_batch in packet_batches {
             Self::process_packet_batch(
                 ancestor_hashes_request_statuses,
@@ -277,14 +276,7 @@ impl AncestorHashesService {
                 retryable_slots_sender,
             );
         }
-        time.stop();
-        if total_packets >= *max_packets {
-            if time.as_ms() > 1000 {
-                *max_packets = (*max_packets * 9) / 10;
-            } else {
-                *max_packets = (*max_packets * 10) / 9;
-            }
-        }
+        packet_threshold.update(total_packets, timer.elapsed());
         Ok(())
     }
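
A note on the arithmetic this hunk swaps out: the old inline code adjusted with * 9 / 10 and * 10 / 9, while the new struct uses * 90 / 100 and * 100 / 90. For unsigned integers these compute identical floors, so the refactor preserves the old adjustment semantics, and the saturating ops additionally guard against overflow. A quick standalone check (not part of the commit):

fn main() {
    // floor(9n/10) == floor(90n/100) and floor(10n/9) == floor(100n/90),
    // since the underlying fractions are equal; only overflow behavior differs.
    for n in [0usize, 1, 9, 10, 999, 1024, 1_000_000] {
        assert_eq!(n * 9 / 10, n * 90 / 100);
        assert_eq!(n * 10 / 9, n * 100 / 90);
    }
    println!("old and new threshold arithmetic agree");
}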

core/src/lib.rs

@@ -34,6 +34,7 @@ pub mod ledger_cleanup_service;
 pub mod optimistic_confirmation_verifier;
 pub mod outstanding_requests;
 pub mod packet_hasher;
+pub mod packet_threshold;
 pub mod progress_map;
 pub mod qos_service;
 pub mod repair_generic_traversal;

core/src/packet_threshold.rs (new file)

@@ -0,0 +1,85 @@
+use std::time::Duration;
+
+enum PacketThresholdUpdate {
+    Increase,
+    Decrease,
+}
+
+impl PacketThresholdUpdate {
+    const PERCENTAGE: usize = 90;
+
+    fn calculate(&self, current: usize) -> usize {
+        match *self {
+            PacketThresholdUpdate::Increase => {
+                current.saturating_mul(100).saturating_div(Self::PERCENTAGE)
+            }
+            PacketThresholdUpdate::Decrease => {
+                current.saturating_mul(Self::PERCENTAGE).saturating_div(100)
+            }
+        }
+    }
+}
+
+#[derive(Debug)]
+pub struct DynamicPacketToProcessThreshold {
+    max_packets: usize,
+}
+
+impl Default for DynamicPacketToProcessThreshold {
+    fn default() -> Self {
+        Self {
+            max_packets: Self::DEFAULT_MAX_PACKETS,
+        }
+    }
+}
+
+impl DynamicPacketToProcessThreshold {
+    const DEFAULT_MAX_PACKETS: usize = 1024;
+    const TIME_THRESHOLD: Duration = Duration::from_secs(1);
+
+    pub fn update(&mut self, total_packets: usize, compute_time: Duration) {
+        if total_packets >= self.max_packets {
+            let threshold_update = if compute_time > Self::TIME_THRESHOLD {
+                PacketThresholdUpdate::Decrease
+            } else {
+                PacketThresholdUpdate::Increase
+            };
+            self.max_packets = threshold_update.calculate(self.max_packets);
+        }
+    }
+
+    pub fn should_drop(&self, total: usize) -> bool {
+        total >= self.max_packets
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::DynamicPacketToProcessThreshold;
+    use std::time::Duration;
+
+    #[test]
+    fn test_dynamic_packet_threshold() {
+        let mut threshold = DynamicPacketToProcessThreshold::default();
+        assert_eq!(
+            threshold.max_packets,
+            DynamicPacketToProcessThreshold::DEFAULT_MAX_PACKETS
+        );
+
+        assert!(!threshold.should_drop(10));
+        assert!(threshold.should_drop(2000));
+
+        let old = threshold.max_packets;
+
+        // Increase
+        let total = 2000;
+        let compute_time = Duration::from_millis(500);
+        threshold.update(total, compute_time);
+        assert!(threshold.max_packets > old);
+
+        // Decrease
+        let compute_time = Duration::from_millis(2000);
+        threshold.update(total, compute_time);
+        assert_eq!(threshold.max_packets, old - 1); // due to rounding, there is a difference of 1
+    }
+}
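
The old - 1 assertion at the end of the test is pure integer division: starting from the default of 1024, the increase step computes 1024 * 100 / 90 = 1137 (rounded down), and the decrease step then computes 1137 * 90 / 100 = 1023 (rounded down), one less than the original. A standalone sketch of that round trip, using plain integers rather than the struct's private max_packets field:

fn main() {
    let old: usize = 1024; // DEFAULT_MAX_PACKETS
    let increased = old * 100 / 90; // 102400 / 90 = 1137
    let decreased = increased * 90 / 100; // 102330 / 100 = 1023
    assert_eq!(decreased, old - 1); // the off-by-one the test expects
    println!("{} -> {} -> {}", old, increased, decreased);
}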

core/src/serve_repair.rs

@@ -2,6 +2,7 @@ use {
     crate::{
         cluster_slots::ClusterSlots,
         duplicate_repair_status::ANCESTOR_HASH_REPAIR_SAMPLE_SIZE,
+        packet_threshold::DynamicPacketToProcessThreshold,
         repair_response,
         repair_service::{OutstandingShredRepairs, RepairStats},
         request_response::RequestResponse,
@@ -23,7 +24,6 @@ use {
         blockstore::Blockstore,
         shred::{Nonce, Shred, SIZE_OF_NONCE},
     },
-    solana_measure::measure::Measure,
     solana_metrics::inc_new_counter_debug,
     solana_perf::packet::{limited_deserialize, PacketBatch, PacketBatchRecycler},
     solana_sdk::{
@@ -322,7 +322,7 @@ impl ServeRepair {
         requests_receiver: &PacketBatchReceiver,
         response_sender: &PacketBatchSender,
         stats: &mut ServeRepairStats,
-        max_packets: &mut usize,
+        packet_threshold: &mut DynamicPacketToProcessThreshold,
     ) -> Result<()> {
         //TODO cache connections
         let timeout = Duration::new(1, 0);
@@ -332,29 +332,21 @@ impl ServeRepair {
         let mut dropped_packets = 0;
         while let Ok(more) = requests_receiver.try_recv() {
             total_packets += more.packets.len();
-            if total_packets < *max_packets {
-                // Drop the rest in the channel in case of dos
-                reqs_v.push(more);
-            } else {
+            if packet_threshold.should_drop(total_packets) {
                 dropped_packets += more.packets.len();
+            } else {
+                reqs_v.push(more);
             }
         }
         stats.dropped_packets += dropped_packets;
         stats.total_packets += total_packets;
-        let mut time = Measure::start("repair::handle_packets");
+        let timer = Instant::now();
         for reqs in reqs_v {
             Self::handle_packets(obj, recycler, blockstore, reqs, response_sender, stats);
         }
-        time.stop();
-        if total_packets >= *max_packets {
-            if time.as_ms() > 1000 {
-                *max_packets = (*max_packets * 9) / 10;
-            } else {
-                *max_packets = (*max_packets * 10) / 9;
-            }
-        }
+        packet_threshold.update(total_packets, timer.elapsed());
         Ok(())
     }
@@ -403,7 +395,7 @@ impl ServeRepair {
             .spawn(move || {
                 let mut last_print = Instant::now();
                 let mut stats = ServeRepairStats::default();
-                let mut max_packets = 1024;
+                let mut packet_threshold = DynamicPacketToProcessThreshold::default();
                 loop {
                     let result = Self::run_listen(
                         &me,
@@ -412,7 +404,7 @@ impl ServeRepair {
                         &requests_receiver,
                         &response_sender,
                         &mut stats,
-                        &mut max_packets,
+                        &mut packet_threshold,
                     );
                     match result {
                         Err(Error::RecvTimeout(_)) | Ok(_) => {}