replay: reload tower if set-identity during startup (#35173)
* replay: reload tower if set-identity during startup * pr feedback: add unit tests * pr feedback: use tower.node_pubkey, more descriptive names
This commit is contained in:
parent
d48f277091
commit
befe8b9d98
|
@ -292,6 +292,28 @@ impl Tower {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
pub fn new_random(node_pubkey: Pubkey) -> Self {
|
||||||
|
use rand::Rng;
|
||||||
|
|
||||||
|
let mut rng = rand::thread_rng();
|
||||||
|
let root_slot = rng.gen();
|
||||||
|
let vote_state = VoteState::new_rand_for_tests(node_pubkey, root_slot);
|
||||||
|
let last_vote = VoteStateUpdate::from(
|
||||||
|
vote_state
|
||||||
|
.votes
|
||||||
|
.iter()
|
||||||
|
.map(|lv| (lv.slot(), lv.confirmation_count()))
|
||||||
|
.collect::<Vec<_>>(),
|
||||||
|
);
|
||||||
|
Self {
|
||||||
|
node_pubkey,
|
||||||
|
vote_state,
|
||||||
|
last_vote: VoteTransaction::CompactVoteStateUpdate(last_vote),
|
||||||
|
..Tower::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn new_from_bankforks(
|
pub fn new_from_bankforks(
|
||||||
bank_forks: &BankForks,
|
bank_forks: &BankForks,
|
||||||
node_pubkey: &Pubkey,
|
node_pubkey: &Pubkey,
|
||||||
|
|
|
@ -578,6 +578,21 @@ impl ReplayStage {
|
||||||
let _exit = Finalizer::new(exit.clone());
|
let _exit = Finalizer::new(exit.clone());
|
||||||
let mut identity_keypair = cluster_info.keypair().clone();
|
let mut identity_keypair = cluster_info.keypair().clone();
|
||||||
let mut my_pubkey = identity_keypair.pubkey();
|
let mut my_pubkey = identity_keypair.pubkey();
|
||||||
|
if my_pubkey != tower.node_pubkey {
|
||||||
|
// set-identity was called during the startup procedure, ensure the tower is consistent
|
||||||
|
// before starting the loop. further calls to set-identity will reload the tower in the loop
|
||||||
|
let my_old_pubkey = tower.node_pubkey;
|
||||||
|
tower = Self::load_tower(
|
||||||
|
tower_storage.as_ref(),
|
||||||
|
&my_pubkey,
|
||||||
|
&vote_account,
|
||||||
|
&bank_forks,
|
||||||
|
);
|
||||||
|
warn!(
|
||||||
|
"Identity changed during startup from {} to {}",
|
||||||
|
my_old_pubkey, my_pubkey
|
||||||
|
);
|
||||||
|
}
|
||||||
let (mut progress, mut heaviest_subtree_fork_choice) =
|
let (mut progress, mut heaviest_subtree_fork_choice) =
|
||||||
Self::initialize_progress_and_fork_choice_with_locked_bank_forks(
|
Self::initialize_progress_and_fork_choice_with_locked_bank_forks(
|
||||||
&bank_forks,
|
&bank_forks,
|
||||||
|
@ -983,28 +998,12 @@ impl ReplayStage {
|
||||||
my_pubkey = identity_keypair.pubkey();
|
my_pubkey = identity_keypair.pubkey();
|
||||||
|
|
||||||
// Load the new identity's tower
|
// Load the new identity's tower
|
||||||
tower = Tower::restore(tower_storage.as_ref(), &my_pubkey)
|
tower = Self::load_tower(
|
||||||
.and_then(|restored_tower| {
|
tower_storage.as_ref(),
|
||||||
let root_bank = bank_forks.read().unwrap().root_bank();
|
&my_pubkey,
|
||||||
let slot_history = root_bank.get_slot_history();
|
&vote_account,
|
||||||
restored_tower.adjust_lockouts_after_replay(
|
&bank_forks,
|
||||||
root_bank.slot(),
|
);
|
||||||
&slot_history,
|
|
||||||
)
|
|
||||||
})
|
|
||||||
.unwrap_or_else(|err| {
|
|
||||||
if err.is_file_missing() {
|
|
||||||
Tower::new_from_bankforks(
|
|
||||||
&bank_forks.read().unwrap(),
|
|
||||||
&my_pubkey,
|
|
||||||
&vote_account,
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
error!("Failed to load tower for {}: {}", my_pubkey, err);
|
|
||||||
std::process::exit(1);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Ensure the validator can land votes with the new identity before
|
// Ensure the validator can land votes with the new identity before
|
||||||
// becoming leader
|
// becoming leader
|
||||||
has_new_vote_been_rooted = !wait_for_vote_to_start_leader;
|
has_new_vote_been_rooted = !wait_for_vote_to_start_leader;
|
||||||
|
@ -1154,6 +1153,32 @@ impl ReplayStage {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn load_tower(
|
||||||
|
tower_storage: &dyn TowerStorage,
|
||||||
|
node_pubkey: &Pubkey,
|
||||||
|
vote_account: &Pubkey,
|
||||||
|
bank_forks: &Arc<RwLock<BankForks>>,
|
||||||
|
) -> Tower {
|
||||||
|
Tower::restore(tower_storage, node_pubkey)
|
||||||
|
.and_then(|restored_tower| {
|
||||||
|
let root_bank = bank_forks.read().unwrap().root_bank();
|
||||||
|
let slot_history = root_bank.get_slot_history();
|
||||||
|
restored_tower.adjust_lockouts_after_replay(root_bank.slot(), &slot_history)
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|err| {
|
||||||
|
if err.is_file_missing() {
|
||||||
|
Tower::new_from_bankforks(
|
||||||
|
&bank_forks.read().unwrap(),
|
||||||
|
node_pubkey,
|
||||||
|
vote_account,
|
||||||
|
)
|
||||||
|
} else {
|
||||||
|
error!("Failed to load tower for {}: {}", node_pubkey, err);
|
||||||
|
std::process::exit(1);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
fn check_for_vote_only_mode(
|
fn check_for_vote_only_mode(
|
||||||
heaviest_bank_slot: Slot,
|
heaviest_bank_slot: Slot,
|
||||||
forks_root: Slot,
|
forks_root: Slot,
|
||||||
|
@ -4230,9 +4255,9 @@ pub(crate) mod tests {
|
||||||
crate::{
|
crate::{
|
||||||
consensus::{
|
consensus::{
|
||||||
progress_map::{ValidatorStakeInfo, RETRANSMIT_BASE_DELAY_MS},
|
progress_map::{ValidatorStakeInfo, RETRANSMIT_BASE_DELAY_MS},
|
||||||
tower_storage::NullTowerStorage,
|
tower_storage::{FileTowerStorage, NullTowerStorage},
|
||||||
tree_diff::TreeDiff,
|
tree_diff::TreeDiff,
|
||||||
Tower,
|
Tower, VOTE_THRESHOLD_DEPTH,
|
||||||
},
|
},
|
||||||
replay_stage::ReplayStage,
|
replay_stage::ReplayStage,
|
||||||
vote_simulator::{self, VoteSimulator},
|
vote_simulator::{self, VoteSimulator},
|
||||||
|
@ -4254,7 +4279,7 @@ pub(crate) mod tests {
|
||||||
},
|
},
|
||||||
solana_runtime::{
|
solana_runtime::{
|
||||||
accounts_background_service::AbsRequestSender,
|
accounts_background_service::AbsRequestSender,
|
||||||
commitment::BlockCommitment,
|
commitment::{BlockCommitment, VOTE_THRESHOLD_SIZE},
|
||||||
genesis_utils::{GenesisConfigInfo, ValidatorVoteKeypairs},
|
genesis_utils::{GenesisConfigInfo, ValidatorVoteKeypairs},
|
||||||
},
|
},
|
||||||
solana_sdk::{
|
solana_sdk::{
|
||||||
|
@ -4278,6 +4303,7 @@ pub(crate) mod tests {
|
||||||
iter,
|
iter,
|
||||||
sync::{atomic::AtomicU64, Arc, RwLock},
|
sync::{atomic::AtomicU64, Arc, RwLock},
|
||||||
},
|
},
|
||||||
|
tempfile::tempdir,
|
||||||
trees::{tr, Tree},
|
trees::{tr, Tree},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -8598,4 +8624,54 @@ pub(crate) mod tests {
|
||||||
assert_eq!(reset_fork, Some(4));
|
assert_eq!(reset_fork, Some(4));
|
||||||
assert_eq!(failures, vec![HeaviestForkFailures::LockedOut(4),]);
|
assert_eq!(failures, vec![HeaviestForkFailures::LockedOut(4),]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_tower_load_missing() {
|
||||||
|
let tower_file = tempdir().unwrap().into_path();
|
||||||
|
let tower_storage = FileTowerStorage::new(tower_file);
|
||||||
|
let node_pubkey = Pubkey::new_unique();
|
||||||
|
let vote_account = Pubkey::new_unique();
|
||||||
|
let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6)))));
|
||||||
|
let generate_votes = |pubkeys: Vec<Pubkey>| {
|
||||||
|
pubkeys
|
||||||
|
.into_iter()
|
||||||
|
.zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2)))
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
let (vote_simulator, _blockstore) =
|
||||||
|
setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes)));
|
||||||
|
let bank_forks = vote_simulator.bank_forks;
|
||||||
|
|
||||||
|
let tower =
|
||||||
|
ReplayStage::load_tower(&tower_storage, &node_pubkey, &vote_account, &bank_forks);
|
||||||
|
let expected_tower = Tower::new_for_tests(VOTE_THRESHOLD_DEPTH, VOTE_THRESHOLD_SIZE);
|
||||||
|
assert_eq!(tower.vote_state, expected_tower.vote_state);
|
||||||
|
assert_eq!(tower.node_pubkey, node_pubkey);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_tower_load() {
|
||||||
|
let tower_file = tempdir().unwrap().into_path();
|
||||||
|
let tower_storage = FileTowerStorage::new(tower_file);
|
||||||
|
let node_keypair = Keypair::new();
|
||||||
|
let node_pubkey = node_keypair.pubkey();
|
||||||
|
let vote_account = Pubkey::new_unique();
|
||||||
|
let tree = tr(0) / (tr(1) / (tr(3) / (tr(4))) / (tr(2) / (tr(5) / (tr(6)))));
|
||||||
|
let generate_votes = |pubkeys: Vec<Pubkey>| {
|
||||||
|
pubkeys
|
||||||
|
.into_iter()
|
||||||
|
.zip(iter::once(vec![0, 1, 2, 5, 6]).chain(iter::repeat(vec![0, 1, 3, 4]).take(2)))
|
||||||
|
.collect()
|
||||||
|
};
|
||||||
|
let (vote_simulator, _blockstore) =
|
||||||
|
setup_forks_from_tree(tree, 3, Some(Box::new(generate_votes)));
|
||||||
|
let bank_forks = vote_simulator.bank_forks;
|
||||||
|
let expected_tower = Tower::new_random(node_pubkey);
|
||||||
|
expected_tower.save(&tower_storage, &node_keypair).unwrap();
|
||||||
|
|
||||||
|
let tower =
|
||||||
|
ReplayStage::load_tower(&tower_storage, &node_pubkey, &vote_account, &bank_forks);
|
||||||
|
assert_eq!(tower.vote_state, expected_tower.vote_state);
|
||||||
|
assert_eq!(tower.node_pubkey, expected_tower.node_pubkey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -352,6 +352,24 @@ impl VoteState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn new_rand_for_tests(node_pubkey: Pubkey, root_slot: Slot) -> Self {
|
||||||
|
let votes = (1..32)
|
||||||
|
.map(|x| LandedVote {
|
||||||
|
latency: 0,
|
||||||
|
lockout: Lockout::new_with_confirmation_count(
|
||||||
|
u64::from(x).saturating_add(root_slot),
|
||||||
|
32_u32.saturating_sub(x),
|
||||||
|
),
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
Self {
|
||||||
|
node_pubkey,
|
||||||
|
root_slot: Some(root_slot),
|
||||||
|
votes,
|
||||||
|
..VoteState::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn get_authorized_voter(&self, epoch: Epoch) -> Option<Pubkey> {
|
pub fn get_authorized_voter(&self, epoch: Epoch) -> Option<Pubkey> {
|
||||||
self.authorized_voters.get_authorized_voter(epoch)
|
self.authorized_voters.get_authorized_voter(epoch)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue