diff --git a/Cargo.lock b/Cargo.lock index 7e0d16747..0bfe2a924 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7651,6 +7651,7 @@ dependencies = [ "prost-types", "protobuf-src", "rand 0.8.5", + "rayon", "rustc_version 0.4.0", "serial_test", "solana-accounts-db", @@ -7659,6 +7660,7 @@ dependencies = [ "solana-ledger", "solana-logger", "solana-program", + "solana-program-runtime", "solana-runtime", "solana-sdk", "solana-streamer", diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 10b08510f..6fea64b03 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -1794,7 +1794,7 @@ fn supermajority_root_from_vote_accounts( // Processes and replays the contents of a single slot, returns Error // if failed to play the slot #[allow(clippy::too_many_arguments)] -fn process_single_slot( +pub fn process_single_slot( blockstore: &Blockstore, bank: &BankWithScheduler, replay_tx_thread_pool: &ThreadPool, diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index 417e8a388..ada0f1b2e 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -6610,11 +6610,14 @@ dependencies = [ "prost-build", "prost-types", "protobuf-src", + "rayon", "rustc_version", + "solana-entry", "solana-gossip", "solana-ledger", "solana-logger", "solana-program", + "solana-program-runtime", "solana-runtime", "solana-sdk", "solana-vote-program", diff --git a/wen-restart/Cargo.toml b/wen-restart/Cargo.toml index add4340cd..57f345126 100644 --- a/wen-restart/Cargo.toml +++ b/wen-restart/Cargo.toml @@ -15,10 +15,13 @@ anyhow = { workspace = true } log = { workspace = true } prost = { workspace = true } prost-types = { workspace = true } +rayon = { workspace = true } +solana-entry = { workspace = true } solana-gossip = { workspace = true } solana-ledger = { workspace = true } solana-logger = { workspace = true } solana-program = { workspace = true } +solana-program-runtime = { workspace = true } solana-runtime = { 
workspace = true } solana-sdk = { workspace = true } solana-vote-program = { workspace = true } diff --git a/wen-restart/src/wen_restart.rs b/wen-restart/src/wen_restart.rs index 17a909df7..bd283eb7d 100644 --- a/wen-restart/src/wen_restart.rs +++ b/wen-restart/src/wen_restart.rs @@ -14,13 +14,20 @@ use { anyhow::Result, log::*, prost::Message, + solana_entry::entry::VerifyRecyclers, solana_gossip::{ cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS}, restart_crds_values::RestartLastVotedForkSlots, }, - solana_ledger::{ancestor_iterator::AncestorIterator, blockstore::Blockstore}, + solana_ledger::{ + ancestor_iterator::AncestorIterator, + blockstore::Blockstore, + blockstore_processor::{process_single_slot, ConfirmationProgress, ProcessOptions}, + leader_schedule_cache::LeaderScheduleCache, + }, solana_program::{clock::Slot, hash::Hash}, - solana_runtime::bank_forks::BankForks, + solana_program_runtime::timings::ExecuteTimings, + solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_sdk::timing::timestamp, solana_vote_program::vote_state::VoteTransaction, std::{ @@ -42,11 +49,16 @@ use { const REPAIR_THRESHOLD: f64 = 0.42; // When counting Heaviest Fork, only count those with no less than // 67% - 5% - (100% - active_stake) = active_stake - 38% stake. +// 67% is the supermajority threshold (2/3), 5% is the assumption we +// made regarding how much non-conforming/offline validators the +// algorithm can tolerate. 
const HEAVIEST_FORK_THRESHOLD_DELTA: f64 = 0.38; #[derive(Debug, PartialEq)] pub enum WenRestartError { BlockNotFound(Slot), + BlockNotFull(Slot), + BlockNotFrozenAfterReplay(Slot, Option), BlockNotLinkedToExpectedParent(Slot, Option, Slot), ChildStakeLargerThanParent(Slot, u64, Slot, u64), Exiting, @@ -62,6 +74,12 @@ impl std::fmt::Display for WenRestartError { WenRestartError::BlockNotFound(slot) => { write!(f, "Block not found: {}", slot) } + WenRestartError::BlockNotFull(slot) => { + write!(f, "Block not full: {}", slot) + } + WenRestartError::BlockNotFrozenAfterReplay(slot, err) => { + write!(f, "Block not frozen after replay: {} {:?}", slot, err) + } WenRestartError::BlockNotLinkedToExpectedParent(slot, parent, expected_parent) => { write!( f, @@ -145,10 +163,7 @@ pub(crate) fn aggregate_restart_last_voted_fork_slots( exit: Arc, progress: &mut WenRestartProgress, ) -> Result { - let root_bank; - { - root_bank = bank_forks.read().unwrap().root_bank().clone(); - } + let root_bank = bank_forks.read().unwrap().root_bank(); let root_slot = root_bank.slot(); let mut last_voted_fork_slots_aggregate = LastVotedForkSlotsAggregate::new( root_slot, @@ -247,9 +262,7 @@ pub(crate) fn find_heaviest_fork( blockstore: Arc, exit: Arc, ) -> Result<(Slot, Hash)> { - // Because everything else is stopped, it's okay to grab a big lock on bank_forks. - let my_bank_forks = bank_forks.read().unwrap(); - let root_bank = my_bank_forks.root_bank().clone(); + let root_bank = bank_forks.read().unwrap().root_bank(); let root_slot = root_bank.slot(); // TODO: Should use better epoch_stakes later. let epoch_stake = root_bank.epoch_stakes(root_bank.epoch()).unwrap(); @@ -264,12 +277,16 @@ pub(crate) fn find_heaviest_fork( .map(|(slot, _)| *slot) .collect::>(); slots.sort(); + + // The heaviest slot we selected will always be the last of the slots list, or root if the list is empty. 
+ let heaviest_fork_slot = slots.last().map_or(root_slot, |x| *x); + let mut expected_parent = root_slot; - for slot in slots { + for slot in &slots { if exit.load(Ordering::Relaxed) { return Err(WenRestartError::Exiting.into()); } - if let Ok(Some(block_meta)) = blockstore.meta(slot) { + if let Ok(Some(block_meta)) = blockstore.meta(*slot) { if block_meta.parent_slot != Some(expected_parent) { if expected_parent == root_slot { error!("First block {} in repair list not linked to local root {}, this could mean our root is too old", @@ -281,18 +298,111 @@ pub(crate) fn find_heaviest_fork( ); } return Err(WenRestartError::BlockNotLinkedToExpectedParent( - slot, + *slot, block_meta.parent_slot, expected_parent, ) .into()); } - expected_parent = slot; + if !block_meta.is_full() { + return Err(WenRestartError::BlockNotFull(*slot).into()); + } + expected_parent = *slot; } else { - return Err(WenRestartError::BlockNotFound(slot).into()); + return Err(WenRestartError::BlockNotFound(*slot).into()); } } - Ok((expected_parent, Hash::default())) + let heaviest_fork_bankhash = find_bankhash_of_heaviest_fork( + heaviest_fork_slot, + slots, + blockstore.clone(), + bank_forks.clone(), + root_bank, + &exit, + )?; + info!( + "Heaviest fork found: slot: {}, bankhash: {:?}", + heaviest_fork_slot, heaviest_fork_bankhash + ); + Ok((heaviest_fork_slot, heaviest_fork_bankhash)) +} + +// Find the hash of the heaviest fork, if block hasn't been replayed, replay to get the hash. 
+//
+// Fast path: if `bank_forks` already holds a bank for `heaviest_fork_slot`, that bank's
+// hash is returned immediately.
+// NOTE(review): the fast path does not check `is_frozen()` the way the replay loop below
+// does -- confirm an unfrozen bank can never be present for the heaviest slot here.
+//
+// Slow path: walks `slots` in the order given, treating each slot as the child of the
+// previous one (starting from `root_bank`). Slots that already have a bank must be frozen;
+// slots without a bank get a new bank created from the current parent, inserted into
+// `bank_forks`, and replayed from `blockstore` via `process_single_slot`. The hash of the
+// last bank visited is returned (the hash of `root_bank` if `slots` is empty). The caller in
+// this file passes a sorted, parent-linked list whose last element is `heaviest_fork_slot`.
+//
+// Errors:
+//   * `WenRestartError::Exiting` if `exit` is set while iterating.
+//   * `WenRestartError::BlockNotFrozenAfterReplay(slot, None)` if an existing bank is not frozen.
+//   * `WenRestartError::BlockNotFrozenAfterReplay(slot, Some(msg))` if replaying `slot` fails.
+//
+// Panics if the rayon thread pool cannot be built, if the `bank_forks` lock is poisoned, if
+// the leader schedule cache has no leader for a slot, or if a bank just inserted cannot be
+// read back.
+fn find_bankhash_of_heaviest_fork(
+    heaviest_fork_slot: Slot,
+    slots: Vec<Slot>,
+    blockstore: Arc<Blockstore>,
+    bank_forks: Arc<RwLock<BankForks>>,
+    root_bank: Arc<Bank>,
+    exit: &Arc<AtomicBool>,
+) -> Result<Hash> {
+    // The read guard is a temporary and is released at the end of this statement, so it
+    // does not conflict with the write lock taken further down.
+    let heaviest_fork_bankhash = bank_forks
+        .read()
+        .unwrap()
+        .get(heaviest_fork_slot)
+        .map(|bank| bank.hash());
+    if let Some(hash) = heaviest_fork_bankhash {
+        return Ok(hash);
+    }
+
+    // Everything `process_single_slot` needs is built once, outside the loop.
+    let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&root_bank);
+    let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
+        .thread_name(|i| format!("solReplayTx{i:02}"))
+        .build()
+        .expect("new rayon threadpool");
+    let recyclers = VerifyRecyclers::default();
+    let mut timing = ExecuteTimings::default();
+    let opts = ProcessOptions::default();
+    // Grab one write lock until end of function because we are the only one touching bankforks now.
+    let mut my_bankforks = bank_forks.write().unwrap();
+    // Now replay all the missing blocks.
+    let mut parent_bank = root_bank;
+    for slot in slots {
+        if exit.load(Ordering::Relaxed) {
+            return Err(WenRestartError::Exiting.into());
+        }
+        let bank = match my_bankforks.get(slot) {
+            Some(cur_bank) => {
+                // A bank that exists but is not frozen has no final hash to build on.
+                if !cur_bank.is_frozen() {
+                    return Err(WenRestartError::BlockNotFrozenAfterReplay(slot, None).into());
+                }
+                cur_bank
+            }
+            None => {
+                // NOTE(review): `slot_leader_at(..).unwrap()` panics when no leader is
+                // known for `slot`; consider surfacing this as a `WenRestartError` instead.
+                let new_bank = Bank::new_from_parent(
+                    parent_bank.clone(),
+                    &leader_schedule_cache
+                        .slot_leader_at(slot, Some(&parent_bank))
+                        .unwrap(),
+                    slot,
+                );
+                let bank_with_scheduler = my_bankforks.insert_from_ledger(new_bank);
+                let mut progress = ConfirmationProgress::new(parent_bank.last_blockhash());
+                if let Err(e) = process_single_slot(
+                    &blockstore,
+                    &bank_with_scheduler,
+                    &replay_tx_thread_pool,
+                    &opts,
+                    &recyclers,
+                    &mut progress,
+                    None,
+                    None,
+                    None,
+                    None,
+                    &mut timing,
+                ) {
+                    return Err(WenRestartError::BlockNotFrozenAfterReplay(
+                        slot,
+                        Some(e.to_string()),
+                    )
+                    .into());
+                }
+                // Re-read the bank from bank_forks now that the slot has been replayed.
+                my_bankforks.get(slot).unwrap()
+            }
+        };
+        parent_bank = bank;
+    }
+    Ok(parent_bank.hash())
 }
 
 pub fn wait_for_wen_restart(
@@ -577,6 +687,8 @@ mod tests { use { crate::wen_restart::{tests::wen_restart_proto::LastVotedForkSlotsAggregateFinal, *}, assert_matches::assert_matches, + solana_accounts_db::hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, + solana_entry::entry::create_ticks, solana_gossip::{ cluster_info::ClusterInfo, contact_info::ContactInfo, @@ -586,18 +698,17 @@ mod tests { restart_crds_values::RestartLastVotedForkSlots, }, solana_ledger::{ - blockstore::{make_chaining_slot_entries, Blockstore}, + blockstore::{create_new_ledger, entries_to_test_shreds, Blockstore}, + blockstore_options::LedgerColumnOptions, + blockstore_processor::{fill_blockstore_slot_with_ticks, test_process_blockstore}, get_tmp_ledger_path_auto_delete, }, solana_program::{ hash::Hash, vote::state::{Vote, VoteStateUpdate}, }, - solana_runtime::{ - bank::Bank, - genesis_utils::{ - create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, - }, + solana_runtime::genesis_utils::{ + create_genesis_config_with_vote_accounts, GenesisConfigInfo, ValidatorVoteKeypairs, }, solana_sdk::{ signature::{Keypair, Signer}, @@ -609,7 +720,8 @@ mod tests { }; const SHRED_VERSION: u16 = 2; - const EXPECTED_SLOTS: usize = 400; + const EXPECTED_SLOTS: Slot = 90; + const TICKS_PER_SLOT: u64 = 2; fn push_restart_last_voted_fork_slots( cluster_info: Arc, @@ -648,16 +760,29 @@ mod tests { pub bank_forks: Arc>, pub last_voted_fork_slots: Vec, pub wen_restart_proto_path: PathBuf, + pub last_blockhash: Hash, } fn insert_slots_into_blockstore( blockstore: Arc, first_parent: Slot, slots_to_insert: &[Slot], - ) { - for (shreds, _) in make_chaining_slot_entries(slots_to_insert, 2, first_parent) { - blockstore.insert_shreds(shreds, None, false).unwrap(); + entries_per_slot: u64, + start_blockhash: Hash, + ) -> Hash { + let mut last_hash = start_blockhash; + let mut last_parent = first_parent; + for i in slots_to_insert { + last_hash = fill_blockstore_slot_with_ticks( + &blockstore, + entries_per_slot, + *i, + 
last_parent, + last_hash, + ); + last_parent = *i; } + last_hash } fn wen_restart_test_init(ledger_path: &TempDir) -> WenRestartTestInitResult { @@ -674,26 +799,45 @@ mod tests { node_keypair.clone(), SocketAddrSpace::Unspecified, )); - let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); - let GenesisConfigInfo { genesis_config, .. } = create_genesis_config_with_vote_accounts( + let GenesisConfigInfo { + mut genesis_config, .. + } = create_genesis_config_with_vote_accounts( 10_000, &validator_voting_keypairs, vec![100; validator_voting_keypairs.len()], ); - let (_, bank_forks) = Bank::new_with_bank_forks_for_tests(&genesis_config); - let last_parent = (RestartLastVotedForkSlots::MAX_SLOTS >> 1) - .try_into() - .unwrap(); - let mut last_voted_fork_slots = Vec::new(); - last_voted_fork_slots.extend([1, last_parent]); - for i in 0..EXPECTED_SLOTS { - last_voted_fork_slots.push( - (RestartLastVotedForkSlots::MAX_SLOTS - .saturating_add(i) - .saturating_add(1)) as Slot, - ); - } - insert_slots_into_blockstore(blockstore.clone(), 0, &last_voted_fork_slots); + genesis_config.ticks_per_slot = TICKS_PER_SLOT; + let start_blockhash = create_new_ledger( + ledger_path.path(), + &genesis_config, + MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, + LedgerColumnOptions::default(), + ) + .unwrap(); + let blockstore = Arc::new(Blockstore::open(ledger_path.path()).unwrap()); + let (bank_forks, ..) = test_process_blockstore( + &genesis_config, + &blockstore, + &ProcessOptions { + run_verification: true, + accounts_db_test_hash_calculation: true, + ..ProcessOptions::default() + }, + Arc::default(), + ); + let mut last_blockhash = start_blockhash; + // Skip block 1, 2 links directly to 0. 
+ let last_parent: Slot = 2; + let mut last_voted_fork_slots: Vec = Vec::new(); + last_voted_fork_slots + .extend(last_parent..last_parent.saturating_add(EXPECTED_SLOTS).saturating_add(1)); + last_blockhash = insert_slots_into_blockstore( + blockstore.clone(), + 0, + &last_voted_fork_slots, + genesis_config.ticks_per_slot, + last_blockhash, + ); last_voted_fork_slots.insert(0, 0); last_voted_fork_slots.reverse(); let mut wen_restart_proto_path = ledger_path.path().to_path_buf(); @@ -706,6 +850,7 @@ mod tests { bank_forks, last_voted_fork_slots, wen_restart_proto_path, + last_blockhash, } } @@ -847,10 +992,12 @@ mod tests { } // Simulating successful repair of missing blocks. - insert_slots_into_blockstore( + let _ = insert_slots_into_blockstore( test_state.blockstore.clone(), last_vote_slot, &expected_slots_to_repair, + TICKS_PER_SLOT, + test_state.last_blockhash, ); let _ = wen_restart_thread_handle.join(); @@ -867,7 +1014,13 @@ mod tests { .collect(); expected_slots_stake_map.extend(expected_slots_to_repair.iter().map(|slot| (*slot, 800))); let expected_heaviest_fork_slot = last_vote_slot + 2; - let expected_heaviest_fork_bankhash = Hash::default(); + let expected_heaviest_fork_bankhash = test_state + .bank_forks + .read() + .unwrap() + .get(expected_heaviest_fork_slot) + .unwrap() + .hash(); assert_eq!( progress, WenRestartProgress { @@ -1197,10 +1350,12 @@ mod tests { } // Simulating successful repair of missing blocks. 
- insert_slots_into_blockstore( + let _ = insert_slots_into_blockstore( test_state.blockstore.clone(), last_vote_slot, &expected_slots_to_repair, + TICKS_PER_SLOT, + test_state.last_blockhash, ); let last_voted_fork_slots = test_state.last_voted_fork_slots.clone(); @@ -1407,7 +1562,7 @@ mod tests { assert_eq!( find_heaviest_fork( LastVotedForkSlotsFinalResult { - slots_stake_map: vec![(1, 900), (last_vote_slot, 900)].into_iter().collect(), + slots_stake_map: vec![(2, 900), (last_vote_slot, 900)].into_iter().collect(), total_active_stake: 900, }, test_state.bank_forks.clone(), @@ -1420,7 +1575,90 @@ mod tests { WenRestartError::BlockNotLinkedToExpectedParent( last_vote_slot, Some(last_vote_slot - 1), - 1 + 2 + ), + ); + // The following fails because the new slot is not full. + let not_full_slot = last_vote_slot + 5; + let parent_slot = last_vote_slot; + let num_slots = (not_full_slot - parent_slot).max(1); + let mut entries = create_ticks(num_slots * TICKS_PER_SLOT, 0, test_state.last_blockhash); + assert!(entries.len() > 1); + entries.pop(); + let shreds = entries_to_test_shreds( + &entries, + not_full_slot, + parent_slot, + false, + 0, + true, // merkle_variant + ); + test_state + .blockstore + .insert_shreds(shreds, None, false) + .unwrap(); + let mut slots_stake_map: HashMap = test_state + .last_voted_fork_slots + .iter() + .map(|slot| (*slot, 900)) + .collect(); + slots_stake_map.insert(not_full_slot, 800); + assert_eq!( + find_heaviest_fork( + LastVotedForkSlotsFinalResult { + slots_stake_map, + total_active_stake: 900, + }, + test_state.bank_forks.clone(), + test_state.blockstore.clone(), + exit.clone(), + ) + .unwrap_err() + .downcast::() + .unwrap(), + WenRestartError::BlockNotFull(not_full_slot) + ); + // The following fails because we added two blocks at the end of the chain, they are full in blockstore + // but the parent of the first one is missing. 
+ let missing_parent = last_vote_slot.saturating_add(1); + let new_slot = last_vote_slot.saturating_add(2); + let new_hash = insert_slots_into_blockstore( + test_state.blockstore.clone(), + last_vote_slot, + &[missing_parent], + 1, + test_state.last_blockhash, + ); + let _ = insert_slots_into_blockstore( + test_state.blockstore.clone(), + missing_parent, + &[new_slot], + TICKS_PER_SLOT, + new_hash, + ); + let mut slots_stake_map: HashMap = test_state + .last_voted_fork_slots + .iter() + .map(|slot| (*slot, 900)) + .collect(); + slots_stake_map.insert(missing_parent, 800); + slots_stake_map.insert(new_slot, 800); + assert_eq!( + find_heaviest_fork( + LastVotedForkSlotsFinalResult { + slots_stake_map, + total_active_stake: 900, + }, + test_state.bank_forks.clone(), + test_state.blockstore.clone(), + exit.clone(), + ) + .unwrap_err() + .downcast::() + .unwrap(), + WenRestartError::BlockNotFrozenAfterReplay( + missing_parent, + Some("invalid block error: incomplete block".to_string()) ), ); }