removes Slot from TransmitShreds (#19327)
An earlier version of the code was funneling through stakes along with shreds to broadcast: https://github.com/solana-labs/solana/blob/b67ffab37/core/src/broadcast_stage.rs#L127 This was changed to only slots as stakes computation was pushed further down the pipeline in: https://github.com/solana-labs/solana/pull/18971 However shreds themselves embody which slot they belong to. So pairing them with slot is redundant and adds rooms for bugs should they become inconsistent.
This commit is contained in:
parent
f8fefc9c5a
commit
1deb4add81
|
@ -2,32 +2,37 @@
|
|||
|
||||
extern crate test;
|
||||
|
||||
use rand::{thread_rng, Rng};
|
||||
use solana_core::{
|
||||
broadcast_stage::{broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage},
|
||||
cluster_nodes::ClusterNodes,
|
||||
use {
|
||||
rand::{thread_rng, Rng},
|
||||
solana_core::{
|
||||
broadcast_stage::{
|
||||
broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage,
|
||||
},
|
||||
cluster_nodes::ClusterNodesCache,
|
||||
},
|
||||
solana_gossip::{
|
||||
cluster_info::{ClusterInfo, Node},
|
||||
contact_info::ContactInfo,
|
||||
},
|
||||
solana_ledger::{
|
||||
genesis_utils::{create_genesis_config, GenesisConfigInfo},
|
||||
shred::Shred,
|
||||
},
|
||||
solana_runtime::{bank::Bank, bank_forks::BankForks},
|
||||
solana_sdk::{
|
||||
pubkey,
|
||||
signature::Keypair,
|
||||
timing::{timestamp, AtomicInterval},
|
||||
},
|
||||
solana_streamer::socket::SocketAddrSpace,
|
||||
std::{
|
||||
collections::HashMap,
|
||||
net::UdpSocket,
|
||||
sync::{Arc, RwLock},
|
||||
time::Duration,
|
||||
},
|
||||
test::Bencher,
|
||||
};
|
||||
use solana_gossip::{
|
||||
cluster_info::{ClusterInfo, Node},
|
||||
contact_info::ContactInfo,
|
||||
};
|
||||
use solana_ledger::{
|
||||
genesis_utils::{create_genesis_config, GenesisConfigInfo},
|
||||
shred::Shred,
|
||||
};
|
||||
use solana_runtime::{bank::Bank, bank_forks::BankForks};
|
||||
use solana_sdk::{
|
||||
pubkey,
|
||||
signature::Keypair,
|
||||
timing::{timestamp, AtomicInterval},
|
||||
};
|
||||
use solana_streamer::socket::SocketAddrSpace;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::UdpSocket,
|
||||
sync::{Arc, RwLock},
|
||||
};
|
||||
use test::Bencher;
|
||||
|
||||
#[bench]
|
||||
fn broadcast_shreds_bench(bencher: &mut Bencher) {
|
||||
|
@ -56,7 +61,10 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
|
|||
stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64);
|
||||
}
|
||||
let cluster_info = Arc::new(cluster_info);
|
||||
let cluster_nodes = ClusterNodes::<BroadcastStage>::new(&cluster_info, &stakes);
|
||||
let cluster_nodes_cache = ClusterNodesCache::<BroadcastStage>::new(
|
||||
8, // cap
|
||||
Duration::from_secs(5), // ttl
|
||||
);
|
||||
let shreds = Arc::new(shreds);
|
||||
let last_datapoint = Arc::new(AtomicInterval::default());
|
||||
bencher.iter(move || {
|
||||
|
@ -64,10 +72,10 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
|
|||
broadcast_shreds(
|
||||
&socket,
|
||||
&shreds,
|
||||
&cluster_nodes,
|
||||
&cluster_nodes_cache,
|
||||
&last_datapoint,
|
||||
&mut TransmitShredsStats::default(),
|
||||
cluster_info.id(),
|
||||
&cluster_info,
|
||||
&bank_forks,
|
||||
&SocketAddrSpace::Unspecified,
|
||||
)
|
||||
|
|
|
@ -1,40 +1,47 @@
|
|||
//! A stage to broadcast data from a leader node to validators
|
||||
#![allow(clippy::rc_buffer)]
|
||||
use self::{
|
||||
broadcast_duplicates_run::{BroadcastDuplicatesConfig, BroadcastDuplicatesRun},
|
||||
broadcast_fake_shreds_run::BroadcastFakeShredsRun,
|
||||
broadcast_metrics::*,
|
||||
fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
|
||||
standard_broadcast_run::StandardBroadcastRun,
|
||||
};
|
||||
use crate::{
|
||||
cluster_nodes::ClusterNodes,
|
||||
result::{Error, Result},
|
||||
};
|
||||
use crossbeam_channel::{
|
||||
Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError,
|
||||
Sender as CrossbeamSender,
|
||||
};
|
||||
use solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError};
|
||||
use solana_ledger::{blockstore::Blockstore, shred::Shred};
|
||||
use solana_measure::measure::Measure;
|
||||
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
|
||||
use solana_poh::poh_recorder::WorkingBankEntry;
|
||||
use solana_runtime::{bank::Bank, bank_forks::BankForks};
|
||||
use solana_sdk::timing::{timestamp, AtomicInterval};
|
||||
use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair};
|
||||
use solana_streamer::{
|
||||
sendmmsg::{batch_send, SendPktsError},
|
||||
socket::SocketAddrSpace,
|
||||
};
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
net::UdpSocket,
|
||||
sync::atomic::{AtomicBool, Ordering},
|
||||
sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender},
|
||||
sync::{Arc, Mutex, RwLock},
|
||||
thread::{self, Builder, JoinHandle},
|
||||
time::{Duration, Instant},
|
||||
use {
|
||||
self::{
|
||||
broadcast_duplicates_run::{BroadcastDuplicatesConfig, BroadcastDuplicatesRun},
|
||||
broadcast_fake_shreds_run::BroadcastFakeShredsRun,
|
||||
broadcast_metrics::*,
|
||||
fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
|
||||
standard_broadcast_run::StandardBroadcastRun,
|
||||
},
|
||||
crate::{
|
||||
cluster_nodes::{ClusterNodes, ClusterNodesCache},
|
||||
result::{Error, Result},
|
||||
},
|
||||
crossbeam_channel::{
|
||||
Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError,
|
||||
Sender as CrossbeamSender,
|
||||
},
|
||||
itertools::Itertools,
|
||||
solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError},
|
||||
solana_ledger::{blockstore::Blockstore, shred::Shred},
|
||||
solana_measure::measure::Measure,
|
||||
solana_metrics::{inc_new_counter_error, inc_new_counter_info},
|
||||
solana_poh::poh_recorder::WorkingBankEntry,
|
||||
solana_runtime::{bank::Bank, bank_forks::BankForks},
|
||||
solana_sdk::{
|
||||
timing::{timestamp, AtomicInterval},
|
||||
{clock::Slot, pubkey::Pubkey, signature::Keypair},
|
||||
},
|
||||
solana_streamer::{
|
||||
sendmmsg::{batch_send, SendPktsError},
|
||||
socket::SocketAddrSpace,
|
||||
},
|
||||
std::{
|
||||
collections::HashMap,
|
||||
net::UdpSocket,
|
||||
sync::{
|
||||
atomic::{AtomicBool, Ordering},
|
||||
mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender},
|
||||
Arc, Mutex, RwLock,
|
||||
},
|
||||
thread::{self, Builder, JoinHandle},
|
||||
time::{Duration, Instant},
|
||||
},
|
||||
};
|
||||
|
||||
pub mod broadcast_duplicates_run;
|
||||
|
@ -51,7 +58,7 @@ pub(crate) const NUM_INSERT_THREADS: usize = 2;
|
|||
pub(crate) type RetransmitSlotsSender = CrossbeamSender<HashMap<Slot, Arc<Bank>>>;
|
||||
pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver<HashMap<Slot, Arc<Bank>>>;
|
||||
pub(crate) type RecordReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;
|
||||
pub(crate) type TransmitReceiver = Receiver<(TransmitShreds, Option<BroadcastShredBatchInfo>)>;
|
||||
pub(crate) type TransmitReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;
|
||||
|
||||
#[derive(Debug, PartialEq, Eq, Clone)]
|
||||
pub enum BroadcastStageReturnType {
|
||||
|
@ -127,14 +134,13 @@ impl BroadcastStageType {
|
|||
}
|
||||
}
|
||||
|
||||
type TransmitShreds = (Slot, Arc<Vec<Shred>>);
|
||||
trait BroadcastRun {
|
||||
fn run(
|
||||
&mut self,
|
||||
keypair: &Keypair,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
receiver: &Receiver<WorkingBankEntry>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
) -> Result<()>;
|
||||
fn transmit(
|
||||
|
@ -179,7 +185,7 @@ impl BroadcastStage {
|
|||
cluster_info: Arc<ClusterInfo>,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
receiver: &Receiver<WorkingBankEntry>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
mut broadcast_stage_run: impl BroadcastRun,
|
||||
) -> BroadcastStageReturnType {
|
||||
|
@ -331,7 +337,7 @@ impl BroadcastStage {
|
|||
fn check_retransmit_signals(
|
||||
blockstore: &Blockstore,
|
||||
retransmit_slots_receiver: &RetransmitSlotsReceiver,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
) -> Result<()> {
|
||||
let timer = Duration::from_millis(100);
|
||||
|
||||
|
@ -348,9 +354,9 @@ impl BroadcastStage {
|
|||
.get_data_shreds_for_slot(slot, 0)
|
||||
.expect("My own shreds must be reconstructable"),
|
||||
);
|
||||
|
||||
debug_assert!(data_shreds.iter().all(|shred| shred.slot() == slot));
|
||||
if !data_shreds.is_empty() {
|
||||
socket_sender.send(((slot, data_shreds), None))?;
|
||||
socket_sender.send((data_shreds, None))?;
|
||||
}
|
||||
|
||||
let coding_shreds = Arc::new(
|
||||
|
@ -359,8 +365,9 @@ impl BroadcastStage {
|
|||
.expect("My own shreds must be reconstructable"),
|
||||
);
|
||||
|
||||
debug_assert!(coding_shreds.iter().all(|shred| shred.slot() == slot));
|
||||
if !coding_shreds.is_empty() {
|
||||
socket_sender.send(((slot, coding_shreds), None))?;
|
||||
socket_sender.send((coding_shreds, None))?;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -376,11 +383,13 @@ impl BroadcastStage {
|
|||
}
|
||||
|
||||
fn update_peer_stats(
|
||||
num_live_peers: i64,
|
||||
broadcast_len: i64,
|
||||
cluster_nodes: &ClusterNodes<BroadcastStage>,
|
||||
last_datapoint_submit: &Arc<AtomicInterval>,
|
||||
) {
|
||||
if last_datapoint_submit.should_update(1000) {
|
||||
let now = timestamp();
|
||||
let num_live_peers = cluster_nodes.num_peers_live(now);
|
||||
let broadcast_len = cluster_nodes.num_peers() + 1;
|
||||
datapoint_info!(
|
||||
"cluster_info-num_nodes",
|
||||
("live_count", num_live_peers, i64),
|
||||
|
@ -394,31 +403,37 @@ fn update_peer_stats(
|
|||
pub fn broadcast_shreds(
|
||||
s: &UdpSocket,
|
||||
shreds: &[Shred],
|
||||
cluster_nodes: &ClusterNodes<BroadcastStage>,
|
||||
cluster_nodes_cache: &ClusterNodesCache<BroadcastStage>,
|
||||
last_datapoint_submit: &Arc<AtomicInterval>,
|
||||
transmit_stats: &mut TransmitShredsStats,
|
||||
self_pubkey: Pubkey,
|
||||
cluster_info: &ClusterInfo,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
socket_addr_space: &SocketAddrSpace,
|
||||
) -> Result<()> {
|
||||
let mut result = Ok(());
|
||||
let broadcast_len = cluster_nodes.num_peers();
|
||||
if broadcast_len == 0 {
|
||||
update_peer_stats(1, 1, last_datapoint_submit);
|
||||
return result;
|
||||
}
|
||||
let mut shred_select = Measure::start("shred_select");
|
||||
let root_bank = bank_forks.read().unwrap().root_bank();
|
||||
// Only the leader broadcasts shreds.
|
||||
let leader = Some(cluster_info.id());
|
||||
let (root_bank, working_bank) = {
|
||||
let bank_forks = bank_forks.read().unwrap();
|
||||
(bank_forks.root_bank(), bank_forks.working_bank())
|
||||
};
|
||||
let packets: Vec<_> = shreds
|
||||
.iter()
|
||||
.filter_map(|shred| {
|
||||
let seed = shred.seed(Some(self_pubkey), &root_bank);
|
||||
let node = cluster_nodes.get_broadcast_peer(seed)?;
|
||||
if socket_addr_space.check(&node.tvu) {
|
||||
Some((&shred.payload, node.tvu))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
.group_by(|shred| shred.slot())
|
||||
.into_iter()
|
||||
.flat_map(|(slot, shreds)| {
|
||||
let cluster_nodes =
|
||||
cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info);
|
||||
update_peer_stats(&cluster_nodes, last_datapoint_submit);
|
||||
let root_bank = root_bank.clone();
|
||||
shreds.filter_map(move |shred| {
|
||||
let seed = shred.seed(leader, &root_bank);
|
||||
let node = cluster_nodes.get_broadcast_peer(seed)?;
|
||||
socket_addr_space
|
||||
.check(&node.tvu)
|
||||
.then(|| (&shred.payload, node.tvu))
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
shred_select.stop();
|
||||
|
@ -432,13 +447,6 @@ pub fn broadcast_shreds(
|
|||
send_mmsg_time.stop();
|
||||
transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();
|
||||
transmit_stats.total_packets += packets.len();
|
||||
|
||||
let num_live_peers = cluster_nodes.num_peers_live(timestamp()) as i64;
|
||||
update_peer_stats(
|
||||
num_live_peers,
|
||||
broadcast_len as i64 + 1,
|
||||
last_datapoint_submit,
|
||||
);
|
||||
result
|
||||
}
|
||||
|
||||
|
@ -465,14 +473,15 @@ pub mod test {
|
|||
};
|
||||
|
||||
#[allow(clippy::implicit_hasher)]
|
||||
#[allow(clippy::type_complexity)]
|
||||
fn make_transmit_shreds(
|
||||
slot: Slot,
|
||||
num: u64,
|
||||
) -> (
|
||||
Vec<Shred>,
|
||||
Vec<Shred>,
|
||||
Vec<TransmitShreds>,
|
||||
Vec<TransmitShreds>,
|
||||
Vec<Arc<Vec<Shred>>>,
|
||||
Vec<Arc<Vec<Shred>>>,
|
||||
) {
|
||||
let num_entries = max_ticks_per_n_shreds(num, None);
|
||||
let (data_shreds, _) = make_slot_entries(slot, 0, num_entries);
|
||||
|
@ -489,11 +498,11 @@ pub mod test {
|
|||
coding_shreds.clone(),
|
||||
data_shreds
|
||||
.into_iter()
|
||||
.map(|s| (slot, Arc::new(vec![s])))
|
||||
.map(|shred| Arc::new(vec![shred]))
|
||||
.collect(),
|
||||
coding_shreds
|
||||
.into_iter()
|
||||
.map(|s| (slot, Arc::new(vec![s])))
|
||||
.map(|shred| Arc::new(vec![shred]))
|
||||
.collect(),
|
||||
)
|
||||
}
|
||||
|
@ -505,15 +514,15 @@ pub mod test {
|
|||
num_expected_data_shreds: u64,
|
||||
num_expected_coding_shreds: u64,
|
||||
) {
|
||||
while let Ok((new_retransmit_slots, _)) = transmit_receiver.try_recv() {
|
||||
if new_retransmit_slots.1[0].is_data() {
|
||||
for data_shred in new_retransmit_slots.1.iter() {
|
||||
while let Ok((shreds, _)) = transmit_receiver.try_recv() {
|
||||
if shreds[0].is_data() {
|
||||
for data_shred in shreds.iter() {
|
||||
assert_eq!(data_shred.index() as u64, data_index);
|
||||
data_index += 1;
|
||||
}
|
||||
} else {
|
||||
assert_eq!(new_retransmit_slots.1[0].index() as u64, coding_index);
|
||||
for coding_shred in new_retransmit_slots.1.iter() {
|
||||
assert_eq!(shreds[0].index() as u64, coding_index);
|
||||
for coding_shred in shreds.iter() {
|
||||
assert_eq!(coding_shred.index() as u64, coding_index);
|
||||
coding_index += 1;
|
||||
}
|
||||
|
|
|
@ -72,7 +72,7 @@ impl BroadcastRun for BroadcastDuplicatesRun {
|
|||
keypair: &Keypair,
|
||||
_blockstore: &Arc<Blockstore>,
|
||||
receiver: &Receiver<WorkingBankEntry>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
) -> Result<()> {
|
||||
// 1) Pull entries from banking stage
|
||||
|
@ -194,13 +194,13 @@ impl BroadcastRun for BroadcastDuplicatesRun {
|
|||
blockstore_sender.send((data_shreds.clone(), None))?;
|
||||
|
||||
// 3) Start broadcast step
|
||||
let transmit_shreds = (bank.slot(), data_shreds.clone());
|
||||
info!(
|
||||
"{} Sending good shreds for slot {} to network",
|
||||
keypair.pubkey(),
|
||||
data_shreds.first().unwrap().slot()
|
||||
);
|
||||
socket_sender.send((transmit_shreds, None))?;
|
||||
assert!(data_shreds.iter().all(|shred| shred.slot() == bank.slot()));
|
||||
socket_sender.send((data_shreds, None))?;
|
||||
|
||||
// Special handling of last shred to cause partition
|
||||
if let Some((original_last_data_shred, partition_last_data_shred)) = last_shreds {
|
||||
|
@ -221,11 +221,15 @@ impl BroadcastRun for BroadcastDuplicatesRun {
|
|||
// Store the original shreds that this node replayed
|
||||
blockstore_sender.send((original_last_data_shred.clone(), None))?;
|
||||
|
||||
let original_transmit_shreds = (bank.slot(), original_last_data_shred);
|
||||
let partition_transmit_shreds = (bank.slot(), partition_last_data_shred);
|
||||
assert!(original_last_data_shred
|
||||
.iter()
|
||||
.all(|shred| shred.slot() == bank.slot()));
|
||||
assert!(partition_last_data_shred
|
||||
.iter()
|
||||
.all(|shred| shred.slot() == bank.slot()));
|
||||
|
||||
socket_sender.send((original_transmit_shreds, None))?;
|
||||
socket_sender.send((partition_transmit_shreds, None))?;
|
||||
socket_sender.send((original_last_data_shred, None))?;
|
||||
socket_sender.send((partition_last_data_shred, None))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -237,7 +241,12 @@ impl BroadcastRun for BroadcastDuplicatesRun {
|
|||
sock: &UdpSocket,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
) -> Result<()> {
|
||||
let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
|
||||
let (shreds, _) = receiver.lock().unwrap().recv()?;
|
||||
if shreds.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
let slot = shreds.first().unwrap().slot();
|
||||
assert!(shreds.iter().all(|shred| shred.slot() == slot));
|
||||
let (root_bank, working_bank) = {
|
||||
let bank_forks = bank_forks.read().unwrap();
|
||||
(bank_forks.root_bank(), bank_forks.working_bank())
|
||||
|
|
|
@ -27,7 +27,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
|
|||
keypair: &Keypair,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
receiver: &Receiver<WorkingBankEntry>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
) -> Result<()> {
|
||||
// 1) Pull entries from banking stage
|
||||
|
@ -93,11 +93,13 @@ impl BroadcastRun for BroadcastFakeShredsRun {
|
|||
// 3) Start broadcast step
|
||||
//some indicates fake shreds
|
||||
let batch_info = Some(batch_info);
|
||||
socket_sender.send(((slot, Arc::new(fake_data_shreds)), batch_info.clone()))?;
|
||||
socket_sender.send(((slot, Arc::new(fake_coding_shreds)), batch_info))?;
|
||||
assert!(fake_data_shreds.iter().all(|shred| shred.slot() == slot));
|
||||
assert!(fake_coding_shreds.iter().all(|shred| shred.slot() == slot));
|
||||
socket_sender.send((Arc::new(fake_data_shreds), batch_info.clone()))?;
|
||||
socket_sender.send((Arc::new(fake_coding_shreds), batch_info))?;
|
||||
//none indicates real shreds
|
||||
socket_sender.send(((slot, data_shreds), None))?;
|
||||
socket_sender.send(((slot, Arc::new(coding_shreds)), None))?;
|
||||
socket_sender.send((data_shreds, None))?;
|
||||
socket_sender.send((Arc::new(coding_shreds), None))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -108,7 +110,7 @@ impl BroadcastRun for BroadcastFakeShredsRun {
|
|||
sock: &UdpSocket,
|
||||
_bank_forks: &Arc<RwLock<BankForks>>,
|
||||
) -> Result<()> {
|
||||
for ((_slot, data_shreds), batch_info) in receiver.lock().unwrap().iter() {
|
||||
for (data_shreds, batch_info) in receiver.lock().unwrap().iter() {
|
||||
let fake = batch_info.is_some();
|
||||
let peers = cluster_info.tvu_peers();
|
||||
peers.iter().enumerate().for_each(|(i, peer)| {
|
||||
|
|
|
@ -40,7 +40,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
|||
keypair: &Keypair,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
receiver: &Receiver<WorkingBankEntry>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
) -> Result<()> {
|
||||
// 1) Pull entries from banking stage
|
||||
|
@ -107,7 +107,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
|||
let data_shreds = Arc::new(data_shreds);
|
||||
blockstore_sender.send((data_shreds.clone(), None))?;
|
||||
// 4) Start broadcast step
|
||||
socket_sender.send(((bank.slot(), data_shreds), None))?;
|
||||
socket_sender.send((data_shreds, None))?;
|
||||
if let Some((good_last_data_shred, bad_last_data_shred)) = last_shreds {
|
||||
// Stash away the good shred so we can rewrite them later
|
||||
self.good_shreds.extend(good_last_data_shred.clone());
|
||||
|
@ -126,7 +126,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
|||
// Store the bad shred so we serve bad repairs to validators catching up
|
||||
blockstore_sender.send((bad_last_data_shred.clone(), None))?;
|
||||
// Send bad shreds to rest of network
|
||||
socket_sender.send(((bank.slot(), bad_last_data_shred), None))?;
|
||||
socket_sender.send((bad_last_data_shred, None))?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
@ -137,27 +137,17 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
|
|||
sock: &UdpSocket,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
) -> Result<()> {
|
||||
let ((slot, shreds), _) = receiver.lock().unwrap().recv()?;
|
||||
let (root_bank, working_bank) = {
|
||||
let bank_forks = bank_forks.read().unwrap();
|
||||
(bank_forks.root_bank(), bank_forks.working_bank())
|
||||
};
|
||||
// Broadcast data
|
||||
let cluster_nodes =
|
||||
self.cluster_nodes_cache
|
||||
.get(slot, &root_bank, &working_bank, cluster_info);
|
||||
let (shreds, _) = receiver.lock().unwrap().recv()?;
|
||||
broadcast_shreds(
|
||||
sock,
|
||||
&shreds,
|
||||
&cluster_nodes,
|
||||
&self.cluster_nodes_cache,
|
||||
&Arc::new(AtomicInterval::default()),
|
||||
&mut TransmitShredsStats::default(),
|
||||
cluster_info.id(),
|
||||
cluster_info,
|
||||
bank_forks,
|
||||
cluster_info.socket_addr_space(),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
)
|
||||
}
|
||||
fn record(
|
||||
&mut self,
|
||||
|
|
|
@ -179,7 +179,7 @@ impl StandardBroadcastRun {
|
|||
&mut self,
|
||||
keypair: &Keypair,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
receive_results: ReceiveResults,
|
||||
) -> Result<()> {
|
||||
|
@ -244,7 +244,8 @@ impl StandardBroadcastRun {
|
|||
),
|
||||
});
|
||||
let shreds = Arc::new(prev_slot_shreds);
|
||||
socket_sender.send(((slot, shreds.clone()), batch_info.clone()))?;
|
||||
debug_assert!(shreds.iter().all(|shred| shred.slot() == slot));
|
||||
socket_sender.send((shreds.clone(), batch_info.clone()))?;
|
||||
blockstore_sender.send((shreds, batch_info))?;
|
||||
}
|
||||
|
||||
|
@ -270,7 +271,8 @@ impl StandardBroadcastRun {
|
|||
|
||||
// Send data shreds
|
||||
let data_shreds = Arc::new(data_shreds);
|
||||
socket_sender.send(((bank.slot(), data_shreds.clone()), batch_info.clone()))?;
|
||||
debug_assert!(data_shreds.iter().all(|shred| shred.slot() == bank.slot()));
|
||||
socket_sender.send((data_shreds.clone(), batch_info.clone()))?;
|
||||
blockstore_sender.send((data_shreds, batch_info.clone()))?;
|
||||
|
||||
// Create and send coding shreds
|
||||
|
@ -281,7 +283,10 @@ impl StandardBroadcastRun {
|
|||
&mut process_stats,
|
||||
);
|
||||
let coding_shreds = Arc::new(coding_shreds);
|
||||
socket_sender.send(((bank.slot(), coding_shreds.clone()), batch_info.clone()))?;
|
||||
debug_assert!(coding_shreds
|
||||
.iter()
|
||||
.all(|shred| shred.slot() == bank.slot()));
|
||||
socket_sender.send((coding_shreds.clone(), batch_info.clone()))?;
|
||||
blockstore_sender.send((coding_shreds, batch_info))?;
|
||||
|
||||
coding_send_time.stop();
|
||||
|
@ -339,23 +344,11 @@ impl StandardBroadcastRun {
|
|||
&mut self,
|
||||
sock: &UdpSocket,
|
||||
cluster_info: &ClusterInfo,
|
||||
slot: Slot,
|
||||
shreds: Arc<Vec<Shred>>,
|
||||
broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
) -> Result<()> {
|
||||
trace!("Broadcasting {:?} shreds", shreds.len());
|
||||
// Get the list of peers to broadcast to
|
||||
let mut get_peers_time = Measure::start("broadcast::get_peers");
|
||||
let (root_bank, working_bank) = {
|
||||
let bank_forks = bank_forks.read().unwrap();
|
||||
(bank_forks.root_bank(), bank_forks.working_bank())
|
||||
};
|
||||
let cluster_nodes =
|
||||
self.cluster_nodes_cache
|
||||
.get(slot, &root_bank, &working_bank, cluster_info);
|
||||
get_peers_time.stop();
|
||||
|
||||
let mut transmit_stats = TransmitShredsStats::default();
|
||||
// Broadcast the shreds
|
||||
let mut transmit_time = Measure::start("broadcast_shreds");
|
||||
|
@ -363,17 +356,16 @@ impl StandardBroadcastRun {
|
|||
broadcast_shreds(
|
||||
sock,
|
||||
&shreds,
|
||||
&cluster_nodes,
|
||||
&self.cluster_nodes_cache,
|
||||
&self.last_datapoint_submit,
|
||||
&mut transmit_stats,
|
||||
cluster_info.id(),
|
||||
cluster_info,
|
||||
bank_forks,
|
||||
cluster_info.socket_addr_space(),
|
||||
)?;
|
||||
transmit_time.stop();
|
||||
|
||||
transmit_stats.transmit_elapsed = transmit_time.as_us();
|
||||
transmit_stats.get_peers_elapsed = get_peers_time.as_us();
|
||||
transmit_stats.num_shreds = shreds.len();
|
||||
|
||||
// Process metrics
|
||||
|
@ -455,7 +447,7 @@ impl BroadcastRun for StandardBroadcastRun {
|
|||
keypair: &Keypair,
|
||||
blockstore: &Arc<Blockstore>,
|
||||
receiver: &Receiver<WorkingBankEntry>,
|
||||
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
|
||||
socket_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
|
||||
) -> Result<()> {
|
||||
let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
|
||||
|
@ -476,8 +468,8 @@ impl BroadcastRun for StandardBroadcastRun {
|
|||
sock: &UdpSocket,
|
||||
bank_forks: &Arc<RwLock<BankForks>>,
|
||||
) -> Result<()> {
|
||||
let ((slot, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
|
||||
self.broadcast(sock, cluster_info, slot, shreds, slot_start_ts, bank_forks)
|
||||
let (shreds, batch_info) = receiver.lock().unwrap().recv()?;
|
||||
self.broadcast(sock, cluster_info, shreds, batch_info, bank_forks)
|
||||
}
|
||||
fn record(
|
||||
&mut self,
|
||||
|
|
|
@ -50,7 +50,7 @@ pub struct ClusterNodes<T> {
|
|||
|
||||
type CacheEntry<T> = Option<(/*as of:*/ Instant, Arc<ClusterNodes<T>>)>;
|
||||
|
||||
pub(crate) struct ClusterNodesCache<T> {
|
||||
pub struct ClusterNodesCache<T> {
|
||||
// Cache entries are wrapped in Arc<Mutex<...>>, so that, when needed, only
|
||||
// one thread does the computations to update the entry for the epoch.
|
||||
cache: Mutex<LruCache<Epoch, Arc<Mutex<CacheEntry<T>>>>>,
|
||||
|
@ -230,7 +230,7 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap<Pubkey, u64>) -> Vec<N
|
|||
}
|
||||
|
||||
impl<T> ClusterNodesCache<T> {
|
||||
pub(crate) fn new(
|
||||
pub fn new(
|
||||
// Capacity of underlying LRU-cache in terms of number of epochs.
|
||||
cap: usize,
|
||||
// A time-to-live eviction policy is enforced to refresh entries in
|
||||
|
|
Loading…
Reference in New Issue