From befe8b9d983f6c386241a68327b602bed3e158fc Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Tue, 20 Feb 2024 09:30:46 -0800 Subject: [PATCH] replay: reload tower if set-identity during startup (#35173) * replay: reload tower if set-identity during startup * pr feedback: add unit tests * pr feedback: use tower.node_pubkey, more descriptive names --- core/src/consensus.rs | 22 ++++++ core/src/replay_stage.rs | 126 ++++++++++++++++++++++++------ sdk/program/src/vote/state/mod.rs | 18 +++++ 3 files changed, 141 insertions(+), 25 deletions(-) diff --git a/core/src/consensus.rs b/core/src/consensus.rs index 4f129b182..3e24f3323 100644 --- a/core/src/consensus.rs +++ b/core/src/consensus.rs @@ -292,6 +292,28 @@ impl Tower { } } + #[cfg(test)] + pub fn new_random(node_pubkey: Pubkey) -> Self { + use rand::Rng; + + let mut rng = rand::thread_rng(); + let root_slot = rng.gen(); + let vote_state = VoteState::new_rand_for_tests(node_pubkey, root_slot); + let last_vote = VoteStateUpdate::from( + vote_state + .votes + .iter() + .map(|lv| (lv.slot(), lv.confirmation_count())) + .collect::>(), + ); + Self { + node_pubkey, + vote_state, + last_vote: VoteTransaction::CompactVoteStateUpdate(last_vote), + ..Tower::default() + } + } + pub fn new_from_bankforks( bank_forks: &BankForks, node_pubkey: &Pubkey, diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 27c30b9e5..1b7b4737f 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -578,6 +578,21 @@ impl ReplayStage { let _exit = Finalizer::new(exit.clone()); let mut identity_keypair = cluster_info.keypair().clone(); let mut my_pubkey = identity_keypair.pubkey(); + if my_pubkey != tower.node_pubkey { + // set-identity was called during the startup procedure, ensure the tower is consistent + // before starting the loop. further calls to set-identity will reload the tower in the loop + let my_old_pubkey = tower.node_pubkey; + tower = Self::load_tower( + tower_storage.as_ref(), + &my_pubkey, + &vote_account, + &bank_forks, + ); + warn!( + "Identity changed during startup from {} to {}", + my_old_pubkey, my_pubkey + ); + } let (mut progress, mut heaviest_subtree_fork_choice) = Self::initialize_progress_and_fork_choice_with_locked_bank_forks( &bank_forks, @@ -983,28 +998,12 @@ impl ReplayStage { my_pubkey = identity_keypair.pubkey(); // Load the new identity's tower - tower = Tower::restore(tower_storage.as_ref(), &my_pubkey) - .and_then(|restored_tower| { - let root_bank = bank_forks.read().unwrap().root_bank(); - let slot_history = root_bank.get_slot_history(); - restored_tower.adjust_lockouts_after_replay( - root_bank.slot(), - &slot_history, - ) - }) - .unwrap_or_else(|err| { - if err.is_file_missing() { - Tower::new_from_bankforks( - &bank_forks.read().unwrap(), - &my_pubkey, - &vote_account, - ) - } else { - error!("Failed to load tower for {}: {}", my_pubkey, err); - std::process::exit(1); - } - }); - + tower = Self::load_tower( + tower_storage.as_ref(), + &my_pubkey, + &vote_account, + &bank_forks, + ); // Ensure the validator can land votes with the new identity before // becoming leader has_new_vote_been_rooted = !wait_for_vote_to_start_leader; @@ -1154,6 +1153,32 @@ impl ReplayStage { }) } + fn load_tower( + tower_storage: &dyn TowerStorage, + node_pubkey: &Pubkey, + vote_account: &Pubkey, + bank_forks: &Arc>, + ) -> Tower { + Tower::restore(tower_storage, node_pubkey) + .and_then(|restored_tower| { + let root_bank = bank_forks.read().unwrap().root_bank(); + let slot_history = root_bank.get_slot_history(); + restored_tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history) + }) + .unwrap_or_else(|err| { + if err.is_file_missing() { + Tower::new_from_bankforks( + &bank_forks.read().unwrap(), + node_pubkey, + vote_account, + ) + } else { + error!("Failed to load tower for {}: {}", node_pubkey, err); + std::process::exit(1); + } + }) + } + fn check_for_vote_only_mode( heaviest_bank_slot: Slot, forks_root: Slot, @@ -4230,9 +4255,9 @@ pub(crate) mod tests { crate::{ consensus::{ progress_map::{ValidatorStakeInfo, RETRANSMIT_BASE_DELAY_MS}, - tower_storage::NullTowerStorage, + tower_storage::{FileTowerStorage, NullTowerStorage}, tree_diff::TreeDiff, - Tower, + Tower, VOTE_THRESHOLD_DEPTH, }, replay_stage::ReplayStage, vote_simulator::{self, VoteSimulator}, @@ -4254,7 +4279,7 @@ pub(crate) mod tests { }, solana_runtime::{ accounts_background_service::AbsRequestSender, - commitment::BlockCommitment, + commitment::{BlockCommitment, VOTE_THRESHOLD_SIZE}, genesis_utils::{GenesisConfigInfo, ValidatorVoteKeypairs}, }, solana_sdk::{ @@ -4278,6 +4303,7 @@ pub(crate) mod tests { iter, sync::{atomic::AtomicU64, Arc, RwLock}, }, + tempfile::tempdir, trees::{tr, Tree}, }; @@ -8598,4 +8624,54 @@ pub(crate) mod tests { assert_eq!(reset_fork, Some(4)); assert_eq!(failures, vec![HeaviestForkFailures::LockedOut(4),]); } + + #[test] + fn test_tower_load_missing() { + let tower_file = tempdir().unwrap().into_path(); + let tower_storage = FileTowerStorage::new(tower_file); + let node_pubkey = Pubkey::new_unique(); + let vote_account = Pubkey::new_unique(); + let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6))))); + let generate_votes = |pubkeys: Vec| { + pubkeys + .into_iter() + .zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2))) + .collect() + }; + let (vote_simulator, _blockstore) = + setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes))); + let bank_forks = vote_simulator.bank_forks; + + let tower = + ReplayStage::load_tower(&tower_storage, &node_pubkey, &vote_account, &bank_forks); + let expected_tower = Tower::new_for_tests(VOTE_THRESHOLD_DEPTH, VOTE_THRESHOLD_SIZE); + assert_eq!(tower.vote_state, expected_tower.vote_state); + assert_eq!(tower.node_pubkey, node_pubkey); + } + + #[test] + fn test_tower_load() { + let tower_file = tempdir().unwrap().into_path(); + let tower_storage = FileTowerStorage::new(tower_file); + let node_keypair = Keypair::new(); + let node_pubkey = node_keypair.pubkey(); + let vote_account = Pubkey::new_unique(); + let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6))))); + let generate_votes = |pubkeys: Vec| { + pubkeys + .into_iter() + .zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2))) + .collect() + }; + let (vote_simulator, _blockstore) = + setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes))); + let bank_forks = vote_simulator.bank_forks; + let expected_tower = Tower::new_random(node_pubkey); + expected_tower.save(&tower_storage, &node_keypair).unwrap(); + + let tower = + ReplayStage::load_tower(&tower_storage, &node_pubkey, &vote_account, &bank_forks); + assert_eq!(tower.vote_state, expected_tower.vote_state); + assert_eq!(tower.node_pubkey, expected_tower.node_pubkey); + } } diff --git a/sdk/program/src/vote/state/mod.rs b/sdk/program/src/vote/state/mod.rs index 8cfcd0ef1..a6e765472 100644 --- a/sdk/program/src/vote/state/mod.rs +++ b/sdk/program/src/vote/state/mod.rs @@ -352,6 +352,24 @@ impl VoteState { } } + pub fn new_rand_for_tests(node_pubkey: Pubkey, root_slot: Slot) -> Self { + let votes = (1..32) + .map(|x| LandedVote { + latency: 0, + lockout: Lockout::new_with_confirmation_count( + u64::from(x).saturating_add(root_slot), + 32_u32.saturating_sub(x), + ), + }) + .collect(); + Self { + node_pubkey, + root_slot: Some(root_slot), + votes, + ..VoteState::default() + } + } + pub fn get_authorized_voter(&self, epoch: Epoch) -> Option { self.authorized_voters.get_authorized_voter(epoch) }