Improve banking_stage performance messages
Use transaction count instead of batch count, and set the recv_start from when we finished processing the previous batch to get a more accurate number.
This commit is contained in:
parent
7b2abf2087
commit
2c93062f54
|
@ -76,10 +76,16 @@ impl BankingStage {
|
||||||
let poh_recorder = poh_recorder.clone();
|
let poh_recorder = poh_recorder.clone();
|
||||||
let cluster_info = cluster_info.clone();
|
let cluster_info = cluster_info.clone();
|
||||||
let exit = exit.clone();
|
let exit = exit.clone();
|
||||||
|
let mut recv_start = Instant::now();
|
||||||
Builder::new()
|
Builder::new()
|
||||||
.name("solana-banking-stage-tx".to_string())
|
.name("solana-banking-stage-tx".to_string())
|
||||||
.spawn(move || {
|
.spawn(move || {
|
||||||
Self::process_loop(&verified_receiver, &poh_recorder, &cluster_info);
|
Self::process_loop(
|
||||||
|
&verified_receiver,
|
||||||
|
&poh_recorder,
|
||||||
|
&cluster_info,
|
||||||
|
&mut recv_start,
|
||||||
|
);
|
||||||
exit.store(true, Ordering::Relaxed);
|
exit.store(true, Ordering::Relaxed);
|
||||||
})
|
})
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
@ -168,6 +174,7 @@ impl BankingStage {
|
||||||
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
|
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
|
||||||
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
poh_recorder: &Arc<Mutex<PohRecorder>>,
|
||||||
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
cluster_info: &Arc<RwLock<ClusterInfo>>,
|
||||||
|
recv_start: &mut Instant,
|
||||||
) {
|
) {
|
||||||
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
let socket = UdpSocket::bind("0.0.0.0:0").unwrap();
|
||||||
let mut buffered_packets = vec![];
|
let mut buffered_packets = vec![];
|
||||||
|
@ -181,7 +188,7 @@ impl BankingStage {
|
||||||
buffered_packets.clear();
|
buffered_packets.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
match Self::process_packets(&verified_receiver, &poh_recorder) {
|
match Self::process_packets(&verified_receiver, &poh_recorder, recv_start) {
|
||||||
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
|
Err(Error::RecvTimeoutError(RecvTimeoutError::Timeout)) => (),
|
||||||
Ok(unprocessed_packets) => {
|
Ok(unprocessed_packets) => {
|
||||||
if Self::should_buffer_packets(poh_recorder, cluster_info) {
|
if Self::should_buffer_packets(poh_recorder, cluster_info) {
|
||||||
|
@ -360,23 +367,22 @@ impl BankingStage {
|
||||||
pub fn process_packets(
|
pub fn process_packets(
|
||||||
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
|
verified_receiver: &Arc<Mutex<Receiver<VerifiedPackets>>>,
|
||||||
poh: &Arc<Mutex<PohRecorder>>,
|
poh: &Arc<Mutex<PohRecorder>>,
|
||||||
|
recv_start: &mut Instant,
|
||||||
) -> Result<UnprocessedPackets> {
|
) -> Result<UnprocessedPackets> {
|
||||||
let recv_start = Instant::now();
|
|
||||||
let mms = verified_receiver
|
let mms = verified_receiver
|
||||||
.lock()
|
.lock()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.recv_timeout(Duration::from_millis(100))?;
|
.recv_timeout(Duration::from_millis(100))?;
|
||||||
|
|
||||||
let mut reqs_len = 0;
|
|
||||||
let mms_len = mms.len();
|
let mms_len = mms.len();
|
||||||
|
let count = mms.iter().map(|x| x.1.len()).sum();
|
||||||
debug!(
|
debug!(
|
||||||
"@{:?} process start stalled for: {:?}ms batches: {}",
|
"@{:?} process start stalled for: {:?}ms txs: {}",
|
||||||
timing::timestamp(),
|
timing::timestamp(),
|
||||||
timing::duration_as_ms(&recv_start.elapsed()),
|
timing::duration_as_ms(&recv_start.elapsed()),
|
||||||
mms.len(),
|
count,
|
||||||
);
|
);
|
||||||
inc_new_counter_info!("banking_stage-entries_received", mms_len);
|
inc_new_counter_info!("banking_stage-entries_received", mms_len);
|
||||||
let count = mms.iter().map(|x| x.1.len()).sum();
|
|
||||||
let proc_start = Instant::now();
|
let proc_start = Instant::now();
|
||||||
let mut new_tx_count = 0;
|
let mut new_tx_count = 0;
|
||||||
|
|
||||||
|
@ -397,7 +403,6 @@ impl BankingStage {
|
||||||
debug!("banking-stage-tx bank {}", bank.slot());
|
debug!("banking-stage-tx bank {}", bank.slot());
|
||||||
|
|
||||||
let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
|
let transactions = Self::deserialize_transactions(&msgs.read().unwrap());
|
||||||
reqs_len += transactions.len();
|
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"bank: {} transactions received {}",
|
"bank: {} transactions received {}",
|
||||||
|
@ -443,16 +448,18 @@ impl BankingStage {
|
||||||
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
|
let total_time_s = timing::duration_as_s(&proc_start.elapsed());
|
||||||
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
|
let total_time_ms = timing::duration_as_ms(&proc_start.elapsed());
|
||||||
debug!(
|
debug!(
|
||||||
"@{:?} done processing transaction batches: {} time: {:?}ms reqs: {} reqs/s: {}",
|
"@{:?} done processing transaction batches: {} time: {:?}ms tx count: {} tx/s: {}",
|
||||||
timing::timestamp(),
|
timing::timestamp(),
|
||||||
mms_len,
|
mms_len,
|
||||||
total_time_ms,
|
total_time_ms,
|
||||||
reqs_len,
|
new_tx_count,
|
||||||
(reqs_len as f32) / (total_time_s)
|
(new_tx_count as f32) / (total_time_s)
|
||||||
);
|
);
|
||||||
inc_new_counter_info!("banking_stage-process_packets", count);
|
inc_new_counter_info!("banking_stage-process_packets", count);
|
||||||
inc_new_counter_info!("banking_stage-process_transactions", new_tx_count);
|
inc_new_counter_info!("banking_stage-process_transactions", new_tx_count);
|
||||||
|
|
||||||
|
*recv_start = Instant::now();
|
||||||
|
|
||||||
Ok(unprocessed_packets)
|
Ok(unprocessed_packets)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue