rolls out merkle shreds to ~5% of testnet (#28199)

This commit is contained in:
behzad nouri 2022-10-04 19:36:16 +00:00 committed by GitHub
parent 7fef7d569a
commit 9e7a0e7420
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 5 deletions

View File

@ -11,6 +11,7 @@ use {
solana_entry::entry::Entry, solana_entry::entry::Entry,
solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder}, solana_ledger::shred::{ProcessShredsStats, ReedSolomonCache, Shred, ShredFlags, Shredder},
solana_sdk::{ solana_sdk::{
genesis_config::ClusterType,
signature::Keypair, signature::Keypair,
timing::{duration_as_us, AtomicInterval}, timing::{duration_as_us, AtomicInterval},
}, },
@ -60,6 +61,7 @@ impl StandardBroadcastRun {
&mut self, &mut self,
keypair: &Keypair, keypair: &Keypair,
max_ticks_in_slot: u8, max_ticks_in_slot: u8,
cluster_type: ClusterType,
stats: &mut ProcessShredsStats, stats: &mut ProcessShredsStats,
) -> Vec<Shred> { ) -> Vec<Shred> {
const SHRED_TICK_REFERENCE_MASK: u8 = ShredFlags::SHRED_TICK_REFERENCE_MASK.bits(); const SHRED_TICK_REFERENCE_MASK: u8 = ShredFlags::SHRED_TICK_REFERENCE_MASK.bits();
@ -72,16 +74,22 @@ impl StandardBroadcastRun {
let shredder = let shredder =
Shredder::new(state.slot, state.parent, reference_tick, self.shred_version) Shredder::new(state.slot, state.parent, reference_tick, self.shred_version)
.unwrap(); .unwrap();
let merkle_variant =
should_use_merkle_variant(state.slot, cluster_type, self.shred_version);
let (mut shreds, coding_shreds) = shredder.entries_to_shreds( let (mut shreds, coding_shreds) = shredder.entries_to_shreds(
keypair, keypair,
&[], // entries &[], // entries
true, // is_last_in_slot, true, // is_last_in_slot,
state.next_shred_index, state.next_shred_index,
state.next_code_index, state.next_code_index,
false, // merkle_variant merkle_variant,
&self.reed_solomon_cache, &self.reed_solomon_cache,
stats, stats,
); );
if merkle_variant {
stats.num_merkle_data_shreds += shreds.len();
stats.num_merkle_coding_shreds += coding_shreds.len();
}
self.report_and_reset_stats(true); self.report_and_reset_stats(true);
self.unfinished_slot = None; self.unfinished_slot = None;
shreds.extend(coding_shreds); shreds.extend(coding_shreds);
@ -97,6 +105,7 @@ impl StandardBroadcastRun {
blockstore: &Blockstore, blockstore: &Blockstore,
reference_tick: u8, reference_tick: u8,
is_slot_end: bool, is_slot_end: bool,
cluster_type: ClusterType,
process_stats: &mut ProcessShredsStats, process_stats: &mut ProcessShredsStats,
) -> ( ) -> (
Vec<Shred>, // data shreds Vec<Shred>, // data shreds
@ -122,16 +131,21 @@ impl StandardBroadcastRun {
}; };
let shredder = let shredder =
Shredder::new(slot, parent_slot, reference_tick, self.shred_version).unwrap(); Shredder::new(slot, parent_slot, reference_tick, self.shred_version).unwrap();
let merkle_variant = should_use_merkle_variant(slot, cluster_type, self.shred_version);
let (data_shreds, coding_shreds) = shredder.entries_to_shreds( let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
keypair, keypair,
entries, entries,
is_slot_end, is_slot_end,
next_shred_index, next_shred_index,
next_code_index, next_code_index,
false, // merkle_variant merkle_variant,
&self.reed_solomon_cache, &self.reed_solomon_cache,
process_stats, process_stats,
); );
if merkle_variant {
process_stats.num_merkle_data_shreds += data_shreds.len();
process_stats.num_merkle_coding_shreds += coding_shreds.len();
}
let next_shred_index = match data_shreds.iter().map(Shred::index).max() { let next_shred_index = match data_shreds.iter().map(Shred::index).max() {
Some(index) => index + 1, Some(index) => index + 1,
None => next_shred_index, None => next_shred_index,
@ -206,10 +220,15 @@ impl StandardBroadcastRun {
let mut process_stats = ProcessShredsStats::default(); let mut process_stats = ProcessShredsStats::default();
let mut to_shreds_time = Measure::start("broadcast_to_shreds"); let mut to_shreds_time = Measure::start("broadcast_to_shreds");
let cluster_type = bank.cluster_type();
// 1) Check if slot was interrupted // 1) Check if slot was interrupted
let prev_slot_shreds = let prev_slot_shreds = self.finish_prev_slot(
self.finish_prev_slot(keypair, bank.ticks_per_slot() as u8, &mut process_stats); keypair,
bank.ticks_per_slot() as u8,
cluster_type,
&mut process_stats,
);
// 2) Convert entries to shreds and coding shreds // 2) Convert entries to shreds and coding shreds
let is_last_in_slot = last_tick_height == bank.max_tick_height(); let is_last_in_slot = last_tick_height == bank.max_tick_height();
@ -220,6 +239,7 @@ impl StandardBroadcastRun {
blockstore, blockstore,
reference_tick as u8, reference_tick as u8,
is_last_in_slot, is_last_in_slot,
cluster_type,
&mut process_stats, &mut process_stats,
); );
// Insert the first data shred synchronously so that blockstore stores // Insert the first data shred synchronously so that blockstore stores
@ -453,6 +473,10 @@ impl BroadcastRun for StandardBroadcastRun {
} }
} }
fn should_use_merkle_variant(slot: Slot, cluster_type: ClusterType, shred_version: u16) -> bool {
cluster_type == ClusterType::Testnet && shred_version == 24371 && (slot % 19 == 1)
}
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use { use {
@ -536,7 +560,12 @@ mod test {
run.current_slot_and_parent = Some((4, 2)); run.current_slot_and_parent = Some((4, 2));
// Slot 2 interrupted slot 1 // Slot 2 interrupted slot 1
let shreds = run.finish_prev_slot(&keypair, 0, &mut ProcessShredsStats::default()); let shreds = run.finish_prev_slot(
&keypair,
0,
ClusterType::Devnet,
&mut ProcessShredsStats::default(),
);
let shred = shreds let shred = shreds
.get(0) .get(0)
.expect("Expected a shred that signals an interrupt"); .expect("Expected a shred that signals an interrupt");

View File

@ -24,6 +24,8 @@ pub struct ProcessShredsStats {
// If the blockstore already has shreds for the broadcast slot. // If the blockstore already has shreds for the broadcast slot.
pub num_extant_slots: u64, pub num_extant_slots: u64,
pub(crate) data_buffer_residual: usize, pub(crate) data_buffer_residual: usize,
pub num_merkle_data_shreds: usize,
pub num_merkle_coding_shreds: usize,
} }
#[derive(Default, Debug, Eq, PartialEq)] #[derive(Default, Debug, Eq, PartialEq)]
@ -66,6 +68,12 @@ impl ProcessShredsStats {
("receive_time", self.receive_elapsed, i64), ("receive_time", self.receive_elapsed, i64),
("num_data_shreds", num_data_shreds, i64), ("num_data_shreds", num_data_shreds, i64),
("num_coding_shreds", num_coding_shreds, i64), ("num_coding_shreds", num_coding_shreds, i64),
("num_merkle_data_shreds", self.num_merkle_data_shreds, i64),
(
"num_merkle_coding_shreds",
self.num_merkle_coding_shreds,
i64
),
("slot_broadcast_time", slot_broadcast_time, i64), ("slot_broadcast_time", slot_broadcast_time, i64),
( (
"get_leader_schedule_time", "get_leader_schedule_time",
@ -140,6 +148,8 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
num_data_shreds_hist, num_data_shreds_hist,
num_extant_slots, num_extant_slots,
data_buffer_residual, data_buffer_residual,
num_merkle_data_shreds,
num_merkle_coding_shreds,
} = rhs; } = rhs;
self.shredding_elapsed += shredding_elapsed; self.shredding_elapsed += shredding_elapsed;
self.receive_elapsed += receive_elapsed; self.receive_elapsed += receive_elapsed;
@ -152,6 +162,8 @@ impl AddAssign<ProcessShredsStats> for ProcessShredsStats {
self.coalesce_elapsed += coalesce_elapsed; self.coalesce_elapsed += coalesce_elapsed;
self.num_extant_slots += num_extant_slots; self.num_extant_slots += num_extant_slots;
self.data_buffer_residual += data_buffer_residual; self.data_buffer_residual += data_buffer_residual;
self.num_merkle_data_shreds += num_merkle_data_shreds;
self.num_merkle_coding_shreds += num_merkle_coding_shreds;
for (i, bucket) in self.num_data_shreds_hist.iter_mut().enumerate() { for (i, bucket) in self.num_data_shreds_hist.iter_mut().enumerate() {
*bucket += num_data_shreds_hist[i]; *bucket += num_data_shreds_hist[i];
} }