Collect stats in streamer receiver and report fetch stage metrics (#25010)

This commit is contained in:
Justin Starry 2022-05-06 02:56:18 +08:00 committed by GitHub
parent e0092902ea
commit 7100f1c94b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 196 additions and 103 deletions

View File

@ -1,10 +1,11 @@
#![allow(clippy::integer_arithmetic)]
use {
clap::{crate_description, crate_name, Arg, Command},
crossbeam_channel::unbounded,
solana_streamer::{
packet::{Packet, PacketBatch, PacketBatchRecycler, PACKET_DATA_SIZE},
streamer::{receiver, PacketBatchReceiver},
streamer::{receiver, PacketBatchReceiver, StreamerReceiveStats},
},
std::{
cmp::max,
@ -97,6 +98,7 @@ fn main() -> Result<()> {
num_sockets,
)
.unwrap();
let stats = Arc::new(StreamerReceiveStats::new("bench-streamer-test"));
for read in read_sockets {
read.set_read_timeout(Some(Duration::new(1, 0))).unwrap();
@ -105,10 +107,10 @@ fn main() -> Result<()> {
read_channels.push(r_reader);
read_threads.push(receiver(
Arc::new(read),
&exit,
exit.clone(),
s_reader,
recycler.clone(),
"bench-streamer-test",
stats.clone(),
1,
true,
));

View File

@ -23,7 +23,7 @@ use {
pubkey::Pubkey,
timing::timestamp,
},
solana_streamer::streamer::{self, PacketBatchReceiver},
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
std::{
collections::HashSet,
net::UdpSocket,
@ -149,10 +149,12 @@ impl AncestorHashesService {
let (response_sender, response_receiver) = unbounded();
let t_receiver = streamer::receiver(
ancestor_hashes_request_socket.clone(),
&exit,
exit.clone(),
response_sender,
Recycler::default(),
"ancestor_hashes_response_receiver",
Arc::new(StreamerReceiveStats::new(
"ancestor_hashes_response_receiver",
)),
1,
false,
);
@ -910,10 +912,12 @@ mod test {
// Set up response threads
let t_request_receiver = streamer::receiver(
Arc::new(responder_node.sockets.serve_repair),
&exit,
exit.clone(),
requests_sender,
Recycler::default(),
"serve_repair_receiver",
Arc::new(StreamerReceiveStats::new(
"ancestor_hashes_response_receiver",
)),
1,
false,
);

View File

@ -13,11 +13,17 @@ use {
clock::DEFAULT_TICKS_PER_SLOT,
packet::{Packet, PacketFlags},
},
solana_streamer::streamer::{self, PacketBatchReceiver, PacketBatchSender},
solana_streamer::streamer::{
self, PacketBatchReceiver, PacketBatchSender, StreamerReceiveStats,
},
std::{
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, Mutex},
thread::{self, Builder, JoinHandle},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread::{self, sleep, Builder, JoinHandle},
time::Duration,
},
};
@ -132,42 +138,54 @@ impl FetchStage {
) -> Self {
let recycler: PacketBatchRecycler = Recycler::warmed(1000, 1024);
let tpu_threads = tpu_sockets.into_iter().map(|socket| {
streamer::receiver(
socket,
exit,
sender.clone(),
recycler.clone(),
"fetch_stage",
coalesce_ms,
true,
)
});
let tpu_stats = Arc::new(StreamerReceiveStats::new("tpu_receiver"));
let tpu_threads: Vec<_> = tpu_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
socket,
exit.clone(),
sender.clone(),
recycler.clone(),
tpu_stats.clone(),
coalesce_ms,
true,
)
})
.collect();
let tpu_forward_stats = Arc::new(StreamerReceiveStats::new("tpu_forwards_receiver"));
let (forward_sender, forward_receiver) = unbounded();
let tpu_forwards_threads = tpu_forwards_sockets.into_iter().map(|socket| {
streamer::receiver(
socket,
exit,
forward_sender.clone(),
recycler.clone(),
"fetch_forward_stage",
coalesce_ms,
true,
)
});
let tpu_forwards_threads: Vec<_> = tpu_forwards_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
socket,
exit.clone(),
forward_sender.clone(),
recycler.clone(),
tpu_forward_stats.clone(),
coalesce_ms,
true,
)
})
.collect();
let tpu_vote_threads = tpu_vote_sockets.into_iter().map(|socket| {
streamer::receiver(
socket,
exit,
vote_sender.clone(),
recycler.clone(),
"fetch_vote_stage",
coalesce_ms,
true,
)
});
let tpu_vote_stats = Arc::new(StreamerReceiveStats::new("tpu_vote_receiver"));
let tpu_vote_threads: Vec<_> = tpu_vote_sockets
.into_iter()
.map(|socket| {
streamer::receiver(
socket,
exit.clone(),
vote_sender.clone(),
recycler.clone(),
tpu_vote_stats.clone(),
coalesce_ms,
true,
)
})
.collect();
let sender = sender.clone();
let poh_recorder = poh_recorder.clone();
@ -189,12 +207,33 @@ impl FetchStage {
})
.unwrap();
let mut thread_hdls: Vec<_> = tpu_threads
.chain(tpu_forwards_threads)
.chain(tpu_vote_threads)
.collect();
thread_hdls.push(fwd_thread_hdl);
Self { thread_hdls }
let exit = exit.clone();
let metrics_thread_hdl = Builder::new()
.name("solana-fetch-stage-metrics".to_string())
.spawn(move || loop {
sleep(Duration::from_secs(1));
tpu_stats.report();
tpu_vote_stats.report();
tpu_forward_stats.report();
if exit.load(Ordering::Relaxed) {
return;
}
})
.unwrap();
Self {
thread_hdls: [
tpu_threads,
tpu_forwards_threads,
tpu_vote_threads,
vec![fwd_thread_hdl, metrics_thread_hdl],
]
.into_iter()
.flatten()
.collect(),
}
}
pub fn join(self) -> thread::Result<()> {

View File

@ -3,7 +3,10 @@ use {
crossbeam_channel::{unbounded, Sender},
solana_ledger::blockstore::Blockstore,
solana_perf::recycler::Recycler,
solana_streamer::{socket::SocketAddrSpace, streamer},
solana_streamer::{
socket::SocketAddrSpace,
streamer::{self, StreamerReceiveStats},
},
std::{
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, RwLock},
@ -33,10 +36,10 @@ impl ServeRepairService {
);
let t_receiver = streamer::receiver(
serve_repair_socket.clone(),
exit,
exit.clone(),
request_sender,
Recycler::default(),
"serve_repair_receiver",
Arc::new(StreamerReceiveStats::new("serve_repair_receiver")),
1,
false,
);

View File

@ -12,7 +12,7 @@ use {
},
solana_runtime::bank_forks::BankForks,
solana_sdk::clock::{Slot, DEFAULT_MS_PER_SLOT},
solana_streamer::streamer::{self, PacketBatchReceiver},
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
std::{
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, RwLock},
@ -136,10 +136,10 @@ impl ShredFetchStage {
.map(|s| {
streamer::receiver(
s,
exit,
exit.clone(),
packet_sender.clone(),
recycler.clone(),
"packet_modifier",
Arc::new(StreamerReceiveStats::new("packet_modifier")),
1,
true,
)

View File

@ -11,7 +11,10 @@ use {
pubkey::Pubkey,
signature::{Keypair, Signer},
},
solana_streamer::{socket::SocketAddrSpace, streamer},
solana_streamer::{
socket::SocketAddrSpace,
streamer::{self, StreamerReceiveStats},
},
std::{
collections::HashSet,
net::{SocketAddr, TcpListener, UdpSocket},
@ -48,10 +51,10 @@ impl GossipService {
let socket_addr_space = *cluster_info.socket_addr_space();
let t_receiver = streamer::receiver(
gossip_socket.clone(),
exit,
exit.clone(),
request_sender,
Recycler::default(),
"gossip_receiver",
Arc::new(StreamerReceiveStats::new("gossip_receiver")),
1,
false,
);

View File

@ -4,7 +4,6 @@
use {
crate::{
packet::{self, PacketBatch, PacketBatchRecycler, PACKETS_PER_BATCH},
recvmmsg::NUM_RCVMMSGS,
sendmmsg::{batch_send, SendPktsError},
socket::SocketAddrSpace,
},
@ -16,7 +15,7 @@ use {
collections::HashMap,
net::{IpAddr, UdpSocket},
sync::{
atomic::{AtomicBool, Ordering},
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
thread::{Builder, JoinHandle},
@ -43,24 +42,66 @@ pub enum StreamerError {
SendPktsError(#[from] SendPktsError),
}
/// Counters collected by a streamer receiver and published by [`StreamerReceiveStats::report`].
///
/// Every counter is an atomic, so a single instance can be shared between
/// several receiver threads and a separate reporting thread. `report` swaps
/// each counter back to zero, so a reported value covers the period since the
/// previous report.
pub struct StreamerReceiveStats {
/// Datapoint name used by `report`; `recv_loop` also passes it to
/// `PacketBatch::new_with_recycler` when pinned memory is in use.
pub name: &'static str,
/// Sum of the lengths returned by `packet::recv_from` for non-empty receives.
pub packets_count: AtomicUsize,
/// Number of non-empty receives, i.e. batches handed to the channel sender.
pub packet_batches_count: AtomicUsize,
/// Number of receives whose length was exactly `PACKETS_PER_BATCH`.
pub full_packet_batches_count: AtomicUsize,
/// Largest sender channel length sampled just before a batch is sent;
/// published under the field name `"channel_len"`.
pub max_channel_len: AtomicUsize,
}
impl StreamerReceiveStats {
pub fn new(name: &'static str) -> Self {
Self {
name,
packets_count: AtomicUsize::default(),
packet_batches_count: AtomicUsize::default(),
full_packet_batches_count: AtomicUsize::default(),
max_channel_len: AtomicUsize::default(),
}
}
pub fn report(&self) {
datapoint_info!(
self.name,
(
"packets_count",
self.packets_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"packet_batches_count",
self.packet_batches_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"full_packet_batches_count",
self.full_packet_batches_count.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"channel_len",
self.max_channel_len.swap(0, Ordering::Relaxed) as i64,
i64
),
);
}
}
/// Result alias for this module; the error type is [`StreamerError`].
pub type Result<T> = std::result::Result<T, StreamerError>;
fn recv_loop(
sock: &UdpSocket,
socket: &UdpSocket,
exit: Arc<AtomicBool>,
channel: &PacketBatchSender,
packet_batch_sender: &PacketBatchSender,
recycler: &PacketBatchRecycler,
name: &'static str,
stats: &StreamerReceiveStats,
coalesce_ms: u64,
use_pinned_memory: bool,
) -> Result<()> {
let mut recv_count = 0;
let mut call_count = 0;
let mut now = Instant::now();
let mut num_max_received = 0; // Number of times maximum packets were received
loop {
let mut packet_batch = if use_pinned_memory {
PacketBatch::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, name)
PacketBatch::new_with_recycler(recycler.clone(), PACKETS_PER_BATCH, stats.name)
} else {
PacketBatch::with_capacity(PACKETS_PER_BATCH)
};
@ -70,55 +111,51 @@ fn recv_loop(
if exit.load(Ordering::Relaxed) {
return Ok(());
}
if let Ok(len) = packet::recv_from(&mut packet_batch, sock, coalesce_ms) {
if len == NUM_RCVMMSGS {
num_max_received += 1;
}
recv_count += len;
call_count += 1;
if let Ok(len) = packet::recv_from(&mut packet_batch, socket, coalesce_ms) {
if len > 0 {
channel.send(packet_batch)?;
let StreamerReceiveStats {
packets_count,
packet_batches_count,
full_packet_batches_count,
max_channel_len,
..
} = stats;
packets_count.fetch_add(len, Ordering::Relaxed);
packet_batches_count.fetch_add(1, Ordering::Relaxed);
max_channel_len.fetch_max(packet_batch_sender.len(), Ordering::Relaxed);
if len == PACKETS_PER_BATCH {
full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
}
packet_batch_sender.send(packet_batch)?;
}
break;
}
}
if recv_count > 1024 {
datapoint_debug!(
name,
("received", recv_count as i64, i64),
("call_count", i64::from(call_count), i64),
("elapsed", now.elapsed().as_millis() as i64, i64),
("max_received", i64::from(num_max_received), i64),
);
recv_count = 0;
call_count = 0;
num_max_received = 0;
}
now = Instant::now();
}
}
pub fn receiver(
sock: Arc<UdpSocket>,
exit: &Arc<AtomicBool>,
packet_sender: PacketBatchSender,
socket: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
packet_batch_sender: PacketBatchSender,
recycler: PacketBatchRecycler,
name: &'static str,
stats: Arc<StreamerReceiveStats>,
coalesce_ms: u64,
use_pinned_memory: bool,
) -> JoinHandle<()> {
let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
let exit = exit.clone();
Builder::new()
.name("solana-receiver".to_string())
.spawn(move || {
let _ = recv_loop(
&sock,
&socket,
exit,
&packet_sender,
&recycler.clone(),
name,
&packet_batch_sender,
&recycler,
&stats,
coalesce_ms,
use_pinned_memory,
);
@ -405,15 +442,17 @@ mod test {
let send = UdpSocket::bind("127.0.0.1:0").expect("bind");
let exit = Arc::new(AtomicBool::new(false));
let (s_reader, r_reader) = unbounded();
let stats = Arc::new(StreamerReceiveStats::new("test"));
let t_receiver = receiver(
Arc::new(read),
&exit,
exit.clone(),
s_reader,
Recycler::default(),
"test",
stats.clone(),
1,
true,
);
const NUM_PACKETS: usize = 5;
let t_responder = {
let (s_responder, r_responder) = unbounded();
let t_responder = responder(
@ -424,7 +463,7 @@ mod test {
None,
);
let mut packet_batch = PacketBatch::default();
for i in 0..5 {
for i in 0..NUM_PACKETS {
let mut p = Packet::default();
{
p.data[0] = i as u8;
@ -437,10 +476,13 @@ mod test {
t_responder
};
let mut packets_remaining = 5;
let mut packets_remaining = NUM_PACKETS;
get_packet_batches(r_reader, &mut packets_remaining);
assert_eq!(packets_remaining, 0);
exit.store(true, Ordering::Relaxed);
assert_eq!(stats.packets_count.load(Ordering::Relaxed), NUM_PACKETS);
assert_eq!(stats.packet_batches_count.load(Ordering::Relaxed), 1);
assert_eq!(stats.full_packet_batches_count.load(Ordering::Relaxed), 0);
t_receiver.join().expect("join");
t_responder.join().expect("join");
}