sends slots (instead of stakes) through broadcast flow

The current broadcast code computes the stakes for each slot before sending
the shreds down the channel:
https://github.com/solana-labs/solana/blob/049fb0417/core/src/broadcast_stage/standard_broadcast_run.rs#L208-L228
https://github.com/solana-labs/solana/blob/0cf52e206/core/src/broadcast_stage.rs#L342-L349

Since the stakes are a function of the epoch the slot belongs to (and so do
not necessarily change from one slot to the next), forwarding the slot itself
allows better caching downstream.

In addition, the cache needs to be invalidated whenever the epoch changes
(which the current code does not do), and that requires knowing which slot
(and hence which epoch) the shreds currently being broadcast belong to:
https://github.com/solana-labs/solana/blob/19bd30262/core/src/broadcast_stage/standard_broadcast_run.rs#L332-L344
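
As a rough illustration of the caching this enables (a hypothetical sketch, not code from this commit; CachedStakes, slot_to_epoch, and stakes_for_epoch are made-up stand-ins for the Bank methods), the receiving end can key its cached stakes by the epoch derived from the slot taken off the channel, and recompute only on an epoch boundary:

// Hypothetical sketch only; `Slot`, `Epoch`, and `Pubkey` stand in for the
// solana-sdk types, and the two closures for Bank::get_leader_schedule_epoch
// and Bank::epoch_staked_nodes.
use std::{collections::HashMap, sync::Arc};

type Slot = u64;
type Epoch = u64;
type Pubkey = [u8; 32];

#[derive(Default)]
struct CachedStakes {
    epoch: Option<Epoch>,
    stakes: Arc<HashMap<Pubkey, u64>>,
}

impl CachedStakes {
    // Recompute only when the slot crosses into a new epoch; consecutive
    // slots within the same epoch hit the cache.
    fn get(
        &mut self,
        slot: Slot,
        slot_to_epoch: impl Fn(Slot) -> Epoch,
        stakes_for_epoch: impl Fn(Epoch) -> Arc<HashMap<Pubkey, u64>>,
    ) -> Arc<HashMap<Pubkey, u64>> {
        let epoch = slot_to_epoch(slot);
        if self.epoch != Some(epoch) {
            self.epoch = Some(epoch);
            self.stakes = stakes_for_epoch(epoch);
        }
        Arc::clone(&self.stakes)
    }
}

This commit only takes the first step: it sends the slot through the channel and recomputes the stakes from the root bank where they are needed; an epoch-keyed cache along the lines above can then be layered on top.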
behzad nouri 2021-07-29 11:13:25 -04:00
parent b67ffab370
commit 44b11154ca
5 changed files with 77 additions and 158 deletions

core/src/broadcast_stage.rs

@@ -124,7 +124,7 @@ impl BroadcastStageType {
     }
 }
 
-pub type TransmitShreds = (Option<Arc<HashMap<Pubkey, u64>>>, Arc<Vec<Shred>>);
+type TransmitShreds = (Slot, Arc<Vec<Shred>>);
 trait BroadcastRun {
     fn run(
         &mut self,
@@ -339,27 +339,25 @@ impl BroadcastStage {
         }
 
         for (_, bank) in retransmit_slots.iter() {
-            let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-            let stakes = bank.epoch_staked_nodes(bank_epoch);
-            let stakes = stakes.map(Arc::new);
+            let slot = bank.slot();
 
             let data_shreds = Arc::new(
                 blockstore
-                    .get_data_shreds_for_slot(bank.slot(), 0)
+                    .get_data_shreds_for_slot(slot, 0)
                     .expect("My own shreds must be reconstructable"),
             );
 
             if !data_shreds.is_empty() {
-                socket_sender.send(((stakes.clone(), data_shreds), None))?;
+                socket_sender.send(((slot, data_shreds), None))?;
             }
 
             let coding_shreds = Arc::new(
                 blockstore
-                    .get_coding_shreds_for_slot(bank.slot(), 0)
+                    .get_coding_shreds_for_slot(slot, 0)
                     .expect("My own shreds must be reconstructable"),
             );
 
             if !coding_shreds.is_empty() {
-                socket_sender.send(((stakes.clone(), coding_shreds), None))?;
+                socket_sender.send(((slot, coding_shreds), None))?;
             }
         }
@@ -464,10 +462,9 @@ pub mod test {
     };
 
     #[allow(clippy::implicit_hasher)]
-    pub fn make_transmit_shreds(
+    fn make_transmit_shreds(
         slot: Slot,
         num: u64,
-        stakes: Option<Arc<HashMap<Pubkey, u64>>>,
     ) -> (
         Vec<Shred>,
         Vec<Shred>,
@@ -489,11 +486,11 @@ pub mod test {
             coding_shreds.clone(),
             data_shreds
                 .into_iter()
-                .map(|s| (stakes.clone(), Arc::new(vec![s])))
+                .map(|s| (slot, Arc::new(vec![s])))
                 .collect(),
             coding_shreds
                 .into_iter()
-                .map(|s| (stakes.clone(), Arc::new(vec![s])))
+                .map(|s| (slot, Arc::new(vec![s])))
                 .collect(),
         )
     }
@@ -537,7 +534,7 @@ pub mod test {
         // Make some shreds
         let updated_slot = 0;
         let (all_data_shreds, all_coding_shreds, _, _all_coding_transmit_shreds) =
-            make_transmit_shreds(updated_slot, 10, None);
+            make_transmit_shreds(updated_slot, 10);
         let num_data_shreds = all_data_shreds.len();
         let num_coding_shreds = all_coding_shreds.len();
         assert!(num_data_shreds >= 10);

core/src/broadcast_stage/broadcast_duplicates_run.rs

@@ -49,70 +49,6 @@ impl BroadcastDuplicatesRun {
             num_slots_broadcasted: 0,
         }
     }
-
-    fn get_non_partitioned_batches(
-        &self,
-        my_pubkey: &Pubkey,
-        bank: &Bank,
-        data_shreds: Arc<Vec<Shred>>,
-    ) -> TransmitShreds {
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let mut stakes: HashMap<Pubkey, u64> = bank.epoch_staked_nodes(bank_epoch).unwrap();
-        stakes.retain(|pubkey, _stake| pubkey != my_pubkey);
-        (Some(Arc::new(stakes)), data_shreds)
-    }
-
-    fn get_partitioned_batches(
-        &self,
-        my_pubkey: &Pubkey,
-        bank: &Bank,
-        original_shreds: Arc<Vec<Shred>>,
-        partition_shreds: Arc<Vec<Shred>>,
-    ) -> (TransmitShreds, TransmitShreds) {
-        // On the last shred, partition network with duplicate and real shreds
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let mut original_recipients = HashMap::new();
-        let mut partition_recipients = HashMap::new();
-
-        let mut stakes: Vec<(Pubkey, u64)> = bank
-            .epoch_staked_nodes(bank_epoch)
-            .unwrap()
-            .into_iter()
-            .filter(|(pubkey, _)| pubkey != my_pubkey)
-            .collect();
-        stakes.sort_by(|(l_key, l_stake), (r_key, r_stake)| {
-            if r_stake == l_stake {
-                l_key.cmp(r_key)
-            } else {
-                l_stake.cmp(r_stake)
-            }
-        });
-
-        let mut cumulative_stake: u64 = 0;
-        for (pubkey, stake) in stakes.into_iter() {
-            cumulative_stake += stake;
-            if cumulative_stake <= self.config.stake_partition {
-                partition_recipients.insert(pubkey, stake);
-            } else {
-                original_recipients.insert(pubkey, stake);
-            }
-        }
-
-        warn!(
-            "{} sent duplicate slot {} to nodes: {:?}",
-            my_pubkey,
-            bank.slot(),
-            &partition_recipients,
-        );
-
-        let original_recipients = Arc::new(original_recipients);
-        let original_transmit_shreds = (Some(original_recipients), original_shreds);
-
-        let partition_recipients = Arc::new(partition_recipients);
-        let partition_transmit_shreds = (Some(partition_recipients), partition_shreds);
-
-        (original_transmit_shreds, partition_transmit_shreds)
-    }
 }
 
 impl BroadcastRun for BroadcastDuplicatesRun {
@@ -243,8 +179,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         blockstore_sender.send((data_shreds.clone(), None))?;
 
         // 3) Start broadcast step
-        let transmit_shreds =
-            self.get_non_partitioned_batches(&keypair.pubkey(), &bank, data_shreds.clone());
+        let transmit_shreds = (bank.slot(), data_shreds.clone());
         info!(
             "{} Sending good shreds for slot {} to network",
             keypair.pubkey(),
@@ -260,13 +195,15 @@ impl BroadcastRun for BroadcastDuplicatesRun {
             // Store the original shreds that this node replayed
             blockstore_sender.send((original_last_data_shred.clone(), None))?;
 
-            let (original_transmit_shreds, partition_transmit_shreds) = self
-                .get_partitioned_batches(
-                    &keypair.pubkey(),
-                    &bank,
-                    original_last_data_shred,
-                    partition_last_data_shred,
-                );
+            // TODO: Previously, on the last shred, the code here was using
+            // stakes to partition the network with duplicate and real shreds
+            // at self.config.stake_partition of cumulative stake. This is no
+            // longer possible here as stakes are computed elsewhere further
+            // down the stream. Figure out how to replicate old behaviour to
+            // preserve test coverage.
+            // https://github.com/solana-labs/solana/blob/cde146155/core/src/broadcast_stage/broadcast_duplicates_run.rs#L65-L116
+            let original_transmit_shreds = (bank.slot(), original_last_data_shred);
+            let partition_transmit_shreds = (bank.slot(), partition_last_data_shred);
 
             socket_sender.send((original_transmit_shreds, None))?;
             socket_sender.send((partition_transmit_shreds, None))?;
@@ -281,12 +218,13 @@ impl BroadcastRun for BroadcastDuplicatesRun {
         sock: &UdpSocket,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
+        let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
+        let root_bank = bank_forks.read().unwrap().root_bank();
+        let epoch = root_bank.get_leader_schedule_epoch(slot);
+        let stakes = root_bank.epoch_staked_nodes(epoch);
         // Broadcast data
-        let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
-            cluster_info,
-            stakes.as_deref().unwrap_or(&HashMap::default()),
-        );
+        let cluster_nodes =
+            ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default());
         broadcast_shreds(
             sock,
             &shreds,

core/src/broadcast_stage/broadcast_fake_shreds_run.rs

@@ -84,19 +84,20 @@ impl BroadcastRun for BroadcastFakeShredsRun {
         let data_shreds = Arc::new(data_shreds);
         blockstore_sender.send((data_shreds.clone(), None))?;
 
+        let slot = bank.slot();
+        let batch_info = BroadcastShredBatchInfo {
+            slot,
+            num_expected_batches: None,
+            slot_start_ts: Instant::now(),
+        };
         // 3) Start broadcast step
         //some indicates fake shreds
-        socket_sender.send((
-            (Some(Arc::new(HashMap::new())), Arc::new(fake_data_shreds)),
-            None,
-        ))?;
-        socket_sender.send((
-            (Some(Arc::new(HashMap::new())), Arc::new(fake_coding_shreds)),
-            None,
-        ))?;
+        let batch_info = Some(batch_info);
+        socket_sender.send(((slot, Arc::new(fake_data_shreds)), batch_info.clone()))?;
+        socket_sender.send(((slot, Arc::new(fake_coding_shreds)), batch_info))?;
         //none indicates real shreds
-        socket_sender.send(((None, data_shreds), None))?;
-        socket_sender.send(((None, Arc::new(coding_shreds)), None))?;
+        socket_sender.send(((slot, data_shreds), None))?;
+        socket_sender.send(((slot, Arc::new(coding_shreds)), None))?;
 
         Ok(())
     }
@@ -107,18 +108,15 @@ impl BroadcastRun for BroadcastFakeShredsRun {
         sock: &UdpSocket,
         _bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        for ((stakes, data_shreds), _) in receiver.lock().unwrap().iter() {
+        for ((_slot, data_shreds), batch_info) in receiver.lock().unwrap().iter() {
+            let fake = batch_info.is_some();
             let peers = cluster_info.tvu_peers();
             peers.iter().enumerate().for_each(|(i, peer)| {
-                if i <= self.partition && stakes.is_some() {
+                if fake == (i <= self.partition) {
                     // Send fake shreds to the first N peers
                     data_shreds.iter().for_each(|b| {
                         sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
                     });
-                } else if i > self.partition && stakes.is_none() {
-                    data_shreds.iter().for_each(|b| {
-                        sock.send_to(&b.payload, &peer.tvu_forwards).unwrap();
-                    });
                 }
             });
         }

core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs

@@ -100,10 +100,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         let data_shreds = Arc::new(data_shreds);
         blockstore_sender.send((data_shreds.clone(), None))?;
         // 4) Start broadcast step
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let stakes = bank.epoch_staked_nodes(bank_epoch);
-        let stakes = stakes.map(Arc::new);
-        socket_sender.send(((stakes.clone(), data_shreds), None))?;
+        socket_sender.send(((bank.slot(), data_shreds), None))?;
         if let Some((good_last_data_shred, bad_last_data_shred)) = last_shreds {
             // Stash away the good shred so we can rewrite them later
             self.good_shreds.extend(good_last_data_shred.clone());
@@ -122,7 +119,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
             // Store the bad shred so we serve bad repairs to validators catching up
             blockstore_sender.send((bad_last_data_shred.clone(), None))?;
             // Send bad shreds to rest of network
-            socket_sender.send(((stakes, bad_last_data_shred), None))?;
+            socket_sender.send(((bank.slot(), bad_last_data_shred), None))?;
         }
         Ok(())
     }
@@ -133,12 +130,13 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         sock: &UdpSocket,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
+        let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
+        let root_bank = bank_forks.read().unwrap().root_bank();
+        let epoch = root_bank.get_leader_schedule_epoch(slot);
+        let stakes = root_bank.epoch_staked_nodes(epoch);
         // Broadcast data
-        let cluster_nodes = ClusterNodes::<BroadcastStage>::new(
-            cluster_info,
-            stakes.as_deref().unwrap_or(&HashMap::default()),
-        );
+        let cluster_nodes =
+            ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default());
         broadcast_shreds(
             sock,
             &shreds,

core/src/broadcast_stage/standard_broadcast_run.rs

@@ -1,20 +1,22 @@
 #![allow(clippy::rc_buffer)]
 
-use super::{
-    broadcast_utils::{self, ReceiveResults},
-    *,
-};
-use crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes};
-use solana_entry::entry::Entry;
-use solana_ledger::shred::{
-    ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK, SHRED_TICK_REFERENCE_MASK,
-};
-use solana_sdk::{
-    pubkey::Pubkey,
-    signature::Keypair,
-    timing::{duration_as_us, AtomicInterval},
-};
-use std::{collections::HashMap, sync::RwLock, time::Duration};
+use {
+    super::{
+        broadcast_utils::{self, ReceiveResults},
+        *,
+    },
+    crate::{broadcast_stage::broadcast_utils::UnfinishedSlotInfo, cluster_nodes::ClusterNodes},
+    solana_entry::entry::Entry,
+    solana_ledger::shred::{
+        ProcessShredsStats, Shred, Shredder, MAX_DATA_SHREDS_PER_FEC_BLOCK,
+        SHRED_TICK_REFERENCE_MASK,
+    },
+    solana_sdk::{
+        signature::Keypair,
+        timing::{duration_as_us, AtomicInterval},
+    },
+    std::{sync::RwLock, time::Duration},
+};
 
 #[derive(Clone)]
 pub struct StandardBroadcastRun {
@@ -224,13 +226,11 @@ impl StandardBroadcastRun {
         to_shreds_time.stop();
 
         let mut get_leader_schedule_time = Measure::start("broadcast_get_leader_schedule");
-        let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
-        let stakes = bank.epoch_staked_nodes(bank_epoch).map(Arc::new);
-
         // Broadcast the last shred of the interrupted slot if necessary
         if !prev_slot_shreds.is_empty() {
+            let slot = prev_slot_shreds[0].slot();
             let batch_info = Some(BroadcastShredBatchInfo {
-                slot: prev_slot_shreds[0].slot(),
+                slot,
                 num_expected_batches: Some(old_num_batches + 1),
                 slot_start_ts: old_broadcast_start.expect(
                     "Old broadcast start time for previous slot must exist if the previous slot
@@ -238,7 +238,7 @@ impl StandardBroadcastRun {
                 ),
             });
             let shreds = Arc::new(prev_slot_shreds);
-            socket_sender.send(((stakes.clone(), shreds.clone()), batch_info.clone()))?;
+            socket_sender.send(((slot, shreds.clone()), batch_info.clone()))?;
             blockstore_sender.send((shreds, batch_info))?;
         }
 
@@ -264,7 +264,7 @@ impl StandardBroadcastRun {
 
         // Send data shreds
         let data_shreds = Arc::new(data_shreds);
-        socket_sender.send(((stakes.clone(), data_shreds.clone()), batch_info.clone()))?;
+        socket_sender.send(((bank.slot(), data_shreds.clone()), batch_info.clone()))?;
         blockstore_sender.send((data_shreds, batch_info.clone()))?;
 
         // Create and send coding shreds
@@ -275,7 +275,7 @@ impl StandardBroadcastRun {
             &mut process_stats,
         );
         let coding_shreds = Arc::new(coding_shreds);
-        socket_sender.send(((stakes, coding_shreds.clone()), batch_info.clone()))?;
+        socket_sender.send(((bank.slot(), coding_shreds.clone()), batch_info.clone()))?;
         blockstore_sender.send((coding_shreds, batch_info))?;
         coding_send_time.stop();
 
@@ -333,24 +333,19 @@ impl StandardBroadcastRun {
         &mut self,
         sock: &UdpSocket,
         cluster_info: &ClusterInfo,
-        stakes: Option<&HashMap<Pubkey, u64>>,
+        slot: Slot,
         shreds: Arc<Vec<Shred>>,
         broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        const BROADCAST_PEER_UPDATE_INTERVAL_MS: u64 = 1000;
         trace!("Broadcasting {:?} shreds", shreds.len());
         // Get the list of peers to broadcast to
         let mut get_peers_time = Measure::start("broadcast::get_peers");
-        if self
-            .last_peer_update
-            .should_update_ext(BROADCAST_PEER_UPDATE_INTERVAL_MS, false)
-        {
-            *self.cluster_nodes.write().unwrap() = ClusterNodes::<BroadcastStage>::new(
-                cluster_info,
-                stakes.unwrap_or(&HashMap::default()),
-            );
-        }
+        let root_bank = bank_forks.read().unwrap().root_bank();
+        let epoch = root_bank.get_leader_schedule_epoch(slot);
+        let stakes = root_bank.epoch_staked_nodes(epoch);
+        *self.cluster_nodes.write().unwrap() =
+            ClusterNodes::<BroadcastStage>::new(cluster_info, &stakes.unwrap_or_default());
         get_peers_time.stop();
 
         let cluster_nodes = self.cluster_nodes.read().unwrap();
@@ -475,15 +470,8 @@ impl BroadcastRun for StandardBroadcastRun {
         sock: &UdpSocket,
         bank_forks: &Arc<RwLock<BankForks>>,
     ) -> Result<()> {
-        let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
-        self.broadcast(
-            sock,
-            cluster_info,
-            stakes.as_deref(),
-            shreds,
-            slot_start_ts,
-            bank_forks,
-        )
+        let ((slot, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
+        self.broadcast(sock, cluster_info, slot, shreds, slot_start_ts, bank_forks)
     }
     fn record(
         &mut self,