Wen_restart: check block full using blockstore (#250)

* Switch to blockstore.is_full() check because replay thread isn't active.

* Use make_chaining_slot_entries and add first_parent to the method.
Small style fixes.

* Switch to blockstore.is_full() check because replay thread isn't active.
This commit is contained in:
Wen 2024-03-14 20:45:03 -07:00 committed by GitHub
parent 2537e3e4ad
commit 5591db7801
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 50 additions and 71 deletions

View File

@ -1326,7 +1326,7 @@ mod test {
let slots: Vec<u64> = vec![1, 3, 5, 7, 8]; let slots: Vec<u64> = vec![1, 3, 5, 7, 8];
let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1; let num_entries_per_slot = max_ticks_per_n_shreds(1, None) + 1;
let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot); let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot, 0);
for (mut slot_shreds, _) in shreds.into_iter() { for (mut slot_shreds, _) in shreds.into_iter() {
slot_shreds.remove(0); slot_shreds.remove(0);
blockstore.insert_shreds(slot_shreds, None, false).unwrap(); blockstore.insert_shreds(slot_shreds, None, false).unwrap();
@ -1621,7 +1621,7 @@ mod test {
let slots: Vec<u64> = vec![2, 3, 5, 7]; let slots: Vec<u64> = vec![2, 3, 5, 7];
let num_entries_per_slot = max_ticks_per_n_shreds(3, None) + 1; let num_entries_per_slot = max_ticks_per_n_shreds(3, None) + 1;
let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot); let shreds = make_chaining_slot_entries(&slots, num_entries_per_slot, 0);
for (i, (mut slot_shreds, _)) in shreds.into_iter().enumerate() { for (i, (mut slot_shreds, _)) in shreds.into_iter().enumerate() {
slot_shreds.remove(i); slot_shreds.remove(i);
blockstore.insert_shreds(slot_shreds, None, false).unwrap(); blockstore.insert_shreds(slot_shreds, None, false).unwrap();

View File

@ -2377,7 +2377,7 @@ mod test {
assert_eq!(repairs[2].slot(), 5); assert_eq!(repairs[2].slot(), 5);
// Simulate repair on 6 and 5 // Simulate repair on 6 and 5
for (shreds, _) in make_chaining_slot_entries(&[5, 6], 100) { for (shreds, _) in make_chaining_slot_entries(&[5, 6], 100, 0) {
blockstore.insert_shreds(shreds, None, true).unwrap(); blockstore.insert_shreds(shreds, None, true).unwrap();
} }

View File

@ -4658,12 +4658,13 @@ pub fn make_many_slot_shreds(
pub fn make_chaining_slot_entries( pub fn make_chaining_slot_entries(
chain: &[u64], chain: &[u64],
entries_per_slot: u64, entries_per_slot: u64,
first_parent: u64,
) -> Vec<(Vec<Shred>, Vec<Entry>)> { ) -> Vec<(Vec<Shred>, Vec<Entry>)> {
let mut slots_shreds_and_entries = vec![]; let mut slots_shreds_and_entries = vec![];
for (i, slot) in chain.iter().enumerate() { for (i, slot) in chain.iter().enumerate() {
let parent_slot = { let parent_slot = {
if *slot == 0 || i == 0 { if *slot == 0 || i == 0 {
0 first_parent
} else { } else {
chain[i - 1] chain[i - 1]
} }
@ -5609,7 +5610,7 @@ pub mod tests {
let entries_per_slot = 10; let entries_per_slot = 10;
let slots = [2, 5, 10]; let slots = [2, 5, 10];
let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot); let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot, 0);
// Get the shreds for slot 10, chaining to slot 5 // Get the shreds for slot 10, chaining to slot 5
let (mut orphan_child, _) = all_shreds.remove(2); let (mut orphan_child, _) = all_shreds.remove(2);
@ -5654,7 +5655,7 @@ pub mod tests {
let entries_per_slot = 10; let entries_per_slot = 10;
let mut slots = vec![2, 5, 10]; let mut slots = vec![2, 5, 10];
let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot); let mut all_shreds = make_chaining_slot_entries(&slots[..], entries_per_slot, 0);
let disconnected_slot = 4; let disconnected_slot = 4;
let (shreds0, _) = all_shreds.remove(0); let (shreds0, _) = all_shreds.remove(0);
@ -7428,7 +7429,7 @@ pub mod tests {
let blockstore = Blockstore::open(ledger_path.path()).unwrap(); let blockstore = Blockstore::open(ledger_path.path()).unwrap();
let shreds_per_slot = 10; let shreds_per_slot = 10;
let slots = vec![2, 4, 8, 12]; let slots = vec![2, 4, 8, 12];
let all_shreds = make_chaining_slot_entries(&slots, shreds_per_slot); let all_shreds = make_chaining_slot_entries(&slots, shreds_per_slot, 0);
let slot_8_shreds = all_shreds[2].0.clone(); let slot_8_shreds = all_shreds[2].0.clone();
for (slot_shreds, _) in all_shreds { for (slot_shreds, _) in all_shreds {
blockstore.insert_shreds(slot_shreds, None, false).unwrap(); blockstore.insert_shreds(slot_shreds, None, false).unwrap();
@ -9963,7 +9964,7 @@ pub mod tests {
let slots = vec![2, unconfirmed_slot, unconfirmed_child_slot]; let slots = vec![2, unconfirmed_slot, unconfirmed_child_slot];
// Insert into slot 9, mark it as dead // Insert into slot 9, mark it as dead
let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1) let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1, 0)
.into_iter() .into_iter()
.flat_map(|x| x.0) .flat_map(|x| x.0)
.collect(); .collect();
@ -10005,7 +10006,7 @@ pub mod tests {
let unconfirmed_slot = 8; let unconfirmed_slot = 8;
let slots = vec![confirmed_slot, unconfirmed_slot]; let slots = vec![confirmed_slot, unconfirmed_slot];
let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1) let shreds: Vec<_> = make_chaining_slot_entries(&slots, 1, 0)
.into_iter() .into_iter()
.flat_map(|x| x.0) .flat_map(|x| x.0)
.collect(); .collect();

View File

@ -104,6 +104,7 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
cluster_info: Arc<ClusterInfo>, cluster_info: Arc<ClusterInfo>,
last_voted_fork_slots: &Vec<Slot>, last_voted_fork_slots: &Vec<Slot>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
blockstore: Arc<Blockstore>,
wen_restart_repair_slots: Arc<RwLock<Vec<Slot>>>, wen_restart_repair_slots: Arc<RwLock<Vec<Slot>>>,
exit: Arc<AtomicBool>, exit: Arc<AtomicBool>,
progress: &mut WenRestartProgress, progress: &mut WenRestartProgress,
@ -160,20 +161,18 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots(
let active_percent = last_voted_fork_slots_aggregate.active_percent(); let active_percent = last_voted_fork_slots_aggregate.active_percent();
let mut filtered_slots: Vec<Slot>; let mut filtered_slots: Vec<Slot>;
{ {
let my_bank_forks = bank_forks.read().unwrap();
filtered_slots = last_voted_fork_slots_aggregate filtered_slots = last_voted_fork_slots_aggregate
.slots_to_repair_iter() .slots_to_repair_iter()
.filter(|slot| { .filter(|slot| {
if *slot <= &root_slot || is_full_slots.contains(*slot) { if *slot <= &root_slot || is_full_slots.contains(*slot) {
return false; return false;
} }
let is_full = my_bank_forks if blockstore.is_full(**slot) {
.get(**slot)
.map_or(false, |bank| bank.is_frozen());
if is_full {
is_full_slots.insert(**slot); is_full_slots.insert(**slot);
false
} else {
true
} }
!is_full
}) })
.cloned() .cloned()
.collect(); .collect();
@ -234,6 +233,7 @@ pub fn wait_for_wen_restart(
cluster_info.clone(), cluster_info.clone(),
last_voted_fork_slots, last_voted_fork_slots,
bank_forks.clone(), bank_forks.clone(),
blockstore.clone(),
wen_restart_repair_slots.clone().unwrap(), wen_restart_repair_slots.clone().unwrap(),
exit.clone(), exit.clone(),
&mut progress, &mut progress,
@ -382,7 +382,6 @@ mod tests {
use { use {
crate::wen_restart::*, crate::wen_restart::*,
assert_matches::assert_matches, assert_matches::assert_matches,
solana_entry::entry,
solana_gossip::{ solana_gossip::{
cluster_info::ClusterInfo, cluster_info::ClusterInfo,
contact_info::ContactInfo, contact_info::ContactInfo,
@ -391,7 +390,10 @@ mod tests {
legacy_contact_info::LegacyContactInfo, legacy_contact_info::LegacyContactInfo,
restart_crds_values::RestartLastVotedForkSlots, restart_crds_values::RestartLastVotedForkSlots,
}, },
solana_ledger::{blockstore, get_tmp_ledger_path_auto_delete}, solana_ledger::{
blockstore::{make_chaining_slot_entries, Blockstore},
get_tmp_ledger_path_auto_delete,
},
solana_program::{ solana_program::{
hash::Hash, hash::Hash,
vote::state::{Vote, VoteStateUpdate}, vote::state::{Vote, VoteStateUpdate},
@ -403,7 +405,6 @@ mod tests {
}, },
}, },
solana_sdk::{ solana_sdk::{
pubkey::Pubkey,
signature::{Keypair, Signer}, signature::{Keypair, Signer},
timing::timestamp, timing::timestamp,
}, },
@ -454,6 +455,16 @@ mod tests {
pub wen_restart_proto_path: PathBuf, pub wen_restart_proto_path: PathBuf,
} }
fn insert_slots_into_blockstore(
blockstore: Arc<Blockstore>,
first_parent: Slot,
slots_to_insert: &[Slot],
) {
for (shreds, _) in make_chaining_slot_entries(slots_to_insert, 2, first_parent) {
blockstore.insert_shreds(shreds, None, false).unwrap();
}
}
fn wen_restart_test_init(ledger_path: &TempDir) -> WenRestartTestInitResult { fn wen_restart_test_init(ledger_path: &TempDir) -> WenRestartTestInitResult {
let validator_voting_keypairs: Vec<_> = let validator_voting_keypairs: Vec<_> =
(0..10).map(|_| ValidatorVoteKeypairs::new_rand()).collect(); (0..10).map(|_| ValidatorVoteKeypairs::new_rand()).collect();
@ -468,7 +479,7 @@ mod tests {
node_keypair.clone(), node_keypair.clone(),
SocketAddrSpace::Unspecified, SocketAddrSpace::Unspecified,
)); ));
let blockstore = Arc::new(blockstore::Blockstore::open(ledger_path.path()).unwrap()); let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap());
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts(
10_000, 10_000,
&validator_voting_keypairs, &validator_voting_keypairs,
@ -479,42 +490,16 @@ mod tests {
.try_into() .try_into()
.unwrap(); .unwrap();
let mut last_voted_fork_slots = Vec::new(); let mut last_voted_fork_slots = Vec::new();
last_voted_fork_slots.extend([1, last_parent]);
for i in 0..EXPECTED_SLOTS { for i in 0..EXPECTED_SLOTS {
let entries = entry::create_ticks(1, 0, Hash::default()); last_voted_fork_slots.push(
let parent_slot = if i > 0 { (RestartLastVotedForkSlots::MAX_SLOTS
(RestartLastVotedForkSlots::MAX_SLOTS.saturating_add(i))
.try_into()
.unwrap()
} else {
last_parent
};
let slot = (RestartLastVotedForkSlots::MAX_SLOTS
.saturating_add(i) .saturating_add(i)
.saturating_add(1)) as Slot; .saturating_add(1)) as Slot,
let shreds = blockstore::entries_to_test_shreds(
&entries,
slot,
parent_slot,
false,
0,
true, // merkle_variant
); );
blockstore.insert_shreds(shreds, None, false).unwrap();
last_voted_fork_slots.push(slot);
} }
// link directly to slot 1 whose distance to last_vote > RestartLastVotedForkSlots::MAX_SLOTS so it will not be included. insert_slots_into_blockstore(blockstore.clone(), 0, &last_voted_fork_slots);
let entries = entry::create_ticks(1, 0, Hash::default()); last_voted_fork_slots.insert(0, 0);
let shreds = blockstore::entries_to_test_shreds(
&entries,
last_parent,
1,
false,
0,
true, // merkle_variant
);
last_voted_fork_slots.extend([last_parent, 1]);
blockstore.insert_shreds(shreds, None, false).unwrap();
last_voted_fork_slots.sort();
last_voted_fork_slots.reverse(); last_voted_fork_slots.reverse();
let mut wen_restart_proto_path = ledger_path.path().to_path_buf(); let mut wen_restart_proto_path = ledger_path.path().to_path_buf();
wen_restart_proto_path.push("wen_restart_status.proto"); wen_restart_proto_path.push("wen_restart_status.proto");
@ -599,23 +584,6 @@ mod tests {
let _ = remove_file(&test_state.wen_restart_proto_path); let _ = remove_file(&test_state.wen_restart_proto_path);
} }
fn insert_and_freeze_slots(
bank_forks: Arc<RwLock<BankForks>>,
expected_slots_to_repair: Vec<Slot>,
) {
let mut parent_bank = bank_forks.read().unwrap().root_bank();
for slot in expected_slots_to_repair {
let mut bank_forks_rw = bank_forks.write().unwrap();
bank_forks_rw.insert(Bank::new_from_parent(
parent_bank.clone(),
&Pubkey::default(),
slot,
));
parent_bank = bank_forks_rw.get(slot).unwrap();
parent_bank.freeze();
}
}
#[test] #[test]
fn test_wen_restart_normal_flow() { fn test_wen_restart_normal_flow() {
let ledger_path = get_tmp_ledger_path_auto_delete!(); let ledger_path = get_tmp_ledger_path_auto_delete!();
@ -673,7 +641,11 @@ mod tests {
} }
// Simulating successful repair of missing blocks. // Simulating successful repair of missing blocks.
insert_and_freeze_slots(test_state.bank_forks.clone(), expected_slots_to_repair); insert_slots_into_blockstore(
test_state.blockstore.clone(),
last_vote_slot,
&expected_slots_to_repair,
);
let _ = wen_restart_thread_handle.join(); let _ = wen_restart_thread_handle.join();
let progress = read_wen_restart_records(&test_state.wen_restart_proto_path).unwrap(); let progress = read_wen_restart_records(&test_state.wen_restart_proto_path).unwrap();
@ -934,6 +906,7 @@ mod tests {
let wen_restart_proto_path_clone = test_state.wen_restart_proto_path.clone(); let wen_restart_proto_path_clone = test_state.wen_restart_proto_path.clone();
let cluster_info_clone = test_state.cluster_info.clone(); let cluster_info_clone = test_state.cluster_info.clone();
let bank_forks_clone = test_state.bank_forks.clone(); let bank_forks_clone = test_state.bank_forks.clone();
let blockstore_clone = test_state.blockstore.clone();
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let exit_clone = exit.clone(); let exit_clone = exit.clone();
let mut progress_clone = progress.clone(); let mut progress_clone = progress.clone();
@ -947,6 +920,7 @@ mod tests {
cluster_info_clone, cluster_info_clone,
&last_voted_fork_slots, &last_voted_fork_slots,
bank_forks_clone, bank_forks_clone,
blockstore_clone,
Arc::new(RwLock::new(Vec::new())), Arc::new(RwLock::new(Vec::new())),
exit_clone, exit_clone,
&mut progress_clone, &mut progress_clone,
@ -995,7 +969,11 @@ mod tests {
} }
// Simulating successful repair of missing blocks. // Simulating successful repair of missing blocks.
insert_and_freeze_slots(test_state.bank_forks.clone(), expected_slots_to_repair); insert_slots_into_blockstore(
test_state.blockstore.clone(),
last_vote_slot,
&expected_slots_to_repair,
);
let last_voted_fork_slots = test_state.last_voted_fork_slots.clone(); let last_voted_fork_slots = test_state.last_voted_fork_slots.clone();
wen_restart_test_succeed_after_failure( wen_restart_test_succeed_after_failure(