adds rollout path for chained Merkle shreds (#35076)
The commit adds should_chain_merkle_shreds to incrementally roll out chained Merkle shreds to clusters.
This commit is contained in:
parent
eeb0cf1ea8
commit
0cfb06f745
|
@ -23,6 +23,8 @@ pub struct ProcessShredsStats {
|
|||
num_data_shreds_hist: [usize; 5],
|
||||
// If the blockstore already has shreds for the broadcast slot.
|
||||
pub num_extant_slots: u64,
|
||||
// When looking up chained merkle root from parent slot fails.
|
||||
pub err_unknown_chained_merkle_root: u64,
|
||||
pub(crate) data_buffer_residual: usize,
|
||||
pub num_merkle_data_shreds: usize,
|
||||
pub num_merkle_coding_shreds: usize,
|
||||
|
@ -89,6 +91,11 @@ impl ProcessShredsStats {
|
|||
("sign_coding_time", self.sign_coding_elapsed, i64),
|
||||
("coding_send_time", self.coding_send_elapsed, i64),
|
||||
("num_extant_slots", self.num_extant_slots, i64),
|
||||
(
|
||||
"err_unknown_chained_merkle_root",
|
||||
self.err_unknown_chained_merkle_root,
|
||||
i64
|
||||
),
|
||||
("data_buffer_residual", self.data_buffer_residual, i64),
|
||||
("num_data_shreds_07", self.num_data_shreds_hist[0], i64),
|
||||
("num_data_shreds_15", self.num_data_shreds_hist[1], i64),
|
||||
|
@ -161,6 +168,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
|
|||
coalesce_elapsed,
|
||||
num_data_shreds_hist,
|
||||
num_extant_slots,
|
||||
err_unknown_chained_merkle_root,
|
||||
data_buffer_residual,
|
||||
num_merkle_data_shreds,
|
||||
num_merkle_coding_shreds,
|
||||
|
@ -175,6 +183,7 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
|
|||
self.get_leader_schedule_elapsed += get_leader_schedule_elapsed;
|
||||
self.coalesce_elapsed += coalesce_elapsed;
|
||||
self.num_extant_slots += num_extant_slots;
|
||||
self.err_unknown_chained_merkle_root += err_unknown_chained_merkle_root;
|
||||
self.data_buffer_residual += data_buffer_residual;
|
||||
self.num_merkle_data_shreds += num_merkle_data_shreds;
|
||||
self.num_merkle_coding_shreds += num_merkle_coding_shreds;
|
||||
|
|
|
@ -28,6 +28,7 @@ pub(super) struct ReceiveResults {
|
|||
|
||||
#[derive(Clone)]
|
||||
pub struct UnfinishedSlotInfo {
|
||||
pub(super) chained_merkle_root: Hash,
|
||||
pub next_shred_index: u32,
|
||||
pub(crate) next_code_index: u32,
|
||||
pub slot: Slot,
|
||||
|
|
|
@ -14,6 +14,8 @@ use {
|
|||
shred::{shred_code, ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder},
|
||||
},
|
||||
solana_sdk::{
|
||||
genesis_config::ClusterType,
|
||||
hash::Hash,
|
||||
signature::Keypair,
|
||||
timing::{duration_as_us, AtomicInterval},
|
||||
},
|
||||
|
@ -69,6 +71,7 @@ impl StandardBroadcastRun {
|
|||
&mut self,
|
||||
keypair: &Keypair,
|
||||
max_ticks_in_slot: u8,
|
||||
cluster_type: ClusterType,
|
||||
stats: &mut ProcessShredsStats,
|
||||
) -> Vec<Shred> {
|
||||
const SHRED_TICK_REFERENCE_MASK: u8 = ShredFlags::SHRED_TICK_REFERENCE_MASK.bits();
|
||||
|
@ -85,7 +88,8 @@ impl StandardBroadcastRun {
|
|||
keypair,
|
||||
&[], // entries
|
||||
true, // is_last_in_slot,
|
||||
None, // chained_merkle_root
|
||||
should_chain_merkle_shreds(state.slot, cluster_type)
|
||||
.then_some(state.chained_merkle_root),
|
||||
state.next_shred_index,
|
||||
state.next_code_index,
|
||||
true, // merkle_variant
|
||||
|
@ -110,6 +114,7 @@ impl StandardBroadcastRun {
|
|||
blockstore: &Blockstore,
|
||||
reference_tick: u8,
|
||||
is_slot_end: bool,
|
||||
cluster_type: ClusterType,
|
||||
process_stats: &mut ProcessShredsStats,
|
||||
max_data_shreds_per_slot: u32,
|
||||
max_code_shreds_per_slot: u32,
|
||||
|
@ -121,8 +126,12 @@ impl StandardBroadcastRun {
|
|||
BroadcastError,
|
||||
> {
|
||||
let (slot, parent_slot) = self.current_slot_and_parent.unwrap();
|
||||
let (next_shred_index, next_code_index) = match &self.unfinished_slot {
|
||||
Some(state) => (state.next_shred_index, state.next_code_index),
|
||||
let (next_shred_index, next_code_index, chained_merkle_root) = match &self.unfinished_slot {
|
||||
Some(state) => (
|
||||
state.next_shred_index,
|
||||
state.next_code_index,
|
||||
state.chained_merkle_root,
|
||||
),
|
||||
None => {
|
||||
// If the blockstore has shreds for the slot, it should not
|
||||
// recreate the slot:
|
||||
|
@ -135,7 +144,17 @@ impl StandardBroadcastRun {
|
|||
return Ok((Vec::default(), Vec::default()));
|
||||
}
|
||||
}
|
||||
(0u32, 0u32)
|
||||
let chained_merkle_root = broadcast_utils::get_chained_merkle_root_from_parent(
|
||||
slot,
|
||||
parent_slot,
|
||||
blockstore,
|
||||
)
|
||||
.unwrap_or_else(|err| {
|
||||
error!("Unknown chained Merkle root: {err}");
|
||||
process_stats.err_unknown_chained_merkle_root += 1;
|
||||
Hash::default()
|
||||
});
|
||||
(0u32, 0u32, chained_merkle_root)
|
||||
}
|
||||
};
|
||||
let shredder =
|
||||
|
@ -144,7 +163,7 @@ impl StandardBroadcastRun {
|
|||
keypair,
|
||||
entries,
|
||||
is_slot_end,
|
||||
None, // chained_merkle_root
|
||||
should_chain_merkle_shreds(slot, cluster_type).then_some(chained_merkle_root),
|
||||
next_shred_index,
|
||||
next_code_index,
|
||||
true, // merkle_variant
|
||||
|
@ -153,6 +172,10 @@ impl StandardBroadcastRun {
|
|||
);
|
||||
process_stats.num_merkle_data_shreds += data_shreds.len();
|
||||
process_stats.num_merkle_coding_shreds += coding_shreds.len();
|
||||
let chained_merkle_root = match data_shreds.iter().max_by_key(|shred| shred.index()) {
|
||||
None => chained_merkle_root,
|
||||
Some(shred) => shred.merkle_root().unwrap(),
|
||||
};
|
||||
let next_shred_index = match data_shreds.iter().map(Shred::index).max() {
|
||||
Some(index) => index + 1,
|
||||
None => next_shred_index,
|
||||
|
@ -169,6 +192,7 @@ impl StandardBroadcastRun {
|
|||
return Err(BroadcastError::TooManyShreds);
|
||||
}
|
||||
self.unfinished_slot = Some(UnfinishedSlotInfo {
|
||||
chained_merkle_root,
|
||||
next_shred_index,
|
||||
next_code_index,
|
||||
slot,
|
||||
|
@ -232,10 +256,15 @@ impl StandardBroadcastRun {
|
|||
let mut process_stats = ProcessShredsStats::default();
|
||||
|
||||
let mut to_shreds_time = Measure::start("broadcast_to_shreds");
|
||||
let cluster_type = bank.cluster_type();
|
||||
|
||||
// 1) Check if slot was interrupted
|
||||
let prev_slot_shreds =
|
||||
self.finish_prev_slot(keypair, bank.ticks_per_slot() as u8, &mut process_stats);
|
||||
let prev_slot_shreds = self.finish_prev_slot(
|
||||
keypair,
|
||||
bank.ticks_per_slot() as u8,
|
||||
cluster_type,
|
||||
&mut process_stats,
|
||||
);
|
||||
|
||||
// 2) Convert entries to shreds and coding shreds
|
||||
let is_last_in_slot = last_tick_height == bank.max_tick_height();
|
||||
|
@ -247,6 +276,7 @@ impl StandardBroadcastRun {
|
|||
blockstore,
|
||||
reference_tick as u8,
|
||||
is_last_in_slot,
|
||||
cluster_type,
|
||||
&mut process_stats,
|
||||
blockstore::MAX_DATA_SHREDS_PER_SLOT as u32,
|
||||
shred_code::MAX_CODE_SHREDS_PER_SLOT as u32,
|
||||
|
@ -497,10 +527,15 @@ impl BroadcastRun for StandardBroadcastRun {
|
|||
}
|
||||
}
|
||||
|
||||
fn should_chain_merkle_shreds(_slot: Slot, _cluster_type: ClusterType) -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use {
|
||||
super::*,
|
||||
rand::Rng,
|
||||
solana_entry::entry::create_ticks,
|
||||
solana_gossip::cluster_info::{ClusterInfo, Node},
|
||||
solana_ledger::{
|
||||
|
@ -510,6 +545,7 @@ mod test {
|
|||
solana_runtime::bank::Bank,
|
||||
solana_sdk::{
|
||||
genesis_config::GenesisConfig,
|
||||
hash::Hash,
|
||||
signature::{Keypair, Signer},
|
||||
},
|
||||
solana_streamer::socket::SocketAddrSpace,
|
||||
|
@ -569,6 +605,7 @@ mod test {
|
|||
let slot = 1;
|
||||
let parent = 0;
|
||||
run.unfinished_slot = Some(UnfinishedSlotInfo {
|
||||
chained_merkle_root: Hash::new_from_array(rand::thread_rng().gen()),
|
||||
next_shred_index,
|
||||
next_code_index: 17,
|
||||
slot,
|
||||
|
@ -580,7 +617,12 @@ mod test {
|
|||
run.current_slot_and_parent = Some((4, 2));
|
||||
|
||||
// Slot 2 interrupted slot 1
|
||||
let shreds = run.finish_prev_slot(&keypair, 0, &mut ProcessShredsStats::default());
|
||||
let shreds = run.finish_prev_slot(
|
||||
&keypair,
|
||||
0, // max_ticks_in_slot
|
||||
ClusterType::Development,
|
||||
&mut ProcessShredsStats::default(),
|
||||
);
|
||||
let shred = shreds
|
||||
.first()
|
||||
.expect("Expected a shred that signals an interrupt");
|
||||
|
@ -831,6 +873,7 @@ mod test {
|
|||
&blockstore,
|
||||
0,
|
||||
false,
|
||||
ClusterType::Development,
|
||||
&mut stats,
|
||||
1000,
|
||||
1000,
|
||||
|
@ -846,6 +889,7 @@ mod test {
|
|||
&blockstore,
|
||||
0,
|
||||
false,
|
||||
ClusterType::Development,
|
||||
&mut stats,
|
||||
10,
|
||||
10,
|
||||
|
|
Loading…
Reference in New Issue