track time to coalesce entries in recv_slot_entries (#27525)

This commit is contained in:
Jeff Biseda 2022-09-06 16:07:17 -07:00 committed by GitHub
parent c62aef6e02
commit 269eb519dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 21 additions and 9 deletions

View File

@ -18,6 +18,7 @@ const ENTRY_COALESCE_DURATION: Duration = Duration::from_millis(50);
/// Result of draining the working-bank entry channel in `recv_slot_entries`:
/// the coalesced entries for one slot plus timing metrics for the drain.
pub(super) struct ReceiveResults {
/// Entries received from the channel and coalesced into this batch.
pub entries: Vec<Entry>,
/// Total wall-clock time spent in `recv_slot_entries`, measured from just
/// before the first (blocking) receive.
pub time_elapsed: Duration,
/// Time spent in the coalescing loop waiting for additional entries to
/// fill out the batch (subset of `time_elapsed`).
pub time_coalesced: Duration,
/// The working bank the entries belong to. If the bank changes mid-drain,
/// previously collected entries are discarded (slot was interrupted).
pub bank: Arc<Bank>,
/// Tick height of the last entry received; the producer asserts this is
/// <= `bank.max_tick_height()`.
pub last_tick_height: u64,
}
@ -35,11 +36,8 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
32 * ShredData::capacity(/*merkle_proof_size*/ None).unwrap() as u64;
let timer = Duration::new(1, 0);
let recv_start = Instant::now();
let (mut bank, (entry, mut last_tick_height)) = receiver.recv_timeout(timer)?;
let mut entries = vec![entry];
assert!(last_tick_height <= bank.max_tick_height());
// Drain channel
@ -63,14 +61,15 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
let mut serialized_batch_byte_count = serialized_size(&entries)?;
// Wait up to `ENTRY_COALESCE_DURATION` to try to coalesce entries into a 32 shred batch
let mut coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
let mut coalesce_start = Instant::now();
while last_tick_height != bank.max_tick_height()
&& serialized_batch_byte_count < target_serialized_batch_byte_count
{
let (try_bank, (entry, tick_height)) = match receiver.recv_deadline(coalesce_deadline) {
Ok(working_bank_entry) => working_bank_entry,
Err(_) => break,
};
let (try_bank, (entry, tick_height)) =
match receiver.recv_deadline(coalesce_start + ENTRY_COALESCE_DURATION) {
Ok(working_bank_entry) => working_bank_entry,
Err(_) => break,
};
// If the bank changed, that implies the previous slot was interrupted and we do not have to
// broadcast its entries.
if try_bank.slot() != bank.slot() {
@ -78,7 +77,7 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
entries.clear();
serialized_batch_byte_count = 8; // Vec len
bank = try_bank;
coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
coalesce_start = Instant::now();
}
last_tick_height = tick_height;
let entry_bytes = serialized_size(&entry)?;
@ -86,11 +85,13 @@ pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result
entries.push(entry);
assert!(last_tick_height <= bank.max_tick_height());
}
let time_coalesced = coalesce_start.elapsed();
let time_elapsed = recv_start.elapsed();
Ok(ReceiveResults {
entries,
time_elapsed,
time_coalesced,
bank,
last_tick_height,
})

View File

@ -177,6 +177,7 @@ impl StandardBroadcastRun {
receive_results: ReceiveResults,
) -> Result<()> {
let mut receive_elapsed = receive_results.time_elapsed;
let mut coalesce_elapsed = receive_results.time_coalesced;
let num_entries = receive_results.entries.len();
let bank = receive_results.bank.clone();
let last_tick_height = receive_results.last_tick_height;
@ -193,6 +194,7 @@ impl StandardBroadcastRun {
self.current_slot_and_parent = Some((slot, parent_slot));
receive_elapsed = Duration::new(0, 0);
coalesce_elapsed = Duration::new(0, 0);
}
let mut process_stats = ProcessShredsStats::default();
@ -291,6 +293,7 @@ impl StandardBroadcastRun {
process_stats.shredding_elapsed = to_shreds_time.as_us();
process_stats.get_leader_schedule_elapsed = get_leader_schedule_time.as_us();
process_stats.receive_elapsed = duration_as_us(&receive_elapsed);
process_stats.coalesce_elapsed = duration_as_us(&coalesce_elapsed);
process_stats.coding_send_elapsed = coding_send_time.as_us();
self.process_shreds_stats += process_stats;
@ -552,6 +555,7 @@ mod test {
let receive_results = ReceiveResults {
entries: ticks0.clone(),
time_elapsed: Duration::new(3, 0),
time_coalesced: Duration::new(2, 0),
bank: bank0.clone(),
last_tick_height: (ticks0.len() - 1) as u64,
};
@ -620,6 +624,7 @@ mod test {
let receive_results = ReceiveResults {
entries: ticks1.clone(),
time_elapsed: Duration::new(2, 0),
time_coalesced: Duration::new(1, 0),
bank: bank2,
last_tick_height: (ticks1.len() - 1) as u64,
};
@ -684,6 +689,7 @@ mod test {
let receive_results = ReceiveResults {
entries: ticks,
time_elapsed: Duration::new(1, 0),
time_coalesced: Duration::new(0, 0),
bank: bank.clone(),
last_tick_height,
};
@ -727,6 +733,7 @@ mod test {
let receive_results = ReceiveResults {
entries: ticks.clone(),
time_elapsed: Duration::new(3, 0),
time_coalesced: Duration::new(2, 0),
bank: bank0,
last_tick_height: ticks.len() as u64,
};

View File

@ -17,6 +17,7 @@ pub struct ProcessShredsStats {
pub sign_coding_elapsed: u64,
pub coding_send_elapsed: u64,
pub get_leader_schedule_elapsed: u64,
pub coalesce_elapsed: u64,
// Histogram count of num_data_shreds obtained from serializing entries
// counted in 5 buckets.
num_data_shreds_hist: [usize; 5],
@ -83,6 +84,7 @@ impl ProcessShredsStats {
("num_data_shreds_31", self.num_data_shreds_hist[2], i64),
("num_data_shreds_63", self.num_data_shreds_hist[3], i64),
("num_data_shreds_64", self.num_data_shreds_hist[4], i64),
("coalesce_elapsed", self.coalesce_elapsed, i64),
);
*self = Self::default();
}
@ -134,6 +136,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
sign_coding_elapsed,
coding_send_elapsed,
get_leader_schedule_elapsed,
coalesce_elapsed,
num_data_shreds_hist,
num_extant_slots,
data_buffer_residual,
@ -146,6 +149,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
self.sign_coding_elapsed += sign_coding_elapsed;
self.coding_send_elapsed += coding_send_elapsed;
self.get_leader_schedule_elapsed += get_leader_schedule_elapsed;
self.coalesce_elapsed += coalesce_elapsed;
self.num_extant_slots += num_extant_slots;
self.data_buffer_residual += data_buffer_residual;
for (i, bucket) in self.num_data_shreds_hist.iter_mut().enumerate() {