From bc7e741514e722b298374df774ee9d195cd12cea Mon Sep 17 00:00:00 2001
From: carllin
Date: Tue, 4 May 2021 00:51:42 -0700
Subject: [PATCH] Integrate gossip votes into switching threshold (#16973)

---
 core/src/cluster_info.rs                      |  15 +
 core/src/consensus.rs                         | 192 ++++++--
 ...latest_validator_votes_for_frozen_banks.rs |   4 +
 core/src/replay_stage.rs                      |  11 +-
 ledger/src/blockstore.rs                      |   2 +-
 local-cluster/src/cluster_tests.rs            |   5 +-
 local-cluster/tests/local_cluster.rs          | 418 ++++++++++++++++--
 7 files changed, 564 insertions(+), 83 deletions(-)

diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs
index ff4bbbfb1..789cdb505 100644
--- a/core/src/cluster_info.rs
+++ b/core/src/cluster_info.rs
@@ -59,6 +59,7 @@ use solana_sdk::{
     timing::timestamp,
     transaction::Transaction,
 };
+use solana_streamer::packet;
 use solana_streamer::sendmmsg::multicast;
 use solana_streamer::streamer::{PacketReceiver, PacketSender};
 use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
@@ -3110,6 +3111,20 @@ impl Node {
     }
 }
 
+pub fn push_messages_to_peer(
+    messages: Vec<CrdsValue>,
+    self_id: Pubkey,
+    peer_gossip: SocketAddr,
+) -> Result<()> {
+    let reqs: Vec<_> = ClusterInfo::split_gossip_messages(PUSH_MESSAGE_MAX_PAYLOAD_SIZE, messages)
+        .map(move |payload| (peer_gossip, Protocol::PushMessage(self_id, payload)))
+        .collect();
+    let packets = to_packets_with_destination(PacketsRecycler::default(), &reqs);
+    let sock = UdpSocket::bind("0.0.0.0:0").unwrap();
+    packet::send_to(&packets, &sock)?;
+    Ok(())
+}
+
 pub fn stake_weight_peers(
     peers: &mut Vec<ContactInfo>,
     stakes: Option<&HashMap<Pubkey, u64>>,
diff --git a/core/src/consensus.rs b/core/src/consensus.rs
index c01f81565..6f11649fc 100644
--- a/core/src/consensus.rs
+++ b/core/src/consensus.rs
@@ -527,9 +527,7 @@ impl Tower {
         false
     }
 
-    pub fn is_locked_out(&self, slot: Slot, ancestors: &HashMap<Slot, HashSet<Slot>>) -> bool {
-        assert!(ancestors.contains_key(&slot));
-
+    pub fn is_locked_out(&self, slot: Slot, ancestors: &HashSet<Slot>) -> bool {
         if !self.is_recent(slot) {
             return true;
         }
@@ -541,7 +539,7 @@ impl Tower {
         let mut lockouts = self.lockouts.clone();
         lockouts.process_slot_vote_unchecked(slot);
         for vote in &lockouts.votes {
-            if slot != vote.slot && !ancestors[&slot].contains(&vote.slot) {
+            if slot != vote.slot && !ancestors.contains(&vote.slot) {
                 return true;
             }
         }
@@ -551,9 +549,9 @@ impl Tower {
             // This case should never happen because bank forks purges all
             // non-descendants of the root every time root is set
             assert!(
-                ancestors[&slot].contains(&root_slot),
+                ancestors.contains(&root_slot),
                 "ancestors: {:?}, slot: {} root: {}",
-                ancestors[&slot],
+                ancestors,
                 slot,
                 root_slot
             );
@@ -563,6 +561,16 @@ impl Tower {
         false
     }
 
+    fn is_candidate_slot_descendant_of_last_vote(
+        candidate_slot: Slot,
+        last_voted_slot: Slot,
+        ancestors: &HashMap<Slot, HashSet<Slot>>,
+    ) -> Option<bool> {
+        ancestors
+            .get(&candidate_slot)
+            .map(|candidate_slot_ancestors| candidate_slot_ancestors.contains(&last_voted_slot))
+    }
+
     fn make_check_switch_threshold_decision(
         &self,
         switch_slot: u64,
@@ -571,6 +579,7 @@
         progress: &ProgressMap,
         total_stake: u64,
         epoch_vote_accounts: &HashMap<Pubkey, (u64, ArcVoteAccount)>,
+        latest_validator_votes_for_frozen_banks: &LatestValidatorVotesForFrozenBanks,
     ) -> SwitchForkDecision {
         self.last_voted_slot()
             .map(|last_voted_slot| {
@@ -704,13 +713,7 @@
                     // then use this bank as a representative for the fork.
                     || descendants.iter().any(|d| progress.get_fork_stats(*d).map(|stats| stats.computed).unwrap_or(false))
                     || *candidate_slot == last_voted_slot
-                    || ancestors
-                        .get(&candidate_slot)
-                        .expect(
-                            "empty descendants implies this is a child, not parent of root, so must
-                            exist in the ancestors map",
-                        )
-                        .contains(&last_voted_slot)
+                    || Self::is_candidate_slot_descendant_of_last_vote(*candidate_slot, last_voted_slot, ancestors).expect("exists in descendants map, so must exist in ancestors map")
                     || *candidate_slot <= root
                 {
                     continue;
                 }
@@ -755,17 +758,40 @@ impl Tower {
                             .map(|(stake, _)| *stake)
                             .unwrap_or(0);
                         locked_out_stake += stake;
+                        if (locked_out_stake as f64 / total_stake as f64) > SWITCH_FORK_THRESHOLD {
+                            return SwitchForkDecision::SwitchProof(switch_proof);
+                        }
                         locked_out_vote_accounts.insert(vote_account_pubkey);
                     }
                 }
             }
 
-            if (locked_out_stake as f64 / total_stake as f64) > SWITCH_FORK_THRESHOLD {
-                SwitchForkDecision::SwitchProof(switch_proof)
-            } else {
-                SwitchForkDecision::FailedSwitchThreshold(locked_out_stake, total_stake)
+            // Check the latest votes for potentially gossip votes that haven't landed yet
+            for (vote_account_pubkey, (candidate_latest_frozen_vote, _candidate_latest_frozen_vote_hash)) in latest_validator_votes_for_frozen_banks.max_gossip_frozen_votes() {
+                if locked_out_vote_accounts.contains(&vote_account_pubkey) {
+                    continue;
+                }
+
+                if *candidate_latest_frozen_vote > last_voted_slot
+                    && !Self::is_candidate_slot_descendant_of_last_vote(
+                        *candidate_latest_frozen_vote, last_voted_slot, ancestors)
+                        .expect("candidate_latest_frozen_vote is a frozen bank, so must exist in ancestors map")
+                {
+                    let stake = epoch_vote_accounts
+                        .get(vote_account_pubkey)
+                        .map(|(stake, _)| *stake)
+                        .unwrap_or(0);
+                    locked_out_stake += stake;
+                    if (locked_out_stake as f64 / total_stake as f64) > SWITCH_FORK_THRESHOLD {
+                        return SwitchForkDecision::SwitchProof(switch_proof);
+                    }
+                    locked_out_vote_accounts.insert(vote_account_pubkey);
+                }
             }
+
+            // We have not detected sufficient lockout past the last voted slot to generate
+            // a switching proof
+            SwitchForkDecision::FailedSwitchThreshold(locked_out_stake, total_stake)
         })
         .unwrap_or(SwitchForkDecision::SameFork)
     }
@@ -778,6 +804,7 @@ impl Tower {
         progress: &ProgressMap,
         total_stake: u64,
         epoch_vote_accounts: &HashMap<Pubkey, (u64, ArcVoteAccount)>,
+        latest_validator_votes_for_frozen_banks: &LatestValidatorVotesForFrozenBanks,
     ) -> SwitchForkDecision {
         let decision = self.make_check_switch_threshold_decision(
             switch_slot,
@@ -786,6 +813,7 @@ impl Tower {
             progress,
             total_stake,
             epoch_vote_accounts,
+            latest_validator_votes_for_frozen_banks,
         );
         let new_check = Some((switch_slot, decision.clone()));
         if new_check != self.last_switch_threshold_check {
@@ -1476,6 +1504,7 @@ pub mod test {
                 &descendants,
                 &self.progress,
                 tower,
+                &self.latest_validator_votes_for_frozen_banks,
             );
 
             // Make sure this slot isn't locked out or failing threshold
@@ -1835,6 +1864,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
            ),
             SwitchForkDecision::FailedSwitchDuplicateRollback(duplicate_ancestor2)
         );
@@ -1859,6 +1889,7 @@ pub mod test {
             &vote_simulator.progress,
             total_stake,
             bank0.epoch_vote_accounts(0).unwrap(),
+            &vote_simulator.latest_validator_votes_for_frozen_banks,
         );
         if i == 0 {
             assert_eq!(
@@ -1894,6 +1925,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::SameFork
         );
@@ -1907,6 +1939,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::FailedSwitchThreshold(0, 20000)
         );
@@ -1922,6 +1955,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::FailedSwitchThreshold(0, 20000)
         );
@@ -1937,6 +1971,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::FailedSwitchThreshold(0, 20000)
         );
@@ -1952,6 +1987,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::FailedSwitchThreshold(0, 20000)
         );
@@ -1969,6 +2005,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::FailedSwitchThreshold(0, 20000)
         );
@@ -1984,6 +2021,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::SwitchProof(Hash::default())
         );
@@ -2000,6 +2038,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::SwitchProof(Hash::default())
         );
@@ -2025,11 +2064,90 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::FailedSwitchThreshold(0, 20000)
         );
     }
 
+    #[test]
+    fn test_switch_threshold_use_gossip_votes() {
+        let num_validators = 2;
+        let (bank0, mut vote_simulator, total_stake) = setup_switch_test(2);
+        let ancestors = vote_simulator.bank_forks.read().unwrap().ancestors();
+        let descendants = vote_simulator
+            .bank_forks
+            .read()
+            .unwrap()
+            .descendants()
+            .clone();
+        let mut tower = Tower::new_with_key(&vote_simulator.node_pubkeys[0]);
+        let other_vote_account = vote_simulator.vote_pubkeys[1];
+
+        // Last vote is 47
+        tower.record_vote(47, Hash::default());
+
+        // Trying to switch to another fork at 110 should fail
+        assert_eq!(
+            tower.check_switch_threshold(
+                110,
+                &ancestors,
+                &descendants,
+                &vote_simulator.progress,
+                total_stake,
+                bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
+            ),
+            SwitchForkDecision::FailedSwitchThreshold(0, num_validators * 10000)
+        );
+
+        // Adding a vote on the descendant shouldn't count toward the switch threshold
+        vote_simulator.simulate_lockout_interval(50, (49, 100), &other_vote_account);
+        assert_eq!(
+            tower.check_switch_threshold(
+                110,
+                &ancestors,
+                &descendants,
+                &vote_simulator.progress,
+                total_stake,
+                bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
+            ),
+            SwitchForkDecision::FailedSwitchThreshold(0, 20000)
+        );
+
+        // Adding a later vote from gossip that isn't on the same fork should count toward the
+        // switch threshold
+        vote_simulator
+            .latest_validator_votes_for_frozen_banks
+            .check_add_vote(
+                other_vote_account,
+                110,
+                Some(
+                    vote_simulator
+                        .bank_forks
+                        .read()
+                        .unwrap()
+                        .get(110)
+                        .unwrap()
+                        .hash(),
+                ),
+                false,
+            );
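+        // The vote added above lives only in gossip (`is_replay_vote == false`), but
+        // it is for slot 110, which is newer than the last vote (47) and not a
+        // descendant of it. The other validator holds half the total stake, so
+        // counting its gossip vote pushes the locked out stake over
+        // SWITCH_FORK_THRESHOLD and the switch check below should now succeed.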
+        assert_eq!(
+            tower.check_switch_threshold(
+                110,
+                &ancestors,
+                &descendants,
+                &vote_simulator.progress,
+                total_stake,
+                bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
+            ),
+            SwitchForkDecision::SwitchProof(Hash::default())
+        );
+    }
+
     #[test]
     fn test_switch_threshold_votes() {
         // Init state
@@ -2306,16 +2424,14 @@ pub mod test {
     #[test]
     fn test_is_locked_out_empty() {
         let tower = Tower::new_for_tests(0, 0.67);
-        let ancestors = vec![(0, HashSet::new())].into_iter().collect();
+        let ancestors = HashSet::new();
         assert!(!tower.is_locked_out(0, &ancestors));
     }
 
     #[test]
     fn test_is_locked_out_root_slot_child_pass() {
         let mut tower = Tower::new_for_tests(0, 0.67);
-        let ancestors = vec![(1, vec![0].into_iter().collect())]
-            .into_iter()
-            .collect();
+        let ancestors: HashSet<Slot> = vec![0].into_iter().collect();
         tower.lockouts.root_slot = Some(0);
         assert!(!tower.is_locked_out(1, &ancestors));
     }
@@ -2323,9 +2439,7 @@ pub mod test {
     #[test]
     fn test_is_locked_out_root_slot_sibling_fail() {
         let mut tower = Tower::new_for_tests(0, 0.67);
-        let ancestors = vec![(2, vec![0].into_iter().collect())]
-            .into_iter()
-            .collect();
+        let ancestors: HashSet<Slot> = vec![0].into_iter().collect();
         tower.lockouts.root_slot = Some(0);
         tower.record_vote(1, Hash::default());
         assert!(tower.is_locked_out(2, &ancestors));
@@ -2356,9 +2470,7 @@ pub mod test {
     #[test]
     fn test_is_locked_out_double_vote() {
         let mut tower = Tower::new_for_tests(0, 0.67);
-        let ancestors = vec![(1, vec![0].into_iter().collect()), (0, HashSet::new())]
-            .into_iter()
-            .collect();
+        let ancestors: HashSet<Slot> = vec![0].into_iter().collect();
         tower.record_vote(0, Hash::default());
         tower.record_vote(1, Hash::default());
         assert!(tower.is_locked_out(0, &ancestors));
     }
 
     #[test]
     fn test_is_locked_out_child() {
         let mut tower = Tower::new_for_tests(0, 0.67);
-        let ancestors = vec![(1, vec![0].into_iter().collect())]
-            .into_iter()
-            .collect();
+        let ancestors: HashSet<Slot> = vec![0].into_iter().collect();
         tower.record_vote(0, Hash::default());
         assert!(!tower.is_locked_out(1, &ancestors));
     }
@@ -2377,13 +2487,7 @@ pub mod test {
     #[test]
     fn test_is_locked_out_sibling() {
         let mut tower = Tower::new_for_tests(0, 0.67);
-        let ancestors = vec![
-            (0, HashSet::new()),
-            (1, vec![0].into_iter().collect()),
-            (2, vec![0].into_iter().collect()),
-        ]
-        .into_iter()
-        .collect();
+        let ancestors: HashSet<Slot> = vec![0].into_iter().collect();
         tower.record_vote(0, Hash::default());
         tower.record_vote(1, Hash::default());
         assert!(tower.is_locked_out(2, &ancestors));
@@ -2392,13 +2496,7 @@ pub mod test {
     #[test]
     fn test_is_locked_out_last_vote_expired() {
         let mut tower = Tower::new_for_tests(0, 0.67);
-        let ancestors = vec![
-            (0, HashSet::new()),
-            (1, vec![0].into_iter().collect()),
-            (4, vec![0].into_iter().collect()),
-        ]
-        .into_iter()
-        .collect();
+        let ancestors: HashSet<Slot> = vec![0].into_iter().collect();
         tower.record_vote(0, Hash::default());
         tower.record_vote(1, Hash::default());
         assert!(!tower.is_locked_out(4, &ancestors));
@@ -2723,6 +2821,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::SameFork
         );
@@ -2736,6 +2835,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::FailedSwitchThreshold(0, 20000)
         );
@@ -2750,6 +2850,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::SwitchProof(Hash::default())
         );
@@ -2819,6 +2920,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::FailedSwitchThreshold(0, 20000)
         );
@@ -2833,6 +2935,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::FailedSwitchThreshold(0, 20000)
         );
@@ -2847,6 +2950,7 @@ pub mod test {
                 &vote_simulator.progress,
                 total_stake,
                 bank0.epoch_vote_accounts(0).unwrap(),
+                &vote_simulator.latest_validator_votes_for_frozen_banks
             ),
             SwitchForkDecision::SwitchProof(Hash::default())
         );
diff --git a/core/src/latest_validator_votes_for_frozen_banks.rs b/core/src/latest_validator_votes_for_frozen_banks.rs
index 3e498f1e0..10b01d60a 100644
--- a/core/src/latest_validator_votes_for_frozen_banks.rs
+++ b/core/src/latest_validator_votes_for_frozen_banks.rs
@@ -100,6 +100,10 @@ impl LatestValidatorVotesForFrozenBanks {
             .collect()
     }
 
+    pub(crate) fn max_gossip_frozen_votes(&self) -> &HashMap<Pubkey, (Slot, Vec<Hash>)> {
+        &self.max_gossip_frozen_votes
+    }
+
     #[cfg(test)]
     fn latest_vote(&self, pubkey: &Pubkey, is_replay_vote: bool) -> Option<&(Slot, Vec<Hash>)> {
         let vote_map = if is_replay_vote {
diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs
index c0e09509f..5a8117e35 100644
--- a/core/src/replay_stage.rs
+++ b/core/src/replay_stage.rs
@@ -509,6 +509,7 @@ impl ReplayStage {
                         &descendants,
                         &progress,
                         &mut tower,
+                        &latest_validator_votes_for_frozen_banks,
                     );
                     select_vote_and_reset_forks_time.stop();
 
@@ -1872,7 +1873,12 @@ impl ReplayStage {
             stats.vote_threshold =
                 tower.check_vote_stake_threshold(bank_slot, &stats.voted_stakes, stats.total_stake);
-            stats.is_locked_out = tower.is_locked_out(bank_slot, &ancestors);
+            stats.is_locked_out = tower.is_locked_out(
+                bank_slot,
+                ancestors
+                    .get(&bank_slot)
+                    .expect("Ancestors map should contain slot for is_locked_out() check"),
+            );
             stats.has_voted = tower.has_voted(bank_slot);
             stats.is_recent = tower.is_recent(bank_slot);
         }
@@ -1951,6 +1957,7 @@ impl ReplayStage {
         descendants: &HashMap<u64, HashSet<u64>>,
         progress: &ProgressMap,
         tower: &mut Tower,
+        latest_validator_votes_for_frozen_banks: &LatestValidatorVotesForFrozenBanks,
     ) -> SelectVoteAndResetForkResult {
         // Try to vote on the actual heaviest fork. If the heaviest bank is
         // locked out or fails the threshold check, the validator will:
@@ -1976,6 +1983,7 @@ impl ReplayStage {
             heaviest_bank
                 .epoch_vote_accounts(heaviest_bank.epoch())
                 .expect("Bank epoch vote accounts must contain entry for the bank's own epoch"),
+            latest_validator_votes_for_frozen_banks,
         );
 
         match switch_fork_decision {
@@ -5010,6 +5018,7 @@ pub(crate) mod tests {
             &descendants,
             progress,
             tower,
+            latest_validator_votes_for_frozen_banks,
         );
         (
             vote_bank.map(|(b, _)| b.slot()),
diff --git a/ledger/src/blockstore.rs b/ledger/src/blockstore.rs
index de6e97e25..36c3962b0 100644
--- a/ledger/src/blockstore.rs
+++ b/ledger/src/blockstore.rs
@@ -1860,7 +1860,7 @@ impl Blockstore {
         Err(BlockstoreError::SlotUnavailable)
     }
 
-    fn map_transactions_to_statuses<'a>(
+    pub fn map_transactions_to_statuses<'a>(
         &self,
         slot: Slot,
         iterator: impl Iterator<Item = Transaction> + 'a,
diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs
index 29ea47603..a11b5d391 100644
--- a/local-cluster/src/cluster_tests.rs
+++ b/local-cluster/src/cluster_tests.rs
@@ -289,7 +289,10 @@ pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo],
         let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0);
         done = min_node >= num_new_roots;
         if done || last_print.elapsed().as_secs() > 3 {
-            info!("{} min observed roots {}/16", test_name, min_node);
+            info!(
+                "{} {} min observed roots {}/16",
+                test_name, ingress_node.id, min_node
+            );
             last_print = Instant::now();
         }
     }
diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs
index d50eab78f..40f99a8dc 100644
--- a/local-cluster/tests/local_cluster.rs
+++ b/local-cluster/tests/local_cluster.rs
@@ -13,8 +13,9 @@ use solana_client::{
 };
 use solana_core::{
     broadcast_stage::BroadcastStageType,
-    cluster_info::VALIDATOR_PORT_RANGE,
+    cluster_info::{self, VALIDATOR_PORT_RANGE},
     consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH},
+    crds_value::{self, CrdsData, CrdsValue},
     gossip_service::discover_cluster,
     optimistic_confirmation_verifier::OptimisticConfirmationVerifier,
     validator::ValidatorConfig,
@@ -49,8 +50,13 @@ use solana_sdk::{
     pubkey::Pubkey,
     signature::{Keypair, Signer},
     system_program, system_transaction,
+    timing::timestamp,
+    transaction::Transaction,
+};
+use solana_vote_program::{
+    vote_instruction,
+    vote_state::{Vote, MAX_LOCKOUT_HISTORY},
 };
-use solana_vote_program::vote_state::MAX_LOCKOUT_HISTORY;
 use std::{
     collections::{BTreeSet, HashMap, HashSet},
     fs,
@@ -539,7 +545,7 @@ fn run_kill_partition_switch_threshold(
     partition_duration: Option<u64>,
     ticks_per_slot: Option<u64>,
     partition_context: C,
-    on_partition_start: impl Fn(&mut LocalCluster, &[Pubkey], &mut C),
+    on_partition_start: impl Fn(&mut LocalCluster, &[Pubkey], Vec<ClusterValidatorInfo>, &mut C),
     on_before_partition_resolved: impl Fn(&mut LocalCluster, &mut C),
     on_partition_resolved: impl Fn(&mut LocalCluster, &mut C),
 ) {
@@ -581,11 +587,20 @@ fn run_kill_partition_switch_threshold(
     );
     let validator_pubkeys: Vec<Pubkey> = validator_keys.iter().map(|k| k.pubkey()).collect();
     let on_partition_start = |cluster: &mut LocalCluster, partition_context: &mut C| {
-        for validator_to_kill in &validator_pubkeys[0..stakes_to_kill.len()] {
-            info!("Killing validator with id: {}", validator_to_kill);
-            cluster.exit_node(&validator_to_kill);
-        }
-        on_partition_start(cluster, &validator_pubkeys, partition_context);
+        let dead_validator_infos: Vec<ClusterValidatorInfo> = validator_pubkeys
+            [0..stakes_to_kill.len()]
+            .iter()
+            .map(|validator_to_kill| {
+                info!("Killing validator with id: {}", validator_to_kill);
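+                // `exit_node` returns the killed validator's `ClusterValidatorInfo`
+                // (including its keypairs), so callers can later act on the dead
+                // validator's behalf, e.g. to simulate its votes in gossip.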
{}", validator_to_kill); + cluster.exit_node(&validator_to_kill) + }) + .collect(); + on_partition_start( + cluster, + &validator_pubkeys, + dead_validator_infos, + partition_context, + ); }; run_cluster_partition( &stake_partitions, @@ -600,6 +615,332 @@ fn run_kill_partition_switch_threshold( ) } +fn find_latest_replayed_slot_from_ledger( + ledger_path: &Path, + mut latest_slot: Slot, +) -> (Slot, HashSet) { + loop { + let mut blockstore = open_blockstore(&ledger_path); + // This is kind of a hack because we can't query for new frozen blocks over RPC + // since the validator is not voting. + let new_latest_slots: Vec = blockstore + .slot_meta_iterator(latest_slot) + .unwrap() + .filter_map(|(s, _)| if s > latest_slot { Some(s) } else { None }) + .collect(); + + for new_latest_slot in new_latest_slots { + latest_slot = new_latest_slot; + info!("Checking latest_slot {}", latest_slot); + // Wait for the slot to be fully received by the validator + let entries; + loop { + info!("Waiting for slot {} to be full", latest_slot); + if blockstore.is_full(latest_slot) { + entries = blockstore.get_slot_entries(latest_slot, 0).unwrap(); + assert!(!entries.is_empty()); + break; + } else { + sleep(Duration::from_millis(50)); + blockstore = open_blockstore(&ledger_path); + } + } + // Check the slot has been replayed + let non_tick_entry = entries.into_iter().find(|e| !e.transactions.is_empty()); + if let Some(non_tick_entry) = non_tick_entry { + // Wait for the slot to be replayed + loop { + info!("Waiting for slot {} to be replayed", latest_slot); + if !blockstore + .map_transactions_to_statuses( + latest_slot, + non_tick_entry.transactions.clone().into_iter(), + ) + .is_empty() + { + return ( + latest_slot, + AncestorIterator::new(latest_slot, &blockstore).collect(), + ); + } else { + sleep(Duration::from_millis(50)); + blockstore = open_blockstore(&ledger_path); + } + } + } else { + info!( + "No transactions in slot {}, can't tell if it was replayed", + latest_slot + ); + } + } + sleep(Duration::from_millis(50)); + } +} + +#[test] +#[serial] +fn test_switch_threshold_uses_gossip_votes() { + solana_logger::setup_with_default(RUST_LOG_FILTER); + let total_stake = 100; + + // Minimum stake needed to generate a switching proof + let minimum_switch_stake = (SWITCH_FORK_THRESHOLD as f64 * total_stake as f64) as u64; + + // Make the heavier stake insufficient for switching so tha the lighter validator + // cannot switch without seeing a vote from the dead/failure_stake validator. + let heavier_stake = minimum_switch_stake; + let lighter_stake = heavier_stake - 1; + let failures_stake = total_stake - heavier_stake - lighter_stake; + + let partitions: &[&[(usize, usize)]] = &[ + &[(heavier_stake as usize, 8)], + &[(lighter_stake as usize, 8)], + ]; + + #[derive(Default)] + struct PartitionContext { + heaviest_validator_key: Pubkey, + lighter_validator_key: Pubkey, + dead_validator_info: Option, + } + + let on_partition_start = |_cluster: &mut LocalCluster, + validator_keys: &[Pubkey], + mut dead_validator_infos: Vec, + context: &mut PartitionContext| { + assert_eq!(dead_validator_infos.len(), 1); + context.dead_validator_info = Some(dead_validator_infos.pop().unwrap()); + // validator_keys[0] is the validator that will be killed, i.e. 
+        // stake == `failures_stake`
+        context.heaviest_validator_key = validator_keys[1];
+        context.lighter_validator_key = validator_keys[2];
+    };
+
+    let on_before_partition_resolved = |_: &mut LocalCluster, _: &mut PartitionContext| {};
+
+    // Check that new roots were set after the partition resolves (gives time
+    // for lockouts built during partition to resolve and gives validators an opportunity
+    // to try and switch forks)
+    let on_partition_resolved = |cluster: &mut LocalCluster, context: &mut PartitionContext| {
+        let lighter_validator_ledger_path = cluster.ledger_path(&context.lighter_validator_key);
+        let heavier_validator_ledger_path = cluster.ledger_path(&context.heaviest_validator_key);
+
+        let (lighter_validator_latest_vote, _) = last_vote_in_tower(
+            &lighter_validator_ledger_path,
+            &context.lighter_validator_key,
+        )
+        .unwrap();
+
+        info!(
+            "Lighter validator's latest vote is for slot {}",
+            lighter_validator_latest_vote
+        );
+
+        // Lighter partition should stop voting after detecting the heavier partition and try
+        // to switch. Loop until we see a greater vote by the heavier validator than the last
+        // vote made by the lighter validator on the lighter fork.
+        let mut heavier_validator_latest_vote;
+        let mut heavier_validator_latest_vote_hash;
+        let heavier_blockstore = open_blockstore(&heavier_validator_ledger_path);
+        loop {
+            let (sanity_check_lighter_validator_latest_vote, _) = last_vote_in_tower(
+                &lighter_validator_ledger_path,
+                &context.lighter_validator_key,
+            )
+            .unwrap();
+
+            // Lighter validator should stop voting, because `on_partition_resolved` is only
+            // called after a propagation time where blocks from the other fork should have
+            // finished propagating
+            assert_eq!(
+                sanity_check_lighter_validator_latest_vote,
+                lighter_validator_latest_vote
+            );
+
+            let (new_heavier_validator_latest_vote, new_heavier_validator_latest_vote_hash) =
+                last_vote_in_tower(
+                    &heavier_validator_ledger_path,
+                    &context.heaviest_validator_key,
+                )
+                .unwrap();
+
+            heavier_validator_latest_vote = new_heavier_validator_latest_vote;
+            heavier_validator_latest_vote_hash = new_heavier_validator_latest_vote_hash;
+
+            // Latest vote for each validator should be on different forks
+            assert_ne!(lighter_validator_latest_vote, heavier_validator_latest_vote);
+            if heavier_validator_latest_vote > lighter_validator_latest_vote {
+                let heavier_ancestors: HashSet<Slot> =
+                    AncestorIterator::new(heavier_validator_latest_vote, &heavier_blockstore)
+                        .collect();
+                assert!(!heavier_ancestors.contains(&lighter_validator_latest_vote));
+                break;
+            }
+        }
+
+        info!("Checking to make sure lighter validator doesn't switch");
+        let mut latest_slot = lighter_validator_latest_vote;
+
+        // Number of chances the validator had to switch votes but didn't
+        let mut total_voting_opportunities = 0;
+        while total_voting_opportunities <= 5 {
+            let (new_latest_slot, latest_slot_ancestors) =
+                find_latest_replayed_slot_from_ledger(&lighter_validator_ledger_path, latest_slot);
+            latest_slot = new_latest_slot;
+            // Ensure `latest_slot` is on the other fork
+            if latest_slot_ancestors.contains(&heavier_validator_latest_vote) {
+                let tower = restore_tower(
+                    &lighter_validator_ledger_path,
+                    &context.lighter_validator_key,
+                )
+                .unwrap();
+                // Check that there was an opportunity to vote
+                if !tower.is_locked_out(latest_slot, &latest_slot_ancestors) {
+                    // Ensure the lighter blockstore has not voted again
+                    let new_lighter_validator_latest_vote = tower.last_voted_slot().unwrap();
+                    assert_eq!(
+                        new_lighter_validator_latest_vote,
+                        lighter_validator_latest_vote
+                    );
+                    info!(
+                        "Incrementing voting opportunities: {}",
+                        total_voting_opportunities
+                    );
+                    total_voting_opportunities += 1;
+                } else {
+                    info!(
+                        "Tower still locked out, can't vote for slot: {}",
+                        latest_slot
+                    );
+                }
+            } else if latest_slot > heavier_validator_latest_vote {
+                warn!(
+                    "validator is still generating blocks on its own fork, last processed slot: {}",
+                    latest_slot
+                );
+            }
+            sleep(Duration::from_millis(50));
+        }
+
+        // Make a vote from the killed validator for slot `heavier_validator_latest_vote` in gossip
+        info!(
+            "Simulate vote for slot: {} from dead validator",
+            heavier_validator_latest_vote
+        );
+        let vote_keypair = &context
+            .dead_validator_info
+            .as_ref()
+            .unwrap()
+            .info
+            .voting_keypair
+            .clone();
+        let node_keypair = &context
+            .dead_validator_info
+            .as_ref()
+            .unwrap()
+            .info
+            .keypair
+            .clone();
+        let vote_ix = vote_instruction::vote(
+            &vote_keypair.pubkey(),
+            &vote_keypair.pubkey(),
+            Vote::new(
+                vec![heavier_validator_latest_vote],
+                heavier_validator_latest_vote_hash,
+            ),
+        );
+
+        let mut vote_tx = Transaction::new_with_payer(&[vote_ix], Some(&node_keypair.pubkey()));
+
+        // Make the vote transaction with a random blockhash. Thus, the vote only lives in gossip but
+        // never makes it into a block
+        let blockhash = Hash::new_unique();
+        vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash);
+        vote_tx.partial_sign(&[vote_keypair.as_ref()], blockhash);
+        let heavier_node_gossip = cluster
+            .get_contact_info(&context.heaviest_validator_key)
+            .unwrap()
+            .gossip;
+        cluster_info::push_messages_to_peer(
+            vec![CrdsValue::new_signed(
+                CrdsData::Vote(
+                    0,
+                    crds_value::Vote::new(node_keypair.pubkey(), vote_tx, timestamp()),
+                ),
+                &node_keypair,
+            )],
+            context
+                .dead_validator_info
+                .as_ref()
+                .unwrap()
+                .info
+                .keypair
+                .pubkey(),
+            heavier_node_gossip,
+        )
+        .unwrap();
+
+        loop {
+            // Wait for the lighter validator to switch to the heavier fork
+            let (new_lighter_validator_latest_vote, _) = last_vote_in_tower(
+                &lighter_validator_ledger_path,
+                &context.lighter_validator_key,
+            )
+            .unwrap();
+
+            if new_lighter_validator_latest_vote != lighter_validator_latest_vote {
+                info!(
+                    "Lighter validator switched forks at slot: {}",
+                    new_lighter_validator_latest_vote
+                );
+                let (heavier_validator_latest_vote, _) = last_vote_in_tower(
+                    &heavier_validator_ledger_path,
+                    &context.heaviest_validator_key,
+                )
+                .unwrap();
+                let (smaller, larger) =
+                    if new_lighter_validator_latest_vote > heavier_validator_latest_vote {
+                        (
+                            heavier_validator_latest_vote,
+                            new_lighter_validator_latest_vote,
+                        )
+                    } else {
+                        (
+                            new_lighter_validator_latest_vote,
+                            heavier_validator_latest_vote,
+                        )
+                    };
+
+                // Check the new vote is on the same fork as the heaviest fork
+                let heavier_blockstore = open_blockstore(&heavier_validator_ledger_path);
+                let larger_slot_ancestors: HashSet<Slot> =
+                    AncestorIterator::new(larger, &heavier_blockstore)
+                        .chain(std::iter::once(larger))
+                        .collect();
+                assert!(larger_slot_ancestors.contains(&smaller));
+                break;
+            } else {
+                sleep(Duration::from_millis(50));
+            }
+        }
+    };
+
+    let ticks_per_slot = 8;
+    run_kill_partition_switch_threshold(
+        &[&[(failures_stake as usize, 0)]],
+        partitions,
+        // Partition long enough such that the first vote made by validator with
+        // `alive_stake_3` won't be ingested due to BlockhashTooOld,
+        None,
+        Some(ticks_per_slot),
+        PartitionContext::default(),
+        on_partition_start,
+        on_before_partition_resolved,
+        on_partition_resolved,
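+        // The killed validator's `failures_stake` belongs to neither live partition;
+        // its simulated gossip vote above is what finally gives the lighter validator
+        // enough stake past its last vote to satisfy the switch threshold.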
+    );
+}
+
 #[test]
 #[serial]
 fn test_kill_partition_switch_threshold_no_progress() {
@@ -615,7 +956,8 @@ fn test_kill_partition_switch_threshold_no_progress() {
     // Check that no new roots were set 400 slots after partition resolves (gives time
     // for lockouts built during partition to resolve and gives validators an opportunity
     // to try and switch forks)
-    let on_partition_start = |_: &mut LocalCluster, _: &[Pubkey], _: &mut ()| {};
+    let on_partition_start =
+        |_: &mut LocalCluster, _: &[Pubkey], _: Vec<ClusterValidatorInfo>, _: &mut ()| {};
     let on_before_partition_resolved = |_: &mut LocalCluster, _: &mut ()| {};
     let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| {
         cluster.check_no_new_roots(400, &"PARTITION_TEST");
@@ -667,7 +1009,8 @@ fn test_kill_partition_switch_threshold_progress() {
         && smaller as f64 / total_stake as f64 <= SWITCH_FORK_THRESHOLD
     );
 
-    let on_partition_start = |_: &mut LocalCluster, _: &[Pubkey], _: &mut ()| {};
+    let on_partition_start =
+        |_: &mut LocalCluster, _: &[Pubkey], _: Vec<ClusterValidatorInfo>, _: &mut ()| {};
     let on_before_partition_resolved = |_: &mut LocalCluster, _: &mut ()| {};
     let on_partition_resolved = |cluster: &mut LocalCluster, _: &mut ()| {
         cluster.check_for_new_roots(16, &"PARTITION_TEST");
@@ -746,17 +1089,21 @@ fn test_fork_choice_refresh_old_votes() {
         lighter_fork_validator_key: Pubkey,
         heaviest_validator_key: Pubkey,
     }
-    let on_partition_start =
-        |cluster: &mut LocalCluster, validator_keys: &[Pubkey], context: &mut PartitionContext| {
-            // Kill validator with alive_stake_3, second in `partitions` slice
-            let smallest_validator_key = &validator_keys[3];
-            let info = cluster.exit_node(smallest_validator_key);
-            context.alive_stake3_info = Some(info);
-            context.smallest_validator_key = *smallest_validator_key;
-            context.lighter_fork_validator_key = validator_keys[1];
-            // Third in `partitions` slice
-            context.heaviest_validator_key = validator_keys[2];
-        };
+    let on_partition_start = |cluster: &mut LocalCluster,
+                              validator_keys: &[Pubkey],
+                              _: Vec<ClusterValidatorInfo>,
+                              context: &mut PartitionContext| {
+        // Kill validator with alive_stake_3, second in `partitions` slice
+        let smallest_validator_key = &validator_keys[3];
+        let info = cluster.exit_node(smallest_validator_key);
+        context.alive_stake3_info = Some(info);
+        context.smallest_validator_key = *smallest_validator_key;
+        // validator_keys[0] is the validator that will be killed, i.e. the validator with
+        // stake == `failures_stake`
+        context.lighter_fork_validator_key = validator_keys[1];
+        // Third in `partitions` slice
+        context.heaviest_validator_key = validator_keys[2];
+    };
 
     let ticks_per_slot = 8;
     let on_before_partition_resolved =
@@ -790,12 +1137,12 @@ fn test_fork_choice_refresh_old_votes() {
         info!("Opened blockstores");
 
         // Get latest votes
-        let lighter_fork_latest_vote = last_vote_in_tower(
+        let (lighter_fork_latest_vote, _) = last_vote_in_tower(
             &lighter_fork_ledger_path,
             &context.lighter_fork_validator_key,
         )
         .unwrap();
-        let heaviest_fork_latest_vote =
+        let (heaviest_fork_latest_vote, _) =
             last_vote_in_tower(&heaviest_ledger_path, &context.heaviest_validator_key).unwrap();
 
         // Find the first slot on the smaller fork
@@ -825,8 +1172,6 @@ fn test_fork_choice_refresh_old_votes() {
             first_slot_in_lighter_partition
         );
 
-        assert!(first_slot_in_lighter_partition != 0);
-
         // Copy all the blocks from the smaller partition up to `first_slot_in_lighter_partition`
         // into the smallest validator's blockstore
         for lighter_slot in std::iter::once(first_slot_in_lighter_partition).chain(
@@ -869,12 +1214,12 @@ fn test_fork_choice_refresh_old_votes() {
                    0
                     \-------- 4 (38%, heavier fork)
         */
-        if let Some(last_vote) =
+        if let Some((last_vote_slot, _last_vote_hash)) =
             last_vote_in_tower(&smallest_ledger_path, &context.smallest_validator_key)
         {
             // Check that the heaviest validator on the other fork doesn't have this slot,
             // this must mean we voted on a unique slot on this fork
-            if last_vote == first_slot_in_lighter_partition {
+            if last_vote_slot == first_slot_in_lighter_partition {
                 info!(
                     "Saw vote on first slot in lighter partition {}",
                     first_slot_in_lighter_partition
                 );
             } else {
                 info!(
                     "Haven't seen vote on first slot in lighter partition, latest vote is: {}",
-                    last_vote
+                    last_vote_slot
                 );
             }
         }
@@ -1992,8 +2337,8 @@ fn restore_tower(ledger_path: &Path, node_pubkey: &Pubkey) -> Option<Tower> {
     Tower::restore(&ledger_path, &node_pubkey).ok()
 }
 
-fn last_vote_in_tower(ledger_path: &Path, node_pubkey: &Pubkey) -> Option<Slot> {
-    restore_tower(ledger_path, node_pubkey).map(|tower| tower.last_voted_slot().unwrap())
+fn last_vote_in_tower(ledger_path: &Path, node_pubkey: &Pubkey) -> Option<(Slot, Hash)> {
+    restore_tower(ledger_path, node_pubkey).map(|tower| tower.last_voted_slot_hash().unwrap())
 }
 
 fn root_in_tower(ledger_path: &Path, node_pubkey: &Pubkey) -> Option<Slot> {
@@ -2112,12 +2457,12 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
         }
         sleep(Duration::from_millis(100));
 
-        if let Some(last_vote) = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey) {
+        if let Some((last_vote, _)) = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey) {
             if !validator_a_finished && last_vote >= next_slot_on_a {
                 validator_a_finished = true;
             }
         }
-        if let Some(last_vote) = last_vote_in_tower(&val_b_ledger_path, &validator_b_pubkey) {
+        if let Some((last_vote, _)) = last_vote_in_tower(&val_b_ledger_path, &validator_b_pubkey) {
             if !validator_b_finished && last_vote >= next_slot_on_a {
                 validator_b_finished = true;
             }
@@ -2172,7 +2517,7 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
 
     for _ in 0..100 {
         sleep(Duration::from_millis(100));
-        if let Some(last_vote) = last_vote_in_tower(&val_c_ledger_path, &validator_c_pubkey) {
+        if let Some((last_vote, _)) = last_vote_in_tower(&val_c_ledger_path, &validator_c_pubkey) {
             if last_vote != base_slot {
                 votes_on_c_fork.insert(last_vote);
                 // Collect 4 votes
@@ -2196,7 +2541,7 @@ fn do_test_optimistic_confirmation_violation_with_or_without_tower(with_tower: b
 
     for _ in 0..100 {
         sleep(Duration::from_millis(100));
-        if let Some(last_vote) = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey) {
+        if let Some((last_vote, _)) = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey) {
            a_votes.push(last_vote);
             let blockstore = Blockstore::open_with_access_type(
                 &val_a_ledger_path,
@@ -2317,7 +2662,7 @@ fn do_test_future_tower(cluster_mode: ClusterMode) {
     let _validator_a_info = cluster.exit_node(&validator_a_pubkey);
     if newly_rooted {
         // there should be no forks; i.e. monotonically increasing ancestor chain
-        let last_vote = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap();
+        let (last_vote, _) = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap();
         let blockstore = open_blockstore(&val_a_ledger_path);
         let actual_block_ancestors = AncestorIterator::new_inclusive(last_vote, &blockstore)
             .take_while(|a| *a >= some_root_after_restart)
@@ -2440,7 +2785,8 @@ fn test_hard_fork_invalidates_tower() {
 
     for _ in 0..10 {
         sleep(Duration::from_millis(1000));
-        let new_last_vote = last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap();
+        let (new_last_vote, _) =
+            last_vote_in_tower(&val_a_ledger_path, &validator_a_pubkey).unwrap();
        if let Some(last_vote) = last_vote {
             assert_eq!(last_vote, new_last_vote);
         } else {