From ca9d4e34df5fa87b75cc4fcfa0738eeb1f8a876a Mon Sep 17 00:00:00 2001
From: Pankaj Garg
Date: Thu, 19 Sep 2019 16:29:52 -0700
Subject: [PATCH] Broadcast stage tuning (#5989)

---
 core/src/banking_stage.rs                     |   2 +-
 core/src/broadcast_stage.rs                   |   2 +-
 core/src/broadcast_stage/broadcast_utils.rs   |   9 ++
 .../broadcast_stage/standard_broadcast_run.rs |  36 +++--
 core/src/replay_stage.rs                      |  13 +-
 core/src/shred.rs                             | 128 +++++++++---------
 6 files changed, 110 insertions(+), 80 deletions(-)

diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs
index aa030cf92..aa03ff346 100644
--- a/core/src/banking_stage.rs
+++ b/core/src/banking_stage.rs
@@ -52,7 +52,7 @@ pub const NUM_THREADS: u32 = 4;
 
 const TOTAL_BUFFERED_PACKETS: usize = 500_000;
 
-const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 512;
+const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128;
 
 /// Stores the stage's thread handle and output receiver.
 pub struct BankingStage {
diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 4a91bb334..f4196ead3 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -8,7 +8,7 @@ use crate::poh_recorder::WorkingBankEntry;
 use crate::result::{Error, Result};
 use crate::service::Service;
 use crate::staking_utils;
-use solana_metrics::{datapoint, inc_new_counter_error, inc_new_counter_info};
+use solana_metrics::{datapoint_info, inc_new_counter_error, inc_new_counter_info};
 use std::net::UdpSocket;
 use std::sync::atomic::{AtomicBool, Ordering};
 use std::sync::mpsc::{Receiver, RecvTimeoutError};
diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs
index 567496b72..f0fe41874 100644
--- a/core/src/broadcast_stage/broadcast_utils.rs
+++ b/core/src/broadcast_stage/broadcast_utils.rs
@@ -15,6 +15,11 @@ pub(super) struct ReceiveResults {
     pub last_tick: u64,
 }
 
+/// This parameter tunes how many entries are received in one iteration of the recv loop.
+/// It prevents the broadcast stage from consuming too many entries at a time, which could
+/// otherwise delay shredding and broadcasting shreds to peer validators.
+const RECEIVE_ENTRY_COUNT_THRESHOLD: usize = 8;
+
 pub(super) fn recv_slot_entries(receiver: &Receiver) -> Result {
     let timer = Duration::new(1, 0);
     let (bank, (entry, mut last_tick)) = receiver.recv_timeout(timer)?;
@@ -38,6 +43,10 @@ pub(super) fn recv_slot_entries(receiver: &Receiver) -> Result
         last_tick = tick_height;
         entries.push(entry);
 
+        if entries.len() >= RECEIVE_ENTRY_COUNT_THRESHOLD {
+            break;
+        }
+
         assert!(last_tick <= max_tick_height);
         if last_tick == max_tick_height {
             break;
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index 0ffaa2734..a51e7fc47 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -23,16 +23,19 @@ impl StandardBroadcastRun {
 
     fn update_broadcast_stats(
         &mut self,
+        receive_entries_elapsed: u64,
+        shredding_elapsed: u64,
+        insert_shreds_elapsed: u64,
         broadcast_elapsed: u64,
         run_elapsed: u64,
         num_entries: usize,
-        to_blobs_elapsed: u64,
+        num_shreds: usize,
         blob_index: u64,
     ) {
         inc_new_counter_info!("broadcast_service-time_ms", broadcast_elapsed as usize);
 
         self.stats.num_entries.push(num_entries);
-        self.stats.to_blobs_elapsed.push(to_blobs_elapsed);
+        self.stats.to_blobs_elapsed.push(shredding_elapsed);
         self.stats.run_elapsed.push(run_elapsed);
         if self.stats.num_entries.len() >= 16 {
             info!(
             self.stats.run_elapsed.clear();
         }
 
-        datapoint!("broadcast-service", ("transmit-index", blob_index, i64));
+        datapoint_info!(
+            "broadcast-service",
+            ("num_entries", num_entries as i64, i64),
+            ("num_shreds", num_shreds as i64, i64),
+            ("receive_time", receive_entries_elapsed as i64, i64),
+            ("shredding_time", shredding_elapsed as i64, i64),
+            ("insert_shred_time", insert_shreds_elapsed as i64, i64),
+            ("broadcast_time", broadcast_elapsed as i64, i64),
+            ("transmit-index", blob_index as i64, i64),
+        );
     }
 }
 
@@ -65,7 +77,6 @@ impl BroadcastRun for StandardBroadcastRun {
         inc_new_counter_info!("broadcast_service-entries_received", num_entries);
 
         // 2) Convert entries to blobs + generate coding blobs
-        let to_blobs_start = Instant::now();
         let keypair = &cluster_info.read().unwrap().keypair.clone();
         let latest_shred_index = blocktree
             .meta(bank.slot())
@@ -79,6 +90,7 @@ impl BroadcastRun for StandardBroadcastRun {
             0
         };
 
+        let to_shreds_start = Instant::now();
         let (shred_infos, latest_shred_index) = entries_to_shreds(
             receive_results.entries,
             last_tick,
@@ -88,14 +100,15 @@ impl BroadcastRun for StandardBroadcastRun {
             latest_shred_index,
             parent_slot,
         );
+        let to_shreds_elapsed = to_shreds_start.elapsed();
 
         let all_seeds: Vec<[u8; 32]> = shred_infos.iter().map(|s| s.seed()).collect();
         let num_shreds = shred_infos.len();
 
+        let insert_shreds_start = Instant::now();
         blocktree
             .insert_shreds(shred_infos.clone(), None)
             .expect("Failed to insert shreds in blocktree");
-
-        let to_blobs_elapsed = to_blobs_start.elapsed();
+        let insert_shreds_elapsed = insert_shreds_start.elapsed();
 
         // 3) Start broadcast step
         let broadcast_start = Instant::now();
@@ -111,14 +124,17 @@ impl BroadcastRun for StandardBroadcastRun {
             stakes.as_ref(),
         )?;
 
-        inc_new_counter_debug!("streamer-broadcast-sent", num_shreds);
-
         let broadcast_elapsed = broadcast_start.elapsed();
 
         self.update_broadcast_stats(
+            duration_as_ms(&receive_elapsed),
+            duration_as_ms(&to_shreds_elapsed),
+            duration_as_ms(&insert_shreds_elapsed),
             duration_as_ms(&broadcast_elapsed),
-            duration_as_ms(&(receive_elapsed + to_blobs_elapsed + broadcast_elapsed)),
+            duration_as_ms(
+                &(receive_elapsed + to_shreds_elapsed + insert_shreds_elapsed + broadcast_elapsed),
+            ),
             num_entries,
-            duration_as_ms(&to_blobs_elapsed),
+            num_shreds,
             latest_shred_index,
         );
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index a681a4dfe..4c0194131 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -694,7 +694,12 @@ impl ReplayStage {
         let bank_progress = &mut progress
             .entry(bank.slot())
             .or_insert_with(|| ForkProgress::new(bank.last_blockhash()));
-        let result = Self::verify_and_process_entries(&bank, &entries, &bank_progress.last_entry);
+        let result = Self::verify_and_process_entries(
+            &bank,
+            &entries,
+            &bank_progress.last_entry,
+            bank_progress.num_blobs,
+        );
         bank_progress.num_blobs += num;
         if let Some(last_entry) = entries.last() {
             bank_progress.last_entry = last_entry.hash;
@@ -707,14 +712,16 @@ impl ReplayStage {
         bank: &Bank,
         entries: &[Entry],
         last_entry: &Hash,
+        shred_index: usize,
     ) -> Result<()> {
         if !entries.verify(last_entry) {
             warn!(
-                "entry verification failed {} {} {} {}",
+                "entry verification failed {} {} {} {} {}",
                 entries.len(),
                 bank.tick_height(),
                 last_entry,
-                bank.last_blockhash()
+                bank.last_blockhash(),
+                shred_index
             );
 
             datapoint_error!(
diff --git a/core/src/shred.rs b/core/src/shred.rs
index 651b9a2af..825673524 100644
--- a/core/src/shred.rs
+++ b/core/src/shred.rs
@@ -23,8 +23,6 @@ lazy_static! {
         { serialized_size(&DataShredHeader::default()).unwrap() as usize };
     static ref SIZE_OF_SIGNATURE: usize =
         { bincode::serialized_size(&Signature::default()).unwrap() as usize };
-    static ref SIZE_OF_EMPTY_VEC: usize =
-        { bincode::serialized_size(&vec![0u8; 0]).unwrap() as usize };
     static ref SIZE_OF_SHRED_TYPE: usize = { bincode::serialized_size(&0u8).unwrap() as usize };
 }
 
@@ -37,6 +35,69 @@ thread_local!(static PAR_THREAD_POOL: RefCell = RefCell::new(rayon::
 pub const DATA_SHRED: u8 = 0b1010_0101;
 pub const CODING_SHRED: u8 = 0b0101_1010;
 
+/// This limit comes from reed solomon library, but unfortunately they don't have
+/// a public constant defined for it.
+const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 16;
+
+/// Based on rse benchmarks, the optimal erasure config uses 16 data shreds and 4 coding shreds
+pub const RECOMMENDED_FEC_RATE: f32 = 0.25;
+
+const LAST_SHRED_IN_SLOT: u8 = 0b0000_0001;
+const DATA_COMPLETE_SHRED: u8 = 0b0000_0010;
+
+/// A common header that is present at start of every shred
+#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)]
+pub struct ShredCommonHeader {
+    pub signature: Signature,
+    pub slot: u64,
+    pub index: u32,
+}
+
+/// A common header that is present at start of every data shred
+#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
+pub struct DataShredHeader {
+    pub common_header: CodingShredHeader,
+    pub data_header: ShredCommonHeader,
+    pub parent_offset: u16,
+    pub flags: u8,
+}
+
+/// The coding shred header has FEC information
+#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
+pub struct CodingShredHeader {
+    pub shred_type: u8,
+    pub coding_header: ShredCommonHeader,
+    pub num_data_shreds: u16,
+    pub num_coding_shreds: u16,
+    pub position: u16,
+}
+
+impl Default for DataShredHeader {
+    fn default() -> Self {
+        DataShredHeader {
+            common_header: CodingShredHeader {
+                shred_type: DATA_SHRED,
+                ..CodingShredHeader::default()
+            },
+            data_header: ShredCommonHeader::default(),
+            parent_offset: 0,
+            flags: 0,
+        }
+    }
+}
+
+impl Default for CodingShredHeader {
+    fn default() -> Self {
+        CodingShredHeader {
+            shred_type: CODING_SHRED,
+            coding_header: ShredCommonHeader::default(),
+            num_data_shreds: 0,
+            num_coding_shreds: 0,
+            position: 0,
+        }
+    }
+}
+
 #[derive(Clone, Debug, PartialEq)]
 pub struct Shred {
     pub headers: DataShredHeader,
@@ -180,69 +241,6 @@ impl Shred {
     }
 }
 
-/// This limit comes from reed solomon library, but unfortunately they don't have
-/// a public constant defined for it.
-const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 16;
-
-/// Based on rse benchmarks, the optimal erasure config uses 16 data shreds and 4 coding shreds
-pub const RECOMMENDED_FEC_RATE: f32 = 0.25;
-
-const LAST_SHRED_IN_SLOT: u8 = 0b0000_0001;
-const DATA_COMPLETE_SHRED: u8 = 0b0000_0010;
-
-/// A common header that is present at start of every shred
-#[derive(Serialize, Clone, Deserialize, Default, PartialEq, Debug)]
-pub struct ShredCommonHeader {
-    pub signature: Signature,
-    pub slot: u64,
-    pub index: u32,
-}
-
-/// A common header that is present at start of every data shred
-#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
-pub struct DataShredHeader {
-    pub common_header: CodingShredHeader,
-    pub data_header: ShredCommonHeader,
-    pub parent_offset: u16,
-    pub flags: u8,
-}
-
-/// The coding shred header has FEC information
-#[derive(Serialize, Clone, Deserialize, PartialEq, Debug)]
-pub struct CodingShredHeader {
-    pub shred_type: u8,
-    pub coding_header: ShredCommonHeader,
-    pub num_data_shreds: u16,
-    pub num_coding_shreds: u16,
-    pub position: u16,
-}
-
-impl Default for DataShredHeader {
-    fn default() -> Self {
-        DataShredHeader {
-            common_header: CodingShredHeader {
-                shred_type: DATA_SHRED,
-                ..CodingShredHeader::default()
-            },
-            data_header: ShredCommonHeader::default(),
-            parent_offset: 0,
-            flags: 0,
-        }
-    }
-}
-
-impl Default for CodingShredHeader {
-    fn default() -> Self {
-        CodingShredHeader {
-            shred_type: CODING_SHRED,
-            coding_header: ShredCommonHeader::default(),
-            num_data_shreds: 0,
-            num_coding_shreds: 0,
-            position: 0,
-        }
-    }
-}
-
 #[derive(Debug)]
 pub struct Shredder {
     slot: u64,
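
As a standalone illustration, separate from the diff above: the shred.rs comments state that the recommended erasure config is 16 data shreds plus 4 coding shreds, which is exactly what RECOMMENDED_FEC_RATE = 0.25 yields for a full block of MAX_DATA_SHREDS_PER_FEC_BLOCK = 16 data shreds. A minimal sketch of that arithmetic follows; the helper coding_shreds_for is a name assumed for this example only, not an API from the patch.

/// Hypothetical helper (illustration only): number of coding shreds produced
/// for `num_data_shreds` data shreds at the given FEC rate.
fn coding_shreds_for(num_data_shreds: u32, fec_rate: f32) -> u32 {
    (num_data_shreds as f32 * fec_rate).round() as u32
}

fn main() {
    const MAX_DATA_SHREDS_PER_FEC_BLOCK: u32 = 16;
    const RECOMMENDED_FEC_RATE: f32 = 0.25;

    // A full FEC set: 16 data shreds and 16 * 0.25 = 4 coding shreds.
    assert_eq!(
        coding_shreds_for(MAX_DATA_SHREDS_PER_FEC_BLOCK, RECOMMENDED_FEC_RATE),
        4
    );
}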