diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index 6b0718e68..b3a7e74db 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -3,8 +3,10 @@ use super::*; use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo; use solana_ledger::entry::Entry; use solana_ledger::shred::{Shred, Shredder, RECOMMENDED_FEC_RATE}; +use solana_sdk::pubkey::Pubkey; use solana_sdk::signature::Keypair; use solana_sdk::timing::duration_as_us; +use std::collections::HashMap; use std::time::Duration; #[derive(Default)] @@ -14,7 +16,7 @@ struct BroadcastStats { insert_shreds_elapsed: u64, broadcast_elapsed: u64, receive_elapsed: u64, - clone_and_seed_elapsed: u64, + seed_elapsed: u64, } impl BroadcastStats { @@ -23,7 +25,7 @@ impl BroadcastStats { self.shredding_elapsed = 0; self.broadcast_elapsed = 0; self.receive_elapsed = 0; - self.clone_and_seed_elapsed = 0; + self.seed_elapsed = 0; } } @@ -75,27 +77,6 @@ impl StandardBroadcastRun { last_unfinished_slot_shred } - fn coalesce_shreds( - data_shreds: Vec, - coding_shreds: Vec, - last_unfinished_slot_shred: Option, - ) -> Vec { - if let Some(shred) = last_unfinished_slot_shred { - data_shreds - .iter() - .chain(coding_shreds.iter()) - .cloned() - .chain(std::iter::once(shred)) - .collect::>() - } else { - data_shreds - .iter() - .chain(coding_shreds.iter()) - .cloned() - .collect::>() - } - } - fn entries_to_shreds( &mut self, blocktree: &Blocktree, @@ -170,79 +151,88 @@ impl StandardBroadcastRun { let last_unfinished_slot_shred = self.check_for_interrupted_slot(); // 2) Convert entries to shreds and coding shreds - let (data_shreds, coding_shreds) = self.entries_to_shreds( + let (mut data_shreds, coding_shreds) = self.entries_to_shreds( blocktree, &receive_results.entries, last_tick_height == bank.max_tick_height(), ); + if let Some(last_shred) = last_unfinished_slot_shred { + data_shreds.push(last_shred); + } let to_shreds_elapsed = to_shreds_start.elapsed(); - let clone_and_seed_start = Instant::now(); - let all_shreds = - Self::coalesce_shreds(data_shreds, coding_shreds, last_unfinished_slot_shred); - let all_shreds_ = all_shreds.clone(); - let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect(); - let clone_and_seed_elapsed = clone_and_seed_start.elapsed(); - - // 3) Insert shreds into blocktree - let insert_shreds_start = Instant::now(); - blocktree - .insert_shreds(all_shreds_, None) - .expect("Failed to insert shreds in blocktree"); - let insert_shreds_elapsed = insert_shreds_start.elapsed(); - - // 4) Broadcast the shreds - let broadcast_start = Instant::now(); let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); - let all_shred_bufs: Vec> = all_shreds.into_iter().map(|s| s.payload).collect(); - trace!("Broadcasting {:?} shreds", all_shred_bufs.len()); - - cluster_info.read().unwrap().broadcast_shreds( - sock, - all_shred_bufs, - &all_seeds, + self.insert_and_broadcast(data_shreds, blocktree, cluster_info, stakes.as_ref(), sock)?; + self.insert_and_broadcast( + coding_shreds, + blocktree, + cluster_info, stakes.as_ref(), + sock, )?; - let broadcast_elapsed = broadcast_start.elapsed(); - - self.update_broadcast_stats( - duration_as_us(&receive_elapsed), - duration_as_us(&to_shreds_elapsed), - duration_as_us(&insert_shreds_elapsed), - duration_as_us(&broadcast_elapsed), - duration_as_us(&clone_and_seed_elapsed), - last_tick_height == bank.max_tick_height(), - ); + self.update_broadcast_stats(BroadcastStats { + shredding_elapsed: duration_as_us(&to_shreds_elapsed), + receive_elapsed: duration_as_us(&receive_elapsed), + ..BroadcastStats::default() + }); if last_tick_height == bank.max_tick_height() { + self.report_and_reset_stats(); self.unfinished_slot = None; } Ok(()) } - #[allow(clippy::too_many_arguments)] - fn update_broadcast_stats( + fn insert_and_broadcast( &mut self, - receive_entries_elapsed: u64, - shredding_elapsed: u64, - insert_shreds_elapsed: u64, - broadcast_elapsed: u64, - clone_and_seed_elapsed: u64, - slot_ended: bool, - ) { - self.stats.receive_elapsed += receive_entries_elapsed; - self.stats.shredding_elapsed += shredding_elapsed; - self.stats.insert_shreds_elapsed += insert_shreds_elapsed; - self.stats.broadcast_elapsed += broadcast_elapsed; - self.stats.clone_and_seed_elapsed += clone_and_seed_elapsed; + shreds: Vec, + blocktree: &Arc, + cluster_info: &Arc>, + stakes: Option<&HashMap>, + sock: &UdpSocket, + ) -> Result<()> { + let seed_start = Instant::now(); + let seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect(); + let seed_elapsed = seed_start.elapsed(); - if slot_ended { - self.report_and_reset_stats() - } + // Insert shreds into blocktree + let insert_shreds_start = Instant::now(); + blocktree + .insert_shreds(shreds.clone(), None) + .expect("Failed to insert shreds in blocktree"); + let insert_shreds_elapsed = insert_shreds_start.elapsed(); + + // Broadcast the shreds + let broadcast_start = Instant::now(); + let shred_bufs: Vec> = shreds.into_iter().map(|s| s.payload).collect(); + trace!("Broadcasting {:?} shreds", shred_bufs.len()); + + cluster_info + .read() + .unwrap() + .broadcast_shreds(sock, shred_bufs, &seeds, stakes)?; + + let broadcast_elapsed = broadcast_start.elapsed(); + + self.update_broadcast_stats(BroadcastStats { + insert_shreds_elapsed: duration_as_us(&insert_shreds_elapsed), + broadcast_elapsed: duration_as_us(&broadcast_elapsed), + seed_elapsed: duration_as_us(&seed_elapsed), + ..BroadcastStats::default() + }); + Ok(()) + } + + fn update_broadcast_stats(&mut self, stats: BroadcastStats) { + self.stats.receive_elapsed += stats.receive_elapsed; + self.stats.shredding_elapsed += stats.shredding_elapsed; + self.stats.insert_shreds_elapsed += stats.insert_shreds_elapsed; + self.stats.broadcast_elapsed += stats.broadcast_elapsed; + self.stats.seed_elapsed += stats.seed_elapsed; } fn report_and_reset_stats(&mut self) { @@ -258,11 +248,7 @@ impl StandardBroadcastRun { ), ("broadcast_time", self.stats.broadcast_elapsed as i64, i64), ("receive_time", self.stats.receive_elapsed as i64, i64), - ( - "clone_and_seed", - self.stats.clone_and_seed_elapsed as i64, - i64 - ), + ("seed", self.stats.seed_elapsed as i64, i64), ( "num_shreds", i64::from(self.unfinished_slot.unwrap().next_shred_index),