Adjustments to cost_tracker updates

- don't store pending tx signatures and costs in CostTracker
- apply tx costs to global state immediately again
- go from commit_or_cancel to update_or_remove, where the cost tracker
  is either updated with the true costs for successful tx, or the costs
  of a retryable tx is removed
- move the function into qos_service and hold the cost tracker lock for
  the whole loop
This commit is contained in:
Christian Kamm 2022-04-08 14:22:31 +02:00 committed by Tao Zhu
parent 9e07272af8
commit 924b8ea1eb
3 changed files with 86 additions and 117 deletions

View File

@ -1351,31 +1351,34 @@ impl BankingStage {
gossip_vote_sender: &ReplayVoteSender, gossip_vote_sender: &ReplayVoteSender,
qos_service: &QosService, qos_service: &QosService,
) -> ProcessTransactionBatchOutput { ) -> ProcessTransactionBatchOutput {
let ((transactions_qos_results, cost_model_throttled_transactions_count), cost_model_time) = let (
Measure::this( (transactions_qos_results, cost_model_throttled_transactions_count, transaction_costs),
|_| { cost_model_time,
let tx_costs = qos_service.compute_transaction_costs(txs.iter()); ) = Measure::this(
|_| {
let tx_costs = qos_service.compute_transaction_costs(txs.iter());
let (transactions_qos_results, num_included) = let (transactions_qos_results, num_included) =
qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank); qos_service.select_transactions_per_cost(txs.iter(), tx_costs.iter(), bank);
let cost_model_throttled_transactions_count = let cost_model_throttled_transactions_count =
txs.len().saturating_sub(num_included); txs.len().saturating_sub(num_included);
qos_service.accumulate_estimated_transaction_costs( qos_service.accumulate_estimated_transaction_costs(
&Self::accumulate_batched_transaction_costs( &Self::accumulate_batched_transaction_costs(
tx_costs.iter(), tx_costs.iter(),
transactions_qos_results.iter(), transactions_qos_results.iter(),
), ),
); );
( (
transactions_qos_results, transactions_qos_results,
cost_model_throttled_transactions_count, cost_model_throttled_transactions_count,
) tx_costs,
}, )
(), },
"cost_model", (),
); "cost_model",
);
// Only lock accounts for those transactions are selected for the block; // Only lock accounts for those transactions are selected for the block;
// Once accounts are locked, other threads cannot encode transactions that will modify the // Once accounts are locked, other threads cannot encode transactions that will modify the
@ -1407,11 +1410,10 @@ impl BankingStage {
.. ..
} = execute_and_commit_transactions_output; } = execute_and_commit_transactions_output;
Self::commit_or_cancel_transaction_cost( QosService::update_or_remove_transaction_costs(
txs.iter(), transaction_costs.iter(),
transactions_qos_results.iter(), transactions_qos_results.iter(),
retryable_transaction_indexes, retryable_transaction_indexes,
qos_service,
bank, bank,
); );
@ -1442,30 +1444,6 @@ impl BankingStage {
} }
} }
/// To commit transaction cost to cost_tracker if it was executed successfully;
/// Otherwise cancel it from being committed, therefore prevents cost_tracker
/// being inflated with unsuccessfully executed transactions.
fn commit_or_cancel_transaction_cost<'a>(
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
transaction_results: impl Iterator<Item = &'a transaction::Result<()>>,
retryable_transaction_indexes: &[usize],
qos_service: &QosService,
bank: &Arc<Bank>,
) {
transactions
.zip(transaction_results)
.enumerate()
.for_each(|(index, (tx, result))| {
if result.is_ok() && retryable_transaction_indexes.contains(&index) {
qos_service.cancel_transaction_cost(bank, tx);
} else {
// TODO the 3rd param is for transaction's actual units. Will have
// to plumb it in next; For now, it simply commit estimated units.
qos_service.commit_transaction_cost(bank, tx, None);
}
});
}
// rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and // rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and
// execution_cost from the batch of transactions selected for block. // execution_cost from the batch of transactions selected for block.
fn accumulate_batched_transaction_costs<'a>( fn accumulate_batched_transaction_costs<'a>(

View File

@ -170,21 +170,31 @@ impl QosService {
(select_results, num_included) (select_results, num_included)
} }
pub fn commit_transaction_cost( /// Update the transaction cost in the cost_tracker with the real cost for
&self, /// transactions that were executed successfully;
/// Otherwise remove the cost from the cost tracker, therefore preventing cost_tracker
/// being inflated with unsuccessfully executed transactions.
pub fn update_or_remove_transaction_costs<'a>(
transaction_costs: impl Iterator<Item = &'a TransactionCost>,
transaction_qos_results: impl Iterator<Item = &'a transaction::Result<()>>,
retryable_transaction_indexes: &[usize],
bank: &Arc<Bank>, bank: &Arc<Bank>,
transaction: &SanitizedTransaction,
actual_units: Option<u64>,
) { ) {
bank.write_cost_tracker() let mut cost_tracker = bank.write_cost_tracker().unwrap();
.unwrap() transaction_costs
.commit_transaction(transaction, actual_units); .zip(transaction_qos_results)
} .enumerate()
.for_each(|(index, (tx_cost, qos_result))| {
pub fn cancel_transaction_cost(&self, bank: &Arc<Bank>, transaction: &SanitizedTransaction) { if qos_result.is_ok() && retryable_transaction_indexes.contains(&index) {
bank.write_cost_tracker() cost_tracker.remove(tx_cost);
.unwrap() } else {
.cancel_transaction(transaction); // TODO: Update the cost tracker with the actual execution compute units.
// Will have to plumb it in next; For now, keep estimated costs.
//
// let actual_execution_cost = 0;
// cost_tracker.update_execution_cost(tx_cost, actual_execution_cost);
}
});
} }
// metrics are reported by bank slot // metrics are reported by bank slot

