diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index 2846cec3b..0000dabba 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -1,8 +1,9 @@
 //! A stage to broadcast data from a leader node to validators
 #![allow(clippy::rc_buffer)]
 use self::{
-    broadcast_duplicates_run::BroadcastDuplicatesRun,
-    broadcast_fake_shreds_run::BroadcastFakeShredsRun, broadcast_metrics::*,
+    broadcast_duplicates_run::{BroadcastDuplicatesConfig, BroadcastDuplicatesRun},
+    broadcast_fake_shreds_run::BroadcastFakeShredsRun,
+    broadcast_metrics::*,
     fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
     standard_broadcast_run::StandardBroadcastRun,
 };
@@ -34,7 +35,7 @@ use std::{
     time::{Duration, Instant},
 };
 
-mod broadcast_duplicates_run;
+pub mod broadcast_duplicates_run;
 mod broadcast_fake_shreds_run;
 pub mod broadcast_metrics;
 pub(crate) mod broadcast_utils;
@@ -52,14 +53,6 @@ pub enum BroadcastStageReturnType {
     ChannelDisconnected,
 }
 
-#[derive(PartialEq, Clone, Debug)]
-pub struct BroadcastDuplicatesConfig {
-    /// Percentage of stake to send different version of slots to
-    pub stake_partition: u8,
-    /// Number of slots to wait before sending duplicate shreds
-    pub duplicate_send_delay: usize,
-}
-
 #[derive(PartialEq, Clone, Debug)]
 pub enum BroadcastStageType {
     Standard,
diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs
index 16db6b5a0..f144f0944 100644
--- a/core/src/broadcast_stage/broadcast_duplicates_run.rs
+++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -1,162 +1,206 @@
-use super::broadcast_utils::ReceiveResults;
 use super::*;
-use log::*;
-use solana_ledger::entry::{create_ticks, Entry, EntrySlice};
-use solana_ledger::shred::Shredder;
+use solana_ledger::{entry::Entry, shred::Shredder};
 use solana_runtime::blockhash_queue::BlockhashQueue;
-use solana_sdk::clock::Slot;
-use solana_sdk::fee_calculator::FeeCalculator;
-use solana_sdk::hash::Hash;
-use solana_sdk::signature::{Keypair, Signer};
-use solana_sdk::transaction::Transaction;
-use std::collections::VecDeque;
-use std::sync::Mutex;
+use solana_sdk::{
+    hash::Hash,
+    signature::{Keypair, Signer},
+    system_transaction,
+};
 
-// Queue which facilitates delivering shreds with a delay
-type DelayedQueue = VecDeque<(Option<Pubkey>, Option<Vec<Shred>>)>;
+pub const MINIMUM_DUPLICATE_SLOT: Slot = 20;
+pub const DUPLICATE_RATE: usize = 10;
+
+#[derive(PartialEq, Clone, Debug)]
+pub struct BroadcastDuplicatesConfig {
+    /// Amount of stake (excluding the leader) to send different version of slots to.
+    /// Note this is sampled from a list of stakes sorted least to greatest.
+ pub stake_partition: u64, +} #[derive(Clone)] pub(super) struct BroadcastDuplicatesRun { config: BroadcastDuplicatesConfig, // Local queue for broadcast to track which duplicate blockhashes we've sent duplicate_queue: BlockhashQueue, - // Shared queue between broadcast and transmit threads - delayed_queue: Arc>, // Buffer for duplicate entries duplicate_entries_buffer: Vec, last_duplicate_entry_hash: Hash, - last_broadcast_slot: Slot, + current_slot: Slot, next_shred_index: u32, shred_version: u16, + recent_blockhash: Option, + prev_entry_hash: Option, + num_slots_broadcasted: usize, } impl BroadcastDuplicatesRun { pub(super) fn new(shred_version: u16, config: BroadcastDuplicatesConfig) -> Self { - let mut delayed_queue = DelayedQueue::new(); - delayed_queue.resize(config.duplicate_send_delay, (None, None)); Self { config, - delayed_queue: Arc::new(Mutex::new(delayed_queue)), duplicate_queue: BlockhashQueue::default(), duplicate_entries_buffer: vec![], next_shred_index: u32::MAX, - last_broadcast_slot: 0, last_duplicate_entry_hash: Hash::default(), shred_version, + current_slot: 0, + recent_blockhash: None, + prev_entry_hash: None, + num_slots_broadcasted: 0, } } - fn queue_or_create_duplicate_entries( - &mut self, - bank: &Arc, - receive_results: &ReceiveResults, - ) -> (Vec, u32) { - // If the last entry hash is default, grab the last blockhash from the parent bank - if self.last_duplicate_entry_hash == Hash::default() { - self.last_duplicate_entry_hash = bank.last_blockhash(); - } + fn get_non_partitioned_batches( + &self, + my_pubkey: &Pubkey, + bank: &Bank, + data_shreds: Arc>, + ) -> TransmitShreds { + let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); + let mut stakes: HashMap = bank.epoch_staked_nodes(bank_epoch).unwrap(); + stakes.retain(|pubkey, _stake| pubkey != my_pubkey); + (Some(Arc::new(stakes)), data_shreds) + } - // Create duplicate entries by.. - // 1) rearranging real entries so that all transaction entries are moved to - // the front and tick entries are moved to the back. - // 2) setting all transaction entries to zero hashes and all tick entries to `hashes_per_tick`. - // 3) removing any transactions which reference blockhashes which aren't in the - // duplicate blockhash queue. 
- let (duplicate_entries, next_shred_index) = if bank.slot() > MINIMUM_DUPLICATE_SLOT { - let mut tx_entries: Vec = receive_results - .entries - .iter() - .filter_map(|entry| { - if entry.is_tick() { - return None; - } + fn get_partitioned_batches( + &self, + my_pubkey: &Pubkey, + bank: &Bank, + original_shreds: Arc>, + partition_shreds: Arc>, + ) -> (TransmitShreds, TransmitShreds) { + // On the last shred, partition network with duplicate and real shreds + let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); + let mut original_recipients = HashMap::new(); + let mut partition_recipients = HashMap::new(); - let transactions: Vec = entry - .transactions - .iter() - .filter(|tx| { - self.duplicate_queue - .get_hash_age(&tx.message.recent_blockhash) - .is_some() - }) - .cloned() - .collect(); - if !transactions.is_empty() { - Some(Entry::new_mut( - &mut self.last_duplicate_entry_hash, - &mut 0, - transactions, - )) - } else { - None - } - }) - .collect(); - let mut tick_entries = create_ticks( - receive_results.entries.tick_count(), - bank.hashes_per_tick().unwrap_or_default(), - self.last_duplicate_entry_hash, - ); - self.duplicate_entries_buffer.append(&mut tx_entries); - self.duplicate_entries_buffer.append(&mut tick_entries); - - // Only send out duplicate entries when the block is finished otherwise the - // recipient will start repairing for shreds they haven't received yet and - // hit duplicate slot issues before we want them to. - let entries = if receive_results.last_tick_height == bank.max_tick_height() { - self.duplicate_entries_buffer.drain(..).collect() + let mut stakes: Vec<(Pubkey, u64)> = bank + .epoch_staked_nodes(bank_epoch) + .unwrap() + .into_iter() + .filter(|(pubkey, _)| pubkey != my_pubkey) + .collect(); + stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| { + if r_stake == l_stake { + l_key.cmp(r_key) } else { - vec![] - }; + l_stake.cmp(r_stake) + } + }); - // Set next shred index to 0 since we are sending the full slot - (entries, 0) - } else { - // Send real entries until we hit min duplicate slot - (receive_results.entries.clone(), self.next_shred_index) - }; - - // Save last duplicate entry hash to avoid invalid entry hash errors - if let Some(last_duplicate_entry) = duplicate_entries.last() { - self.last_duplicate_entry_hash = last_duplicate_entry.hash; + let mut cumulative_stake: u64 = 0; + for (pubkey, stake) in stakes.into_iter() { + cumulative_stake += stake; + if cumulative_stake <= self.config.stake_partition { + partition_recipients.insert(pubkey, stake); + } else { + original_recipients.insert(pubkey, stake); + } } - (duplicate_entries, next_shred_index) + warn!( + "{} sent duplicate slot {} to nodes: {:?}", + my_pubkey, + bank.slot(), + &partition_recipients, + ); + + let original_recipients = Arc::new(original_recipients); + let original_transmit_shreds = (Some(original_recipients), original_shreds); + + let partition_recipients = Arc::new(partition_recipients); + let partition_transmit_shreds = (Some(partition_recipients), partition_shreds); + + (original_transmit_shreds, partition_transmit_shreds) } } -/// Duplicate slots should only be sent once all validators have started. -/// This constant is intended to be used as a buffer so that all validators -/// are live before sending duplicate slots. 
-pub const MINIMUM_DUPLICATE_SLOT: Slot = 20; - impl BroadcastRun for BroadcastDuplicatesRun { fn run( &mut self, keypair: &Keypair, - blockstore: &Arc, + _blockstore: &Arc, receiver: &Receiver, socket_sender: &Sender<(TransmitShreds, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, ) -> Result<()> { // 1) Pull entries from banking stage - let receive_results = broadcast_utils::recv_slot_entries(receiver)?; + let mut receive_results = broadcast_utils::recv_slot_entries(receiver)?; let bank = receive_results.bank.clone(); let last_tick_height = receive_results.last_tick_height; - if self.next_shred_index == u32::MAX { - self.next_shred_index = blockstore - .meta(bank.slot()) - .expect("Database error") - .map(|meta| meta.consumed) - .unwrap_or(0) as u32 + if bank.slot() != self.current_slot { + self.next_shred_index = 0; + self.current_slot = bank.slot(); + self.prev_entry_hash = None; + self.num_slots_broadcasted += 1; } - // We were not the leader, but just became leader again - if bank.slot() > self.last_broadcast_slot + 1 { - self.last_duplicate_entry_hash = Hash::default(); + if receive_results.entries.is_empty() { + return Ok(()); } - self.last_broadcast_slot = bank.slot(); + + // Update the recent blockhash based on transactions in the entries + for entry in &receive_results.entries { + if !entry.transactions.is_empty() { + self.recent_blockhash = Some(entry.transactions[0].message.recent_blockhash); + break; + } + } + + // 2) Convert entries to shreds + generate coding shreds. Set a garbage PoH on the last entry + // in the slot to make verification fail on validators + let last_entries = { + if last_tick_height == bank.max_tick_height() + && bank.slot() > MINIMUM_DUPLICATE_SLOT + && self.num_slots_broadcasted % DUPLICATE_RATE == 0 + && self.recent_blockhash.is_some() + { + let entry_batch_len = receive_results.entries.len(); + let prev_entry_hash = + // Try to get second-to-last entry before last tick + if entry_batch_len > 1 { + Some(receive_results.entries[entry_batch_len - 2].hash) + } else { + self.prev_entry_hash + }; + + if let Some(prev_entry_hash) = prev_entry_hash { + let original_last_entry = receive_results.entries.pop().unwrap(); + + // Last entry has to be a tick + assert!(original_last_entry.is_tick()); + + // Inject an extra entry before the last tick + let extra_tx = system_transaction::transfer( + keypair, + &Pubkey::new_unique(), + 1, + self.recent_blockhash.unwrap(), + ); + let new_extra_entry = Entry::new(&prev_entry_hash, 1, vec![extra_tx]); + + // This will only work with sleepy tick producer where the hashing + // checks in replay are turned off, because we're introducing an extra + // hash for the last tick in the `new_extra_entry`. 
+ let new_last_entry = Entry::new( + &new_extra_entry.hash, + original_last_entry.num_hashes, + vec![], + ); + + Some((original_last_entry, vec![new_extra_entry, new_last_entry])) + } else { + None + } + } else { + None + } + }; + + self.prev_entry_hash = last_entries + .as_ref() + .map(|(original_last_entry, _)| original_last_entry.hash) + .or_else(|| Some(receive_results.entries.last().unwrap().hash)); let shredder = Shredder::new( bank.slot(), @@ -166,165 +210,104 @@ impl BroadcastRun for BroadcastDuplicatesRun { ) .expect("Expected to create a new shredder"); - let (data_shreds, coding_shreds, last_shred_index) = shredder.entries_to_shreds( + let (data_shreds, _, _) = shredder.entries_to_shreds( keypair, &receive_results.entries, - last_tick_height == bank.max_tick_height(), + last_tick_height == bank.max_tick_height() && last_entries.is_none(), self.next_shred_index, ); - let (duplicate_entries, next_duplicate_shred_index) = - self.queue_or_create_duplicate_entries(&bank, &receive_results); - let (duplicate_data_shreds, duplicate_coding_shreds, _) = if !duplicate_entries.is_empty() { - shredder.entries_to_shreds( - keypair, - &duplicate_entries, - last_tick_height == bank.max_tick_height(), - next_duplicate_shred_index, - ) - } else { - (vec![], vec![], 0) - }; + self.next_shred_index += data_shreds.len() as u32; + let last_shreds = last_entries.map(|(original_last_entry, duplicate_extra_last_entries)| { + let (original_last_data_shred, _, _) = + shredder.entries_to_shreds(keypair, &[original_last_entry], true, self.next_shred_index); - // Manually track the shred index because relying on slot meta consumed is racy - if last_tick_height == bank.max_tick_height() { - self.next_shred_index = 0; - self.duplicate_queue - .register_hash(&self.last_duplicate_entry_hash, &FeeCalculator::default()); - } else { - self.next_shred_index = last_shred_index; - } + let (partition_last_data_shred, _, _) = + // Don't mark the last shred as last so that validators won't know that + // they've gotten all the shreds, and will continue trying to repair + shredder.entries_to_shreds(keypair, &duplicate_extra_last_entries, true, self.next_shred_index); - // Partition network with duplicate and real shreds based on stake - let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); - let mut duplicate_recipients = HashMap::new(); - let mut real_recipients = HashMap::new(); + let sigs: Vec<_> = partition_last_data_shred.iter().map(|s| (s.signature(), s.index())).collect(); + info!( + "duplicate signatures for slot {}, sigs: {:?}", + bank.slot(), + sigs, + ); - let mut stakes: Vec<(Pubkey, u64)> = bank - .epoch_staked_nodes(bank_epoch) - .unwrap() - .into_iter() - .filter(|(pubkey, _)| *pubkey != keypair.pubkey()) - .collect(); - stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| { - if r_stake == l_stake { - l_key.cmp(r_key) - } else { - r_stake.cmp(l_stake) - } + self.next_shred_index += 1; + (original_last_data_shred, partition_last_data_shred) }); - let highest_staked_node = stakes.first().cloned().map(|x| x.0); - let stake_total: u64 = stakes.iter().map(|(_, stake)| *stake).sum(); - let mut cumulative_stake: u64 = 0; - for (pubkey, stake) in stakes.into_iter().rev() { - cumulative_stake += stake; - if (100 * cumulative_stake / stake_total) as u8 <= self.config.stake_partition { - duplicate_recipients.insert(pubkey, stake); - } else { - real_recipients.insert(pubkey, stake); - } - } - - if let Some(highest_staked_node) = highest_staked_node { - if bank.slot() > MINIMUM_DUPLICATE_SLOT && last_tick_height 
== bank.max_tick_height() { - warn!( - "{} sent duplicate slot {} to nodes: {:?}", - keypair.pubkey(), - bank.slot(), - &duplicate_recipients, - ); - warn!( - "Duplicate shreds for slot {} will be broadcast in {} slot(s)", - bank.slot(), - self.config.duplicate_send_delay - ); - - let delayed_shreds: Option> = vec![ - duplicate_data_shreds.last().cloned(), - data_shreds.last().cloned(), - ] - .into_iter() - .collect(); - self.delayed_queue - .lock() - .unwrap() - .push_back((Some(highest_staked_node), delayed_shreds)); - } - } - - let duplicate_recipients = Arc::new(duplicate_recipients); - let real_recipients = Arc::new(real_recipients); - let data_shreds = Arc::new(data_shreds); blockstore_sender.send((data_shreds.clone(), None))?; // 3) Start broadcast step - socket_sender.send(( - ( - Some(duplicate_recipients.clone()), - Arc::new(duplicate_data_shreds), - ), - None, - ))?; - socket_sender.send(( - ( - Some(duplicate_recipients), - Arc::new(duplicate_coding_shreds), - ), - None, - ))?; - socket_sender.send(((Some(real_recipients.clone()), data_shreds), None))?; - socket_sender.send(((Some(real_recipients), Arc::new(coding_shreds)), None))?; + let transmit_shreds = + self.get_non_partitioned_batches(&keypair.pubkey(), &bank, data_shreds.clone()); + info!( + "{} Sending good shreds for slot {} to network", + keypair.pubkey(), + data_shreds.first().unwrap().slot() + ); + socket_sender.send((transmit_shreds, None))?; + // Special handling of last shred to cause partition + if let Some((original_last_data_shred, partition_last_data_shred)) = last_shreds { + let original_last_data_shred = Arc::new(original_last_data_shred); + let partition_last_data_shred = Arc::new(partition_last_data_shred); + + // Store the original shreds that this node replayed + blockstore_sender.send((original_last_data_shred.clone(), None))?; + + let (original_transmit_shreds, partition_transmit_shreds) = self + .get_partitioned_batches( + &keypair.pubkey(), + &bank, + original_last_data_shred, + partition_last_data_shred, + ); + + socket_sender.send((original_transmit_shreds, None))?; + socket_sender.send((partition_transmit_shreds, None))?; + } Ok(()) } + fn transmit( &mut self, receiver: &Arc>, cluster_info: &ClusterInfo, sock: &UdpSocket, - _bank_forks: &Arc>, + bank_forks: &Arc>, ) -> Result<()> { - // Check the delay queue for shreds that are ready to be sent - let (delayed_recipient, delayed_shreds) = { - let mut delayed_deque = self.delayed_queue.lock().unwrap(); - if delayed_deque.len() > self.config.duplicate_send_delay { - delayed_deque.pop_front().unwrap() - } else { - (None, None) - } - }; - let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?; - let stakes = stakes.unwrap(); - for peer in cluster_info.tvu_peers() { - // Forward shreds to circumvent gossip - if stakes.get(&peer.id).is_some() { - shreds.iter().for_each(|shred| { - sock.send_to(&shred.payload, &peer.tvu_forwards).unwrap(); - }); - } - - // After a delay, broadcast duplicate shreds to a single node - if let Some(shreds) = delayed_shreds.as_ref() { - if Some(peer.id) == delayed_recipient { - shreds.iter().for_each(|shred| { - sock.send_to(&shred.payload, &peer.tvu).unwrap(); - }); - } - } - } + // Broadcast data + let cluster_nodes = ClusterNodes::::new( + cluster_info, + stakes.as_deref().unwrap_or(&HashMap::default()), + ); + broadcast_shreds( + sock, + &shreds, + &cluster_nodes, + &Arc::new(AtomicU64::new(0)), + &mut TransmitShredsStats::default(), + cluster_info.id(), + bank_forks, + )?; Ok(()) } + fn record( &mut self, receiver: 
&Arc>, blockstore: &Arc, ) -> Result<()> { - let (data_shreds, _) = receiver.lock().unwrap().recv()?; - blockstore.insert_shreds(data_shreds.to_vec(), None, true)?; + let (all_shreds, _) = receiver.lock().unwrap().recv()?; + blockstore + .insert_shreds(all_shreds.to_vec(), None, true) + .expect("Failed to insert shreds in blockstore"); Ok(()) } } diff --git a/gossip/src/cluster_info.rs b/gossip/src/cluster_info.rs index 700a1d021..c42d04c00 100644 --- a/gossip/src/cluster_info.rs +++ b/gossip/src/cluster_info.rs @@ -961,7 +961,7 @@ impl ClusterInfo { self.push_message(CrdsValue::new_signed(message, &self.keypair())); } - fn push_vote_at_index(&self, vote: Transaction, vote_index: u8) { + pub fn push_vote_at_index(&self, vote: Transaction, vote_index: u8) { assert!((vote_index as usize) < MAX_LOCKOUT_HISTORY); let self_pubkey = self.id(); let now = timestamp(); diff --git a/gossip/src/gossip_service.rs b/gossip/src/gossip_service.rs index f50f78cf9..98823c98f 100644 --- a/gossip/src/gossip_service.rs +++ b/gossip/src/gossip_service.rs @@ -296,7 +296,7 @@ fn spy( /// Makes a spy or gossip node based on whether or not a gossip_addr was passed in /// Pass in a gossip addr to fully participate in gossip instead of relying on just pulls -fn make_gossip_node( +pub fn make_gossip_node( keypair: Keypair, entrypoint: Option<&SocketAddr>, exit: &Arc, diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index f37759b00..9c7269e27 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -8,7 +8,11 @@ use rayon::prelude::*; use solana_client::thin_client::create_client; use solana_core::consensus::VOTE_THRESHOLD_DEPTH; use solana_gossip::{ - cluster_info::VALIDATOR_PORT_RANGE, contact_info::ContactInfo, gossip_service::discover_cluster, + cluster_info::{self, VALIDATOR_PORT_RANGE}, + contact_info::ContactInfo, + crds_value::{self, CrdsData, CrdsValue}, + gossip_error::GossipError, + gossip_service::discover_cluster, }; use solana_ledger::{ blockstore::Blockstore, @@ -25,11 +29,13 @@ use solana_sdk::{ pubkey::Pubkey, signature::{Keypair, Signature, Signer}, system_transaction, - timing::duration_as_ms, + timing::{duration_as_ms, timestamp}, transport::TransportError, }; +use solana_vote_program::vote_transaction; use std::{ collections::{HashMap, HashSet}, + net::SocketAddr, path::Path, sync::{Arc, RwLock}, thread::sleep, @@ -406,3 +412,34 @@ fn verify_slot_ticks( } entries.last().unwrap().hash } + +pub fn submit_vote_to_cluster_gossip( + node_keypair: &Keypair, + vote_keypair: &Keypair, + vote_slot: Slot, + vote_hash: Hash, + blockhash: Hash, + gossip_addr: SocketAddr, +) -> Result<(), GossipError> { + let vote_tx = vote_transaction::new_vote_transaction( + vec![vote_slot], + vote_hash, + blockhash, + node_keypair, + vote_keypair, + vote_keypair, + None, + ); + + cluster_info::push_messages_to_peer( + vec![CrdsValue::new_signed( + CrdsData::Vote( + 0, + crds_value::Vote::new(node_keypair.pubkey(), vote_tx, timestamp()), + ), + node_keypair, + )], + node_keypair.pubkey(), + gossip_addr, + ) +} diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index db10cbb7c..45703741f 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -12,16 +12,17 @@ use solana_client::{ thin_client::{create_client, ThinClient}, }; use solana_core::{ - broadcast_stage::{BroadcastDuplicatesConfig, BroadcastStageType}, + 
broadcast_stage::{broadcast_duplicates_run::BroadcastDuplicatesConfig, BroadcastStageType}, consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, + replay_stage::DUPLICATE_THRESHOLD, validator::ValidatorConfig, }; use solana_download_utils::download_snapshot; use solana_gossip::{ - cluster_info::{self, VALIDATOR_PORT_RANGE}, - crds_value::{self, CrdsData, CrdsValue}, - gossip_service::discover_cluster, + cluster_info::VALIDATOR_PORT_RANGE, + crds::Cursor, + gossip_service::{self, discover_cluster}, }; use solana_ledger::{ ancestor_iterator::AncestorIterator, @@ -52,13 +53,8 @@ use solana_sdk::{ pubkey::Pubkey, signature::{Keypair, Signer}, system_program, system_transaction, - timing::timestamp, - transaction::Transaction, -}; -use solana_vote_program::{ - vote_instruction, - vote_state::{Vote, MAX_LOCKOUT_HISTORY}, }; +use solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction}; use std::{ collections::{BTreeSet, HashMap, HashSet}, fs, @@ -844,42 +840,19 @@ fn test_switch_threshold_uses_gossip_votes() { .info .keypair .clone(); - let vote_ix = vote_instruction::vote( - &vote_keypair.pubkey(), - &vote_keypair.pubkey(), - Vote::new( - vec![heavier_validator_latest_vote], - heavier_validator_latest_vote_hash, - ), - ); - let mut vote_tx = Transaction::new_with_payer(&[vote_ix], Some(&node_keypair.pubkey())); - - // Make the vote transaction with a random blockhash. Thus, the vote only lives in gossip but - // never makes it into a block - let blockhash = Hash::new_unique(); - vote_tx.partial_sign(&[node_keypair.as_ref()], blockhash); - vote_tx.partial_sign(&[vote_keypair.as_ref()], blockhash); - let heavier_node_gossip = cluster - .get_contact_info(&context.heaviest_validator_key) - .unwrap() - .gossip; - cluster_info::push_messages_to_peer( - vec![CrdsValue::new_signed( - CrdsData::Vote( - 0, - crds_value::Vote::new(node_keypair.pubkey(), vote_tx, timestamp()), - ), - node_keypair, - )], - context - .dead_validator_info - .as_ref() + cluster_tests::submit_vote_to_cluster_gossip( + node_keypair, + vote_keypair, + heavier_validator_latest_vote, + heavier_validator_latest_vote_hash, + // Make the vote transaction with a random blockhash. 
Thus, the vote only lives in gossip but + // never makes it into a block + Hash::new_unique(), + cluster + .get_contact_info(&context.heaviest_validator_key) .unwrap() - .info - .keypair - .pubkey(), - heavier_node_gossip, + .gossip, ) .unwrap(); @@ -1966,7 +1939,9 @@ fn test_snapshots_restart_validity() { #[allow(unused_attributes)] #[ignore] fn test_fail_entry_verification_leader() { - test_faulty_node(BroadcastStageType::FailEntryVerification); + let (cluster, _) = + test_faulty_node(BroadcastStageType::FailEntryVerification, vec![60, 50, 60]); + cluster.check_for_new_roots(16, "test_fail_entry_verification_leader"); } #[test] @@ -1974,7 +1949,9 @@ fn test_fail_entry_verification_leader() { #[ignore] #[allow(unused_attributes)] fn test_fake_shreds_broadcast_leader() { - test_faulty_node(BroadcastStageType::BroadcastFakeShreds); + let node_stakes = vec![300, 100]; + let (cluster, _) = test_faulty_node(BroadcastStageType::BroadcastFakeShreds, node_stakes); + cluster.check_for_new_roots(16, "test_fake_shreds_broadcast_leader"); } #[test] @@ -1982,30 +1959,212 @@ fn test_fake_shreds_broadcast_leader() { #[ignore] #[allow(unused_attributes)] fn test_duplicate_shreds_broadcast_leader() { - test_faulty_node(BroadcastStageType::BroadcastDuplicates( - BroadcastDuplicatesConfig { - stake_partition: 50, - duplicate_send_delay: 1, - }, - )); + // Create 4 nodes: + // 1) Bad leader sending different versions of shreds to both of the other nodes + // 2) 1 node who's voting behavior in gossip + // 3) 1 validator gets the same version as the leader, will see duplicate confirmation + // 4) 1 validator will not get the same version as the leader. For each of these + // duplicate slots `S` either: + // a) The leader's version of `S` gets > DUPLICATE_THRESHOLD of votes in gossip and so this + // node will repair that correct version + // b) A descendant `D` of some version of `S` gets > DUPLICATE_THRESHOLD votes in gossip, + // but no version of `S` does. Then the node will not know to repair the right version + // by just looking at gossip, but will instead have to use EpochSlots repair after + // detecting that a descendant does not chain to its version of `S`, and marks that descendant + // dead. + // Scenarios a) or b) are triggered by our node in 2) who's voting behavior we control. + + // Critical that bad_leader_stake + good_node_stake < DUPLICATE_THRESHOLD and that + // bad_leader_stake + good_node_stake + our_node_stake > DUPLICATE_THRESHOLD so that + // our vote is the determining factor + let bad_leader_stake = 10000000000; + // Ensure that the good_node_stake is always on the critical path, and the partition node + // should never be on the critical path. This way, none of the bad shreds sent to the partition + // node corrupt the good node. + let good_node_stake = 500000; + let our_node_stake = 10000000000; + let partition_node_stake = 1; + + let node_stakes = vec![ + bad_leader_stake, + partition_node_stake, + good_node_stake, + // Needs to be last in the vector, so that we can + // find the id of this node. See call to `test_faulty_node` + // below for more details. 
+ our_node_stake, + ]; + assert_eq!(*node_stakes.last().unwrap(), our_node_stake); + let total_stake: u64 = node_stakes.iter().sum(); + + assert!( + ((bad_leader_stake + good_node_stake) as f64 / total_stake as f64) < DUPLICATE_THRESHOLD + ); + assert!( + (bad_leader_stake + good_node_stake + our_node_stake) as f64 / total_stake as f64 + > DUPLICATE_THRESHOLD + ); + + // Important that the partition node stake is the smallest so that it gets selected + // for the partition. + assert!(partition_node_stake < our_node_stake && partition_node_stake < good_node_stake); + + // 1) Set up the cluster + let (mut cluster, validator_keys) = test_faulty_node( + BroadcastStageType::BroadcastDuplicates(BroadcastDuplicatesConfig { + stake_partition: partition_node_stake, + }), + node_stakes, + ); + + // This is why it's important our node was last in `node_stakes` + let our_id = validator_keys.last().unwrap().pubkey(); + + // 2) Kill our node and start up a thread to simulate votes to control our voting behavior + let our_info = cluster.exit_node(&our_id); + let node_keypair = our_info.info.keypair; + let vote_keypair = our_info.info.voting_keypair; + let bad_leader_id = cluster.entry_point_info.id; + let bad_leader_ledger_path = cluster.validators[&bad_leader_id].info.ledger_path.clone(); + info!("our node id: {}", node_keypair.pubkey()); + + // 3) Start up a spy to listen for votes + let exit = Arc::new(AtomicBool::new(false)); + let (gossip_service, _tcp_listener, cluster_info) = gossip_service::make_gossip_node( + // Need to use our validator's keypair to gossip EpochSlots and votes for our + // node later. + Keypair::from_bytes(&node_keypair.to_bytes()).unwrap(), + Some(&cluster.entry_point_info.gossip), + &exit, + None, + 0, + false, + ); + + let t_voter = { + let exit = exit.clone(); + std::thread::spawn(move || { + let mut cursor = Cursor::default(); + let mut max_vote_slot = 0; + let mut gossip_vote_index = 0; + loop { + if exit.load(Ordering::Relaxed) { + return; + } + + let (labels, votes) = cluster_info.get_votes(&mut cursor); + let mut parsed_vote_iter: Vec<_> = labels + .into_iter() + .zip(votes.into_iter()) + .filter_map(|(label, leader_vote_tx)| { + // Filter out votes not from the bad leader + if label.pubkey() == bad_leader_id { + let vote = vote_transaction::parse_vote_transaction(&leader_vote_tx) + .map(|(_, vote, _)| vote) + .unwrap(); + // Filter out empty votes + if !vote.slots.is_empty() { + Some((vote, leader_vote_tx)) + } else { + None + } + } else { + None + } + }) + .collect(); + + parsed_vote_iter.sort_by(|(vote, _), (vote2, _)| { + vote.slots.last().unwrap().cmp(vote2.slots.last().unwrap()) + }); + + for (parsed_vote, leader_vote_tx) in parsed_vote_iter { + if let Some(latest_vote_slot) = parsed_vote.slots.last() { + info!("received vote for {}", latest_vote_slot); + // Add to EpochSlots. Mark all slots frozen between slot..=max_vote_slot. + if *latest_vote_slot > max_vote_slot { + let new_epoch_slots: Vec = + (max_vote_slot + 1..latest_vote_slot + 1).collect(); + info!( + "Simulating epoch slots from our node: {:?}", + new_epoch_slots + ); + cluster_info.push_epoch_slots(&new_epoch_slots); + max_vote_slot = *latest_vote_slot; + } + + // Only vote on even slots. Note this may violate lockouts if the + // validator started voting on a different fork before we could exit + // it above. 
+ let vote_hash = parsed_vote.hash; + if latest_vote_slot % 2 == 0 { + info!( + "Simulating vote from our node on slot {}, hash {}", + latest_vote_slot, vote_hash + ); + + // Add all recent vote slots on this fork to allow cluster to pass + // vote threshold checks in replay. Note this will instantly force a + // root by this validator, but we're not concerned with lockout violations + // by this validator so it's fine. + let leader_blockstore = open_blockstore(&bad_leader_ledger_path); + let mut vote_slots: Vec = AncestorIterator::new_inclusive( + *latest_vote_slot, + &leader_blockstore, + ) + .take(MAX_LOCKOUT_HISTORY) + .collect(); + vote_slots.reverse(); + let vote_tx = vote_transaction::new_vote_transaction( + vote_slots, + vote_hash, + leader_vote_tx.message.recent_blockhash, + &node_keypair, + &vote_keypair, + &vote_keypair, + None, + ); + gossip_vote_index += 1; + gossip_vote_index %= MAX_LOCKOUT_HISTORY; + cluster_info.push_vote_at_index(vote_tx, gossip_vote_index as u8) + } + } + // Give vote some time to propagate + sleep(Duration::from_millis(100)); + } + } + }) + }; + + // 4) Check that the cluster is making progress + cluster.check_for_new_roots(16, "test_duplicate_shreds_broadcast_leader"); + + // Clean up threads + exit.store(true, Ordering::Relaxed); + t_voter.join().unwrap(); + gossip_service.join().unwrap(); } -fn test_faulty_node(faulty_node_type: BroadcastStageType) { +fn test_faulty_node( + faulty_node_type: BroadcastStageType, + node_stakes: Vec, +) -> (LocalCluster, Vec>) { solana_logger::setup_with_default("solana_local_cluster=info"); - let num_nodes = 3; + let num_nodes = node_stakes.len(); let error_validator_config = ValidatorConfig { broadcast_stage_type: faulty_node_type, ..ValidatorConfig::default() }; let mut validator_configs = Vec::with_capacity(num_nodes); - validator_configs.resize_with(num_nodes - 1, ValidatorConfig::default); + + // First validator is the bootstrap leader with the malicious broadcast logic. validator_configs.push(error_validator_config); + validator_configs.resize_with(num_nodes, ValidatorConfig::default); let mut validator_keys = Vec::with_capacity(num_nodes); validator_keys.resize_with(num_nodes, || (Arc::new(Keypair::new()), true)); - let node_stakes = vec![60, 50, 60]; assert_eq!(node_stakes.len(), num_nodes); assert_eq!(validator_keys.len(), num_nodes); @@ -2013,16 +2172,18 @@ fn test_faulty_node(faulty_node_type: BroadcastStageType) { cluster_lamports: 10_000, node_stakes, validator_configs, - validator_keys: Some(validator_keys), - slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH * 2u64, - stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH * 2u64, + validator_keys: Some(validator_keys.clone()), + skip_warmup_slots: true, ..ClusterConfig::default() }; let cluster = LocalCluster::new(&mut cluster_config); + let validator_keys: Vec> = validator_keys + .into_iter() + .map(|(keypair, _)| keypair) + .collect(); - // Check for new roots - cluster.check_for_new_roots(16, "test_faulty_node"); + (cluster, validator_keys) } #[test]
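Supplementary note (not part of the patch): in `get_partitioned_batches`, partition recipients are chosen by sorting the epoch's staked nodes from least to greatest stake and accumulating stake until `BroadcastDuplicatesConfig::stake_partition` is exceeded, so the smallest-staked nodes receive the duplicate version of the last shred. A minimal sketch of that selection, using `String` keys and plain integers in place of `Pubkey` and the bank's stake map:

```rust
use std::collections::HashMap;

/// Split `stakes` into (partition, original) recipient sets: nodes are taken in
/// ascending stake order until the cumulative stake exceeds `stake_partition`.
fn select_partition(
    mut stakes: Vec<(String, u64)>,
    stake_partition: u64,
) -> (HashMap<String, u64>, HashMap<String, u64>) {
    // Sort least to greatest stake, tie-breaking on the key for determinism,
    // mirroring the sort in `get_partitioned_batches`.
    stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| {
        if l_stake == r_stake {
            l_key.cmp(r_key)
        } else {
            l_stake.cmp(r_stake)
        }
    });

    let mut partition = HashMap::new();
    let mut original = HashMap::new();
    let mut cumulative: u64 = 0;
    for (pubkey, stake) in stakes {
        cumulative += stake;
        if cumulative <= stake_partition {
            partition.insert(pubkey, stake);
        } else {
            original.insert(pubkey, stake);
        }
    }
    (partition, original)
}

fn main() {
    // The leader has already been filtered out, as in the patch. With
    // stake_partition = 1, only the 1-stake node receives the duplicate shreds.
    let stakes = vec![
        ("partition_node".to_string(), 1),
        ("good_node".to_string(), 500_000),
        ("our_node".to_string(), 10_000_000_000),
    ];
    let (partition, original) = select_partition(stakes, 1);
    assert!(partition.contains_key("partition_node"));
    assert_eq!(original.len(), 2);
}
```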
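The faulty leader does not forge every slot. `BroadcastDuplicatesRun::run` only injects a duplicate tail when the slot is complete, the slot number is past `MINIMUM_DUPLICATE_SLOT`, the slot falls on the `DUPLICATE_RATE` cadence, and a recent blockhash has been observed. A small sketch of that gate as a standalone predicate; the constants mirror the ones added by this patch:

```rust
const MINIMUM_DUPLICATE_SLOT: u64 = 20;
const DUPLICATE_RATE: usize = 10;

/// Decide whether this broadcast batch should get a forged tail: only at the end
/// of a slot, only after the cluster has had time to boot, and only for one in
/// every DUPLICATE_RATE slots broadcast by this leader.
fn should_inject_duplicate(
    slot: u64,
    last_tick_height: u64,
    max_tick_height: u64,
    num_slots_broadcasted: usize,
    have_recent_blockhash: bool,
) -> bool {
    last_tick_height == max_tick_height
        && slot > MINIMUM_DUPLICATE_SLOT
        && num_slots_broadcasted % DUPLICATE_RATE == 0
        && have_recent_blockhash
}

fn main() {
    // Slot is finished, past the minimum, and on the cadence: inject.
    assert!(should_inject_duplicate(30, 64, 64, 10, true));
    // Off-cadence slot: broadcast normally.
    assert!(!should_inject_duplicate(31, 64, 64, 11, true));
}
```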
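The duplicate version of a slot differs only in its tail: the original last tick is popped, an extra transfer entry chained from the previous entry hash is inserted, and a replacement tick is chained from that extra entry. The sketch below mirrors the `last_entries` construction in the patch, using the same `Entry` and `system_transaction` APIs the patch relies on; it is illustrative only and assumes the caller supplies the hashes the patch tracks in `prev_entry_hash` and `recent_blockhash`:

```rust
use solana_ledger::entry::Entry;
use solana_sdk::{hash::Hash, pubkey::Pubkey, signature::Keypair, system_transaction};

/// Build the "duplicate" tail for a slot: an injected transfer entry followed by a
/// replacement last tick. The replacement tick keeps the original tick's
/// `num_hashes` but chains from the injected entry, so its hash differs from the
/// tick the leader stores and replays locally.
fn build_duplicate_tail(
    prev_entry_hash: Hash,
    original_last_tick: &Entry,
    keypair: &Keypair,
    recent_blockhash: Hash,
) -> Vec<Entry> {
    // The last entry of a slot has to be a tick, as asserted in the patch.
    assert!(original_last_tick.is_tick());
    let extra_tx = system_transaction::transfer(keypair, &Pubkey::new_unique(), 1, recent_blockhash);
    let extra_entry = Entry::new(&prev_entry_hash, 1, vec![extra_tx]);
    let new_last_tick = Entry::new(&extra_entry.hash, original_last_tick.num_hashes, vec![]);
    vec![extra_entry, new_last_tick]
}

fn main() {
    let keypair = Keypair::new();
    let prev_entry_hash = Hash::new_unique();
    let original_last_tick = Entry::new(&prev_entry_hash, 1, vec![]);
    let tail = build_duplicate_tail(prev_entry_hash, &original_last_tick, &keypair, Hash::new_unique());
    // Two entries replace the single original tick, and the new tick's hash differs.
    assert_eq!(tail.len(), 2);
    assert_ne!(tail[1].hash, original_last_tick.hash);
}
```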
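For `test_duplicate_shreds_broadcast_leader`, the stake layout is chosen so that the simulated node's gossip votes are the deciding factor for duplicate confirmation. A quick arithmetic check of the two assertions in the test; the 0.52 used in `main` is only an illustrative stand-in, the test itself imports the real `DUPLICATE_THRESHOLD` from `solana_core::replay_stage`:

```rust
/// Sanity-check the test's stake layout: the bad leader plus the good node must stay
/// below the duplicate-confirmation threshold, and adding our simulated node must
/// push the total above it, so our gossip votes decide the outcome.
fn check_stake_layout(duplicate_threshold: f64) {
    let bad_leader_stake: u64 = 10_000_000_000;
    let partition_node_stake: u64 = 1;
    let good_node_stake: u64 = 500_000;
    let our_node_stake: u64 = 10_000_000_000;

    let total = (bad_leader_stake + partition_node_stake + good_node_stake + our_node_stake) as f64;
    assert!((bad_leader_stake + good_node_stake) as f64 / total < duplicate_threshold);
    assert!(
        (bad_leader_stake + good_node_stake + our_node_stake) as f64 / total > duplicate_threshold
    );
}

fn main() {
    // Placeholder threshold for illustration only.
    check_stake_layout(0.52);
}
```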
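The voter thread in that test follows a simple policy: every observed leader vote first fills in EpochSlots up to the voted slot, and an actual gossip vote is pushed only for even slots, rotating through the CRDS vote indices. A self-contained model of that state machine (types simplified; the real thread builds the vote with `vote_transaction::new_vote_transaction` and pushes it via `cluster_info.push_vote_at_index`):

```rust
/// Minimal model of the simulated voter in `test_duplicate_shreds_broadcast_leader`.
struct SimulatedVoter {
    max_vote_slot: u64,
    gossip_vote_index: usize,
    max_lockout_history: usize,
}

#[derive(Debug)]
enum Action {
    PushEpochSlots(Vec<u64>),
    PushVoteAtIndex(u8),
}

impl SimulatedVoter {
    fn on_leader_vote(&mut self, latest_vote_slot: u64) -> Vec<Action> {
        let mut actions = Vec::new();
        // Mark every slot since the last observed vote as frozen via EpochSlots,
        // so duplicate slots can later be resolved through EpochSlots repair.
        if latest_vote_slot > self.max_vote_slot {
            let new_epoch_slots: Vec<u64> = (self.max_vote_slot + 1..=latest_vote_slot).collect();
            actions.push(Action::PushEpochSlots(new_epoch_slots));
            self.max_vote_slot = latest_vote_slot;
        }
        // Only even slots get a simulated vote, so odd duplicate slots never reach the
        // duplicate threshold directly and must be resolved through their descendants.
        if latest_vote_slot % 2 == 0 {
            self.gossip_vote_index += 1;
            self.gossip_vote_index %= self.max_lockout_history;
            actions.push(Action::PushVoteAtIndex(self.gossip_vote_index as u8));
        }
        actions
    }
}

fn main() {
    // 31 matches MAX_LOCKOUT_HISTORY in the vote program.
    let mut voter = SimulatedVoter { max_vote_slot: 0, gossip_vote_index: 0, max_lockout_history: 31 };
    // Leader votes on slot 5 (odd): only EpochSlots are pushed, no vote.
    assert_eq!(voter.on_leader_vote(5).len(), 1);
    // Leader votes on slot 6 (even): EpochSlots plus a gossip vote.
    assert_eq!(voter.on_leader_vote(6).len(), 2);
}
```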