From 9d6f1ebef489210e408eb9a4cbb21cd9059e43cb Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Mon, 28 Jun 2021 21:34:04 -0500 Subject: [PATCH] investigate system performance test degradation (#17919) * Add stats and counter around cost model ops, mainly: - calculate transaction cost - check transaction can fit in a block - update block cost tracker after transactions are added to block - replay_stage to update/insert execution cost to table * Change mutex on cost_tracker to RwLock * removed cloning cost_tracker for local use, as the metrics show clone is very expensive. * acquire and hold locks for block of TXs, instead of acquire and release per transaction; * remove redundant would_fit check from cost_tracker update execution path * refactor cost checking with less frequent lock acquiring * avoid many Transaction_cost heap allocation when calculate cost, which is in the hot path - executed per transaction. * create hashmap with new_capacity to reduce runtime heap realloc. * code review changes: categorize stats, replace explicit drop calls, concisely initiate to default * address potential deadlock by acquiring locks one at time --- core/benches/banking_stage.rs | 4 +- core/src/banking_stage.rs | 253 +++++++++++++++++++++++++-------- core/src/cost_model.rs | 72 ++++++++++ core/src/cost_tracker.rs | 8 +- core/src/execute_cost_table.rs | 4 +- core/src/replay_stage.rs | 18 ++- 6 files changed, 286 insertions(+), 73 deletions(-) diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 4de561a9ef..8a8e18f870 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -34,7 +34,7 @@ use solana_sdk::transaction::Transaction; use std::collections::VecDeque; use std::sync::atomic::Ordering; use std::sync::mpsc::Receiver; -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use test::Bencher; @@ -94,7 +94,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { &BankingStageStats::default(), &recorder, &Arc::new(RwLock::new(CostModel::default())), - &Arc::new(Mutex::new(CostTracker::new(std::u64::MAX, std::u64::MAX))), + &Arc::new(RwLock::new(CostTracker::new(std::u64::MAX, std::u64::MAX))), ); }); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 63c7c5f355..cd7bc1ea0f 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1,7 +1,10 @@ //! The `banking_stage` processes Transaction messages. It is intended to be used //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. -use crate::{cost_model::CostModel, cost_tracker::CostTracker, packet_hasher::PacketHasher}; +use crate::{ + cost_model::CostModel, cost_model::TransactionCost, cost_tracker::CostTracker, + packet_hasher::PacketHasher, +}; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; use lru::LruCache; @@ -76,6 +79,8 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; const DEFAULT_LRU_SIZE: usize = 200_000; +const MAX_WRITABLE_ACCOUNTS: usize = 256; + #[derive(Debug, Default)] pub struct BankingStageStats { last_report: AtomicU64, @@ -87,6 +92,9 @@ pub struct BankingStageStats { current_buffered_packets_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, + reset_cost_tracker_count: AtomicUsize, + cost_tracker_check_count: AtomicUsize, + cost_forced_retry_transactions_count: AtomicUsize, // Timing consume_buffered_packets_elapsed: AtomicU64, @@ -95,7 +103,11 @@ pub struct BankingStageStats { filter_pending_packets_elapsed: AtomicU64, packet_duplicate_check_elapsed: AtomicU64, packet_conversion_elapsed: AtomicU64, + unprocessed_packet_conversion_elapsed: AtomicU64, transaction_processing_elapsed: AtomicU64, + cost_tracker_update_elapsed: AtomicU64, + cost_tracker_clone_elapsed: AtomicU64, + cost_tracker_check_elapsed: AtomicU64, } impl BankingStageStats { @@ -154,6 +166,22 @@ impl BankingStageStats { self.rebuffered_packets_count.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "reset_cost_tracker_count", + self.reset_cost_tracker_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "cost_tracker_check_count", + self.cost_tracker_check_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "cost_forced_retry_transactions_count", + self.cost_forced_retry_transactions_count + .swap(0, Ordering::Relaxed) as i64, + i64 + ), ( "consume_buffered_packets_elapsed", self.consume_buffered_packets_elapsed @@ -188,12 +216,33 @@ impl BankingStageStats { self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "unprocessed_packet_conversion_elapsed", + self.unprocessed_packet_conversion_elapsed + .swap(0, Ordering::Relaxed) as i64, + i64 + ), ( "transaction_processing_elapsed", self.transaction_processing_elapsed .swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "cost_tracker_update_elapsed", + self.cost_tracker_update_elapsed.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "cost_tracker_clone_elapsed", + self.cost_tracker_clone_elapsed.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "cost_tracker_check_elapsed", + self.cost_tracker_check_elapsed.swap(0, Ordering::Relaxed) as i64, + i64 + ), ); } } @@ -244,11 +293,16 @@ impl BankingStage { gossip_vote_sender: ReplayVoteSender, cost_model: &Arc>, ) -> Self { - // shared mutex guarded 'cost_tracker' tracks bank's cost against configured limits. - let cost_tracker = Arc::new(Mutex::new(CostTracker::new( - cost_model.read().unwrap().get_account_cost_limit(), - cost_model.read().unwrap().get_block_cost_limit(), - ))); + // 'cost_tracker' tracks bank's cost against configured limits. + let cost_tracker = { + let cost_model = cost_model.read().unwrap(); + CostTracker::new( + cost_model.get_account_cost_limit(), + cost_model.get_block_cost_limit(), + ) + }; + let cost_tracker = Arc::new(RwLock::new(cost_tracker)); + Self::new_num_threads( cluster_info, poh_recorder, @@ -271,7 +325,7 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, cost_model: &Arc>, - cost_tracker: &Arc>, + cost_tracker: &Arc>, ) -> Self { let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH); // Single thread to generate entries from many banks. @@ -364,8 +418,15 @@ impl BankingStage { has_more_unprocessed_transactions } - fn reset_cost_tracker_if_new_bank(cost_tracker: &Arc>, bank_slot: Slot) { - cost_tracker.lock().unwrap().reset_if_new_bank(bank_slot); + fn reset_cost_tracker_if_new_bank( + cost_tracker: &Arc>, + bank_slot: Slot, + banking_stage_stats: &BankingStageStats, + ) { + cost_tracker.write().unwrap().reset_if_new_bank(bank_slot); + banking_stage_stats + .reset_cost_tracker_count + .fetch_add(1, Ordering::Relaxed); } #[allow(clippy::too_many_arguments)] @@ -380,7 +441,7 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, cost_model: &Arc>, - cost_tracker: &Arc>, + cost_tracker: &Arc>, ) { let mut rebuffered_packets_len = 0; let mut new_tx_count = 0; @@ -400,6 +461,7 @@ impl BankingStage { *next_leader, cost_model, cost_tracker, + banking_stage_stats, ); Self::update_buffered_packets_with_new_unprocessed( original_unprocessed_indexes, @@ -408,7 +470,11 @@ impl BankingStage { } else { let bank_start = poh_recorder.lock().unwrap().bank_start(); if let Some((bank, bank_creation_time)) = bank_start { - Self::reset_cost_tracker_if_new_bank(cost_tracker, bank.slot()); + Self::reset_cost_tracker_if_new_bank( + cost_tracker, + bank.slot(), + banking_stage_stats, + ); let (processed, verified_txs_len, new_unprocessed_indexes) = Self::process_packets_transactions( &bank, @@ -524,7 +590,7 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, cost_model: &Arc>, - cost_tracker: &Arc>, + cost_tracker: &Arc>, ) -> BufferedPacketsDecision { let bank_start; let ( @@ -536,7 +602,11 @@ impl BankingStage { let poh = poh_recorder.lock().unwrap(); bank_start = poh.bank_start(); if let Some((ref bank, _)) = bank_start { - Self::reset_cost_tracker_if_new_bank(cost_tracker, bank.slot()); + Self::reset_cost_tracker_if_new_bank( + cost_tracker, + bank.slot(), + banking_stage_stats, + ); }; ( poh.leader_after_n_slots(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET), @@ -641,7 +711,7 @@ impl BankingStage { gossip_vote_sender: ReplayVoteSender, duplicates: &Arc, PacketHasher)>>, cost_model: &Arc>, - cost_tracker: &Arc>, + cost_tracker: &Arc>, ) { let recorder = poh_recorder.lock().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -1033,20 +1103,18 @@ impl BankingStage { // and verifies secp256k1 instructions. A list of valid transactions are returned with their message hashes // and packet indexes. // Also returned is packet indexes for transaction should be retried due to cost limits. + #[allow(clippy::needless_collect)] fn transactions_from_packets( msgs: &Packets, transaction_indexes: &[usize], secp256k1_program_enabled: bool, cost_model: &Arc>, - cost_tracker: &Arc>, + cost_tracker: &Arc>, + banking_stage_stats: &BankingStageStats, ) -> (Vec>, Vec, Vec) { - // Making a snapshot of shared cost_tracker by clone(), drop lock immediately. - // Local copy `cost_tracker` is used to filter transactions by cost. - // Shared cost_tracker is updated later by processed transactions confirmed by bank. - let mut cost_tracker = cost_tracker.lock().unwrap().clone(); - let mut retryable_transaction_packet_indexes: Vec = vec![]; - let (filtered_transactions, filter_transaction_packet_indexes) = transaction_indexes + + let verified_transactions_with_packet_indexes: Vec<_> = transaction_indexes .iter() .filter_map(|tx_index| { let p = &msgs.packets[*tx_index]; @@ -1054,27 +1122,55 @@ impl BankingStage { if secp256k1_program_enabled { tx.verify_precompiles().ok()?; } - - // Get transaction cost via cost_model; try to add cost to - // local copy of cost_tracker, if suceeded, local copy is updated - // and transaction added to valid list; otherwise, transaction is - // added to retry list. No locking here. - let tx_cost = cost_model.read().unwrap().calculate_cost(&tx); - let result = cost_tracker.try_add(tx_cost); - if result.is_err() { - debug!("transaction {:?} would exceed limit: {:?}", tx, result); - retryable_transaction_packet_indexes.push(*tx_index); - return None; - } - - let message_bytes = Self::packet_message(p)?; - let message_hash = Message::hash_raw_message(message_bytes); - Some(( - HashedTransaction::new(Cow::Owned(tx), message_hash), - tx_index, - )) + Some((tx, *tx_index)) }) - .unzip(); + .collect(); + banking_stage_stats.cost_tracker_check_count.fetch_add( + verified_transactions_with_packet_indexes.len(), + Ordering::Relaxed, + ); + + let mut cost_tracker_check_time = Measure::start("cost_tracker_check_time"); + let mut tx_cost = TransactionCost::new_with_capacity(MAX_WRITABLE_ACCOUNTS); + let filtered_transactions_with_packet_indexes: Vec<_> = { + let cost_model_readonly = cost_model.read().unwrap(); + let cost_tracker_readonly = cost_tracker.read().unwrap(); + verified_transactions_with_packet_indexes + .into_iter() + .filter_map(|(tx, tx_index)| { + cost_model_readonly.calculate_cost_no_alloc(&tx, &mut tx_cost); + let result = cost_tracker_readonly.would_fit( + &tx_cost.writable_accounts, + &(tx_cost.account_access_cost + tx_cost.execution_cost), + ); + if result.is_err() { + debug!("transaction {:?} would exceed limit: {:?}", tx, result); + retryable_transaction_packet_indexes.push(tx_index); + return None; + } + Some((tx, tx_index)) + }) + .collect() + }; + cost_tracker_check_time.stop(); + + let (filtered_transactions, filter_transaction_packet_indexes) = + filtered_transactions_with_packet_indexes + .into_iter() + .filter_map(|(tx, tx_index)| { + let p = &msgs.packets[tx_index]; + let message_bytes = Self::packet_message(p)?; + let message_hash = Message::hash_raw_message(message_bytes); + Some(( + HashedTransaction::new(Cow::Owned(tx), message_hash), + tx_index, + )) + }) + .unzip(); + + banking_stage_stats + .cost_tracker_check_elapsed + .fetch_add(cost_tracker_check_time.as_us(), Ordering::Relaxed); ( filtered_transactions, @@ -1133,7 +1229,7 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, cost_model: &Arc>, - cost_tracker: &Arc>, + cost_tracker: &Arc>, ) -> (usize, usize, Vec) { let mut packet_conversion_time = Measure::start("packet_conversion"); let (transactions, transaction_to_packet_indexes, retryable_packet_indexes) = @@ -1143,9 +1239,14 @@ impl BankingStage { bank.secp256k1_program_enabled(), cost_model, cost_tracker, + banking_stage_stats, ); packet_conversion_time.stop(); + inc_new_counter_info!("banking_stage-packet_conversion", 1); + banking_stage_stats + .cost_forced_retry_transactions_count + .fetch_add(retryable_packet_indexes.len(), Ordering::Relaxed); debug!( "bank: {} filtered transactions {} cost limited transactions {}", bank.slot(), @@ -1166,16 +1267,31 @@ impl BankingStage { ); process_tx_time.stop(); let unprocessed_tx_count = unprocessed_tx_indexes.len(); + inc_new_counter_info!( + "banking_stage-unprocessed_transactions", + unprocessed_tx_count + ); // applying cost of processed transactions to shared cost_tracker - transactions.iter().enumerate().for_each(|(index, tx)| { - if !unprocessed_tx_indexes.iter().any(|&i| i == index) { - let tx_cost = cost_model.read().unwrap().calculate_cost(tx.transaction()); - let mut guard = cost_tracker.lock().unwrap(); - let _result = guard.try_add(tx_cost); - drop(guard); - } - }); + let mut cost_tracking_time = Measure::start("cost_tracking_time"); + let mut tx_cost = TransactionCost::new_with_capacity(MAX_WRITABLE_ACCOUNTS); + { + //let cost_model_readonly = cost_model.read().unwrap(); + //let mut cost_tracker_mutable = cost_tracker.write().unwrap(); + transactions.iter().enumerate().for_each(|(index, tx)| { + if !unprocessed_tx_indexes.iter().any(|&i| i == index) { + cost_model + .read() + .unwrap() + .calculate_cost_no_alloc(tx.transaction(), &mut tx_cost); + cost_tracker.write().unwrap().add_transaction( + &tx_cost.writable_accounts, + &(tx_cost.account_access_cost + tx_cost.execution_cost), + ); + } + }); + } + cost_tracking_time.stop(); let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time"); let mut filtered_unprocessed_packet_indexes = Self::filter_pending_packets_from_pending_txs( @@ -1186,21 +1302,24 @@ impl BankingStage { ); filter_pending_packets_time.stop(); - // combine cost-related unprocessed transactions with bank determined unprocessed for - // buffering - filtered_unprocessed_packet_indexes.extend(retryable_packet_indexes); - inc_new_counter_info!( "banking_stage-dropped_tx_before_forwarding", unprocessed_tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) ); + // combine cost-related unprocessed transactions with bank determined unprocessed for + // buffering + filtered_unprocessed_packet_indexes.extend(retryable_packet_indexes); + banking_stage_stats .packet_conversion_elapsed .fetch_add(packet_conversion_time.as_us(), Ordering::Relaxed); banking_stage_stats .transaction_processing_elapsed .fetch_add(process_tx_time.as_us(), Ordering::Relaxed); + banking_stage_stats + .cost_tracker_update_elapsed + .fetch_add(cost_tracking_time.as_us(), Ordering::Relaxed); banking_stage_stats .filter_pending_packets_elapsed .fetch_add(filter_pending_packets_time.as_us(), Ordering::Relaxed); @@ -1215,7 +1334,8 @@ impl BankingStage { my_pubkey: &Pubkey, next_leader: Option, cost_model: &Arc>, - cost_tracker: &Arc>, + cost_tracker: &Arc>, + banking_stage_stats: &BankingStageStats, ) -> Vec { // Check if we are the next leader. If so, let's not filter the packets // as we'll filter it again while processing the packets. @@ -1226,6 +1346,8 @@ impl BankingStage { } } + let mut unprocessed_packet_conversion_time = + Measure::start("unprocessed_packet_conversion"); let (transactions, transaction_to_packet_indexes, retry_packet_indexes) = Self::transactions_from_packets( msgs, @@ -1233,7 +1355,9 @@ impl BankingStage { bank.secp256k1_program_enabled(), cost_model, cost_tracker, + banking_stage_stats, ); + unprocessed_packet_conversion_time.stop(); let tx_count = transaction_to_packet_indexes.len(); @@ -1251,6 +1375,12 @@ impl BankingStage { "banking_stage-dropped_tx_before_forwarding", tx_count.saturating_sub(filtered_unprocessed_packet_indexes.len()) ); + banking_stage_stats + .unprocessed_packet_conversion_elapsed + .fetch_add( + unprocessed_packet_conversion_time.as_us(), + Ordering::Relaxed, + ); filtered_unprocessed_packet_indexes } @@ -1287,7 +1417,7 @@ impl BankingStage { duplicates: &Arc, PacketHasher)>>, recorder: &TransactionRecorder, cost_model: &Arc>, - cost_tracker: &Arc>, + cost_tracker: &Arc>, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("process_packets_recv"); let mms = verified_receiver.recv_timeout(recv_timeout)?; @@ -1326,7 +1456,7 @@ impl BankingStage { continue; } let (bank, bank_creation_time) = bank_start.unwrap(); - Self::reset_cost_tracker_if_new_bank(cost_tracker, bank.slot()); + Self::reset_cost_tracker_if_new_bank(cost_tracker, bank.slot(), banking_stage_stats); let (processed, verified_txs_len, unprocessed_indexes) = Self::process_packets_transactions( @@ -1372,6 +1502,7 @@ impl BankingStage { next_leader, cost_model, cost_tracker, + banking_stage_stats, ); Self::push_unprocessed( buffered_packets, @@ -1835,7 +1966,7 @@ mod tests { None, gossip_vote_sender, &Arc::new(RwLock::new(CostModel::default())), - &Arc::new(Mutex::new(CostTracker::new( + &Arc::new(RwLock::new(CostTracker::new( ACCOUNT_MAX_COST, BLOCK_MAX_COST, ))), @@ -2660,7 +2791,7 @@ mod tests { &BankingStageStats::default(), &recorder, &Arc::new(RwLock::new(CostModel::default())), - &Arc::new(Mutex::new(CostTracker::new( + &Arc::new(RwLock::new(CostTracker::new( ACCOUNT_MAX_COST, BLOCK_MAX_COST, ))), @@ -2681,7 +2812,7 @@ mod tests { &BankingStageStats::default(), &recorder, &Arc::new(RwLock::new(CostModel::default())), - &Arc::new(Mutex::new(CostTracker::new( + &Arc::new(RwLock::new(CostTracker::new( ACCOUNT_MAX_COST, BLOCK_MAX_COST, ))), @@ -2751,7 +2882,7 @@ mod tests { &BankingStageStats::default(), &recorder, &Arc::new(RwLock::new(CostModel::default())), - &Arc::new(Mutex::new(CostTracker::new( + &Arc::new(RwLock::new(CostTracker::new( ACCOUNT_MAX_COST, BLOCK_MAX_COST, ))), diff --git a/core/src/cost_model.rs b/core/src/cost_model.rs index 6a12005cb0..b7ecd6ae07 100644 --- a/core/src/cost_model.rs +++ b/core/src/cost_model.rs @@ -23,6 +23,8 @@ const NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST: u64 = 7; pub const ACCOUNT_MAX_COST: u64 = 100_000_000; pub const BLOCK_MAX_COST: u64 = 2_500_000_000; +const DEMOTE_SYSVAR_WRITE_LOCKS: bool = true; + // cost of transaction is made of account_access_cost and instruction execution_cost // where // account_access_cost is the sum of read/write/sign all accounts included in the transaction @@ -36,6 +38,21 @@ pub struct TransactionCost { pub execution_cost: u64, } +impl TransactionCost { + pub fn new_with_capacity(capacity: usize) -> Self { + Self { + writable_accounts: Vec::with_capacity(capacity), + ..Self::default() + } + } + + pub fn reset(&mut self) { + self.writable_accounts.clear(); + self.account_access_cost = 0; + self.execution_cost = 0; + } +} + #[derive(Debug)] pub struct CostModel { account_cost_limit: u64, @@ -90,6 +107,34 @@ impl CostModel { cost } + // calculate `transaction` cost, the result is passed back to caller via mutable + // parameter `cost`. Existing content in `cost` will be erased before adding new content + // This is to allow this function to reuse pre-allocated memory, as this function + // is often on hot-path. + pub fn calculate_cost_no_alloc(&self, transaction: &Transaction, cost: &mut TransactionCost) { + cost.reset(); + + let message = transaction.message(); + message.account_keys.iter().enumerate().for_each(|(i, k)| { + let is_signer = message.is_signer(i); + let is_writable = message.is_writable(i, DEMOTE_SYSVAR_WRITE_LOCKS); + + if is_signer && is_writable { + cost.writable_accounts.push(*k); + cost.account_access_cost += SIGNED_WRITABLE_ACCOUNT_ACCESS_COST; + } else if is_signer && !is_writable { + cost.account_access_cost += SIGNED_READONLY_ACCOUNT_ACCESS_COST; + } else if !is_signer && is_writable { + cost.writable_accounts.push(*k); + cost.account_access_cost += NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST; + } else { + cost.account_access_cost += NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST; + } + }); + cost.execution_cost = self.find_transaction_cost(transaction); + debug!("transaction {:?} has cost {:?}", transaction, cost); + } + // To update or insert instruction cost to table. pub fn upsert_instruction_cost( &mut self, @@ -400,6 +445,33 @@ mod tests { assert_eq!(2, tx_cost.writable_accounts.len()); } + #[test] + fn test_cost_model_calculate_cost_no_alloc() { + let (mint_keypair, start_hash) = test_setup(); + let tx = + system_transaction::transfer(&mint_keypair, &Keypair::new().pubkey(), 2, start_hash); + + let expected_account_cost = SIGNED_WRITABLE_ACCOUNT_ACCESS_COST + + NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST + + NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST; + let expected_execution_cost = 8; + + let mut cost_model = CostModel::default(); + cost_model + .upsert_instruction_cost(&system_program::id(), &expected_execution_cost) + .unwrap(); + + // allocate cost, set some random number + let mut tx_cost = TransactionCost::new_with_capacity(8); + tx_cost.execution_cost = 101; + tx_cost.writable_accounts.push(Pubkey::new_unique()); + + cost_model.calculate_cost_no_alloc(&tx, &mut tx_cost); + assert_eq!(expected_account_cost, tx_cost.account_access_cost); + assert_eq!(expected_execution_cost, tx_cost.execution_cost); + assert_eq!(2, tx_cost.writable_accounts.len()); + } + #[test] fn test_cost_model_update_instruction_cost() { let key1 = Pubkey::new_unique(); diff --git a/core/src/cost_tracker.rs b/core/src/cost_tracker.rs index df544ba702..fd434e4e89 100644 --- a/core/src/cost_tracker.rs +++ b/core/src/cost_tracker.rs @@ -5,6 +5,8 @@ use crate::cost_model::TransactionCost; use solana_sdk::{clock::Slot, pubkey::Pubkey}; use std::collections::HashMap; +const WRITABLE_ACCOUNTS_PER_BLOCK: usize = 512; + #[derive(Debug, Clone)] pub struct CostTracker { account_cost_limit: u64, @@ -21,7 +23,7 @@ impl CostTracker { account_cost_limit: chain_max, block_cost_limit: package_max, current_bank_slot: 0, - cost_by_writable_accounts: HashMap::new(), + cost_by_writable_accounts: HashMap::with_capacity(WRITABLE_ACCOUNTS_PER_BLOCK), block_cost: 0, } } @@ -42,7 +44,7 @@ impl CostTracker { Ok(self.block_cost) } - fn would_fit(&self, keys: &[Pubkey], cost: &u64) -> Result<(), &'static str> { + pub fn would_fit(&self, keys: &[Pubkey], cost: &u64) -> Result<(), &'static str> { // check against the total package cost if self.block_cost + cost > self.block_cost_limit { return Err("would exceed block cost limit"); @@ -70,7 +72,7 @@ impl CostTracker { Ok(()) } - fn add_transaction(&mut self, keys: &[Pubkey], cost: &u64) { + pub fn add_transaction(&mut self, keys: &[Pubkey], cost: &u64) { for account_key in keys.iter() { *self .cost_by_writable_accounts diff --git a/core/src/execute_cost_table.rs b/core/src/execute_cost_table.rs index 47cb1c81dc..1a3ee443a9 100644 --- a/core/src/execute_cost_table.rs +++ b/core/src/execute_cost_table.rs @@ -33,8 +33,8 @@ impl ExecuteCostTable { pub fn new(cap: usize) -> Self { Self { capacity: cap, - table: HashMap::new(), - occurrences: HashMap::new(), + table: HashMap::with_capacity(cap), + occurrences: HashMap::with_capacity(cap), } } diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 568d7a0eeb..0c501ffb07 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -1720,10 +1720,6 @@ impl ReplayStage { verify_recyclers, ); Self::update_cost_model(cost_model, &bank_progress.replay_stats.execute_timings); - debug!( - "after replayed into bank, updated cost model instruction cost table, current values: {:?}", - cost_model.read().unwrap().get_instruction_cost_table() - ); match replay_result { Ok(replay_tx_count) => tx_count += replay_tx_count, Err(err) => { @@ -1915,6 +1911,7 @@ impl ReplayStage { } fn update_cost_model(cost_model: &RwLock, execute_timings: &ExecuteTimings) { + let mut update_cost_model_time = Measure::start("update_cost_model_time"); let mut cost_model_mutable = cost_model.write().unwrap(); for (program_id, stats) in &execute_timings.details.per_program_timings { let cost = stats.0 / stats.1 as u64; @@ -1937,7 +1934,18 @@ impl ReplayStage { debug!( "after replayed into bank, updated cost model instruction cost table, current values: {:?}", cost_model.read().unwrap().get_instruction_cost_table() - ); + ); + update_cost_model_time.stop(); + + inc_new_counter_info!("replay_stage-update_cost_model", 1); + datapoint_info!( + "replay-loop-timing-stats", + ( + "update_cost_model_elapsed", + update_cost_model_time.as_us() as i64, + i64 + ) + ); } fn update_propagation_status(