From 6ff508c6439fbc5f931c88daef9bd61ca6f9b493 Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Tue, 5 Oct 2021 08:57:39 -0500 Subject: [PATCH] add transaction cost histogram metrics (#20350) --- core/benches/banking_stage.rs | 3 +- core/src/banking_stage.rs | 50 +++++++++-- core/src/cost_tracker.rs | 165 +++++++++++++++++++++++++++++----- ledger-tool/src/main.rs | 8 +- 4 files changed, 195 insertions(+), 31 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index ba4bdfa23..68c09b74b 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -9,7 +9,7 @@ use rand::{thread_rng, Rng}; use rayon::prelude::*; use solana_core::banking_stage::{BankingStage, BankingStageStats}; use solana_core::cost_model::CostModel; -use solana_core::cost_tracker::CostTracker; +use solana_core::cost_tracker::{CostTracker, CostTrackerStats}; use solana_entry::entry::{next_hash, Entry}; use solana_gossip::cluster_info::ClusterInfo; use solana_gossip::cluster_info::Node; @@ -97,6 +97,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( CostModel::new(std::u64::MAX, std::u64::MAX), ))))), + &mut CostTrackerStats::default(), ); }); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index d59c2fc4b..1963f1861 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1,7 +1,9 @@ //! The `banking_stage` processes Transaction messages. It is intended to be used //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. -use crate::{cost_tracker::CostTracker, packet_hasher::PacketHasher}; +use crate::{ + cost_tracker::CostTracker, cost_tracker::CostTrackerStats, packet_hasher::PacketHasher, +}; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; use lru::LruCache; @@ -408,8 +410,12 @@ impl BankingStage { cost_tracker: &Arc>, bank_slot: Slot, banking_stage_stats: &BankingStageStats, + cost_tracker_stats: &mut CostTrackerStats, ) { - cost_tracker.write().unwrap().reset_if_new_bank(bank_slot); + cost_tracker + .write() + .unwrap() + .reset_if_new_bank(bank_slot, cost_tracker_stats); banking_stage_stats .reset_cost_tracker_count .fetch_add(1, Ordering::Relaxed); @@ -427,6 +433,7 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, cost_tracker: &Arc>, + cost_tracker_stats: &mut CostTrackerStats, ) { let mut rebuffered_packets_len = 0; let mut new_tx_count = 0; @@ -446,6 +453,7 @@ impl BankingStage { *next_leader, cost_tracker, banking_stage_stats, + cost_tracker_stats, ); Self::update_buffered_packets_with_new_unprocessed( original_unprocessed_indexes, @@ -462,6 +470,7 @@ impl BankingStage { cost_tracker, working_bank.slot(), banking_stage_stats, + cost_tracker_stats, ); let (processed, verified_txs_len, new_unprocessed_indexes) = Self::process_packets_transactions( @@ -474,6 +483,7 @@ impl BankingStage { gossip_vote_sender, banking_stage_stats, cost_tracker, + cost_tracker_stats, ); if processed < verified_txs_len || !Bank::should_bank_still_be_processing_txs( @@ -580,6 +590,7 @@ impl BankingStage { recorder: &TransactionRecorder, cost_tracker: &Arc>, data_budget: &DataBudget, + cost_tracker_stats: &mut CostTrackerStats, ) -> BufferedPacketsDecision { let bank_start; let ( @@ -595,6 +606,7 @@ impl BankingStage { cost_tracker, bank_start.working_bank.slot(), banking_stage_stats, + cost_tracker_stats, ); }; @@ -629,6 +641,7 @@ impl BankingStage { banking_stage_stats, recorder, cost_tracker, + cost_tracker_stats, ); } BufferedPacketsDecision::Forward => { @@ -708,6 +721,7 @@ impl BankingStage { let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); let mut buffered_packets = VecDeque::with_capacity(batch_limit); let banking_stage_stats = BankingStageStats::new(id); + let mut cost_tracker_stats = CostTrackerStats::default(); loop { let my_pubkey = cluster_info.id(); while !buffered_packets.is_empty() { @@ -724,6 +738,7 @@ impl BankingStage { &recorder, cost_tracker, data_budget, + &mut cost_tracker_stats, ); if matches!(decision, BufferedPacketsDecision::Hold) || matches!(decision, BufferedPacketsDecision::ForwardAndHold) @@ -759,6 +774,7 @@ impl BankingStage { duplicates, &recorder, cost_tracker, + &mut cost_tracker_stats, ) { Ok(()) | Err(RecvTimeoutError::Timeout) => (), Err(RecvTimeoutError::Disconnected) => break, @@ -1099,6 +1115,7 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, demote_program_write_locks: bool, votes_only: bool, + cost_tracker_stats: &mut CostTrackerStats, ) -> (Vec, Vec, Vec) { let mut retryable_transaction_packet_indexes: Vec = vec![]; @@ -1131,8 +1148,11 @@ impl BankingStage { verified_transactions_with_packet_indexes .into_iter() .filter_map(|(tx, tx_index)| { - let result = cost_tracker_readonly - .would_transaction_fit(&tx, demote_program_write_locks); + let result = cost_tracker_readonly.would_transaction_fit( + &tx, + demote_program_write_locks, + cost_tracker_stats, + ); if result.is_err() { debug!("transaction {:?} would exceed limit: {:?}", tx, result); retryable_transaction_packet_indexes.push(tx_index); @@ -1205,6 +1225,7 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, cost_tracker: &Arc>, + cost_tracker_stats: &mut CostTrackerStats, ) -> (usize, usize, Vec) { let mut packet_conversion_time = Measure::start("packet_conversion"); let (transactions, transaction_to_packet_indexes, retryable_packet_indexes) = @@ -1216,6 +1237,7 @@ impl BankingStage { banking_stage_stats, bank.demote_program_write_locks(), bank.vote_only_bank(), + cost_tracker_stats, ); packet_conversion_time.stop(); inc_new_counter_info!("banking_stage-packet_conversion", 1); @@ -1252,10 +1274,11 @@ impl BankingStage { let mut cost_tracking_time = Measure::start("cost_tracking_time"); transactions.iter().enumerate().for_each(|(index, tx)| { if unprocessed_tx_indexes.iter().all(|&i| i != index) { - cost_tracker - .write() - .unwrap() - .add_transaction_cost(tx, bank.demote_program_write_locks()); + cost_tracker.write().unwrap().add_transaction_cost( + tx, + bank.demote_program_write_locks(), + cost_tracker_stats, + ); } }); cost_tracking_time.stop(); @@ -1302,6 +1325,7 @@ impl BankingStage { next_leader: Option, cost_tracker: &Arc>, banking_stage_stats: &BankingStageStats, + cost_tracker_stats: &mut CostTrackerStats, ) -> Vec { // Check if we are the next leader. If so, let's not filter the packets // as we'll filter it again while processing the packets. @@ -1323,6 +1347,7 @@ impl BankingStage { banking_stage_stats, bank.demote_program_write_locks(), bank.vote_only_bank(), + cost_tracker_stats, ); unprocessed_packet_conversion_time.stop(); @@ -1384,6 +1409,7 @@ impl BankingStage { duplicates: &Arc, PacketHasher)>>, recorder: &TransactionRecorder, cost_tracker: &Arc>, + cost_tracker_stats: &mut CostTrackerStats, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("process_packets_recv"); let mms = verified_receiver.recv_timeout(recv_timeout)?; @@ -1434,6 +1460,7 @@ impl BankingStage { cost_tracker, working_bank.slot(), banking_stage_stats, + cost_tracker_stats, ); let (processed, verified_txs_len, unprocessed_indexes) = @@ -1447,6 +1474,7 @@ impl BankingStage { gossip_vote_sender, banking_stage_stats, cost_tracker, + cost_tracker_stats, ); new_tx_count += processed; @@ -1480,6 +1508,7 @@ impl BankingStage { next_leader, cost_tracker, banking_stage_stats, + cost_tracker_stats, ); Self::push_unprocessed( buffered_packets, @@ -2768,6 +2797,7 @@ mod tests { &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( CostModel::default(), ))))), + &mut CostTrackerStats::default(), ); assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions); // When the poh recorder has a bank, should process all non conflicting buffered packets. @@ -2787,6 +2817,7 @@ mod tests { &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( CostModel::default(), ))))), + &mut CostTrackerStats::default(), ); if num_expected_unprocessed == 0 { assert!(buffered_packets.is_empty()) @@ -2855,6 +2886,7 @@ mod tests { &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( CostModel::default(), ))))), + &mut CostTrackerStats::default(), ); // Check everything is correct. All indexes after `interrupted_iteration` @@ -3099,6 +3131,7 @@ mod tests { &banking_stage_stats, false, true, + &mut CostTrackerStats::default(), ); assert_eq!(transactions.len(), 1); assert!(!transactions[0].signatures().is_empty()); @@ -3112,6 +3145,7 @@ mod tests { &banking_stage_stats, false, false, + &mut CostTrackerStats::default(), ); assert_eq!(transactions.len(), 2); } diff --git a/core/src/cost_tracker.rs b/core/src/cost_tracker.rs index 40a86133a..f87939e45 100644 --- a/core/src/cost_tracker.rs +++ b/core/src/cost_tracker.rs @@ -13,6 +13,76 @@ use std::{ const WRITABLE_ACCOUNTS_PER_BLOCK: usize = 512; +// cist tracker stats reset for each bank +#[derive(Debug, Default)] +pub struct CostTrackerStats { + transaction_cost_histogram: histogram::Histogram, + writable_accounts_cost_histogram: histogram::Histogram, + block_cost: u64, + bank_slot: u64, +} + +impl CostTrackerStats { + pub fn new(bank_slot: Slot) -> Self { + CostTrackerStats { + bank_slot, + ..CostTrackerStats::default() + } + } + + fn report(&self) { + datapoint_info!( + "cost_tracker_stats", + ( + "transaction_cost_unit_min", + self.transaction_cost_histogram.minimum().unwrap_or(0), + i64 + ), + ( + "transaction_cost_unit_max", + self.transaction_cost_histogram.maximum().unwrap_or(0), + i64 + ), + ( + "transaction_cost_unit_mean", + self.transaction_cost_histogram.mean().unwrap_or(0), + i64 + ), + ( + "transaction_cost_unit_2nd_std", + self.transaction_cost_histogram + .percentile(95.0) + .unwrap_or(0), + i64 + ), + ( + "writable_accounts_cost_min", + self.writable_accounts_cost_histogram.minimum().unwrap_or(0), + i64 + ), + ( + "writable_accounts_cost_max", + self.writable_accounts_cost_histogram.maximum().unwrap_or(0), + i64 + ), + ( + "writable_accounts_cost_mean", + self.writable_accounts_cost_histogram.mean().unwrap_or(0), + i64 + ), + ( + "writable_accounts_cost_2nd_std", + self.writable_accounts_cost_histogram + .percentile(95.0) + .unwrap_or(0), + i64 + ), + ("block_cost", self.block_cost, i64), + ("bank_slot", self.bank_slot, i64), + ); + } +} + #[derive(Debug)] pub struct CostTracker { cost_model: Arc>, @@ -47,12 +117,14 @@ impl CostTracker { &self, transaction: &SanitizedTransaction, demote_program_write_locks: bool, + stats: &mut CostTrackerStats, ) -> Result<(), CostModelError> { let mut cost_model = self.cost_model.write().unwrap(); let tx_cost = cost_model.calculate_cost(transaction, demote_program_write_locks); self.would_fit( &tx_cost.writable_accounts, &(tx_cost.account_access_cost + tx_cost.execution_cost), + stats, ) } @@ -60,6 +132,7 @@ impl CostTracker { &mut self, transaction: &SanitizedTransaction, demote_program_write_locks: bool, + stats: &mut CostTrackerStats, ) { let mut cost_model = self.cost_model.write().unwrap(); let tx_cost = cost_model.calculate_cost(transaction, demote_program_write_locks); @@ -71,25 +144,42 @@ impl CostTracker { .or_insert(0) += cost; } self.block_cost += cost; + + stats.block_cost += cost; } - pub fn reset_if_new_bank(&mut self, slot: Slot) { + pub fn reset_if_new_bank(&mut self, slot: Slot, stats: &mut CostTrackerStats) { if slot != self.current_bank_slot { + stats.bank_slot = self.current_bank_slot; + stats.report(); + *stats = CostTrackerStats::default(); + self.current_bank_slot = slot; self.cost_by_writable_accounts.clear(); self.block_cost = 0; } } - pub fn try_add(&mut self, transaction_cost: &TransactionCost) -> Result { + pub fn try_add( + &mut self, + transaction_cost: &TransactionCost, + stats: &mut CostTrackerStats, + ) -> Result { let cost = transaction_cost.account_access_cost + transaction_cost.execution_cost; - self.would_fit(&transaction_cost.writable_accounts, &cost)?; + self.would_fit(&transaction_cost.writable_accounts, &cost, stats)?; self.add_transaction(&transaction_cost.writable_accounts, &cost); Ok(self.block_cost) } - fn would_fit(&self, keys: &[Pubkey], cost: &u64) -> Result<(), CostModelError> { + fn would_fit( + &self, + keys: &[Pubkey], + cost: &u64, + stats: &mut CostTrackerStats, + ) -> Result<(), CostModelError> { + stats.transaction_cost_histogram.increment(*cost).unwrap(); + // check against the total package cost if self.block_cost + cost > self.block_cost_limit { return Err(CostModelError::WouldExceedBlockMaxLimit); @@ -104,6 +194,11 @@ impl CostTracker { for account_key in keys.iter() { match self.cost_by_writable_accounts.get(account_key) { Some(chained_cost) => { + stats + .writable_accounts_cost_histogram + .increment(*chained_cost) + .unwrap(); + if chained_cost + cost > self.account_cost_limit { return Err(CostModelError::WouldExceedAccountMaxLimit); } else { @@ -213,7 +308,9 @@ mod tests { // build testee to have capacity for one simple transaction let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new(cost, cost)))); - assert!(testee.would_fit(&keys, &cost).is_ok()); + assert!(testee + .would_fit(&keys, &cost, &mut CostTrackerStats::default()) + .is_ok()); testee.add_transaction(&keys, &cost); assert_eq!(cost, testee.block_cost); } @@ -231,11 +328,15 @@ mod tests { cost1 + cost2, )))); { - assert!(testee.would_fit(&keys1, &cost1).is_ok()); + assert!(testee + .would_fit(&keys1, &cost1, &mut CostTrackerStats::default()) + .is_ok()); testee.add_transaction(&keys1, &cost1); } { - assert!(testee.would_fit(&keys2, &cost2).is_ok()); + assert!(testee + .would_fit(&keys2, &cost2, &mut CostTrackerStats::default()) + .is_ok()); testee.add_transaction(&keys2, &cost2); } assert_eq!(cost1 + cost2, testee.block_cost); @@ -256,11 +357,15 @@ mod tests { cost1 + cost2, )))); { - assert!(testee.would_fit(&keys1, &cost1).is_ok()); + assert!(testee + .would_fit(&keys1, &cost1, &mut CostTrackerStats::default()) + .is_ok()); testee.add_transaction(&keys1, &cost1); } { - assert!(testee.would_fit(&keys2, &cost2).is_ok()); + assert!(testee + .would_fit(&keys2, &cost2, &mut CostTrackerStats::default()) + .is_ok()); testee.add_transaction(&keys2, &cost2); } assert_eq!(cost1 + cost2, testee.block_cost); @@ -281,12 +386,16 @@ mod tests { )))); // should have room for first transaction { - assert!(testee.would_fit(&keys1, &cost1).is_ok()); + assert!(testee + .would_fit(&keys1, &cost1, &mut CostTrackerStats::default()) + .is_ok()); testee.add_transaction(&keys1, &cost1); } // but no more sapce on the same chain (same signer account) { - assert!(testee.would_fit(&keys2, &cost2).is_err()); + assert!(testee + .would_fit(&keys2, &cost2, &mut CostTrackerStats::default()) + .is_err()); } } @@ -305,12 +414,16 @@ mod tests { )))); // should have room for first transaction { - assert!(testee.would_fit(&keys1, &cost1).is_ok()); + assert!(testee + .would_fit(&keys1, &cost1, &mut CostTrackerStats::default()) + .is_ok()); testee.add_transaction(&keys1, &cost1); } // but no more room for package as whole { - assert!(testee.would_fit(&keys2, &cost2).is_err()); + assert!(testee + .would_fit(&keys2, &cost2, &mut CostTrackerStats::default()) + .is_err()); } } @@ -328,24 +441,30 @@ mod tests { )))); // should have room for first transaction { - assert!(testee.would_fit(&keys1, &cost1).is_ok()); + assert!(testee + .would_fit(&keys1, &cost1, &mut CostTrackerStats::default()) + .is_ok()); testee.add_transaction(&keys1, &cost1); assert_eq!(1, testee.cost_by_writable_accounts.len()); assert_eq!(cost1, testee.block_cost); } // but no more sapce on the same chain (same signer account) { - assert!(testee.would_fit(&keys2, &cost2).is_err()); + assert!(testee + .would_fit(&keys2, &cost2, &mut CostTrackerStats::default()) + .is_err()); } // reset the tracker { - testee.reset_if_new_bank(100); + testee.reset_if_new_bank(100, &mut CostTrackerStats::default()); assert_eq!(0, testee.cost_by_writable_accounts.len()); assert_eq!(0, testee.block_cost); } //now the second transaction can be added { - assert!(testee.would_fit(&keys2, &cost2).is_ok()); + assert!(testee + .would_fit(&keys2, &cost2, &mut CostTrackerStats::default()) + .is_ok()); } } @@ -374,7 +493,9 @@ mod tests { account_access_cost: 0, execution_cost: cost, }; - assert!(testee.try_add(&tx_cost).is_ok()); + assert!(testee + .try_add(&tx_cost, &mut CostTrackerStats::default()) + .is_ok()); let stat = testee.get_stats(); assert_eq!(cost, stat.total_cost); assert_eq!(3, stat.number_of_accounts); @@ -392,7 +513,9 @@ mod tests { account_access_cost: 0, execution_cost: cost, }; - assert!(testee.try_add(&tx_cost).is_ok()); + assert!(testee + .try_add(&tx_cost, &mut CostTrackerStats::default()) + .is_ok()); let stat = testee.get_stats(); assert_eq!(cost * 2, stat.total_cost); assert_eq!(3, stat.number_of_accounts); @@ -412,7 +535,9 @@ mod tests { account_access_cost: 0, execution_cost: cost, }; - assert!(testee.try_add(&tx_cost).is_err()); + assert!(testee + .try_add(&tx_cost, &mut CostTrackerStats::default()) + .is_err()); let stat = testee.get_stats(); assert_eq!(cost * 2, stat.total_cost); assert_eq!(3, stat.number_of_accounts); diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 2e334dfe0..d12b1916d 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -15,7 +15,7 @@ use solana_clap_utils::{ }, }; use solana_core::cost_model::CostModel; -use solana_core::cost_tracker::CostTracker; +use solana_core::cost_tracker::{CostTracker, CostTrackerStats}; use solana_entry::entry::Entry; use solana_ledger::{ ancestor_iterator::AncestorIterator, @@ -774,6 +774,7 @@ fn compute_slot_cost(blockstore: &Blockstore, slot: Slot) -> Result<(), String> cost_model.initialize_cost_table(&blockstore.read_program_costs().unwrap()); let cost_model = Arc::new(RwLock::new(cost_model)); let mut cost_tracker = CostTracker::new(cost_model.clone()); + let mut cost_tracker_stats = CostTrackerStats::default(); for entry in entries { num_transactions += entry.transactions.len(); @@ -797,7 +798,10 @@ fn compute_slot_cost(blockstore: &Blockstore, slot: Slot) -> Result<(), String> &transaction, true, // demote_program_write_locks ); - if cost_tracker.try_add(tx_cost).is_err() { + if cost_tracker + .try_add(tx_cost, &mut cost_tracker_stats) + .is_err() + { println!( "Slot: {}, CostModel rejected transaction {:?}, stats {:?}!", slot,