diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index e42cbd064..b1de0380f 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -2,32 +2,37 @@ extern crate test; -use rand::{thread_rng, Rng}; -use solana_core::{ - broadcast_stage::{broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage}, - cluster_nodes::ClusterNodes, +use { + rand::{thread_rng, Rng}, + solana_core::{ + broadcast_stage::{ + broadcast_metrics::TransmitShredsStats, broadcast_shreds, BroadcastStage, + }, + cluster_nodes::ClusterNodesCache, + }, + solana_gossip::{ + cluster_info::{ClusterInfo, Node}, + contact_info::ContactInfo, + }, + solana_ledger::{ + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + shred::Shred, + }, + solana_runtime::{bank::Bank, bank_forks::BankForks}, + solana_sdk::{ + pubkey, + signature::Keypair, + timing::{timestamp, AtomicInterval}, + }, + solana_streamer::socket::SocketAddrSpace, + std::{ + collections::HashMap, + net::UdpSocket, + sync::{Arc, RwLock}, + time::Duration, + }, + test::Bencher, }; -use solana_gossip::{ - cluster_info::{ClusterInfo, Node}, - contact_info::ContactInfo, -}; -use solana_ledger::{ - genesis_utils::{create_genesis_config, GenesisConfigInfo}, - shred::Shred, -}; -use solana_runtime::{bank::Bank, bank_forks::BankForks}; -use solana_sdk::{ - pubkey, - signature::Keypair, - timing::{timestamp, AtomicInterval}, -}; -use solana_streamer::socket::SocketAddrSpace; -use std::{ - collections::HashMap, - net::UdpSocket, - sync::{Arc, RwLock}, -}; -use test::Bencher; #[bench] fn broadcast_shreds_bench(bencher: &mut Bencher) { @@ -56,7 +61,10 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64); } let cluster_info = Arc::new(cluster_info); - let cluster_nodes = ClusterNodes::::new(&cluster_info, &stakes); + let cluster_nodes_cache = ClusterNodesCache::::new( + 8, // cap + Duration::from_secs(5), // ttl + ); let shreds = Arc::new(shreds); let last_datapoint = Arc::new(AtomicInterval::default()); bencher.iter(move || { @@ -64,10 +72,10 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { broadcast_shreds( &socket, &shreds, - &cluster_nodes, + &cluster_nodes_cache, &last_datapoint, &mut TransmitShredsStats::default(), - cluster_info.id(), + &cluster_info, &bank_forks, &SocketAddrSpace::Unspecified, ) diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 15c4c815d..f8fdc7a09 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -1,40 +1,47 @@ //! A stage to broadcast data from a leader node to validators #![allow(clippy::rc_buffer)] -use self::{ - broadcast_duplicates_run::{BroadcastDuplicatesConfig, BroadcastDuplicatesRun}, - broadcast_fake_shreds_run::BroadcastFakeShredsRun, - broadcast_metrics::*, - fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun, - standard_broadcast_run::StandardBroadcastRun, -}; -use crate::{ - cluster_nodes::ClusterNodes, - result::{Error, Result}, -}; -use crossbeam_channel::{ - Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError, - Sender as CrossbeamSender, -}; -use solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError}; -use solana_ledger::{blockstore::Blockstore, shred::Shred}; -use solana_measure::measure::Measure; -use solana_metrics::{inc_new_counter_error, inc_new_counter_info}; -use solana_poh::poh_recorder::WorkingBankEntry; -use solana_runtime::{bank::Bank, bank_forks::BankForks}; -use solana_sdk::timing::{timestamp, AtomicInterval}; -use solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}; -use solana_streamer::{ - sendmmsg::{batch_send, SendPktsError}, - socket::SocketAddrSpace, -}; -use std::{ - collections::HashMap, - net::UdpSocket, - sync::atomic::{AtomicBool, Ordering}, - sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender}, - sync::{Arc, Mutex, RwLock}, - thread::{self, Builder, JoinHandle}, - time::{Duration, Instant}, +use { + self::{ + broadcast_duplicates_run::{BroadcastDuplicatesConfig, BroadcastDuplicatesRun}, + broadcast_fake_shreds_run::BroadcastFakeShredsRun, + broadcast_metrics::*, + fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun, + standard_broadcast_run::StandardBroadcastRun, + }, + crate::{ + cluster_nodes::{ClusterNodes, ClusterNodesCache}, + result::{Error, Result}, + }, + crossbeam_channel::{ + Receiver as CrossbeamReceiver, RecvTimeoutError as CrossbeamRecvTimeoutError, + Sender as CrossbeamSender, + }, + itertools::Itertools, + solana_gossip::cluster_info::{ClusterInfo, ClusterInfoError}, + solana_ledger::{blockstore::Blockstore, shred::Shred}, + solana_measure::measure::Measure, + solana_metrics::{inc_new_counter_error, inc_new_counter_info}, + solana_poh::poh_recorder::WorkingBankEntry, + solana_runtime::{bank::Bank, bank_forks::BankForks}, + solana_sdk::{ + timing::{timestamp, AtomicInterval}, + {clock::Slot, pubkey::Pubkey, signature::Keypair}, + }, + solana_streamer::{ + sendmmsg::{batch_send, SendPktsError}, + socket::SocketAddrSpace, + }, + std::{ + collections::HashMap, + net::UdpSocket, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender}, + Arc, Mutex, RwLock, + }, + thread::{self, Builder, JoinHandle}, + time::{Duration, Instant}, + }, }; pub mod broadcast_duplicates_run; @@ -51,7 +58,7 @@ pub(crate) const NUM_INSERT_THREADS: usize = 2; pub(crate) type RetransmitSlotsSender = CrossbeamSender>>; pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver>>; pub(crate) type RecordReceiver = Receiver<(Arc>, Option)>; -pub(crate) type TransmitReceiver = Receiver<(TransmitShreds, Option)>; +pub(crate) type TransmitReceiver = Receiver<(Arc>, Option)>; #[derive(Debug, PartialEq, Eq, Clone)] pub enum BroadcastStageReturnType { @@ -127,14 +134,13 @@ impl BroadcastStageType { } } -type TransmitShreds = (Slot, Arc>); trait BroadcastRun { fn run( &mut self, keypair: &Keypair, blockstore: &Arc, receiver: &Receiver, - socket_sender: &Sender<(TransmitShreds, Option)>, + socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, ) -> Result<()>; fn transmit( @@ -179,7 +185,7 @@ impl BroadcastStage { cluster_info: Arc, blockstore: &Arc, receiver: &Receiver, - socket_sender: &Sender<(TransmitShreds, Option)>, + socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, mut broadcast_stage_run: impl BroadcastRun, ) -> BroadcastStageReturnType { @@ -331,7 +337,7 @@ impl BroadcastStage { fn check_retransmit_signals( blockstore: &Blockstore, retransmit_slots_receiver: &RetransmitSlotsReceiver, - socket_sender: &Sender<(TransmitShreds, Option)>, + socket_sender: &Sender<(Arc>, Option)>, ) -> Result<()> { let timer = Duration::from_millis(100); @@ -348,9 +354,9 @@ impl BroadcastStage { .get_data_shreds_for_slot(slot, 0) .expect("My own shreds must be reconstructable"), ); - + debug_assert!(data_shreds.iter().all(|shred| shred.slot() == slot)); if !data_shreds.is_empty() { - socket_sender.send(((slot, data_shreds), None))?; + socket_sender.send((data_shreds, None))?; } let coding_shreds = Arc::new( @@ -359,8 +365,9 @@ impl BroadcastStage { .expect("My own shreds must be reconstructable"), ); + debug_assert!(coding_shreds.iter().all(|shred| shred.slot() == slot)); if !coding_shreds.is_empty() { - socket_sender.send(((slot, coding_shreds), None))?; + socket_sender.send((coding_shreds, None))?; } } @@ -376,11 +383,13 @@ impl BroadcastStage { } fn update_peer_stats( - num_live_peers: i64, - broadcast_len: i64, + cluster_nodes: &ClusterNodes, last_datapoint_submit: &Arc, ) { if last_datapoint_submit.should_update(1000) { + let now = timestamp(); + let num_live_peers = cluster_nodes.num_peers_live(now); + let broadcast_len = cluster_nodes.num_peers() + 1; datapoint_info!( "cluster_info-num_nodes", ("live_count", num_live_peers, i64), @@ -394,31 +403,37 @@ fn update_peer_stats( pub fn broadcast_shreds( s: &UdpSocket, shreds: &[Shred], - cluster_nodes: &ClusterNodes, + cluster_nodes_cache: &ClusterNodesCache, last_datapoint_submit: &Arc, transmit_stats: &mut TransmitShredsStats, - self_pubkey: Pubkey, + cluster_info: &ClusterInfo, bank_forks: &Arc>, socket_addr_space: &SocketAddrSpace, ) -> Result<()> { let mut result = Ok(()); - let broadcast_len = cluster_nodes.num_peers(); - if broadcast_len == 0 { - update_peer_stats(1, 1, last_datapoint_submit); - return result; - } let mut shred_select = Measure::start("shred_select"); - let root_bank = bank_forks.read().unwrap().root_bank(); + // Only the leader broadcasts shreds. + let leader = Some(cluster_info.id()); + let (root_bank, working_bank) = { + let bank_forks = bank_forks.read().unwrap(); + (bank_forks.root_bank(), bank_forks.working_bank()) + }; let packets: Vec<_> = shreds .iter() - .filter_map(|shred| { - let seed = shred.seed(Some(self_pubkey), &root_bank); - let node = cluster_nodes.get_broadcast_peer(seed)?; - if socket_addr_space.check(&node.tvu) { - Some((&shred.payload, node.tvu)) - } else { - None - } + .group_by(|shred| shred.slot()) + .into_iter() + .flat_map(|(slot, shreds)| { + let cluster_nodes = + cluster_nodes_cache.get(slot, &root_bank, &working_bank, cluster_info); + update_peer_stats(&cluster_nodes, last_datapoint_submit); + let root_bank = root_bank.clone(); + shreds.filter_map(move |shred| { + let seed = shred.seed(leader, &root_bank); + let node = cluster_nodes.get_broadcast_peer(seed)?; + socket_addr_space + .check(&node.tvu) + .then(|| (&shred.payload, node.tvu)) + }) }) .collect(); shred_select.stop(); @@ -432,13 +447,6 @@ pub fn broadcast_shreds( send_mmsg_time.stop(); transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us(); transmit_stats.total_packets += packets.len(); - - let num_live_peers = cluster_nodes.num_peers_live(timestamp()) as i64; - update_peer_stats( - num_live_peers, - broadcast_len as i64 + 1, - last_datapoint_submit, - ); result } @@ -465,14 +473,15 @@ pub mod test { }; #[allow(clippy::implicit_hasher)] + #[allow(clippy::type_complexity)] fn make_transmit_shreds( slot: Slot, num: u64, ) -> ( Vec, Vec, - Vec, - Vec, + Vec>>, + Vec>>, ) { let num_entries = max_ticks_per_n_shreds(num, None); let (data_shreds, _) = make_slot_entries(slot, 0, num_entries); @@ -489,11 +498,11 @@ pub mod test { coding_shreds.clone(), data_shreds .into_iter() - .map(|s| (slot, Arc::new(vec![s]))) + .map(|shred| Arc::new(vec![shred])) .collect(), coding_shreds .into_iter() - .map(|s| (slot, Arc::new(vec![s]))) + .map(|shred| Arc::new(vec![shred])) .collect(), ) } @@ -505,15 +514,15 @@ pub mod test { num_expected_data_shreds: u64, num_expected_coding_shreds: u64, ) { - while let Ok((new_retransmit_slots, _)) = transmit_receiver.try_recv() { - if new_retransmit_slots.1[0].is_data() { - for data_shred in new_retransmit_slots.1.iter() { + while let Ok((shreds, _)) = transmit_receiver.try_recv() { + if shreds[0].is_data() { + for data_shred in shreds.iter() { assert_eq!(data_shred.index() as u64, data_index); data_index += 1; } } else { - assert_eq!(new_retransmit_slots.1[0].index() as u64, coding_index); - for coding_shred in new_retransmit_slots.1.iter() { + assert_eq!(shreds[0].index() as u64, coding_index); + for coding_shred in shreds.iter() { assert_eq!(coding_shred.index() as u64, coding_index); coding_index += 1; } diff --git a/core/src/broadcast_stage/broadcast_duplicates_run.rs b/core/src/broadcast_stage/broadcast_duplicates_run.rs index f2efb27c7..1e5edb521 100644 --- a/core/src/broadcast_stage/broadcast_duplicates_run.rs +++ b/core/src/broadcast_stage/broadcast_duplicates_run.rs @@ -72,7 +72,7 @@ impl BroadcastRun for BroadcastDuplicatesRun { keypair: &Keypair, _blockstore: &Arc, receiver: &Receiver, - socket_sender: &Sender<(TransmitShreds, Option)>, + socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, ) -> Result<()> { // 1) Pull entries from banking stage @@ -194,13 +194,13 @@ impl BroadcastRun for BroadcastDuplicatesRun { blockstore_sender.send((data_shreds.clone(), None))?; // 3) Start broadcast step - let transmit_shreds = (bank.slot(), data_shreds.clone()); info!( "{} Sending good shreds for slot {} to network", keypair.pubkey(), data_shreds.first().unwrap().slot() ); - socket_sender.send((transmit_shreds, None))?; + assert!(data_shreds.iter().all(|shred| shred.slot() == bank.slot())); + socket_sender.send((data_shreds, None))?; // Special handling of last shred to cause partition if let Some((original_last_data_shred, partition_last_data_shred)) = last_shreds { @@ -221,11 +221,15 @@ impl BroadcastRun for BroadcastDuplicatesRun { // Store the original shreds that this node replayed blockstore_sender.send((original_last_data_shred.clone(), None))?; - let original_transmit_shreds = (bank.slot(), original_last_data_shred); - let partition_transmit_shreds = (bank.slot(), partition_last_data_shred); + assert!(original_last_data_shred + .iter() + .all(|shred| shred.slot() == bank.slot())); + assert!(partition_last_data_shred + .iter() + .all(|shred| shred.slot() == bank.slot())); - socket_sender.send((original_transmit_shreds, None))?; - socket_sender.send((partition_transmit_shreds, None))?; + socket_sender.send((original_last_data_shred, None))?; + socket_sender.send((partition_last_data_shred, None))?; } Ok(()) } @@ -237,7 +241,12 @@ impl BroadcastRun for BroadcastDuplicatesRun { sock: &UdpSocket, bank_forks: &Arc>, ) -> Result<()> { - let ((slot, shreds), _) = receiver.lock().unwrap().recv()?; + let (shreds, _) = receiver.lock().unwrap().recv()?; + if shreds.is_empty() { + return Ok(()); + } + let slot = shreds.first().unwrap().slot(); + assert!(shreds.iter().all(|shred| shred.slot() == slot)); let (root_bank, working_bank) = { let bank_forks = bank_forks.read().unwrap(); (bank_forks.root_bank(), bank_forks.working_bank()) diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index 621c570ce..5069e1f58 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -27,7 +27,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { keypair: &Keypair, blockstore: &Arc, receiver: &Receiver, - socket_sender: &Sender<(TransmitShreds, Option)>, + socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, ) -> Result<()> { // 1) Pull entries from banking stage @@ -93,11 +93,13 @@ impl BroadcastRun for BroadcastFakeShredsRun { // 3) Start broadcast step //some indicates fake shreds let batch_info = Some(batch_info); - socket_sender.send(((slot, Arc::new(fake_data_shreds)), batch_info.clone()))?; - socket_sender.send(((slot, Arc::new(fake_coding_shreds)), batch_info))?; + assert!(fake_data_shreds.iter().all(|shred| shred.slot() == slot)); + assert!(fake_coding_shreds.iter().all(|shred| shred.slot() == slot)); + socket_sender.send((Arc::new(fake_data_shreds), batch_info.clone()))?; + socket_sender.send((Arc::new(fake_coding_shreds), batch_info))?; //none indicates real shreds - socket_sender.send(((slot, data_shreds), None))?; - socket_sender.send(((slot, Arc::new(coding_shreds)), None))?; + socket_sender.send((data_shreds, None))?; + socket_sender.send((Arc::new(coding_shreds), None))?; Ok(()) } @@ -108,7 +110,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { sock: &UdpSocket, _bank_forks: &Arc>, ) -> Result<()> { - for ((_slot, data_shreds), batch_info) in receiver.lock().unwrap().iter() { + for (data_shreds, batch_info) in receiver.lock().unwrap().iter() { let fake = batch_info.is_some(); let peers = cluster_info.tvu_peers(); peers.iter().enumerate().for_each(|(i, peer)| { diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 43b354c17..5c74e1e56 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -40,7 +40,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { keypair: &Keypair, blockstore: &Arc, receiver: &Receiver, - socket_sender: &Sender<(TransmitShreds, Option)>, + socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, ) -> Result<()> { // 1) Pull entries from banking stage @@ -107,7 +107,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { let data_shreds = Arc::new(data_shreds); blockstore_sender.send((data_shreds.clone(), None))?; // 4) Start broadcast step - socket_sender.send(((bank.slot(), data_shreds), None))?; + socket_sender.send((data_shreds, None))?; if let Some((good_last_data_shred, bad_last_data_shred)) = last_shreds { // Stash away the good shred so we can rewrite them later self.good_shreds.extend(good_last_data_shred.clone()); @@ -126,7 +126,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { // Store the bad shred so we serve bad repairs to validators catching up blockstore_sender.send((bad_last_data_shred.clone(), None))?; // Send bad shreds to rest of network - socket_sender.send(((bank.slot(), bad_last_data_shred), None))?; + socket_sender.send((bad_last_data_shred, None))?; } Ok(()) } @@ -137,27 +137,17 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { sock: &UdpSocket, bank_forks: &Arc>, ) -> Result<()> { - let ((slot, shreds), _) = receiver.lock().unwrap().recv()?; - let (root_bank, working_bank) = { - let bank_forks = bank_forks.read().unwrap(); - (bank_forks.root_bank(), bank_forks.working_bank()) - }; - // Broadcast data - let cluster_nodes = - self.cluster_nodes_cache - .get(slot, &root_bank, &working_bank, cluster_info); + let (shreds, _) = receiver.lock().unwrap().recv()?; broadcast_shreds( sock, &shreds, - &cluster_nodes, + &self.cluster_nodes_cache, &Arc::new(AtomicInterval::default()), &mut TransmitShredsStats::default(), - cluster_info.id(), + cluster_info, bank_forks, cluster_info.socket_addr_space(), - )?; - - Ok(()) + ) } fn record( &mut self, diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index fc3d11e74..1d018ceb0 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -179,7 +179,7 @@ impl StandardBroadcastRun { &mut self, keypair: &Keypair, blockstore: &Arc, - socket_sender: &Sender<(TransmitShreds, Option)>, + socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, receive_results: ReceiveResults, ) -> Result<()> { @@ -244,7 +244,8 @@ impl StandardBroadcastRun { ), }); let shreds = Arc::new(prev_slot_shreds); - socket_sender.send(((slot, shreds.clone()), batch_info.clone()))?; + debug_assert!(shreds.iter().all(|shred| shred.slot() == slot)); + socket_sender.send((shreds.clone(), batch_info.clone()))?; blockstore_sender.send((shreds, batch_info))?; } @@ -270,7 +271,8 @@ impl StandardBroadcastRun { // Send data shreds let data_shreds = Arc::new(data_shreds); - socket_sender.send(((bank.slot(), data_shreds.clone()), batch_info.clone()))?; + debug_assert!(data_shreds.iter().all(|shred| shred.slot() == bank.slot())); + socket_sender.send((data_shreds.clone(), batch_info.clone()))?; blockstore_sender.send((data_shreds, batch_info.clone()))?; // Create and send coding shreds @@ -281,7 +283,10 @@ impl StandardBroadcastRun { &mut process_stats, ); let coding_shreds = Arc::new(coding_shreds); - socket_sender.send(((bank.slot(), coding_shreds.clone()), batch_info.clone()))?; + debug_assert!(coding_shreds + .iter() + .all(|shred| shred.slot() == bank.slot())); + socket_sender.send((coding_shreds.clone(), batch_info.clone()))?; blockstore_sender.send((coding_shreds, batch_info))?; coding_send_time.stop(); @@ -339,23 +344,11 @@ impl StandardBroadcastRun { &mut self, sock: &UdpSocket, cluster_info: &ClusterInfo, - slot: Slot, shreds: Arc>, broadcast_shred_batch_info: Option, bank_forks: &Arc>, ) -> Result<()> { trace!("Broadcasting {:?} shreds", shreds.len()); - // Get the list of peers to broadcast to - let mut get_peers_time = Measure::start("broadcast::get_peers"); - let (root_bank, working_bank) = { - let bank_forks = bank_forks.read().unwrap(); - (bank_forks.root_bank(), bank_forks.working_bank()) - }; - let cluster_nodes = - self.cluster_nodes_cache - .get(slot, &root_bank, &working_bank, cluster_info); - get_peers_time.stop(); - let mut transmit_stats = TransmitShredsStats::default(); // Broadcast the shreds let mut transmit_time = Measure::start("broadcast_shreds"); @@ -363,17 +356,16 @@ impl StandardBroadcastRun { broadcast_shreds( sock, &shreds, - &cluster_nodes, + &self.cluster_nodes_cache, &self.last_datapoint_submit, &mut transmit_stats, - cluster_info.id(), + cluster_info, bank_forks, cluster_info.socket_addr_space(), )?; transmit_time.stop(); transmit_stats.transmit_elapsed = transmit_time.as_us(); - transmit_stats.get_peers_elapsed = get_peers_time.as_us(); transmit_stats.num_shreds = shreds.len(); // Process metrics @@ -455,7 +447,7 @@ impl BroadcastRun for StandardBroadcastRun { keypair: &Keypair, blockstore: &Arc, receiver: &Receiver, - socket_sender: &Sender<(TransmitShreds, Option)>, + socket_sender: &Sender<(Arc>, Option)>, blockstore_sender: &Sender<(Arc>, Option)>, ) -> Result<()> { let receive_results = broadcast_utils::recv_slot_entries(receiver)?; @@ -476,8 +468,8 @@ impl BroadcastRun for StandardBroadcastRun { sock: &UdpSocket, bank_forks: &Arc>, ) -> Result<()> { - let ((slot, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?; - self.broadcast(sock, cluster_info, slot, shreds, slot_start_ts, bank_forks) + let (shreds, batch_info) = receiver.lock().unwrap().recv()?; + self.broadcast(sock, cluster_info, shreds, batch_info, bank_forks) } fn record( &mut self, diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 82754ea72..47ce90829 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -50,7 +50,7 @@ pub struct ClusterNodes { type CacheEntry = Option<(/*as of:*/ Instant, Arc>)>; -pub(crate) struct ClusterNodesCache { +pub struct ClusterNodesCache { // Cache entries are wrapped in Arc>, so that, when needed, only // one thread does the computations to update the entry for the epoch. cache: Mutex>>>>, @@ -230,7 +230,7 @@ fn get_nodes(cluster_info: &ClusterInfo, stakes: &HashMap) -> Vec ClusterNodesCache { - pub(crate) fn new( + pub fn new( // Capacity of underlying LRU-cache in terms of number of epochs. cap: usize, // A time-to-live eviction policy is enforced to refresh entries in