diff --git a/Cargo.lock b/Cargo.lock
index 701860555..3cc8700a0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3631,6 +3631,7 @@ dependencies = [
 name = "solana-local-cluster"
 version = "0.22.0"
 dependencies = [
+ "itertools 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
  "serial_test 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index 34d81a456..ce9cf6601 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -1033,7 +1033,7 @@ impl ReplayStage {
             );
 
             datapoint_error!(
-                "replay-stage-entry_verification_failure",
+                "replay-stage-block-error",
                 ("slot", bank.slot(), i64),
                 ("last_entry", last_entry.to_string(), String),
             );
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 3d24df448..7eda71f3c 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -26,6 +26,7 @@ use solana_ledger::{
     blocktree::{Blocktree, CompletedSlotsReceiver},
     blocktree_processor::{self, BankForksInfo},
     create_new_tmp_ledger,
+    leader_schedule::FixedSchedule,
     leader_schedule_cache::LeaderScheduleCache,
 };
 use solana_metrics::datapoint_info;
@@ -65,6 +66,7 @@ pub struct ValidatorConfig {
     pub max_ledger_slots: Option<u64>,
     pub broadcast_stage_type: BroadcastStageType,
     pub partition_cfg: Option<PartitionCfg>,
+    pub fixed_leader_schedule: Option<FixedSchedule>,
 }
 
 impl Default for ValidatorConfig {
@@ -83,6 +85,7 @@ impl Default for ValidatorConfig {
             snapshot_config: None,
             broadcast_stage_type: BroadcastStageType::Standard,
             partition_cfg: None,
+            fixed_leader_schedule: None,
         }
     }
 }
@@ -182,6 +185,7 @@ impl Validator {
             config.snapshot_config.clone(),
             poh_verify,
             config.dev_halt_at_slot,
+            config.fixed_leader_schedule.clone(),
         );
 
         let leader_schedule_cache = Arc::new(leader_schedule_cache);
@@ -469,6 +473,7 @@ pub fn new_banks_from_blocktree(
     snapshot_config: Option<SnapshotConfig>,
     poh_verify: bool,
     dev_halt_at_slot: Option<Slot>,
+    fixed_leader_schedule: Option<FixedSchedule>,
 ) -> (
     Hash,
     BankForks,
@@ -506,7 +511,7 @@ pub fn new_banks_from_blocktree(
         ..blocktree_processor::ProcessOptions::default()
     };
 
-    let (mut bank_forks, bank_forks_info, leader_schedule_cache) = bank_forks_utils::load(
+    let (mut bank_forks, bank_forks_info, mut leader_schedule_cache) = bank_forks_utils::load(
         &genesis_config,
         &blocktree,
         account_paths,
@@ -518,6 +523,8 @@ pub fn new_banks_from_blocktree(
         std::process::exit(1);
     });
 
+    leader_schedule_cache.set_fixed_leader_schedule(fixed_leader_schedule);
+
     bank_forks.set_snapshot_config(snapshot_config);
 
     (
diff --git a/ledger/src/leader_schedule.rs b/ledger/src/leader_schedule.rs
index 572c84a79..312aac531 100644
--- a/ledger/src/leader_schedule.rs
+++ b/ledger/src/leader_schedule.rs
@@ -3,6 +3,14 @@ use rand::SeedableRng;
 use rand_chacha::ChaChaRng;
 use solana_sdk::pubkey::Pubkey;
 use std::ops::Index;
+use std::sync::Arc;
+
+// Used for testing
+#[derive(Clone, Debug)]
+pub struct FixedSchedule {
+    pub leader_schedule: Arc<LeaderSchedule>,
+    pub start_epoch: u64,
+}
 
 /// Stake-weighted leader schedule for one epoch.
 #[derive(Debug, Default, PartialEq)]
@@ -30,9 +38,17 @@ impl LeaderSchedule {
         Self { slot_leaders }
     }
 
+    pub fn new_from_schedule(slot_leaders: Vec<Pubkey>) -> Self {
+        Self { slot_leaders }
+    }
+
     pub fn get_slot_leaders(&self) -> &[Pubkey] {
         &self.slot_leaders
     }
+
+    pub fn num_slots(&self) -> usize {
+        self.slot_leaders.len()
+    }
 }
 
 impl Index<u64> for LeaderSchedule {
diff --git a/ledger/src/leader_schedule_cache.rs b/ledger/src/leader_schedule_cache.rs
index 33e7e2d87..e5b6a0567 100644
--- a/ledger/src/leader_schedule_cache.rs
+++ b/ledger/src/leader_schedule_cache.rs
@@ -1,4 +1,8 @@
-use crate::{blocktree::Blocktree, leader_schedule::LeaderSchedule, leader_schedule_utils};
+use crate::{
+    blocktree::Blocktree,
+    leader_schedule::{FixedSchedule, LeaderSchedule},
+    leader_schedule_utils,
+};
 use log::*;
 use solana_runtime::bank::Bank;
 use solana_sdk::{
@@ -28,6 +32,7 @@ pub struct LeaderScheduleCache {
     epoch_schedule: EpochSchedule,
     max_epoch: RwLock<Epoch>,
     max_schedules: CacheCapacity,
+    fixed_schedule: Option<Arc<FixedSchedule>>,
 }
 
 impl LeaderScheduleCache {
@@ -41,6 +46,7 @@ impl LeaderScheduleCache {
             epoch_schedule,
             max_epoch: RwLock::new(0),
             max_schedules: CacheCapacity::default(),
+            fixed_schedule: None,
         };
 
         // This sets the root and calculates the schedule at leader_schedule_epoch(root)
@@ -153,8 +159,17 @@ impl LeaderScheduleCache {
         first_slot.map(|slot| (slot, last_slot))
     }
 
+    pub fn set_fixed_leader_schedule(&mut self, fixed_schedule: Option<FixedSchedule>) {
+        self.fixed_schedule = fixed_schedule.map(Arc::new);
+    }
+
     fn slot_leader_at_no_compute(&self, slot: Slot) -> Option<Pubkey> {
         let (epoch, slot_index) = self.epoch_schedule.get_epoch_and_slot_index(slot);
+        if let Some(ref fixed_schedule) = self.fixed_schedule {
+            if epoch >= fixed_schedule.start_epoch {
+                return Some(fixed_schedule.leader_schedule[slot_index]);
+            }
+        }
         self.cached_schedules
             .read()
             .unwrap()
@@ -191,6 +206,11 @@ impl LeaderScheduleCache {
         epoch: Epoch,
         bank: &Bank,
     ) -> Option<Arc<LeaderSchedule>> {
+        if let Some(ref fixed_schedule) = self.fixed_schedule {
+            if epoch >= fixed_schedule.start_epoch {
+                return Some(fixed_schedule.leader_schedule.clone());
+            }
+        }
         let epoch_schedule = self.cached_schedules.read().unwrap().0.get(&epoch).cloned();
 
         if epoch_schedule.is_some() {
diff --git a/local-cluster/Cargo.toml b/local-cluster/Cargo.toml
index 3760a5a25..5e31bda23 100644
--- a/local-cluster/Cargo.toml
+++ b/local-cluster/Cargo.toml
@@ -9,6 +9,7 @@ license = "Apache-2.0"
 homepage = "https://solana.com/"
 
 [dependencies]
+itertools = "0.8.1"
 log = "0.4.8"
 rand = "0.6.5"
 solana-config-program = { path = "../programs/config", version = "0.22.0" }
diff --git a/local-cluster/src/cluster.rs b/local-cluster/src/cluster.rs
index 8078617cc..beb47dda5 100644
--- a/local-cluster/src/cluster.rs
+++ b/local-cluster/src/cluster.rs
@@ -1,5 +1,6 @@
 use solana_client::thin_client::ThinClient;
 use solana_core::contact_info::ContactInfo;
+use solana_core::validator::Validator;
 use solana_core::validator::ValidatorConfig;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::signature::Keypair;
@@ -17,13 +18,19 @@ pub struct ValidatorInfo {
 pub struct ClusterValidatorInfo {
     pub info: ValidatorInfo,
     pub config: ValidatorConfig,
+    pub validator: Option<Validator>,
 }
 
 impl ClusterValidatorInfo {
-    pub fn new(validator_info: ValidatorInfo, config: ValidatorConfig) -> Self {
+    pub fn new(
+        validator_info: ValidatorInfo,
+        config: ValidatorConfig,
+        validator: Validator,
+    ) -> Self {
         Self {
             info: validator_info,
             config,
+            validator: Some(validator),
         }
     }
 }
diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs
index a80bee220..7ad416271 100644
--- a/local-cluster/src/cluster_tests.rs
+++ b/local-cluster/src/cluster_tests.rs
@@ -15,9 +15,12 @@ use solana_ledger::{
 };
 use solana_sdk::{
     client::SyncClient,
-    clock::{Slot, DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT, NUM_CONSECUTIVE_LEADER_SLOTS},
+    clock::{
+        Slot, DEFAULT_MS_PER_SLOT, DEFAULT_TICKS_PER_SECOND, DEFAULT_TICKS_PER_SLOT,
+        NUM_CONSECUTIVE_LEADER_SLOTS,
+    },
     commitment_config::CommitmentConfig,
-    epoch_schedule::MINIMUM_SLOTS_PER_EPOCH,
+    epoch_schedule::{EpochSchedule, MINIMUM_SLOTS_PER_EPOCH},
     hash::Hash,
     poh_config::PohConfig,
     pubkey::Pubkey,
@@ -169,6 +172,11 @@ pub fn verify_ledger_ticks(ledger_path: &Path, ticks_per_slot: usize) {
     }
 }
 
+pub fn time_until_nth_epoch(epoch: u64, slots_per_epoch: u64, stakers_slot_offset: u64) -> u64 {
+    let epoch_schedule = EpochSchedule::custom(slots_per_epoch, stakers_slot_offset, true);
+    epoch_schedule.get_last_slot_in_epoch(epoch) * DEFAULT_MS_PER_SLOT
+}
+
 pub fn sleep_n_epochs(
     num_epochs: f64,
     config: &PohConfig,
diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs
index 190688e16..8d7a29219 100644
--- a/local-cluster/src/local_cluster.rs
+++ b/local-cluster/src/local_cluster.rs
@@ -1,4 +1,5 @@
 use crate::cluster::{Cluster, ClusterValidatorInfo, ValidatorInfo};
+use itertools::izip;
 use log::*;
 use solana_client::thin_client::{create_client, ThinClient};
 use solana_core::{
@@ -39,6 +40,7 @@ use std::{
     collections::HashMap,
     fs::remove_dir_all,
     io::{Error, ErrorKind, Result},
+    iter,
     path::PathBuf,
     sync::Arc,
 };
@@ -66,6 +68,8 @@ pub struct ClusterConfig {
     pub num_archivers: usize,
     /// Number of nodes that are unstaked and not voting (a.k.a listening)
     pub num_listeners: u64,
+    /// The specific pubkeys of each node if specified
+    pub validator_keys: Option<Vec<Arc<Keypair>>>,
     /// The stakes of each node
     pub node_stakes: Vec<u64>,
     /// The total lamports available to the cluster
@@ -85,6 +89,7 @@ impl Default for ClusterConfig {
             validator_configs: vec![],
             num_archivers: 0,
             num_listeners: 0,
+            validator_keys: None,
             node_stakes: vec![],
             cluster_lamports: 0,
             ticks_per_slot: DEFAULT_TICKS_PER_SLOT,
@@ -103,9 +108,7 @@ pub struct LocalCluster {
     pub funding_keypair: Keypair,
     /// Entry point from which the rest of the network can be discovered
    pub entry_point_info: ContactInfo,
-    pub validator_infos: HashMap<Pubkey, ClusterValidatorInfo>,
-    pub listener_infos: HashMap<Pubkey, ClusterValidatorInfo>,
-    validators: HashMap<Pubkey, Validator>,
+    pub validators: HashMap<Pubkey, ClusterValidatorInfo>,
     pub genesis_config: GenesisConfig,
     archivers: Vec<Archiver>,
     pub archiver_infos: HashMap<Pubkey, ArchiverInfo>,
@@ -129,9 +132,20 @@ impl LocalCluster {
     pub fn new(config: &ClusterConfig) -> Self {
         assert_eq!(config.validator_configs.len(), config.node_stakes.len());
-        let leader_keypair = Arc::new(Keypair::new());
+        let validator_keys = {
+            if let Some(ref keys) = config.validator_keys {
+                assert_eq!(config.validator_configs.len(), keys.len());
+                keys.clone()
+            } else {
+                iter::repeat_with(|| Arc::new(Keypair::new()))
+                    .take(config.validator_configs.len())
+                    .collect()
+            }
+        };
+
+        let leader_keypair = &validator_keys[0];
         let leader_pubkey = leader_keypair.pubkey();
-        let leader_node = Node::new_localhost_with_pubkey(&leader_keypair.pubkey());
+        let leader_node = Node::new_localhost_with_pubkey(&leader_pubkey);
         let GenesisConfigInfo {
             mut genesis_config,
             mint_keypair,
@@ -208,20 +222,22 @@ impl LocalCluster {
         );
 
         let mut validators = HashMap::new();
-        let mut validator_infos = HashMap::new();
-        validators.insert(leader_pubkey, leader_server);
+        error!("leader_pubkey: {}", leader_pubkey);
         let leader_info = ValidatorInfo {
-            keypair: leader_keypair,
+            keypair: leader_keypair.clone(),
             voting_keypair: leader_voting_keypair,
             storage_keypair: leader_storage_keypair,
             ledger_path: leader_ledger_path,
             contact_info: leader_contact_info.clone(),
         };
-        let cluster_leader =
-            ClusterValidatorInfo::new(leader_info, config.validator_configs[0].clone());
+        let cluster_leader = ClusterValidatorInfo::new(
+            leader_info,
+            config.validator_configs[0].clone(),
+            leader_server,
+        );
 
-        validator_infos.insert(leader_pubkey, cluster_leader);
+        validators.insert(leader_pubkey, cluster_leader);
 
         let mut cluster = Self {
             funding_keypair: mint_keypair,
@@ -229,23 +245,24 @@ impl LocalCluster {
             validators,
             archivers: vec![],
             genesis_config,
-            validator_infos,
             archiver_infos: HashMap::new(),
-            listener_infos: HashMap::new(),
         };
 
-        for (stake, validator_config) in (&config.node_stakes[1..])
-            .iter()
-            .zip((&config.validator_configs[1..]).iter())
-        {
-            cluster.add_validator(validator_config, *stake);
+        for (stake, validator_config, key) in izip!(
+            (&config.node_stakes[1..]).iter(),
+            config.validator_configs[1..].iter(),
+            validator_keys[1..].iter(),
+        ) {
+            cluster.add_validator(validator_config, *stake, key.clone());
         }
 
         let listener_config = ValidatorConfig {
             voting_disabled: true,
             ..config.validator_configs[0].clone()
         };
-        (0..config.num_listeners).for_each(|_| cluster.add_validator(&listener_config, 0));
+        (0..config.num_listeners).for_each(|_| {
+            cluster.add_validator(&listener_config, 0, Arc::new(Keypair::new()));
+        });
 
         discover_cluster(
             &cluster.entry_point_info.gossip,
@@ -268,14 +285,18 @@ impl LocalCluster {
 
     pub fn exit(&mut self) {
         for node in self.validators.values_mut() {
-            node.exit();
+            if let Some(ref mut v) = node.validator {
+                v.exit();
+            }
         }
     }
 
     pub fn close_preserve_ledgers(&mut self) {
         self.exit();
-        for (_, node) in self.validators.drain() {
-            node.join().unwrap();
+        for (_, node) in self.validators.iter_mut() {
+            if let Some(v) = node.validator.take() {
+                v.join().unwrap();
+            }
         }
 
         while let Some(archiver) = self.archivers.pop() {
@@ -283,14 +304,18 @@ impl LocalCluster {
         }
     }
 
-    pub fn add_validator(&mut self, validator_config: &ValidatorConfig, stake: u64) {
+    pub fn add_validator(
+        &mut self,
+        validator_config: &ValidatorConfig,
+        stake: u64,
+        validator_keypair: Arc<Keypair>,
+    ) -> Pubkey {
         let client = create_client(
             self.entry_point_info.client_facing_addr(),
             VALIDATOR_PORT_RANGE,
         );
 
         // Must have enough tokens to fund vote account and set delegate
-        let validator_keypair = Arc::new(Keypair::new());
         let voting_keypair = Keypair::new();
         let storage_keypair = Arc::new(Keypair::new());
         let validator_pubkey = validator_keypair.pubkey();
@@ -341,8 +366,6 @@ impl LocalCluster {
             &config,
         );
 
-        self.validators
-            .insert(validator_keypair.pubkey(), validator_server);
         let validator_pubkey = validator_keypair.pubkey();
         let validator_info = ClusterValidatorInfo::new(
             ValidatorInfo {
                 keypair: validator_keypair.clone(),
                 voting_keypair,
                 storage_keypair,
                 ledger_path,
                 contact_info,
             },
             validator_config.clone(),
+            validator_server,
         );
 
-        if validator_config.voting_disabled {
-            self.listener_infos.insert(validator_pubkey, validator_info);
-        } else {
-            self.validator_infos
-                .insert(validator_pubkey, validator_info);
-        }
+        self.validators.insert(validator_pubkey, validator_info);
+        validator_pubkey
     }
 
     fn add_archiver(&mut self) {
@@ -405,7 +425,7 @@ impl LocalCluster {
     fn close(&mut self) {
         self.close_preserve_ledgers();
         for ledger_path in self
-            .validator_infos
+            .validators
            .values()
            .map(|f| &f.info.ledger_path)
            .chain(self.archiver_infos.values().map(|info| &info.ledger_path))
@@ -616,7 +636,7 @@ impl Cluster for LocalCluster {
     }
 
     fn get_validator_client(&self, pubkey: &Pubkey) -> Option<ThinClient> {
-        self.validator_infos.get(pubkey).map(|f| {
+        self.validators.get(pubkey).map(|f| {
             create_client(
                 f.info.contact_info.client_facing_addr(),
                 VALIDATOR_PORT_RANGE,
@@ -628,10 +648,10 @@ impl Cluster for LocalCluster {
         let mut node = self.validators.remove(&pubkey).unwrap();
 
         // Shut down the validator
-        node.exit();
-        node.join().unwrap();
-
-        self.validator_infos.remove(&pubkey).unwrap()
+        let mut validator = node.validator.take().expect("Validator must be running");
+        validator.exit();
+        validator.join().unwrap();
+        node
     }
 
     fn restart_node(&mut self, pubkey: &Pubkey, mut cluster_validator_info: ClusterValidatorInfo) {
@@ -666,8 +686,8 @@ impl Cluster for LocalCluster {
             &cluster_validator_info.config,
         );
 
-        self.validators.insert(*pubkey, restarted_node);
-        self.validator_infos.insert(*pubkey, cluster_validator_info);
+        cluster_validator_info.validator = Some(restarted_node);
+        self.validators.insert(*pubkey, cluster_validator_info);
     }
 
     fn exit_restart_node(&mut self, pubkey: &Pubkey, validator_config: ValidatorConfig) {
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index c2a956ddd..ba613d406 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -8,7 +8,10 @@ use solana_core::{
     partition_cfg::{Partition, PartitionCfg},
     validator::ValidatorConfig,
 };
-use solana_ledger::{bank_forks::SnapshotConfig, blocktree::Blocktree, snapshot_utils};
+use solana_ledger::{
+    bank_forks::SnapshotConfig, blocktree::Blocktree, leader_schedule::FixedSchedule,
+    leader_schedule::LeaderSchedule, snapshot_utils,
+};
 use solana_local_cluster::{
     cluster::Cluster,
     cluster_tests,
@@ -23,13 +26,15 @@ use solana_sdk::{
     epoch_schedule::{EpochSchedule, MINIMUM_SLOTS_PER_EPOCH},
     genesis_config::OperatingMode,
     poh_config::PohConfig,
+    signature::{Keypair, KeypairUtil},
 };
 use std::{
     collections::{HashMap, HashSet},
-    fs,
+    fs, iter,
     path::{Path, PathBuf},
+    sync::Arc,
     thread::sleep,
-    time::Duration,
+    time::{Duration, Instant},
 };
 use tempfile::TempDir;
 
@@ -60,7 +65,7 @@ fn test_ledger_cleanup_service() {
     );
     cluster.close_preserve_ledgers();
     //check everyone's ledgers and make sure only ~100 slots are stored
-    for (_, info) in &cluster.validator_infos {
+    for (_, info) in &cluster.validators {
         let mut slots = 0;
         let blocktree = Blocktree::open(&info.info.ledger_path).unwrap();
         blocktree
@@ -188,71 +193,166 @@ fn test_leader_failure_4() {
     );
 }
 
-fn run_network_partition(partitions: &[usize]) {
+/// This function runs a network, initiates a partition based on a
+/// configuration, resolves the partition, and then checks that the network
+/// continues to achieve consensus
+/// # Arguments
+/// * `partitions` - A slice of partition configurations, where each partition
+/// configuration is a slice of (usize, bool), representing a node's stake and
+/// whether or not it should be killed during the partition
+/// * `leader_schedule` - An option that specifies whether the cluster should
+/// run with a fixed, predetermined leader schedule
+fn run_cluster_partition(
+    partitions: &[&[(usize, bool)]],
+    leader_schedule: Option<(LeaderSchedule, Vec<Arc<Keypair>>)>,
+) {
     solana_logger::setup();
     info!("PARTITION_TEST!");
-    let num_nodes = partitions.iter().sum();
-    let validator_config = ValidatorConfig::default();
+    let num_nodes = partitions.len();
+    let node_stakes: Vec<_> = partitions
+        .iter()
+        .flat_map(|p| p.iter().map(|(stake_weight, _)| 100 * *stake_weight as u64))
+        .collect();
+    assert_eq!(node_stakes.len(), num_nodes);
+    let cluster_lamports = node_stakes.iter().sum::<u64>() * 2;
+    let partition_start_epoch = 2;
+    let mut validator_config = ValidatorConfig::default();
+
+    // Returns:
+    // 1) The keys for the validators
+    // 2) The amount of time it would take to run through one full iteration of the given
+    // leader schedule
+    let (validator_keys, leader_schedule_time): (Vec<_>, u64) = {
+        if let Some((leader_schedule, validator_keys)) = leader_schedule {
+            assert_eq!(validator_keys.len(), num_nodes);
+            let num_slots_per_rotation = leader_schedule.num_slots() as u64;
+            let fixed_schedule = FixedSchedule {
+                start_epoch: partition_start_epoch,
+                leader_schedule: Arc::new(leader_schedule),
+            };
+            validator_config.fixed_leader_schedule = Some(fixed_schedule);
+            (
+                validator_keys,
+                num_slots_per_rotation * clock::DEFAULT_MS_PER_SLOT,
+            )
+        } else {
+            (
+                iter::repeat_with(|| Arc::new(Keypair::new()))
+                    .take(partitions.len())
+                    .collect(),
+                10_000,
+            )
+        }
+    };
+
+    let validator_pubkeys: Vec<_> = validator_keys.iter().map(|v| v.pubkey()).collect();
     let mut config = ClusterConfig {
-        cluster_lamports: 10_000,
-        node_stakes: vec![100; num_nodes],
+        cluster_lamports,
+        node_stakes,
         validator_configs: vec![validator_config.clone(); num_nodes],
+        validator_keys: Some(validator_keys),
         ..ClusterConfig::default()
     };
+
     let now = timestamp();
-    let partition_start = now + 60_000;
-    let partition_end = partition_start + 10_000;
-    let mut total = 0;
-    for (j, pn) in partitions.iter().enumerate() {
-        info!(
-            "PARTITION_TEST configuring partition {} for nodes {} - {}",
-            j,
-            total,
-            total + *pn
-        );
-        for i in total..(total + *pn) {
+    // Partition needs to start after the first few shorter warmup epochs, otherwise
+    // no root will be set before the partition is resolved, the leader schedule will
+    // not be computable, and the cluster will halt.
+    let partition_epoch_start_offset = cluster_tests::time_until_nth_epoch(
+        partition_start_epoch,
+        config.slots_per_epoch,
+        config.stakers_slot_offset,
+    );
+    // Assume it takes <= 10 seconds for `LocalCluster::new` to boot up.
+    let local_cluster_boot_time = 10_000;
+    let partition_start = now + partition_epoch_start_offset + local_cluster_boot_time;
+    let partition_end = partition_start + leader_schedule_time as u64;
+    let mut validator_index = 0;
+    for (i, partition) in partitions.iter().enumerate() {
+        for _ in partition.iter() {
             let mut p1 = Partition::default();
             p1.num_partitions = partitions.len();
-            p1.my_partition = j;
+            p1.my_partition = i;
             p1.start_ts = partition_start;
             p1.end_ts = partition_end;
-            config.validator_configs[i].partition_cfg = Some(PartitionCfg::new(vec![p1]));
+            config.validator_configs[validator_index].partition_cfg =
+                Some(PartitionCfg::new(vec![p1]));
+            validator_index += 1;
         }
-        total += *pn;
     }
     info!(
         "PARTITION_TEST starting cluster with {:?} partitions",
         partitions
     );
-    let cluster = LocalCluster::new(&config);
+    let now = Instant::now();
+    let mut cluster = LocalCluster::new(&config);
+    let elapsed = now.elapsed();
+    assert!(elapsed.as_millis() < local_cluster_boot_time as u128);
+
     let now = timestamp();
-    let timeout = partition_start as i64 - now as i64;
+    let timeout = partition_start as u64 - now as u64;
     info!(
         "PARTITION_TEST sleeping until partition start timeout {}",
         timeout
     );
+    let mut dead_nodes = HashSet::new();
     if timeout > 0 {
         sleep(Duration::from_millis(timeout as u64));
     }
     info!("PARTITION_TEST done sleeping until partition start timeout");
     let now = timestamp();
-    let timeout = partition_end as i64 - now as i64;
+    let timeout = partition_end as u64 - now as u64;
     info!(
         "PARTITION_TEST sleeping until partition end timeout {}",
         timeout
     );
+
+    let mut alive_node_contact_infos = vec![];
+    let should_exits: Vec<_> = partitions
+        .iter()
+        .flat_map(|p| p.iter().map(|(_, should_exit)| should_exit))
+        .collect();
+    assert_eq!(should_exits.len(), validator_pubkeys.len());
     if timeout > 0 {
-        sleep(Duration::from_millis(timeout as u64));
+        // Give partitions time to propagate the blocks they produced during the partition
+        // after the partition resolves
+        let propagation_time = leader_schedule_time;
+        info!("PARTITION_TEST resolving partition");
+        sleep(Duration::from_millis(timeout));
+        info!("PARTITION_TEST waiting for blocks to propagate after partition");
+        sleep(Duration::from_millis(propagation_time));
+        info!("PARTITION_TEST resuming normal operation");
+        for (pubkey, should_exit) in validator_pubkeys.iter().zip(should_exits) {
+            if *should_exit {
+                info!("Killing validator with id: {}", pubkey);
+                cluster.exit_node(pubkey);
+                dead_nodes.insert(*pubkey);
+            } else {
+                alive_node_contact_infos.push(
+                    cluster
+                        .validators
+                        .get(pubkey)
+                        .unwrap()
+                        .info
+                        .contact_info
+                        .clone(),
+                );
+            }
+        }
     }
-    info!("PARTITION_TEST done sleeping until partition end timeout");
+
+    assert!(alive_node_contact_infos.len() > 0);
     info!("PARTITION_TEST discovering nodes");
-    let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, num_nodes).unwrap();
+    let (cluster_nodes, _) = discover_cluster(
+        &alive_node_contact_infos[0].gossip,
+        alive_node_contact_infos.len(),
+    )
+    .unwrap();
     info!("PARTITION_TEST discovered {} nodes", cluster_nodes.len());
     info!("PARTITION_TEST looking for new roots on all nodes");
-    let mut roots = vec![HashSet::new(); cluster_nodes.len()];
+    let mut roots = vec![HashSet::new(); alive_node_contact_infos.len()];
     let mut done = false;
     while !done {
-        for (i, ingress_node) in cluster_nodes.iter().enumerate() {
+        for (i, ingress_node) in alive_node_contact_infos.iter().enumerate() {
             let client = create_client(
                 ingress_node.client_facing_addr(),
                solana_core::cluster_info::VALIDATOR_PORT_RANGE,
@@ -272,22 +372,64 @@ fn run_network_partition(partitions: &[usize]) {
 #[ignore]
 #[test]
 #[serial]
-fn test_network_partition_1_2() {
-    run_network_partition(&[1, 2])
+fn test_cluster_partition_1_2() {
+    run_cluster_partition(&[&[(1, false)], &[(1, false), (1, false)]], None)
 }
 
 #[allow(unused_attributes)]
 #[ignore]
 #[test]
 #[serial]
-fn test_network_partition_1_1() {
-    run_network_partition(&[1, 1])
+fn test_cluster_partition_1_1() {
+    run_cluster_partition(&[&[(1, false)], &[(1, false)]], None)
 }
 
 #[test]
 #[serial]
-fn test_network_partition_1_1_1() {
-    run_network_partition(&[1, 1, 1])
+fn test_cluster_partition_1_1_1() {
+    run_cluster_partition(&[&[(1, false)], &[(1, false)], &[(1, false)]], None)
+}
+
+#[test]
+#[serial]
+fn test_kill_partition() {
+    // This test:
+    // 1) Spins up three partitions
+    // 2) Forces more slots in the leader schedule for the first partition so
+    // that this partition will be the heaviest
+    // 3) Schedules the other validators for sufficient slots in the schedule
+    // so that they will still be locked out of voting for the major partition
+    // when the partition resolves
+    // 4) Kills the major partition. Validators are locked out, but should be
+    // able to reset to the major partition
+    // 5) Check for recovery
+    let mut leader_schedule = vec![];
+    let num_slots_per_validator = 8;
+    let partitions: [&[(usize, bool)]; 3] = [&[(9, true)], &[(10, false)], &[(10, false)]];
+    let validator_keys: Vec<_> = iter::repeat_with(|| Arc::new(Keypair::new()))
+        .take(partitions.len())
+        .collect();
+    for (i, k) in validator_keys.iter().enumerate() {
+        let num_slots = {
+            if i == 0 {
+                // Set up the leader to have 50% of the slots
+                num_slots_per_validator * (partitions.len() - 1)
+            } else {
+                num_slots_per_validator
+            }
+        };
+        for _ in 0..num_slots {
+            leader_schedule.push(k.pubkey())
+        }
+    }
+
+    run_cluster_partition(
+        &partitions,
+        Some((
+            LeaderSchedule::new_from_schedule(leader_schedule),
+            validator_keys,
+        )),
+    )
 }
 
 #[test]
@@ -319,10 +461,7 @@ fn test_two_unbalanced_stakes() {
     );
     cluster.close_preserve_ledgers();
     let leader_pubkey = cluster.entry_point_info.id;
-    let leader_ledger = cluster.validator_infos[&leader_pubkey]
-        .info
-        .ledger_path
-        .clone();
+    let leader_ledger = cluster.validators[&leader_pubkey].info.ledger_path.clone();
     cluster_tests::verify_ledger_ticks(&leader_ledger, num_ticks_per_slot as usize);
 }
 
@@ -560,6 +699,7 @@ fn test_snapshots_blocktree_floor() {
     cluster.add_validator(
         &validator_snapshot_test_config.validator_config,
         validator_stake,
+        Arc::new(Keypair::new()),
     );
     let all_pubkeys = cluster.get_node_pubkeys();
     let validator_id = all_pubkeys
@@ -583,7 +723,7 @@ fn test_snapshots_blocktree_floor() {
     // Check the validator ledger doesn't contain any slots < slot_floor
     cluster.close_preserve_ledgers();
-    let validator_ledger_path = &cluster.validator_infos[&validator_id];
+    let validator_ledger_path = &cluster.validators[&validator_id];
     let blocktree = Blocktree::open(&validator_ledger_path.info.ledger_path).unwrap();
 
     // Skip the zeroth slot in blocktree that the ledger is initialized with
@@ -721,7 +861,7 @@ fn test_faulty_node(faulty_node_type: BroadcastStageType) {
     );
 
     let corrupt_node = cluster
-        .validator_infos
+        .validators
        .iter()
        .find(|(_, v)| v.config.broadcast_stage_type == faulty_node_type)
        .unwrap()
@@ -768,10 +908,7 @@ fn test_no_voting() {
     cluster.close_preserve_ledgers();
     let leader_pubkey = cluster.entry_point_info.id;
-    let ledger_path = cluster.validator_infos[&leader_pubkey]
-        .info
-        .ledger_path
-        .clone();
+    let ledger_path = cluster.validators[&leader_pubkey].info.ledger_path.clone();
     let ledger = Blocktree::open(&ledger_path).unwrap();
     for i in 0..2 * VOTE_THRESHOLD_DEPTH {
         let meta = ledger.meta(i as u64).unwrap().unwrap();
@@ -850,7 +987,7 @@ fn run_repairman_catchup(num_repairmen: u64) {
     // Start up a new node, wait for catchup. Backwards repair won't be sufficient because the
     // leader is sending shreds past this validator's first two confirmed epochs. Thus, the repairman
     // protocol will have to kick in for this validator to repair.
-    cluster.add_validator(&validator_config, repairee_stake);
+    cluster.add_validator(&validator_config, repairee_stake, Arc::new(Keypair::new()));
     let all_pubkeys = cluster.get_node_pubkeys();
     let repairee_id = all_pubkeys
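
Usage sketch (editorial note, not part of the patch): the snippet below illustrates how the pieces introduced in this diff -- `ClusterConfig::validator_keys`, `ValidatorConfig::fixed_leader_schedule`, `FixedSchedule`, and `LeaderSchedule::new_from_schedule` -- are intended to compose in a local-cluster test. Module paths, the slot count, and the start epoch are assumptions based on the code visible above, not a definitive implementation.

// Sketch only: wiring a fixed leader schedule into a LocalCluster test,
// assuming the APIs added in this patch and the import paths used in
// local-cluster/tests/local_cluster.rs above.
use std::{iter, sync::Arc};

use solana_core::validator::ValidatorConfig;
use solana_ledger::leader_schedule::{FixedSchedule, LeaderSchedule};
use solana_local_cluster::local_cluster::{ClusterConfig, LocalCluster};
use solana_sdk::signature::{Keypair, KeypairUtil};

fn fixed_schedule_cluster_sketch() {
    // One known keypair per node so the fixed schedule can name each validator.
    let validator_keys: Vec<_> = iter::repeat_with(|| Arc::new(Keypair::new()))
        .take(3)
        .collect();

    // Give every validator the same number of slots in the schedule (8 here is arbitrary).
    let slot_leaders: Vec<_> = validator_keys
        .iter()
        .flat_map(|k| iter::repeat(k.pubkey()).take(8))
        .collect();

    // Every node must run with the same fixed schedule, starting at the same epoch.
    let mut validator_config = ValidatorConfig::default();
    validator_config.fixed_leader_schedule = Some(FixedSchedule {
        start_epoch: 2,
        leader_schedule: Arc::new(LeaderSchedule::new_from_schedule(slot_leaders)),
    });

    let config = ClusterConfig {
        cluster_lamports: 10_000,
        node_stakes: vec![100; 3],
        validator_configs: vec![validator_config; 3],
        validator_keys: Some(validator_keys),
        ..ClusterConfig::default()
    };
    let _cluster = LocalCluster::new(&config);
}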