Optimize broadcast cluster_info critical section (#9327)

This commit is contained in:
sakridge 2020-04-06 17:36:22 -07:00 committed by GitHub
parent 96c23110ae
commit 4677cdb4c2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 151 additions and 124 deletions

View File

@ -3,10 +3,13 @@
extern crate test;
use rand::{thread_rng, Rng};
use solana_core::broadcast_stage::{broadcast_shreds, get_broadcast_peers};
use solana_core::cluster_info::{ClusterInfo, Node};
use solana_core::contact_info::ContactInfo;
use solana_ledger::shred::Shred;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::timing::timestamp;
use std::sync::RwLock;
use std::{collections::HashMap, net::UdpSocket, sync::Arc, time::Instant};
use test::Bencher;
@ -18,10 +21,8 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
let mut cluster_info = ClusterInfo::new_with_invalid_keypair(leader_info.info.clone());
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
const SHRED_SIZE: usize = 1024;
const NUM_SHREDS: usize = 32;
let shreds = vec![vec![0; SHRED_SIZE]; NUM_SHREDS];
let seeds = vec![[0u8; 32]; NUM_SHREDS];
let shreds = vec![Shred::new_empty_data_shred(); NUM_SHREDS];
let mut stakes = HashMap::new();
const NUM_PEERS: usize = 200;
for _ in 0..NUM_PEERS {
@ -31,15 +32,18 @@ fn broadcast_shreds_bench(bencher: &mut Bencher) {
stakes.insert(id, thread_rng().gen_range(1, NUM_PEERS) as u64);
}
let stakes = Arc::new(stakes);
let cluster_info = Arc::new(RwLock::new(cluster_info));
let (peers, peers_and_stakes) = get_broadcast_peers(&cluster_info, Some(stakes.clone()));
let shreds = Arc::new(shreds);
bencher.iter(move || {
let shreds = shreds.clone();
cluster_info
.broadcast_shreds(
broadcast_shreds(
&socket,
shreds,
&seeds,
Some(stakes.clone()),
&shreds,
&peers_and_stakes,
&peers,
&mut Instant::now(),
&mut 0,
)
.unwrap();
});

View File

@ -4,6 +4,9 @@ use self::{
fail_entry_verification_broadcast_run::FailEntryVerificationBroadcastRun,
standard_broadcast_run::StandardBroadcastRun,
};
use crate::contact_info::ContactInfo;
use crate::crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS;
use crate::weighted_shuffle::weighted_best;
use crate::{
cluster_info::{ClusterInfo, ClusterInfoError},
poh_recorder::WorkingBankEntry,
@ -14,9 +17,13 @@ use crossbeam_channel::{
Sender as CrossbeamSender,
};
use solana_ledger::{blockstore::Blockstore, shred::Shred, staking_utils};
use solana_measure::measure::Measure;
use solana_metrics::{inc_new_counter_error, inc_new_counter_info};
use solana_runtime::bank::Bank;
use solana_sdk::timing::duration_as_s;
use solana_sdk::timing::timestamp;
use solana_sdk::{clock::Slot, pubkey::Pubkey};
use solana_streamer::sendmmsg::send_mmsg;
use std::{
collections::HashMap,
net::UdpSocket,
@ -328,6 +335,84 @@ impl BroadcastStage {
}
}
fn update_peer_stats(num_live_peers: i64, broadcast_len: i64, last_datapoint_submit: &mut Instant) {
if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 {
datapoint_info!(
"cluster_info-num_nodes",
("live_count", num_live_peers, i64),
("broadcast_count", broadcast_len, i64)
);
*last_datapoint_submit = Instant::now();
}
}
pub fn get_broadcast_peers<S: std::hash::BuildHasher>(
cluster_info: &Arc<RwLock<ClusterInfo>>,
stakes: Option<Arc<HashMap<Pubkey, u64, S>>>,
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
use crate::cluster_info;
let mut peers = cluster_info.read().unwrap().tvu_peers();
let peers_and_stakes = cluster_info::stake_weight_peers(&mut peers, stakes);
(peers, peers_and_stakes)
}
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
pub fn broadcast_shreds(
s: &UdpSocket,
shreds: &Arc<Vec<Shred>>,
peers_and_stakes: &[(u64, usize)],
peers: &[ContactInfo],
last_datapoint_submit: &mut Instant,
send_mmsg_total: &mut u64,
) -> Result<()> {
let broadcast_len = peers_and_stakes.len();
if broadcast_len == 0 {
update_peer_stats(1, 1, last_datapoint_submit);
return Ok(());
}
let packets: Vec<_> = shreds
.iter()
.map(|shred| {
let broadcast_index = weighted_best(&peers_and_stakes, shred.seed());
(&shred.payload, &peers[broadcast_index].tvu)
})
.collect();
let mut sent = 0;
let mut send_mmsg_time = Measure::start("send_mmsg");
while sent < packets.len() {
match send_mmsg(s, &packets[sent..]) {
Ok(n) => sent += n,
Err(e) => {
return Err(Error::IO(e));
}
}
}
send_mmsg_time.stop();
*send_mmsg_total += send_mmsg_time.as_us();
let num_live_peers = num_live_peers(&peers);
update_peer_stats(
num_live_peers,
broadcast_len as i64 + 1,
last_datapoint_submit,
);
Ok(())
}
fn num_live_peers(peers: &[ContactInfo]) -> i64 {
let mut num_live_peers = 1i64;
peers.iter().for_each(|p| {
// A peer is considered live if they generated their contact info recently
if timestamp() - p.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS {
num_live_peers += 1;
}
});
num_live_peers
}
#[cfg(test)]
pub mod test {
use super::*;

View File

@ -78,16 +78,19 @@ impl BroadcastRun for FailEntryVerificationBroadcastRun {
sock: &UdpSocket,
) -> Result<()> {
let (stakes, shreds) = receiver.lock().unwrap().recv()?;
let all_seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect();
// Broadcast data
let all_shred_bufs: Vec<Vec<u8>> = shreds.to_vec().into_iter().map(|s| s.payload).collect();
cluster_info.read().unwrap().broadcast_shreds(
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
let mut send_mmsg_total = 0;
broadcast_shreds(
sock,
all_shred_bufs,
&all_seeds,
stakes,
&shreds,
&peers_and_stakes,
&peers,
&mut Instant::now(),
&mut send_mmsg_total,
)?;
Ok(())
}
fn record(

View File

@ -17,20 +17,17 @@ struct BroadcastStats {
broadcast_elapsed: u64,
receive_elapsed: u64,
seed_elapsed: u64,
send_mmsg_elapsed: u64,
}
impl BroadcastStats {
fn reset(&mut self) {
self.insert_shreds_elapsed = 0;
self.shredding_elapsed = 0;
self.broadcast_elapsed = 0;
self.receive_elapsed = 0;
self.seed_elapsed = 0;
*self = Self::default();
}
}
#[derive(Clone)]
pub(super) struct StandardBroadcastRun {
pub struct StandardBroadcastRun {
stats: Arc<RwLock<BroadcastStats>>,
unfinished_slot: Option<UnfinishedSlotInfo>,
current_slot_and_parent: Option<(u64, u64)>,
@ -258,20 +255,22 @@ impl StandardBroadcastRun {
shreds: Arc<Vec<Shred>>,
) -> Result<()> {
let seed_start = Instant::now();
let seeds: Vec<[u8; 32]> = shreds.iter().map(|s| s.seed()).collect();
let seed_elapsed = seed_start.elapsed();
// Broadcast the shreds
let broadcast_start = Instant::now();
let shred_bufs: Vec<Vec<u8>> = shreds.to_vec().into_iter().map(|s| s.payload).collect();
trace!("Broadcasting {:?} shreds", shred_bufs.len());
trace!("Broadcasting {:?} shreds", shreds.len());
cluster_info.read().unwrap().broadcast_shreds(
let (peers, peers_and_stakes) = get_broadcast_peers(cluster_info, stakes);
let mut send_mmsg_total = 0;
broadcast_shreds(
sock,
shred_bufs,
&seeds,
stakes,
&shreds,
&peers_and_stakes,
&peers,
&mut self.last_datapoint_submit,
&mut send_mmsg_total,
)?;
let broadcast_elapsed = broadcast_start.elapsed();
@ -279,6 +278,7 @@ impl StandardBroadcastRun {
self.update_broadcast_stats(BroadcastStats {
broadcast_elapsed: duration_as_us(&broadcast_elapsed),
seed_elapsed: duration_as_us(&seed_elapsed),
send_mmsg_elapsed: send_mmsg_total,
..BroadcastStats::default()
});
Ok(())
@ -291,6 +291,7 @@ impl StandardBroadcastRun {
wstats.insert_shreds_elapsed += stats.insert_shreds_elapsed;
wstats.broadcast_elapsed += stats.broadcast_elapsed;
wstats.seed_elapsed += stats.seed_elapsed;
wstats.send_mmsg_elapsed += stats.send_mmsg_elapsed;
}
fn report_and_reset_stats(&mut self) {
@ -303,6 +304,7 @@ impl StandardBroadcastRun {
("insertion_time", stats.insert_shreds_elapsed as i64, i64),
("broadcast_time", stats.broadcast_elapsed as i64, i64),
("receive_time", stats.receive_elapsed as i64, i64),
("send_mmsg", stats.send_mmsg_elapsed as i64, i64),
("seed", stats.seed_elapsed as i64, i64),
(
"num_shreds",

View File

@ -22,7 +22,7 @@ use crate::{
},
epoch_slots::EpochSlots,
result::{Error, Result},
weighted_shuffle::{weighted_best, weighted_shuffle},
weighted_shuffle::weighted_shuffle,
};
use bincode::{serialize, serialized_size};
use core::cmp;
@ -43,7 +43,6 @@ use solana_perf::packet::{
};
use solana_rayon_threadlimit::get_thread_count;
use solana_sdk::hash::Hash;
use solana_sdk::timing::duration_as_s;
use solana_sdk::{
clock::{Slot, DEFAULT_MS_PER_SLOT, DEFAULT_SLOTS_PER_EPOCH},
pubkey::Pubkey,
@ -51,7 +50,7 @@ use solana_sdk::{
timing::{duration_as_ms, timestamp},
transaction::Transaction,
};
use solana_streamer::sendmmsg::{multicast, send_mmsg};
use solana_streamer::sendmmsg::multicast;
use solana_streamer::streamer::{PacketReceiver, PacketSender};
use std::{
borrow::Cow,
@ -934,77 +933,6 @@ impl ClusterInfo {
.collect()
}
fn sorted_tvu_peers_and_stakes(
&self,
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
) -> (Vec<ContactInfo>, Vec<(u64, usize)>) {
let mut peers = self.tvu_peers();
peers.dedup();
let stakes_and_index = ClusterInfo::sorted_stakes_with_index(&peers, stakes);
(peers, stakes_and_index)
}
/// broadcast messages from the leader to layer 1 nodes
/// # Remarks
pub fn broadcast_shreds(
&self,
s: &UdpSocket,
shreds: Vec<Vec<u8>>,
seeds: &[[u8; 32]],
stakes: Option<Arc<HashMap<Pubkey, u64>>>,
last_datapoint_submit: &mut Instant,
) -> Result<()> {
let (peers, peers_and_stakes) = self.sorted_tvu_peers_and_stakes(stakes);
let broadcast_len = peers_and_stakes.len();
if broadcast_len == 0 {
if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 {
datapoint_info!(
"cluster_info-num_nodes",
("live_count", 1, i64),
("broadcast_count", 1, i64)
);
*last_datapoint_submit = Instant::now();
}
return Ok(());
}
let mut packets: Vec<_> = shreds
.into_iter()
.zip(seeds)
.map(|(shred, seed)| {
let broadcast_index = weighted_best(&peers_and_stakes, *seed);
(shred, &peers[broadcast_index].tvu)
})
.collect();
let mut sent = 0;
while sent < packets.len() {
match send_mmsg(s, &mut packets[sent..]) {
Ok(n) => sent += n,
Err(e) => {
return Err(Error::IO(e));
}
}
}
let mut num_live_peers = 1i64;
peers.iter().for_each(|p| {
// A peer is considered live if they generated their contact info recently
if timestamp() - p.wallclock <= CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS {
num_live_peers += 1;
}
});
if duration_as_s(&Instant::now().duration_since(*last_datapoint_submit)) >= 1.0 {
datapoint_info!(
"cluster_info-num_nodes",
("live_count", num_live_peers, i64),
("broadcast_count", broadcast_len + 1, i64)
);
*last_datapoint_submit = Instant::now();
}
Ok(())
}
/// retransmit messages to a list of nodes
/// # Remarks
/// We need to avoid having obj locked while doing a io, such as the `send_to`
@ -1942,6 +1870,14 @@ fn report_time_spent(label: &str, time: &Duration, extra: &str) {
}
}
pub fn stake_weight_peers<S: std::hash::BuildHasher>(
peers: &mut Vec<ContactInfo>,
stakes: Option<Arc<HashMap<Pubkey, u64, S>>>,
) -> Vec<(u64, usize)> {
peers.dedup();
ClusterInfo::sorted_stakes_with_index(peers, stakes)
}
#[cfg(test)]
mod tests {
use super::*;
@ -2480,7 +2416,8 @@ mod tests {
stakes.insert(id4, 10);
let stakes = Arc::new(stakes);
let (peers, peers_and_stakes) = cluster_info.sorted_tvu_peers_and_stakes(Some(stakes));
let mut peers = cluster_info.tvu_peers();
let peers_and_stakes = stake_weight_peers(&mut peers, Some(stakes));
assert_eq!(peers.len(), 2);
assert_eq!(peers[0].id, id);
assert_eq!(peers[1].id, id2);

View File

@ -4,7 +4,7 @@ use std::io;
use std::net::{SocketAddr, UdpSocket};
#[cfg(not(target_os = "linux"))]
pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec<u8>, &SocketAddr)]) -> io::Result<usize> {
pub fn send_mmsg(sock: &UdpSocket, packets: &[(&Vec<u8>, &SocketAddr)]) -> io::Result<usize> {
let count = packets.len();
for (p, a) in packets {
sock.send_to(p, *a)?;
@ -18,7 +18,7 @@ use libc::{iovec, mmsghdr, sockaddr_in, sockaddr_in6};
#[cfg(target_os = "linux")]
fn mmsghdr_for_packet(
packet: &mut [u8],
packet: &[u8],
dest: &SocketAddr,
index: usize,
addr_in_len: u32,
@ -32,7 +32,7 @@ fn mmsghdr_for_packet(
use std::mem;
iovs.push(iovec {
iov_base: packet.as_mut_ptr() as *mut c_void,
iov_base: packet.as_ptr() as *mut c_void,
iov_len: packet.len(),
});
@ -57,7 +57,7 @@ fn mmsghdr_for_packet(
}
#[cfg(target_os = "linux")]
pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec<u8>, &SocketAddr)]) -> io::Result<usize> {
pub fn send_mmsg(sock: &UdpSocket, packets: &[(&Vec<u8>, &SocketAddr)]) -> io::Result<usize> {
use libc::{sendmmsg, socklen_t};
use std::mem;
use std::os::unix::io::AsRawFd;
@ -73,7 +73,7 @@ pub fn send_mmsg(sock: &UdpSocket, packets: &mut [(Vec<u8>, &SocketAddr)]) -> io
let sock_fd = sock.as_raw_fd();
let mut hdrs: Vec<mmsghdr> = packets
.iter_mut()
.iter()
.enumerate()
.map(|(i, (packet, dest))| {
mmsghdr_for_packet(
@ -160,11 +160,10 @@ mod tests {
let addr = reader.local_addr().unwrap();
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let mut packets: Vec<_> = (0..32)
.map(|_| (vec![0u8; PACKET_DATA_SIZE], &addr))
.collect();
let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
let packet_refs: Vec<_> = packets.iter().map(|p| (p, &addr)).collect();
let sent = send_mmsg(&sender, &mut packets).ok();
let sent = send_mmsg(&sender, &packet_refs).ok();
assert_eq!(sent, Some(32));
let mut packets = vec![Packet::default(); 32];
@ -182,17 +181,14 @@ mod tests {
let sender = UdpSocket::bind("127.0.0.1:0").expect("bind");
let mut packets: Vec<_> = (0..32)
.map(|i| {
if i < 16 {
(vec![0u8; PACKET_DATA_SIZE], &addr)
} else {
(vec![0u8; PACKET_DATA_SIZE], &addr2)
}
})
let packets: Vec<_> = (0..32).map(|_| vec![0u8; PACKET_DATA_SIZE]).collect();
let packet_refs: Vec<_> = packets
.iter()
.enumerate()
.map(|(i, p)| if i < 16 { (p, &addr) } else { (p, &addr2) })
.collect();
let sent = send_mmsg(&sender, &mut packets).ok();
let sent = send_mmsg(&sender, &packet_refs).ok();
assert_eq!(sent, Some(32));
let mut packets = vec![Packet::default(); 32];