Fix broadcast metrics (#9461)

* Rework broadcast metrics to support multiple threads

* Update dashboards

Co-authored-by: Carl <carl@solana.com>
carllin 2020-04-15 15:22:16 -07:00 committed by GitHub
parent a8b8c2f438
commit 7aa4d401f7
8 changed files with 1389 additions and 621 deletions

Cargo.lock (generated)

@@ -4505,7 +4505,6 @@ dependencies = [
"rayon 1.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_derive 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-bpf-loader-program 1.2.0",
"solana-logger 1.2.0",
"solana-measure 1.2.0",
"solana-metrics 1.2.0",

core/src/broadcast_stage.rs

@@ -1,6 +1,6 @@
//! A stage to broadcast data from a leader node to validators
use self::{
broadcast_fake_shreds_run::BroadcastFakeShredsRun,
broadcast_fake_shreds_run::BroadcastFakeShredsRun, broadcast_metrics::*,
fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
standard_broadcast_run::StandardBroadcastRun,
};
@@ -35,13 +35,16 @@ use std::{
};
mod broadcast_fake_shreds_run;
pub(crate) mod broadcast_metrics;
pub(crate) mod broadcast_utils;
mod fail_entry_verification_broadcast_run;
mod standard_broadcast_run;
pub const NUM_INSERT_THREADS: usize = 2;
pub type RetransmitSlotsSender = CrossbeamSender<HashMap<Slot, Arc<Bank>>>;
pub type RetransmitSlotsReceiver = CrossbeamReceiver<HashMap<Slot, Arc<Bank>>>;
pub(crate) const NUM_INSERT_THREADS: usize = 2;
pub(crate) type RetransmitSlotsSender = CrossbeamSender<HashMap<Slot, Arc<Bank>>>;
pub(crate) type RetransmitSlotsReceiver = CrossbeamReceiver<HashMap<Slot, Arc<Bank>>>;
pub(crate) type RecordReceiver = Receiver<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>;
pub(crate) type TransmitReceiver = Receiver<(TransmitShreds, Option<BroadcastShredBatchInfo>)>;
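The core change: every channel in the stage now carries a `(payload, Option<BroadcastShredBatchInfo>)` pair instead of a bare payload, so per-slot metrics can ride along with each batch. A minimal sketch of the pattern, not part of the diff, using plain `std::sync::mpsc` and hypothetical `Payload`/`BatchInfo` stand-ins for the crate's types:

use std::sync::mpsc::channel;

// Simplified stand-ins for TransmitShreds and BroadcastShredBatchInfo.
type Payload = Vec<u8>;
struct BatchInfo {
    slot: u64,
}

fn main() {
    let (tx, rx) = channel::<(Payload, Option<BatchInfo>)>();
    // A batch from the shredding pipeline carries Some(batch_info)...
    tx.send((vec![1, 2, 3], Some(BatchInfo { slot: 7 }))).unwrap();
    // ...while a retransmit injected by ReplayStage carries None.
    tx.send((vec![4, 5], None)).unwrap();
    drop(tx); // close the channel so the iterator below terminates
    for (payload, batch_info) in rx.iter() {
        match batch_info {
            // Only batches with metadata are counted toward slot metrics.
            Some(info) => println!("slot {}: {} shreds", info.slot, payload.len()),
            None => println!("retransmit batch: {} shreds, not counted", payload.len()),
        }
    }
}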
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BroadcastStageReturnType {
@@ -107,18 +110,18 @@ trait BroadcastRun {
&mut self,
blockstore: &Arc<Blockstore>,
receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<TransmitShreds>,
blockstore_sender: &Sender<Arc<Vec<Shred>>>,
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
) -> Result<()>;
fn transmit(
&mut self,
receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
receiver: &Arc<Mutex<TransmitReceiver>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
sock: &UdpSocket,
) -> Result<()>;
fn record(
&self,
receiver: &Arc<Mutex<Receiver<Arc<Vec<Shred>>>>>,
&mut self,
receiver: &Arc<Mutex<RecordReceiver>>,
blockstore: &Arc<Blockstore>,
) -> Result<()>;
}
@@ -150,8 +153,8 @@ impl BroadcastStage {
fn run(
blockstore: &Arc<Blockstore>,
receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<TransmitShreds>,
blockstore_sender: &Sender<Arc<Vec<Shred>>>,
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
mut broadcast_stage_run: impl BroadcastRun,
) -> BroadcastStageReturnType {
loop {
@@ -250,7 +253,7 @@ impl BroadcastStage {
let blockstore_receiver = Arc::new(Mutex::new(blockstore_receiver));
for _ in 0..NUM_INSERT_THREADS {
let blockstore_receiver = blockstore_receiver.clone();
let bs_record = broadcast_stage_run.clone();
let mut bs_record = broadcast_stage_run.clone();
let btree = blockstore.clone();
let t = Builder::new()
.name("solana-broadcaster-record".to_string())
@@ -289,7 +292,7 @@ impl BroadcastStage {
fn check_retransmit_signals(
blockstore: &Blockstore,
retransmit_slots_receiver: &RetransmitSlotsReceiver,
socket_sender: &Sender<TransmitShreds>,
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
) -> Result<()> {
let timer = Duration::from_millis(100);
@@ -310,7 +313,7 @@ impl BroadcastStage {
);
if !data_shreds.is_empty() {
socket_sender.send((stakes.clone(), data_shreds))?;
socket_sender.send(((stakes.clone(), data_shreds), None))?;
}
let coding_shreds = Arc::new(
@@ -320,7 +323,7 @@ impl BroadcastStage {
);
if !coding_shreds.is_empty() {
socket_sender.send((stakes.clone(), coding_shreds))?;
socket_sender.send(((stakes.clone(), coding_shreds), None))?;
}
}
@@ -478,13 +481,13 @@ pub mod test {
}
fn check_all_shreds_received(
transmit_receiver: &Receiver<TransmitShreds>,
transmit_receiver: &TransmitReceiver,
mut data_index: u64,
mut coding_index: u64,
num_expected_data_shreds: u64,
num_expected_coding_shreds: u64,
) {
while let Ok(new_retransmit_slots) = transmit_receiver.try_recv() {
while let Ok((new_retransmit_slots, _)) = transmit_receiver.try_recv() {
if new_retransmit_slots.1[0].is_data() {
for data_shred in new_retransmit_slots.1.iter() {
assert_eq!(data_shred.index() as u64, data_index);

core/src/broadcast_stage/broadcast_fake_shreds_run.rs

@@ -28,8 +28,8 @@ impl BroadcastRun for BroadcastFakeShredsRun {
&mut self,
blockstore: &Arc<Blockstore>,
receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<TransmitShreds>,
blockstore_sender: &Sender<Arc<Vec<Shred>>>,
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
) -> Result<()> {
// 1) Pull entries from banking stage
let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
@@ -83,25 +83,31 @@ impl BroadcastRun for BroadcastFakeShredsRun {
}
let data_shreds = Arc::new(data_shreds);
blockstore_sender.send(data_shreds.clone())?;
blockstore_sender.send((data_shreds.clone(), None))?;
// 3) Start broadcast step
// Some(stakes) indicates fake shreds
socket_sender.send((Some(Arc::new(HashMap::new())), Arc::new(fake_data_shreds)))?;
socket_sender.send((Some(Arc::new(HashMap::new())), Arc::new(fake_coding_shreds)))?;
socket_sender.send((
(Some(Arc::new(HashMap::new())), Arc::new(fake_data_shreds)),
None,
))?;
socket_sender.send((
(Some(Arc::new(HashMap::new())), Arc::new(fake_coding_shreds)),
None,
))?;
// None indicates real shreds
socket_sender.send((None, data_shreds))?;
socket_sender.send((None, Arc::new(coding_shreds)))?;
socket_sender.send(((None, data_shreds), None))?;
socket_sender.send(((None, Arc::new(coding_shreds)), None))?;
Ok(())
}
fn transmit(
&mut self,
receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
receiver: &Arc<Mutex<TransmitReceiver>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
sock: &UdpSocket,
) -> Result<()> {
for (stakes, data_shreds) in receiver.lock().unwrap().iter() {
for ((stakes, data_shreds), _) in receiver.lock().unwrap().iter() {
let peers = cluster_info.read().unwrap().tvu_peers();
peers.iter().enumerate().for_each(|(i, peer)| {
if i <= self.partition && stakes.is_some() {
@@ -119,11 +125,11 @@ impl BroadcastRun for BroadcastFakeShredsRun {
Ok(())
}
fn record(
&self,
receiver: &Arc<Mutex<Receiver<Arc<Vec<Shred>>>>>,
&mut self,
receiver: &Arc<Mutex<RecordReceiver>>,
blockstore: &Arc<Blockstore>,
) -> Result<()> {
for data_shreds in receiver.lock().unwrap().iter() {
for (data_shreds, _) in receiver.lock().unwrap().iter() {
blockstore.insert_shreds(data_shreds.to_vec(), None, true)?;
}
Ok(())

core/src/broadcast_stage/broadcast_metrics.rs (new file)

@@ -0,0 +1,288 @@
use super::*;
pub(crate) trait BroadcastStats {
fn update(&mut self, new_stats: &Self);
fn report_stats(&mut self, slot: Slot, slot_start: Instant);
}
#[derive(Clone)]
pub(crate) struct BroadcastShredBatchInfo {
pub(crate) slot: Slot,
pub(crate) num_expected_batches: Option<usize>,
pub(crate) slot_start_ts: Instant,
}
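`slot_start_ts` is fixed when the first entries for a slot arrive, while `num_expected_batches` stays `None` until the final batch for the slot announces the total. A small illustrative sketch, not part of the diff, assuming `u64` in place of `Slot`:

use std::time::Instant;

// Field names match the struct above; the literals here are hypothetical.
struct BatchInfo {
    slot: u64,
    num_expected_batches: Option<usize>,
    slot_start_ts: Instant,
}

fn main() {
    // Captured once, when the first entries for the slot arrive.
    let slot_start_ts = Instant::now();
    // Intermediate batches do not yet know the total...
    let _mid = BatchInfo { slot: 0, num_expected_batches: None, slot_start_ts };
    // ...only the batch that closes the slot announces it.
    let _last = BatchInfo { slot: 0, num_expected_batches: Some(4), slot_start_ts };
}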
#[derive(Default, Clone)]
pub(crate) struct ProcessShredsStats {
// Per-slot elapsed time
pub(crate) shredding_elapsed: u64,
pub(crate) receive_elapsed: u64,
}
impl ProcessShredsStats {
pub(crate) fn update(&mut self, new_stats: &ProcessShredsStats) {
self.shredding_elapsed += new_stats.shredding_elapsed;
self.receive_elapsed += new_stats.receive_elapsed;
}
pub(crate) fn reset(&mut self) {
*self = Self::default();
}
}
#[derive(Default, Clone)]
pub(crate) struct TransmitShredsStats {
pub(crate) transmit_elapsed: u64,
pub(crate) send_mmsg_elapsed: u64,
pub(crate) get_peers_elapsed: u64,
pub(crate) num_shreds: usize,
}
impl BroadcastStats for TransmitShredsStats {
fn update(&mut self, new_stats: &TransmitShredsStats) {
self.transmit_elapsed += new_stats.transmit_elapsed;
self.send_mmsg_elapsed += new_stats.send_mmsg_elapsed;
self.get_peers_elapsed += new_stats.get_peers_elapsed;
self.num_shreds += new_stats.num_shreds;
}
fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
datapoint_info!(
"broadcast-transmit-shreds-stats",
("slot", slot as i64, i64),
(
"end_to_end_elapsed",
// `slot_start` signals when the first batch of shreds was
// received, used to measure duration of broadcast
slot_start.elapsed().as_micros() as i64,
i64
),
("transmit_elapsed", self.transmit_elapsed as i64, i64),
("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64),
("get_peers_elapsed", self.get_peers_elapsed as i64, i64),
("num_shreds", self.num_shreds as i64, i64),
);
}
}
#[derive(Default, Clone)]
pub(crate) struct InsertShredsStats {
pub(crate) insert_shreds_elapsed: u64,
pub(crate) num_shreds: usize,
}
impl BroadcastStats for InsertShredsStats {
fn update(&mut self, new_stats: &InsertShredsStats) {
self.insert_shreds_elapsed += new_stats.insert_shreds_elapsed;
self.num_shreds += new_stats.num_shreds;
}
fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
datapoint_info!(
"broadcast-insert-shreds-stats",
("slot", slot as i64, i64),
(
"end_to_end_elapsed",
// `slot_start` signals when the first batch of shreds was
// received, used to measure duration of broadcast
slot_start.elapsed().as_micros() as i64,
i64
),
(
"insert_shreds_elapsed",
self.insert_shreds_elapsed as i64,
i64
),
("num_shreds", self.num_shreds as i64, i64),
);
}
}
// Tracks metrics of type `T` across multiple threads
#[derive(Default)]
pub(crate) struct BatchCounter<T: BroadcastStats + Default> {
// The number of batches processed across all threads so far
num_batches: usize,
// Filled in when the last batch of shreds is received,
// signals how many batches of shreds to expect
num_expected_batches: Option<usize>,
broadcast_shred_stats: T,
}
impl<T: BroadcastStats + Default> BatchCounter<T> {
#[cfg(test)]
pub(crate) fn num_batches(&self) -> usize {
self.num_batches
}
}
#[derive(Default)]
pub(crate) struct SlotBroadcastStats<T: BroadcastStats + Default>(HashMap<Slot, BatchCounter<T>>);
impl<T: BroadcastStats + Default> SlotBroadcastStats<T> {
#[cfg(test)]
pub(crate) fn get(&self, slot: Slot) -> Option<&BatchCounter<T>> {
self.0.get(&slot)
}
pub(crate) fn update(&mut self, new_stats: &T, batch_info: &Option<BroadcastShredBatchInfo>) {
if let Some(batch_info) = batch_info {
let mut should_delete = false;
{
let slot_batch_counter = self.0.entry(batch_info.slot).or_default();
slot_batch_counter.broadcast_shred_stats.update(new_stats);
// Only count the ones where `broadcast_shred_batch_info`.is_some(), because
// there could potentially be other `retransmit` slots inserted into the
// transmit pipeline (signaled by ReplayStage) that are not created by the
// main shredding/broadcast pipeline
slot_batch_counter.num_batches += 1;
if let Some(num_expected_batches) = batch_info.num_expected_batches {
slot_batch_counter.num_expected_batches = Some(num_expected_batches);
}
if let Some(num_expected_batches) = slot_batch_counter.num_expected_batches {
if slot_batch_counter.num_batches == num_expected_batches {
slot_batch_counter
.broadcast_shred_stats
.report_stats(batch_info.slot, batch_info.slot_start_ts);
should_delete = true;
}
}
}
if should_delete {
self.0
.remove(&batch_info.slot)
.expect("delete should be successful");
}
}
}
}
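`update` accumulates stats from any thread, counts the batch, and once the observed count reaches the announced total it reports once and evicts the slot so the map stays bounded. A self-contained miniature of that protocol, a sketch rather than the crate code, with a plain `u64` total standing in for the stats structs:

use std::collections::HashMap;

#[derive(Default)]
struct Counter {
    num_batches: usize,
    num_expected_batches: Option<usize>,
    total: u64,
}

fn update(map: &mut HashMap<u64, Counter>, slot: u64, value: u64, expected: Option<usize>) {
    let done = {
        let c = map.entry(slot).or_default();
        c.total += value;
        c.num_batches += 1;
        if expected.is_some() {
            c.num_expected_batches = expected;
        }
        c.num_expected_batches == Some(c.num_batches)
    };
    if done {
        // Report once per slot, then drop the entry so the map stays bounded.
        let c = map.remove(&slot).expect("delete should be successful");
        println!("slot {}: total {} over {} batches", slot, c.total, c.num_batches);
    }
}

fn main() {
    let mut map = HashMap::new();
    update(&mut map, 0, 10, None); // first batch, total not yet known
    update(&mut map, 0, 5, Some(2)); // last batch announces the total
    assert!(map.get(&0).is_none()); // reported and purged
}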
#[cfg(test)]
mod test {
use super::*;
#[derive(Default)]
struct TestStats {
sender: Option<Sender<(usize, Slot, Instant)>>,
count: usize,
}
impl BroadcastStats for TestStats {
fn update(&mut self, new_stats: &TestStats) {
self.count += new_stats.count;
self.sender = new_stats.sender.clone();
}
fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
self.sender
.as_ref()
.unwrap()
.send((self.count, slot, slot_start))
.unwrap()
}
}
#[test]
fn test_update() {
let start = Instant::now();
let mut slot_broadcast_stats = SlotBroadcastStats::default();
slot_broadcast_stats.update(
&TransmitShredsStats {
transmit_elapsed: 1,
get_peers_elapsed: 1,
send_mmsg_elapsed: 1,
num_shreds: 1,
},
&Some(BroadcastShredBatchInfo {
slot: 0,
num_expected_batches: Some(2),
slot_start_ts: start.clone(),
}),
);
// Singular update
let slot_0_stats = slot_broadcast_stats.0.get(&0).unwrap();
assert_eq!(slot_0_stats.num_batches, 1);
assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2);
assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1);
slot_broadcast_stats.update(
&TransmitShredsStats {
transmit_elapsed: 1,
get_peers_elapsed: 1,
send_mmsg_elapsed: 1,
num_shreds: 1,
},
&None,
);
// If BroadcastShredBatchInfo == None, then update should be ignored
let slot_0_stats = slot_broadcast_stats.0.get(&0).unwrap();
assert_eq!(slot_0_stats.num_batches, 1);
assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2);
assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1);
assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1);
// If another batch is given, then total number of batches == num_expected_batches == 2,
// so the batch should be purged from the HashMap
slot_broadcast_stats.update(
&TransmitShredsStats {
transmit_elapsed: 1,
get_peers_elapsed: 1,
send_mmsg_elapsed: 1,
num_shreds: 1,
},
&Some(BroadcastShredBatchInfo {
slot: 0,
num_expected_batches: None,
slot_start_ts: start.clone(),
}),
);
assert!(slot_broadcast_stats.0.get(&0).is_none());
}
#[test]
fn test_update_multi_threaded() {
for round in 0..50 {
let start = Instant::now();
let slot_broadcast_stats = Arc::new(Mutex::new(SlotBroadcastStats::default()));
let num_threads = 5;
let slot = 0;
let (sender, receiver) = channel();
let thread_handles: Vec<_> = (0..num_threads)
.into_iter()
.map(|i| {
let slot_broadcast_stats = slot_broadcast_stats.clone();
let sender = Some(sender.clone());
let test_stats = TestStats { sender, count: 1 };
let mut broadcast_batch_info = BroadcastShredBatchInfo {
slot,
num_expected_batches: None,
slot_start_ts: start.clone(),
};
if i == round % num_threads {
broadcast_batch_info.num_expected_batches = Some(num_threads);
}
Builder::new()
.name("test_update_multi_threaded".to_string())
.spawn(move || {
slot_broadcast_stats
.lock()
.unwrap()
.update(&test_stats, &Some(broadcast_batch_info))
})
.unwrap()
})
.collect();
for t in thread_handles {
t.join().unwrap();
}
assert!(slot_broadcast_stats.lock().unwrap().0.get(&slot).is_none());
let (returned_count, returned_slot, returned_instant) = receiver.recv().unwrap();
assert_eq!(returned_count, num_threads);
assert_eq!(returned_slot, slot);
assert_eq!(returned_instant, start);
}
}
}

core/src/broadcast_stage/fail_entry_verification_broadcast_run.rs

@@ -23,8 +23,8 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
&mut self,
blockstore: &Arc<Blockstore>,
receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<TransmitShreds>,
blockstore_sender: &Sender<Arc<Vec<Shred>>>,
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
) -> Result<()> {
// 1) Pull entries from banking stage
let mut receive_results = broadcast_utils::recv_slot_entries(receiver)?;
@@ -61,23 +61,23 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
);
let data_shreds = Arc::new(data_shreds);
blockstore_sender.send(data_shreds.clone())?;
blockstore_sender.send((data_shreds.clone(), None))?;
// 3) Start broadcast step
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
let stakes = stakes.map(Arc::new);
socket_sender.send((stakes.clone(), data_shreds))?;
socket_sender.send((stakes, Arc::new(coding_shreds)))?;
socket_sender.send(((stakes.clone(), data_shreds), None))?;
socket_sender.send(((stakes, Arc::new(coding_shreds)), None))?;
Ok(())
}
fn transmit(
&mut self,
receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
receiver: &Arc<Mutex<TransmitReceiver>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
sock: &UdpSocket,
) -> Result<()> {
let (stakes, shreds) = receiver.lock().unwrap().recv()?;
let ((stakes, shreds), _) = receiver.lock().unwrap().recv()?;
// Broadcast data
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
@@ -94,11 +94,11 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
Ok(())
}
fn record(
&self,
receiver: &Arc<Mutex<Receiver<Arc<Vec<Shred>>>>>,
&mut self,
receiver: &Arc<Mutex<RecordReceiver>>,
blockstore: &Arc<Blockstore>,
) -> Result<()> {
let all_shreds = receiver.lock().unwrap().recv()?;
let (all_shreds, _) = receiver.lock().unwrap().recv()?;
blockstore
.insert_shreds(all_shreds.to_vec(), None, true)
.expect("Failed to insert shreds in blockstore");

core/src/broadcast_stage/standard_broadcast_run.rs

@@ -1,5 +1,7 @@
use super::broadcast_utils::{self, ReceiveResults};
use super::*;
use super::{
broadcast_utils::{self, ReceiveResults},
*,
};
use crate::broadcast_stage::broadcast_utils::UnfinishedSlotInfo;
use solana_ledger::{
entry::Entry,
@@ -9,44 +9,33 @@ use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us};
use std::collections::HashMap;
use std::time::Duration;
#[derive(Default)]
struct BroadcastStats {
// Per-slot elapsed time
shredding_elapsed: u64,
insert_shreds_elapsed: u64,
broadcast_elapsed: u64,
receive_elapsed: u64,
seed_elapsed: u64,
send_mmsg_elapsed: u64,
}
impl BroadcastStats {
fn reset(&mut self) {
*self = Self::default();
}
}
#[derive(Clone)]
pub struct StandardBroadcastRun {
stats: Arc<RwLock<BroadcastStats>>,
process_shreds_stats: ProcessShredsStats,
transmit_shreds_stats: Arc<Mutex<SlotBroadcastStats<TransmitShredsStats>>>,
insert_shreds_stats: Arc<Mutex<SlotBroadcastStats<InsertShredsStats>>>,
unfinished_slot: Option<UnfinishedSlotInfo>,
current_slot_and_parent: Option<(u64, u64)>,
slot_broadcast_start: Option<Instant>,
keypair: Arc<Keypair>,
shred_version: u16,
last_datapoint_submit: Arc<AtomicU64>,
num_batches: usize,
}
impl StandardBroadcastRun {
pub(super) fn new(keypair: Arc<Keypair>, shred_version: u16) -> Self {
Self {
stats: Arc::new(RwLock::new(BroadcastStats::default())),
process_shreds_stats: ProcessShredsStats::default(),
transmit_shreds_stats: Arc::new(Mutex::new(SlotBroadcastStats::default())),
insert_shreds_stats: Arc::new(Mutex::new(SlotBroadcastStats::default())),
unfinished_slot: None,
current_slot_and_parent: None,
slot_broadcast_start: None,
keypair,
shred_version,
last_datapoint_submit: Arc::new(AtomicU64::new(0)),
num_batches: 0,
}
}
@@ -141,6 +132,7 @@ impl StandardBroadcastRun {
let brecv = Arc::new(Mutex::new(brecv));
//data
let _ = self.transmit(&srecv, cluster_info, sock);
let _ = self.record(&brecv, blockstore);
//coding
let _ = self.transmit(&srecv, cluster_info, sock);
let _ = self.record(&brecv, blockstore);
@@ -150,8 +142,8 @@
fn process_receive_results(
&mut self,
blockstore: &Arc<Blockstore>,
socket_sender: &Sender<TransmitShreds>,
blockstore_sender: &Sender<Arc<Vec<Shred>>>,
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
receive_results: ReceiveResults,
) -> Result<()> {
let mut receive_elapsed = receive_results.time_elapsed;
@@ -159,11 +151,13 @@
let bank = receive_results.bank.clone();
let last_tick_height = receive_results.last_tick_height;
inc_new_counter_info!("broadcast_service-entries_received", num_entries);
let old_broadcast_start = self.slot_broadcast_start;
let old_num_batches = self.num_batches;
if self.current_slot_and_parent.is_none()
|| bank.slot() != self.current_slot_and_parent.unwrap().0
{
self.slot_broadcast_start = Some(Instant::now());
self.num_batches = 0;
let slot = bank.slot();
let parent_slot = bank.parent_slot();
@@ -178,19 +172,19 @@
self.check_for_interrupted_slot(bank.ticks_per_slot() as u8);
// 2) Convert entries to shreds and coding shreds
let (shredder, next_shred_index) = self.init_shredder(
blockstore,
(bank.tick_height() % bank.ticks_per_slot()) as u8,
);
let mut data_shreds = self.entries_to_data_shreds(
let is_last_in_slot = last_tick_height == bank.max_tick_height();
let data_shreds = self.entries_to_data_shreds(
&shredder,
next_shred_index,
&receive_results.entries,
last_tick_height == bank.max_tick_height(),
is_last_in_slot,
);
//Insert the first shred so blockstore stores that the leader started this block
//This must be done before the blocks are sent out over the wire.
// Insert the first shred so blockstore stores that the leader started this block
// This must be done before the blocks are sent out over the wire.
if !data_shreds.is_empty() && data_shreds[0].index() == 0 {
let first = vec![data_shreds[0].clone()];
blockstore
@@ -198,27 +192,56 @@
.expect("Failed to insert shreds in blockstore");
}
let last_data_shred = data_shreds.len();
if let Some(last_shred) = last_unfinished_slot_shred {
data_shreds.push(last_shred);
}
let to_shreds_elapsed = to_shreds_start.elapsed();
let bank_epoch = bank.get_leader_schedule_epoch(bank.slot());
let stakes = staking_utils::staked_nodes_at_epoch(&bank, bank_epoch);
let stakes = stakes.map(Arc::new);
let data_shreds = Arc::new(data_shreds);
socket_sender.send((stakes.clone(), data_shreds.clone()))?;
blockstore_sender.send(data_shreds.clone())?;
let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[0..last_data_shred]);
let coding_shreds = Arc::new(coding_shreds);
socket_sender.send((stakes, coding_shreds.clone()))?;
blockstore_sender.send(coding_shreds)?;
self.update_broadcast_stats(BroadcastStats {
shredding_elapsed: duration_as_us(&to_shreds_elapsed),
receive_elapsed: duration_as_us(&receive_elapsed),
..BroadcastStats::default()
// Broadcast the last shred of the interrupted slot if necessary
if let Some(last_shred) = last_unfinished_slot_shred {
let batch_info = Some(BroadcastShredBatchInfo {
slot: last_shred.slot(),
num_expected_batches: Some(old_num_batches + 1),
slot_start_ts: old_broadcast_start.expect(
"Old broadcast start time for previous slot must exist if the previous slot
was interrupted",
),
});
let last_shred = Arc::new(vec![last_shred]);
socket_sender.send(((stakes.clone(), last_shred.clone()), batch_info.clone()))?;
blockstore_sender.send((last_shred, batch_info))?;
}
// Increment by two batches, one for the data batch, one for the coding batch.
self.num_batches += 2;
let num_expected_batches = {
if is_last_in_slot {
Some(self.num_batches)
} else {
None
}
};
let batch_info = Some(BroadcastShredBatchInfo {
slot: bank.slot(),
num_expected_batches,
slot_start_ts: self
.slot_broadcast_start
.clone()
.expect("Start timestamp must exist for a slot if we're broadcasting the slot"),
});
let data_shreds = Arc::new(data_shreds);
socket_sender.send(((stakes.clone(), data_shreds.clone()), batch_info.clone()))?;
blockstore_sender.send((data_shreds.clone(), batch_info.clone()))?;
let coding_shreds = shredder.data_shreds_to_coding_shreds(&data_shreds[0..last_data_shred]);
let coding_shreds = Arc::new(coding_shreds);
socket_sender.send(((stakes, coding_shreds.clone()), batch_info.clone()))?;
blockstore_sender.send((coding_shreds, batch_info))?;
self.process_shreds_stats.update(&ProcessShredsStats {
shredding_elapsed: duration_as_us(&to_shreds_elapsed),
receive_elapsed: duration_as_us(&receive_elapsed),
});
if last_tick_height == bank.max_tick_height() {
self.report_and_reset_stats();
self.unfinished_slot = None;
@@ -227,10 +250,15 @@ impl StandardBroadcastRun {
Ok(())
}
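Each receive cycle emits one data batch and one coding batch (hence the increment by two), and only the cycle that ends the slot announces the expected total; an interrupted slot additionally flushes one last batch for the previous slot, with `num_expected_batches` set to the old count plus one. A hypothetical standalone helper, not in the diff, sketching just that arithmetic:

fn next_batch_count(batches_so_far: usize, is_last_in_slot: bool) -> (usize, Option<usize>) {
    let num_batches = batches_so_far + 2; // one data batch + one coding batch
    let num_expected_batches = if is_last_in_slot { Some(num_batches) } else { None };
    (num_batches, num_expected_batches)
}

fn main() {
    // Mid-slot cycles leave the total open...
    assert_eq!(next_batch_count(0, false), (2, None));
    // ...and a slot shredded over three cycles expects 6 batches in total.
    assert_eq!(next_batch_count(4, true), (6, Some(6)));
}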
fn insert(&self, blockstore: &Arc<Blockstore>, shreds: Arc<Vec<Shred>>) -> Result<()> {
fn insert(
&mut self,
blockstore: &Arc<Blockstore>,
shreds: Arc<Vec<Shred>>,
broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
) -> Result<()> {
// Insert shreds into blockstore
let insert_shreds_start = Instant::now();
//The first shred is inserted synchronously
// The first shred is inserted synchronously
let data_shreds = if !shreds.is_empty() && shreds[0].index() == 0 {
shreds[1..].to_vec()
} else {
@@ -240,29 +268,39 @@
.insert_shreds(data_shreds, None, true)
.expect("Failed to insert shreds in blockstore");
let insert_shreds_elapsed = insert_shreds_start.elapsed();
self.update_broadcast_stats(BroadcastStats {
let new_insert_shreds_stats = InsertShredsStats {
insert_shreds_elapsed: duration_as_us(&insert_shreds_elapsed),
..BroadcastStats::default()
});
num_shreds: shreds.len(),
};
self.update_insertion_metrics(&new_insert_shreds_stats, &broadcast_shred_batch_info);
Ok(())
}
fn update_insertion_metrics(
&mut self,
new_insertion_shreds_stats: &InsertShredsStats,
broadcast_shred_batch_info: &Option<BroadcastShredBatchInfo>,
) {
let mut insert_shreds_stats = self.insert_shreds_stats.lock().unwrap();
insert_shreds_stats.update(new_insertion_shreds_stats, broadcast_shred_batch_info);
}
fn broadcast(
&mut self,
sock: &UdpSocket,
cluster_info: &Arc<RwLock<ClusterInfo>>,
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
shreds: Arc<Vec<Shred>>,
broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
) -> Result<()> {
let seed_start = Instant::now();
let seed_elapsed = seed_start.elapsed();
trace!("Broadcasting {:?} shreds", shreds.len());
// Get the list of peers to broadcast to
let get_peers_start = Instant::now();
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
let get_peers_elapsed = get_peers_start.elapsed();
// Broadcast the shreds
let broadcast_start = Instant::now();
trace!("Broadcasting {:?} shreds", shreds.len());
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
let transmit_start = Instant::now();
let mut send_mmsg_total = 0;
broadcast_shreds(
sock,
@@ -272,42 +310,38 @@ impl StandardBroadcastRun {
&self.last_datapoint_submit,
&mut send_mmsg_total,
)?;
let broadcast_elapsed = broadcast_start.elapsed();
self.update_broadcast_stats(BroadcastStats {
broadcast_elapsed: duration_as_us(&broadcast_elapsed),
seed_elapsed: duration_as_us(&seed_elapsed),
let transmit_elapsed = transmit_start.elapsed();
let new_transmit_shreds_stats = TransmitShredsStats {
transmit_elapsed: duration_as_us(&transmit_elapsed),
get_peers_elapsed: duration_as_us(&get_peers_elapsed),
send_mmsg_elapsed: send_mmsg_total,
..BroadcastStats::default()
});
num_shreds: shreds.len(),
};
// Process metrics
self.update_transmit_metrics(&new_transmit_shreds_stats, &broadcast_shred_batch_info);
Ok(())
}
fn update_broadcast_stats(&self, stats: BroadcastStats) {
let mut wstats = self.stats.write().unwrap();
wstats.receive_elapsed += stats.receive_elapsed;
wstats.shredding_elapsed += stats.shredding_elapsed;
wstats.insert_shreds_elapsed += stats.insert_shreds_elapsed;
wstats.broadcast_elapsed += stats.broadcast_elapsed;
wstats.seed_elapsed += stats.seed_elapsed;
wstats.send_mmsg_elapsed += stats.send_mmsg_elapsed;
fn update_transmit_metrics(
&mut self,
new_transmit_shreds_stats: &TransmitShredsStats,
broadcast_shred_batch_info: &Option<BroadcastShredBatchInfo>,
) {
let mut transmit_shreds_stats = self.transmit_shreds_stats.lock().unwrap();
transmit_shreds_stats.update(new_transmit_shreds_stats, broadcast_shred_batch_info);
}
fn report_and_reset_stats(&mut self) {
let stats = self.stats.read().unwrap();
let stats = &self.process_shreds_stats;
assert!(self.unfinished_slot.is_some());
datapoint_info!(
"broadcast-bank-stats",
"broadcast-process-shreds-stats",
("slot", self.unfinished_slot.unwrap().slot as i64, i64),
("shredding_time", stats.shredding_elapsed as i64, i64),
("insertion_time", stats.insert_shreds_elapsed as i64, i64),
("broadcast_time", stats.broadcast_elapsed as i64, i64),
("receive_time", stats.receive_elapsed as i64, i64),
("send_mmsg", stats.send_mmsg_elapsed as i64, i64),
("seed", stats.seed_elapsed as i64, i64),
(
"num_shreds",
"num_data_shreds",
i64::from(self.unfinished_slot.unwrap().next_shred_index),
i64
),
@@ -317,8 +351,7 @@ impl StandardBroadcastRun {
i64
),
);
drop(stats);
self.stats.write().unwrap().reset();
self.process_shreds_stats.reset();
}
}
@@ -327,8 +360,8 @@ impl BroadcastRun for StandardBroadcastRun {
&mut self,
blockstore: &Arc<Blockstore>,
receiver: &Receiver<WorkingBankEntry>,
socket_sender: &Sender<TransmitShreds>,
blockstore_sender: &Sender<Arc<Vec<Shred>>>,
socket_sender: &Sender<(TransmitShreds, Option<BroadcastShredBatchInfo>)>,
blockstore_sender: &Sender<(Arc<Vec<Shred>>, Option<BroadcastShredBatchInfo>)>,
) -> Result<()> {
let receive_results = broadcast_utils::recv_slot_entries(receiver)?;
self.process_receive_results(
@@ -340,20 +373,20 @@ impl BroadcastRun for StandardBroadcastRun {
}
fn transmit(
&mut self,
receiver: &Arc<Mutex<Receiver<TransmitShreds>>>,
receiver: &Arc<Mutex<TransmitReceiver>>,
cluster_info: &Arc<RwLock<ClusterInfo>>,
sock: &UdpSocket,
) -> Result<()> {
let (stakes, shreds) = receiver.lock().unwrap().recv()?;
self.broadcast(sock, cluster_info, stakes, shreds)
let ((stakes, shreds), slot_start_ts) = receiver.lock().unwrap().recv()?;
self.broadcast(sock, cluster_info, stakes, shreds, slot_start_ts)
}
fn record(
&self,
receiver: &Arc<Mutex<Receiver<Arc<Vec<Shred>>>>>,
&mut self,
receiver: &Arc<Mutex<RecordReceiver>>,
blockstore: &Arc<Blockstore>,
) -> Result<()> {
let shreds = receiver.lock().unwrap().recv()?;
self.insert(blockstore, shreds)
let (shreds, slot_start_ts) = receiver.lock().unwrap().recv()?;
self.insert(blockstore, shreds, slot_start_ts)
}
}
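`record` and `transmit` now take `&mut self` because every cloned runner accumulates into shared per-slot stats; the clones stay cheap since those maps live behind `Arc<Mutex<..>>` (see the `NUM_INSERT_THREADS` record threads in broadcast_stage.rs above). A minimal sketch of that sharing, with a hypothetical `Runner` and a plain `Vec` in place of the stats map:

use std::sync::{Arc, Mutex};
use std::thread;

#[derive(Clone)]
struct Runner {
    stats: Arc<Mutex<Vec<u64>>>, // stand-in for SlotBroadcastStats
}

fn main() {
    let runner = Runner { stats: Arc::new(Mutex::new(Vec::new())) };
    let handles: Vec<_> = (0..2u64)
        .map(|i| {
            let r = runner.clone(); // cheap clone: the Arc is shared
            thread::spawn(move || r.stats.lock().unwrap().push(i))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Both record threads contributed to the one shared stats object.
    assert_eq!(runner.stats.lock().unwrap().len(), 2);
}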
@@ -469,12 +502,29 @@ mod test {
// Make sure the slot is not complete
assert!(!blockstore.is_full(0));
// Modify the stats, should reset later
standard_broadcast_run
.stats
.write()
.unwrap()
.receive_elapsed = 10;
standard_broadcast_run.process_shreds_stats.receive_elapsed = 10;
// Broadcast stats should exist, and 2 batches should have been sent,
// one for data, one for coding
assert_eq!(
standard_broadcast_run
.transmit_shreds_stats
.lock()
.unwrap()
.get(unfinished_slot.slot)
.unwrap()
.num_batches(),
2
);
assert_eq!(
standard_broadcast_run
.insert_shreds_stats
.lock()
.unwrap()
.get(unfinished_slot.slot)
.unwrap()
.num_batches(),
2
);
// Try to fetch ticks from blockstore, nothing should break
assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), ticks0);
assert_eq!(
@@ -485,7 +535,7 @@ mod test {
// Step 2: Make a transmission for another bank that interrupts the transmission for
// slot 0
let bank2 = Arc::new(Bank::new_from_parent(&bank0, &leader_keypair.pubkey(), 2));
let interrupted_slot = unfinished_slot.slot;
// Interrupting the slot should cause the unfinished_slot and stats to reset
let num_shreds = 1;
assert!(num_shreds < num_shreds_per_slot);
@@ -509,10 +559,24 @@
// Check that the stats were reset as well
assert_eq!(
standard_broadcast_run.stats.read().unwrap().receive_elapsed,
standard_broadcast_run.process_shreds_stats.receive_elapsed,
0
);
// Broadcast stats for interrupted slot should be cleared
assert!(standard_broadcast_run
.transmit_shreds_stats
.lock()
.unwrap()
.get(interrupted_slot)
.is_none());
assert!(standard_broadcast_run
.insert_shreds_stats
.lock()
.unwrap()
.get(interrupted_slot)
.is_none());
// Try to fetch the incomplete ticks from blockstore, should succeed
assert_eq!(blockstore.get_slot_entries(0, 0).unwrap(), ticks0);
assert_eq!(

core/src/crds_gossip_pull.rs

@@ -278,7 +278,7 @@ impl CrdsGossipPull {
failed
}
// build a set of filters of the current crds table
// num_filters - used to increase the likely hood of a value in crds being added to some filter
// num_filters - used to increase the likelihood of a value in crds being added to some filter
pub fn build_crds_filters(&self, crds: &Crds, bloom_size: usize) -> Vec<CrdsFilter> {
let num = cmp::max(
CRDS_GOSSIP_DEFAULT_BLOOM_ITEMS,