Cache tvu peers for broadcast (#10373)

Authored by sakridge on 2020-06-03 08:24:05 -07:00; committed by GitHub
parent e63e7937cb
commit 2cf719ac2c
5 changed files with 76 additions and 41 deletions
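
In short: rather than calling `get_broadcast_peers` for every batch of shreds, `StandardBroadcastRun` now keeps the peer list and stakes in a `BroadcastPeerCache` behind an `RwLock` and refreshes it at most once per `BROADCAST_PEER_UPDATE_INTERVAL_MS` (1000 ms), with an atomic `last_peer_update` timestamp guarding the refresh. Alongside that, `broadcast_shreds` now takes a `&mut TransmitShredsStats` instead of a bare `send_mmsg_total` counter, and a new `shred_select` timing is accumulated and reported with the other broadcast metrics.

A minimal standalone sketch of the cache-refresh pattern, assuming only the standard library; `PeerCache`, `refresh_if_stale`, and the `String` peers are illustrative stand-ins for the crate's types, and `compare_exchange` is used here where the commit uses `compare_and_swap`:

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{SystemTime, UNIX_EPOCH};

const PEER_UPDATE_INTERVAL_MS: u64 = 1000;

// Illustrative stand-in for BroadcastPeerCache.
#[derive(Default)]
struct PeerCache {
    peers: Vec<String>, // stands in for Vec<ContactInfo>
}

fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

// Refresh the shared cache only if it is older than the interval; the CAS on
// `last_update` lets a single caller pay the cost of re-fetching while every
// other caller keeps reading the cached list through the RwLock.
fn refresh_if_stale(
    cache: &Arc<RwLock<PeerCache>>,
    last_update: &Arc<AtomicU64>,
    fetch_peers: impl Fn() -> Vec<String>,
) {
    let now = now_ms();
    let last = last_update.load(Ordering::Relaxed);
    if now.saturating_sub(last) > PEER_UPDATE_INTERVAL_MS
        && last_update
            .compare_exchange(last, now, Ordering::Relaxed, Ordering::Relaxed)
            .is_ok()
    {
        cache.write().unwrap().peers = fetch_peers();
    }
}

fn main() {
    let cache = Arc::new(RwLock::new(PeerCache::default()));
    let last_update = Arc::new(AtomicU64::new(0));
    refresh_if_stale(&cache, &last_update, || {
        vec!["peer-a".to_string(), "peer-b".to_string()]
    });
    println!("cached peers: {:?}", cache.read().unwrap().peers);
}

Holding only a read lock while the shreds are sent (and dropping it before the stats are recorded) keeps the once-per-second writer from blocking concurrent broadcast batches for long.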

View File

@@ -3,6 +3,7 @@
 extern crate test;
 use rand::{thread_rng, Rng};
+use solana_core::broadcast_stage::broadcast_metrics::TransmitShredsStats;
 use solana_core::broadcast_stage::{broadcast_shreds, get_broadcast_peers};
 use solana_core::cluster_info::{ClusterInfo, Node};
 use solana_core::contact_info::ContactInfo;
@@ -47,7 +48,7 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
             &peers_and_stakes,
             &peers,
             &last_datapoint,
-            &mut 0,
+            &mut TransmitShredsStats::default(),
         )
         .unwrap();
     });

View File

@@ -35,7 +35,7 @@ use std::{
 };

 mod broadcast_fake_shreds_run;
-pub(crate) mod broadcast_metrics;
+pub mod broadcast_metrics;
 pub(crate) mod broadcast_utils;
 mod fail_entry_verification_broadcast_run;
 mod standard_broadcast_run;
@@ -374,13 +374,14 @@ pub fn broadcast_shreds(
     peers_and_stakes: &[(u64, usize)],
     peers: &[ContactInfo],
     last_datapoint_submit: &Arc<AtomicU64>,
-    send_mmsg_total: &mut u64,
+    transmit_stats: &mut TransmitShredsStats,
 ) -> Result<()> {
     let broadcast_len = peers_and_stakes.len();
     if broadcast_len == 0 {
         update_peer_stats(1, 1, last_datapoint_submit);
         return Ok(());
     }
+    let mut shred_select = Measure::start("shred_select");
     let packets: Vec<_> = shreds
         .iter()
         .map(|shred| {
@@ -389,6 +390,8 @@ pub fn broadcast_shreds(
             (&shred.payload, &peers[broadcast_index].tvu)
         })
         .collect();
+    shred_select.stop();
+    transmit_stats.shred_select += shred_select.as_us();

     let mut sent = 0;
     let mut send_mmsg_time = Measure::start("send_mmsg");
@@ -401,7 +404,7 @@
         }
     }
     send_mmsg_time.stop();
-    *send_mmsg_total += send_mmsg_time.as_us();
+    transmit_stats.send_mmsg_elapsed += send_mmsg_time.as_us();

     let num_live_peers = num_live_peers(&peers);
     update_peer_stats(
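
The new `shred_select` measurement in `broadcast_shreds` follows the same shape as the existing `send_mmsg` timing: time a section, convert it to microseconds, and add it to a stats struct that the caller owns and passes in mutably, so timings from several batches accumulate before being reported. A minimal sketch of that shape, assuming std only; `Stats`, `pick_targets`, and `send_batch` are illustrative stand-ins, not the crate's `TransmitShredsStats`, `Measure`, or `broadcast_shreds`:

use std::time::Instant;

// Illustrative stand-in for TransmitShredsStats: elapsed times in microseconds.
#[derive(Default, Debug)]
struct Stats {
    shred_select_us: u64,
    send_us: u64,
}

// Hypothetical shred-to-peer selection: pair each payload with a peer index.
fn pick_targets(payloads: &[Vec<u8>], num_peers: usize) -> Vec<(usize, usize)> {
    payloads
        .iter()
        .enumerate()
        .map(|(i, _)| (i, i % num_peers))
        .collect()
}

// The caller owns the stats and each section only adds its own elapsed time,
// so repeated batches within a slot accumulate into one report.
fn send_batch(payloads: &[Vec<u8>], num_peers: usize, stats: &mut Stats) {
    let select_start = Instant::now();
    let targets = pick_targets(payloads, num_peers);
    stats.shred_select_us += select_start.elapsed().as_micros() as u64;

    let send_start = Instant::now();
    for (_payload_idx, _peer_idx) in targets {
        // Stand-in for the UDP sends done by broadcast_shreds.
    }
    stats.send_us += send_start.elapsed().as_micros() as u64;
}

fn main() {
    let payloads = vec![vec![0u8; 1024]; 8];
    let mut stats = Stats::default();
    send_batch(&payloads, 3, &mut stats);
    send_batch(&payloads, 3, &mut stats);
    println!("{:?}", stats);
}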

View File

@@ -29,11 +29,12 @@ impl ProcessShredsStats {
 }

 #[derive(Default, Clone)]
-pub(crate) struct TransmitShredsStats {
-    pub(crate) transmit_elapsed: u64,
-    pub(crate) send_mmsg_elapsed: u64,
-    pub(crate) get_peers_elapsed: u64,
-    pub(crate) num_shreds: usize,
+pub struct TransmitShredsStats {
+    pub transmit_elapsed: u64,
+    pub send_mmsg_elapsed: u64,
+    pub get_peers_elapsed: u64,
+    pub shred_select: u64,
+    pub num_shreds: usize,
 }

 impl BroadcastStats for TransmitShredsStats {
@@ -42,6 +43,7 @@ impl BroadcastStats for TransmitShredsStats {
         self.send_mmsg_elapsed += new_stats.send_mmsg_elapsed;
         self.get_peers_elapsed += new_stats.get_peers_elapsed;
         self.num_shreds += new_stats.num_shreds;
+        self.shred_select += new_stats.shred_select;
     }
     fn report_stats(&mut self, slot: Slot, slot_start: Instant) {
         datapoint_info!(
@@ -58,6 +60,7 @@
             ("send_mmsg_elapsed", self.send_mmsg_elapsed as i64, i64),
             ("get_peers_elapsed", self.get_peers_elapsed as i64, i64),
             ("num_shreds", self.num_shreds as i64, i64),
+            ("shred_select", self.shred_select as i64, i64),
         );
     }
 }
@@ -176,15 +179,16 @@ mod test {
     }

     #[test]
-    fn test_update() {
+    fn test_update_broadcast() {
         let start = Instant::now();
         let mut slot_broadcast_stats = SlotBroadcastStats::default();
         slot_broadcast_stats.update(
             &TransmitShredsStats {
                 transmit_elapsed: 1,
-                get_peers_elapsed: 1,
-                send_mmsg_elapsed: 1,
-                num_shreds: 1,
+                get_peers_elapsed: 2,
+                send_mmsg_elapsed: 3,
+                shred_select: 4,
+                num_shreds: 5,
             },
             &Some(BroadcastShredBatchInfo {
                 slot: 0,
@@ -198,16 +202,18 @@
         assert_eq!(slot_0_stats.num_batches, 1);
         assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2);
         assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 2);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 3);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);

         slot_broadcast_stats.update(
             &TransmitShredsStats {
-                transmit_elapsed: 1,
-                get_peers_elapsed: 1,
-                send_mmsg_elapsed: 1,
-                num_shreds: 1,
+                transmit_elapsed: 7,
+                get_peers_elapsed: 8,
+                send_mmsg_elapsed: 9,
+                shred_select: 10,
+                num_shreds: 11,
             },
             &None,
         );
@@ -217,9 +223,10 @@
         assert_eq!(slot_0_stats.num_batches, 1);
         assert_eq!(slot_0_stats.num_expected_batches.unwrap(), 2);
         assert_eq!(slot_0_stats.broadcast_shred_stats.transmit_elapsed, 1);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 1);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 1);
-        assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 1);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.get_peers_elapsed, 2);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.send_mmsg_elapsed, 3);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.shred_select, 4);
+        assert_eq!(slot_0_stats.broadcast_shred_stats.num_shreds, 5);

         // If another batch is given, then total number of batches == num_expected_batches == 2,
         // so the batch should be purged from the HashMap
@@ -228,6 +235,7 @@
                 transmit_elapsed: 1,
                 get_peers_elapsed: 1,
                 send_mmsg_elapsed: 1,
+                shred_select: 1,
                 num_shreds: 1,
             },
             &Some(BroadcastShredBatchInfo {

View File

@@ -137,14 +137,13 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
         // Broadcast data
         let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
-        let mut send_mmsg_total = 0;
         broadcast_shreds(
             sock,
             &shreds,
             &peers_and_stakes,
             &peers,
             &Arc::new(AtomicU64::new(0)),
-            &mut send_mmsg_total,
+            &mut TransmitShredsStats::default(),
         )?;

         Ok(())

View File

@@ -9,6 +9,7 @@ use solana_ledger::{
 };
 use solana_sdk::{pubkey::Pubkey, signature::Keypair, timing::duration_as_us};
 use std::collections::HashMap;
+use std::sync::RwLock;
 use std::time::Duration;

 #[derive(Clone)]
@@ -23,6 +24,14 @@ pub struct StandardBroadcastRun {
     shred_version: u16,
     last_datapoint_submit: Arc<AtomicU64>,
     num_batches: usize,
+    broadcast_peer_cache: Arc<RwLock<BroadcastPeerCache>>,
+    last_peer_update: Arc<AtomicU64>,
+}
+
+#[derive(Default)]
+struct BroadcastPeerCache {
+    peers: Vec<ContactInfo>,
+    peers_and_stakes: Vec<(u64, usize)>,
 }

 impl StandardBroadcastRun {
@@ -38,6 +47,8 @@ impl StandardBroadcastRun {
             shred_version,
             last_datapoint_submit: Arc::new(AtomicU64::new(0)),
             num_batches: 0,
+            broadcast_peer_cache: Arc::new(RwLock::new(BroadcastPeerCache::default())),
+            last_peer_update: Arc::new(AtomicU64::new(0)),
         }
     }
@@ -293,33 +304,46 @@ impl StandardBroadcastRun {
         shreds: Arc<Vec<Shred>>,
         broadcast_shred_batch_info: Option<BroadcastShredBatchInfo>,
     ) -> Result<()> {
+        const BROADCAST_PEER_UPDATE_INTERVAL_MS: u64 = 1000;
         trace!("Broadcasting {:?} shreds", shreds.len());
         // Get the list of peers to broadcast to
-        let get_peers_start = Instant::now();
-        let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
-        let get_peers_elapsed = get_peers_start.elapsed();
+        let mut get_peers_time = Measure::start("broadcast::get_peers");
+        let now = timestamp();
+        let last = self.last_peer_update.load(Ordering::Relaxed);
+        if now - last > BROADCAST_PEER_UPDATE_INTERVAL_MS
+            && self
+                .last_peer_update
+                .compare_and_swap(now, last, Ordering::Relaxed)
+                == last
+        {
+            let mut w_broadcast_peer_cache = self.broadcast_peer_cache.write().unwrap();
+            let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
+            w_broadcast_peer_cache.peers = peers;
+            w_broadcast_peer_cache.peers_and_stakes = peers_and_stakes;
+        }
+        get_peers_time.stop();
+        let r_broadcast_peer_cache = self.broadcast_peer_cache.read().unwrap();

+        let mut transmit_stats = TransmitShredsStats::default();
         // Broadcast the shreds
-        let transmit_start = Instant::now();
-        let mut send_mmsg_total = 0;
+        let mut transmit_time = Measure::start("broadcast_shreds");
         broadcast_shreds(
             sock,
             &shreds,
-            &peers_and_stakes,
-            &peers,
+            &r_broadcast_peer_cache.peers_and_stakes,
+            &r_broadcast_peer_cache.peers,
             &self.last_datapoint_submit,
-            &mut send_mmsg_total,
+            &mut transmit_stats,
         )?;
-        let transmit_elapsed = transmit_start.elapsed();
-        let new_transmit_shreds_stats = TransmitShredsStats {
-            transmit_elapsed: duration_as_us(&transmit_elapsed),
-            get_peers_elapsed: duration_as_us(&get_peers_elapsed),
-            send_mmsg_elapsed: send_mmsg_total,
-            num_shreds: shreds.len(),
-        };
+        drop(r_broadcast_peer_cache);
+        transmit_time.stop();
+
+        transmit_stats.transmit_elapsed = transmit_time.as_us();
+        transmit_stats.get_peers_elapsed = get_peers_time.as_us();
+        transmit_stats.num_shreds = shreds.len();

         // Process metrics
-        self.update_transmit_metrics(&new_transmit_shreds_stats, &broadcast_shred_batch_info);
+        self.update_transmit_metrics(&transmit_stats, &broadcast_shred_batch_info);

         Ok(())
     }