diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 1525531814..9a8028f660 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -1,6 +1,7 @@ //! A stage to broadcast data from a leader node to validators #![allow(clippy::rc_buffer)] use self::{ + broadcast_duplicates_run::BroadcastDuplicatesRun, broadcast_fake_shreds_run::BroadcastFakeShredsRun, broadcast_metrics::*, fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun, standard_broadcast_run::StandardBroadcastRun, @@ -35,6 +36,7 @@ use std::{ time::{Duration, Instant}, }; +mod broadcast_duplicates_run; mod broadcast_fake_shreds_run; pub mod broadcast_metrics; pub(crate) mod broadcast_utils; @@ -52,11 +54,20 @@ pub enum BroadcastStageReturnType { ChannelDisconnected, } +#[derive(PartialEq, Clone, Debug)] +pub struct BroadcastDuplicatesConfig { + /// Percentage of stake to send different version of slots to + pub stake_partition: u8, + /// Number of slots to wait before sending duplicate shreds + pub duplicate_send_delay: usize, +} + #[derive(PartialEq, Clone, Debug)] pub enum BroadcastStageType { Standard, FailEntryVerification, BroadcastFakeShreds, + BroadcastDuplicates(BroadcastDuplicatesConfig), } impl BroadcastStageType { @@ -101,6 +112,16 @@ impl BroadcastStageType { blockstore, BroadcastFakeShredsRun::new(keypair, 0, shred_version), ), + + BroadcastStageType::BroadcastDuplicates(config) => BroadcastStage::new( + sock, + cluster_info, + receiver, + retransmit_slots_receiver, + exit_sender, + blockstore, + BroadcastDuplicatesRun::new(keypair, shred_version, config.clone()), + ), } } } diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs new file mode 100644 index 0000000000..674d8d06bf --- /dev/null +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -0,0 +1,333 @@ +use super::broadcast_utils::ReceiveResults; +use super::*; +use log::*; +use solana_ledger::entry::{create_ticks, Entry, EntrySlice}; +use solana_ledger::shred::Shredder; +use solana_runtime::blockhash_queue::BlockhashQueue; +use solana_sdk::clock::Slot; +use solana_sdk::fee_calculator::FeeCalculator; +use solana_sdk::hash::Hash; +use solana_sdk::signature::{Keypair, Signer}; +use solana_sdk::transaction::Transaction; +use std::collections::VecDeque; +use std::sync::Mutex; + +// Queue which facilitates delivering shreds with a delay +type DelayedQueue = VecDeque<(Option, Option>)>; + +#[derive(Clone)] +pub(super) struct BroadcastDuplicatesRun { + config: BroadcastDuplicatesConfig, + // Local queue for broadcast to track which duplicate blockhashes we've sent + duplicate_queue: BlockhashQueue, + // Shared queue between broadcast and transmit threads + delayed_queue: Arc>, + // Buffer for duplicate entries + duplicate_entries_buffer: Vec, + last_duplicate_entry_hash: Hash, + last_broadcast_slot: Slot, + next_shred_index: u32, + shred_version: u16, + keypair: Arc, +} + +impl BroadcastDuplicatesRun { + pub(super) fn new( + keypair: Arc, + shred_version: u16, + config: BroadcastDuplicatesConfig, + ) -> Self { + let mut delayed_queue = DelayedQueue::new(); + delayed_queue.resize(config.duplicate_send_delay, (None, None)); + Self { + config, + delayed_queue: Arc::new(Mutex::new(delayed_queue)), + duplicate_queue: BlockhashQueue::default(), + duplicate_entries_buffer: vec![], + next_shred_index: u32::MAX, + last_broadcast_slot: 0, + last_duplicate_entry_hash: Hash::default(), + shred_version, + keypair, + } + } + + fn queue_or_create_duplicate_entries( + &mut self, + bank: &Arc, + receive_results: &ReceiveResults, + ) -> (Vec, u32) { + // If the last entry hash is default, grab the last blockhash from the parent bank + if self.last_duplicate_entry_hash == Hash::default() { + self.last_duplicate_entry_hash = bank.last_blockhash(); + } + + // Create duplicate entries by.. + // 1) rearranging real entries so that all transaction entries are moved to + // the front and tick entries are moved to the back. + // 2) setting all transaction entries to zero hashes and all tick entries to `hashes_per_tick`. + // 3) removing any transactions which reference blockhashes which aren't in the + // duplicate blockhash queue. + let (duplicate_entries, next_shred_index) = if bank.slot() > MINIMUM_DUPLICATE_SLOT { + let mut tx_entries: Vec = receive_results + .entries + .iter() + .filter_map(|entry| { + if entry.is_tick() { + return None; + } + + let transactions: Vec = entry + .transactions + .iter() + .filter(|tx| { + self.duplicate_queue + .get_hash_age(&tx.message.recent_blockhash) + .is_some() + }) + .cloned() + .collect(); + if !transactions.is_empty() { + Some(Entry::new_mut( + &mut self.last_duplicate_entry_hash, + &mut 0, + transactions, + )) + } else { + None + } + }) + .collect(); + let mut tick_entries = create_ticks( + receive_results.entries.tick_count(), + bank.hashes_per_tick().unwrap_or_default(), + self.last_duplicate_entry_hash, + ); + self.duplicate_entries_buffer.append(&mut tx_entries); + self.duplicate_entries_buffer.append(&mut tick_entries); + + // Only send out duplicate entries when the block is finished otherwise the + // recipient will start repairing for shreds they haven't received yet and + // hit duplicate slot issues before we want them to. + let entries = if receive_results.last_tick_height == bank.max_tick_height() { + self.duplicate_entries_buffer.drain(..).collect() + } else { + vec![] + }; + + // Set next shred index to 0 since we are sending the full slot + (entries, 0) + } else { + // Send real entries until we hit min duplicate slot + (receive_results.entries.clone(), self.next_shred_index) + }; + + // Save last duplicate entry hash to avoid invalid entry hash errors + if let Some(last_duplicate_entry) = duplicate_entries.last() { + self.last_duplicate_entry_hash = last_duplicate_entry.hash; + } + + (duplicate_entries, next_shred_index) + } +} + +/// Duplicate slots should only be sent once all validators have started. +/// This constant is intended to be used as a buffer so that all validators +/// are live before sending duplicate slots. +pub const MINIMUM_DUPLICATE_SLOT: Slot = 20; + +impl BroadcastRun for BroadcastDuplicatesRun { + fn run( + &mut self, + blockstore: &Arc, + receiver: &Receiver, + socket_sender: &Sender<(TransmitShreds, Option)>, + blockstore_sender: &Sender<(Arc>, Option)>, + ) -> Result<()> { + // 1) Pull entries from banking stage + let receive_results = broadcast_utils::recv_slot_entries(receiver)?; + let bank = receive_results.bank.clone(); + let last_tick_height = receive_results.last_tick_height; + + if self.next_shred_index == u32::MAX { + self.next_shred_index = blockstore + .meta(bank.slot()) + .expect("Database error") + .map(|meta| meta.consumed) + .unwrap_or(0) as u32 + } + + // We were not the leader, but just became leader again + if bank.slot() > self.last_broadcast_slot + 1 { + self.last_duplicate_entry_hash = Hash::default(); + } + self.last_broadcast_slot = bank.slot(); + + let shredder = Shredder::new( + bank.slot(), + bank.parent().unwrap().slot(), + self.keypair.clone(), + (bank.tick_height() % bank.ticks_per_slot()) as u8, + self.shred_version, + ) + .expect("Expected to create a new shredder"); + + let (data_shreds, coding_shreds, last_shred_index) = shredder.entries_to_shreds( + &receive_results.entries, + last_tick_height == bank.max_tick_height(), + self.next_shred_index, + ); + + let (duplicate_entries, next_duplicate_shred_index) = + self.queue_or_create_duplicate_entries(&bank, &receive_results); + let (duplicate_data_shreds, duplicate_coding_shreds, _) = if !duplicate_entries.is_empty() { + shredder.entries_to_shreds( + &duplicate_entries, + last_tick_height == bank.max_tick_height(), + next_duplicate_shred_index, + ) + } else { + (vec![], vec![], 0) + }; + + // Manually track the shred index because relying on slot meta consumed is racy + if last_tick_height == bank.max_tick_height() { + self.next_shred_index = 0; + self.duplicate_queue + .register_hash(&self.last_duplicate_entry_hash, &FeeCalculator::default()); + } else { + self.next_shred_index = last_shred_index; + } + + // Partition network with duplicate and real shreds based on stake + let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); + let mut duplicate_recipients = HashMap::new(); + let mut real_recipients = HashMap::new(); + + let mut stakes: Vec<(Pubkey, u64)> = bank + .epoch_staked_nodes(bank_epoch) + .unwrap() + .into_iter() + .filter(|(pubkey, _)| *pubkey != self.keypair.pubkey()) + .collect(); + stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| { + if r_stake == l_stake { + l_key.cmp(&r_key) + } else { + r_stake.cmp(&l_stake) + } + }); + + let highest_staked_node = stakes.first().cloned().map(|x| x.0); + let stake_total: u64 = stakes.iter().map(|(_, stake)| *stake).sum(); + let mut cumulative_stake: u64 = 0; + for (pubkey, stake) in stakes.into_iter().rev() { + cumulative_stake += stake; + if (100 * cumulative_stake / stake_total) as u8 <= self.config.stake_partition { + duplicate_recipients.insert(pubkey, stake); + } else { + real_recipients.insert(pubkey, stake); + } + } + + if let Some(highest_staked_node) = highest_staked_node { + if bank.slot() > MINIMUM_DUPLICATE_SLOT && last_tick_height == bank.max_tick_height() { + warn!( + "{} sent duplicate slot {} to nodes: {:?}", + self.keypair.pubkey(), + bank.slot(), + &duplicate_recipients, + ); + warn!( + "Duplicate shreds for slot {} will be broadcast in {} slot(s)", + bank.slot(), + self.config.duplicate_send_delay + ); + + let delayed_shreds: Option> = vec![ + duplicate_data_shreds.last().cloned(), + data_shreds.last().cloned(), + ] + .into_iter() + .collect(); + self.delayed_queue + .lock() + .unwrap() + .push_back((Some(highest_staked_node), delayed_shreds)); + } + } + + let duplicate_recipients = Arc::new(duplicate_recipients); + let real_recipients = Arc::new(real_recipients); + + let data_shreds = Arc::new(data_shreds); + blockstore_sender.send((data_shreds.clone(), None))?; + + // 3) Start broadcast step + socket_sender.send(( + ( + Some(duplicate_recipients.clone()), + Arc::new(duplicate_data_shreds), + ), + None, + ))?; + socket_sender.send(( + ( + Some(duplicate_recipients), + Arc::new(duplicate_coding_shreds), + ), + None, + ))?; + socket_sender.send(((Some(real_recipients.clone()), data_shreds), None))?; + socket_sender.send(((Some(real_recipients), Arc::new(coding_shreds)), None))?; + + Ok(()) + } + fn transmit( + &mut self, + receiver: &Arc>, + cluster_info: &ClusterInfo, + sock: &UdpSocket, + ) -> Result<()> { + // Check the delay queue for shreds that are ready to be sent + let (delayed_recipient, delayed_shreds) = { + let mut delayed_deque = self.delayed_queue.lock().unwrap(); + if delayed_deque.len() > self.config.duplicate_send_delay { + delayed_deque.pop_front().unwrap() + } else { + (None, None) + } + }; + + let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?; + let stakes = stakes.unwrap(); + for peer in cluster_info.tvu_peers() { + // Forward shreds to circumvent gossip + if stakes.get(&peer.id).is_some() { + shreds.iter().for_each(|shred| { + sock.send_to(&shred.payload, &peer.tvu_forwards).unwrap(); + }); + } + + // After a delay, broadcast duplicate shreds to a single node + if let Some(shreds) = delayed_shreds.as_ref() { + if Some(peer.id) == delayed_recipient { + shreds.iter().for_each(|shred| { + sock.send_to(&shred.payload, &peer.tvu).unwrap(); + }); + } + } + } + + Ok(()) + } + fn record( + &mut self, + receiver: &Arc>, + blockstore: &Arc, + ) -> Result<()> { + let (data_shreds, _) = receiver.lock().unwrap().recv()?; + blockstore.insert_shreds(data_shreds.to_vec(), None, true)?; + Ok(()) + } +} diff --git a/local-cluster/src/cluster_tests.rs b/local-cluster/src/cluster_tests.rs index 72bccc2a81..dfb95a42a4 100644 --- a/local-cluster/src/cluster_tests.rs +++ b/local-cluster/src/cluster_tests.rs @@ -280,18 +280,23 @@ pub fn check_for_new_roots(num_new_roots: usize, contact_infos: &[ContactInfo], let mut last_print = Instant::now(); let loop_start = Instant::now(); let loop_timeout = Duration::from_secs(60); + let mut num_roots_map = HashMap::new(); while !done { assert!(loop_start.elapsed() < loop_timeout); + for (i, ingress_node) in contact_infos.iter().enumerate() { let client = create_client(ingress_node.client_facing_addr(), VALIDATOR_PORT_RANGE); - let slot = client.get_slot().unwrap_or(0); - roots[i].insert(slot); - let min_node = roots.iter().map(|r| r.len()).min().unwrap_or(0); - done = min_node >= num_new_roots; + let root_slot = client + .get_slot_with_commitment(CommitmentConfig::finalized()) + .unwrap_or(0); + roots[i].insert(root_slot); + num_roots_map.insert(ingress_node.id, roots[i].len()); + let num_roots = roots.iter().map(|r| r.len()).min().unwrap(); + done = num_roots >= num_new_roots; if done || last_print.elapsed().as_secs() > 3 { info!( - "{} {} min observed roots {}/16", - test_name, ingress_node.id, min_node + "{} waiting for {} new roots.. observed: {:?}", + test_name, num_new_roots, num_roots_map ); last_print = Instant::now(); } diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index ec4fcb4f49..c227470fa6 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -12,7 +12,7 @@ use solana_client::{ thin_client::{create_client, ThinClient}, }; use solana_core::{ - broadcast_stage::BroadcastStageType, + broadcast_stage::{BroadcastDuplicatesConfig, BroadcastStageType}, consensus::{Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH}, optimistic_confirmation_verifier::OptimisticConfirmationVerifier, validator::ValidatorConfig, @@ -1967,32 +1967,52 @@ fn test_fail_entry_verification_leader() { } #[test] -#[allow(unused_attributes)] +#[serial] #[ignore] +#[allow(unused_attributes)] fn test_fake_shreds_broadcast_leader() { test_faulty_node(BroadcastStageType::BroadcastFakeShreds); } +#[test] +#[serial] +#[ignore] +#[allow(unused_attributes)] +fn test_duplicate_shreds_broadcast_leader() { + test_faulty_node(BroadcastStageType::BroadcastDuplicates( + BroadcastDuplicatesConfig { + stake_partition: 50, + duplicate_send_delay: 1, + }, + )); +} + fn test_faulty_node(faulty_node_type: BroadcastStageType) { - solana_logger::setup_with_default(RUST_LOG_FILTER); - let num_nodes = 2; + solana_logger::setup_with_default("solana_local_cluster=info"); + let num_nodes = 3; + let error_validator_config = ValidatorConfig { broadcast_stage_type: faulty_node_type, ..ValidatorConfig::default() }; - let mut validator_configs = Vec::with_capacity(num_nodes - 1); + let mut validator_configs = Vec::with_capacity(num_nodes); validator_configs.resize_with(num_nodes - 1, ValidatorConfig::default); + validator_configs.push(error_validator_config); - // Push a faulty_bootstrap = vec![error_validator_config]; - validator_configs.insert(0, error_validator_config); - let node_stakes = vec![300, 100]; + let mut validator_keys = Vec::with_capacity(num_nodes); + validator_keys.resize_with(num_nodes, || (Arc::new(Keypair::new()), true)); + + let node_stakes = vec![60, 50, 60]; assert_eq!(node_stakes.len(), num_nodes); + assert_eq!(validator_keys.len(), num_nodes); + let mut cluster_config = ClusterConfig { cluster_lamports: 10_000, node_stakes, validator_configs, - slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH * 2, - stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH * 2, + validator_keys: Some(validator_keys), + slots_per_epoch: MINIMUM_SLOTS_PER_EPOCH * 2u64, + stakers_slot_offset: MINIMUM_SLOTS_PER_EPOCH * 2u64, ..ClusterConfig::default() }; diff --git a/runtime/src/blockhash_queue.rs b/runtime/src/blockhash_queue.rs index ee109c7a02..5e2b76f3b5 100644 --- a/runtime/src/blockhash_queue.rs +++ b/runtime/src/blockhash_queue.rs @@ -127,7 +127,7 @@ impl BlockhashQueue { .map(|(k, v)| recent_blockhashes::IterItem(v.hash_height, k, &v.fee_calculator)) } - pub fn len(&self) -> usize { + pub(crate) fn len(&self) -> usize { self.max_age } } diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 4f61834e33..ebde96ae41 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -12,7 +12,7 @@ pub mod bank; pub mod bank_client; pub mod bank_forks; pub mod bank_utils; -mod blockhash_queue; +pub mod blockhash_queue; pub mod bloom; pub mod builtins; pub mod commitment;