chains Merkle shreds in broadcast duplicates (#35058)

The commit migrates
    turbine/src/broadcast_stage/broadcast_duplicates_run.rs
to use chained Merkle shreds variant.
This commit is contained in:
behzad nouri 2024-02-07 22:14:31 +00:00 committed by GitHub
parent 3ddd2352a1
commit 1b9dfd447e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 22 additions and 5 deletions

View File

@ -37,6 +37,7 @@ pub struct BroadcastDuplicatesConfig {
pub(super) struct BroadcastDuplicatesRun {
config: BroadcastDuplicatesConfig,
current_slot: Slot,
chained_merkle_root: Hash,
next_shred_index: u32,
next_code_index: u32,
shred_version: u16,
@ -57,6 +58,7 @@ impl BroadcastDuplicatesRun {
));
Self {
config,
chained_merkle_root: Hash::default(),
next_shred_index: u32::MAX,
next_code_index: 0,
shred_version,
@ -76,7 +78,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
fn run(
&mut self,
keypair: &Keypair,
_blockstore: &Blockstore,
blockstore: &Blockstore,
receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
@ -87,6 +89,12 @@ impl BroadcastRun for BroadcastDuplicatesRun {
let last_tick_height = receive_results.last_tick_height;
if bank.slot() != self.current_slot {
self.chained_merkle_root = broadcast_utils::get_chained_merkle_root_from_parent(
bank.slot(),
bank.parent_slot(),
blockstore,
)
.unwrap();
self.next_shred_index = 0;
self.next_code_index = 0;
self.current_slot = bank.slot();
@ -169,18 +177,25 @@ impl BroadcastRun for BroadcastDuplicatesRun {
)
.expect("Expected to create a new shredder");
// Chained Merkle shreds are always discarded in epoch 0, due to
// feature_set::enable_chained_merkle_shreds. Below can be removed once
// the feature gated code is removed.
let should_chain_merkle_shreds = bank.epoch() > 0;
let (data_shreds, coding_shreds) = shredder.entries_to_shreds(
keypair,
&receive_results.entries,
last_tick_height == bank.max_tick_height() && last_entries.is_none(),
None, // chained_merkle_root
should_chain_merkle_shreds.then_some(self.chained_merkle_root),
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
&self.reed_solomon_cache,
&mut ProcessShredsStats::default(),
);
if let Some(shred) = data_shreds.iter().max_by_key(|shred| shred.index()) {
self.chained_merkle_root = shred.merkle_root().unwrap();
}
self.next_shred_index += data_shreds.len() as u32;
if let Some(index) = coding_shreds.iter().map(Shred::index).max() {
self.next_code_index = index + 1;
@ -191,7 +206,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
keypair,
&[original_last_entry],
true,
None, // chained_merkle_root
should_chain_merkle_shreds.then_some(self.chained_merkle_root),
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
@ -205,7 +220,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
keypair,
&duplicate_extra_last_entries,
true,
None, // chained_merkle_root
should_chain_merkle_shreds.then_some(self.chained_merkle_root),
self.next_shred_index,
self.next_code_index,
true, // merkle_variant
@ -222,6 +237,8 @@ impl BroadcastRun for BroadcastDuplicatesRun {
sigs,
);
assert_eq!(original_last_data_shred.len(), 1);
assert_eq!(partition_last_data_shred.len(), 1);
self.next_shred_index += 1;
(original_last_data_shred, partition_last_data_shred)
});