coalesce entries in recv_slot_entries to target byte count (#27321)
This commit is contained in:
parent 1a836ab4af
commit d1522fc790
@@ -1,7 +1,9 @@
 use {
     crate::result::Result,
+    bincode::serialized_size,
     crossbeam_channel::Receiver,
     solana_entry::entry::Entry,
+    solana_ledger::shred::ShredData,
     solana_poh::poh_recorder::WorkingBankEntry,
     solana_runtime::bank::Bank,
     solana_sdk::clock::Slot,
@@ -11,6 +13,8 @@ use {
     },
 };
 
+const ENTRY_COALESCE_DURATION: Duration = Duration::from_millis(50);
+
 pub(super) struct ReceiveResults {
     pub entries: Vec<Entry>,
     pub time_elapsed: Duration,
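
ENTRY_COALESCE_DURATION above is the window the receive loop is allowed to spend waiting for additional entries. A minimal standalone sketch of the mechanism behind it (not code from this commit; the constant name and item type are placeholders), using crossbeam_channel's recv_deadline with an absolute deadline:

use {
    crossbeam_channel::unbounded,
    std::time::{Duration, Instant},
};

// Placeholder constant mirroring ENTRY_COALESCE_DURATION above.
const COALESCE_WINDOW: Duration = Duration::from_millis(50);

fn main() {
    let (sender, receiver) = unbounded::<u64>();
    sender.send(1).unwrap();

    // The deadline is absolute: it is computed once, so repeated recv_deadline
    // calls all stop waiting at the same instant.
    let deadline = Instant::now() + COALESCE_WINDOW;
    loop {
        match receiver.recv_deadline(deadline) {
            Ok(item) => println!("coalesced item {item}"),
            Err(_) => break, // window elapsed or channel disconnected
        }
    }
}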
@@ -26,45 +30,61 @@ pub struct UnfinishedSlotInfo {
     pub parent: Slot,
 }
 
-/// This parameter tunes how many entries are received in one iteration of recv loop
-/// This will prevent broadcast stage from consuming more entries, that could have led
-/// to delays in shredding, and broadcasting shreds to peer validators
-const RECEIVE_ENTRY_COUNT_THRESHOLD: usize = 8;
-
 pub(super) fn recv_slot_entries(receiver: &Receiver<WorkingBankEntry>) -> Result<ReceiveResults> {
+    let target_serialized_batch_byte_count: u64 =
+        32 * ShredData::capacity(/*merkle_proof_size*/ None).unwrap() as u64;
     let timer = Duration::new(1, 0);
     let recv_start = Instant::now();
 
     let (mut bank, (entry, mut last_tick_height)) = receiver.recv_timeout(timer)?;
 
     let mut entries = vec![entry];
-    let mut slot = bank.slot();
-    let mut max_tick_height = bank.max_tick_height();
 
-    assert!(last_tick_height <= max_tick_height);
+    assert!(last_tick_height <= bank.max_tick_height());
 
-    if last_tick_height != max_tick_height {
-        while let Ok((try_bank, (entry, tick_height))) = receiver.try_recv() {
+    // Drain channel
+    while last_tick_height != bank.max_tick_height() {
+        let (try_bank, (entry, tick_height)) = match receiver.try_recv() {
+            Ok(working_bank_entry) => working_bank_entry,
+            Err(_) => break,
+        };
         // If the bank changed, that implies the previous slot was interrupted and we do not have to
         // broadcast its entries.
-        if try_bank.slot() != slot {
+        if try_bank.slot() != bank.slot() {
             warn!("Broadcast for slot: {} interrupted", bank.slot());
             entries.clear();
             bank = try_bank;
-            slot = bank.slot();
-            max_tick_height = bank.max_tick_height();
         }
         last_tick_height = tick_height;
         entries.push(entry);
-
-        if entries.len() >= RECEIVE_ENTRY_COUNT_THRESHOLD {
-            break;
-        }
-
-        assert!(last_tick_height <= max_tick_height);
-        if last_tick_height == max_tick_height {
-            break;
-        }
-    }
+        assert!(last_tick_height <= bank.max_tick_height());
+    }
+
+    let mut serialized_batch_byte_count = serialized_size(&entries)?;
+
+    // Wait up to `ENTRY_COALESCE_DURATION` to try to coalesce entries into a 32 shred batch
+    let mut coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
+    while last_tick_height != bank.max_tick_height()
+        && serialized_batch_byte_count < target_serialized_batch_byte_count
+    {
+        let (try_bank, (entry, tick_height)) = match receiver.recv_deadline(coalesce_deadline) {
+            Ok(working_bank_entry) => working_bank_entry,
+            Err(_) => break,
+        };
+        // If the bank changed, that implies the previous slot was interrupted and we do not have to
+        // broadcast its entries.
+        if try_bank.slot() != bank.slot() {
+            warn!("Broadcast for slot: {} interrupted", bank.slot());
+            entries.clear();
+            serialized_batch_byte_count = 8; // Vec len
+            bank = try_bank;
+            coalesce_deadline = Instant::now() + ENTRY_COALESCE_DURATION;
+        }
+        last_tick_height = tick_height;
+        let entry_bytes = serialized_size(&entry)?;
+        serialized_batch_byte_count += entry_bytes;
+        entries.push(entry);
+        assert!(last_tick_height <= bank.max_tick_height());
+    }
 
     let time_elapsed = recv_start.elapsed();
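
The reworked function above batches in two phases: it first drains whatever the channel already holds with try_recv, then keeps receiving with recv_deadline until the serialized batch reaches target_serialized_batch_byte_count (32 data shreds' worth of payload), the slot's max tick height is reached, or the coalesce window runs out; a bank change discards the batch and starts over. A simplified, self-contained sketch of that drain-then-coalesce pattern (illustrative only: plain u64 values stand in for entries and the byte target is made up):

use {
    bincode::serialized_size,
    crossbeam_channel::{unbounded, Receiver},
    std::time::{Duration, Instant},
};

const COALESCE_WINDOW: Duration = Duration::from_millis(50); // stands in for ENTRY_COALESCE_DURATION

// Drain-then-coalesce sketch: phase 1 empties the channel without blocking,
// phase 2 blocks (up to the window) until the serialized batch hits the target.
fn recv_batch(receiver: &Receiver<u64>, target_bytes: u64) -> bincode::Result<Vec<u64>> {
    let mut batch = Vec::new();

    // Phase 1: take everything that is already buffered.
    while let Ok(item) = receiver.try_recv() {
        batch.push(item);
    }

    // Phase 2: wait for more items until the byte target or the deadline is hit.
    let mut batch_bytes = serialized_size(&batch)?;
    let deadline = Instant::now() + COALESCE_WINDOW;
    while batch_bytes < target_bytes {
        let Ok(item) = receiver.recv_deadline(deadline) else {
            break; // window elapsed or channel disconnected
        };
        batch_bytes += serialized_size(&item)?;
        batch.push(item);
    }
    Ok(batch)
}

fn main() {
    let (sender, receiver) = unbounded();
    for i in 0..4u64 {
        sender.send(i).unwrap();
    }
    let batch = recv_batch(&receiver, 24).unwrap(); // 24-byte target, purely illustrative
    println!("coalesced {} values", batch.len());
}

The remaining hunks below are in the ledger crate's shred module, which the broadcast code above now relies on for sizing the batch.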
@@ -51,11 +51,6 @@
 #[cfg(test)]
 pub(crate) use shred_code::MAX_CODE_SHREDS_PER_SLOT;
-pub(crate) use shred_data::ShredData;
-pub use {
-    self::stats::{ProcessShredsStats, ShredFetchStats},
-    crate::shredder::Shredder,
-};
 use {
     self::{shred_code::ShredCode, traits::Shred as _},
     crate::blockstore::{self, MAX_DATA_SHREDS_PER_SLOT},
@@ -75,6 +70,13 @@ use {
     std::fmt::Debug,
     thiserror::Error,
 };
+pub use {
+    self::{
+        shred_data::ShredData,
+        stats::{ProcessShredsStats, ShredFetchStats},
+    },
+    crate::shredder::Shredder,
+};
 
 mod common;
 mod legacy;
@@ -99,7 +99,7 @@ impl ShredData {
     // Maximum size of ledger data that can be embedded in a data-shred.
     // merkle_proof_size is the number of proof entries in the merkle tree
     // branch. None indicates a legacy data-shred.
-    pub(crate) fn capacity(merkle_proof_size: Option<u8>) -> Result<usize, Error> {
+    pub fn capacity(merkle_proof_size: Option<u8>) -> Result<usize, Error> {
         match merkle_proof_size {
             None => Ok(legacy::ShredData::CAPACITY),
             Some(proof_size) => merkle::ShredData::capacity(proof_size),
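
With ShredData re-exported and capacity() made public, callers outside the ledger crate can size batches the same way recv_slot_entries does in the first hunk. A small usage sketch (the helper function name is hypothetical; the call mirrors the one in the diff):

use solana_ledger::shred::ShredData;

// Hypothetical helper: the payload of 32 legacy data shreds, i.e. the
// target_serialized_batch_byte_count used by recv_slot_entries above.
fn target_batch_bytes() -> u64 {
    // None selects the legacy data-shred capacity; Some(proof_size) would
    // account for a merkle proof with that many entries.
    32 * ShredData::capacity(/*merkle_proof_size*/ None).unwrap() as u64
}

fn main() {
    println!("target batch size: {} bytes", target_batch_bytes());
}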