Apply transaction actual execution units to cost_tracker (#24311)

* Pass the sum of consumed compute units to cost_tracker

* cost model tracks builtins and bpf programs separately, enabling adjust block cost by actual bpf programs execution costs

* Copied nightly-only experimental `checked_add_(un)signed` implementation to sdk

* Add function to update cost tracker with execution cost adjustment

* Review suggestion - using enum instead of struct for CommitTransactionDetails
Co-authored-by: Justin Starry <justin.m.starry@gmail.com>

* review - rename variable to distinguish accumulated_consumed_units from individual compute_units_consumed

* not to use signed integer operations

* Review - using saturating_add_assign!(), and checked_*().unwrap_or()

* Review - using Ordering enum to cmp

* replace checked_ with saturating_

* review - remove unnecessary Option<>

* Review - add function to report number of non-zero units account to metrics
This commit is contained in:
Tao Zhu 2022-04-21 02:38:07 -05:00 committed by GitHub
parent fed13b1f62
commit a21fc3f303
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 341 additions and 59 deletions

View File

@ -100,6 +100,12 @@ struct RecordTransactionsSummary {
retryable_indexes: Vec<usize>,
}
#[derive(Debug)]
pub enum CommitTransactionDetails {
Committed { compute_units: u64 },
NotCommitted,
}
pub struct ExecuteAndCommitTransactionsOutput {
// Total number of transactions that were passed as candidates for execution
transactions_attempted_execution_count: usize,
@ -113,9 +119,8 @@ pub struct ExecuteAndCommitTransactionsOutput {
// to the block ending.
retryable_transaction_indexes: Vec<usize>,
// A result that indicates whether transactions were successfully
// committed into the Poh stream. If so, the result tells us
// how many such transactions were committed
commit_transactions_result: Result<Vec<bool>, PohRecorderError>,
// committed into the Poh stream.
commit_transactions_result: Result<Vec<CommitTransactionDetails>, PohRecorderError>,
execute_and_commit_timings: LeaderExecuteAndCommitTimings,
}
@ -1205,9 +1210,18 @@ impl BankingStage {
..
} = load_and_execute_transactions_output;
let mut transactions_execute_and_record_status: Vec<bool> = execution_results
let mut transactions_execute_and_record_status: Vec<_> = execution_results
.iter()
.map(|execution_result| execution_result.was_executed())
.map(|execution_result| match execution_result {
TransactionExecutionResult::Executed(details) => {
CommitTransactionDetails::Committed {
compute_units: details.executed_units,
}
}
TransactionExecutionResult::NotExecuted { .. } => {
CommitTransactionDetails::NotCommitted
}
})
.collect();
let (freeze_lock, freeze_lock_time) =
@ -1237,7 +1251,7 @@ impl BankingStage {
// mark transactions that were executed but not recorded
retryable_record_transaction_indexes.iter().for_each(|i| {
transactions_execute_and_record_status[*i] = false;
transactions_execute_and_record_status[*i] = CommitTransactionDetails::NotCommitted;
});
inc_new_counter_info!(
@ -2168,6 +2182,7 @@ mod tests {
inner_instructions: None,
durable_nonce_fee: None,
return_data: None,
executed_units: 0u64,
})
}

View File

@ -3,7 +3,7 @@
//! how transactions are included in blocks, and optimize those blocks.
//!
use {
crate::banking_stage::BatchedTransactionDetails,
crate::banking_stage::{BatchedTransactionDetails, CommitTransactionDetails},
crossbeam_channel::{unbounded, Receiver, Sender},
solana_measure::measure::Measure,
solana_runtime::{
@ -177,7 +177,7 @@ impl QosService {
pub fn update_or_remove_transaction_costs<'a>(
transaction_costs: impl Iterator<Item = &'a TransactionCost>,
transaction_qos_results: impl Iterator<Item = &'a transaction::Result<()>>,
transaction_committed_status: Option<&Vec<bool>>,
transaction_committed_status: Option<&Vec<CommitTransactionDetails>>,
bank: &Arc<Bank>,
) {
match transaction_committed_status {
@ -196,20 +196,27 @@ impl QosService {
fn update_transaction_costs<'a>(
transaction_costs: impl Iterator<Item = &'a TransactionCost>,
transaction_qos_results: impl Iterator<Item = &'a transaction::Result<()>>,
transaction_committed_status: &Vec<bool>,
transaction_committed_status: &Vec<CommitTransactionDetails>,
bank: &Arc<Bank>,
) {
let mut cost_tracker = bank.write_cost_tracker().unwrap();
transaction_costs
.zip(transaction_qos_results)
.zip(transaction_committed_status)
.for_each(|((tx_cost, qos_inclusion_result), was_committed)| {
// Only transactions that the qos service included have to be
// checked for update
if qos_inclusion_result.is_ok() && !*was_committed {
cost_tracker.remove(tx_cost);
}
});
.for_each(
|((tx_cost, qos_inclusion_result), transaction_committed_details)| {
// Only transactions that the qos service included have to be
// checked for update
if qos_inclusion_result.is_ok() {
match transaction_committed_details {
CommitTransactionDetails::Committed { compute_units } => {
cost_tracker.update_execution_cost(tx_cost, *compute_units)
}
CommitTransactionDetails::NotCommitted => cost_tracker.remove(tx_cost),
}
}
},
);
}
fn remove_transaction_costs<'a>(
@ -661,19 +668,27 @@ mod tests {
let txs: Vec<SanitizedTransaction> = (0..transaction_count)
.map(|_| transfer_tx.clone())
.collect();
let execute_units_adjustment = 10u64;
// assert all tx_costs should be applied to cost_tracker if all execution_results are all commited
// assert all tx_costs should be applied to cost_tracker if all execution_results are all committed
{
let qos_service = QosService::new(Arc::new(RwLock::new(CostModel::default())), 1);
let txs_costs = qos_service.compute_transaction_costs(txs.iter());
let total_txs_costs: u64 = txs_costs.iter().map(|cost| cost.sum()).sum();
let total_txs_cost: u64 = txs_costs.iter().map(|cost| cost.sum()).sum();
let (qos_results, _num_included) =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank);
assert_eq!(
total_txs_costs,
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
let commited_status: Vec<bool> = (0..transaction_count).map(|_| true).collect();
// all transactions are committed with actual units more than estimated
let commited_status: Vec<CommitTransactionDetails> = txs_costs
.iter()
.map(|tx_cost| CommitTransactionDetails::Committed {
compute_units: tx_cost.bpf_execution_cost + execute_units_adjustment,
})
.collect();
let final_txs_cost = total_txs_cost + execute_units_adjustment * transaction_count;
QosService::update_or_remove_transaction_costs(
txs_costs.iter(),
qos_results.iter(),
@ -681,7 +696,7 @@ mod tests {
&bank,
);
assert_eq!(
total_txs_costs,
final_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
assert_eq!(
@ -708,12 +723,17 @@ mod tests {
.map(|_| transfer_tx.clone())
.collect();
// assert all tx_costs should be removed from cost_tracker if all execution_results are all NotExecuted
// assert all tx_costs should be removed from cost_tracker if all execution_results are all Not Committed
{
let qos_service = QosService::new(Arc::new(RwLock::new(CostModel::default())), 1);
let txs_costs = qos_service.compute_transaction_costs(txs.iter());
let total_txs_cost: u64 = txs_costs.iter().map(|cost| cost.sum()).sum();
let (qos_results, _num_included) =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank);
assert_eq!(
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
QosService::update_or_remove_transaction_costs(
txs_costs.iter(),
qos_results.iter(),
@ -741,31 +761,55 @@ mod tests {
let txs: Vec<SanitizedTransaction> = (0..transaction_count)
.map(|_| transfer_tx.clone())
.collect();
let execute_units_adjustment = 10u64;
// assert only commited tx_costs are applied cost_tracker
{
let qos_service = QosService::new(Arc::new(RwLock::new(CostModel::default())), 1);
let txs_costs = qos_service.compute_transaction_costs(txs.iter());
let total_txs_cost: u64 = txs_costs.iter().map(|cost| cost.sum()).sum();
let (qos_results, _num_included) =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank);
let commited_status: Vec<bool> = (0..transaction_count).map(|n| n != 0).collect();
assert_eq!(
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
// Half of transactions are not committed, the rest with cost adjustment
let commited_status: Vec<CommitTransactionDetails> = txs_costs
.iter()
.enumerate()
.map(|(n, tx_cost)| {
if n % 2 == 0 {
CommitTransactionDetails::NotCommitted
} else {
CommitTransactionDetails::Committed {
compute_units: tx_cost.bpf_execution_cost + execute_units_adjustment,
}
}
})
.collect();
QosService::update_or_remove_transaction_costs(
txs_costs.iter(),
qos_results.iter(),
Some(&commited_status),
&bank,
);
let expected_committed_units: u64 = txs_costs
.iter()
.enumerate()
.map(|(n, cost)| if n < 1 { 0 } else { cost.sum() })
.sum();
// assert the final block cost
let mut expected_final_txs_count = 0u64;
let mut expected_final_block_cost = 0u64;
txs_costs.iter().enumerate().for_each(|(n, cost)| {
if n % 2 != 0 {
expected_final_txs_count += 1;
expected_final_block_cost += cost.sum() + execute_units_adjustment;
}
});
assert_eq!(
expected_committed_units,
expected_final_block_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
assert_eq!(
transaction_count - 1,
expected_final_txs_count,
bank.read_cost_tracker().unwrap().transaction_count()
);
}

View File

@ -102,6 +102,7 @@ impl TransactionStatusService {
inner_instructions,
durable_nonce_fee,
return_data,
..
} = details;
let lamports_per_signature = match durable_nonce_fee {
Some(DurableNonceFee::Valid(lamports_per_signature)) => {
@ -352,6 +353,7 @@ pub(crate) mod tests {
.unwrap(),
)),
return_data: None,
executed_units: 0u64,
});
let balances = TransactionBalancesSet {

View File

@ -1398,6 +1398,7 @@ mod tests {
inner_instructions: None,
durable_nonce_fee: nonce.map(DurableNonceFee::from),
return_data: None,
executed_units: 0u64,
})
}

View File

@ -597,6 +597,7 @@ pub struct TransactionExecutionDetails {
pub inner_instructions: Option<InnerInstructionsList>,
pub durable_nonce_fee: Option<DurableNonceFee>,
pub return_data: Option<TransactionReturnData>,
pub executed_units: u64,
}
/// Type safe representation of a transaction execution attempt which
@ -3964,6 +3965,8 @@ impl Bank {
let (blockhash, lamports_per_signature) = self.last_blockhash_and_lamports_per_signature();
let mut executed_units = 0u64;
let mut process_message_time = Measure::start("process_message_time");
let process_result = MessageProcessor::process_message(
&self.builtin_programs.vec,
@ -3980,8 +3983,10 @@ impl Bank {
blockhash,
lamports_per_signature,
self.load_accounts_data_len(),
&mut executed_units,
);
process_message_time.stop();
saturating_add_assign!(
timings.execute_accessories.process_message_us,
process_message_time.as_us()
@ -4062,6 +4067,7 @@ impl Bank {
inner_instructions,
durable_nonce_fee,
return_data,
executed_units,
})
}
@ -6991,6 +6997,7 @@ pub(crate) mod tests {
inner_instructions: None,
durable_nonce_fee: nonce.map(DurableNonceFee::from),
return_data: None,
executed_units: 0u64,
})
}

View File

@ -5,8 +5,8 @@
//!
use {
crate::{block_cost_limits::*, cost_model::TransactionCost},
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::collections::HashMap,
solana_sdk::{clock::Slot, pubkey::Pubkey, saturating_add_assign},
std::{cmp::Ordering, collections::HashMap},
};
const WRITABLE_ACCOUNTS_PER_BLOCK: usize = 512;
@ -95,6 +95,29 @@ impl CostTracker {
Ok(self.block_cost)
}
pub fn update_execution_cost(
&mut self,
estimated_tx_cost: &TransactionCost,
actual_execution_units: u64,
) {
let estimated_execution_units = estimated_tx_cost.bpf_execution_cost;
match actual_execution_units.cmp(&estimated_execution_units) {
Ordering::Equal => (),
Ordering::Greater => {
self.add_transaction_execution_cost(
estimated_tx_cost,
actual_execution_units - estimated_execution_units,
);
}
Ordering::Less => {
self.sub_transaction_execution_cost(
estimated_tx_cost,
estimated_execution_units - actual_execution_units,
);
}
}
}
pub fn remove(&mut self, tx_cost: &TransactionCost) {
self.remove_transaction_cost(tx_cost);
}
@ -121,11 +144,7 @@ impl CostTracker {
("block_cost", self.block_cost as i64, i64),
("vote_cost", self.vote_cost as i64, i64),
("transaction_count", self.transaction_count as i64, i64),
(
"number_of_accounts",
self.cost_by_writable_accounts.len() as i64,
i64
),
("number_of_accounts", self.number_of_accounts() as i64, i64),
("costliest_account", costliest_account.to_string(), String),
("costliest_account_cost", costliest_account_cost as i64, i64),
("account_data_size", self.account_data_size, i64),
@ -199,40 +218,56 @@ impl CostTracker {
fn add_transaction_cost(&mut self, tx_cost: &TransactionCost) {
let cost = tx_cost.sum();
for account_key in tx_cost.writable_accounts.iter() {
let account_cost = self
.cost_by_writable_accounts
.entry(*account_key)
.or_insert(0);
*account_cost = account_cost.saturating_add(cost);
}
self.block_cost = self.block_cost.saturating_add(cost);
if tx_cost.is_simple_vote {
self.vote_cost = self.vote_cost.saturating_add(cost);
}
self.account_data_size = self
.account_data_size
.saturating_add(tx_cost.account_data_size);
self.transaction_count = self.transaction_count.saturating_add(1);
self.add_transaction_execution_cost(tx_cost, cost);
saturating_add_assign!(self.account_data_size, tx_cost.account_data_size);
saturating_add_assign!(self.transaction_count, 1);
}
fn remove_transaction_cost(&mut self, tx_cost: &TransactionCost) {
let cost = tx_cost.sum();
self.sub_transaction_execution_cost(tx_cost, cost);
self.account_data_size = self
.account_data_size
.saturating_sub(tx_cost.account_data_size);
self.transaction_count = self.transaction_count.saturating_sub(1);
}
/// Apply additional actual execution units to cost_tracker
fn add_transaction_execution_cost(&mut self, tx_cost: &TransactionCost, adjustment: u64) {
for account_key in tx_cost.writable_accounts.iter() {
let account_cost = self
.cost_by_writable_accounts
.entry(*account_key)
.or_insert(0);
*account_cost = account_cost.saturating_sub(cost);
*account_cost = account_cost.saturating_add(adjustment);
}
self.block_cost = self.block_cost.saturating_sub(cost);
self.block_cost = self.block_cost.saturating_add(adjustment);
if tx_cost.is_simple_vote {
self.vote_cost = self.vote_cost.saturating_sub(cost);
self.vote_cost = self.vote_cost.saturating_add(adjustment);
}
self.account_data_size = self
.account_data_size
.saturating_sub(tx_cost.account_data_size);
self.transaction_count = self.transaction_count.saturating_sub(1);
}
/// Substract extra execution units from cost_tracker
fn sub_transaction_execution_cost(&mut self, tx_cost: &TransactionCost, adjustment: u64) {
for account_key in tx_cost.writable_accounts.iter() {
let account_cost = self
.cost_by_writable_accounts
.entry(*account_key)
.or_insert(0);
*account_cost = account_cost.saturating_sub(adjustment);
}
self.block_cost = self.block_cost.saturating_sub(adjustment);
if tx_cost.is_simple_vote {
self.vote_cost = self.vote_cost.saturating_sub(adjustment);
}
}
/// count number of none-zero CU accounts
fn number_of_accounts(&self) -> usize {
self.cost_by_writable_accounts
.iter()
.map(|(_key, units)| if *units > 0 { 1 } else { 0 })
.sum()
}
}
@ -676,4 +711,171 @@ mod tests {
assert_eq!(acct2, costliest_account);
}
}
#[test]
fn test_adjust_transaction_execution_cost() {
let acct1 = Pubkey::new_unique();
let acct2 = Pubkey::new_unique();
let acct3 = Pubkey::new_unique();
let cost = 100;
let account_max = cost * 2;
let block_max = account_max * 3; // for three accts
let mut testee = CostTracker::new(account_max, block_max, block_max, None);
let tx_cost = TransactionCost {
writable_accounts: vec![acct1, acct2, acct3],
bpf_execution_cost: cost,
..TransactionCost::default()
};
let mut expected_block_cost = tx_cost.sum();
let expected_tx_count = 1;
assert!(testee.try_add(&tx_cost).is_ok());
assert_eq!(expected_block_cost, testee.block_cost());
assert_eq!(expected_tx_count, testee.transaction_count());
testee
.cost_by_writable_accounts
.iter()
.for_each(|(_key, units)| {
assert_eq!(expected_block_cost, *units);
});
// adjust up
{
let adjustment = 50u64;
testee.add_transaction_execution_cost(&tx_cost, adjustment);
expected_block_cost += 50;
assert_eq!(expected_block_cost, testee.block_cost());
assert_eq!(expected_tx_count, testee.transaction_count());
testee
.cost_by_writable_accounts
.iter()
.for_each(|(_key, units)| {
assert_eq!(expected_block_cost, *units);
});
}
// adjust down
{
let adjustment = 50u64;
testee.sub_transaction_execution_cost(&tx_cost, adjustment);
expected_block_cost -= 50;
assert_eq!(expected_block_cost, testee.block_cost());
assert_eq!(expected_tx_count, testee.transaction_count());
testee
.cost_by_writable_accounts
.iter()
.for_each(|(_key, units)| {
assert_eq!(expected_block_cost, *units);
});
}
// adjust overflow
{
testee.add_transaction_execution_cost(&tx_cost, u64::MAX);
// expect block cost set to limit
assert_eq!(u64::MAX, testee.block_cost());
assert_eq!(expected_tx_count, testee.transaction_count());
testee
.cost_by_writable_accounts
.iter()
.for_each(|(_key, units)| {
assert_eq!(u64::MAX, *units);
});
}
// adjust underflow
{
testee.sub_transaction_execution_cost(&tx_cost, u64::MAX);
// expect block cost set to limit
assert_eq!(u64::MIN, testee.block_cost());
assert_eq!(expected_tx_count, testee.transaction_count());
testee
.cost_by_writable_accounts
.iter()
.for_each(|(_key, units)| {
assert_eq!(u64::MIN, *units);
});
// assert the number of non-empty accounts is zero, but map
// still contains 3 account
assert_eq!(0, testee.number_of_accounts());
assert_eq!(3, testee.cost_by_writable_accounts.len());
}
}
#[test]
fn test_update_execution_cost() {
let acct1 = Pubkey::new_unique();
let acct2 = Pubkey::new_unique();
let acct3 = Pubkey::new_unique();
let cost = 100;
let tx_cost = TransactionCost {
writable_accounts: vec![acct1, acct2, acct3],
bpf_execution_cost: cost,
..TransactionCost::default()
};
let mut cost_tracker = CostTracker::default();
// Assert OK to add tx_cost
assert!(cost_tracker.try_add(&tx_cost).is_ok());
let (_costliest_account, costliest_account_cost) = cost_tracker.find_costliest_account();
assert_eq!(cost, cost_tracker.block_cost);
assert_eq!(cost, costliest_account_cost);
assert_eq!(1, cost_tracker.transaction_count);
// assert no-change if actual units is same as estimated units
let mut expected_cost = cost;
cost_tracker.update_execution_cost(&tx_cost, cost);
let (_costliest_account, costliest_account_cost) = cost_tracker.find_costliest_account();
assert_eq!(expected_cost, cost_tracker.block_cost);
assert_eq!(expected_cost, costliest_account_cost);
assert_eq!(1, cost_tracker.transaction_count);
// assert cost are adjusted down
let reduced_units = 3;
expected_cost -= reduced_units;
cost_tracker.update_execution_cost(&tx_cost, cost - reduced_units);
let (_costliest_account, costliest_account_cost) = cost_tracker.find_costliest_account();
assert_eq!(expected_cost, cost_tracker.block_cost);
assert_eq!(expected_cost, costliest_account_cost);
assert_eq!(1, cost_tracker.transaction_count);
// assert cost are adjusted up
let increased_units = 1;
expected_cost += increased_units;
cost_tracker.update_execution_cost(&tx_cost, cost + increased_units);
let (_costliest_account, costliest_account_cost) = cost_tracker.find_costliest_account();
assert_eq!(expected_cost, cost_tracker.block_cost);
assert_eq!(expected_cost, costliest_account_cost);
assert_eq!(1, cost_tracker.transaction_count);
}
#[test]
fn test_remove_transaction_cost() {
let mut cost_tracker = CostTracker::default();
let cost = 100u64;
let tx_cost = TransactionCost {
writable_accounts: vec![Pubkey::new_unique()],
bpf_execution_cost: cost,
..TransactionCost::default()
};
cost_tracker.add_transaction_cost(&tx_cost);
// assert cost_tracker is reverted to default
assert_eq!(1, cost_tracker.transaction_count);
assert_eq!(1, cost_tracker.number_of_accounts());
assert_eq!(cost, cost_tracker.block_cost);
assert_eq!(0, cost_tracker.vote_cost);
assert_eq!(0, cost_tracker.account_data_size);
cost_tracker.remove_transaction_cost(&tx_cost);
// assert cost_tracker is reverted to default
assert_eq!(0, cost_tracker.transaction_count);
assert_eq!(0, cost_tracker.number_of_accounts());
assert_eq!(0, cost_tracker.block_cost);
assert_eq!(0, cost_tracker.vote_cost);
assert_eq!(0, cost_tracker.account_data_size);
}
}

View File

@ -64,6 +64,7 @@ impl MessageProcessor {
blockhash: Hash,
lamports_per_signature: u64,
current_accounts_data_len: u64,
accumulated_consumed_units: &mut u64,
) -> Result<ProcessedMessageInfo, TransactionError> {
let mut invoke_context = InvokeContext::new(
transaction_context,
@ -134,6 +135,8 @@ impl MessageProcessor {
timings,
);
time.stop();
*accumulated_consumed_units =
accumulated_consumed_units.saturating_add(compute_units_consumed);
timings.details.accumulate_program(
program_id,
time.as_us(),
@ -282,6 +285,7 @@ mod tests {
Hash::default(),
0,
0,
&mut 0,
);
assert!(result.is_ok());
assert_eq!(
@ -330,6 +334,7 @@ mod tests {
Hash::default(),
0,
0,
&mut 0,
);
assert_eq!(
result,
@ -368,6 +373,7 @@ mod tests {
Hash::default(),
0,
0,
&mut 0,
);
assert_eq!(
result,
@ -503,6 +509,7 @@ mod tests {
Hash::default(),
0,
0,
&mut 0,
);
assert_eq!(
result,
@ -536,6 +543,7 @@ mod tests {
Hash::default(),
0,
0,
&mut 0,
);
assert!(result.is_ok());
@ -566,6 +574,7 @@ mod tests {
Hash::default(),
0,
0,
&mut 0,
);
assert!(result.is_ok());
assert_eq!(
@ -644,7 +653,9 @@ mod tests {
Hash::default(),
0,
0,
&mut 0,
);
assert_eq!(
result,
Err(TransactionError::InstructionError(