diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs
index ccf8af822b..982fdffa87 100644
--- a/core/src/broadcast_stage/broadcast_duplicates_run.rs
+++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -1,14 +1,16 @@
 use {
     super::*,
     crate::cluster_nodes::ClusterNodesCache,
+    itertools::Itertools,
     solana_entry::entry::Entry,
     solana_ledger::shred::Shredder,
     solana_runtime::blockhash_queue::BlockhashQueue,
     solana_sdk::{
         hash::Hash,
-        signature::{Keypair, Signer},
+        signature::{Keypair, Signature, Signer},
         system_transaction,
     },
+    std::collections::HashSet,
 };
 
 pub const MINIMUM_DUPLICATE_SLOT: Slot = 20;
@@ -36,6 +38,8 @@ pub(super) struct BroadcastDuplicatesRun {
     prev_entry_hash: Option<Hash>,
     num_slots_broadcasted: usize,
     cluster_nodes_cache: Arc<ClusterNodesCache<BroadcastStage>>,
+    original_last_data_shreds: HashSet<Signature>,
+    partition_last_data_shreds: HashSet<Signature>,
 }
 
 impl BroadcastDuplicatesRun {
@@ -56,6 +60,8 @@ impl BroadcastDuplicatesRun {
             prev_entry_hash: None,
             num_slots_broadcasted: 0,
             cluster_nodes_cache,
+            original_last_data_shreds: HashSet::default(),
+            partition_last_data_shreds: HashSet::default(),
         }
     }
 }
@@ -198,19 +204,23 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         // Special handling of last shred to cause partition
         if let Some((original_last_data_shred, partition_last_data_shred)) = last_shreds {
+            let pubkey = keypair.pubkey();
+            self.original_last_data_shreds
+                .extend(original_last_data_shred.iter().map(|shred| {
+                    assert!(shred.verify(&pubkey));
+                    shred.signature()
+                }));
+            self.partition_last_data_shreds
+                .extend(partition_last_data_shred.iter().map(|shred| {
+                    assert!(shred.verify(&pubkey));
+                    shred.signature()
+                }));
             let original_last_data_shred = Arc::new(original_last_data_shred);
             let partition_last_data_shred = Arc::new(partition_last_data_shred);
 
             // Store the original shreds that this node replayed
             blockstore_sender.send((original_last_data_shred.clone(), None))?;
 
-            // TODO: Previously, on the last shred, the code here was using
-            // stakes to partition the network with duplicate and real shreds
-            // at self.config.stake_partition of cumulative stake. This is no
-            // longer possible here as stakes are computed elsewhere further
-            // down the stream. Figure out how to replicate old behaviour to
-            // preserve test coverage.
-            // https://github.com/solana-labs/solana/blob/cde146155/core/src/broadcast_stage/broadcast_duplicates_run.rs#L65-L116
             let original_transmit_shreds = (bank.slot(), original_last_data_shred);
             let partition_transmit_shreds = (bank.slot(), partition_last_data_shred);
@@ -232,21 +242,67 @@ impl BroadcastRun for BroadcastDuplicatesRun {
             let bank_forks = bank_forks.read().unwrap();
             (bank_forks.root_bank(), bank_forks.working_bank())
         };
+        let self_pubkey = cluster_info.id();
+
+        // Create the cluster partition.
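+        // Nodes other than this one are sorted by ascending stake (pubkey as
+        // a tie-breaker) and taken while their cumulative stake stays within
+        // config.stake_partition, so the partition consists of the
+        // lowest-staked nodes.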
+        let cluster_partition: HashSet<Pubkey> = {
+            let mut cumulative_stake = 0;
+            let epoch = root_bank.get_leader_schedule_epoch(slot);
+            root_bank
+                .epoch_staked_nodes(epoch)
+                .unwrap()
+                .iter()
+                .filter(|(pubkey, _)| **pubkey != self_pubkey)
+                .sorted_by_key(|(pubkey, stake)| (**stake, **pubkey))
+                .take_while(|(_, stake)| {
+                    cumulative_stake += *stake;
+                    cumulative_stake <= self.config.stake_partition
+                })
+                .map(|(pubkey, _)| *pubkey)
+                .collect()
+        };
 
         // Broadcast data
         let cluster_nodes = self
            .cluster_nodes_cache
            .get(slot, &root_bank, &working_bank, cluster_info);
-        broadcast_shreds(
-            sock,
-            &shreds,
-            &cluster_nodes,
-            &Arc::new(AtomicInterval::default()),
-            &mut TransmitShredsStats::default(),
-            cluster_info.id(),
-            bank_forks,
-            cluster_info.socket_addr_space(),
-        )?;
-
+        let socket_addr_space = cluster_info.socket_addr_space();
+        let packets: Vec<_> = shreds
+            .iter()
+            .filter_map(|shred| {
+                let seed = shred.seed(Some(self_pubkey), &root_bank);
+                let node = cluster_nodes.get_broadcast_peer(seed)?;
+                if !socket_addr_space.check(&node.tvu) {
+                    return None;
+                }
+                if self.original_last_data_shreds.contains(&shred.signature()) {
+                    // If the node is within the partition, skip the shred.
+                    if cluster_partition.contains(&node.id) {
+                        info!(
+                            "skipping node {} for original shred index {}, slot {}",
+                            node.id,
+                            shred.index(),
+                            shred.slot()
+                        );
+                        return None;
+                    }
+                }
+                if self.partition_last_data_shreds.contains(&shred.signature()) {
+                    // If the node is not within the partition, skip the shred.
+                    if !cluster_partition.contains(&node.id) {
+                        info!(
+                            "skipping node {} for partition shred index {}, slot {}",
+                            node.id,
+                            shred.index(),
+                            shred.slot()
+                        );
+                        return None;
+                    }
+                }
+                Some((&shred.payload, node.tvu))
+            })
+            .collect();
+        if let Err(SendPktsError::IoError(ioerr, _)) = batch_send(sock, &packets) {
+            return Err(Error::Io(ioerr));
+        }
         Ok(())
     }
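
A minimal standalone sketch of the partition-selection logic above (not part
of the patch): plain `&str` ids and a `HashMap` stand in for `Pubkey` and the
bank's `epoch_staked_nodes` map, and only the `itertools` crate is assumed.

```rust
use itertools::Itertools;
use std::collections::{HashMap, HashSet};

/// Collects the lowest-staked nodes (excluding `self_id`) whose cumulative
/// stake stays at or below `stake_partition`.
fn cluster_partition<'a>(
    staked_nodes: &HashMap<&'a str, u64>,
    self_id: &str,
    stake_partition: u64,
) -> HashSet<&'a str> {
    let mut cumulative_stake = 0;
    staked_nodes
        .iter()
        .map(|(&id, &stake)| (id, stake))
        .filter(|&(id, _)| id != self_id)
        // Lowest stake first; the id breaks ties deterministically.
        .sorted_by_key(|&(id, stake)| (stake, id))
        .take_while(|&(_, stake)| {
            cumulative_stake += stake;
            cumulative_stake <= stake_partition
        })
        .map(|(id, _)| id)
        .collect()
}

fn main() {
    let stakes = HashMap::from([("a", 10u64), ("b", 20), ("c", 30), ("d", 40)]);
    // 10 + 20 <= 35 but 10 + 20 + 30 > 35, so the partition is {a, b}.
    assert_eq!(cluster_partition(&stakes, "d", 35), HashSet::from(["a", "b"]));
}
```

Sorting by `(stake, pubkey)` keeps the selection deterministic for a given
stake distribution, so repeated broadcasts carve out the same partition.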