From 44b11154ca1551da4db9f14aa23c07c895752f8c Mon Sep 17 00:00:00 2001
From: behzad nouri
Date: Thu, 29 Jul 2021 11:13:25 -0400
Subject: [PATCH] sends slots (instead of stakes) through broadcast flow

The current broadcast code computes stakes for each slot before sending
them down the channel:
https://github.com/solana-labs/solana/blob/049fb0417/core/src/broadcast_stage/standard_broadcast_run.rs#L208-L228
https://github.com/solana-labs/solana/blob/0cf52e206/core/src/broadcast_stage.rs#L342-L349

Since the stakes are a function of the epoch the slot belongs to (and so
do not necessarily change from one slot to another), forwarding the slot
itself allows better caching downstream.

In addition, the cache needs to be invalidated if the epoch changes
(which the current code does not do), and that requires knowing which
slot (and so which epoch) the currently broadcasted shreds belong to:
https://github.com/solana-labs/solana/blob/19bd30262/core/src/broadcast_stage/standard_broadcast_run.rs#L332-L344
---
 core/src/broadcast_stage.rs                   | 23 ++---
 .../broadcast_duplicates_run.rs               | 94 ++++---------------
 .../broadcast_fake_shreds_run.rs              | 30 +++---
 .../fail_entry_verification_broadcast_run.rs  | 18 ++--
 .../broadcast_stage/standard_broadcast_run.rs | 70 ++++++--------
 5 files changed, 77 insertions(+), 158 deletions(-)

diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs
index fd15ae7451..9f2c2b3f6e 100644
--- a/core/src/broadcast_stage.rs
+++ b/core/src/broadcast_stage.rs
@@ -124,7 +124,7 @@ impl BroadcastStageType {
         }
     }
 }
 
-pub type TransmitShreds = (Option<Arc<HashMap<Pubkey, u64>>>, Arc<Vec<Shred>>);
+type TransmitShreds = (Slot, Arc<Vec<Shred>>);
 trait BroadcastRun {
     fn run(
         &mut self,
@@ -339,27 +339,25 @@ impl BroadcastStage {
             }
 
             for (_, bank) in retransmit_slots.iter() {
-                let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-                let stakes = bank.epoch_staked_nodes(bank_epoch);
-                let stakes = stakes.map(Arc::new);
+                let slot = bank.slot();
                 let data_shreds = Arc::new(
                     blockstore
-                        .get_data_shreds_for_slot(bank.slot(), 0)
+                        .get_data_shreds_for_slot(slot, 0)
                         .expect("My own shreds must be reconstructable"),
                 );
 
                 if !data_shreds.is_empty() {
-                    socket_sender.send(((stakes.clone(), data_shreds), None))?;
+                    socket_sender.send(((slot, data_shreds), None))?;
                 }
 
                 let coding_shreds = Arc::new(
                     blockstore
-                        .get_coding_shreds_for_slot(bank.slot(), 0)
+                        .get_coding_shreds_for_slot(slot, 0)
                         .expect("My own shreds must be reconstructable"),
                 );
 
                 if !coding_shreds.is_empty() {
-                    socket_sender.send(((stakes.clone(), coding_shreds), None))?;
+                    socket_sender.send(((slot, coding_shreds), None))?;
                 }
             }
 
@@ -464,10 +462,9 @@ pub mod test {
     };
 
     #[allow(clippy::implicit_hasher)]
-    pub fn make_transmit_shreds(
+    fn make_transmit_shreds(
         slot: Slot,
         num: u64,
-        stakes: Option<Arc<HashMap<Pubkey, u64>>>,
     ) -> (
         Vec<Shred>,
         Vec<Shred>,
@@ -489,11 +486,11 @@ pub mod test {
             coding_shreds.clone(),
             data_shreds
                 .into_iter()
-                .map(|s| (stakes.clone(), Arc::new(vec![s])))
+                .map(|s| (slot, Arc::new(vec![s])))
                 .collect(),
             coding_shreds
                 .into_iter()
-                .map(|s| (stakes.clone(), Arc::new(vec![s])))
+                .map(|s| (slot, Arc::new(vec![s])))
                 .collect(),
         )
     }
@@ -537,7 +534,7 @@ pub mod test {
         // Make some shreds
         let updated_slot = 0;
         let (all_data_shreds, all_coding_shreds, _, _all_coding_transmit_shreds) =
-            make_transmit_shreds(updated_slot, 10, None);
+            make_transmit_shreds(updated_slot, 10);
         let num_data_shreds = all_data_shreds.len();
         let num_coding_shreds = all_coding_shreds.len();
         assert!(num_data_shreds >= 10);
diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs
index 638563d62e..8e7ba4a479 100644
--- a/core/src/broadcast_stage/broadcast_duplicates_run.rs
+++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs
@@ -49,70 +49,6 @@ impl BroadcastDuplicatesRun {
             num_slots_broadcasted: 0,
         }
     }
-
-    fn get_non_partitioned_batches(
-        &self,
-        my_pubkey: &Pubkey,
-        bank: &Bank,
-        data_shreds: Arc<Vec<Shred>>,
-    ) -> TransmitShreds {
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let mut stakes: HashMap<Pubkey, u64> = bank.epoch_staked_nodes(bank_epoch).unwrap();
-        stakes.retain(|pubkey, _stake| pubkey != my_pubkey);
-        (Some(Arc::new(stakes)), data_shreds)
-    }
-
-    fn get_partitioned_batches(
-        &self,
-        my_pubkey: &Pubkey,
-        bank: &Bank,
-        original_shreds: Arc<Vec<Shred>>,
-        partition_shreds: Arc<Vec<Shred>>,
-    ) -> (TransmitShreds, TransmitShreds) {
-        // On the last shred, partition network with duplicate and real shreds
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let mut original_recipients = HashMap::new();
-        let mut partition_recipients = HashMap::new();
-
-        let mut stakes: Vec<(Pubkey, u64)> = bank
-            .epoch_staked_nodes(bank_epoch)
-            .unwrap()
-            .into_iter()
-            .filter(|(pubkey, _)| pubkey != my_pubkey)
-            .collect();
-        stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| {
-            if r_stake == l_stake {
-                l_key.cmp(r_key)
-            } else {
-                l_stake.cmp(r_stake)
-            }
-        });
-
-        let mut cumulative_stake: u64 = 0;
-        for (pubkey, stake) in stakes.into_iter() {
-            cumulative_stake += stake;
-            if cumulative_stake <= self.config.stake_partition {
-                partition_recipients.insert(pubkey, stake);
-            } else {
-                original_recipients.insert(pubkey, stake);
-            }
-        }
-
-        warn!(
-            "{} sent duplicate slot {} to nodes: {:?}",
-            my_pubkey,
-            bank.slot(),
-            &partition_recipients,
-        );
-
-        let original_recipients = Arc::new(original_recipients);
-        let original_transmit_shreds = (Some(original_recipients), original_shreds);
-
-        let partition_recipients = Arc::new(partition_recipients);
-        let partition_transmit_shreds = (Some(partition_recipients), partition_shreds);
-
-        (original_transmit_shreds, partition_transmit_shreds)
-    }
 }
 
 impl BroadcastRun for BroadcastDuplicatesRun {
@@ -243,8 +179,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         blockstore_sender.send((data_shreds.clone(), None))?;
 
         // 3) Start broadcast step
-        let transmit_shreds =
-            self.get_non_partitioned_batches(&keypair.pubkey(), &bank, data_shreds.clone());
+        let transmit_shreds = (bank.slot(), data_shreds.clone());
         info!(
             "{} Sending good shreds for slot {} to network",
             keypair.pubkey(),
@@ -260,13 +195,15 @@ impl BroadcastRun for BroadcastDuplicatesRun {
 
         // Store the original shreds that this node replayed
         blockstore_sender.send((original_last_data_shred.clone(), None))?;
-        let (original_transmit_shreds, partition_transmit_shreds) = self
-            .get_partitioned_batches(
-                &keypair.pubkey(),
-                &bank,
-                original_last_data_shred,
-                partition_last_data_shred,
-            );
+        // TODO: Previously, on the last shred, the code here was using
+        // stakes to partition the network with duplicate and real shreds
+        // at self.config.stake_partition of cumulative stake. This is no
+        // longer possible here as stakes are computed elsewhere further
+        // down the stream. Figure out how to replicate old behaviour to
+        // preserve test coverage.
+        // https://github.com/solana-labs/solana/blob/cde146155/core/src/broadcast_stage/broadcast_duplicates_run.rs#L65-L116
+        let original_transmit_shreds = (bank.slot(), original_last_data_shred);
+        let partition_transmit_shreds = (bank.slot(), partition_last_data_shred);
 
         socket_sender.send((original_transmit_shreds, None))?;
         socket_sender.send((partition_transmit_shreds, None))?;
@@ -281,12 +218,13 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         sock: &UdpSocket,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
+        let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
+        let root_bank = bank_forks.read().unwrap().root_bank();
+        let epoch = root_bank.get_leader_schedule_epoch(slot);
+        let stakes = root_bank.epoch_staked_nodes(epoch);
         // Broadcast data
-        let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
-            cluster_info,
-            stakes.as_deref().unwrap_or(&HashMap::default()),
-        );
+        let cluster_nodes =
+            ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default());
         broadcast_shreds(
             sock,
             &shreds,
diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs
index f0a16b0209..621c570ce3 100644
--- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs
+++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs
@@ -84,19 +84,20 @@ impl BroadcastRun for BroadcastFakeShredsRun {
         let data_shreds = Arc::new(data_shreds);
         blockstore_sender.send((data_shreds.clone(), None))?;
 
+        let slot = bank.slot();
+        let batch_info = BroadcastShredBatchInfo {
+            slot,
+            num_expected_batches: None,
+            slot_start_ts: Instant::now(),
+        };
         // 3) Start broadcast step
         //some indicates fake shreds
-        socket_sender.send((
-            (Some(Arc::new(HashMap::new())), Arc::new(fake_data_shreds)),
-            None,
-        ))?;
-        socket_sender.send((
-            (Some(Arc::new(HashMap::new())), Arc::new(fake_coding_shreds)),
-            None,
-        ))?;
+        let batch_info = Some(batch_info);
+        socket_sender.send(((slot, Arc::new(fake_data_shreds)), batch_info.clone()))?;
+        socket_sender.send(((slot, Arc::new(fake_coding_shreds)), batch_info))?;
         //none indicates real shreds
-        socket_sender.send(((None, data_shreds), None))?;
-        socket_sender.send(((None, Arc::new(coding_shreds)), None))?;
+        socket_sender.send(((slot, data_shreds), None))?;
+        socket_sender.send(((slot, Arc::new(coding_shreds)), None))?;
 
         Ok(())
     }
@@ -107,18 +108,15 @@ impl BroadcastRun for BroadcastFakeShredsRun {
         sock: &UdpSocket,
         _bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        for ((stakes, data_shreds), _) in receiver.lock().unwrap().iter() {
+        for ((_slot, data_shreds), batch_info) in receiver.lock().unwrap().iter() {
+            let fake = batch_info.is_some();
             let peers = cluster_info.tvu_peers();
             peers.iter().enumerate().for_each(|(i, peer)| {
-                if i <= self.partition && stakes.is_some() {
+                if fake == (i <= self.partition) {
                     // Send fake shreds to the first N peers
                     data_shreds.iter().for_each(|b| {
                         sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
                     });
-                } else if i > self.partition && stakes.is_none() {
-                    data_shreds.iter().for_each(|b| {
-                        sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
-                    });
                 }
             });
         }
diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
index 80daf8befb..39d92af654 100644
--- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
+++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs
@@ -100,10 +100,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         let data_shreds = Arc::new(data_shreds);
         blockstore_sender.send((data_shreds.clone(), None))?;
         // 4) Start broadcast step
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let stakes = bank.epoch_staked_nodes(bank_epoch);
-        let stakes = stakes.map(Arc::new);
-        socket_sender.send(((stakes.clone(), data_shreds), None))?;
+        socket_sender.send(((bank.slot(), data_shreds), None))?;
         if let Some((good_last_data_shred, bad_last_data_shred)) = last_shreds {
             // Stash away the good shred so we can rewrite them later
             self.good_shreds.extend(good_last_data_shred.clone());
@@ -122,7 +119,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             // Store the bad shred so we serve bad repairs to validators catching up
             blockstore_sender.send((bad_last_data_shred.clone(), None))?;
             // Send bad shreds to rest of network
-            socket_sender.send(((stakes, bad_last_data_shred), None))?;
+            socket_sender.send(((bank.slot(), bad_last_data_shred), None))?;
         }
         Ok(())
     }
@@ -133,12 +130,13 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         sock: &UdpSocket,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
+        let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
+        let root_bank = bank_forks.read().unwrap().root_bank();
+        let epoch = root_bank.get_leader_schedule_epoch(slot);
+        let stakes = root_bank.epoch_staked_nodes(epoch);
         // Broadcast data
-        let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
-            cluster_info,
-            stakes.as_deref().unwrap_or(&HashMap::default()),
-        );
+        let cluster_nodes =
+            ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default());
         broadcast_shreds(
             sock,
             &shreds,
diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs
index 691211a35e..db02a0b3e4 100644
--- a/core/src/broadcast_stage/standard_broadcast_run.rs
+++ b/core/src/broadcast_stage/standard_broadcast_run.rs
@@ -1,20 +1,22 @@
 #![allow(clippy::rc_buffer)]
 
-use super::{
-    broadcast_utils::{self, ReceiveResults},
-    *,
+use {
+    super::{
+        broadcast_utils::{self, ReceiveResults},
+        *,
+    },
+    crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes},
+    solana_entry::entry::Entry,
+    solana_ledger::shred::{
+        ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
+        SHRED_TICK_REFERENCE_MASK,
+    },
+    solana_sdk::{
+        signature::Keypair,
+        timing::{duration_as_us, AtomicInterval},
+    },
+    std::{sync::RwLock, time::Duration},
 };
-use crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes};
-use solana_entry::entry::Entry;
-use solana_ledger::shred::{
-    ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_TICK_REFERENCE_MASK,
-};
-use solana_sdk::{
-    pubkey::Pubkey,
-    signature::Keypair,
-    timing::{duration_as_us, AtomicInterval},
-};
-use std::{collections::HashMap, sync::RwLock, time::Duration};
 
 #[derive(Clone)]
 pub struct StandardBroadcastRun {
@@ -224,13 +226,11 @@ impl StandardBroadcastRun {
         to_shreds_time.stop();
 
         let mut get_leader_schedule_time = Measure::start("broadcast_get_leader_schedule");
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let stakes = bank.epoch_staked_nodes(bank_epoch).map(Arc::new);
-
         // Broadcast the last shred of the interrupted slot if necessary
         if !prev_slot_shreds.is_empty() {
+            let slot = prev_slot_shreds[0].slot();
             let batch_info = Some(BroadcastShredBatchInfo {
-                slot: prev_slot_shreds[0].slot(),
+                slot,
                 num_expected_batches: Some(old_num_batches + 1),
                 slot_start_ts: old_broadcast_start.expect(
                     "Old broadcast start time for previous slot must exist if the previous slot
@@ -238,7 +238,7 @@ impl StandardBroadcastRun {
                 ),
             });
             let shreds = Arc::new(prev_slot_shreds);
-            socket_sender.send(((stakes.clone(), shreds.clone()), batch_info.clone()))?;
+            socket_sender.send(((slot, shreds.clone()), batch_info.clone()))?;
             blockstore_sender.send((shreds, batch_info))?;
         }
 
@@ -264,7 +264,7 @@ impl StandardBroadcastRun {
 
         // Send data shreds
         let data_shreds = Arc::new(data_shreds);
-        socket_sender.send(((stakes.clone(), data_shreds.clone()), batch_info.clone()))?;
+        socket_sender.send(((bank.slot(), data_shreds.clone()), batch_info.clone()))?;
         blockstore_sender.send((data_shreds, batch_info.clone()))?;
 
         // Create and send coding shreds
@@ -275,7 +275,7 @@ impl StandardBroadcastRun {
             &mut process_stats,
         );
         let coding_shreds = Arc::new(coding_shreds);
-        socket_sender.send(((stakes, coding_shreds.clone()), batch_info.clone()))?;
+        socket_sender.send(((bank.slot(), coding_shreds.clone()), batch_info.clone()))?;
         blockstore_sender.send((coding_shreds, batch_info))?;
 
         coding_send_time.stop();
@@ -333,24 +333,19 @@ impl StandardBroadcastRun {
         &mut self,
         sock: &UdpSocket,
         cluster_info: &ClusterInfo,
-        stakes: Option<&HashMap<Pubkey, u64>>,
+        slot: Slot,
         shreds: Arc<Vec<Shred>>,
         broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        const BROADCAST_PEER_UPDATE_INTERVAL_MS: u64 = 1000;
         trace!("Broadcasting {:?} shreds", shreds.len());
         // Get the list of peers to broadcast to
         let mut get_peers_time = Measure::start("broadcast::get_peers");
-        if self
-            .last_peer_update
-            .should_update_ext(BROADCAST_PEER_UPDATE_INTERVAL_MS, false)
-        {
-            *self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new(
-                cluster_info,
-                stakes.unwrap_or(&HashMap::default()),
-            );
-        }
+        let root_bank = bank_forks.read().unwrap().root_bank();
+        let epoch = root_bank.get_leader_schedule_epoch(slot);
+        let stakes = root_bank.epoch_staked_nodes(epoch);
+        *self.cluster_nodes.write().unwrap() =
+            ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default());
         get_peers_time.stop();
 
         let cluster_nodes = self.cluster_nodes.read().unwrap();
@@ -475,15 +470,8 @@ impl BroadcastRun for StandardBroadcastRun {
         sock: &UdpSocket,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
-        self.broadcast(
-            sock,
-            cluster_info,
-            stakes.as_deref(),
-            shreds,
-            slot_start_ts,
-            bank_forks,
-        )
+        let ((slot, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
+        self.broadcast(sock, cluster_info, slot, shreds, slot_start_ts, bank_forks)
     }
     fn record(
         &mut self,
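
Note: the downstream caching that this change enables is not implemented in
this patch; the transmit paths above still recompute the epoch staked nodes
from the root bank on every receive. A minimal sketch of what such a cache
could look like follows. EpochStakesCache, new() and get_or_update() are
hypothetical names, but the Bank calls are the same ones the transmit paths
above already use; the cache is invalidated whenever the broadcasted slot
maps to a new leader-schedule epoch.

use {
    solana_runtime::bank::Bank,
    solana_sdk::{
        clock::{Epoch, Slot},
        pubkey::Pubkey,
    },
    std::{collections::HashMap, sync::Arc},
};

// Hypothetical sketch (not part of this patch): stakes keyed by the epoch
// of the slot being broadcast, recomputed only when that epoch changes.
struct EpochStakesCache {
    epoch: Epoch,
    stakes: Arc<HashMap<Pubkey, u64>>,
}

impl EpochStakesCache {
    fn new() -> Self {
        Self {
            epoch: Epoch::MAX, // Sentinel so the first lookup populates the cache.
            stakes: Arc::default(),
        }
    }

    // Returns the cached stakes for the epoch of `slot`, recomputing them
    // from the root bank only when that epoch differs from the cached one.
    fn get_or_update(&mut self, root_bank: &Bank, slot: Slot) -> Arc<HashMap<Pubkey, u64>> {
        let epoch = root_bank.get_leader_schedule_epoch(slot);
        if epoch != self.epoch {
            // Epoch boundary crossed: invalidate and recompute the stakes.
            self.epoch = epoch;
            self.stakes = Arc::new(root_bank.epoch_staked_nodes(epoch).unwrap_or_default());
        }
        self.stakes.clone()
    }
}

A transmit() implementation could then replace the per-call
root_bank.epoch_staked_nodes(epoch) lookup with
cache.get_or_update(&root_bank, slot) and pass the result to
ClusterNodes::<BroadcastStage>::new as before.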