From 5591db78018dbe6052be84a289c0c5ce722babee Mon Sep 17 00:00:00 2001 From: Wen <113942165+wen-coding@users.noreply.github.com> Date: Thu, 14 Mar 2024 20:45:03 -0700 Subject: [PATCH] Wen_restart: check block full using blockstore (#250) * Switch to blockstore.is_full() check because replay thread isn't active. * Use make_chaining_slot_entries and add first_parent to the method. Small style fixes. * Switch to blockstore.is_full() check because replay thread isn't active. --- core/src/repair/repair_service.rs | 4 +- core/src/repair/repair_weight.rs | 2 +- ledger/src/blockstore.rs | 13 ++-- wen-restart/src/wen_restart.rs | 102 ++++++++++++------------------ 4 files changed, 50 insertions(+), 71 deletions(-) diff --git a/core/src/repair/repair_service.rs b/core/src/repair/repair_service.rs index eb516fc74..9431d7a25 100644 --- a/core/src/repair/repair_service.rs +++ b/core/src/repair/repair_service.rs @@ -1326,7 +1326,7 @@ mod test { let slots: Vec = vec![1, 3, 5, 7, 8]; let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1; - let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot); + let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot, 0); for (mut slot_shreds, _) in shreds.into_iter() { slot_shreds.remove(0); blockstore.insert_shreds(slot_shreds, None, false).unwrap(); @@ -1621,7 +1621,7 @@ mod test { let slots: Vec = vec![2, 3, 5, 7]; let num_entries_per_slot = max_ticks_per_n_shreds(3, None) + 1; - let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot); + let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot, 0); for (i, (mut slot_shreds, _)) in shreds.into_iter().enumerate() { slot_shreds.remove(i); blockstore.insert_shreds(slot_shreds, None, false).unwrap(); diff --git a/core/src/repair/repair_weight.rs b/core/src/repair/repair_weight.rs index 7e65cfaa2..cb15c2de2 100644 --- a/core/src/repair/repair_weight.rs +++ b/core/src/repair/repair_weight.rs @@ -2377,7 +2377,7 @@ mod test { assert_eq!(repairs[2].slot(), 5); // Simulate repair on 6 and 5 - for (shreds, _) in make_chaining_slot_entries(&[5, 6], 100) { + for (shreds, _) in make_chaining_slot_entries(&[5, 6], 100, 0) { blockstore.insert_shreds(shreds, None, true).unwrap(); } diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs index f15976abd..9e10a4548 100644 --- a/ledger/src/blockstore.rs +++ b/ledger/src/blockstore.rs @@ -4658,12 +4658,13 @@ pub fn make_many_slot_shreds( pub fn make_chaining_slot_entries( chain: &[u64], entries_per_slot: u64, + first_parent: u64, ) -> Vec<(Vec, Vec)> { let mut slots_shreds_and_entries = vec![]; for (i, slot) in chain.iter().enumerate() { let parent_slot = { if *slot == 0 || i == 0 { - 0 + first_parent } else { chain[i - 1] } @@ -5609,7 +5610,7 @@ pub mod tests { let entries_per_slot = 10; let slots = [2, 5, 10]; - let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot); + let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot, 0); // Get the shreds for slot 10, chaining to slot 5 let (mut orphan_child, _) = all_shreds.remove(2); @@ -5654,7 +5655,7 @@ pub mod tests { let entries_per_slot = 10; let mut slots = vec![2, 5, 10]; - let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot); + let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot, 0); let disconnected_slot = 4; let (shreds0, _) = all_shreds.remove(0); @@ -7428,7 +7429,7 @@ pub mod tests { let blockstore = Blockstore::open(ledger_path.path()).unwrap(); let shreds_per_slot = 10; let slots = vec![2, 4, 8, 12]; - let all_shreds = make_chaining_slot_entries(&slots, shreds_per_slot); + let all_shreds = make_chaining_slot_entries(&slots, shreds_per_slot, 0); let slot_8_shreds = all_shreds[2].0.clone(); for (slot_shreds, _) in all_shreds { blockstore.insert_shreds(slot_shreds, None, false).unwrap(); @@ -9963,7 +9964,7 @@ pub mod tests { let slots = vec![2, unconfirmed_slot, unconfirmed_child_slot]; // Insert into slot 9, mark it as dead - let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1) + let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1, 0) .into_iter() .flat_map(|x| x.0) .collect(); @@ -10005,7 +10006,7 @@ pub mod tests { let unconfirmed_slot = 8; let slots = vec![confirmed_slot, unconfirmed_slot]; - let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1) + let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1, 0) .into_iter() .flat_map(|x| x.0) .collect(); diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index 5f7d66a78..fe28270a4 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -104,6 +104,7 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots( cluster_info: Arc, last_voted_fork_slots: &Vec, bank_forks: Arc>, + blockstore: Arc, wen_restart_repair_slots: Arc>>, exit: Arc, progress: &mut WenRestartProgress, @@ -160,20 +161,18 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots( let active_percent = last_voted_fork_slots_aggregate.active_percent(); let mut filtered_slots: Vec; { - let my_bank_forks = bank_forks.read().unwrap(); filtered_slots = last_voted_fork_slots_aggregate .slots_to_repair_iter() .filter(|slot| { if *slot <= &root_slot || is_full_slots.contains(*slot) { return false; } - let is_full = my_bank_forks - .get(**slot) - .map_or(false, |bank| bank.is_frozen()); - if is_full { + if blockstore.is_full(**slot) { is_full_slots.insert(**slot); + false + } else { + true } - !is_full }) .cloned() .collect(); @@ -234,6 +233,7 @@ pub fn wait_for_wen_restart( cluster_info.clone(), last_voted_fork_slots, bank_forks.clone(), + blockstore.clone(), wen_restart_repair_slots.clone().unwrap(), exit.clone(), &mut progress, @@ -382,7 +382,6 @@ mod tests { use { crate::wen_restart::*, assert_matches::assert_matches, - solana_entry::entry, solana_gossip::{ cluster_info::ClusterInfo, contact_info::ContactInfo, @@ -391,7 +390,10 @@ mod tests { legacy_contact_info::LegacyContactInfo, restart_crds_values::RestartLastVotedForkSlots, }, - solana_ledger::{blockstore, get_tmp_ledger_path_auto_delete}, + solana_ledger::{ + blockstore::{make_chaining_slot_entries, Blockstore}, + get_tmp_ledger_path_auto_delete, + }, solana_program::{ hash::Hash, vote::state::{Vote, VoteStateUpdate}, @@ -403,7 +405,6 @@ mod tests { }, }, solana_sdk::{ - pubkey::Pubkey, signature::{Keypair, Signer}, timing::timestamp, }, @@ -454,6 +455,16 @@ mod tests { pub wen_restart_proto_path: PathBuf, } + fn insert_slots_into_blockstore( + blockstore: Arc, + first_parent: Slot, + slots_to_insert: &[Slot], + ) { + for (shreds, _) in make_chaining_slot_entries(slots_to_insert, 2, first_parent) { + blockstore.insert_shreds(shreds, None, false).unwrap(); + } + } + fn wen_restart_test_init(ledger_path: &TempDir) -> WenRestartTestInitResult { let validator_voting_keypairs: Vec<_> = (0..10).map(|_| ValidatorVoteKeypairs::new_rand()).collect(); @@ -468,7 +479,7 @@ mod tests { node_keypair.clone(), SocketAddrSpace::Unspecified, )); - let blockstore = Arc::new(blockstore::Blockstore::open(ledger_path.path()).unwrap()); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( 10_000, &validator_voting_keypairs, @@ -479,42 +490,16 @@ mod tests { .try_into() .unwrap(); let mut last_voted_fork_slots = Vec::new(); + last_voted_fork_slots.extend([1, last_parent]); for i in 0..EXPECTED_SLOTS { - let entries = entry::create_ticks(1, 0, Hash::default()); - let parent_slot = if i > 0 { - (RestartLastVotedForkSlots::MAX_SLOTS.saturating_add(i)) - .try_into() - .unwrap() - } else { - last_parent - }; - let slot = (RestartLastVotedForkSlots::MAX_SLOTS - .saturating_add(i) - .saturating_add(1)) as Slot; - let shreds = blockstore::entries_to_test_shreds( - &entries, - slot, - parent_slot, - false, - 0, - true, // merkle_variant + last_voted_fork_slots.push( + (RestartLastVotedForkSlots::MAX_SLOTS + .saturating_add(i) + .saturating_add(1)) as Slot, ); - blockstore.insert_shreds(shreds, None, false).unwrap(); - last_voted_fork_slots.push(slot); } - // link directly to slot 1 whose distance to last_vote > RestartLastVotedForkSlots::MAX_SLOTS so it will not be included. - let entries = entry::create_ticks(1, 0, Hash::default()); - let shreds = blockstore::entries_to_test_shreds( - &entries, - last_parent, - 1, - false, - 0, - true, // merkle_variant - ); - last_voted_fork_slots.extend([last_parent, 1]); - blockstore.insert_shreds(shreds, None, false).unwrap(); - last_voted_fork_slots.sort(); + insert_slots_into_blockstore(blockstore.clone(), 0, &last_voted_fork_slots); + last_voted_fork_slots.insert(0, 0); last_voted_fork_slots.reverse(); let mut wen_restart_proto_path = ledger_path.path().to_path_buf(); wen_restart_proto_path.push("wen_restart_status.proto"); @@ -599,23 +584,6 @@ mod tests { let _ = remove_file(&test_state.wen_restart_proto_path); } - fn insert_and_freeze_slots( - bank_forks: Arc>, - expected_slots_to_repair: Vec, - ) { - let mut parent_bank = bank_forks.read().unwrap().root_bank(); - for slot in expected_slots_to_repair { - let mut bank_forks_rw = bank_forks.write().unwrap(); - bank_forks_rw.insert(Bank::new_from_parent( - parent_bank.clone(), - &Pubkey::default(), - slot, - )); - parent_bank = bank_forks_rw.get(slot).unwrap(); - parent_bank.freeze(); - } - } - #[test] fn test_wen_restart_normal_flow() { let ledger_path = get_tmp_ledger_path_auto_delete!(); @@ -673,7 +641,11 @@ mod tests { } // Simulating successful repair of missing blocks. - insert_and_freeze_slots(test_state.bank_forks.clone(), expected_slots_to_repair); + insert_slots_into_blockstore( + test_state.blockstore.clone(), + last_vote_slot, + &expected_slots_to_repair, + ); let _ = wen_restart_thread_handle.join(); let progress = read_wen_restart_records(&test_state.wen_restart_proto_path).unwrap(); @@ -934,6 +906,7 @@ mod tests { let wen_restart_proto_path_clone = test_state.wen_restart_proto_path.clone(); let cluster_info_clone = test_state.cluster_info.clone(); let bank_forks_clone = test_state.bank_forks.clone(); + let blockstore_clone = test_state.blockstore.clone(); let exit = Arc::new(AtomicBool::new(false)); let exit_clone = exit.clone(); let mut progress_clone = progress.clone(); @@ -947,6 +920,7 @@ mod tests { cluster_info_clone, &last_voted_fork_slots, bank_forks_clone, + blockstore_clone, Arc::new(RwLock::new(Vec::new())), exit_clone, &mut progress_clone, @@ -995,7 +969,11 @@ mod tests { } // Simulating successful repair of missing blocks. - insert_and_freeze_slots(test_state.bank_forks.clone(), expected_slots_to_repair); + insert_slots_into_blockstore( + test_state.blockstore.clone(), + last_vote_slot, + &expected_slots_to_repair, + ); let last_voted_fork_slots = test_state.last_voted_fork_slots.clone(); wen_restart_test_succeed_after_failure(