From 97589f77f8164825d17a720d244442e2ee3ada04 Mon Sep 17 00:00:00 2001 From: anatoly yakovenko Date: Mon, 16 Dec 2019 17:11:18 -0800 Subject: [PATCH] Pipeline broadcast socket transmit and blocktree record (#7481) automerge --- core/benches/cluster_info.rs | 4 +- core/src/broadcast_stage.rs | 154 +++++++---- .../broadcast_fake_shreds_run.rs | 76 ++++-- .../fail_entry_verification_broadcast_run.rs | 73 +++--- .../broadcast_stage/standard_broadcast_run.rs | 240 +++++++++++------- core/src/cluster_info.rs | 25 +- core/src/retransmit_stage.rs | 3 +- core/src/tpu.rs | 4 +- core/src/validator.rs | 7 +- core/tests/cluster_info.rs | 3 +- ledger/src/shred.rs | 33 ++- perf/benches/sigverify.rs | 6 +- validator/src/main.rs | 6 +- 13 files changed, 405 insertions(+), 229 deletions(-) diff --git a/core/benches/cluster_info.rs b/core/benches/cluster_info.rs index 207c9a0d5..3ad6ac803 100644 --- a/core/benches/cluster_info.rs +++ b/core/benches/cluster_info.rs @@ -9,6 +9,7 @@ use solana_sdk::pubkey::Pubkey; use solana_sdk::timing::timestamp; use std::collections::HashMap; use std::net::UdpSocket; +use std::sync::Arc; use test::Bencher; #[bench] @@ -31,10 +32,11 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) { cluster_info.insert_info(contact_info); stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64); } + let stakes = Arc::new(stakes); bencher.iter(move || { let shreds = shreds.clone(); cluster_info - .broadcast_shreds(&socket, shreds, &seeds, Some(&stakes)) + .broadcast_shreds(&socket, shreds, &seeds, Some(stakes.clone())) .unwrap(); }); } diff --git a/core/src/broadcast_stage.rs b/core/src/broadcast_stage.rs index 911052d5b..d4c247871 100644 --- a/core/src/broadcast_stage.rs +++ b/core/src/broadcast_stage.rs @@ -6,22 +6,25 @@ use crate::cluster_info::{ClusterInfo, ClusterInfoError}; use crate::poh_recorder::WorkingBankEntry; use crate::result::{Error, Result}; use solana_ledger::blocktree::Blocktree; +use solana_ledger::shred::Shred; use solana_ledger::staking_utils; use solana_metrics::{inc_new_counter_error, inc_new_counter_info}; +use solana_sdk::pubkey::Pubkey; +use std::collections::HashMap; use std::net::UdpSocket; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc::{Receiver, RecvTimeoutError}; -use std::sync::{Arc, RwLock}; +use std::sync::mpsc::{channel, Receiver, RecvError, RecvTimeoutError, Sender}; +use std::sync::{Arc, Mutex, RwLock}; use std::thread::{self, Builder, JoinHandle}; use std::time::Instant; +pub const NUM_INSERT_THREADS: usize = 2; + mod broadcast_fake_shreds_run; pub(crate) mod broadcast_utils; mod fail_entry_verification_broadcast_run; mod standard_broadcast_run; -pub const NUM_THREADS: u32 = 10; - #[derive(Debug, PartialEq, Eq, Clone)] pub enum BroadcastStageReturnType { ChannelDisconnected, @@ -37,25 +40,23 @@ pub enum BroadcastStageType { impl BroadcastStageType { pub fn new_broadcast_stage( &self, - sock: UdpSocket, + sock: Vec, cluster_info: Arc>, receiver: Receiver, exit_sender: &Arc, blocktree: &Arc, shred_version: u16, ) -> BroadcastStage { + let keypair = cluster_info.read().unwrap().keypair.clone(); match self { - BroadcastStageType::Standard => { - let keypair = cluster_info.read().unwrap().keypair.clone(); - BroadcastStage::new( - sock, - cluster_info, - receiver, - exit_sender, - blocktree, - StandardBroadcastRun::new(keypair, shred_version), - ) - } + BroadcastStageType::Standard => BroadcastStage::new( + sock, + cluster_info, + receiver, + exit_sender, + blocktree, + StandardBroadcastRun::new(keypair, shred_version), + ), 
BroadcastStageType::FailEntryVerification => BroadcastStage::new( sock, @@ -63,7 +64,7 @@ impl BroadcastStageType { receiver, exit_sender, blocktree, - FailEntryVerificationBroadcastRun::new(shred_version), + FailEntryVerificationBroadcastRun::new(keypair, shred_version), ), BroadcastStageType::BroadcastFakeShreds => BroadcastStage::new( @@ -72,18 +73,30 @@ impl BroadcastStageType { receiver, exit_sender, blocktree, - BroadcastFakeShredsRun::new(0, shred_version), + BroadcastFakeShredsRun::new(keypair, 0, shred_version), ), } } } +type TransmitShreds = (Option>>, Arc>); trait BroadcastRun { fn run( &mut self, - cluster_info: &Arc>, + blocktree: &Arc, receiver: &Receiver, + socket_sender: &Sender, + blocktree_sender: &Sender>>, + ) -> Result<()>; + fn transmit( + &self, + receiver: &Arc>>, + cluster_info: &Arc>, sock: &UdpSocket, + ) -> Result<()>; + fn record( + &self, + receiver: &Arc>>>>, blocktree: &Arc, ) -> Result<()>; } @@ -107,33 +120,43 @@ impl Drop for Finalizer { } pub struct BroadcastStage { - thread_hdl: JoinHandle, + thread_hdls: Vec>, } impl BroadcastStage { #[allow(clippy::too_many_arguments)] fn run( - sock: &UdpSocket, - cluster_info: &Arc>, - receiver: &Receiver, blocktree: &Arc, + receiver: &Receiver, + socket_sender: &Sender, + blocktree_sender: &Sender>>, mut broadcast_stage_run: impl BroadcastRun, ) -> BroadcastStageReturnType { loop { - if let Err(e) = broadcast_stage_run.run(&cluster_info, receiver, sock, blocktree) { - match e { - Error::RecvTimeoutError(RecvTimeoutError::Disconnected) | Error::SendError => { - return BroadcastStageReturnType::ChannelDisconnected; - } - Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), - Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? - _ => { - inc_new_counter_error!("streamer-broadcaster-error", 1, 1); - error!("broadcaster error: {:?}", e); - } + let res = broadcast_stage_run.run(blocktree, receiver, socket_sender, blocktree_sender); + let res = Self::handle_error(res); + if let Some(res) = res { + return res; + } + } + } + fn handle_error(r: Result<()>) -> Option { + if let Err(e) = r { + match e { + Error::RecvTimeoutError(RecvTimeoutError::Disconnected) + | Error::SendError + | Error::RecvError(RecvError) => { + return Some(BroadcastStageReturnType::ChannelDisconnected); + } + Error::RecvTimeoutError(RecvTimeoutError::Timeout) => (), + Error::ClusterInfoError(ClusterInfoError::NoPeers) => (), // TODO: Why are the unit-tests throwing hundreds of these? + _ => { + inc_new_counter_error!("streamer-broadcaster-error", 1, 1); + error!("broadcaster error: {:?}", e); } } } + None } /// Service to broadcast messages from the leader to layer 1 nodes. @@ -153,34 +176,69 @@ impl BroadcastStage { /// completing the cycle. 
#[allow(clippy::too_many_arguments)] fn new( - sock: UdpSocket, + socks: Vec, cluster_info: Arc>, receiver: Receiver, exit_sender: &Arc, blocktree: &Arc, - broadcast_stage_run: impl BroadcastRun + Send + 'static, + broadcast_stage_run: impl BroadcastRun + Send + 'static + Clone, ) -> Self { - let blocktree = blocktree.clone(); - let exit_sender = exit_sender.clone(); + let btree = blocktree.clone(); + let exit = exit_sender.clone(); + let (socket_sender, socket_receiver) = channel(); + let (blocktree_sender, blocktree_receiver) = channel(); + let bs_run = broadcast_stage_run.clone(); let thread_hdl = Builder::new() .name("solana-broadcaster".to_string()) .spawn(move || { - let _finalizer = Finalizer::new(exit_sender); - Self::run( - &sock, - &cluster_info, - &receiver, - &blocktree, - broadcast_stage_run, - ) + let _finalizer = Finalizer::new(exit); + Self::run(&btree, &receiver, &socket_sender, &blocktree_sender, bs_run) }) .unwrap(); + let mut thread_hdls = vec![thread_hdl]; + let socket_receiver = Arc::new(Mutex::new(socket_receiver)); + for sock in socks.into_iter() { + let socket_receiver = socket_receiver.clone(); + let bs_transmit = broadcast_stage_run.clone(); + let cluster_info = cluster_info.clone(); + let t = Builder::new() + .name("solana-broadcaster-transmit".to_string()) + .spawn(move || loop { + let res = bs_transmit.transmit(&socket_receiver, &cluster_info, &sock); + let res = Self::handle_error(res); + if let Some(res) = res { + return res; + } + }) + .unwrap(); + thread_hdls.push(t); + } + let blocktree_receiver = Arc::new(Mutex::new(blocktree_receiver)); + for _ in 0..NUM_INSERT_THREADS { + let blocktree_receiver = blocktree_receiver.clone(); + let bs_record = broadcast_stage_run.clone(); + let btree = blocktree.clone(); + let t = Builder::new() + .name("solana-broadcaster-record".to_string()) + .spawn(move || loop { + let res = bs_record.record(&blocktree_receiver, &btree); + let res = Self::handle_error(res); + if let Some(res) = res { + return res; + } + }) + .unwrap(); + thread_hdls.push(t); + } - Self { thread_hdl } + Self { thread_hdls } } pub fn join(self) -> thread::Result { - self.thread_hdl.join() + for thread_hdl in self.thread_hdls.into_iter() { + let _ = thread_hdl.join(); + } + Ok(BroadcastStageReturnType::ChannelDisconnected) } } diff --git a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs index 1d508ecd4..fc9ed79fc 100644 --- a/core/src/broadcast_stage/broadcast_fake_shreds_run.rs +++ b/core/src/broadcast_stage/broadcast_fake_shreds_run.rs @@ -2,19 +2,23 @@ use super::*; use solana_ledger::entry::Entry; use solana_ledger::shred::{Shredder, RECOMMENDED_FEC_RATE}; use solana_sdk::hash::Hash; +use solana_sdk::signature::Keypair; +#[derive(Clone)] pub(super) struct BroadcastFakeShredsRun { last_blockhash: Hash, partition: usize, shred_version: u16, + keypair: Arc, } impl BroadcastFakeShredsRun { - pub(super) fn new(partition: usize, shred_version: u16) -> Self { + pub(super) fn new(keypair: Arc, partition: usize, shred_version: u16) -> Self { Self { last_blockhash: Hash::default(), partition, shred_version, + keypair, } } } @@ -22,17 +26,16 @@ impl BroadcastFakeShredsRun { impl BroadcastRun for BroadcastFakeShredsRun { fn run( &mut self, - cluster_info: &Arc>, - receiver: &Receiver, - sock: &UdpSocket, blocktree: &Arc, + receiver: &Receiver, + socket_sender: &Sender, + blocktree_sender: &Sender>>, ) -> Result<()> { // 1) Pull entries from banking stage let receive_results = 
broadcast_utils::recv_slot_entries(receiver)?; let bank = receive_results.bank.clone(); let last_tick_height = receive_results.last_tick_height; - let keypair = &cluster_info.read().unwrap().keypair.clone(); let next_shred_index = blocktree .meta(bank.slot()) .expect("Database error") @@ -45,7 +48,7 @@ impl BroadcastRun for BroadcastFakeShredsRun { bank.slot(), bank.parent().unwrap().slot(), RECOMMENDED_FEC_RATE, - keypair.clone(), + self.keypair.clone(), (bank.tick_height() % bank.ticks_per_slot()) as u8, self.shred_version, ) @@ -79,31 +82,52 @@ impl BroadcastRun for BroadcastFakeShredsRun { self.last_blockhash = Hash::default(); } - blocktree.insert_shreds(data_shreds.clone(), None, true)?; + let data_shreds = Arc::new(data_shreds); + blocktree_sender.send(data_shreds.clone())?; // 3) Start broadcast step - let peers = cluster_info.read().unwrap().tvu_peers(); - peers.iter().enumerate().for_each(|(i, peer)| { - if i <= self.partition { - // Send fake shreds to the first N peers - fake_data_shreds - .iter() - .chain(fake_coding_shreds.iter()) - .for_each(|b| { - sock.send_to(&b.payload, &peer.tvu_forwards).unwrap(); - }); - } else { - data_shreds - .iter() - .chain(coding_shreds.iter()) - .for_each(|b| { - sock.send_to(&b.payload, &peer.tvu_forwards).unwrap(); - }); - } - }); + //some indicates fake shreds + socket_sender.send((Some(Arc::new(HashMap::new())), Arc::new(fake_data_shreds)))?; + socket_sender.send((Some(Arc::new(HashMap::new())), Arc::new(fake_coding_shreds)))?; + //none indicates real shreds + socket_sender.send((None, data_shreds))?; + socket_sender.send((None, Arc::new(coding_shreds)))?; Ok(()) } + fn transmit( + &self, + receiver: &Arc>>, + cluster_info: &Arc>, + sock: &UdpSocket, + ) -> Result<()> { + for (stakes, data_shreds) in receiver.lock().unwrap().iter() { + let peers = cluster_info.read().unwrap().tvu_peers(); + peers.iter().enumerate().for_each(|(i, peer)| { + if i <= self.partition && stakes.is_some() { + // Send fake shreds to the first N peers + data_shreds.iter().for_each(|b| { + sock.send_to(&b.payload, &peer.tvu_forwards).unwrap(); + }); + } else if i > self.partition && stakes.is_none() { + data_shreds.iter().for_each(|b| { + sock.send_to(&b.payload, &peer.tvu_forwards).unwrap(); + }); + } + }); + } + Ok(()) + } + fn record( + &self, + receiver: &Arc>>>>, + blocktree: &Arc, + ) -> Result<()> { + for data_shreds in receiver.lock().unwrap().iter() { + blocktree.insert_shreds(data_shreds.to_vec(), None, true)?; + } + Ok(()) + } } #[cfg(test)] diff --git a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs index 46fc61766..739b45759 100644 --- a/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs +++ b/core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs @@ -1,24 +1,30 @@ use super::*; use solana_ledger::shred::{Shredder, RECOMMENDED_FEC_RATE}; use solana_sdk::hash::Hash; +use solana_sdk::signature::Keypair; +#[derive(Clone)] pub(super) struct FailEntryVerificationBroadcastRun { shred_version: u16, + keypair: Arc, } impl FailEntryVerificationBroadcastRun { - pub(super) fn new(shred_version: u16) -> Self { - Self { shred_version } + pub(super) fn new(keypair: Arc, shred_version: u16) -> Self { + Self { + shred_version, + keypair, + } } } impl BroadcastRun for FailEntryVerificationBroadcastRun { fn run( &mut self, - cluster_info: &Arc>, - receiver: &Receiver, - sock: &UdpSocket, blocktree: &Arc, + receiver: &Receiver, + socket_sender: &Sender, + 
blocktree_sender: &Sender>>, ) -> Result<()> { // 1) Pull entries from banking stage let mut receive_results = broadcast_utils::recv_slot_entries(receiver)?; @@ -32,7 +38,6 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { last_entry.hash = Hash::default(); } - let keypair = cluster_info.read().unwrap().keypair.clone(); let next_shred_index = blocktree .meta(bank.slot()) .expect("Database error") @@ -43,7 +48,7 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { bank.slot(), bank.parent().unwrap().slot(), RECOMMENDED_FEC_RATE, - keypair.clone(), + self.keypair.clone(), (bank.tick_height() % bank.ticks_per_slot()) as u8, self.shred_version, ) @@ -55,34 +60,42 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun { next_shred_index, ); - let all_shreds = data_shreds - .iter() - .cloned() - .chain(coding_shreds.iter().cloned()) - .collect::>(); - let all_seeds: Vec<[u8; 32]> = all_shreds.iter().map(|s| s.seed()).collect(); - blocktree - .insert_shreds(all_shreds, None, true) - .expect("Failed to insert shreds in blocktree"); - + let data_shreds = Arc::new(data_shreds); + blocktree_sender.send(data_shreds.clone())?; // 3) Start broadcast step let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); - let all_shred_bufs: Vec> = data_shreds - .into_iter() - .chain(coding_shreds.into_iter()) - .map(|s| s.payload) - .collect(); - + let stakes = stakes.map(Arc::new); + socket_sender.send((stakes.clone(), data_shreds))?; + socket_sender.send((stakes, Arc::new(coding_shreds)))?; + Ok(()) + } + fn transmit( + &self, + receiver: &Arc>>, + cluster_info: &Arc>, + sock: &UdpSocket, + ) -> Result<()> { + let (stakes, shreds) = receiver.lock().unwrap().recv()?; + let all_seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect(); // Broadcast data - cluster_info.read().unwrap().broadcast_shreds( - sock, - all_shred_bufs, - &all_seeds, - stakes.as_ref(), - )?; - + let all_shred_bufs: Vec> = shreds.to_vec().into_iter().map(|s| s.payload).collect(); + cluster_info + .read() + .unwrap() + .broadcast_shreds(sock, all_shred_bufs, &all_seeds, stakes)?; + Ok(()) + } + fn record( + &self, + receiver: &Arc>>>>, + blocktree: &Arc, + ) -> Result<()> { + let all_shreds = receiver.lock().unwrap().recv()?; + blocktree + .insert_shreds(all_shreds.to_vec(), None, true) + .expect("Failed to insert shreds in blocktree"); Ok(()) } } diff --git a/core/src/broadcast_stage/standard_broadcast_run.rs b/core/src/broadcast_stage/standard_broadcast_run.rs index ac8d59bf4..b37ce1804 100644 --- a/core/src/broadcast_stage/standard_broadcast_run.rs +++ b/core/src/broadcast_stage/standard_broadcast_run.rs @@ -29,8 +29,9 @@ impl BroadcastStats { } } +#[derive(Clone)] pub(super) struct StandardBroadcastRun { - stats: BroadcastStats, + stats: Arc>, unfinished_slot: Option, current_slot_and_parent: Option<(u64, u64)>, slot_broadcast_start: Option, @@ -41,7 +42,7 @@ pub(super) struct StandardBroadcastRun { impl StandardBroadcastRun { pub(super) fn new(keypair: Arc, shred_version: u16) -> Self { Self { - stats: BroadcastStats::default(), + stats: Arc::new(RwLock::new(BroadcastStats::default())), unfinished_slot: None, current_slot_and_parent: None, slot_broadcast_start: None, @@ -82,25 +83,8 @@ impl StandardBroadcastRun { last_unfinished_slot_shred } - - fn entries_to_shreds( - &mut self, - blocktree: &Blocktree, - entries: &[Entry], - is_slot_end: bool, - reference_tick: u8, - ) -> (Vec, Vec) { + fn init_shredder(&self, blocktree: 
&Blocktree, reference_tick: u8) -> (Shredder, u32) { let (slot, parent_slot) = self.current_slot_and_parent.unwrap(); - let shredder = Shredder::new( - slot, - parent_slot, - RECOMMENDED_FEC_RATE, - self.keypair.clone(), - reference_tick, - self.shred_version, - ) - .expect("Expected to create a new shredder"); - let next_shred_index = self .unfinished_slot .map(|s| s.next_shred_index) @@ -111,25 +95,65 @@ impl StandardBroadcastRun { .map(|meta| meta.consumed) .unwrap_or(0) as u32 }); - - let (data_shreds, coding_shreds, new_next_shred_index) = - shredder.entries_to_shreds(entries, is_slot_end, next_shred_index); + ( + Shredder::new( + slot, + parent_slot, + RECOMMENDED_FEC_RATE, + self.keypair.clone(), + reference_tick, + self.shred_version, + ) + .expect("Expected to create a new shredder"), + next_shred_index, + ) + } + fn entries_to_data_shreds( + &mut self, + shredder: &Shredder, + next_shred_index: u32, + entries: &[Entry], + is_slot_end: bool, + ) -> Vec { + let (data_shreds, new_next_shred_index) = + shredder.entries_to_data_shreds(entries, is_slot_end, next_shred_index); self.unfinished_slot = Some(UnfinishedSlotInfo { next_shred_index: new_next_shred_index, - slot, - parent: parent_slot, + slot: shredder.slot, + parent: shredder.parent_slot, }); - (data_shreds, coding_shreds) + data_shreds } - fn process_receive_results( + #[cfg(test)] + fn test_process_receive_results( &mut self, cluster_info: &Arc>, sock: &UdpSocket, blocktree: &Arc, receive_results: ReceiveResults, + ) -> Result<()> { + let (bsend, brecv) = channel(); + let (ssend, srecv) = channel(); + self.process_receive_results(&blocktree, &ssend, &bsend, receive_results)?; + let srecv = Arc::new(Mutex::new(srecv)); + let brecv = Arc::new(Mutex::new(brecv)); + //data + let _ = self.transmit(&srecv, cluster_info, sock); + //coding + let _ = self.transmit(&srecv, cluster_info, sock); + let _ = self.record(&brecv, blocktree); + Ok(()) + } + + fn process_receive_results( + &mut self, + blocktree: &Arc, + socket_sender: &Sender, + blocktree_sender: &Sender>>, + receive_results: ReceiveResults, ) -> Result<()> { let mut receive_elapsed = receive_results.time_elapsed; let num_entries = receive_results.entries.len(); @@ -155,12 +179,26 @@ impl StandardBroadcastRun { self.check_for_interrupted_slot(bank.ticks_per_slot() as u8); // 2) Convert entries to shreds and coding shreds - let (mut data_shreds, coding_shreds) = self.entries_to_shreds( + + let (shredder, next_shred_index) = self.init_shredder( blocktree, - &receive_results.entries, - last_tick_height == bank.max_tick_height(), (bank.tick_height() % bank.ticks_per_slot()) as u8, ); + let mut data_shreds = self.entries_to_data_shreds( + &shredder, + next_shred_index, + &receive_results.entries, + last_tick_height == bank.max_tick_height(), + ); + //Insert the first shred so blocktree stores that the leader started this block + //This must be done before the blocks are sent out over the wire. 
+ if !data_shreds.is_empty() && data_shreds[0].index() == 0 { + let first = vec![data_shreds[0].clone()]; + blocktree + .insert_shreds(first, None, true) + .expect("Failed to insert shreds in blocktree"); + } + let last_data_shred = data_shreds.len(); if let Some(last_shred) = last_unfinished_slot_shred { data_shreds.push(last_shred); } @@ -168,24 +206,13 @@ impl StandardBroadcastRun { let bank_epoch = bank.get_leader_schedule_epoch(bank.slot()); let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch); - - self.maybe_insert_and_broadcast( - data_shreds, - true, - blocktree, - cluster_info, - stakes.as_ref(), - sock, - )?; - self.maybe_insert_and_broadcast( - coding_shreds, - false, - blocktree, - cluster_info, - stakes.as_ref(), - sock, - )?; - + let stakes = stakes.map(Arc::new); + let data_shreds = Arc::new(data_shreds); + socket_sender.send((stakes.clone(), data_shreds.clone()))?; + blocktree_sender.send(data_shreds.clone())?; + let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[0..last_data_shred]); + let coding_shreds = Arc::new(coding_shreds); + socket_sender.send((stakes, coding_shreds))?; self.update_broadcast_stats(BroadcastStats { shredding_elapsed: duration_as_us(&to_shreds_elapsed), receive_elapsed: duration_as_us(&receive_elapsed), @@ -200,31 +227,40 @@ impl StandardBroadcastRun { Ok(()) } - fn maybe_insert_and_broadcast( - &mut self, - shreds: Vec, - insert: bool, - blocktree: &Arc, - cluster_info: &Arc>, - stakes: Option<&HashMap>, + fn insert(&self, blocktree: &Arc, shreds: Arc>) -> Result<()> { + // Insert shreds into blocktree + let insert_shreds_start = Instant::now(); + //The first shred is inserted synchronously + let data_shreds = if !shreds.is_empty() && shreds[0].index() == 0 { + shreds[1..].to_vec() + } else { + shreds.to_vec() + }; + blocktree + .insert_shreds(data_shreds, None, true) + .expect("Failed to insert shreds in blocktree"); + let insert_shreds_elapsed = insert_shreds_start.elapsed(); + self.update_broadcast_stats(BroadcastStats { + insert_shreds_elapsed: duration_as_us(&insert_shreds_elapsed), + ..BroadcastStats::default() + }); + Ok(()) + } + + fn broadcast( + &self, sock: &UdpSocket, + cluster_info: &Arc>, + stakes: Option>>, + shreds: Arc>, ) -> Result<()> { let seed_start = Instant::now(); let seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect(); let seed_elapsed = seed_start.elapsed(); - // Insert shreds into blocktree - let insert_shreds_start = Instant::now(); - if insert { - blocktree - .insert_shreds(shreds.clone(), None, true) - .expect("Failed to insert shreds in blocktree"); - } - let insert_shreds_elapsed = insert_shreds_start.elapsed(); - // Broadcast the shreds let broadcast_start = Instant::now(); - let shred_bufs: Vec> = shreds.into_iter().map(|s| s.payload).collect(); + let shred_bufs: Vec> = shreds.to_vec().into_iter().map(|s| s.payload).collect(); trace!("Broadcasting {:?} shreds", shred_bufs.len()); cluster_info @@ -235,7 +271,6 @@ impl StandardBroadcastRun { let broadcast_elapsed = broadcast_start.elapsed(); self.update_broadcast_stats(BroadcastStats { - insert_shreds_elapsed: duration_as_us(&insert_shreds_elapsed), broadcast_elapsed: duration_as_us(&broadcast_elapsed), seed_elapsed: duration_as_us(&seed_elapsed), ..BroadcastStats::default() @@ -243,28 +278,26 @@ impl StandardBroadcastRun { Ok(()) } - fn update_broadcast_stats(&mut self, stats: BroadcastStats) { - self.stats.receive_elapsed += stats.receive_elapsed; - self.stats.shredding_elapsed += stats.shredding_elapsed; - 
self.stats.insert_shreds_elapsed += stats.insert_shreds_elapsed; - self.stats.broadcast_elapsed += stats.broadcast_elapsed; - self.stats.seed_elapsed += stats.seed_elapsed; + fn update_broadcast_stats(&self, stats: BroadcastStats) { + let mut wstats = self.stats.write().unwrap(); + wstats.receive_elapsed += stats.receive_elapsed; + wstats.shredding_elapsed += stats.shredding_elapsed; + wstats.insert_shreds_elapsed += stats.insert_shreds_elapsed; + wstats.broadcast_elapsed += stats.broadcast_elapsed; + wstats.seed_elapsed += stats.seed_elapsed; } fn report_and_reset_stats(&mut self) { + let stats = self.stats.read().unwrap(); assert!(self.unfinished_slot.is_some()); datapoint_info!( "broadcast-bank-stats", ("slot", self.unfinished_slot.unwrap().slot as i64, i64), - ("shredding_time", self.stats.shredding_elapsed as i64, i64), - ( - "insertion_time", - self.stats.insert_shreds_elapsed as i64, - i64 - ), - ("broadcast_time", self.stats.broadcast_elapsed as i64, i64), - ("receive_time", self.stats.receive_elapsed as i64, i64), - ("seed", self.stats.seed_elapsed as i64, i64), + ("shredding_time", stats.shredding_elapsed as i64, i64), + ("insertion_time", stats.insert_shreds_elapsed as i64, i64), + ("broadcast_time", stats.broadcast_elapsed as i64, i64), + ("receive_time", stats.receive_elapsed as i64, i64), + ("seed", stats.seed_elapsed as i64, i64), ( "num_shreds", i64::from(self.unfinished_slot.unwrap().next_shred_index), @@ -276,20 +309,38 @@ impl StandardBroadcastRun { i64 ), ); - self.stats.reset(); + drop(stats); + self.stats.write().unwrap().reset(); } } impl BroadcastRun for StandardBroadcastRun { fn run( &mut self, - cluster_info: &Arc>, - receiver: &Receiver, - sock: &UdpSocket, blocktree: &Arc, + receiver: &Receiver, + socket_sender: &Sender, + blocktree_sender: &Sender>>, ) -> Result<()> { let receive_results = broadcast_utils::recv_slot_entries(receiver)?; - self.process_receive_results(cluster_info, sock, blocktree, receive_results) + self.process_receive_results(blocktree, socket_sender, blocktree_sender, receive_results) + } + fn transmit( + &self, + receiver: &Arc>>, + cluster_info: &Arc>, + sock: &UdpSocket, + ) -> Result<()> { + let (stakes, shreds) = receiver.lock().unwrap().recv()?; + self.broadcast(sock, cluster_info, stakes, shreds) + } + fn record( + &self, + receiver: &Arc>>>>, + blocktree: &Arc, + ) -> Result<()> { + let shreds = receiver.lock().unwrap().recv()?; + self.insert(blocktree, shreds) } } @@ -397,7 +448,7 @@ mod test { // Step 1: Make an incomplete transmission for slot 0 let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair.clone(), 0); standard_broadcast_run - .process_receive_results(&cluster_info, &socket, &blocktree, receive_results) + .test_process_receive_results(&cluster_info, &socket, &blocktree, receive_results) .unwrap(); let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap(); assert_eq!(unfinished_slot.next_shred_index as u64, num_shreds_per_slot); @@ -406,7 +457,11 @@ mod test { // Make sure the slot is not complete assert!(!blocktree.is_full(0)); // Modify the stats, should reset later - standard_broadcast_run.stats.receive_elapsed = 10; + standard_broadcast_run + .stats + .write() + .unwrap() + .receive_elapsed = 10; // Try to fetch ticks from blocktree, nothing should break assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), ticks0); @@ -432,7 +487,7 @@ mod test { last_tick_height: (ticks1.len() - 1) as u64, }; standard_broadcast_run - .process_receive_results(&cluster_info, &socket, 
&blocktree, receive_results) + .test_process_receive_results(&cluster_info, &socket, &blocktree, receive_results) .unwrap(); let unfinished_slot = standard_broadcast_run.unfinished_slot.as_ref().unwrap(); @@ -443,7 +498,10 @@ mod test { assert_eq!(unfinished_slot.parent, 0); // Check that the stats were reset as well - assert_eq!(standard_broadcast_run.stats.receive_elapsed, 0); + assert_eq!( + standard_broadcast_run.stats.read().unwrap().receive_elapsed, + 0 + ); // Try to fetch the incomplete ticks from blocktree, should succeed assert_eq!(blocktree.get_slot_entries(0, 0, None).unwrap(), ticks0); @@ -473,7 +531,7 @@ mod test { let mut standard_broadcast_run = StandardBroadcastRun::new(leader_keypair, 0); standard_broadcast_run - .process_receive_results(&cluster_info, &socket, &blocktree, receive_results) + .test_process_receive_results(&cluster_info, &socket, &blocktree, receive_results) .unwrap(); assert!(standard_broadcast_run.unfinished_slot.is_none()) } diff --git a/core/src/cluster_info.rs b/core/src/cluster_info.rs index e363cdfdc..81583e083 100644 --- a/core/src/cluster_info.rs +++ b/core/src/cluster_info.rs @@ -518,7 +518,7 @@ impl ClusterInfo { fn sorted_stakes_with_index( peers: &[ContactInfo], - stakes: Option<&HashMap>, + stakes: Option>>, ) -> Vec<(u64, usize)> { let stakes_and_index: Vec<_> = peers .iter() @@ -526,7 +526,11 @@ impl ClusterInfo { .map(|(i, c)| { // For stake weighted shuffle a valid weight is atleast 1. Weight 0 is // assumed to be missing entry. So let's make sure stake weights are atleast 1 - let stake = 1.max(stakes.map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1))); + let stake = 1.max( + stakes + .as_ref() + .map_or(1, |stakes| *stakes.get(&c.id).unwrap_or(&1)), + ); (stake, i) }) .sorted_by(|(l_stake, l_info), (r_stake, r_info)| { @@ -555,7 +559,7 @@ impl ClusterInfo { // Return sorted_retransmit_peers(including self) and their stakes pub fn sorted_retransmit_peers_and_stakes( &self, - stakes: Option<&HashMap>, + stakes: Option>>, ) -> (Vec, Vec<(u64, usize)>) { let mut peers = self.retransmit_peers(); // insert "self" into this list for the layer and neighborhood computation @@ -729,7 +733,7 @@ impl ClusterInfo { fn sorted_tvu_peers_and_stakes( &self, - stakes: Option<&HashMap>, + stakes: Option>>, ) -> (Vec, Vec<(u64, usize)>) { let mut peers = self.tvu_peers(); peers.dedup(); @@ -744,7 +748,7 @@ impl ClusterInfo { s: &UdpSocket, shreds: Vec>, seeds: &[[u8; 32]], - stakes: Option<&HashMap>, + stakes: Option>>, ) -> Result<()> { let (peers, peers_and_stakes) = self.sorted_tvu_peers_and_stakes(stakes); let broadcast_len = peers_and_stakes.len(); @@ -1703,7 +1707,7 @@ pub struct Sockets { pub tvu_forwards: Vec, pub tpu: Vec, pub tpu_forwards: Vec, - pub broadcast: UdpSocket, + pub broadcast: Vec, pub repair: UdpSocket, pub retransmit_sockets: Vec, pub storage: Option, @@ -1728,7 +1732,7 @@ impl Node { let empty = "0.0.0.0:0".parse().unwrap(); let repair = UdpSocket::bind("127.0.0.1:0").unwrap(); - let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); + let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()]; let retransmit = UdpSocket::bind("0.0.0.0:0").unwrap(); let info = ContactInfo::new( pubkey, @@ -1774,7 +1778,7 @@ impl Node { let rpc_pubsub_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), rpc_pubsub_port); - let broadcast = UdpSocket::bind("0.0.0.0:0").unwrap(); + let broadcast = vec![UdpSocket::bind("0.0.0.0:0").unwrap()]; let retransmit_socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let storage = 
UdpSocket::bind("0.0.0.0:0").unwrap(); let info = ContactInfo::new( @@ -1845,7 +1849,7 @@ impl Node { multi_bind_in_range(port_range, 8).expect("retransmit multi_bind"); let (repair_port, repair) = Self::bind(port_range); - let (_, broadcast) = Self::bind(port_range); + let (_, broadcast) = multi_bind_in_range(port_range, 4).expect("broadcast multi_bind"); let info = ContactInfo::new( pubkey, @@ -2658,7 +2662,8 @@ mod tests { cluster_info.insert_info(contact_info); stakes.insert(id3, 10); - let (peers, peers_and_stakes) = cluster_info.sorted_tvu_peers_and_stakes(Some(&stakes)); + let stakes = Arc::new(stakes); + let (peers, peers_and_stakes) = cluster_info.sorted_tvu_peers_and_stakes(Some(stakes)); assert_eq!(peers.len(), 2); assert_eq!(peers[0].id, id); assert_eq!(peers[1].id, id2); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index 4d827260b..1f94fc97d 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -62,10 +62,11 @@ fn retransmit( let bank_epoch = r_bank.get_leader_schedule_epoch(r_bank.slot()); let mut peers_len = 0; let stakes = staking_utils::staked_nodes_at_epoch(&r_bank, bank_epoch); + let stakes = stakes.map(Arc::new); let (peers, stakes_and_index) = cluster_info .read() .unwrap() - .sorted_retransmit_peers_and_stakes(stakes.as_ref()); + .sorted_retransmit_peers_and_stakes(stakes); let me = cluster_info.read().unwrap().my_data().clone(); let mut discard_total = 0; let mut repair_total = 0; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index f37089094..d1ea41193 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -39,7 +39,7 @@ impl Tpu { entry_receiver: Receiver, transactions_sockets: Vec, tpu_forwards_sockets: Vec, - broadcast_socket: UdpSocket, + broadcast_sockets: Vec, sigverify_disabled: bool, transaction_status_sender: Option, blocktree: &Arc, @@ -83,7 +83,7 @@ impl Tpu { ); let broadcast_stage = broadcast_type.new_broadcast_stage( - broadcast_socket, + broadcast_sockets, cluster_info.clone(), entry_receiver, &exit, diff --git a/core/src/validator.rs b/core/src/validator.rs index 479b97794..abc596323 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -432,7 +432,12 @@ impl Validator { ); info!( "local broadcast address: {}", - node.sockets.broadcast.local_addr().unwrap() + node.sockets + .broadcast + .first() + .unwrap() + .local_addr() + .unwrap() ); info!( "local repair address: {}", diff --git a/core/tests/cluster_info.rs b/core/tests/cluster_info.rs index eaf8394ea..d165ac52a 100644 --- a/core/tests/cluster_info.rs +++ b/core/tests/cluster_info.rs @@ -107,13 +107,14 @@ fn run_simulation(stakes: &[u64], fanout: usize) { }); let c_info = cluster_info.clone(); + let staked_nodes = Arc::new(staked_nodes); let shreds_len = 100; let shuffled_peers: Vec> = (0..shreds_len as i32) .map(|i| { let mut seed = [0; 32]; seed[0..4].copy_from_slice(&i.to_le_bytes()); let (peers, stakes_and_index) = - cluster_info.sorted_retransmit_peers_and_stakes(Some(&staked_nodes)); + cluster_info.sorted_retransmit_peers_and_stakes(Some(staked_nodes.clone())); let (_, shuffled_stakes_and_indexes) = ClusterInfo::shuffle_peers_and_index( &cluster_info.id(), &peers, diff --git a/ledger/src/shred.rs b/ledger/src/shred.rs index 100c1c37f..b9c6ec416 100644 --- a/ledger/src/shred.rs +++ b/ledger/src/shred.rs @@ -402,8 +402,8 @@ impl Shred { #[derive(Debug)] pub struct Shredder { - slot: Slot, - parent_slot: Slot, + pub slot: Slot, + pub parent_slot: Slot, version: u16, fec_rate: f32, keypair: Arc, @@ -443,6 +443,18 @@ impl Shredder 
{ is_last_in_slot: bool, next_shred_index: u32, ) -> (Vec, Vec, u32) { + let (data_shreds, last_shred_index) = + self.entries_to_data_shreds(entries, is_last_in_slot, next_shred_index); + let coding_shreds = self.data_shreds_to_coding_shreds(&data_shreds); + (data_shreds, coding_shreds, last_shred_index) + } + + pub fn entries_to_data_shreds( + &self, + entries: &[Entry], + is_last_in_slot: bool, + next_shred_index: u32, + ) -> (Vec, u32) { let now = Instant::now(); let serialized_shreds = bincode::serialize(entries).expect("Expect to serialize all entries"); @@ -495,7 +507,17 @@ impl Shredder { }) }); let gen_data_time = now.elapsed().as_millis(); + datapoint_debug!( + "shredding-stats", + ("slot", self.slot as i64, i64), + ("num_data_shreds", data_shreds.len() as i64, i64), + ("serializing", serialize_time as i64, i64), + ("gen_data", gen_data_time as i64, i64), + ); + (data_shreds, last_shred_index + 1) + } + pub fn data_shreds_to_coding_shreds(&self, data_shreds: &[Shred]) -> Vec { let now = Instant::now(); // 2) Generate coding shreds let mut coding_shreds: Vec<_> = PAR_THREAD_POOL.with(|thread_pool| { @@ -528,16 +550,11 @@ impl Shredder { datapoint_debug!( "shredding-stats", - ("slot", self.slot as i64, i64), - ("num_data_shreds", data_shreds.len() as i64, i64), ("num_coding_shreds", coding_shreds.len() as i64, i64), - ("serializing", serialize_time as i64, i64), - ("gen_data", gen_data_time as i64, i64), ("gen_coding", gen_coding_time as i64, i64), ("sign_coding", sign_coding_time as i64, i64), ); - - (data_shreds, coding_shreds, last_shred_index + 1) + coding_shreds } pub fn sign_shred(signer: &Keypair, shred: &mut Shred) { diff --git a/perf/benches/sigverify.rs b/perf/benches/sigverify.rs index 49d057044..d943b38c8 100644 --- a/perf/benches/sigverify.rs +++ b/perf/benches/sigverify.rs @@ -35,10 +35,6 @@ fn bench_get_offsets(bencher: &mut Bencher) { bencher.iter(|| { let ans = sigverify::generate_offsets(&batches, &recycler); assert!(ans.is_ok()); - let ans = ans.unwrap(); - recycler.recycle(ans.0); - recycler.recycle(ans.1); - recycler.recycle(ans.2); - recycler.recycle(ans.3); + let _ans = ans.unwrap(); }) } diff --git a/validator/src/main.rs b/validator/src/main.rs index 3ffb1b1d4..ef461eeca 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -761,11 +761,7 @@ pub fn main() { }; if let Some(ref cluster_entrypoint) = cluster_entrypoint { - let udp_sockets = [ - &node.sockets.gossip, - &node.sockets.broadcast, - &node.sockets.repair, - ]; + let udp_sockets = vec![&node.sockets.gossip, &node.sockets.repair]; let mut tcp_listeners: Vec<(_, _)> = tcp_ports .iter()
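
The heart of this change is the fan-out set up in BroadcastStage::new: a single run() thread shreds entries and pushes Arc'd shred batches onto two channels, while a pool of transmit threads (one per broadcast socket) and NUM_INSERT_THREADS record threads each pull work from a shared Arc<Mutex<Receiver<...>>>. Below is a minimal, self-contained sketch of that channel/worker pattern only; Batch, the worker counts, and the println! bodies are illustrative stand-ins and not code from this patch (the real TransmitShreds also carries an optional Arc<HashMap<Pubkey, u64>> of stakes on the transmit side).

use std::sync::mpsc::{channel, Receiver};
use std::sync::{Arc, Mutex};
use std::thread;

// Illustrative stand-in; the real pipeline moves Arc<Vec<Shred>> batches.
type Batch = Arc<Vec<u64>>;

fn spawn_workers(
    name: &str,
    count: usize,
    receiver: Arc<Mutex<Receiver<Batch>>>,
) -> Vec<thread::JoinHandle<()>> {
    (0..count)
        .map(|id| {
            let rx = receiver.clone();
            let name = name.to_string();
            thread::spawn(move || loop {
                // Hold the lock only while receiving, not while processing.
                let msg = rx.lock().unwrap().recv();
                match msg {
                    Ok(batch) => {
                        println!("{} worker {} handling {} shreds", name, id, batch.len())
                    }
                    Err(_) => break, // all senders dropped: producer is done
                }
            })
        })
        .collect()
}

fn main() {
    let (socket_sender, socket_receiver) = channel::<Batch>();
    let (blocktree_sender, blocktree_receiver) = channel::<Batch>();

    // Several workers share one receiver, as BroadcastStage does with
    // Arc<Mutex<Receiver<TransmitShreds>>>.
    let mut handles = spawn_workers("transmit", 2, Arc::new(Mutex::new(socket_receiver)));
    handles.extend(spawn_workers("record", 2, Arc::new(Mutex::new(blocktree_receiver))));

    // Producer: shred entries once, then fan the same Arc'd batch out to both stages.
    for slot in 0..3u64 {
        let batch: Batch = Arc::new((0..4).map(|i| slot * 10 + i).collect());
        socket_sender.send(batch.clone()).unwrap();
        blocktree_sender.send(batch).unwrap();
    }
    drop(socket_sender);
    drop(blocktree_sender);

    for handle in handles {
        handle.join().unwrap();
    }
}

Sharing one Receiver behind a Mutex keeps recv() calls serialized across the workers while still letting socket transmission and blocktree insertion proceed in parallel with shredding, which is the pipelining this patch is after; Arc'ing the batches lets the transmit and record stages consume the same shreds without copying them.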