diff --git a/core/src/broadcast_stage/broadcast_utils.rs b/core/src/broadcast_stage/broadcast_utils.rs
index a54b0fa79f..8079d396dd 100644
--- a/core/src/broadcast_stage/broadcast_utils.rs
+++ b/core/src/broadcast_stage/broadcast_utils.rs
@@ -1,7 +1,9 @@
 use {
     crate::result::Result,
+    bincode::serialized_size,
     crossbeam_channel::Receiver,
     solana_entry::entry::Entry,
+    solana_ledger::shred::ShredData,
     solana_poh::poh_recorder::WorkingBankEntry,
     solana_runtime::bank::Bank,
     solana_sdk::clock::Slot,
@@ -11,6 +13,8 @@ use {
     },
 };
 
+const ENTRY_COALESCE_DURATION: Duration = Duration::from_millis(50);
+
 pub(super) struct ReceiveResults {
     pub entries: Vec<Entry>,
     pub time_elapsed: Duration,
@@ -26,45 +30,61 @@ pub struct UnfinishedSlotInfo {
     pub parent: Slot,
 }
 
-/// This parameter tunes how many entries are received in one iteration of recv loop
-/// This will prevent broadcast stage from consuming more entries, that could have led
-/// to delays in shredding, and broadcasting shreds to peer validators
-const RECEIVE_ENTRY_COUNT_THRESHOLD: usize = 8;
-
 pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result<ReceiveResults> {
+    let target_serialized_batch_byte_count: u64 =
+        32 * ShredData::capacity(/*merkle_proof_size*/ None).unwrap() as u64;
     let timer = Duration::new(1, 0);
     let recv_start = Instant::now();
+
     let (mut bank, (entry, mut last_tick_height)) = receiver.recv_timeout(timer)?;
     let mut entries = vec![entry];
-    let mut slot = bank.slot();
-    let mut max_tick_height = bank.max_tick_height();
 
-    assert!(last_tick_height <= max_tick_height);
+    assert!(last_tick_height <= bank.max_tick_height());
 
-    if last_tick_height != max_tick_height {
-        while let Ok((try_bank, (entry, tick_height))) = receiver.try_recv() {
-            // If the bank changed, that implies the previous slot was interrupted and we do not have to
-            // broadcast its entries.
-            if try_bank.slot() != slot {
-                warn!("Broadcast for slot: {} interrupted", bank.slot());
-                entries.clear();
-                bank = try_bank;
-                slot = bank.slot();
-                max_tick_height = bank.max_tick_height();
-            }
-            last_tick_height = tick_height;
-            entries.push(entry);
-
-            if entries.len() >= RECEIVE_ENTRY_COUNT_THRESHOLD {
-                break;
-            }
-
-            assert!(last_tick_height <= max_tick_height);
-            if last_tick_height == max_tick_height {
-                break;
-            }
+    // Drain channel
+    while last_tick_height != bank.max_tick_height() {
+        let (try_bank, (entry, tick_height)) = match receiver.try_recv() {
+            Ok(working_bank_entry) => working_bank_entry,
+            Err(_) => break,
+        };
+        // If the bank changed, that implies the previous slot was interrupted and we do not have to
+        // broadcast its entries.
+        if try_bank.slot() != bank.slot() {
+            warn!("Broadcast for slot: {} interrupted", bank.slot());
+            entries.clear();
+            bank = try_bank;
         }
+        last_tick_height = tick_height;
+        entries.push(entry);
+        assert!(last_tick_height <= bank.max_tick_height());
+    }
+
+    let mut serialized_batch_byte_count = serialized_size(&entries)?;
+
+    // Wait up to `ENTRY_COALESCE_DURATION` to try to coalesce entries into a 32 shred batch
+    let mut coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
+    while last_tick_height != bank.max_tick_height()
+        && serialized_batch_byte_count < target_serialized_batch_byte_count
+    {
+        let (try_bank, (entry, tick_height)) = match receiver.recv_deadline(coalesce_deadline) {
+            Ok(working_bank_entry) => working_bank_entry,
+            Err(_) => break,
+        };
+        // If the bank changed, that implies the previous slot was interrupted and we do not have to
+        // broadcast its entries.
+        if try_bank.slot() != bank.slot() {
+            warn!("Broadcast for slot: {} interrupted", bank.slot());
+            entries.clear();
+            serialized_batch_byte_count = 8; // Vec len
+            bank = try_bank;
+            coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
+        }
+        last_tick_height = tick_height;
+        let entry_bytes = serialized_size(&entry)?;
+        serialized_batch_byte_count += entry_bytes;
+        entries.push(entry);
+        assert!(last_tick_height <= bank.max_tick_height());
     }
 
     let time_elapsed = recv_start.elapsed();
diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs
index 4c20924c96..aea429c5cb 100644
--- a/ledger/src/shred.rs
+++ b/ledger/src/shred.rs
@@ -51,11 +51,6 @@
 
 #[cfg(test)]
 pub(crate) use shred_code::MAX_CODE_SHREDS_PER_SLOT;
-pub(crate) use shred_data::ShredData;
-pub use {
-    self::stats::{ProcessShredsStats, ShredFetchStats},
-    crate::shredder::Shredder,
-};
 use {
     self::{shred_code::ShredCode, traits::Shred as _},
     crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT},
@@ -75,6 +70,13 @@ use {
     std::fmt::Debug,
     thiserror::Error,
 };
+pub use {
+    self::{
+        shred_data::ShredData,
+        stats::{ProcessShredsStats, ShredFetchStats},
+    },
+    crate::shredder::Shredder,
+};
 
 mod common;
 mod legacy;
diff --git a/ledger/src/shred/shred_data.rs b/ledger/src/shred/shred_data.rs
index 777ef6feff..d6976514e2 100644
--- a/ledger/src/shred/shred_data.rs
+++ b/ledger/src/shred/shred_data.rs
@@ -99,7 +99,7 @@ impl ShredData {
     // Maximum size of ledger data that can be embedded in a data-shred.
     // merkle_proof_size is the number of proof entries in the merkle tree
     // branch. None indicates a legacy data-shred.
-    pub(crate) fn capacity(merkle_proof_size: Option<u8>) -> Result<usize, Error> {
+    pub fn capacity(merkle_proof_size: Option<u8>) -> Result<usize, Error> {
         match merkle_proof_size {
             None => Ok(legacy::ShredData::CAPACITY),
             Some(proof_size) => merkle::ShredData::capacity(proof_size),
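For anyone who wants to poke at the coalescing pattern outside the validator, below is a minimal, self-contained sketch of the same two-phase idea: drain whatever is already queued without blocking, then keep calling `recv_deadline` against a single fixed deadline until either a byte target or the coalesce window is hit. The channel payload type, `TARGET_BYTES`, `COALESCE_WINDOW`, and the `coalesce` helper are hypothetical stand-ins, not code from this PR; the one PR-specific detail it mirrors is bincode 1.x serializing a `Vec` length as a fixed `u64`, which is consistent with the `serialized_batch_byte_count = 8; // Vec len` reset in the diff above.

```rust
use {
    bincode::serialized_size,
    crossbeam_channel::{unbounded, Receiver},
    std::time::{Duration, Instant},
};

// Hypothetical stand-ins for the real entry stream and the 32-shred byte target.
const COALESCE_WINDOW: Duration = Duration::from_millis(50);
const TARGET_BYTES: u64 = 1_000;

fn coalesce(receiver: &Receiver<Vec<u8>>) -> Vec<Vec<u8>> {
    let mut batch: Vec<Vec<u8>> = Vec::new();
    // bincode 1.x encodes the Vec length as a fixed u64, so an empty batch
    // already accounts for 8 bytes (the `// Vec len` figure in the diff).
    let mut batch_bytes = serialized_size(&batch).unwrap(); // == 8

    // Phase 1: drain anything already buffered without blocking.
    while let Ok(item) = receiver.try_recv() {
        batch_bytes += serialized_size(&item).unwrap();
        batch.push(item);
        if batch_bytes >= TARGET_BYTES {
            return batch;
        }
    }

    // Phase 2: wait up to COALESCE_WINDOW for more items, stopping early
    // once the byte target is reached.
    let deadline = Instant::now() + COALESCE_WINDOW;
    while batch_bytes < TARGET_BYTES {
        let item = match receiver.recv_deadline(deadline) {
            Ok(item) => item,
            Err(_) => break, // window expired or sender dropped
        };
        batch_bytes += serialized_size(&item).unwrap();
        batch.push(item);
    }
    batch
}

fn main() {
    let (sender, receiver) = unbounded();
    for i in 0u8..4 {
        sender.send(vec![i; 100]).unwrap();
    }
    let batch = coalesce(&receiver);
    println!("coalesced {} items", batch.len());
}
```

Passing one fixed deadline to every `recv_deadline` call, as the diff does with `coalesce_deadline`, bounds the extra broadcast latency to roughly one coalesce window per batch rather than one window per received entry.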