From 7058287273562412125a30179eb4cbf66e4fea7b Mon Sep 17 00:00:00 2001
From: sakridge
Date: Mon, 27 Jan 2020 16:49:25 -0800
Subject: [PATCH] Consensus fix, don't consider threshold check if.. (#7948)

* Consensus fix, don't consider threshold check if lockouts are not increased

* Change partition tests to wait for epoch with > lockout slots

* Use atomic bool to signal partition
---
 core/src/consensus.rs                |  46 ++++++++++-
 core/src/lib.rs                      |   1 -
 core/src/partition_cfg.rs            |  92 ---------------------
 core/src/retransmit_stage.rs         |   7 +-
 core/src/tvu.rs                      |   3 +-
 core/src/validator.rs                |   7 +-
 local-cluster/tests/local_cluster.rs | 117 +++++++++++++--------------
 7 files changed, 108 insertions(+), 165 deletions(-)
 delete mode 100644 core/src/partition_cfg.rs

diff --git a/core/src/consensus.rs b/core/src/consensus.rs
index 988957ac9..16b16f7e8 100644
--- a/core/src/consensus.rs
+++ b/core/src/consensus.rs
@@ -321,13 +321,27 @@
             if let Some(fork_stake) = stake_lockouts.get(&vote.slot) {
                 let lockout = fork_stake.stake as f64 / total_staked as f64;
                 trace!(
-                    "fork_stake {} {} {} {}",
+                    "fork_stake slot: {} lockout: {} fork_stake: {} total_stake: {}",
                     slot,
                     lockout,
                     fork_stake.stake,
                     total_staked
                 );
-                lockout > self.threshold_size
+                for (new_lockout, original_lockout) in
+                    lockouts.votes.iter().zip(self.lockouts.votes.iter())
+                {
+                    if new_lockout.slot == original_lockout.slot {
+                        if new_lockout.confirmation_count <= self.threshold_depth as u32 {
+                            break;
+                        }
+                        if new_lockout.confirmation_count != original_lockout.confirmation_count {
+                            return lockout > self.threshold_size;
+                        }
+                    } else {
+                        break;
+                    }
+                }
+                true
             } else {
                 false
             }
@@ -742,6 +756,34 @@ mod test {
         assert!(!tower.check_vote_stake_threshold(1, &stakes, 2));
     }
 
