From ab92578b0294fab9b37ed69afc6a80b5dc78a739 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Tue, 15 Feb 2022 12:19:34 -0800 Subject: [PATCH] Fix the flaky test test_restart_tower_rollback (#23129) * Add flag to disable voting until a slot to avoid duplicate voting * Fix the tower rollback test and remove it from flaky. --- core/src/replay_stage.rs | 24 +++++ core/src/tvu.rs | 3 + core/src/validator.rs | 3 + local-cluster/src/validator_configs.rs | 1 + local-cluster/tests/common.rs | 14 +-- local-cluster/tests/local_cluster.rs | 114 +++++++++++++++++++- local-cluster/tests/local_cluster_flakey.rs | 94 +--------------- 7 files changed, 147 insertions(+), 106 deletions(-) diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 83df19216..c9ce1fabb 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -139,6 +139,9 @@ pub struct ReplayStageConfig { pub wait_for_vote_to_start_leader: bool, pub ancestor_hashes_replay_update_sender: AncestorHashesReplayUpdateSender, pub tower_storage: Arc, + // Stops voting until this slot has been reached. Should be used to avoid + // duplicate voting which can lead to slashing. + pub wait_to_vote_slot: Option, } #[derive(Default)] @@ -382,6 +385,7 @@ impl ReplayStage { wait_for_vote_to_start_leader, ancestor_hashes_replay_update_sender, tower_storage, + wait_to_vote_slot, } = config; trace!("replay stage"); @@ -604,6 +608,7 @@ impl ReplayStage { has_new_vote_been_rooted, &mut last_vote_refresh_time, &voting_sender, + wait_to_vote_slot, ); } } @@ -687,6 +692,7 @@ impl ReplayStage { &voting_sender, &mut epoch_slots_frozen_slots, &drop_bank_sender, + wait_to_vote_slot, ); }; voting_time.stop(); @@ -1731,6 +1737,7 @@ impl ReplayStage { voting_sender: &Sender, epoch_slots_frozen_slots: &mut EpochSlotsFrozenSlots, bank_drop_sender: &Sender>>, + wait_to_vote_slot: Option, ) { if bank.is_empty() { inc_new_counter_info!("replay_stage-voted_empty_bank", 1); @@ -1819,6 +1826,7 @@ impl ReplayStage { *has_new_vote_been_rooted, replay_timing, voting_sender, + wait_to_vote_slot, ); } @@ -1831,10 +1839,16 @@ impl ReplayStage { switch_fork_decision: &SwitchForkDecision, vote_signatures: &mut Vec, has_new_vote_been_rooted: bool, + wait_to_vote_slot: Option, ) -> Option { if authorized_voter_keypairs.is_empty() { return None; } + if let Some(slot) = wait_to_vote_slot { + if bank.slot() < slot { + return None; + } + } let vote_account = match bank.get_vote_account(vote_account_pubkey) { None => { warn!( @@ -1929,6 +1943,7 @@ impl ReplayStage { has_new_vote_been_rooted: bool, last_vote_refresh_time: &mut LastVoteRefreshTime, voting_sender: &Sender, + wait_to_vote_slot: Option, ) { let last_voted_slot = tower.last_voted_slot(); if last_voted_slot.is_none() { @@ -1971,6 +1986,7 @@ impl ReplayStage { &SwitchForkDecision::SameFork, vote_signatures, has_new_vote_been_rooted, + wait_to_vote_slot, ); if let Some(vote_tx) = vote_tx { @@ -2008,6 +2024,7 @@ impl ReplayStage { has_new_vote_been_rooted: bool, replay_timing: &mut ReplayTiming, voting_sender: &Sender, + wait_to_vote_slot: Option, ) { let mut generate_time = Measure::start("generate_vote"); let vote_tx = Self::generate_vote_tx( @@ -2019,6 +2036,7 @@ impl ReplayStage { switch_fork_decision, vote_signatures, has_new_vote_been_rooted, + wait_to_vote_slot, ); generate_time.stop(); replay_timing.generate_vote_us += generate_time.as_us(); @@ -5868,6 +5886,7 @@ pub mod tests { has_new_vote_been_rooted, &mut ReplayTiming::default(), &voting_sender, + None, ); let vote_info = voting_receiver 
.recv_timeout(Duration::from_secs(1)) @@ -5907,6 +5926,7 @@ pub mod tests { has_new_vote_been_rooted, &mut last_vote_refresh_time, &voting_sender, + None, ); // No new votes have been submitted to gossip @@ -5932,6 +5952,7 @@ pub mod tests { has_new_vote_been_rooted, &mut ReplayTiming::default(), &voting_sender, + None, ); let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) @@ -5963,6 +5984,7 @@ pub mod tests { has_new_vote_been_rooted, &mut last_vote_refresh_time, &voting_sender, + None, ); // No new votes have been submitted to gossip @@ -6000,6 +6022,7 @@ pub mod tests { has_new_vote_been_rooted, &mut last_vote_refresh_time, &voting_sender, + None, ); let vote_info = voting_receiver .recv_timeout(Duration::from_secs(1)) @@ -6067,6 +6090,7 @@ pub mod tests { has_new_vote_been_rooted, &mut last_vote_refresh_time, &voting_sender, + None, ); let votes = cluster_info.get_votes(&mut cursor); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index ec30d9921..6c8a9961b 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -145,6 +145,7 @@ impl Tvu { accounts_package_channel: (AccountsPackageSender, AccountsPackageReceiver), last_full_snapshot_slot: Option, block_metadata_notifier: Option, + wait_to_vote_slot: Option, ) -> Self { let TvuSockets { repair: repair_socket, @@ -293,6 +294,7 @@ impl Tvu { wait_for_vote_to_start_leader: tvu_config.wait_for_vote_to_start_leader, ancestor_hashes_replay_update_sender, tower_storage: tower_storage.clone(), + wait_to_vote_slot, }; let (voting_sender, voting_receiver) = unbounded(); @@ -514,6 +516,7 @@ pub mod tests { accounts_package_channel, None, None, + None, ); exit.store(true, Ordering::Relaxed); tvu.join().unwrap(); diff --git a/core/src/validator.rs b/core/src/validator.rs index ed0042ef9..85f3a7c4e 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -164,6 +164,7 @@ pub struct ValidatorConfig { pub validator_exit: Arc>, pub no_wait_for_vote_to_start_leader: bool, pub accounts_shrink_ratio: AccountShrinkThreshold, + pub wait_to_vote_slot: Option, } impl Default for ValidatorConfig { @@ -223,6 +224,7 @@ impl Default for ValidatorConfig { no_wait_for_vote_to_start_leader: true, accounts_shrink_ratio: AccountShrinkThreshold::default(), accounts_db_config: None, + wait_to_vote_slot: None, } } } @@ -873,6 +875,7 @@ impl Validator { accounts_package_channel, last_full_snapshot_slot, block_metadata_notifier, + config.wait_to_vote_slot, ); let tpu = Tpu::new( diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs index dff84c2ec..4cca70621 100644 --- a/local-cluster/src/validator_configs.rs +++ b/local-cluster/src/validator_configs.rs @@ -61,6 +61,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig { no_wait_for_vote_to_start_leader: config.no_wait_for_vote_to_start_leader, accounts_shrink_ratio: config.accounts_shrink_ratio, accounts_db_config: config.accounts_db_config.clone(), + wait_to_vote_slot: config.wait_to_vote_slot, } } diff --git a/local-cluster/tests/common.rs b/local-cluster/tests/common.rs index b051d8bf3..48389861a 100644 --- a/local-cluster/tests/common.rs +++ b/local-cluster/tests/common.rs @@ -5,7 +5,7 @@ use { solana_core::{ broadcast_stage::BroadcastStageType, consensus::{Tower, SWITCH_FORK_THRESHOLD}, - tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage}, + tower_storage::FileTowerStorage, validator::ValidatorConfig, }, solana_gossip::gossip_service::discover_cluster, @@ -407,15 +407,3 @@ pub fn test_faulty_node( 
(cluster, validator_keys) } - -pub fn save_tower(tower_path: &Path, tower: &Tower, node_keypair: &Keypair) { - let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); - let saved_tower = SavedTower::new(tower, node_keypair).unwrap(); - file_tower_storage - .store(&SavedTowerVersions::from(saved_tower)) - .unwrap(); -} - -pub fn root_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option { - restore_tower(tower_path, node_pubkey).map(|tower| tower.root()) -} diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 4ade36b42..7a8c4b12c 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -3,8 +3,8 @@ use { assert_matches::assert_matches, common::{ copy_blocks, create_custom_leader_schedule, last_vote_in_tower, ms_for_n_slots, - open_blockstore, purge_slots, remove_tower, restore_tower, root_in_tower, - run_cluster_partition, run_kill_partition_switch_threshold, save_tower, test_faulty_node, + open_blockstore, purge_slots, remove_tower, restore_tower, run_cluster_partition, + run_kill_partition_switch_threshold, test_faulty_node, wait_for_last_vote_in_tower_to_land_in_ledger, RUST_LOG_FILTER, }, crossbeam_channel::{unbounded, Receiver}, @@ -23,7 +23,7 @@ use { consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, replay_stage::DUPLICATE_THRESHOLD, - tower_storage::FileTowerStorage, + tower_storage::{FileTowerStorage, SavedTower, SavedTowerVersions, TowerStorage}, validator::ValidatorConfig, }, solana_download_utils::download_snapshot_archive, @@ -1913,6 +1913,18 @@ fn test_validator_saves_tower() { assert!(tower4.root() >= new_root); } +fn save_tower(tower_path: &Path, tower: &Tower, node_keypair: &Keypair) { + let file_tower_storage = FileTowerStorage::new(tower_path.to_path_buf()); + let saved_tower = SavedTower::new(tower, node_keypair).unwrap(); + file_tower_storage + .store(&SavedTowerVersions::from(saved_tower)) + .unwrap(); +} + +fn root_in_tower(tower_path: &Path, node_pubkey: &Pubkey) -> Option { + restore_tower(tower_path, node_pubkey).map(|tower| tower.root()) +} + // This test verifies that even if votes from a validator end up taking too long to land, and thus // some of the referenced slots are slots are no longer present in the slot hashes sysvar, // consensus can still be attained. @@ -2372,6 +2384,102 @@ fn test_run_test_load_program_accounts_root() { run_test_load_program_accounts(CommitmentConfig::finalized()); } +#[test] +#[serial] +fn test_restart_tower_rollback() { + // Test node crashing and failing to save its tower before restart + // Cluster continues to make progress, this node is able to rejoin with + // outdated tower post restart. 
+ solana_logger::setup_with_default(RUST_LOG_FILTER); + + // First set up the cluster with 2 nodes + let slots_per_epoch = 2048; + let node_stakes = vec![10000, 1]; + + let validator_strings = vec![ + "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4", + "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8", + ]; + + let validator_keys = validator_strings + .iter() + .map(|s| (Arc::new(Keypair::from_base58_string(s)), true)) + .take(node_stakes.len()) + .collect::>(); + + let b_pubkey = validator_keys[1].0.pubkey(); + + let mut config = ClusterConfig { + cluster_lamports: 100_000, + node_stakes: node_stakes.clone(), + validator_configs: make_identical_validator_configs( + &ValidatorConfig::default_for_test(), + node_stakes.len(), + ), + validator_keys: Some(validator_keys), + slots_per_epoch, + stakers_slot_offset: slots_per_epoch, + skip_warmup_slots: true, + ..ClusterConfig::default() + }; + let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); + + let val_b_ledger_path = cluster.ledger_path(&b_pubkey); + + let mut earlier_tower: Tower; + loop { + sleep(Duration::from_millis(1000)); + + // Grab the current saved tower + earlier_tower = restore_tower(&val_b_ledger_path, &b_pubkey).unwrap(); + if earlier_tower.last_voted_slot().unwrap_or(0) > 1 { + break; + } + } + + let mut exited_validator_info: ClusterValidatorInfo; + let last_voted_slot: Slot; + loop { + sleep(Duration::from_millis(1000)); + + // Wait for second, lesser staked validator to make a root past the earlier_tower's + // latest vote slot, then exit that validator + let tower = restore_tower(&val_b_ledger_path, &b_pubkey).unwrap(); + if tower.root() + > earlier_tower + .last_voted_slot() + .expect("Earlier tower must have at least one vote") + { + exited_validator_info = cluster.exit_node(&b_pubkey); + last_voted_slot = tower.last_voted_slot().unwrap(); + break; + } + } + + // Now rewrite the tower with the *earlier_tower*. We disable voting until we reach + // a slot we did not previously vote for in order to avoid duplicate vote slashing + // issues. 
+ save_tower( + &val_b_ledger_path, + &earlier_tower, + &exited_validator_info.info.keypair, + ); + exited_validator_info.config.wait_to_vote_slot = Some(last_voted_slot + 10); + + cluster.restart_node( + &b_pubkey, + exited_validator_info, + SocketAddrSpace::Unspecified, + ); + + // Check this node is making new roots + cluster.check_for_new_roots( + 20, + "test_restart_tower_rollback", + SocketAddrSpace::Unspecified, + ); +} + #[test] #[serial] fn test_run_test_load_program_accounts_partition_root() { diff --git a/local-cluster/tests/local_cluster_flakey.rs b/local-cluster/tests/local_cluster_flakey.rs index 9cc053175..ae21ff989 100644 --- a/local-cluster/tests/local_cluster_flakey.rs +++ b/local-cluster/tests/local_cluster_flakey.rs @@ -3,19 +3,19 @@ #![allow(clippy::integer_arithmetic)] use { common::{ - copy_blocks, last_vote_in_tower, open_blockstore, purge_slots, remove_tower, restore_tower, - root_in_tower, save_tower, wait_for_last_vote_in_tower_to_land_in_ledger, RUST_LOG_FILTER, + copy_blocks, last_vote_in_tower, open_blockstore, purge_slots, remove_tower, + wait_for_last_vote_in_tower_to_land_in_ledger, RUST_LOG_FILTER, }, log::*, serial_test::serial, - solana_core::{consensus::Tower, validator::ValidatorConfig}, + solana_core::validator::ValidatorConfig, solana_ledger::{ ancestor_iterator::AncestorIterator, blockstore::Blockstore, blockstore_db::{AccessType, BlockstoreOptions}, }, solana_local_cluster::{ - cluster::{Cluster, ClusterValidatorInfo}, + cluster::Cluster, local_cluster::{ClusterConfig, LocalCluster}, validator_configs::*, }, @@ -359,89 +359,3 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b info!("THIS TEST expected no violation. And indeed, there was none, thanks to persisted tower."); } } - -#[test] -#[serial] -#[ignore] -fn test_restart_tower_rollback() { - // Test node crashing and failing to save its tower before restart - solana_logger::setup_with_default(RUST_LOG_FILTER); - - // First set up the cluster with 4 nodes - let slots_per_epoch = 2048; - let node_stakes = vec![10000, 1]; - - let validator_strings = vec![ - "28bN3xyvrP4E8LwEgtLjhnkb7cY4amQb6DrYAbAYjgRV4GAGgkVM2K7wnxnAS7WDneuavza7x21MiafLu1HkwQt4", - "2saHBBoTkLMmttmPQP8KfBkcCw45S5cwtV3wTdGCscRC8uxdgvHxpHiWXKx4LvJjNJtnNcbSv5NdheokFFqnNDt8", - ]; - - let validator_b_keypair = Arc::new(Keypair::from_base58_string(validator_strings[1])); - let validator_b_pubkey = validator_b_keypair.pubkey(); - - let validator_keys = validator_strings - .iter() - .map(|s| (Arc::new(Keypair::from_base58_string(s)), true)) - .take(node_stakes.len()) - .collect::>(); - let mut config = ClusterConfig { - cluster_lamports: 100_000, - node_stakes: node_stakes.clone(), - validator_configs: make_identical_validator_configs( - &ValidatorConfig::default_for_test(), - node_stakes.len(), - ), - validator_keys: Some(validator_keys), - slots_per_epoch, - stakers_slot_offset: slots_per_epoch, - skip_warmup_slots: true, - ..ClusterConfig::default() - }; - let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); - - let val_b_ledger_path = cluster.ledger_path(&validator_b_pubkey); - - let mut earlier_tower: Tower; - loop { - sleep(Duration::from_millis(1000)); - - // Grab the current saved tower - earlier_tower = restore_tower(&val_b_ledger_path, &validator_b_pubkey).unwrap(); - if earlier_tower.last_voted_slot().unwrap_or(0) > 1 { - break; - } - } - - let exited_validator_info: ClusterValidatorInfo; - loop { - sleep(Duration::from_millis(1000)); - - // Wait for second, lesser 
staked validator to make a root past the earlier_tower's - // latest vote slot, then exit that validator - if let Some(root) = root_in_tower(&val_b_ledger_path, &validator_b_pubkey) { - if root - > earlier_tower - .last_voted_slot() - .expect("Earlier tower must have at least one vote") - { - exited_validator_info = cluster.exit_node(&validator_b_pubkey); - break; - } - } - } - - // Now rewrite the tower with the *earlier_tower* - save_tower(&val_b_ledger_path, &earlier_tower, &validator_b_keypair); - cluster.restart_node( - &validator_b_pubkey, - exited_validator_info, - SocketAddrSpace::Unspecified, - ); - - // Check this node is making new roots - cluster.check_for_new_roots( - 20, - "test_restart_tower_rollback", - SocketAddrSpace::Unspecified, - ); -}
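Editor's note on the core mechanism: the functional change in this patch is a single guard in vote-transaction generation — while the bank being voted on is still below the configured `wait_to_vote_slot`, no vote is produced. The sketch below distills that guard into a self-contained example. The `VoteGate` struct and `should_vote` helper are hypothetical stand-ins for illustration only; they are not part of solana-core's API, which threads a plain `Option<Slot>` from `ValidatorConfig` through `Tvu` and `ReplayStageConfig` into `generate_vote_tx` as shown in the diff above.

```rust
/// Illustrative-only sketch of the voting gate added in this patch.
/// `VoteGate` and `should_vote` are hypothetical names, not solana-core types.
struct VoteGate {
    /// Voting is suppressed for any slot strictly below this value.
    wait_to_vote_slot: Option<u64>,
}

impl VoteGate {
    /// Returns true when it is safe to produce a vote for `slot`.
    /// Mirrors the patch's `if bank.slot() < slot { return None; }` check.
    fn should_vote(&self, slot: u64) -> bool {
        match self.wait_to_vote_slot {
            Some(gate) => slot >= gate,
            None => true,
        }
    }
}

fn main() {
    // A validator restarted with an intentionally stale tower: the test gates
    // voting until well past the last slot voted on before the rollback.
    let last_voted_slot = 42;
    let gate = VoteGate {
        wait_to_vote_slot: Some(last_voted_slot + 10),
    };

    assert!(!gate.should_vote(45)); // still inside the previously voted range: stay silent
    assert!(gate.should_vote(52)); // past the gate: voting resumes normally
    println!("voting resumes at slot {}", last_voted_slot + 10);
}
```

When the gate is `None` (the default added to `ValidatorConfig`), behavior is unchanged, which is why only the rewritten test needs to set it.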
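Why the test picks `last_voted_slot + 10`: after the tower is rewritten with `earlier_tower`, the restarted validator's consensus state no longer remembers its most recent votes, so without the gate it could cast a second, different vote for slots it already voted on — the duplicate-vote condition the new comment in `replay_stage.rs` flags as slashable. The toy simulation below illustrates that rationale with made-up slot numbers; it does not use the real `Tower` API.

```rust
use std::collections::BTreeSet;

fn main() {
    // Hypothetical numbers: the validator voted on slots 1..=50 before exiting,
    // but the stale tower written back to disk only records votes up to slot 30.
    let voted_before_exit: BTreeSet<u64> = (1..=50).collect();
    let stale_tower_last_vote: u64 = 30;
    let last_voted_slot = *voted_before_exit.iter().last().unwrap();

    // Gate used for the restarted validator: strictly past everything it voted
    // on before the rollback (the patch uses last_voted_slot + 10).
    let wait_to_vote_slot = last_voted_slot + 10;

    // Without the gate, the rolled-back tower would permit fresh votes on
    // slots 31..=50, which already carry earlier votes from this validator.
    let re_votable: Vec<u64> = (stale_tower_last_vote + 1..=last_voted_slot).collect();
    assert!(re_votable.iter().all(|slot| *slot < wait_to_vote_slot));

    println!(
        "slots {}..={} are never re-voted; voting resumes at slot {}",
        stale_tower_last_vote + 1,
        last_voted_slot,
        wait_to_vote_slot
    );
}
```

This is also why the test records `last_voted_slot` at the moment it exits validator B rather than reading it from the rewritten tower: the gate has to clear the validator's true voting history, not the rolled-back one.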