From 0f4fd8367debea1526b8ca2f8db642d439569aaf Mon Sep 17 00:00:00 2001 From: Pankaj Garg Date: Mon, 24 Sep 2018 17:13:49 -0700 Subject: [PATCH] Add counters for channel pressure and time spent in TPU pipeline (#1324) * Add counters for channel pressure and time spent in TPU pipeline * Fixed failing tests * Fix rust format issue --- src/banking_stage.rs | 6 ++++++ src/bin/bench-streamer.rs | 7 ++++++- src/broadcast_stage.rs | 6 ++++++ src/fetch_stage.rs | 2 +- src/request_stage.rs | 6 +++++- src/rpu.rs | 7 ++++++- src/sigverify_stage.rs | 14 +++++++++++++- src/streamer.rs | 22 +++++++++++++++++----- src/window.rs | 7 ++++++- src/write_stage.rs | 8 +++++++- 10 files changed, 73 insertions(+), 12 deletions(-) diff --git a/src/banking_stage.rs b/src/banking_stage.rs index bfa9478b4d..e1594a3a3d 100644 --- a/src/banking_stage.rs +++ b/src/banking_stage.rs @@ -172,6 +172,7 @@ impl BankingStage { let recv_start = Instant::now(); let mms = verified_receiver.recv_timeout(timer)?; debug!("verified_recevier {:?}", verified_receiver); + let now = Instant::now(); let mut reqs_len = 0; let mms_len = mms.len(); info!( @@ -180,6 +181,7 @@ impl BankingStage { timing::duration_as_ms(&recv_start.elapsed()), mms.len(), ); + inc_new_counter_info!("banking_stage-entries_received", mms_len); let bank_starting_tx_count = bank.transaction_count(); let count = mms.iter().map(|x| x.1.len()).sum(); let proc_start = Instant::now(); @@ -208,6 +210,10 @@ impl BankingStage { )?; } + inc_new_counter_info!( + "banking_stage-time_ms", + timing::duration_as_ms(&now.elapsed()) as usize + ); let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); info!( diff --git a/src/bin/bench-streamer.rs b/src/bin/bench-streamer.rs index 7e58d23e0e..5189c652b3 100644 --- a/src/bin/bench-streamer.rs +++ b/src/bin/bench-streamer.rs @@ -85,7 +85,12 @@ fn main() -> Result<()> { let (s_reader, r_reader) = channel(); read_channels.push(r_reader); - read_threads.push(receiver(Arc::new(read), exit.clone(), s_reader)); + read_threads.push(receiver( + Arc::new(read), + exit.clone(), + s_reader, + "bench-streamer", + )); } let t_producer1 = producer(&addr, &pack_recycler, exit.clone()); diff --git a/src/broadcast_stage.rs b/src/broadcast_stage.rs index a0b0e3db92..7b998af24c 100644 --- a/src/broadcast_stage.rs +++ b/src/broadcast_stage.rs @@ -39,6 +39,7 @@ fn broadcast( let id = node_info.id; let timer = Duration::new(1, 0); let entries = receiver.recv_timeout(timer)?; + let now = Instant::now(); let mut num_entries = entries.len(); let mut ventries = Vec::new(); ventries.push(entries); @@ -46,6 +47,7 @@ fn broadcast( num_entries += entries.len(); ventries.push(entries); } + inc_new_counter_info!("broadcast_stage-entries_received", num_entries); let to_blobs_start = Instant::now(); let dq: SharedBlobs = ventries @@ -128,6 +130,10 @@ fn broadcast( } let broadcast_elapsed = duration_as_ms(&broadcast_start.elapsed()); + inc_new_counter_info!( + "broadcast_stage-time_ms", + duration_as_ms(&now.elapsed()) as usize + ); info!( "broadcast: {} entries, blob time {} chunking time {} broadcast time {}", num_entries, to_blobs_elapsed, chunking_elapsed, broadcast_elapsed diff --git a/src/fetch_stage.rs b/src/fetch_stage.rs index f1f5615acb..34d6dc3f07 100644 --- a/src/fetch_stage.rs +++ b/src/fetch_stage.rs @@ -25,7 +25,7 @@ impl FetchStage { let (sender, receiver) = channel(); let thread_hdls: Vec<_> = sockets .into_iter() - .map(|socket| streamer::receiver(socket, exit.clone(), sender.clone())) + .map(|socket| streamer::receiver(socket, exit.clone(), sender.clone(), "fetch-stage")) .collect(); (FetchStage { exit, thread_hdls }, receiver) diff --git a/src/request_stage.rs b/src/request_stage.rs index 3afb53689b..77c905d49b 100644 --- a/src/request_stage.rs +++ b/src/request_stage.rs @@ -1,6 +1,8 @@ //! The `request_stage` processes thin client Request messages. use bincode::deserialize; +use counter::Counter; +use log::Level; use packet::{to_blobs, BlobRecycler, Packets, SharedPackets}; use rayon::prelude::*; use request::Request; @@ -8,6 +10,7 @@ use request_processor::RequestProcessor; use result::{Error, Result}; use service::Service; use std::net::SocketAddr; +use std::sync::atomic::AtomicUsize; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError}; use std::sync::Arc; use std::thread::{self, Builder, JoinHandle}; @@ -37,7 +40,7 @@ impl RequestStage { blob_sender: &BlobSender, blob_recycler: &BlobRecycler, ) -> Result<()> { - let (batch, batch_len) = streamer::recv_batch(packet_receiver)?; + let (batch, batch_len, _recv_time) = streamer::recv_batch(packet_receiver)?; debug!( "@{:?} request_stage: processing: {}", @@ -65,6 +68,7 @@ impl RequestStage { } let total_time_s = timing::duration_as_s(&proc_start.elapsed()); let total_time_ms = timing::duration_as_ms(&proc_start.elapsed()); + inc_new_counter_info!("request_stage-time_ms", total_time_ms as usize); debug!( "@{:?} done process batches: {} time: {:?}ms reqs: {} reqs/s: {}", timing::timestamp(), diff --git a/src/rpu.rs b/src/rpu.rs index 766468cad9..4ed6893ffc 100644 --- a/src/rpu.rs +++ b/src/rpu.rs @@ -44,7 +44,12 @@ impl Rpu { pub fn new(bank: &Arc, requests_socket: UdpSocket, respond_socket: UdpSocket) -> Self { let exit = Arc::new(AtomicBool::new(false)); let (packet_sender, packet_receiver) = channel(); - let t_receiver = streamer::receiver(Arc::new(requests_socket), exit.clone(), packet_sender); + let t_receiver = streamer::receiver( + Arc::new(requests_socket), + exit.clone(), + packet_sender, + "rpu", + ); let request_processor = RequestProcessor::new(bank.clone()); let (request_stage, blob_receiver) = RequestStage::new(request_processor, packet_receiver); diff --git a/src/sigverify_stage.rs b/src/sigverify_stage.rs index 5e5972ec60..70ae88f2f3 100644 --- a/src/sigverify_stage.rs +++ b/src/sigverify_stage.rs @@ -5,13 +5,16 @@ //! transaction. All processing is done on the CPU by default and on a GPU //! if the `cuda` feature is enabled with `--features=cuda`. +use counter::Counter; use influx_db_client as influxdb; +use log::Level; use metrics; use packet::SharedPackets; use rand::{thread_rng, Rng}; use result::{Error, Result}; use service::Service; use sigverify; +use std::sync::atomic::AtomicUsize; use std::sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender}; use std::sync::{Arc, Mutex}; use std::thread::{self, spawn, JoinHandle}; @@ -51,8 +54,9 @@ impl SigVerifyStage { sendr: &Arc>>, sigverify_disabled: bool, ) -> Result<()> { - let (batch, len) = + let (batch, len, recv_time) = streamer::recv_batch(&recvr.lock().expect("'recvr' lock in fn verifier"))?; + inc_new_counter_info!("sigverify_stage-entries_received", len); let now = Instant::now(); let batch_len = batch.len(); @@ -65,6 +69,10 @@ impl SigVerifyStage { ); let verified_batch = Self::verify_batch(batch, sigverify_disabled); + inc_new_counter_info!( + "sigverify_stage-verified_entries_send", + verified_batch.len() + ); if sendr .lock() @@ -77,6 +85,10 @@ impl SigVerifyStage { let total_time_ms = timing::duration_as_ms(&now.elapsed()); let total_time_s = timing::duration_as_s(&now.elapsed()); + inc_new_counter_info!( + "sigverify_stage-time_ms", + (total_time_ms + recv_time) as usize + ); info!( "@{:?} verifier: done. batches: {} total verify time: {:?} id: {} verified: {} v/s {}", timing::timestamp(), diff --git a/src/streamer.rs b/src/streamer.rs index 825cd543d1..8783973c4e 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -1,5 +1,7 @@ //! The `streamer` module defines a set of services for efficiently pulling data from UDP sockets. //! +use influx_db_client as influxdb; +use metrics; use packet::{Blob, BlobRecycler, PacketRecycler, SharedBlobs, SharedPackets}; use result::{Error, Result}; use std::net::UdpSocket; @@ -7,7 +9,8 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::{Receiver, RecvTimeoutError, Sender}; use std::sync::Arc; use std::thread::{Builder, JoinHandle}; -use std::time::Duration; +use std::time::{Duration, Instant}; +use timing::duration_as_ms; pub type PacketReceiver = Receiver; pub type PacketSender = Sender; @@ -19,6 +22,7 @@ fn recv_loop( exit: &Arc, re: &PacketRecycler, channel: &PacketSender, + channel_tag: &'static str, ) -> Result<()> { loop { let msgs = re.allocate(); @@ -26,6 +30,12 @@ fn recv_loop( let result = msgs.write().recv_from(sock); match result { Ok(()) => { + let len = msgs.read().packets.len(); + metrics::submit( + influxdb::Point::new(channel_tag) + .add_field("count", influxdb::Value::Integer(len as i64)) + .to_owned(), + ); channel.send(msgs)?; break; } @@ -43,6 +53,7 @@ pub fn receiver( sock: Arc, exit: Arc, packet_sender: PacketSender, + sender_tag: &'static str, ) -> JoinHandle<()> { let res = sock.set_read_timeout(Some(Duration::new(1, 0))); let recycler = PacketRecycler::default(); @@ -52,7 +63,7 @@ pub fn receiver( Builder::new() .name("solana-receiver".to_string()) .spawn(move || { - let _ = recv_loop(&sock, &exit, &recycler, &packet_sender); + let _ = recv_loop(&sock, &exit, &recycler, &packet_sender, sender_tag); () }).unwrap() } @@ -64,9 +75,10 @@ fn recv_send(sock: &UdpSocket, r: &BlobReceiver) -> Result<()> { Ok(()) } -pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize)> { +pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize, u64)> { let timer = Duration::new(1, 0); let msgs = recvr.recv_timeout(timer)?; + let recv_start = Instant::now(); trace!("got msgs"); let mut len = msgs.read().packets.len(); let mut batch = vec![msgs]; @@ -80,7 +92,7 @@ pub fn recv_batch(recvr: &PacketReceiver) -> Result<(Vec, usize)> } } trace!("batch len {}", batch.len()); - Ok((batch, len)) + Ok((batch, len, duration_as_ms(&recv_start.elapsed()))) } pub fn responder(name: &'static str, sock: Arc, r: BlobReceiver) -> JoinHandle<()> { @@ -166,7 +178,7 @@ mod test { let exit = Arc::new(AtomicBool::new(false)); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = receiver(Arc::new(read), exit.clone(), s_reader); + let t_receiver = receiver(Arc::new(read), exit.clone(), s_reader, "streamer-test"); let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); diff --git a/src/window.rs b/src/window.rs index 6e283a7220..43cfd982b8 100644 --- a/src/window.rs +++ b/src/window.rs @@ -450,7 +450,12 @@ mod test { let exit = Arc::new(AtomicBool::new(false)); let resp_recycler = BlobRecycler::default(); let (s_reader, r_reader) = channel(); - let t_receiver = receiver(Arc::new(read), exit.clone(), s_reader); + let t_receiver = receiver( + Arc::new(read), + exit.clone(), + s_reader, + "window-streamer-test", + ); let t_responder = { let (s_responder, r_responder) = channel(); let t_responder = responder("streamer_send_test", Arc::new(send), r_responder); diff --git a/src/write_stage.rs b/src/write_stage.rs index e423c94142..b8d85ccdac 100644 --- a/src/write_stage.rs +++ b/src/write_stage.rs @@ -96,6 +96,7 @@ impl WriteStage { ) -> Result<()> { let mut ventries = Vec::new(); let mut received_entries = entry_receiver.recv_timeout(Duration::new(1, 0))?; + let now = Instant::now(); let mut num_new_entries = 0; let mut num_txs = 0; @@ -122,6 +123,7 @@ impl WriteStage { break; } } + inc_new_counter_info!("write_stage-entries_received", num_new_entries); info!("write_stage entries: {}", num_new_entries); @@ -159,13 +161,17 @@ impl WriteStage { let blob_send_start = Instant::now(); if !entries.is_empty() { inc_new_counter_info!("write_stage-recv_vote", votes.len()); - inc_new_counter_info!("write_stage-broadcast_entries", entries.len()); + inc_new_counter_info!("write_stage-entries_sent", entries.len()); trace!("broadcasting {}", entries.len()); entry_sender.send(entries)?; } blob_send_total += duration_as_ms(&blob_send_start.elapsed()); } + inc_new_counter_info!( + "write_stage-time_ms", + duration_as_ms(&now.elapsed()) as usize + ); info!("done write_stage txs: {} time {} ms txs/s: {} to_blobs_total: {} register_entry_total: {} blob_send_total: {} crdt_votes_total: {}", num_txs, duration_as_ms(&start.elapsed()), num_txs as f32 / duration_as_s(&start.elapsed()),