diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index fd15ae7451..9f2c2b3f6e 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -124,7 +124,7 @@ impl BroadcastStageType {
     }
 }
 
-pub type TransmitShreds = (Option<Arc<HashMap<Pubkey, u64>>>, Arc<Vec<Shred>>);
+type TransmitShreds = (Slot, Arc<Vec<Shred>>);
 trait BroadcastRun {
     fn run(
         &mut self,
@@ -339,27 +339,25 @@ impl BroadcastStage {
         }
 
         for (_, bank) in retransmit_slots.iter() {
-            let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-            let stakes = bank.epoch_staked_nodes(bank_epoch);
-            let stakes = stakes.map(Arc::new);
+            let slot = bank.slot();
             let data_shreds = Arc::new(
                 blockstore
-                    .get_data_shreds_for_slot(bank.slot(), 0)
+                    .get_data_shreds_for_slot(slot, 0)
                     .expect("My own shreds must be reconstructable"),
             );
 
             if !data_shreds.is_empty() {
-                socket_sender.send(((stakes.clone(), data_shreds), None))?;
+                socket_sender.send(((slot, data_shreds), None))?;
             }
 
             let coding_shreds = Arc::new(
                 blockstore
-                    .get_coding_shreds_for_slot(bank.slot(), 0)
+                    .get_coding_shreds_for_slot(slot, 0)
                     .expect("My own shreds must be reconstructable"),
            );
 
             if !coding_shreds.is_empty() {
-                socket_sender.send(((stakes.clone(), coding_shreds), None))?;
+                socket_sender.send(((slot, coding_shreds), None))?;
             }
         }
 
@@ -464,10 +462,9 @@ pub mod test {
     };
 
     #[allow(clippy::implicit_hasher)]
-    pub fn make_transmit_shreds(
+    fn make_transmit_shreds(
         slot: Slot,
         num: u64,
-        stakes: Option<Arc<HashMap<Pubkey, u64>>>,
     ) -> (
         Vec<Shred>,
         Vec<Shred>,
@@ -489,11 +486,11 @@ pub mod test {
             coding_shreds.clone(),
             data_shreds
                 .into_iter()
-                .map(|s| (stakes.clone(), Arc::new(vec![s])))
+                .map(|s| (slot, Arc::new(vec![s])))
                 .collect(),
             coding_shreds
                 .into_iter()
-                .map(|s| (stakes.clone(), Arc::new(vec![s])))
+                .map(|s| (slot, Arc::new(vec![s])))
                 .collect(),
         )
     }
@@ -537,7 +534,7 @@ pub mod test {
         // Make some shreds
         let updated_slot = 0;
         let (all_data_shreds, all_coding_shreds, _, _all_coding_transmit_shreds) =
-            make_transmit_shreds(updated_slot, 10, None);
+            make_transmit_shreds(updated_slot, 10);
         let num_data_shreds = all_data_shreds.len();
         let num_coding_shreds = all_coding_shreds.len();
         assert!(num_data_shreds >= 10);
diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs
index 638563d62e..8e7ba4a479 100644
--- a/core/src/broadcast_stage/broadcast_duplicates_run.rs
+++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -49,70 +49,6 @@ impl BroadcastDuplicatesRun {
             num_slots_broadcasted: 0,
         }
     }
-
-    fn get_non_partitioned_batches(
-        &self,
-        my_pubkey: &Pubkey,
-        bank: &Bank,
-        data_shreds: Arc<Vec<Shred>>,
-    ) -> TransmitShreds {
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let mut stakes: HashMap<Pubkey, u64> = bank.epoch_staked_nodes(bank_epoch).unwrap();
-        stakes.retain(|pubkey, _stake| pubkey != my_pubkey);
-        (Some(Arc::new(stakes)), data_shreds)
-    }
-
-    fn get_partitioned_batches(
-        &self,
-        my_pubkey: &Pubkey,
-        bank: &Bank,
-        original_shreds: Arc<Vec<Shred>>,
-        partition_shreds: Arc<Vec<Shred>>,
-    ) -> (TransmitShreds, TransmitShreds) {
-        // On the last shred, partition network with duplicate and real shreds
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let mut original_recipients = HashMap::new();
-        let mut partition_recipients = HashMap::new();
-
-        let mut stakes: Vec<(Pubkey, u64)> = bank
-            .epoch_staked_nodes(bank_epoch)
-            .unwrap()
-            .into_iter()
-            .filter(|(pubkey, _)| pubkey != my_pubkey)
-            .collect();
-        stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| {
-            if r_stake == l_stake {
-                l_key.cmp(r_key)
-            } else {
-                l_stake.cmp(r_stake)
-            }
-        });
-
-        let mut cumulative_stake: u64 = 0;
-        for (pubkey, stake) in stakes.into_iter() {
-            cumulative_stake += stake;
-            if cumulative_stake <= self.config.stake_partition {
-                partition_recipients.insert(pubkey, stake);
-            } else {
-                original_recipients.insert(pubkey, stake);
-            }
-        }
-
-        warn!(
-            "{} sent duplicate slot {} to nodes: {:?}",
-            my_pubkey,
-            bank.slot(),
-            &partition_recipients,
-        );
-
-        let original_recipients = Arc::new(original_recipients);
-        let original_transmit_shreds = (Some(original_recipients), original_shreds);
-
-        let partition_recipients = Arc::new(partition_recipients);
-        let partition_transmit_shreds = (Some(partition_recipients), partition_shreds);
-
-        (original_transmit_shreds, partition_transmit_shreds)
-    }
 }
 
 impl BroadcastRun for BroadcastDuplicatesRun {
@@ -243,8 +179,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         blockstore_sender.send((data_shreds.clone(), None))?;
 
         // 3) Start broadcast step
-        let transmit_shreds =
-            self.get_non_partitioned_batches(&keypair.pubkey(), &bank, data_shreds.clone());
+        let transmit_shreds = (bank.slot(), data_shreds.clone());
         info!(
             "{} Sending good shreds for slot {} to network",
             keypair.pubkey(),
@@ -260,13 +195,15 @@ impl BroadcastRun for BroadcastDuplicatesRun {
 
             // Store the original shreds that this node replayed
             blockstore_sender.send((original_last_data_shred.clone(), None))?;
-            let (original_transmit_shreds, partition_transmit_shreds) = self
-                .get_partitioned_batches(
-                    &keypair.pubkey(),
-                    &bank,
-                    original_last_data_shred,
-                    partition_last_data_shred,
-                );
+            // TODO: Previously, on the last shred, the code here was using
+            // stakes to partition the network with duplicate and real shreds
+            // at self.config.stake_partition of cumulative stake. This is no
+            // longer possible here as stakes are computed elsewhere further
+            // down the stream. Figure out how to replicate old behaviour to
+            // preserve test coverage.
+            // https://github.com/solana-labs/solana/blob/cde146155/core/src/broadcast_stage/broadcast_duplicates_run.rs#L65-L116
+            let original_transmit_shreds = (bank.slot(), original_last_data_shred);
+            let partition_transmit_shreds = (bank.slot(), partition_last_data_shred);
             socket_sender.send((original_transmit_shreds, None))?;
             socket_sender.send((partition_transmit_shreds, None))?;
 
@@ -281,12 +218,13 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         sock: &UdpSocket,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
+        let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
+        let root_bank = bank_forks.read().unwrap().root_bank();
+        let epoch = root_bank.get_leader_schedule_epoch(slot);
+        let stakes = root_bank.epoch_staked_nodes(epoch);
         // Broadcast data
-        let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
-            cluster_info,
-            stakes.as_deref().unwrap_or(&HashMap::default()),
-        );
+        let cluster_nodes =
+            ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default());
         broadcast_shreds(
             sock,
             &shreds,
diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs
index f0a16b0209..621c570ce3 100644
--- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs
+++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs
@@ -84,19 +84,20 @@ impl BroadcastRun for BroadcastFakeShredsRun {
         let data_shreds = Arc::new(data_shreds);
         blockstore_sender.send((data_shreds.clone(), None))?;
 
+        let slot = bank.slot();
+        let batch_info = BroadcastShredBatchInfo {
+            slot,
+            num_expected_batches: None,
+            slot_start_ts: Instant::now(),
+        };
         // 3) Start broadcast step
         //some indicates fake shreds
-        socket_sender.send((
-            (Some(Arc::new(HashMap::new())), Arc::new(fake_data_shreds)),
-            None,
-        ))?;
-        socket_sender.send((
-            (Some(Arc::new(HashMap::new())), Arc::new(fake_coding_shreds)),
-            None,
-        ))?;
+        let batch_info = Some(batch_info);
+        socket_sender.send(((slot, Arc::new(fake_data_shreds)), batch_info.clone()))?;
+        socket_sender.send(((slot, Arc::new(fake_coding_shreds)), batch_info))?;
         //none indicates real shreds
-        socket_sender.send(((None, data_shreds), None))?;
-        socket_sender.send(((None, Arc::new(coding_shreds)), None))?;
+        socket_sender.send(((slot, data_shreds), None))?;
+        socket_sender.send(((slot, Arc::new(coding_shreds)), None))?;
 
         Ok(())
     }
@@ -107,18 +108,15 @@ impl BroadcastRun for BroadcastFakeShredsRun {
         sock: &UdpSocket,
         _bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        for ((stakes, data_shreds), _) in receiver.lock().unwrap().iter() {
+        for ((_slot, data_shreds), batch_info) in receiver.lock().unwrap().iter() {
+            let fake = batch_info.is_some();
             let peers = cluster_info.tvu_peers();
             peers.iter().enumerate().for_each(|(i, peer)| {
-                if i <= self.partition && stakes.is_some() {
+                if fake == (i <= self.partition) {
                     // Send fake shreds to the first N peers
                     data_shreds.iter().for_each(|b| {
                         sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
                     });
-                } else if i > self.partition && stakes.is_none() {
-                    data_shreds.iter().for_each(|b| {
-                        sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
-                    });
                 }
             });
         }
diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
index 80daf8befb..39d92af654 100644
--- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -100,10 +100,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         let data_shreds = Arc::new(data_shreds);
         blockstore_sender.send((data_shreds.clone(), None))?;
         // 4) Start broadcast step
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let stakes = bank.epoch_staked_nodes(bank_epoch);
-        let stakes = stakes.map(Arc::new);
-        socket_sender.send(((stakes.clone(), data_shreds), None))?;
+        socket_sender.send(((bank.slot(), data_shreds), None))?;
         if let Some((good_last_data_shred, bad_last_data_shred)) = last_shreds {
             // Stash away the good shred so we can rewrite them later
             self.good_shreds.extend(good_last_data_shred.clone());
@@ -122,7 +119,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             // Store the bad shred so we serve bad repairs to validators catching up
             blockstore_sender.send((bad_last_data_shred.clone(), None))?;
             // Send bad shreds to rest of network
-            socket_sender.send(((stakes, bad_last_data_shred), None))?;
+            socket_sender.send(((bank.slot(), bad_last_data_shred), None))?;
         }
         Ok(())
     }
@@ -133,12 +130,13 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         sock: &UdpSocket,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
+        let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
+        let root_bank = bank_forks.read().unwrap().root_bank();
+        let epoch = root_bank.get_leader_schedule_epoch(slot);
+        let stakes = root_bank.epoch_staked_nodes(epoch);
         // Broadcast data
-        let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
-            cluster_info,
-            stakes.as_deref().unwrap_or(&HashMap::default()),
-        );
+        let cluster_nodes =
+            ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default());
         broadcast_shreds(
             sock,
             &shreds,
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index 691211a35e..db02a0b3e4 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -1,20 +1,22 @@
 #![allow(clippy::rc_buffer)]
 
-use super::{
-    broadcast_utils::{self, ReceiveResults},
-    *,
+use {
+    super::{
+        broadcast_utils::{self, ReceiveResults},
+        *,
+    },
+    crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes},
+    solana_entry::entry::Entry,
+    solana_ledger::shred::{
+        ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
+        SHRED_TICK_REFERENCE_MASK,
+    },
+    solana_sdk::{
+        signature::Keypair,
+        timing::{duration_as_us, AtomicInterval},
+    },
+    std::{sync::RwLock, time::Duration},
 };
-use crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes};
-use solana_entry::entry::Entry;
-use solana_ledger::shred::{
-    ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_TICK_REFERENCE_MASK,
-};
-use solana_sdk::{
-    pubkey::Pubkey,
-    signature::Keypair,
-    timing::{duration_as_us, AtomicInterval},
-};
-use std::{collections::HashMap, sync::RwLock, time::Duration};
 
 #[derive(Clone)]
 pub struct StandardBroadcastRun {
@@ -224,13 +226,11 @@ impl StandardBroadcastRun {
         to_shreds_time.stop();
 
         let mut get_leader_schedule_time = Measure::start("broadcast_get_leader_schedule");
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let stakes = bank.epoch_staked_nodes(bank_epoch).map(Arc::new);
-
         // Broadcast the last shred of the interrupted slot if necessary
         if !prev_slot_shreds.is_empty() {
+            let slot = prev_slot_shreds[0].slot();
             let batch_info = Some(BroadcastShredBatchInfo {
-                slot: prev_slot_shreds[0].slot(),
+                slot,
                 num_expected_batches: Some(old_num_batches + 1),
                 slot_start_ts: old_broadcast_start.expect(
                     "Old broadcast start time for previous slot must exist if the previous slot
@@ -238,7 +238,7 @@ impl StandardBroadcastRun {
                 ),
             });
             let shreds = Arc::new(prev_slot_shreds);
-            socket_sender.send(((stakes.clone(), shreds.clone()), batch_info.clone()))?;
+            socket_sender.send(((slot, shreds.clone()), batch_info.clone()))?;
             blockstore_sender.send((shreds, batch_info))?;
         }
 
@@ -264,7 +264,7 @@ impl StandardBroadcastRun {
 
         // Send data shreds
         let data_shreds = Arc::new(data_shreds);
-        socket_sender.send(((stakes.clone(), data_shreds.clone()), batch_info.clone()))?;
+        socket_sender.send(((bank.slot(), data_shreds.clone()), batch_info.clone()))?;
         blockstore_sender.send((data_shreds, batch_info.clone()))?;
 
         // Create and send coding shreds
@@ -275,7 +275,7 @@ impl StandardBroadcastRun {
             &mut process_stats,
         );
         let coding_shreds = Arc::new(coding_shreds);
-        socket_sender.send(((stakes, coding_shreds.clone()), batch_info.clone()))?;
+        socket_sender.send(((bank.slot(), coding_shreds.clone()), batch_info.clone()))?;
         blockstore_sender.send((coding_shreds, batch_info))?;
         coding_send_time.stop();
 
@@ -333,24 +333,19 @@ impl StandardBroadcastRun {
         &mut self,
         sock: &UdpSocket,
         cluster_info: &ClusterInfo,
-        stakes: Option<&HashMap<Pubkey, u64>>,
+        slot: Slot,
         shreds: Arc<Vec<Shred>>,
         broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        const BROADCAST_PEER_UPDATE_INTERVAL_MS: u64 = 1000;
         trace!("Broadcasting {:?} shreds", shreds.len());
         // Get the list of peers to broadcast to
         let mut get_peers_time = Measure::start("broadcast::get_peers");
-        if self
-            .last_peer_update
-            .should_update_ext(BROADCAST_PEER_UPDATE_INTERVAL_MS, false)
-        {
-            *self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new(
-                cluster_info,
-                stakes.unwrap_or(&HashMap::default()),
-            );
-        }
+        let root_bank = bank_forks.read().unwrap().root_bank();
+        let epoch = root_bank.get_leader_schedule_epoch(slot);
+        let stakes = root_bank.epoch_staked_nodes(epoch);
+        *self.cluster_nodes.write().unwrap() =
+            ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default());
        get_peers_time.stop();
 
         let cluster_nodes = self.cluster_nodes.read().unwrap();
@@ -475,15 +470,8 @@ impl BroadcastRun for StandardBroadcastRun {
         sock: &UdpSocket,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
-        self.broadcast(
-            sock,
-            cluster_info,
-            stakes.as_deref(),
-            shreds,
-            slot_start_ts,
-            bank_forks,
-        )
+        let ((slot, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
+        self.broadcast(sock, cluster_info, slot, shreds, slot_start_ts, bank_forks)
     }
     fn record(
         &mut self,
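
A minimal stand-alone sketch of the pattern the patch adopts, for readers skimming the diff: the broadcast channel now carries only a Slot next to the shreds, and the transmit side resolves stakes from the root bank (leader-schedule epoch, then that epoch's staked nodes) at send time. Bank::get_leader_schedule_epoch and Bank::epoch_staked_nodes are the real methods the patch calls; everything else here (MockBank, the toy epoch arithmetic, staked_nodes_by_epoch, the simplified Slot/Pubkey/Shred aliases) is a hypothetical stand-in for illustration and is not part of the patch.

// Sketch only: simplified stand-ins, assuming the types described above.
use std::{
    collections::HashMap,
    sync::{mpsc::channel, Arc},
};

type Slot = u64;
type Pubkey = [u8; 32];
type Shred = Vec<u8>;
// New channel payload: the slot, not a stake map.
type TransmitShreds = (Slot, Arc<Vec<Shred>>);

struct MockBank {
    staked_nodes_by_epoch: HashMap<u64, Arc<HashMap<Pubkey, u64>>>,
}

impl MockBank {
    // Stand-in for Bank::get_leader_schedule_epoch (toy epoch arithmetic).
    fn get_leader_schedule_epoch(&self, slot: Slot) -> u64 {
        slot / 32 + 1
    }
    // Stand-in for Bank::epoch_staked_nodes.
    fn epoch_staked_nodes(&self, epoch: u64) -> Option<Arc<HashMap<Pubkey, u64>>> {
        self.staked_nodes_by_epoch.get(&epoch).cloned()
    }
}

fn main() {
    let (socket_sender, socket_receiver) = channel::<TransmitShreds>();

    // Producer side: only the slot travels with the shreds.
    let slot: Slot = 42;
    socket_sender
        .send((slot, Arc::new(vec![vec![0u8; 8]])))
        .unwrap();

    // Transmit side: look the stakes up from the root bank at send time.
    let root_bank = MockBank {
        staked_nodes_by_epoch: HashMap::from([(2, Arc::new(HashMap::from([([1u8; 32], 100u64)])))]),
    };
    let (slot, shreds) = socket_receiver.recv().unwrap();
    let epoch = root_bank.get_leader_schedule_epoch(slot);
    let stakes = root_bank.epoch_staked_nodes(epoch).unwrap_or_default();
    println!(
        "slot {}: {} shreds to {} staked nodes",
        slot,
        shreds.len(),
        stakes.len()
    );
}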