From c544742091bf6dba3fdb290cba57ac47984c87e1 Mon Sep 17 00:00:00 2001
From: Justin Starry
Date: Fri, 22 Apr 2022 12:14:07 +0800
Subject: [PATCH] Local cluster test cleanup and refactoring (#24559)

* remove FixedSchedule.start_epoch

* use duration for timing

* Rename partition bool to turbine_disabled

* simplify partition config
---
 core/src/retransmit_stage.rs                |  8 +--
 core/src/tvu.rs                             |  4 +-
 core/src/validator.rs                       |  6 +-
 ledger/src/blockstore.rs                    |  1 -
 ledger/src/leader_schedule.rs               |  1 -
 ledger/src/leader_schedule_cache.rs         |  8 +--
 local-cluster/src/validator_configs.rs      |  2 +-
 local-cluster/tests/common.rs               | 63 ++++++++-------------
 local-cluster/tests/local_cluster.rs        | 40 ++-----------
 local-cluster/tests/local_cluster_flakey.rs |  1 -
 local-cluster/tests/local_cluster_slow.rs   | 42 ++++----------
 11 files changed, 54 insertions(+), 122 deletions(-)

diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index d837a6455..e89fe32bb 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -440,7 +440,7 @@ impl RetransmitStage {
         exit: Arc<AtomicBool>,
         cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
         epoch_schedule: EpochSchedule,
-        cfg: Option<Arc<AtomicBool>>,
+        turbine_disabled: Option<Arc<AtomicBool>>,
         shred_version: u16,
         cluster_slots: Arc<ClusterSlots>,
         duplicate_slots_reset_sender: DuplicateSlotsResetSender,
@@ -492,10 +492,10 @@ impl RetransmitStage {
             repair_info,
             leader_schedule_cache,
             move |id, shred, working_bank, last_root| {
-                let is_connected = cfg
+                let turbine_disabled = turbine_disabled
                     .as_ref()
                     .map(|x| x.load(Ordering::Relaxed))
-                    .unwrap_or(true);
+                    .unwrap_or(false);
                 let rv = should_retransmit_and_persist(
                     shred,
                     working_bank,
@@ -504,7 +504,7 @@ impl RetransmitStage {
                     last_root,
                     shred_version,
                 );
-                rv && is_connected
+                rv && !turbine_disabled
             },
             verified_vote_receiver,
             completed_data_sets_sender,
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index 497a1abf0..12e27dabd 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -114,7 +114,7 @@ impl Tvu {
         leader_schedule_cache: &Arc<LeaderScheduleCache>,
         exit: &Arc<AtomicBool>,
         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
-        cfg: Option<Arc<AtomicBool>>,
+        turbine_disabled: Option<Arc<AtomicBool>>,
         transaction_status_sender: Option<TransactionStatusSender>,
         rewards_recorder_sender: Option<RewardsRecorderSender>,
         cache_block_meta_sender: Option<CacheBlockMetaSender>,
@@ -182,7 +182,7 @@ impl Tvu {
             exit.clone(),
             cluster_slots_update_receiver,
             *bank_forks.read().unwrap().working_bank().epoch_schedule(),
-            cfg,
+            turbine_disabled,
             tvu_config.shred_version,
             cluster_slots.clone(),
             duplicate_slots_reset_sender,
diff --git a/core/src/validator.rs b/core/src/validator.rs
index ff0bbc850..3321c609d 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -130,7 +130,7 @@ pub struct ValidatorConfig {
     pub snapshot_config: Option<SnapshotConfig>,
     pub max_ledger_shreds: Option<u64>,
     pub broadcast_stage_type: BroadcastStageType,
-    pub enable_partition: Option<Arc<AtomicBool>>,
+    pub turbine_disabled: Option<Arc<AtomicBool>>,
     pub enforce_ulimit_nofile: bool,
     pub fixed_leader_schedule: Option<FixedSchedule>,
     pub wait_for_supermajority: Option<Slot>,
@@ -190,7 +190,7 @@ impl Default for ValidatorConfig {
             pubsub_config: PubSubConfig::default(),
             snapshot_config: None,
             broadcast_stage_type: BroadcastStageType::Standard,
-            enable_partition: None,
+            turbine_disabled: None,
             enforce_ulimit_nofile: true,
             fixed_leader_schedule: None,
             wait_for_supermajority: None,
@@ -966,7 +966,7 @@ impl Validator {
             &leader_schedule_cache,
             &exit,
             block_commitment_cache,
-            config.enable_partition.clone(),
+            config.turbine_disabled.clone(),
             transaction_status_sender.clone(),
             rewards_recorder_sender,
             cache_block_meta_sender,
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index e9ed43565..ff4194e90 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -8529,7 +8529,6 @@ pub mod tests {
             leader_schedule: Arc::new(LeaderSchedule::new_from_schedule(vec![
                 leader_keypair.pubkey()
             ])),
-            start_epoch: 0,
         };
         leader_schedule_cache.set_fixed_leader_schedule(Some(fixed_schedule));
diff --git a/ledger/src/leader_schedule.rs b/ledger/src/leader_schedule.rs
index 16d6dcbe7..856bb0db0 100644
--- a/ledger/src/leader_schedule.rs
+++ b/ledger/src/leader_schedule.rs
@@ -10,7 +10,6 @@ use {
 #[derive(Clone, Debug)]
 pub struct FixedSchedule {
     pub leader_schedule: Arc<LeaderSchedule>,
-    pub start_epoch: u64,
 }
 
 /// Stake-weighted leader schedule for one epoch.
diff --git a/ledger/src/leader_schedule_cache.rs b/ledger/src/leader_schedule_cache.rs
index 4560ab7cb..accadce4a 100644
--- a/ledger/src/leader_schedule_cache.rs
+++ b/ledger/src/leader_schedule_cache.rs
@@ -165,9 +165,7 @@ impl LeaderScheduleCache {
     fn slot_leader_at_no_compute(&self, slot: Slot) -> Option<Pubkey> {
         let (epoch, slot_index) = self.epoch_schedule.get_epoch_and_slot_index(slot);
         if let Some(ref fixed_schedule) = self.fixed_schedule {
-            if epoch >= fixed_schedule.start_epoch {
-                return Some(fixed_schedule.leader_schedule[slot_index]);
-            }
+            return Some(fixed_schedule.leader_schedule[slot_index]);
         }
         self.cached_schedules
             .read()
@@ -207,9 +205,7 @@ impl LeaderScheduleCache {
         bank: &Bank,
     ) -> Option<Arc<LeaderSchedule>> {
         if let Some(ref fixed_schedule) = self.fixed_schedule {
-            if epoch >= fixed_schedule.start_epoch {
-                return Some(fixed_schedule.leader_schedule.clone());
-            }
+            return Some(fixed_schedule.leader_schedule.clone());
         }
         let epoch_schedule = self.get_epoch_leader_schedule(epoch);
         if epoch_schedule.is_some() {
diff --git a/local-cluster/src/validator_configs.rs b/local-cluster/src/validator_configs.rs
index abed01ca8..4cad12dfc 100644
--- a/local-cluster/src/validator_configs.rs
+++ b/local-cluster/src/validator_configs.rs
@@ -20,7 +20,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
         snapshot_config: config.snapshot_config.clone(),
         max_ledger_shreds: config.max_ledger_shreds,
         broadcast_stage_type: config.broadcast_stage_type.clone(),
-        enable_partition: config.enable_partition.clone(),
+        turbine_disabled: config.turbine_disabled.clone(),
         enforce_ulimit_nofile: config.enforce_ulimit_nofile,
         fixed_leader_schedule: config.fixed_leader_schedule.clone(),
         wait_for_supermajority: config.wait_for_supermajority,
diff --git a/local-cluster/tests/common.rs b/local-cluster/tests/common.rs
index b16447ef8..c8c8d39d6 100644
--- a/local-cluster/tests/common.rs
+++ b/local-cluster/tests/common.rs
@@ -130,9 +130,8 @@ pub fn ms_for_n_slots(num_blocks: u64, ticks_per_slot: u64) -> u64 {
 
 #[allow(clippy::assertions_on_constants)]
 pub fn run_kill_partition_switch_threshold<C>(
-    stakes_to_kill: &[&[(usize, usize)]],
-    alive_stakes: &[&[(usize, usize)]],
-    partition_duration: Option<u64>,
+    stakes_to_kill: &[(usize, usize)],
+    alive_stakes: &[(usize, usize)],
     ticks_per_slot: Option<u64>,
     partition_context: C,
     on_partition_start: impl Fn(&mut LocalCluster, &[Pubkey], Vec<ClusterValidatorInfo>, &mut C),
@@ -151,20 +150,15 @@ pub fn run_kill_partition_switch_threshold<C>(
     // 1) Spins up three partitions
     // 2) Kills the first partition with the stake `failures_stake`
     // 5) runs `on_partition_resolved`
-    let partitions: Vec<&[(usize, usize)]> = stakes_to_kill
+    let partitions: Vec<(usize, usize)> = stakes_to_kill
         .iter()
         .cloned()
         .chain(alive_stakes.iter().cloned())
         .collect();
 
-    let stake_partitions: Vec<Vec<usize>> = partitions
-        .iter()
-        .map(|stakes_and_slots| stakes_and_slots.iter().map(|(stake, _)| *stake).collect())
-        .collect();
-    let num_slots_per_validator: Vec<usize> = partitions
-        .iter()
-        .flat_map(|stakes_and_slots| stakes_and_slots.iter().map(|(_, num_slots)| *num_slots))
-        .collect();
+    let stake_partitions: Vec<usize> = partitions.iter().map(|(stake, _)| *stake).collect();
+    let num_slots_per_validator: Vec<usize> =
+        partitions.iter().map(|(_, num_slots)| *num_slots).collect();
 
     let (leader_schedule, validator_keys) =
         create_custom_leader_schedule_with_random_keys(&num_slots_per_validator);
@@ -200,7 +194,6 @@ pub fn run_kill_partition_switch_threshold<C>(
         on_partition_start,
         on_before_partition_resolved,
         on_partition_resolved,
-        partition_duration,
         ticks_per_slot,
         vec![],
     )
@@ -240,19 +233,17 @@ pub fn create_custom_leader_schedule_with_random_keys(
 /// continues to achieve consensus
 /// # Arguments
 /// * `partitions` - A slice of partition configurations, where each partition
-///   configuration is a slice of (usize, bool), representing a node's stake and
-///   whether or not it should be killed during the partition
+///   configuration is a usize representing a node's stake
 /// * `leader_schedule` - An option that specifies whether the cluster should
 ///   run with a fixed, predetermined leader schedule
 #[allow(clippy::cognitive_complexity)]
 pub fn run_cluster_partition<C>(
-    partitions: &[Vec<usize>],
+    partitions: &[usize],
     leader_schedule: Option<(LeaderSchedule, Vec<Arc<Keypair>>)>,
     mut context: C,
     on_partition_start: impl FnOnce(&mut LocalCluster, &mut C),
     on_before_partition_resolved: impl FnOnce(&mut LocalCluster, &mut C),
     on_partition_resolved: impl FnOnce(&mut LocalCluster, &mut C),
-    partition_duration: Option<u64>,
     ticks_per_slot: Option<u64>,
     additional_accounts: Vec<(Pubkey, AccountSharedData)>,
 ) {
@@ -261,39 +252,35 @@ pub fn run_cluster_partition<C>(
     let num_nodes = partitions.len();
     let node_stakes: Vec<_> = partitions
         .iter()
-        .flat_map(|p| p.iter().map(|stake_weight| 100 * *stake_weight as u64))
+        .map(|stake_weight| 100 * *stake_weight as u64)
         .collect();
     assert_eq!(node_stakes.len(), num_nodes);
     let cluster_lamports = node_stakes.iter().sum::<u64>() * 2;
-    let enable_partition = Arc::new(AtomicBool::new(true));
+    let turbine_disabled = Arc::new(AtomicBool::new(false));
     let mut validator_config = ValidatorConfig {
-        enable_partition: Some(enable_partition.clone()),
+        turbine_disabled: Some(turbine_disabled.clone()),
         ..ValidatorConfig::default_for_test()
     };
 
-    // Returns:
-    // 1) The keys for the validators
-    // 2) The amount of time it would take to iterate through one full iteration of the given
-    //    leader schedule
-    let (validator_keys, leader_schedule_time): (Vec<_>, u64) = {
+    let (validator_keys, partition_duration): (Vec<_>, Duration) = {
         if let Some((leader_schedule, validator_keys)) = leader_schedule {
             assert_eq!(validator_keys.len(), num_nodes);
             let num_slots_per_rotation = leader_schedule.num_slots() as u64;
             let fixed_schedule = FixedSchedule {
-                start_epoch: 0,
                 leader_schedule: Arc::new(leader_schedule),
             };
             validator_config.fixed_leader_schedule = Some(fixed_schedule);
             (
                 validator_keys,
-                num_slots_per_rotation * clock::DEFAULT_MS_PER_SLOT,
+                // partition for the duration of one full iteration of the leader schedule
+                Duration::from_millis(num_slots_per_rotation * clock::DEFAULT_MS_PER_SLOT),
             )
         } else {
             (
                 iter::repeat_with(|| Arc::new(Keypair::new()))
                     .take(partitions.len())
                     .collect(),
-                10_000,
+                Duration::from_secs(10),
             )
         }
     };
@@ -349,30 +336,28 @@ pub fn run_cluster_partition<C>(
 
     info!("PARTITION_TEST start partition");
     on_partition_start(&mut cluster, &mut context);
-    enable_partition.store(false, Ordering::Relaxed);
+    turbine_disabled.store(true, Ordering::Relaxed);
 
-    sleep(Duration::from_millis(
-        partition_duration.unwrap_or(leader_schedule_time),
-    ));
+    sleep(partition_duration);
 
     on_before_partition_resolved(&mut cluster, &mut context);
     info!("PARTITION_TEST remove partition");
-    enable_partition.store(true, Ordering::Relaxed);
+    turbine_disabled.store(false, Ordering::Relaxed);
 
     // Give partitions time to propagate their blocks from during the partition
     // after the partition resolves
-    let timeout = 10_000;
-    let propagation_time = leader_schedule_time;
+    let timeout_duration = Duration::from_secs(10);
+    let propagation_duration = partition_duration;
     info!(
         "PARTITION_TEST resolving partition. sleeping {} ms",
-        timeout
+        timeout_duration.as_millis()
     );
-    sleep(Duration::from_millis(timeout));
+    sleep(timeout_duration);
     info!(
         "PARTITION_TEST waiting for blocks to propagate after partition {}ms",
-        propagation_time
+        propagation_duration.as_millis()
     );
-    sleep(Duration::from_millis(propagation_time));
+    sleep(propagation_duration);
     info!("PARTITION_TEST resuming normal operation");
     on_partition_resolved(&mut cluster, &mut context);
 }
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 80f60d192..f580cfead 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -319,28 +319,6 @@ fn test_leader_failure_4() {
     );
 }
 
-#[allow(unused_attributes)]
-#[ignore]
-#[test]
-#[serial]
-fn test_cluster_partition_1_2() {
-    let empty = |_: &mut LocalCluster, _: &mut ()| {};
-    let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| {
-        cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified);
-    };
-    run_cluster_partition(
-        &[vec![1], vec![1, 1]],
-        None,
-        (),
-        empty,
-        empty,
-        on_partition_resolved,
-        None,
-        None,
-        vec![],
-    )
-}
-
 #[test]
 #[serial]
 fn test_cluster_partition_1_1() {
@@ -349,14 +327,13 @@ fn test_cluster_partition_1_1() {
         cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified);
     };
     run_cluster_partition(
-        &[vec![1], vec![1]],
+        &[1, 1],
         None,
         (),
         empty,
         empty,
         on_partition_resolved,
         None,
-        None,
         vec![],
     )
 }
@@ -369,14 +346,13 @@ fn test_cluster_partition_1_1_1() {
         cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified);
     };
     run_cluster_partition(
-        &[vec![1], vec![1], vec![1]],
+        &[1, 1, 1],
         None,
         (),
         empty,
         empty,
         on_partition_resolved,
         None,
-        None,
         vec![],
     )
 }
@@ -2506,7 +2482,7 @@ fn test_run_test_load_program_accounts_partition_root() {
 fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {
     let num_slots_per_validator = 8;
-    let partitions: [Vec<usize>; 2] = [vec![1], vec![1]];
+    let partitions: [usize; 2] = [1, 1];
     let (leader_schedule, validator_keys) = create_custom_leader_schedule_with_random_keys(&[
         num_slots_per_validator,
         num_slots_per_validator,
     ]);
@@ -2556,7 +2532,6 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {
         on_partition_before_resolved,
         on_partition_resolved,
         None,
-        None,
         additional_accounts,
     );
 }
@@ -2572,10 +2547,8 @@ fn test_votes_land_in_fork_during_long_partition() {
 
     // Give lighter stake 30 consecutive slots before
     // the heavier stake gets a single slot
-    let partitions: &[&[(usize, usize)]] = &[
-        &[(heavier_stake as usize, 1)],
-        &[(lighter_stake as usize, 30)],
-    ];
+    let partitions: &[(usize, usize)] =
+        &[(heavier_stake as usize, 1), (lighter_stake as usize, 30)];
 
     #[derive(Default)]
     struct PartitionContext {
@@ -2651,10 +2624,9 @@ fn test_votes_land_in_fork_during_long_partition() {
     };
 
     run_kill_partition_switch_threshold(
-        &[&[(failures_stake as usize, 0)]],
+        &[(failures_stake as usize, 0)],
         partitions,
         None,
-        None,
         PartitionContext::default(),
         on_partition_start,
         on_before_partition_resolved,
diff --git a/local-cluster/tests/local_cluster_flakey.rs b/local-cluster/tests/local_cluster_flakey.rs
index f38cc943a..b78cd464e 100644
--- a/local-cluster/tests/local_cluster_flakey.rs
+++ b/local-cluster/tests/local_cluster_flakey.rs
@@ -147,7 +147,6 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: bool) {
     }
 
     default_config.fixed_leader_schedule = Some(FixedSchedule {
-        start_epoch: 0,
         leader_schedule: Arc::new(leader_schedule),
     });
     let mut validator_configs =
diff --git a/local-cluster/tests/local_cluster_slow.rs b/local-cluster/tests/local_cluster_slow.rs
index e58637109..70e21c7b5 100644
--- a/local-cluster/tests/local_cluster_slow.rs
+++ b/local-cluster/tests/local_cluster_slow.rs
@@ -98,10 +98,10 @@ fn test_fork_choice_refresh_old_votes() {
     assert!(alive_stake_1 < alive_stake_2);
     assert!(alive_stake_1 + alive_stake_3 > alive_stake_2);
 
-    let partitions: &[&[(usize, usize)]] = &[
-        &[(alive_stake_1 as usize, 8)],
-        &[(alive_stake_2 as usize, 8)],
-        &[(alive_stake_3 as usize, 0)],
+    let partitions: &[(usize, usize)] = &[
+        (alive_stake_1 as usize, 8),
+        (alive_stake_2 as usize, 8),
+        (alive_stake_3 as usize, 0),
     ];
     #[derive(Default)]
     struct PartitionContext {
@@ -254,11 +254,8 @@ fn test_fork_choice_refresh_old_votes() {
     };
 
     run_kill_partition_switch_threshold(
-        &[&[(failures_stake as usize - 1, 16)]],
+        &[(failures_stake as usize - 1, 16)],
         partitions,
-        // Partition long enough such that the first vote made by validator with
-        // `alive_stake_3` won't be ingested due to BlockhashTooOld,
-        None,
         Some(ticks_per_slot),
         PartitionContext::default(),
         on_partition_start,
@@ -279,7 +276,7 @@ fn test_kill_heaviest_partition() {
     // eventually choose the major partition
     // 4) Check for recovery
     let num_slots_per_validator = 8;
-    let partitions: [Vec<usize>; 4] = [vec![11], vec![10], vec![10], vec![10]];
+    let partitions: [usize; 4] = [11, 10, 10, 10];
     let (leader_schedule, validator_keys) = create_custom_leader_schedule_with_random_keys(&[
         num_slots_per_validator * (partitions.len() - 1),
         num_slots_per_validator,
@@ -302,7 +299,6 @@ fn test_kill_heaviest_partition() {
         empty,
         on_partition_resolved,
         None,
-        None,
         vec![],
    )
 }
@@ -331,12 +327,8 @@ fn test_kill_partition_switch_threshold_no_progress() {
 
     // This kills `max_failures_stake`, so no progress should be made
     run_kill_partition_switch_threshold(
-        &[&[(failures_stake as usize, 16)]],
-        &[
-            &[(alive_stake_1 as usize, 8)],
-            &[(alive_stake_2 as usize, 8)],
-        ],
-        None,
+        &[(failures_stake as usize, 16)],
+        &[(alive_stake_1 as usize, 8), (alive_stake_2 as usize, 8)],
         None,
         (),
         on_partition_start,
@@ -382,12 +374,8 @@ fn test_kill_partition_switch_threshold_progress() {
         cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified);
     };
     run_kill_partition_switch_threshold(
-        &[&[(failures_stake as usize, 16)]],
-        &[
-            &[(alive_stake_1 as usize, 8)],
-            &[(alive_stake_2 as usize, 8)],
-        ],
-        None,
+        &[(failures_stake as usize, 16)],
+        &[(alive_stake_1 as usize, 8), (alive_stake_2 as usize, 8)],
         None,
         (),
         on_partition_start,
@@ -612,10 +600,7 @@ fn test_switch_threshold_uses_gossip_votes() {
     let lighter_stake = heavier_stake - 1;
     let failures_stake = total_stake - heavier_stake - lighter_stake;
 
-    let partitions: &[&[(usize, usize)]] = &[
-        &[(heavier_stake as usize, 8)],
-        &[(lighter_stake as usize, 8)],
-    ];
+    let partitions: &[(usize, usize)] = &[(heavier_stake as usize, 8), (lighter_stake as usize, 8)];
 
     #[derive(Default)]
     struct PartitionContext {
@@ -825,11 +810,8 @@ fn test_switch_threshold_uses_gossip_votes() {
 
     let ticks_per_slot = 8;
     run_kill_partition_switch_threshold(
-        &[&[(failures_stake as usize, 0)]],
+        &[(failures_stake as usize, 0)],
         partitions,
-        // Partition long enough such that the first vote made by validator with
-        // `alive_stake_3` won't be ingested due to BlockhashTooOld,
-        None,
         Some(ticks_per_slot),
         PartitionContext::default(),
         on_partition_start,