From 0e039b4094c263211047ad15ae27f4bc1073a28b Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Tue, 6 Jul 2021 10:41:25 -0500 Subject: [PATCH] Aggregate cost_model into cost_tracker (#18374) * * aggregate cost_model into cost_tracker, decouple it from banking_stage to prevent accidental deadlock. * Simplified code, removed unused functions * review fixes --- banking-bench/src/main.rs | 6 +- core/benches/banking_stage.rs | 11 +- core/src/banking_stage.rs | 142 ++++++------------------ core/src/cost_model.rs | 199 ++++++---------------------------- core/src/cost_tracker.rs | 103 ++++++++++++++---- core/src/tpu.rs | 4 +- ledger-tool/src/main.rs | 13 ++- 7 files changed, 168 insertions(+), 310 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 7dff4bc411..4603509b4b 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -4,7 +4,7 @@ use crossbeam_channel::unbounded; use log::*; use rand::{thread_rng, Rng}; use rayon::prelude::*; -use solana_core::{banking_stage::BankingStage, cost_model::CostModel}; +use solana_core::{banking_stage::BankingStage, cost_model::CostModel, cost_tracker::CostTracker}; use solana_gossip::{cluster_info::ClusterInfo, cluster_info::Node}; use solana_ledger::{ blockstore::Blockstore, @@ -224,7 +224,9 @@ fn main() { vote_receiver, None, replay_vote_sender, - &Arc::new(RwLock::new(CostModel::default())), + Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), ); poh_recorder.lock().unwrap().set_bank(&bank); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 8a8e18f870..0f19747869 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -93,8 +93,9 @@ fn bench_consume_buffered(bencher: &mut Bencher) { None::>, &BankingStageStats::default(), &recorder, - &Arc::new(RwLock::new(CostModel::default())), - &Arc::new(RwLock::new(CostTracker::new(std::u64::MAX, std::u64::MAX))), + &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::new(std::u64::MAX, std::u64::MAX), + ))))), ); }); @@ -208,14 +209,16 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { let cluster_info = ClusterInfo::new_with_invalid_keypair(Node::new_localhost().info); let cluster_info = Arc::new(cluster_info); let (s, _r) = unbounded(); - let _banking_stage = BankingStage::new_with_cost_limit( + let _banking_stage = BankingStage::new( &cluster_info, &poh_recorder, verified_receiver, vote_receiver, None, s, - &Arc::new(RwLock::new(CostModel::new(std::u64::MAX, std::u64::MAX))), + Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::new(std::u64::MAX, std::u64::MAX), + ))))), ); poh_recorder.lock().unwrap().set_bank(&bank); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index a29a2f7094..47df9fd730 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1,10 +1,7 @@ //! The `banking_stage` processes Transaction messages. It is intended to be used //! to contruct a software pipeline. The stage uses all available CPU cores and //! can do its processing in parallel with signature verification on the GPU. -use crate::{ - cost_model::CostModel, cost_model::TransactionCost, cost_tracker::CostTracker, - packet_hasher::PacketHasher, -}; +use crate::{cost_tracker::CostTracker, packet_hasher::PacketHasher}; use crossbeam_channel::{Receiver as CrossbeamReceiver, RecvTimeoutError}; use itertools::Itertools; use lru::LruCache; @@ -79,8 +76,6 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 128; const DEFAULT_LRU_SIZE: usize = 200_000; -const MAX_WRITABLE_ACCOUNTS: usize = 256; - #[derive(Debug, Default)] pub struct BankingStageStats { last_report: AtomicU64, @@ -271,38 +266,8 @@ impl BankingStage { verified_vote_receiver: CrossbeamReceiver>, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, - cost_model: &Arc>, + cost_tracker: Arc>, ) -> Self { - Self::new_with_cost_limit( - cluster_info, - poh_recorder, - verified_receiver, - verified_vote_receiver, - transaction_status_sender, - gossip_vote_sender, - cost_model, - ) - } - - pub fn new_with_cost_limit( - cluster_info: &Arc, - poh_recorder: &Arc>, - verified_receiver: CrossbeamReceiver>, - verified_vote_receiver: CrossbeamReceiver>, - transaction_status_sender: Option, - gossip_vote_sender: ReplayVoteSender, - cost_model: &Arc>, - ) -> Self { - // 'cost_tracker' tracks bank's cost against configured limits. - let cost_tracker = { - let cost_model = cost_model.read().unwrap(); - CostTracker::new( - cost_model.get_account_cost_limit(), - cost_model.get_block_cost_limit(), - ) - }; - let cost_tracker = Arc::new(RwLock::new(cost_tracker)); - Self::new_num_threads( cluster_info, poh_recorder, @@ -311,8 +276,7 @@ impl BankingStage { Self::num_threads(), transaction_status_sender, gossip_vote_sender, - cost_model, - &cost_tracker, + cost_tracker, ) } @@ -324,8 +288,7 @@ impl BankingStage { num_threads: u32, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, - cost_model: &Arc>, - cost_tracker: &Arc>, + cost_tracker: Arc>, ) -> Self { let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - 1) as usize * PACKETS_PER_BATCH); // Single thread to generate entries from many banks. @@ -351,7 +314,6 @@ impl BankingStage { let transaction_status_sender = transaction_status_sender.clone(); let gossip_vote_sender = gossip_vote_sender.clone(); let duplicates = duplicates.clone(); - let cost_model = cost_model.clone(); let cost_tracker = cost_tracker.clone(); Builder::new() .name("solana-banking-stage-tx".to_string()) @@ -367,7 +329,6 @@ impl BankingStage { transaction_status_sender, gossip_vote_sender, &duplicates, - &cost_model, &cost_tracker, ); }) @@ -438,7 +399,6 @@ impl BankingStage { test_fn: Option, banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, - cost_model: &Arc>, cost_tracker: &Arc>, ) { let mut rebuffered_packets_len = 0; @@ -457,7 +417,6 @@ impl BankingStage { original_unprocessed_indexes, my_pubkey, *next_leader, - cost_model, cost_tracker, banking_stage_stats, ); @@ -483,7 +442,6 @@ impl BankingStage { transaction_status_sender.clone(), gossip_vote_sender, banking_stage_stats, - cost_model, cost_tracker, ); if processed < verified_txs_len @@ -587,7 +545,6 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, - cost_model: &Arc>, cost_tracker: &Arc>, ) -> BufferedPacketsDecision { let bank_start; @@ -636,7 +593,6 @@ impl BankingStage { None::>, banking_stage_stats, recorder, - cost_model, cost_tracker, ); } @@ -707,7 +663,6 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, duplicates: &Arc, PacketHasher)>>, - cost_model: &Arc>, cost_tracker: &Arc>, ) { let recorder = poh_recorder.lock().unwrap().recorder(); @@ -728,7 +683,6 @@ impl BankingStage { &gossip_vote_sender, &banking_stage_stats, &recorder, - cost_model, cost_tracker, ); if matches!(decision, BufferedPacketsDecision::Hold) @@ -764,7 +718,6 @@ impl BankingStage { &banking_stage_stats, duplicates, &recorder, - cost_model, cost_tracker, ) { Ok(()) | Err(RecvTimeoutError::Timeout) => (), @@ -1106,7 +1059,6 @@ impl BankingStage { msgs: &Packets, transaction_indexes: &[usize], secp256k1_program_enabled: bool, - cost_model: &Arc>, cost_tracker: &Arc>, banking_stage_stats: &BankingStageStats, ) -> (Vec>, Vec, Vec) { @@ -1129,18 +1081,12 @@ impl BankingStage { ); let mut cost_tracker_check_time = Measure::start("cost_tracker_check_time"); - let mut tx_cost = TransactionCost::new_with_capacity(MAX_WRITABLE_ACCOUNTS); let filtered_transactions_with_packet_indexes: Vec<_> = { - let cost_model_readonly = cost_model.read().unwrap(); let cost_tracker_readonly = cost_tracker.read().unwrap(); verified_transactions_with_packet_indexes .into_iter() .filter_map(|(tx, tx_index)| { - cost_model_readonly.calculate_cost_no_alloc(&tx, &mut tx_cost); - let result = cost_tracker_readonly.would_fit( - &tx_cost.writable_accounts, - &(tx_cost.account_access_cost + tx_cost.execution_cost), - ); + let result = cost_tracker_readonly.would_transaction_fit(&tx); if result.is_err() { debug!("transaction {:?} would exceed limit: {:?}", tx, result); retryable_transaction_packet_indexes.push(tx_index); @@ -1226,7 +1172,6 @@ impl BankingStage { transaction_status_sender: Option, gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, - cost_model: &Arc>, cost_tracker: &Arc>, ) -> (usize, usize, Vec) { let mut packet_conversion_time = Measure::start("packet_conversion"); @@ -1235,7 +1180,6 @@ impl BankingStage { msgs, &packet_indexes, bank.secp256k1_program_enabled(), - cost_model, cost_tracker, banking_stage_stats, ); @@ -1272,23 +1216,14 @@ impl BankingStage { // applying cost of processed transactions to shared cost_tracker let mut cost_tracking_time = Measure::start("cost_tracking_time"); - let mut tx_cost = TransactionCost::new_with_capacity(MAX_WRITABLE_ACCOUNTS); - { - //let cost_model_readonly = cost_model.read().unwrap(); - //let mut cost_tracker_mutable = cost_tracker.write().unwrap(); - transactions.iter().enumerate().for_each(|(index, tx)| { - if !unprocessed_tx_indexes.iter().any(|&i| i == index) { - cost_model - .read() - .unwrap() - .calculate_cost_no_alloc(tx.transaction(), &mut tx_cost); - cost_tracker.write().unwrap().add_transaction( - &tx_cost.writable_accounts, - &(tx_cost.account_access_cost + tx_cost.execution_cost), - ); - } - }); - } + transactions.iter().enumerate().for_each(|(index, tx)| { + if unprocessed_tx_indexes.iter().all(|&i| i != index) { + cost_tracker + .write() + .unwrap() + .add_transaction_cost(tx.transaction()); + } + }); cost_tracking_time.stop(); let mut filter_pending_packets_time = Measure::start("filter_pending_packets_time"); @@ -1331,7 +1266,6 @@ impl BankingStage { transaction_indexes: &[usize], my_pubkey: &Pubkey, next_leader: Option, - cost_model: &Arc>, cost_tracker: &Arc>, banking_stage_stats: &BankingStageStats, ) -> Vec { @@ -1351,7 +1285,6 @@ impl BankingStage { msgs, transaction_indexes, bank.secp256k1_program_enabled(), - cost_model, cost_tracker, banking_stage_stats, ); @@ -1414,7 +1347,6 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, duplicates: &Arc, PacketHasher)>>, recorder: &TransactionRecorder, - cost_model: &Arc>, cost_tracker: &Arc>, ) -> Result<(), RecvTimeoutError> { let mut recv_time = Measure::start("process_packets_recv"); @@ -1466,7 +1398,6 @@ impl BankingStage { transaction_status_sender.clone(), gossip_vote_sender, banking_stage_stats, - cost_model, cost_tracker, ); @@ -1498,7 +1429,6 @@ impl BankingStage { &packet_indexes, my_pubkey, next_leader, - cost_model, cost_tracker, banking_stage_stats, ); @@ -1637,7 +1567,7 @@ fn next_leader_tpu_forwards( #[cfg(test)] mod tests { use super::*; - use crate::cost_model::{ACCOUNT_MAX_COST, BLOCK_MAX_COST}; + use crate::cost_model::CostModel; use crossbeam_channel::unbounded; use itertools::Itertools; use solana_gossip::cluster_info::Node; @@ -1698,7 +1628,9 @@ mod tests { vote_receiver, None, gossip_vote_sender, - &Arc::new(RwLock::new(CostModel::default())), + Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), ); drop(verified_sender); drop(vote_sender); @@ -1744,7 +1676,9 @@ mod tests { vote_receiver, None, gossip_vote_sender, - &Arc::new(RwLock::new(CostModel::default())), + Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), ); trace!("sending bank"); drop(verified_sender); @@ -1814,7 +1748,9 @@ mod tests { vote_receiver, None, gossip_vote_sender, - &Arc::new(RwLock::new(CostModel::default())), + Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), ); // fund another account so we can send 2 good transactions in a single batch. @@ -1963,11 +1899,9 @@ mod tests { 2, None, gossip_vote_sender, - &Arc::new(RwLock::new(CostModel::default())), - &Arc::new(RwLock::new(CostTracker::new( - ACCOUNT_MAX_COST, - BLOCK_MAX_COST, - ))), + Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), ); // wait for banking_stage to eat the packets @@ -2788,11 +2722,9 @@ mod tests { None::>, &BankingStageStats::default(), &recorder, - &Arc::new(RwLock::new(CostModel::default())), - &Arc::new(RwLock::new(CostTracker::new( - ACCOUNT_MAX_COST, - BLOCK_MAX_COST, - ))), + &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), ); assert_eq!(buffered_packets[0].1.len(), num_conflicting_transactions); // When the poh recorder has a bank, should process all non conflicting buffered packets. @@ -2809,11 +2741,9 @@ mod tests { None::>, &BankingStageStats::default(), &recorder, - &Arc::new(RwLock::new(CostModel::default())), - &Arc::new(RwLock::new(CostTracker::new( - ACCOUNT_MAX_COST, - BLOCK_MAX_COST, - ))), + &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), ); if num_expected_unprocessed == 0 { assert!(buffered_packets.is_empty()) @@ -2879,11 +2809,9 @@ mod tests { test_fn, &BankingStageStats::default(), &recorder, - &Arc::new(RwLock::new(CostModel::default())), - &Arc::new(RwLock::new(CostTracker::new( - ACCOUNT_MAX_COST, - BLOCK_MAX_COST, - ))), + &Arc::new(RwLock::new(CostTracker::new(Arc::new(RwLock::new( + CostModel::default(), + ))))), ); // Check everything is correct. All indexes after `interrupted_iteration` diff --git a/core/src/cost_model.rs b/core/src/cost_model.rs index 149045f9ae..0f16981ea4 100644 --- a/core/src/cost_model.rs +++ b/core/src/cost_model.rs @@ -5,11 +5,11 @@ //! Instructions take time to execute, both historical and runtime data are //! used to determine each instruction's execution time, the sum of that //! is transaction's "execution cost" -//! The main function is `calculate_cost` which returns a TransactionCost struct. +//! The main function is `calculate_cost` which returns &TransactionCost. //! use crate::execute_cost_table::ExecuteCostTable; use log::*; -use solana_sdk::{message::Message, pubkey::Pubkey, transaction::Transaction}; +use solana_sdk::{pubkey::Pubkey, transaction::Transaction}; use std::collections::HashMap; // Guestimated from mainnet-beta data, sigver averages 1us, average read 7us and average write 25us @@ -26,6 +26,7 @@ const SIGNED_WRITABLE_ACCOUNT_ACCESS_COST: u64 = pub const ACCOUNT_MAX_COST: u64 = 100_000_000; pub const BLOCK_MAX_COST: u64 = 2_500_000_000; +const MAX_WRITABLE_ACCOUNTS: usize = 256; const DEMOTE_SYSVAR_WRITE_LOCKS: bool = true; // cost of transaction is made of account_access_cost and instruction execution_cost @@ -61,6 +62,9 @@ pub struct CostModel { account_cost_limit: u64, block_cost_limit: u64, instruction_execution_cost_table: ExecuteCostTable, + + // reusable variables + transaction_cost: TransactionCost, } impl Default for CostModel { @@ -75,6 +79,7 @@ impl CostModel { account_cost_limit: chain_max, block_cost_limit: block_max, instruction_execution_cost_table: ExecuteCostTable::default(), + transaction_cost: TransactionCost::new_with_capacity(MAX_WRITABLE_ACCOUNTS), } } @@ -86,36 +91,8 @@ impl CostModel { self.block_cost_limit } - pub fn calculate_cost(&self, transaction: &Transaction) -> TransactionCost { - let ( - signed_writable_accounts, - signed_readonly_accounts, - non_signed_writable_accounts, - non_signed_readonly_accounts, - ) = CostModel::sort_accounts_by_type(transaction.message()); - - let mut cost = TransactionCost { - writable_accounts: vec![], - account_access_cost: CostModel::find_account_access_cost( - &signed_writable_accounts, - &signed_readonly_accounts, - &non_signed_writable_accounts, - &non_signed_readonly_accounts, - ), - execution_cost: self.find_transaction_cost(transaction), - }; - cost.writable_accounts.extend(&signed_writable_accounts); - cost.writable_accounts.extend(&non_signed_writable_accounts); - debug!("transaction {:?} has cost {:?}", transaction, cost); - cost - } - - // calculate `transaction` cost, the result is passed back to caller via mutable - // parameter `cost`. Existing content in `cost` will be erased before adding new content - // This is to allow this function to reuse pre-allocated memory, as this function - // is often on hot-path. - pub fn calculate_cost_no_alloc(&self, transaction: &Transaction, cost: &mut TransactionCost) { - cost.reset(); + pub fn calculate_cost(&mut self, transaction: &Transaction) -> &TransactionCost { + self.transaction_cost.reset(); let message = transaction.message(); message.account_keys.iter().enumerate().for_each(|(i, k)| { @@ -123,19 +100,25 @@ impl CostModel { let is_writable = message.is_writable(i, DEMOTE_SYSVAR_WRITE_LOCKS); if is_signer && is_writable { - cost.writable_accounts.push(*k); - cost.account_access_cost += SIGNED_WRITABLE_ACCOUNT_ACCESS_COST; + self.transaction_cost.writable_accounts.push(*k); + self.transaction_cost.account_access_cost += SIGNED_WRITABLE_ACCOUNT_ACCESS_COST; } else if is_signer && !is_writable { - cost.account_access_cost += SIGNED_READONLY_ACCOUNT_ACCESS_COST; + self.transaction_cost.account_access_cost += SIGNED_READONLY_ACCOUNT_ACCESS_COST; } else if !is_signer && is_writable { - cost.writable_accounts.push(*k); - cost.account_access_cost += NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST; + self.transaction_cost.writable_accounts.push(*k); + self.transaction_cost.account_access_cost += + NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST; } else { - cost.account_access_cost += NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST; + self.transaction_cost.account_access_cost += + NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST; } }); - cost.execution_cost = self.find_transaction_cost(transaction); - debug!("transaction {:?} has cost {:?}", transaction, cost); + self.transaction_cost.execution_cost = self.find_transaction_cost(transaction); + debug!( + "transaction {:?} has cost {:?}", + transaction, self.transaction_cost + ); + &self.transaction_cost } // To update or insert instruction cost to table. @@ -186,50 +169,6 @@ impl CostModel { } cost } - - fn find_account_access_cost( - signed_writable_accounts: &[Pubkey], - signed_readonly_accounts: &[Pubkey], - non_signed_writable_accounts: &[Pubkey], - non_signed_readonly_accounts: &[Pubkey], - ) -> u64 { - let mut cost = 0; - cost += signed_writable_accounts.len() as u64 * SIGNED_WRITABLE_ACCOUNT_ACCESS_COST; - cost += signed_readonly_accounts.len() as u64 * SIGNED_READONLY_ACCOUNT_ACCESS_COST; - cost += non_signed_writable_accounts.len() as u64 * NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST; - cost += non_signed_readonly_accounts.len() as u64 * NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST; - cost - } - - fn sort_accounts_by_type( - message: &Message, - ) -> (Vec, Vec, Vec, Vec) { - let demote_sysvar_write_locks = true; - let mut signer_writable: Vec = vec![]; - let mut signer_readonly: Vec = vec![]; - let mut non_signer_writable: Vec = vec![]; - let mut non_signer_readonly: Vec = vec![]; - message.account_keys.iter().enumerate().for_each(|(i, k)| { - let is_signer = message.is_signer(i); - let is_writable = message.is_writable(i, demote_sysvar_write_locks); - - if is_signer && is_writable { - signer_writable.push(*k); - } else if is_signer && !is_writable { - signer_readonly.push(*k); - } else if !is_signer && is_writable { - non_signer_writable.push(*k); - } else { - non_signer_readonly.push(*k); - } - }); - ( - signer_writable, - signer_readonly, - non_signer_writable, - non_signer_readonly, - ) - } } #[cfg(test)] @@ -387,25 +326,14 @@ mod tests { vec![prog1, prog2], instructions, ); - debug!("many random transaction {:?}", tx); - let ( - signed_writable_accounts, - signed_readonly_accounts, - non_signed_writable_accounts, - non_signed_readonly_accounts, - ) = CostModel::sort_accounts_by_type(tx.message()); - - assert_eq!(2, signed_writable_accounts.len()); - assert_eq!(signer1.pubkey(), signed_writable_accounts[0]); - assert_eq!(signer2.pubkey(), signed_writable_accounts[1]); - assert_eq!(0, signed_readonly_accounts.len()); - assert_eq!(2, non_signed_writable_accounts.len()); - assert_eq!(key1, non_signed_writable_accounts[0]); - assert_eq!(key2, non_signed_writable_accounts[1]); - assert_eq!(2, non_signed_readonly_accounts.len()); - assert_eq!(prog1, non_signed_readonly_accounts[0]); - assert_eq!(prog2, non_signed_readonly_accounts[1]); + let mut cost_model = CostModel::default(); + let tx_cost = cost_model.calculate_cost(&tx); + assert_eq!(2 + 2, tx_cost.writable_accounts.len()); + assert_eq!(signer1.pubkey(), tx_cost.writable_accounts[0]); + assert_eq!(signer2.pubkey(), tx_cost.writable_accounts[1]); + assert_eq!(key1, tx_cost.writable_accounts[2]); + assert_eq!(key2, tx_cost.writable_accounts[3]); } #[test] @@ -448,33 +376,6 @@ mod tests { assert_eq!(2, tx_cost.writable_accounts.len()); } - #[test] - fn test_cost_model_calculate_cost_no_alloc() { - let (mint_keypair, start_hash) = test_setup(); - let tx = - system_transaction::transfer(&mint_keypair, &Keypair::new().pubkey(), 2, start_hash); - - let expected_account_cost = SIGNED_WRITABLE_ACCOUNT_ACCESS_COST - + NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST - + NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST; - let expected_execution_cost = 8; - - let mut cost_model = CostModel::default(); - cost_model - .upsert_instruction_cost(&system_program::id(), &expected_execution_cost) - .unwrap(); - - // allocate cost, set some random number - let mut tx_cost = TransactionCost::new_with_capacity(8); - tx_cost.execution_cost = 101; - tx_cost.writable_accounts.push(Pubkey::new_unique()); - - cost_model.calculate_cost_no_alloc(&tx, &mut tx_cost); - assert_eq!(expected_account_cost, tx_cost.account_access_cost); - assert_eq!(expected_execution_cost, tx_cost.execution_cost); - assert_eq!(2, tx_cost.writable_accounts.len()); - } - #[test] fn test_cost_model_update_instruction_cost() { let key1 = Pubkey::new_unique(); @@ -493,43 +394,6 @@ mod tests { assert_eq!(updated_cost, cost_model.find_instruction_cost(&key1)); } - #[test] - fn test_cost_model_can_be_shared_concurrently_as_immutable() { - let (mint_keypair, start_hash) = test_setup(); - let number_threads = 10; - let expected_account_cost = SIGNED_WRITABLE_ACCOUNT_ACCESS_COST - + NON_SIGNED_WRITABLE_ACCOUNT_ACCESS_COST - + NON_SIGNED_READONLY_ACCOUNT_ACCESS_COST; - - let cost_model = Arc::new(CostModel::default()); - - let thread_handlers: Vec> = (0..number_threads) - .map(|_| { - // each thread creates its own simple transaction - let simple_transaction = system_transaction::transfer( - &mint_keypair, - &Keypair::new().pubkey(), - 2, - start_hash, - ); - let cost_model = cost_model.clone(); - thread::spawn(move || { - let tx_cost = cost_model.calculate_cost(&simple_transaction); - assert_eq!(2, tx_cost.writable_accounts.len()); - assert_eq!(expected_account_cost, tx_cost.account_access_cost); - assert_eq!( - cost_model.instruction_execution_cost_table.get_mode(), - tx_cost.execution_cost - ); - }) - }) - .collect(); - - for th in thread_handlers { - th.join().unwrap(); - } - } - #[test] fn test_cost_model_can_be_shared_concurrently_with_rwlock() { let (mint_keypair, start_hash) = test_setup(); @@ -573,7 +437,8 @@ mod tests { }) } else { thread::spawn(move || { - let tx_cost = cost_model.read().unwrap().calculate_cost(&tx); + let mut cost_model = cost_model.write().unwrap(); + let tx_cost = cost_model.calculate_cost(&tx); assert_eq!(3, tx_cost.writable_accounts.len()); assert_eq!(expected_account_cost, tx_cost.account_access_cost); }) diff --git a/core/src/cost_tracker.rs b/core/src/cost_tracker.rs index fd434e4e89..782c920465 100644 --- a/core/src/cost_tracker.rs +++ b/core/src/cost_tracker.rs @@ -1,14 +1,21 @@ //! `cost_tracker` keeps tracking tranasction cost per chained accounts as well as for entire block -//! The main entry function is 'try_add', if success, it returns new block cost. +//! It aggregates `cost_model`, which provides service of calculating transaction cost. +//! The main functions are: +//! - would_transaction_fit(&tx), immutable function to test if `tx` would fit into current block +//! - add_transaction_cost(&tx), mutable function to accumulate `tx` cost to tracker. //! -use crate::cost_model::TransactionCost; -use solana_sdk::{clock::Slot, pubkey::Pubkey}; -use std::collections::HashMap; +use crate::cost_model::{CostModel, TransactionCost}; +use solana_sdk::{clock::Slot, pubkey::Pubkey, transaction::Transaction}; +use std::{ + collections::HashMap, + sync::{Arc, RwLock}, +}; const WRITABLE_ACCOUNTS_PER_BLOCK: usize = 512; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct CostTracker { + cost_model: Arc>, account_cost_limit: u64, block_cost_limit: u64, current_bank_slot: Slot, @@ -17,17 +24,47 @@ pub struct CostTracker { } impl CostTracker { - pub fn new(chain_max: u64, package_max: u64) -> Self { - assert!(chain_max <= package_max); + pub fn new(cost_model: Arc>) -> Self { + let (account_cost_limit, block_cost_limit) = { + let cost_model = cost_model.read().unwrap(); + ( + cost_model.get_account_cost_limit(), + cost_model.get_block_cost_limit(), + ) + }; + assert!(account_cost_limit <= block_cost_limit); Self { - account_cost_limit: chain_max, - block_cost_limit: package_max, + cost_model, + account_cost_limit, + block_cost_limit, current_bank_slot: 0, cost_by_writable_accounts: HashMap::with_capacity(WRITABLE_ACCOUNTS_PER_BLOCK), block_cost: 0, } } + pub fn would_transaction_fit(&self, transaction: &Transaction) -> Result<(), &'static str> { + let mut cost_model = self.cost_model.write().unwrap(); + let tx_cost = cost_model.calculate_cost(transaction); + self.would_fit( + &tx_cost.writable_accounts, + &(tx_cost.account_access_cost + tx_cost.execution_cost), + ) + } + + pub fn add_transaction_cost(&mut self, transaction: &Transaction) { + let mut cost_model = self.cost_model.write().unwrap(); + let tx_cost = cost_model.calculate_cost(transaction); + let cost = tx_cost.account_access_cost + tx_cost.execution_cost; + for account_key in tx_cost.writable_accounts.iter() { + *self + .cost_by_writable_accounts + .entry(*account_key) + .or_insert(0) += cost; + } + self.block_cost += cost; + } + pub fn reset_if_new_bank(&mut self, slot: Slot) { if slot != self.current_bank_slot { self.current_bank_slot = slot; @@ -36,7 +73,7 @@ impl CostTracker { } } - pub fn try_add(&mut self, transaction_cost: TransactionCost) -> Result { + pub fn try_add(&mut self, transaction_cost: &TransactionCost) -> Result { let cost = transaction_cost.account_access_cost + transaction_cost.execution_cost; self.would_fit(&transaction_cost.writable_accounts, &cost)?; @@ -44,7 +81,7 @@ impl CostTracker { Ok(self.block_cost) } - pub fn would_fit(&self, keys: &[Pubkey], cost: &u64) -> Result<(), &'static str> { + fn would_fit(&self, keys: &[Pubkey], cost: &u64) -> Result<(), &'static str> { // check against the total package cost if self.block_cost + cost > self.block_cost_limit { return Err("would exceed block cost limit"); @@ -72,7 +109,7 @@ impl CostTracker { Ok(()) } - pub fn add_transaction(&mut self, keys: &[Pubkey], cost: &u64) { + fn add_transaction(&mut self, keys: &[Pubkey], cost: &u64) { for account_key in keys.iter() { *self .cost_by_writable_accounts @@ -86,6 +123,7 @@ impl CostTracker { // CostStats can be collected by util, such as ledger_tool #[derive(Default, Debug)] pub struct CostStats { + pub bank_slot: Slot, pub total_cost: u64, pub number_of_accounts: usize, pub costliest_account: Pubkey, @@ -95,6 +133,7 @@ pub struct CostStats { impl CostTracker { pub fn get_stats(&self) -> CostStats { let mut stats = CostStats { + bank_slot: self.current_bank_slot, total_cost: self.block_cost, number_of_accounts: self.cost_by_writable_accounts.len(), costliest_account: Pubkey::default(), @@ -152,7 +191,7 @@ mod tests { #[test] fn test_cost_tracker_initialization() { - let testee = CostTracker::new(10, 11); + let testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new(10, 11)))); assert_eq!(10, testee.account_cost_limit); assert_eq!(11, testee.block_cost_limit); assert_eq!(0, testee.cost_by_writable_accounts.len()); @@ -165,7 +204,7 @@ mod tests { let (_tx, keys, cost) = build_simple_transaction(&mint_keypair, &start_hash); // build testee to have capacity for one simple transaction - let mut testee = CostTracker::new(cost, cost); + let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new(cost, cost)))); assert!(testee.would_fit(&keys, &cost).is_ok()); testee.add_transaction(&keys, &cost); assert_eq!(cost, testee.block_cost); @@ -179,7 +218,10 @@ mod tests { let (_tx2, keys2, cost2) = build_simple_transaction(&mint_keypair, &start_hash); // build testee to have capacity for two simple transactions, with same accounts - let mut testee = CostTracker::new(cost1 + cost2, cost1 + cost2); + let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new( + cost1 + cost2, + cost1 + cost2, + )))); { assert!(testee.would_fit(&keys1, &cost1).is_ok()); testee.add_transaction(&keys1, &cost1); @@ -201,7 +243,10 @@ mod tests { let (_tx2, keys2, cost2) = build_simple_transaction(&second_account, &start_hash); // build testee to have capacity for two simple transactions, with same accounts - let mut testee = CostTracker::new(cmp::max(cost1, cost2), cost1 + cost2); + let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new( + cmp::max(cost1, cost2), + cost1 + cost2, + )))); { assert!(testee.would_fit(&keys1, &cost1).is_ok()); testee.add_transaction(&keys1, &cost1); @@ -222,7 +267,10 @@ mod tests { let (_tx2, keys2, cost2) = build_simple_transaction(&mint_keypair, &start_hash); // build testee to have capacity for two simple transactions, but not for same accounts - let mut testee = CostTracker::new(cmp::min(cost1, cost2), cost1 + cost2); + let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new( + cmp::min(cost1, cost2), + cost1 + cost2, + )))); // should have room for first transaction { assert!(testee.would_fit(&keys1, &cost1).is_ok()); @@ -243,7 +291,10 @@ mod tests { let (_tx2, keys2, cost2) = build_simple_transaction(&second_account, &start_hash); // build testee to have capacity for each chain, but not enough room for both transactions - let mut testee = CostTracker::new(cmp::max(cost1, cost2), cost1 + cost2 - 1); + let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new( + cmp::max(cost1, cost2), + cost1 + cost2 - 1, + )))); // should have room for first transaction { assert!(testee.would_fit(&keys1, &cost1).is_ok()); @@ -263,7 +314,10 @@ mod tests { let (_tx2, keys2, cost2) = build_simple_transaction(&mint_keypair, &start_hash); // build testee to have capacity for two simple transactions, but not for same accounts - let mut testee = CostTracker::new(cmp::min(cost1, cost2), cost1 + cost2); + let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new( + cmp::min(cost1, cost2), + cost1 + cost2, + )))); // should have room for first transaction { assert!(testee.would_fit(&keys1, &cost1).is_ok()); @@ -296,7 +350,10 @@ mod tests { let account_max = cost * 2; let block_max = account_max * 3; // for three accts - let mut testee = CostTracker::new(account_max, block_max); + let mut testee = CostTracker::new(Arc::new(RwLock::new(CostModel::new( + account_max, + block_max, + )))); // case 1: a tx writes to 3 accounts, should success, we will have: // | acct1 | $cost | @@ -309,7 +366,7 @@ mod tests { account_access_cost: 0, execution_cost: cost, }; - assert!(testee.try_add(tx_cost).is_ok()); + assert!(testee.try_add(&tx_cost).is_ok()); let stat = testee.get_stats(); assert_eq!(cost, stat.total_cost); assert_eq!(3, stat.number_of_accounts); @@ -327,7 +384,7 @@ mod tests { account_access_cost: 0, execution_cost: cost, }; - assert!(testee.try_add(tx_cost).is_ok()); + assert!(testee.try_add(&tx_cost).is_ok()); let stat = testee.get_stats(); assert_eq!(cost * 2, stat.total_cost); assert_eq!(3, stat.number_of_accounts); @@ -347,7 +404,7 @@ mod tests { account_access_cost: 0, execution_cost: cost, }; - assert!(testee.try_add(tx_cost).is_err()); + assert!(testee.try_add(&tx_cost).is_err()); let stat = testee.get_stats(); assert_eq!(cost * 2, stat.total_cost); assert_eq!(3, stat.number_of_accounts); diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 73fb624cbf..0e741edb9b 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -9,6 +9,7 @@ use crate::{ VerifiedVoteSender, VoteTracker, }, cost_model::CostModel, + cost_tracker::CostTracker, fetch_stage::FetchStage, sigverify::TransactionSigVerifier, sigverify_stage::SigVerifyStage, @@ -105,6 +106,7 @@ impl Tpu { cluster_confirmed_slot_sender, ); + let cost_tracker = Arc::new(RwLock::new(CostTracker::new(cost_model.clone()))); let banking_stage = BankingStage::new( cluster_info, poh_recorder, @@ -112,7 +114,7 @@ impl Tpu { verified_vote_packets_receiver, transaction_status_sender, replay_vote_sender, - cost_model, + cost_tracker, ); let broadcast_stage = broadcast_type.new_broadcast_stage( diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 65fc779d8e..1ff3b16f92 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -60,7 +60,7 @@ use std::{ path::{Path, PathBuf}, process::{exit, Command, Stdio}, str::FromStr, - sync::Arc, + sync::{Arc, RwLock}, }; mod bigtable; @@ -737,14 +737,15 @@ fn compute_slot_cost(blockstore: &Blockstore, slot: Slot) -> Result<(), String> let mut transactions = 0; let mut programs = 0; let mut program_ids = HashMap::new(); - let cost_model = CostModel::new(ACCOUNT_MAX_COST, BLOCK_MAX_COST); - let mut cost_tracker = CostTracker::new( - cost_model.get_account_cost_limit(), - cost_model.get_block_cost_limit(), - ); + let cost_model = Arc::new(RwLock::new(CostModel::new( + ACCOUNT_MAX_COST, + BLOCK_MAX_COST, + ))); + let mut cost_tracker = CostTracker::new(cost_model.clone()); for entry in &entries { transactions += entry.transactions.len(); + let mut cost_model = cost_model.write().unwrap(); for transaction in &entry.transactions { programs += transaction.message().instructions.len(); let tx_cost = cost_model.calculate_cost(transaction);