From 924b8ea1eb4a9a59de64414d24e85fe0345685fe Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Fri, 8 Apr 2022 14:22:31 +0200 Subject: [PATCH] Adjustments to cost_tracker updates - don't store pending tx signatures and costs in CostTracker - apply tx costs to global state immediately again - go from commit_or_cancel to update_or_remove, where the cost tracker is either updated with the true costs for successful tx, or the costs of a retryable tx is removed - move the function into qos_service and hold the cost tracker lock for the whole loop --- core/src/banking_stage.rs | 76 +++++++++++-------------------- core/src/qos_service.rs | 36 +++++++++------ runtime/src/cost_tracker.rs | 91 +++++++++++++++---------------------- 3 files changed, 86 insertions(+), 117 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index b41115d7d4..5577f41e41 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -1351,31 +1351,34 @@ impl BankingStage { gossip_vote_sender: &ReplayVoteSender, qos_service: &QosService, ) -> ProcessTransactionBatchOutput { - let ((transactions_qos_results, cost_model_throttled_transactions_count), cost_model_time) = - Measure::this( - |_| { - let tx_costs = qos_service.compute_transaction_costs(txs.iter()); + let ( + (transactions_qos_results, cost_model_throttled_transactions_count, transaction_costs), + cost_model_time, + ) = Measure::this( + |_| { + let tx_costs = qos_service.compute_transaction_costs(txs.iter()); - let (transactions_qos_results, num_included) = - qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank); + let (transactions_qos_results, num_included) = + qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank); - let cost_model_throttled_transactions_count = - txs.len().saturating_sub(num_included); + let cost_model_throttled_transactions_count = + txs.len().saturating_sub(num_included); - qos_service.accumulate_estimated_transaction_costs( - &Self::accumulate_batched_transaction_costs( - tx_costs.iter(), - transactions_qos_results.iter(), - ), - ); - ( - transactions_qos_results, - cost_model_throttled_transactions_count, - ) - }, - (), - "cost_model", - ); + qos_service.accumulate_estimated_transaction_costs( + &Self::accumulate_batched_transaction_costs( + tx_costs.iter(), + transactions_qos_results.iter(), + ), + ); + ( + transactions_qos_results, + cost_model_throttled_transactions_count, + tx_costs, + ) + }, + (), + "cost_model", + ); // Only lock accounts for those transactions are selected for the block; // Once accounts are locked, other threads cannot encode transactions that will modify the @@ -1407,11 +1410,10 @@ impl BankingStage { .. } = execute_and_commit_transactions_output; - Self::commit_or_cancel_transaction_cost( - txs.iter(), + QosService::update_or_remove_transaction_costs( + transaction_costs.iter(), transactions_qos_results.iter(), retryable_transaction_indexes, - qos_service, bank, ); @@ -1442,30 +1444,6 @@ impl BankingStage { } } - /// To commit transaction cost to cost_tracker if it was executed successfully; - /// Otherwise cancel it from being committed, therefore prevents cost_tracker - /// being inflated with unsuccessfully executed transactions. - fn commit_or_cancel_transaction_cost<'a>( - transactions: impl Iterator, - transaction_results: impl Iterator>, - retryable_transaction_indexes: &[usize], - qos_service: &QosService, - bank: &Arc, - ) { - transactions - .zip(transaction_results) - .enumerate() - .for_each(|(index, (tx, result))| { - if result.is_ok() && retryable_transaction_indexes.contains(&index) { - qos_service.cancel_transaction_cost(bank, tx); - } else { - // TODO the 3rd param is for transaction's actual units. Will have - // to plumb it in next; For now, it simply commit estimated units. - qos_service.commit_transaction_cost(bank, tx, None); - } - }); - } - // rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and // execution_cost from the batch of transactions selected for block. fn accumulate_batched_transaction_costs<'a>( diff --git a/core/src/qos_service.rs b/core/src/qos_service.rs index 2908000e1e..f83862284e 100644 --- a/core/src/qos_service.rs +++ b/core/src/qos_service.rs @@ -170,21 +170,31 @@ impl QosService { (select_results, num_included) } - pub fn commit_transaction_cost( - &self, + /// Update the transaction cost in the cost_tracker with the real cost for + /// transactions that were executed successfully; + /// Otherwise remove the cost from the cost tracker, therefore preventing cost_tracker + /// being inflated with unsuccessfully executed transactions. + pub fn update_or_remove_transaction_costs<'a>( + transaction_costs: impl Iterator, + transaction_qos_results: impl Iterator>, + retryable_transaction_indexes: &[usize], bank: &Arc, - transaction: &SanitizedTransaction, - actual_units: Option, ) { - bank.write_cost_tracker() - .unwrap() - .commit_transaction(transaction, actual_units); - } - - pub fn cancel_transaction_cost(&self, bank: &Arc, transaction: &SanitizedTransaction) { - bank.write_cost_tracker() - .unwrap() - .cancel_transaction(transaction); + let mut cost_tracker = bank.write_cost_tracker().unwrap(); + transaction_costs + .zip(transaction_qos_results) + .enumerate() + .for_each(|(index, (tx_cost, qos_result))| { + if qos_result.is_ok() && retryable_transaction_indexes.contains(&index) { + cost_tracker.remove(tx_cost); + } else { + // TODO: Update the cost tracker with the actual execution compute units. + // Will have to plumb it in next; For now, keep estimated costs. + // + // let actual_execution_cost = 0; + // cost_tracker.update_execution_cost(tx_cost, actual_execution_cost); + } + }); } // metrics are reported by bank slot diff --git a/runtime/src/cost_tracker.rs b/runtime/src/cost_tracker.rs index 08c7bdf77d..64df73e513 100644 --- a/runtime/src/cost_tracker.rs +++ b/runtime/src/cost_tracker.rs @@ -5,9 +5,7 @@ //! use { crate::{block_cost_limits::*, cost_model::TransactionCost}, - solana_sdk::{ - clock::Slot, pubkey::Pubkey, signature::Signature, transaction::SanitizedTransaction, - }, + solana_sdk::{clock::Slot, pubkey::Pubkey, transaction::SanitizedTransaction}, std::collections::HashMap, }; @@ -45,12 +43,6 @@ pub struct CostTracker { /// The amount of total account data size remaining. If `Some`, then do not add transactions /// that would cause `account_data_size` to exceed this limit. account_data_size_limit: Option, - - // Transactions have passed would_fit check, is being executed. - // If the execution is successful, it's actual Units can be committed - // to cost_tracker; otherwise, it should be removed without impacting - // cost_tracker. - pending_transactions: HashMap, } impl Default for CostTracker { @@ -71,7 +63,6 @@ impl Default for CostTracker { transaction_count: 0, account_data_size: 0, account_data_size_limit: None, - pending_transactions: HashMap::new(), } } } @@ -100,31 +91,24 @@ impl CostTracker { pub fn try_add( &mut self, - transaction: &SanitizedTransaction, + _transaction: &SanitizedTransaction, tx_cost: &TransactionCost, ) -> Result { self.would_fit(tx_cost)?; - self.pending_transactions - .insert(*transaction.signature(), tx_cost.clone()); + self.add_transaction_cost(tx_cost); Ok(self.block_cost) } - pub fn commit_transaction( + pub fn update_execution_cost( &mut self, - transaction: &SanitizedTransaction, - actual_units: Option, + _estimated_tx_cost: &TransactionCost, + _actual_execution_cost: u64, ) { - if let Some(mut tx_cost) = self.pending_transactions.remove(transaction.signature()) { - if let Some(actual_units) = actual_units { - // using actual units to update cost tracker if available - tx_cost.execution_cost = actual_units; - } - self.add_transaction(&tx_cost); - } + // adjust block_cost / vote_cost / account_cost by (actual_execution_cost - execution_cost) } - pub fn cancel_transaction(&mut self, transaction: &SanitizedTransaction) { - self.pending_transactions.remove(transaction.signature()); + pub fn remove(&mut self, tx_cost: &TransactionCost) { + self.remove_transaction_cost(tx_cost); } pub fn report_stats(&self, bank_slot: Slot) { @@ -166,34 +150,10 @@ impl CostTracker { } fn would_fit(&self, tx_cost: &TransactionCost) -> Result<(), CostTrackerError> { - let mut writable_account = vec![]; - writable_account.extend(&tx_cost.writable_accounts); - let mut cost = tx_cost.sum(); - let mut account_data_size = tx_cost.account_data_size; - let mut vote_cost = if tx_cost.is_simple_vote { cost } else { 0 }; + let writable_accounts = &tx_cost.writable_accounts; + let cost = tx_cost.sum(); + let vote_cost = if tx_cost.is_simple_vote { cost } else { 0 }; - for tx_cost in self.pending_transactions.values() { - writable_account.extend(&tx_cost.writable_accounts); - cost = cost.saturating_add(tx_cost.sum()); - account_data_size = account_data_size.saturating_add(tx_cost.account_data_size); - vote_cost = vote_cost.saturating_add(if tx_cost.is_simple_vote { cost } else { 0 }); - } - - self.would_aggregated_transactions_fit( - &writable_account, - cost, - account_data_size, - vote_cost, - ) - } - - fn would_aggregated_transactions_fit( - &self, - keys: &[Pubkey], - cost: u64, - account_data_len: u64, - vote_cost: u64, - ) -> Result<(), CostTrackerError> { // check against the total package cost if self.block_cost.saturating_add(cost) > self.block_cost_limit { return Err(CostTrackerError::WouldExceedBlockMaxLimit); @@ -211,7 +171,9 @@ impl CostTracker { // NOTE: Check if the total accounts data size is exceeded *before* the block accounts data // size. This way, transactions are not unnecessarily retried. - let account_data_size = self.account_data_size.saturating_add(account_data_len); + let account_data_size = self + .account_data_size + .saturating_add(tx_cost.account_data_size); if let Some(account_data_size_limit) = self.account_data_size_limit { if account_data_size > account_data_size_limit { return Err(CostTrackerError::WouldExceedAccountDataTotalLimit); @@ -223,7 +185,7 @@ impl CostTracker { } // check each account against account_cost_limit, - for account_key in keys.iter() { + for account_key in writable_accounts.iter() { match self.cost_by_writable_accounts.get(account_key) { Some(chained_cost) => { if chained_cost.saturating_add(cost) > self.account_cost_limit { @@ -239,7 +201,7 @@ impl CostTracker { Ok(()) } - fn add_transaction(&mut self, tx_cost: &TransactionCost) { + fn add_transaction_cost(&mut self, tx_cost: &TransactionCost) { let cost = tx_cost.sum(); for account_key in tx_cost.writable_accounts.iter() { let account_cost = self @@ -257,6 +219,25 @@ impl CostTracker { .saturating_add(tx_cost.account_data_size); self.transaction_count = self.transaction_count.saturating_add(1); } + + fn remove_transaction_cost(&mut self, tx_cost: &TransactionCost) { + let cost = tx_cost.sum(); + for account_key in tx_cost.writable_accounts.iter() { + let account_cost = self + .cost_by_writable_accounts + .entry(*account_key) + .or_insert(0); + *account_cost = account_cost.saturating_sub(cost); + } + self.block_cost = self.block_cost.saturating_sub(cost); + if tx_cost.is_simple_vote { + self.vote_cost = self.vote_cost.saturating_sub(cost); + } + self.account_data_size = self + .account_data_size + .saturating_sub(tx_cost.account_data_size); + self.transaction_count = self.transaction_count.saturating_sub(1); + } } #[cfg(test)]