Add counters for channel pressure and time spent in TPU pipeline (#1324)

* Add counters for channel pressure and time spent in TPU pipeline

* Fixed failing tests

* Fix rust format issue
Pankaj Garg, 2018-09-24 17:13:49 -07:00 (committed by GitHub)
parent 747ba6a8d3
commit 0f4fd8367d
10 changed files with 73 additions and 12 deletions

src/banking_stage.rs

@ -172,6 +172,7 @@ impl BankingStage {
let recv_start = Instant::now();
let mms = verified_receiver.recv_timeout(timer)?;
debug!("verified_receiver {:?}", verified_receiver);
let now = Instant::now();
let mut reqs_len = 0;
let mms_len = mms.len();
info!(
@ -180,6 +181,7 @@ impl BankingStage {
timing::duration_as_ms(&recv_start.elapsed()),
mms.len(),
);
inc_new_counter_info!("banking_stage-entries_received", mms_len);
let bank_starting_tx_count = bank.transaction_count();
let count = mms.iter().map(|x| x.1.len()).sum();
let proc_start = Instant::now();
@ -208,6 +210,10 @@ impl BankingStage {
)?;
}
inc_new_counter_info!(
"banking_stage-time_ms",
timing::duration_as_ms(&now.elapsed()) as usize
);
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
info!(
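For reference, the pattern this commit applies to each stage: capture an Instant when a batch arrives, count the entries received, then report the elapsed milliseconds once the batch is processed. A minimal self-contained sketch of that pattern, using plain AtomicUsize statics where inc_new_counter_info! would lazily create a named counter and flush it to the metrics backend (the counter names and the process_batch helper are hypothetical):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;

// Stand-ins for the named counters the macro manages.
static ENTRIES_RECEIVED: AtomicUsize = AtomicUsize::new(0);
static TIME_MS: AtomicUsize = AtomicUsize::new(0);

fn process_batch(entries: &[u64]) {
    let now = Instant::now(); // taken as soon as the batch is received
    ENTRIES_RECEIVED.fetch_add(entries.len(), Ordering::Relaxed);
    // ... stage-specific work on `entries` goes here ...
    TIME_MS.fetch_add(now.elapsed().as_millis() as usize, Ordering::Relaxed);
}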

src/bin/bench-streamer.rs

@ -85,7 +85,12 @@ fn main() -> Result<()> {
let (s_reader, r_reader) = channel();
read_channels.push(r_reader);
read_threads.push(receiver(Arc::new(read), exit.clone(), s_reader));
read_threads.push(receiver(
Arc::new(read),
exit.clone(),
s_reader,
"bench-streamer",
));
}
let t_producer1 = producer(&addr, &pack_recycler, exit.clone());

src/broadcast_stage.rs

@ -39,6 +39,7 @@ fn broadcast(
let id = node_info.id;
let timer = Duration::new(1, 0);
let entries = receiver.recv_timeout(timer)?;
let now = Instant::now();
let mut num_entries = entries.len();
let mut ventries = Vec::new();
ventries.push(entries);
@ -46,6 +47,7 @@ fn broadcast(
num_entries += entries.len();
ventries.push(entries);
}
inc_new_counter_info!("broadcast_stage-entries_received", num_entries);
let to_blobs_start = Instant::now();
let dq: SharedBlobs = ventries
@ -128,6 +130,10 @@ fn broadcast(
}
let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed());
inc_new_counter_info!(
"broadcast_stage-time_ms",
duration_as_ms(&now.elapsed()) as usize
);
info!(
"broadcast: {} entries, blob time {} chunking time {} broadcast time {}",
num_entries, to_blobs_elapsed, chunking_elapsed, broadcast_elapsed

src/fetch_stage.rs

@ -25,7 +25,7 @@ impl FetchStage {
let (sender, receiver) = channel();
let thread_hdls: Vec<_> = sockets
.into_iter()
.map(|socket| streamer::receiver(socket, exit.clone(), sender.clone()))
.map(|socket| streamer::receiver(socket, exit.clone(), sender.clone(), "fetch-stage"))
.collect();
(FetchStage { exit, thread_hdls }, receiver)

src/request_stage.rs

@ -1,6 +1,8 @@
//! The `request_stage` processes thin client Request messages.
use bincode::deserialize;
use counter::Counter;
use log::Level;
use packet::{to_blobs, BlobRecycler, Packets, SharedPackets};
use rayon::prelude::*;
use request::Request;
@ -8,6 +10,7 @@ use request_processor::RequestProcessor;
use result::{Error, Result};
use service::Service;
use std::net::SocketAddr;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError};
use std::sync::Arc;
use std::thread::{self, Builder, JoinHandle};
@ -37,7 +40,7 @@ impl RequestStage {
blob_sender: &BlobSender,
blob_recycler: &BlobRecycler,
) -> Result<()> {
let (batch, batch_len) = streamer::recv_batch(packet_receiver)?;
let (batch, batch_len, _recv_time) = streamer::recv_batch(packet_receiver)?;
debug!(
"@{:?} request_stage: processing: {}",
@ -65,6 +68,7 @@ impl RequestStage {
}
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
inc_new_counter_info!("request_stage-time_ms", total_time_ms as usize);
debug!(
"@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}",
timing::timestamp(),

src/rpu.rs

@ -44,7 +44,12 @@ impl Rpu {
pub fn new(bank: &Arc<Bank>, requests_socket: UdpSocket, respond_socket: UdpSocket) -> Self {
let exit = Arc::new(AtomicBool::new(false));
let (packet_sender, packet_receiver) = channel();
let t_receiver = streamer::receiver(Arc::new(requests_socket), exit.clone(), packet_sender);
let t_receiver = streamer::receiver(
Arc::new(requests_socket),
exit.clone(),
packet_sender,
"rpu",
);
let request_processor = RequestProcessor::new(bank.clone());
let (request_stage, blob_receiver) = RequestStage::new(request_processor, packet_receiver);

src/sigverify_stage.rs

@ -5,13 +5,16 @@
//! transaction. All processing is done on the CPU by default and on a GPU
//! if the `cuda` feature is enabled with `--features=cuda`.
use counter::Counter;
use influx_db_client as influxdb;
use log::Level;
use metrics;
use packet::SharedPackets;
use rand::{thread_rng, Rng};
use result::{Error, Result};
use service::Service;
use sigverify;
use std::sync::atomic::AtomicUsize;
use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, spawn, JoinHandle};
@ -51,8 +54,9 @@ impl SigVerifyStage {
sendr: &Arc<Mutex<Sender<VerifiedPackets>>>,
sigverify_disabled: bool,
) -> Result<()> {
let (batch, len) =
let (batch, len, recv_time) =
streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?;
inc_new_counter_info!("sigverify_stage-entries_received", len);
let now = Instant::now();
let batch_len = batch.len();
@ -65,6 +69,10 @@ impl SigVerifyStage {
);
let verified_batch = Self::verify_batch(batch, sigverify_disabled);
inc_new_counter_info!(
"sigverify_stage-verified_entries_send",
verified_batch.len()
);
if sendr
.lock()
@ -77,6 +85,10 @@ impl SigVerifyStage {
let total_time_ms = timing::duration_as_ms(&now.elapsed());
let total_time_s = timing::duration_as_s(&now.elapsed());
inc_new_counter_info!(
"sigverify_stage-time_ms",
(total_time_ms + recv_time) as usize
);
info!(
"@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}",
timing::timestamp(),

src/streamer.rs

@ -1,5 +1,7 @@
//! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets.
//!
use influx_db_client as influxdb;
use metrics;
use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlobs, SharedPackets};
use result::{Error, Result};
use std::net::UdpSocket;
@ -7,7 +9,8 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender};
use std::sync::Arc;
use std::thread::{Builder, JoinHandle};
use std::time::Duration;
use std::time::{Duration, Instant};
use timing::duration_as_ms;
pub type PacketReceiver = Receiver<SharedPackets>;
pub type PacketSender = Sender<SharedPackets>;
@ -19,6 +22,7 @@ fn recv_loop(
exit: &Arc<AtomicBool>,
re: &PacketRecycler,
channel: &PacketSender,
channel_tag: &'static str,
) -> Result<()> {
loop {
let msgs = re.allocate();
@ -26,6 +30,12 @@ fn recv_loop(
let result = msgs.write().recv_from(sock);
match result {
Ok(()) => {
let len = msgs.read().packets.len();
metrics::submit(
influxdb::Point::new(channel_tag)
.add_field("count", influxdb::Value::Integer(len as i64))
.to_owned(),
);
channel.send(msgs)?;
break;
}
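The hunk above is the channel-pressure counter itself: every packet batch read off the socket is reported as an InfluxDB point named after the channel tag, with the batch size in a "count" field. A sketch of an equivalent standalone submission, assuming the same metrics::submit queueing API the hunk uses (the report_channel_pressure helper is hypothetical):

use influx_db_client as influxdb;

// Hypothetical helper: report that `len` packets just entered the tagged channel.
fn report_channel_pressure(channel_tag: &'static str, len: usize) {
    metrics::submit(
        influxdb::Point::new(channel_tag)
            .add_field("count", influxdb::Value::Integer(len as i64))
            .to_owned(),
    );
}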
@ -43,6 +53,7 @@ pub fn receiver(
sock: Arc<UdpSocket>,
exit: Arc<AtomicBool>,
packet_sender: PacketSender,
sender_tag: &'static str,
) -> JoinHandle<()> {
let res = sock.set_read_timeout(Some(Duration::new(1, 0)));
let recycler = PacketRecycler::default();
@ -52,7 +63,7 @@ pub fn receiver(
Builder::new()
.name("solana-receiver".to_string())
.spawn(move || {
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender);
let _ = recv_loop(&sock, &exit, &recycler, &packet_sender, sender_tag);
()
}).unwrap()
}
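Each receiver thread now takes a &'static str tag naming the channel it feeds; a static string keeps the hot receive loop allocation-free, and the tag doubles as the InfluxDB measurement name. A hypothetical call site, assuming streamer's receiver and PacketReceiver are in scope ("my-stage" and spawn_tagged_receiver are placeholders):

use std::net::UdpSocket;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc::channel;
use std::sync::Arc;
use std::thread::JoinHandle;

fn spawn_tagged_receiver(socket: UdpSocket) -> (JoinHandle<()>, PacketReceiver) {
    let exit = Arc::new(AtomicBool::new(false));
    let (packet_sender, packet_receiver) = channel();
    // "my-stage" becomes the measurement name for this channel's pressure points.
    let t_receiver = receiver(Arc::new(socket), exit.clone(), packet_sender, "my-stage");
    (t_receiver, packet_receiver)
}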
@ -64,9 +75,10 @@ fn recv_send(sock: &UdpSocket, r: &BlobReceiver) -> Result<()> {
Ok(())
}
pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize)> {
pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize, u64)> {
let timer = Duration::new(1, 0);
let msgs = recvr.recv_timeout(timer)?;
let recv_start = Instant::now();
trace!("got msgs");
let mut len = msgs.read().packets.len();
let mut batch = vec![msgs];
@ -80,7 +92,7 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec<SharedPackets>, usize)>
}
}
trace!("batch len {}", batch.len());
Ok((batch, len))
Ok((batch, len, duration_as_ms(&recv_start.elapsed())))
}
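The new third element of the tuple is the time spent draining the channel backlog; recv_start is taken only after the first blocking recv_timeout returns, so the wait for the first packet is deliberately excluded. Callers can discard it (request_stage binds _recv_time) or fold it into their reported stage time (sigverify_stage adds it to total_time_ms). A hypothetical caller, assuming recv_batch, PacketReceiver, Result, and inc_new_counter_info! are in scope (the "my_stage-*" names and drain_and_count are placeholders):

// Drain whatever is queued, then record both volume and drain time.
fn drain_and_count(packet_receiver: &PacketReceiver) -> Result<()> {
    let (batch, len, recv_time_ms) = recv_batch(packet_receiver)?;
    inc_new_counter_info!("my_stage-entries_received", len);
    for _msgs in batch {
        // ... per-batch processing would go here ...
    }
    inc_new_counter_info!("my_stage-recv_ms", recv_time_ms as usize);
    Ok(())
}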
pub fn responder(name: &'static str, sock: Arc<UdpSocket>, r: BlobReceiver) -> JoinHandle<()> {
@ -166,7 +178,7 @@ mod test {
let exit = Arc::new(AtomicBool::new(false));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = receiver(Arc::new(read), exit.clone(), s_reader);
let t_receiver = receiver(Arc::new(read), exit.clone(), s_reader, "streamer-test");
let t_responder = {
let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);

src/window.rs

@ -450,7 +450,12 @@ mod test {
let exit = Arc::new(AtomicBool::new(false));
let resp_recycler = BlobRecycler::default();
let (s_reader, r_reader) = channel();
let t_receiver = receiver(Arc::new(read), exit.clone(), s_reader);
let t_receiver = receiver(
Arc::new(read),
exit.clone(),
s_reader,
"window-streamer-test",
);
let t_responder = {
let (s_responder, r_responder) = channel();
let t_responder = responder("streamer_send_test", Arc::new(send), r_responder);

src/write_stage.rs

@ -96,6 +96,7 @@ impl WriteStage {
) -> Result<()> {
let mut ventries = Vec::new();
let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?;
let now = Instant::now();
let mut num_new_entries = 0;
let mut num_txs = 0;
@ -122,6 +123,7 @@ impl WriteStage {
break;
}
}
inc_new_counter_info!("write_stage-entries_received", num_new_entries);
info!("write_stage entries: {}", num_new_entries);
@ -159,13 +161,17 @@ impl WriteStage {
let blob_send_start = Instant::now();
if !entries.is_empty() {
inc_new_counter_info!("write_stage-recv_vote", votes.len());
inc_new_counter_info!("write_stage-broadcast_entries", entries.len());
inc_new_counter_info!("write_stage-entries_sent", entries.len());
trace!("broadcasting {}", entries.len());
entry_sender.send(entries)?;
}
blob_send_total += duration_as_ms(&blob_send_start.elapsed());
}
inc_new_counter_info!(
"write_stage-time_ms",
duration_as_ms(&now.elapsed()) as usize
);
info!("done write_stage txs: {} time {} ms txs/s: {} to_blobs_total: {} register_entry_total: {} blob_send_total: {} crdt_votes_total: {}",
num_txs, duration_as_ms(&start.elapsed()),
num_txs as f32 / duration_as_s(&start.elapsed()),