View File

@ -5,9 +5,7 @@
//! //!
use { use {
crate::{block_cost_limits::*, cost_model::TransactionCost}, crate::{block_cost_limits::*, cost_model::TransactionCost},
solana_sdk::{ solana_sdk::{clock::Slot, pubkey::Pubkey, transaction::SanitizedTransaction},
clock::Slot, pubkey::Pubkey, signature::Signature, transaction::SanitizedTransaction,
},
std::collections::HashMap, std::collections::HashMap,
}; };
@ -45,12 +43,6 @@ pub struct CostTracker {
/// The amount of total account data size remaining. If `Some`, then do not add transactions /// The amount of total account data size remaining. If `Some`, then do not add transactions
/// that would cause `account_data_size` to exceed this limit. /// that would cause `account_data_size` to exceed this limit.
account_data_size_limit: Option<u64>, account_data_size_limit: Option<u64>,
// Transactions have passed would_fit check, is being executed.
// If the execution is successful, it's actual Units can be committed
// to cost_tracker; otherwise, it should be removed without impacting
// cost_tracker.
pending_transactions: HashMap<Signature, TransactionCost>,
} }
impl Default for CostTracker { impl Default for CostTracker {
@ -71,7 +63,6 @@ impl Default for CostTracker {
transaction_count: 0, transaction_count: 0,
account_data_size: 0, account_data_size: 0,
account_data_size_limit: None, account_data_size_limit: None,
pending_transactions: HashMap::new(),
} }
} }
} }
@ -100,31 +91,24 @@ impl CostTracker {
pub fn try_add( pub fn try_add(
&mut self, &mut self,
transaction: &SanitizedTransaction, _transaction: &SanitizedTransaction,
tx_cost: &TransactionCost, tx_cost: &TransactionCost,
) -> Result<u64, CostTrackerError> { ) -> Result<u64, CostTrackerError> {
self.would_fit(tx_cost)?; self.would_fit(tx_cost)?;
self.pending_transactions self.add_transaction_cost(tx_cost);
.insert(*transaction.signature(), tx_cost.clone());
Ok(self.block_cost) Ok(self.block_cost)
} }
pub fn commit_transaction( pub fn update_execution_cost(
&mut self, &mut self,
transaction: &SanitizedTransaction, _estimated_tx_cost: &TransactionCost,
actual_units: Option<u64>, _actual_execution_cost: u64,
) { ) {
if let Some(mut tx_cost) = self.pending_transactions.remove(transaction.signature()) { // adjust block_cost / vote_cost / account_cost by (actual_execution_cost - execution_cost)
if let Some(actual_units) = actual_units {
// using actual units to update cost tracker if available
tx_cost.execution_cost = actual_units;
}
self.add_transaction(&tx_cost);
}
} }
pub fn cancel_transaction(&mut self, transaction: &SanitizedTransaction) { pub fn remove(&mut self, tx_cost: &TransactionCost) {
self.pending_transactions.remove(transaction.signature()); self.remove_transaction_cost(tx_cost);
} }
pub fn report_stats(&self, bank_slot: Slot) { pub fn report_stats(&self, bank_slot: Slot) {
@ -166,34 +150,10 @@ impl CostTracker {
} }
fn would_fit(&self, tx_cost: &TransactionCost) -> Result<(), CostTrackerError> { fn would_fit(&self, tx_cost: &TransactionCost) -> Result<(), CostTrackerError> {
let mut writable_account = vec![]; let writable_accounts = &tx_cost.writable_accounts;
writable_account.extend(&tx_cost.writable_accounts); let cost = tx_cost.sum();
let mut cost = tx_cost.sum(); let vote_cost = if tx_cost.is_simple_vote { cost } else { 0 };
let mut account_data_size = tx_cost.account_data_size;
let mut vote_cost = if tx_cost.is_simple_vote { cost } else { 0 };
for tx_cost in self.pending_transactions.values() {
writable_account.extend(&tx_cost.writable_accounts);
cost = cost.saturating_add(tx_cost.sum());
account_data_size = account_data_size.saturating_add(tx_cost.account_data_size);
vote_cost = vote_cost.saturating_add(if tx_cost.is_simple_vote { cost } else { 0 });
}
self.would_aggregated_transactions_fit(
&writable_account,
cost,
account_data_size,
vote_cost,
)
}
fn would_aggregated_transactions_fit(
&self,
keys: &[Pubkey],
cost: u64,
account_data_len: u64,
vote_cost: u64,
) -> Result<(), CostTrackerError> {
// check against the total package cost // check against the total package cost
if self.block_cost.saturating_add(cost) > self.block_cost_limit { if self.block_cost.saturating_add(cost) > self.block_cost_limit {
return Err(CostTrackerError::WouldExceedBlockMaxLimit); return Err(CostTrackerError::WouldExceedBlockMaxLimit);
@ -211,7 +171,9 @@ impl CostTracker {
// NOTE: Check if the total accounts data size is exceeded *before* the block accounts data // NOTE: Check if the total accounts data size is exceeded *before* the block accounts data
// size. This way, transactions are not unnecessarily retried. // size. This way, transactions are not unnecessarily retried.
let account_data_size = self.account_data_size.saturating_add(account_data_len); let account_data_size = self
.account_data_size
.saturating_add(tx_cost.account_data_size);
if let Some(account_data_size_limit) = self.account_data_size_limit { if let Some(account_data_size_limit) = self.account_data_size_limit {
if account_data_size > account_data_size_limit { if account_data_size > account_data_size_limit {
return Err(CostTrackerError::WouldExceedAccountDataTotalLimit); return Err(CostTrackerError::WouldExceedAccountDataTotalLimit);
@ -223,7 +185,7 @@ impl CostTracker {
} }
// check each account against account_cost_limit, // check each account against account_cost_limit,
for account_key in keys.iter() { for account_key in writable_accounts.iter() {
match self.cost_by_writable_accounts.get(account_key) { match self.cost_by_writable_accounts.get(account_key) {
Some(chained_cost) => { Some(chained_cost) => {
if chained_cost.saturating_add(cost) > self.account_cost_limit { if chained_cost.saturating_add(cost) > self.account_cost_limit {
@ -239,7 +201,7 @@ impl CostTracker {
Ok(()) Ok(())
} }
fn add_transaction(&mut self, tx_cost: &TransactionCost) { fn add_transaction_cost(&mut self, tx_cost: &TransactionCost) {
let cost = tx_cost.sum(); let cost = tx_cost.sum();
for account_key in tx_cost.writable_accounts.iter() { for account_key in tx_cost.writable_accounts.iter() {
let account_cost = self let account_cost = self
@ -257,6 +219,25 @@ impl CostTracker {
.saturating_add(tx_cost.account_data_size); .saturating_add(tx_cost.account_data_size);
self.transaction_count = self.transaction_count.saturating_add(1); self.transaction_count = self.transaction_count.saturating_add(1);
} }
fn remove_transaction_cost(&mut self, tx_cost: &TransactionCost) {
let cost = tx_cost.sum();
for account_key in tx_cost.writable_accounts.iter() {
let account_cost = self
.cost_by_writable_accounts
.entry(*account_key)
.or_insert(0);
*account_cost = account_cost.saturating_sub(cost);
}
self.block_cost = self.block_cost.saturating_sub(cost);
if tx_cost.is_simple_vote {
self.vote_cost = self.vote_cost.saturating_sub(cost);
}
self.account_data_size = self
.account_data_size
.saturating_sub(tx_cost.account_data_size);
self.transaction_count = self.transaction_count.saturating_sub(1);
}
} }
#[cfg(test)] #[cfg(test)]