+    #[test]
+    fn test_check_vote_threshold_lockouts_not_updated() {
+        solana_logger::setup();
+        let mut tower = Tower::new_for_tests(1, 0.67);
+        let stakes = vec![
+            (
+                0,
+                StakeLockout {
+                    stake: 1,
+                    lockout: 8,
+                },
+            ),
+            (
+                1,
+                StakeLockout {
+                    stake: 2,
+                    lockout: 8,
+                },
+            ),
+        ]
+        .into_iter()
+        .collect();
+        tower.record_vote(0, Hash::default());
+        tower.record_vote(1, Hash::default());
+        tower.record_vote(2, Hash::default());
+        assert!(tower.check_vote_stake_threshold(6, &stakes, 2));
+    }
+
     #[test]
     fn test_lockout_is_updated_for_entire_branch() {
         let mut stake_lockouts = HashMap::new();
diff --git a/core/src/lib.rs b/core/src/lib.rs
index caa18dad6..08bde97ad 100644
--- a/core/src/lib.rs
+++ b/core/src/lib.rs
@@ -30,7 +30,6 @@ pub mod gossip_service;
 pub mod ledger_cleanup_service;
 pub mod local_vote_signer_service;
 pub mod packet;
-pub mod partition_cfg;
 pub mod poh_recorder;
 pub mod poh_service;
 pub mod recvmmsg;
diff --git a/core/src/partition_cfg.rs b/core/src/partition_cfg.rs
deleted file mode 100644
index 480176c19..000000000
--- a/core/src/partition_cfg.rs
+++ /dev/null
@@ -1,92 +0,0 @@
-use solana_ledger::leader_schedule_cache::LeaderScheduleCache;
-use solana_ledger::shred::Shred;
-use solana_runtime::bank::Bank;
-use solana_sdk::pubkey::Pubkey;
-use solana_sdk::timing::timestamp;
-use std::collections::HashSet;
-use std::sync::Arc;
-use std::sync::RwLock;
-
-///Configure a partition in the retransmit stage
-#[derive(Debug, Clone)]
-pub struct Partition {
-    pub num_partitions: usize,
-    pub my_partition: usize,
-    pub start_ts: u64,
-    pub end_ts: u64,
-    leaders: Arc<RwLock<Vec<Pubkey>>>,
-}
-impl Default for Partition {
-    fn default() -> Self {
-        Self {
-            num_partitions: 0,
-            my_partition: 0,
-            start_ts: 0,
-            end_ts: 0,
-            leaders: Arc::new(RwLock::new(vec![])),
-        }
-    }
-}
-
-#[derive(Default, Debug, Clone)]
-pub struct PartitionCfg {
-    partitions: Vec<Partition>,
-}
-
-impl PartitionCfg {
-    pub fn new(partitions: Vec<Partition>) -> Self {
-        Self { partitions }
-    }
-    pub fn is_connected(
-        &self,
-        bank: &Option<Arc<Bank>>,
-        leader_schedule_cache: &Arc<LeaderScheduleCache>,
-        shred: &Shred,
-    ) -> bool {
-        if bank.is_none() {
-            return true;
-        }
-        let bank = bank.as_ref().unwrap().clone();
-        let slot_leader_pubkey = leader_schedule_cache.slot_leader_at(shred.slot(), Some(&bank));
-        let slot_leader_pubkey = slot_leader_pubkey.unwrap_or_default();
-        let time = timestamp();
-        for p in &self.partitions {
-            let is_time = (p.start_ts <= time) && (time < p.end_ts);
-            if !is_time {
-                continue;
-            }
-            trace!("PARTITION_TEST partition time! {}", p.my_partition);
-            if p.num_partitions == 0 {
-                continue;
-            }
-            if p.leaders.read().unwrap().is_empty() {
-                let mut leader_vec = p.leaders.write().unwrap();
-                let mut leaders: Vec<Pubkey> = bank.vote_accounts().keys().cloned().collect();
-                leaders.sort();
-                *leader_vec = leaders;
-                warn!("PARTITION_TEST partition enabled {}", p.my_partition);
-            }
-            let is_connected: bool = {
-                let leaders = p.leaders.read().unwrap();
-                let start = p.my_partition * leaders.len() / p.num_partitions;
-                let partition_size = leaders.len() / p.num_partitions;
-                let end = start + partition_size;
-                let end = if leaders.len() - end < partition_size {
-                    leaders.len()
-                } else {
-                    end
-                };
-                let my_leaders: HashSet<_> = leaders[start..end].iter().collect();
-                my_leaders.contains(&slot_leader_pubkey)
-            };
-            if is_connected {
-                trace!("PARTITION_TEST connected {}", p.my_partition);
-                continue;
-            }
-            trace!("PARTITION_TEST not connected {}", p.my_partition);
-            return false;
-        }
-        trace!("PARTITION_TEST connected");
-        true
-    }
-}
diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs
index 880b943b6..b00700555 100644
--- a/core/src/retransmit_stage.rs
+++ b/core/src/retransmit_stage.rs
@@ -3,7 +3,6 @@
 use crate::{
     cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
     packet::Packets,
-    partition_cfg::PartitionCfg,
     repair_service::RepairStrategy,
     result::{Error, Result},
     streamer::PacketReceiver,
@@ -22,7 +21,7 @@ use solana_sdk::epoch_schedule::EpochSchedule;
 use std::{
     cmp,
     net::UdpSocket,
-    sync::atomic::AtomicBool,
+    sync::atomic::{AtomicBool, Ordering},
     sync::mpsc::channel,
     sync::mpsc::RecvTimeoutError,
     sync::Mutex,
@@ -213,7 +212,7 @@ impl RetransmitStage {
         exit: &Arc<AtomicBool>,
         completed_slots_receiver: CompletedSlotsReceiver,
         epoch_schedule: EpochSchedule,
-        cfg: Option<PartitionCfg>,
+        cfg: Option<Arc<AtomicBool>>,
         shred_version: u16,
     ) -> Self {
         let (retransmit_sender, retransmit_receiver) = channel();
@@ -245,7 +244,7 @@
             move |id, shred, working_bank, last_root| {
                 let is_connected = cfg
                     .as_ref()
-                    .map(|x| x.is_connected(&working_bank, &leader_schedule_cache, shred))
+                    .map(|x| x.load(Ordering::Relaxed))
                     .unwrap_or(true);
                 let rv = should_retransmit_and_persist(
                     shred,
diff --git a/core/src/tvu.rs b/core/src/tvu.rs
index c7824e8e6..8c9dfbf74 100644
--- a/core/src/tvu.rs
+++ b/core/src/tvu.rs
@@ -6,7 +6,6 @@ use crate::{
     cluster_info::ClusterInfo,
     commitment::BlockCommitmentCache,
     ledger_cleanup_service::LedgerCleanupService,
-    partition_cfg::PartitionCfg,
     poh_recorder::PohRecorder,
     replay_stage::{ReplayStage, ReplayStageConfig},
     retransmit_stage::RetransmitStage,
@@ -84,7 +83,7 @@ impl Tvu {
         completed_slots_receiver: CompletedSlotsReceiver,
         block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
         sigverify_disabled: bool,
-        cfg: Option<PartitionCfg>,
+        cfg: Option<Arc<AtomicBool>>,
         shred_version: u16,
         transaction_status_sender: Option<TransactionStatusSender>,
     ) -> Self {
diff --git a/core/src/validator.rs b/core/src/validator.rs
index 196772b6e..9d9b18d21 100644
--- a/core/src/validator.rs
+++ b/core/src/validator.rs
@@ -6,7 +6,6 @@ use crate::{
     commitment::BlockCommitmentCache,
     contact_info::ContactInfo,
     gossip_service::{discover_cluster, GossipService},
-    partition_cfg::PartitionCfg,
     poh_recorder::PohRecorder,
     poh_service::PohService,
     rpc::JsonRpcConfig,
@@ -66,7 +65,7 @@ pub struct ValidatorConfig {
     pub snapshot_config: Option<SnapshotConfig>,
     pub max_ledger_slots: Option<u64>,
     pub broadcast_stage_type: BroadcastStageType,
-    pub partition_cfg: Option<PartitionCfg>,
+    pub enable_partition: Option<Arc<AtomicBool>>,
     pub fixed_leader_schedule: Option<FixedSchedule>,
     pub wait_for_supermajority: bool,
     pub new_hard_forks: Option<Vec<Slot>>,
@@ -87,7 +86,7 @@ impl Default for ValidatorConfig {
             rpc_config: JsonRpcConfig::default(),
             snapshot_config: None,
             broadcast_stage_type: BroadcastStageType::Standard,
-            partition_cfg: None,
+            enable_partition: None,
             fixed_leader_schedule: None,
             wait_for_supermajority: false,
             new_hard_forks: None,
@@ -370,7 +369,7 @@ impl Validator {
             completed_slots_receiver,
             block_commitment_cache,
             config.dev_sigverify_disabled,
-            config.partition_cfg.clone(),
+            config.enable_partition.clone(),
             node.info.shred_version,
             transaction_status_sender.clone(),
         );
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index 5b35a36da..b695eb827 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -1,13 +1,11 @@
 use assert_matches::assert_matches;
 use log::*;
 use serial_test_derive::serial;
+use solana_client::rpc_client::RpcClient;
 use solana_client::thin_client::create_client;
 use solana_core::{
-    broadcast_stage::BroadcastStageType,
-    consensus::VOTE_THRESHOLD_DEPTH,
-    gossip_service::discover_cluster,
-    partition_cfg::{Partition, PartitionCfg},
-    validator::ValidatorConfig,
+    broadcast_stage::BroadcastStageType, consensus::VOTE_THRESHOLD_DEPTH,
+    gossip_service::discover_cluster, validator::ValidatorConfig,
 };
 use solana_ledger::{
     bank_forks::SnapshotConfig, blockstore::Blockstore, leader_schedule::FixedSchedule,
@@ -18,7 +16,6 @@ use solana_local_cluster::{
     cluster_tests,
     local_cluster::{ClusterConfig, LocalCluster},
 };
-use solana_sdk::timing::timestamp;
 use solana_sdk::{
     client::SyncClient,
     clock,
@@ -28,6 +25,7 @@ use solana_sdk::{
     poh_config::PohConfig,
     signature::{Keypair, KeypairUtil},
 };
+use std::sync::atomic::{AtomicBool, Ordering};
 use std::{
     collections::{HashMap, HashSet},
     fs, iter,
@@ -248,7 +246,7 @@
     };
     let validator_pubkeys: Vec<_> = validator_keys.iter().map(|v| v.pubkey()).collect();
 
-    let mut config = ClusterConfig {
+    let config = ClusterConfig {
         cluster_lamports,
         node_stakes,
         validator_configs: vec![validator_config.clone(); num_nodes],
@@ -256,71 +254,65 @@
         ..ClusterConfig::default()
     };
 
-    let now = timestamp();
-    // Partition needs to start after the first few shorter warmup epochs, otherwise
-    // no root will be set before the partition is resolved, the leader schedule will
-    // not be computable, and the cluster wll halt.
-    let partition_epoch_start_offset = cluster_tests::time_until_nth_epoch(
-        partition_start_epoch,
-        config.slots_per_epoch,
-        config.stakers_slot_offset,
+    let enable_partition = Some(Arc::new(AtomicBool::new(true)));
+    info!(
+        "PARTITION_TEST starting cluster with {:?} partitions slots_per_epoch: {}",
+        partitions, config.slots_per_epoch,
     );
-    // Assume it takes <= 10 seconds for `LocalCluster::new` to boot up.
-    let local_cluster_boot_time = 10_000;
-    let partition_start = now + partition_epoch_start_offset + local_cluster_boot_time;
-    let partition_end = partition_start + leader_schedule_time as u64;
-    let mut validator_index = 0;
-    for (i, partition) in partitions.iter().enumerate() {
-        for _ in partition.iter() {
-            let mut p1 = Partition::default();
-            p1.num_partitions = partitions.len();
-            p1.my_partition = i;
-            p1.start_ts = partition_start;
-            p1.end_ts = partition_end;
-            config.validator_configs[validator_index].partition_cfg =
-                Some(PartitionCfg::new(vec![p1]));
-            validator_index += 1;
+    let mut cluster = LocalCluster::new(&config);
+
+    let (cluster_nodes, _) = discover_cluster(&cluster.entry_point_info.gossip, num_nodes).unwrap();
+
+    info!("PARTITION_TEST sleeping until partition starting condition",);
+    loop {
+        let mut reached_epoch = true;
+        for node in &cluster_nodes {
+            let node_client = RpcClient::new_socket(node.rpc);
+            if let Ok(epoch_info) = node_client.get_epoch_info() {
+                info!("slots_per_epoch: {:?}", epoch_info);
+                if epoch_info.slots_in_epoch <= (1 << VOTE_THRESHOLD_DEPTH) {
+                    reached_epoch = false;
+                    break;
+                }
+            } else {
+                reached_epoch = false;
+            }
+        }
+
+        if reached_epoch {
+            info!("PARTITION_TEST start partition");
+            enable_partition
+                .clone()
+                .unwrap()
+                .store(false, Ordering::Relaxed);
+            break;
+        } else {
+            sleep(Duration::from_millis(100));
         }
     }
-    info!(
-        "PARTITION_TEST starting cluster with {:?} partitions",
-        partitions
-    );
-    let now = Instant::now();
-    let mut cluster = LocalCluster::new(&config);
-    let elapsed = now.elapsed();
-    assert!(elapsed.as_millis() < local_cluster_boot_time as u128);
+    sleep(Duration::from_millis(leader_schedule_time));
+
+    info!("PARTITION_TEST remove partition");
+    enable_partition.unwrap().store(true, Ordering::Relaxed);
 
-    let now = timestamp();
-    let timeout = partition_start as u64 - now as u64;
-    info!(
-        "PARTITION_TEST sleeping until partition start timeout {}",
-        timeout
-    );
     let mut dead_nodes = HashSet::new();
-    if timeout > 0 {
-        sleep(Duration::from_millis(timeout as u64));
-    }
-    info!("PARTITION_TEST done sleeping until partition start timeout");
-    let now = timestamp();
-    let timeout = partition_end as u64 - now as u64;
-    info!(
-        "PARTITION_TEST sleeping until partition end timeout {}",
-        timeout
-    );
     let mut alive_node_contact_infos = vec![];
     let should_exits: Vec<_> = partitions
         .iter()
         .flat_map(|p| p.iter().map(|(_, should_exit)| should_exit))
         .collect();
     assert_eq!(should_exits.len(), validator_pubkeys.len());
+    let timeout = 10;
     if timeout > 0 {
-        // Give partitions time to propagate their blocks from durinig the partition
+        // Give partitions time to propagate their blocks from during the partition
         // after the partition resolves
         let propagation_time = leader_schedule_time;
-        info!("PARTITION_TEST resolving partition");
-        sleep(Duration::from_millis(timeout));
-        info!("PARTITION_TEST waiting for blocks to propagate after partition");
+        info!("PARTITION_TEST resolving partition. sleeping {}ms", timeout);
+        sleep(Duration::from_millis(10_000));
+        info!(
+            "PARTITION_TEST waiting for blocks to propagate after partition {}ms",
+            propagation_time
+        );
         sleep(Duration::from_millis(propagation_time));
         info!("PARTITION_TEST resuming normal operation");
         for (pubkey, should_exit) in validator_pubkeys.iter().zip(should_exits) {
@@ -353,6 +345,7 @@
     info!("PARTITION_TEST looking for new roots on all nodes");
     let mut roots = vec![HashSet::new(); alive_node_contact_infos.len()];
     let mut done = false;
+    let mut last_print = Instant::now();
     while !done {
         for (i, ingress_node) in alive_node_contact_infos.iter().enumerate() {
             let client = create_client(
@@ -362,12 +355,15 @@
             let slot = client.get_slot().unwrap_or(0);
             roots[i].insert(slot);
             let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0);
-            info!("PARTITION_TEST min observed roots {}/16", min_node);
+            if last_print.elapsed().as_secs() > 3 {
+                info!("PARTITION_TEST min observed roots {}/16", min_node);
+                last_print = Instant::now();
+            }
             done = min_node >= 16;
         }
         sleep(Duration::from_millis(clock::DEFAULT_MS_PER_SLOT / 2));
     }
-    info!("PARTITION_TEST done spending on all node");
+    info!("PARTITION_TEST done waiting for roots");
 }
 
 #[allow(unused_attributes)]
@@ -424,6 +420,7 @@ fn test_kill_partition() {
             leader_schedule.push(k.pubkey())
         }
     }
+    info!("leader_schedule: {}", leader_schedule.len());
 
     run_cluster_partition(
         &partitions,