Local cluster test cleanup and refactoring (#24559)

* remove FixedSchedule.start_epoch

* use duration for timing

* Rename the partition bool to turbine_disabled

* simplify partition config
Justin Starry 2022-04-22 12:14:07 +08:00 committed by GitHub
parent 016c11b978
commit c544742091
11 changed files with 54 additions and 122 deletions


@ -440,7 +440,7 @@ impl RetransmitStage {
exit: Arc<AtomicBool>,
cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
epoch_schedule: EpochSchedule,
- cfg: Option<Arc<AtomicBool>>,
+ turbine_disabled: Option<Arc<AtomicBool>>,
shred_version: u16,
cluster_slots: Arc<ClusterSlots>,
duplicate_slots_reset_sender: DuplicateSlotsResetSender,
@ -492,10 +492,10 @@ impl RetransmitStage {
repair_info,
leader_schedule_cache,
move |id, shred, working_bank, last_root| {
- let is_connected = cfg
+ let turbine_disabled = turbine_disabled
.as_ref()
.map(|x| x.load(Ordering::Relaxed))
- .unwrap_or(true);
+ .unwrap_or(false);
let rv = should_retransmit_and_persist(
shred,
working_bank,
@ -504,7 +504,7 @@ impl RetransmitStage {
last_root,
shred_version,
);
- rv && is_connected
+ rv && !turbine_disabled
},
verified_vote_receiver,
completed_data_sets_sender,
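
The rename above also flips the flag's polarity: the old `cfg` flag read as "is connected" and defaulted to `true`, while `turbine_disabled` defaults to `false`, so a validator with no flag configured behaves exactly as before. A minimal std-only sketch of the filter logic, with a plain boolean `rv` standing in for the result of `should_retransmit_and_persist`:

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

fn main() {
    // `rv` stands in for the shred checks done by should_retransmit_and_persist().
    let rv = true;

    // No flag configured: unwrap_or(false) means turbine stays enabled.
    let no_flag: Option<Arc<AtomicBool>> = None;
    let disabled = no_flag
        .as_ref()
        .map(|flag| flag.load(Ordering::Relaxed))
        .unwrap_or(false);
    assert!(rv && !disabled);

    // A test injects the flag and flips it to simulate a partition.
    let flag = Arc::new(AtomicBool::new(false));
    let injected: Option<Arc<AtomicBool>> = Some(flag.clone());
    flag.store(true, Ordering::Relaxed);
    let disabled = injected
        .as_ref()
        .map(|flag| flag.load(Ordering::Relaxed))
        .unwrap_or(false);
    // Shreds are dropped while turbine is disabled.
    assert!(!(rv && !disabled));
}
```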


@ -114,7 +114,7 @@ impl Tvu {
leader_schedule_cache: &Arc<LeaderScheduleCache>,
exit: &Arc<AtomicBool>,
block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
- cfg: Option<Arc<AtomicBool>>,
+ turbine_disabled: Option<Arc<AtomicBool>>,
transaction_status_sender: Option<TransactionStatusSender>,
rewards_recorder_sender: Option<RewardsRecorderSender>,
cache_block_meta_sender: Option<CacheBlockMetaSender>,
@ -182,7 +182,7 @@ impl Tvu {
exit.clone(),
cluster_slots_update_receiver,
*bank_forks.read().unwrap().working_bank().epoch_schedule(),
- cfg,
+ turbine_disabled,
tvu_config.shred_version,
cluster_slots.clone(),
duplicate_slots_reset_sender,


@ -130,7 +130,7 @@ pub struct ValidatorConfig {
pub snapshot_config: Option<SnapshotConfig>,
pub max_ledger_shreds: Option<u64>,
pub broadcast_stage_type: BroadcastStageType,
- pub enable_partition: Option<Arc<AtomicBool>>,
+ pub turbine_disabled: Option<Arc<AtomicBool>>,
pub enforce_ulimit_nofile: bool,
pub fixed_leader_schedule: Option<FixedSchedule>,
pub wait_for_supermajority: Option<Slot>,
@ -190,7 +190,7 @@ impl Default for ValidatorConfig {
pubsub_config: PubSubConfig::default(),
snapshot_config: None,
broadcast_stage_type: BroadcastStageType::Standard,
- enable_partition: None,
+ turbine_disabled: None,
enforce_ulimit_nofile: true,
fixed_leader_schedule: None,
wait_for_supermajority: None,
@ -966,7 +966,7 @@ impl Validator {
&leader_schedule_cache,
&exit,
block_commitment_cache,
- config.enable_partition.clone(),
+ config.turbine_disabled.clone(),
transaction_status_sender.clone(),
rewards_recorder_sender,
cache_block_meta_sender,


@ -8529,7 +8529,6 @@ pub mod tests {
leader_schedule: Arc::new(LeaderSchedule::new_from_schedule(vec![
leader_keypair.pubkey()
])),
- start_epoch: 0,
};
leader_schedule_cache.set_fixed_leader_schedule(Some(fixed_schedule));


@ -10,7 +10,6 @@ use {
#[derive(Clone, Debug)]
pub struct FixedSchedule {
pub leader_schedule: Arc<LeaderSchedule>,
- pub start_epoch: u64,
}
/// Stake-weighted leader schedule for one epoch.
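
With `start_epoch` removed, `FixedSchedule` is nothing more than a shared handle to a leader schedule, which is why the test above (and the other call sites in this diff) now construct it with a single field. A rough std-only model, using a hypothetical `LeaderSchedule` stand-in rather than the real one defined in this file:

```rust
use std::{ops::Index, sync::Arc};

// Hypothetical stand-in: a repeating list of leader identities indexed by slot.
struct LeaderSchedule(Vec<&'static str>);

impl Index<u64> for LeaderSchedule {
    type Output = &'static str;
    fn index(&self, slot_index: u64) -> &Self::Output {
        &self.0[slot_index as usize % self.0.len()]
    }
}

// The simplified struct: no start_epoch, just the schedule itself.
struct FixedSchedule {
    leader_schedule: Arc<LeaderSchedule>,
}

fn main() {
    let fixed = FixedSchedule {
        leader_schedule: Arc::new(LeaderSchedule(vec!["leader-a", "leader-b"])),
    };
    // The fixed schedule now applies regardless of epoch.
    assert_eq!(fixed.leader_schedule[3], "leader-b");
}
```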


@ -165,9 +165,7 @@ impl LeaderScheduleCache {
fn slot_leader_at_no_compute(&self, slot: Slot) -> Option<Pubkey> {
let (epoch, slot_index) = self.epoch_schedule.get_epoch_and_slot_index(slot);
if let Some(ref fixed_schedule) = self.fixed_schedule {
- if epoch >= fixed_schedule.start_epoch {
- return Some(fixed_schedule.leader_schedule[slot_index]);
- }
+ return Some(fixed_schedule.leader_schedule[slot_index]);
}
self.cached_schedules
.read()
@ -207,9 +205,7 @@ impl LeaderScheduleCache {
bank: &Bank,
) -> Option<Arc<LeaderSchedule>> {
if let Some(ref fixed_schedule) = self.fixed_schedule {
- if epoch >= fixed_schedule.start_epoch {
- return Some(fixed_schedule.leader_schedule.clone());
- }
+ return Some(fixed_schedule.leader_schedule.clone());
}
let epoch_schedule = self.get_epoch_leader_schedule(epoch);
if epoch_schedule.is_some() {
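
The two lookups above now honor a configured fixed schedule unconditionally. Since epochs are `u64` and the call sites in this diff all passed `start_epoch: 0`, the dropped `epoch >= start_epoch` check could never fail, so behavior is unchanged for them. A small sketch of the before/after shape, with strings standing in for leader pubkeys:

```rust
// "Before": the fixed schedule only wins once the epoch reaches start_epoch.
fn leader_before(fixed: Option<(u64, &str)>, epoch: u64, cached: &str) -> String {
    if let Some((start_epoch, leader)) = fixed {
        if epoch >= start_epoch {
            return leader.to_string();
        }
    }
    cached.to_string()
}

// "After": any configured fixed schedule wins unconditionally.
fn leader_after(fixed: Option<&str>, cached: &str) -> String {
    if let Some(leader) = fixed {
        return leader.to_string();
    }
    cached.to_string()
}

fn main() {
    // With start_epoch = 0 the epoch gate is always true, so both agree.
    assert_eq!(leader_before(Some((0, "fixed")), 5, "cached"), "fixed");
    assert_eq!(leader_after(Some("fixed"), "cached"), "fixed");
    assert_eq!(leader_after(None, "cached"), "cached");
}
```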


@ -20,7 +20,7 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
snapshot_config: config.snapshot_config.clone(),
max_ledger_shreds: config.max_ledger_shreds,
broadcast_stage_type: config.broadcast_stage_type.clone(),
- enable_partition: config.enable_partition.clone(),
+ turbine_disabled: config.turbine_disabled.clone(),
enforce_ulimit_nofile: config.enforce_ulimit_nofile,
fixed_leader_schedule: config.fixed_leader_schedule.clone(),
wait_for_supermajority: config.wait_for_supermajority,
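
`safe_clone_config` clones the `Option<Arc<AtomicBool>>`, which copies the `Arc` handle rather than the boolean, so every per-validator config built from one template shares the same `turbine_disabled` flag. A quick std-only check of that property:

```rust
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc,
};

fn main() {
    let turbine_disabled: Option<Arc<AtomicBool>> = Some(Arc::new(AtomicBool::new(false)));

    // Same Option<Arc<AtomicBool>> clone that safe_clone_config performs.
    let cloned = turbine_disabled.clone();

    // Flipping the original is visible through the clone, which is what lets a
    // local-cluster test pause turbine on every validator at once.
    turbine_disabled
        .as_ref()
        .unwrap()
        .store(true, Ordering::Relaxed);
    assert!(cloned.as_ref().unwrap().load(Ordering::Relaxed));
}
```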


@ -130,9 +130,8 @@ pub fn ms_for_n_slots(num_blocks: u64, ticks_per_slot: u64) -> u64 {
#[allow(clippy::assertions_on_constants)]
pub fn run_kill_partition_switch_threshold<C>(
- stakes_to_kill: &[&[(usize, usize)]],
- alive_stakes: &[&[(usize, usize)]],
- partition_duration: Option<u64>,
+ stakes_to_kill: &[(usize, usize)],
+ alive_stakes: &[(usize, usize)],
ticks_per_slot: Option<u64>,
partition_context: C,
on_partition_start: impl Fn(&mut LocalCluster, &[Pubkey], Vec<ClusterValidatorInfo>, &mut C),
@ -151,20 +150,15 @@ pub fn run_kill_partition_switch_threshold<C>(
// 1) Spins up three partitions
// 2) Kills the first partition with the stake `failures_stake`
// 5) runs `on_partition_resolved`
- let partitions: Vec<&[(usize, usize)]> = stakes_to_kill
+ let partitions: Vec<(usize, usize)> = stakes_to_kill
.iter()
.cloned()
.chain(alive_stakes.iter().cloned())
.collect();
- let stake_partitions: Vec<Vec<usize>> = partitions
- .iter()
- .map(|stakes_and_slots| stakes_and_slots.iter().map(|(stake, _)| *stake).collect())
- .collect();
- let num_slots_per_validator: Vec<usize> = partitions
- .iter()
- .flat_map(|stakes_and_slots| stakes_and_slots.iter().map(|(_, num_slots)| *num_slots))
- .collect();
+ let stake_partitions: Vec<usize> = partitions.iter().map(|(stake, _)| *stake).collect();
+ let num_slots_per_validator: Vec<usize> =
+ partitions.iter().map(|(_, num_slots)| *num_slots).collect();
let (leader_schedule, validator_keys) =
create_custom_leader_schedule_with_random_keys(&num_slots_per_validator);
@ -200,7 +194,6 @@ pub fn run_kill_partition_switch_threshold<C>(
on_partition_start,
on_before_partition_resolved,
on_partition_resolved,
- partition_duration,
ticks_per_slot,
vec![],
)
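
The signature change above drops one level of nesting: each partition is now a single `(stake, num_leader_slots)` pair rather than a slice of pairs, so the stake and slot columns fall out with a plain `map` each instead of the old map-inside-`flat_map`. A self-contained sketch with made-up stake numbers:

```rust
fn main() {
    // Hypothetical inputs mirroring the new signature: one (stake, leader_slots)
    // pair per partition.
    let stakes_to_kill: &[(usize, usize)] = &[(100, 16)];
    let alive_stakes: &[(usize, usize)] = &[(51, 8), (50, 8)];

    let partitions: Vec<(usize, usize)> = stakes_to_kill
        .iter()
        .cloned()
        .chain(alive_stakes.iter().cloned())
        .collect();

    let stake_partitions: Vec<usize> = partitions.iter().map(|(stake, _)| *stake).collect();
    let num_slots_per_validator: Vec<usize> =
        partitions.iter().map(|(_, num_slots)| *num_slots).collect();

    assert_eq!(stake_partitions, vec![100, 51, 50]);
    assert_eq!(num_slots_per_validator, vec![16, 8, 8]);
}
```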
@ -240,19 +233,17 @@ pub fn create_custom_leader_schedule_with_random_keys(
/// continues to achieve consensus
/// # Arguments
/// * `partitions` - A slice of partition configurations, where each partition
- /// configuration is a slice of (usize, bool), representing a node's stake and
- /// whether or not it should be killed during the partition
+ /// configuration is a usize representing a node's stake
/// * `leader_schedule` - An option that specifies whether the cluster should
/// run with a fixed, predetermined leader schedule
#[allow(clippy::cognitive_complexity)]
pub fn run_cluster_partition<C>(
- partitions: &[Vec<usize>],
+ partitions: &[usize],
leader_schedule: Option<(LeaderSchedule, Vec<Arc<Keypair>>)>,
mut context: C,
on_partition_start: impl FnOnce(&mut LocalCluster, &mut C),
on_before_partition_resolved: impl FnOnce(&mut LocalCluster, &mut C),
on_partition_resolved: impl FnOnce(&mut LocalCluster, &mut C),
- partition_duration: Option<u64>,
ticks_per_slot: Option<u64>,
additional_accounts: Vec<(Pubkey, AccountSharedData)>,
) {
@ -261,39 +252,35 @@ pub fn run_cluster_partition<C>(
let num_nodes = partitions.len();
let node_stakes: Vec<_> = partitions
.iter()
- .flat_map(|p| p.iter().map(|stake_weight| 100 * *stake_weight as u64))
+ .map(|stake_weight| 100 * *stake_weight as u64)
.collect();
assert_eq!(node_stakes.len(), num_nodes);
let cluster_lamports = node_stakes.iter().sum::<u64>() * 2;
- let enable_partition = Arc::new(AtomicBool::new(true));
+ let turbine_disabled = Arc::new(AtomicBool::new(false));
let mut validator_config = ValidatorConfig {
- enable_partition: Some(enable_partition.clone()),
+ turbine_disabled: Some(turbine_disabled.clone()),
..ValidatorConfig::default_for_test()
};
// Returns:
// 1) The keys for the validators
// 2) The amount of time it would take to iterate through one full iteration of the given
// leader schedule
- let (validator_keys, leader_schedule_time): (Vec<_>, u64) = {
+ let (validator_keys, partition_duration): (Vec<_>, Duration) = {
if let Some((leader_schedule, validator_keys)) = leader_schedule {
assert_eq!(validator_keys.len(), num_nodes);
let num_slots_per_rotation = leader_schedule.num_slots() as u64;
let fixed_schedule = FixedSchedule {
- start_epoch: 0,
leader_schedule: Arc::new(leader_schedule),
};
validator_config.fixed_leader_schedule = Some(fixed_schedule);
(
validator_keys,
- num_slots_per_rotation * clock::DEFAULT_MS_PER_SLOT,
+ // partition for the duration of one full iteration of the leader schedule
+ Duration::from_millis(num_slots_per_rotation * clock::DEFAULT_MS_PER_SLOT),
)
} else {
(
iter::repeat_with(|| Arc::new(Keypair::new()))
.take(partitions.len())
.collect(),
- 10_000,
+ Duration::from_secs(10),
)
}
};
@ -349,30 +336,28 @@ pub fn run_cluster_partition<C>(
info!("PARTITION_TEST start partition");
on_partition_start(&mut cluster, &mut context);
- enable_partition.store(false, Ordering::Relaxed);
+ turbine_disabled.store(true, Ordering::Relaxed);
- sleep(Duration::from_millis(
- partition_duration.unwrap_or(leader_schedule_time),
- ));
+ sleep(partition_duration);
on_before_partition_resolved(&mut cluster, &mut context);
info!("PARTITION_TEST remove partition");
- enable_partition.store(true, Ordering::Relaxed);
+ turbine_disabled.store(false, Ordering::Relaxed);
// Give partitions time to propagate their blocks from during the partition
// after the partition resolves
- let timeout = 10_000;
- let propagation_time = leader_schedule_time;
+ let timeout_duration = Duration::from_secs(10);
+ let propagation_duration = partition_duration;
info!(
"PARTITION_TEST resolving partition. sleeping {} ms",
- timeout
+ timeout_duration.as_millis()
);
- sleep(Duration::from_millis(timeout));
+ sleep(timeout_duration);
info!(
"PARTITION_TEST waiting for blocks to propagate after partition {}ms",
- propagation_time
+ propagation_duration.as_millis()
);
- sleep(Duration::from_millis(propagation_time));
+ sleep(propagation_duration);
info!("PARTITION_TEST resuming normal operation");
on_partition_resolved(&mut cluster, &mut context);
}
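
Timing in `run_cluster_partition` now flows through `Duration` values end to end: the partition lasts one full rotation of the fixed leader schedule (or 10 seconds without one), and the same value is reused as the post-partition propagation wait. A compressed std-only sketch of that sequencing, with 400 ms hard-coded in place of `clock::DEFAULT_MS_PER_SLOT`:

```rust
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::sleep,
    time::Duration,
};

fn main() {
    let turbine_disabled = Arc::new(AtomicBool::new(false));

    // With a fixed leader schedule, partition for one full schedule rotation;
    // 400 ms per slot is assumed here instead of clock::DEFAULT_MS_PER_SLOT.
    let num_slots_per_rotation: u64 = 10;
    let partition_duration = Duration::from_millis(num_slots_per_rotation * 400);

    turbine_disabled.store(true, Ordering::Relaxed); // start the partition
    sleep(partition_duration);
    turbine_disabled.store(false, Ordering::Relaxed); // resolve the partition

    // Fixed settle time, then the same rotation-length wait for the blocks
    // built during the partition to propagate.
    let timeout_duration = Duration::from_secs(10);
    sleep(timeout_duration);
    sleep(partition_duration);
}
```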


@ -319,28 +319,6 @@ fn test_leader_failure_4() {
);
}
- #[allow(unused_attributes)]
- #[ignore]
- #[test]
- #[serial]
- fn test_cluster_partition_1_2() {
- let empty = |_: &mut LocalCluster, _: &mut ()| {};
- let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| {
- cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified);
- };
- run_cluster_partition(
- &[vec![1], vec![1, 1]],
- None,
- (),
- empty,
- empty,
- on_partition_resolved,
- None,
- None,
- vec![],
- )
- }
#[test]
#[serial]
fn test_cluster_partition_1_1() {
@ -349,14 +327,13 @@ fn test_cluster_partition_1_1() {
cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified);
};
run_cluster_partition(
- &[vec![1], vec![1]],
+ &[1, 1],
None,
(),
empty,
empty,
on_partition_resolved,
- None,
None,
vec![],
)
}
@ -369,14 +346,13 @@ fn test_cluster_partition_1_1_1() {
cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified);
};
run_cluster_partition(
- &[vec![1], vec![1], vec![1]],
+ &[1, 1, 1],
None,
(),
empty,
empty,
on_partition_resolved,
- None,
None,
vec![],
)
}
@ -2506,7 +2482,7 @@ fn test_run_test_load_program_accounts_partition_root() {
fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {
let num_slots_per_validator = 8;
- let partitions: [Vec<usize>; 2] = [vec![1], vec![1]];
+ let partitions: [usize; 2] = [1, 1];
let (leader_schedule, validator_keys) = create_custom_leader_schedule_with_random_keys(&[
num_slots_per_validator,
num_slots_per_validator,
@ -2556,7 +2532,6 @@ fn run_test_load_program_accounts_partition(scan_commitment: CommitmentConfig) {
on_partition_before_resolved,
on_partition_resolved,
- None,
None,
additional_accounts,
);
}
@ -2572,10 +2547,8 @@ fn test_votes_land_in_fork_during_long_partition() {
// Give lighter stake 30 consecutive slots before
// the heavier stake gets a single slot
- let partitions: &[&[(usize, usize)]] = &[
- &[(heavier_stake as usize, 1)],
- &[(lighter_stake as usize, 30)],
- ];
+ let partitions: &[(usize, usize)] =
+ &[(heavier_stake as usize, 1), (lighter_stake as usize, 30)];
#[derive(Default)]
struct PartitionContext {
@ -2651,10 +2624,9 @@ fn test_votes_land_in_fork_during_long_partition() {
};
run_kill_partition_switch_threshold(
- &[&[(failures_stake as usize, 0)]],
+ &[(failures_stake as usize, 0)],
partitions,
- None,
None,
PartitionContext::default(),
on_partition_start,
on_before_partition_resolved,


@ -147,7 +147,6 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
}
default_config.fixed_leader_schedule = Some(FixedSchedule {
- start_epoch: 0,
leader_schedule: Arc::new(leader_schedule),
});
let mut validator_configs =


@ -98,10 +98,10 @@ fn test_fork_choice_refresh_old_votes() {
assert!(alive_stake_1 < alive_stake_2);
assert!(alive_stake_1 + alive_stake_3 > alive_stake_2);
- let partitions: &[&[(usize, usize)]] = &[
- &[(alive_stake_1 as usize, 8)],
- &[(alive_stake_2 as usize, 8)],
- &[(alive_stake_3 as usize, 0)],
+ let partitions: &[(usize, usize)] = &[
+ (alive_stake_1 as usize, 8),
+ (alive_stake_2 as usize, 8),
+ (alive_stake_3 as usize, 0),
];
#[derive(Default)]
@ -254,11 +254,8 @@ fn test_fork_choice_refresh_old_votes() {
};
run_kill_partition_switch_threshold(
- &[&[(failures_stake as usize - 1, 16)]],
+ &[(failures_stake as usize - 1, 16)],
partitions,
- // Partition long enough such that the first vote made by validator with
- // `alive_stake_3` won't be ingested due to BlockhashTooOld,
- None,
Some(ticks_per_slot),
PartitionContext::default(),
on_partition_start,
@ -279,7 +276,7 @@ fn test_kill_heaviest_partition() {
// eventually choose the major partition
// 4) Check for recovery
let num_slots_per_validator = 8;
- let partitions: [Vec<usize>; 4] = [vec![11], vec![10], vec![10], vec![10]];
+ let partitions: [usize; 4] = [11, 10, 10, 10];
let (leader_schedule, validator_keys) = create_custom_leader_schedule_with_random_keys(&[
num_slots_per_validator * (partitions.len() - 1),
num_slots_per_validator,
@ -302,7 +299,6 @@ fn test_kill_heaviest_partition() {
empty,
on_partition_resolved,
- None,
None,
vec![],
)
}
@ -331,12 +327,8 @@ fn test_kill_partition_switch_threshold_no_progress() {
// This kills `max_failures_stake`, so no progress should be made
run_kill_partition_switch_threshold(
- &[&[(failures_stake as usize, 16)]],
- &[
- &[(alive_stake_1 as usize, 8)],
- &[(alive_stake_2 as usize, 8)],
- ],
- None,
+ &[(failures_stake as usize, 16)],
+ &[(alive_stake_1 as usize, 8), (alive_stake_2 as usize, 8)],
None,
(),
on_partition_start,
@ -382,12 +374,8 @@ fn test_kill_partition_switch_threshold_progress() {
cluster.check_for_new_roots(16, "PARTITION_TEST", SocketAddrSpace::Unspecified);
};
run_kill_partition_switch_threshold(
- &[&[(failures_stake as usize, 16)]],
- &[
- &[(alive_stake_1 as usize, 8)],
- &[(alive_stake_2 as usize, 8)],
- ],
- None,
+ &[(failures_stake as usize, 16)],
+ &[(alive_stake_1 as usize, 8), (alive_stake_2 as usize, 8)],
None,
(),
on_partition_start,
@ -612,10 +600,7 @@ fn test_switch_threshold_uses_gossip_votes() {
let lighter_stake = heavier_stake - 1;
let failures_stake = total_stake - heavier_stake - lighter_stake;
- let partitions: &[&[(usize, usize)]] = &[
- &[(heavier_stake as usize, 8)],
- &[(lighter_stake as usize, 8)],
- ];
+ let partitions: &[(usize, usize)] = &[(heavier_stake as usize, 8), (lighter_stake as usize, 8)];
#[derive(Default)]
struct PartitionContext {
@ -825,11 +810,8 @@ fn test_switch_threshold_uses_gossip_votes() {
let ticks_per_slot = 8;
run_kill_partition_switch_threshold(
- &[&[(failures_stake as usize, 0)]],
+ &[(failures_stake as usize, 0)],
partitions,
- // Partition long enough such that the first vote made by validator with
- // `alive_stake_3` won't be ingested due to BlockhashTooOld,
- None,
Some(ticks_per_slot),
PartitionContext::default(),
on_partition_start,