Ensure that uncommitted transactions are always removed from QoS (#32285)

Co-authored-by: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com>
This commit is contained in:
buffalu 2023-06-28 15:44:58 -05:00 committed by GitHub
parent 9c9db4a461
commit 5dee2e4d0c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 157 additions and 55 deletions

View File

@ -21,8 +21,9 @@ use {
},
solana_runtime::bank::Bank,
solana_sdk::{
account::Account, signature::Keypair, signer::Signer, stake_history::Epoch, system_program,
system_transaction, transaction::SanitizedTransaction,
account::Account, feature_set::apply_cost_tracker_during_replay, signature::Keypair,
signer::Signer, stake_history::Epoch, system_program, system_transaction,
transaction::SanitizedTransaction,
},
std::sync::{
atomic::{AtomicBool, Ordering},
@ -31,6 +32,7 @@ use {
tempfile::TempDir,
test::Bencher,
};
extern crate test;
fn create_accounts(num: usize) -> Vec<Keypair> {
@ -90,7 +92,7 @@ struct BenchFrame {
signal_receiver: Receiver<(Arc<Bank>, (Entry, u64))>,
}
fn setup() -> BenchFrame {
fn setup(apply_cost_tracker_during_replay: bool) -> BenchFrame {
let mint_total = u64::MAX;
let GenesisConfigInfo {
mut genesis_config, ..
@ -101,6 +103,11 @@ fn setup() -> BenchFrame {
genesis_config.ticks_per_slot = 10_000;
let mut bank = Bank::new_for_benches(&genesis_config);
if !apply_cost_tracker_during_replay {
bank.deactivate_feature(&apply_cost_tracker_during_replay::id());
}
// Allow arbitrary transaction processing time for the purposes of this bench
bank.ns_per_slot = u128::MAX;
@ -127,7 +134,11 @@ fn setup() -> BenchFrame {
}
}
fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usize) {
fn bench_process_and_record_transactions(
bencher: &mut Bencher,
batch_size: usize,
apply_cost_tracker_during_replay: bool,
) {
let BenchFrame {
bank,
ledger_path: _ledger_path,
@ -135,7 +146,7 @@ fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usiz
poh_recorder,
poh_service,
signal_receiver: _signal_receiver,
} = setup();
} = setup(apply_cost_tracker_during_replay);
let consumer = create_consumer(&poh_recorder);
let transactions = create_transactions(&bank, 2_usize.pow(20));
let mut transaction_iter = transactions.chunks(batch_size);
@ -155,15 +166,30 @@ fn bench_process_and_record_transactions(bencher: &mut Bencher, batch_size: usiz
#[bench]
fn bench_process_and_record_transactions_unbatched(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 1);
bench_process_and_record_transactions(bencher, 1, true);
}
#[bench]
fn bench_process_and_record_transactions_half_batch(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 32);
bench_process_and_record_transactions(bencher, 32, true);
}
#[bench]
fn bench_process_and_record_transactions_full_batch(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 64);
bench_process_and_record_transactions(bencher, 64, true);
}
#[bench]
fn bench_process_and_record_transactions_unbatched_disable_tx_cost_update(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 1, false);
}
#[bench]
fn bench_process_and_record_transactions_half_batch_disable_tx_cost_update(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 32, false);
}
#[bench]
fn bench_process_and_record_transactions_full_batch_disable_tx_cost_update(bencher: &mut Bencher) {
bench_process_and_record_transactions(bencher, 64, false);
}

View File

@ -472,6 +472,15 @@ impl Consumer {
..
} = execute_and_commit_transactions_output;
// Costs of all transactions are added to the cost_tracker before processing.
// To ensure accurate tracking of compute units, transactions that ultimately
// were not included in the block should have their cost removed.
QosService::remove_costs(
transaction_qos_cost_results.iter(),
commit_transactions_result.as_ref().ok(),
bank,
);
// once feature `apply_cost_tracker_during_replay` is activated, leader shall no longer
// adjust block with executed cost (a behavior more inline with bankless leader), it
// should use requested, or default `compute_unit_limit` as transaction's execution cost.
@ -479,7 +488,7 @@ impl Consumer {
.feature_set
.is_active(&feature_set::apply_cost_tracker_during_replay::id())
{
QosService::update_or_remove_transaction_costs(
QosService::update_costs(
transaction_qos_cost_results.iter(),
commit_transactions_result.as_ref().ok(),
bank,
@ -1193,36 +1202,25 @@ mod tests {
assert_eq!(executed_with_successful_result_count, 1);
assert!(commit_transactions_result.is_ok());
let single_transfer_cost = get_block_cost();
assert_ne!(single_transfer_cost, 0);
let block_cost = get_block_cost();
assert_ne!(block_cost, 0);
assert_eq!(get_tx_count(), 1);
//
// TEST: When a tx in a batch can't be executed (here because of account
// locks), then its cost does not affect the cost tracker only if qos
// adjusts it with actual execution cost (when apply_cost_tracker_during_replay
// is not enabled).
//
// TEST: it's expected that the allocation will execute but the transfer will not
// because of a shared write-lock between mint_keypair. Ensure only the first transaction
// takes compute units in the block AND the apply_cost_tracker_during_replay_enabled feature
// is applied correctly
let allocate_keypair = Keypair::new();
let transactions = sanitize_transactions(vec![
system_transaction::transfer(&mint_keypair, &pubkey, 2, genesis_config.hash()),
// intentionally use a tx that has a different cost
system_transaction::allocate(
&mint_keypair,
&allocate_keypair,
genesis_config.hash(),
1,
100,
),
// this one won't execute in process_and_record_transactions from shared account lock overlap
system_transaction::transfer(&mint_keypair, &pubkey, 2, genesis_config.hash()),
]);
let mut expected_block_cost = 2 * single_transfer_cost;
let mut expected_tracked_tx_count = 2;
if apply_cost_tracker_during_replay_enabled {
expected_block_cost +=
CostModel::calculate_cost(&transactions[1], &bank.feature_set).sum();
expected_tracked_tx_count += 1;
}
let process_transactions_batch_output =
consumer.process_and_record_transactions(&bank, &transactions, 0);
@ -1235,10 +1233,39 @@ mod tests {
} = process_transactions_batch_output.execute_and_commit_transactions_output;
assert_eq!(executed_with_successful_result_count, 1);
assert!(commit_transactions_result.is_ok());
// first one should have been committed, second one not committed due to AccountInUse error during
// account locking
let commit_transactions_result = commit_transactions_result.unwrap();
assert_eq!(commit_transactions_result.len(), 2);
assert!(matches!(
commit_transactions_result.get(0).unwrap(),
CommitTransactionDetails::Committed { .. }
));
assert!(matches!(
commit_transactions_result.get(1).unwrap(),
CommitTransactionDetails::NotCommitted
));
assert_eq!(retryable_transaction_indexes, vec![1]);
let expected_block_cost = if !apply_cost_tracker_during_replay_enabled {
let actual_bpf_execution_cost = match commit_transactions_result.get(0).unwrap() {
CommitTransactionDetails::Committed { compute_units } => *compute_units,
CommitTransactionDetails::NotCommitted => {
unreachable!()
}
};
let mut cost = CostModel::calculate_cost(&transactions[0], &bank.feature_set);
cost.bpf_execution_cost = actual_bpf_execution_cost;
block_cost + cost.sum()
} else {
block_cost + CostModel::calculate_cost(&transactions[0], &bank.feature_set).sum()
};
assert_eq!(get_block_cost(), expected_block_cost);
assert_eq!(get_tx_count(), expected_tracked_tx_count);
assert_eq!(get_tx_count(), 2);
poh_recorder
.read()

View File

@ -177,17 +177,30 @@ impl QosService {
(select_results, num_included)
}
/// Update the transaction cost in the cost_tracker with the real cost for
/// transactions that were executed successfully;
/// Otherwise remove the cost from the cost tracker, therefore preventing cost_tracker
/// being inflated with unsuccessfully executed transactions.
pub fn update_or_remove_transaction_costs<'a>(
/// Updates the transaction costs for committed transactions. Does not handle removing costs
/// for transactions that didn't get recorded or committed
pub fn update_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: Option<&Vec<CommitTransactionDetails>>,
bank: &Arc<Bank>,
) {
if let Some(transaction_committed_status) = transaction_committed_status {
Self::update_committed_transaction_costs(
transaction_cost_results,
transaction_committed_status,
bank,
)
}
}
/// Removes transaction costs from the cost tracker if not committed or recorded
pub fn remove_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: Option<&Vec<CommitTransactionDetails>>,
bank: &Arc<Bank>,
) {
match transaction_committed_status {
Some(transaction_committed_status) => Self::update_transaction_costs(
Some(transaction_committed_status) => Self::remove_uncommitted_transaction_costs(
transaction_cost_results,
transaction_committed_status,
bank,
@ -196,7 +209,7 @@ impl QosService {
}
}
fn update_transaction_costs<'a>(
fn remove_uncommitted_transaction_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: &Vec<CommitTransactionDetails>,
bank: &Arc<Bank>,
@ -208,11 +221,29 @@ impl QosService {
// Only transactions that the qos service included have to be
// checked for update
if let Ok(tx_cost) = tx_cost {
match transaction_committed_details {
CommitTransactionDetails::Committed { compute_units } => {
cost_tracker.update_execution_cost(tx_cost, *compute_units)
}
CommitTransactionDetails::NotCommitted => cost_tracker.remove(tx_cost),
if *transaction_committed_details == CommitTransactionDetails::NotCommitted {
cost_tracker.remove(tx_cost)
}
}
});
}
fn update_committed_transaction_costs<'a>(
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: &Vec<CommitTransactionDetails>,
bank: &Arc<Bank>,
) {
let mut cost_tracker = bank.write_cost_tracker().unwrap();
transaction_cost_results
.zip(transaction_committed_status)
.for_each(|(tx_cost, transaction_committed_details)| {
// Only transactions that the qos service included have to be
// checked for update
if let Ok(tx_cost) = tx_cost {
if let CommitTransactionDetails::Committed { compute_units } =
transaction_committed_details
{
cost_tracker.update_execution_cost(tx_cost, *compute_units)
}
}
});
@ -730,7 +761,7 @@ mod tests {
}
#[test]
fn test_update_or_remove_transaction_costs_commited() {
fn test_update_and_remove_transaction_costs_committed() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
@ -774,11 +805,19 @@ mod tests {
})
.collect();
let final_txs_cost = total_txs_cost + execute_units_adjustment * transaction_count;
QosService::update_or_remove_transaction_costs(
qos_cost_results.iter(),
Some(&commited_status),
&bank,
// All transactions are committed, no costs should be removed
QosService::remove_costs(qos_cost_results.iter(), Some(&commited_status), &bank);
assert_eq!(
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
assert_eq!(
transaction_count,
bank.read_cost_tracker().unwrap().transaction_count()
);
QosService::update_costs(qos_cost_results.iter(), Some(&commited_status), &bank);
assert_eq!(
final_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
@ -791,7 +830,7 @@ mod tests {
}
#[test]
fn test_update_or_remove_transaction_costs_not_commited() {
fn test_update_and_remove_transaction_costs_not_committed() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
@ -825,14 +864,26 @@ mod tests {
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
QosService::update_or_remove_transaction_costs(qos_cost_results.iter(), None, &bank);
// update costs doesn't impact non-committed
QosService::update_costs(qos_cost_results.iter(), None, &bank);
assert_eq!(
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
assert_eq!(
transaction_count,
bank.read_cost_tracker().unwrap().transaction_count()
);
QosService::remove_costs(qos_cost_results.iter(), None, &bank);
assert_eq!(0, bank.read_cost_tracker().unwrap().block_cost());
assert_eq!(0, bank.read_cost_tracker().unwrap().transaction_count());
}
}
#[test]
fn test_update_or_remove_transaction_costs_mixed_execution() {
fn test_update_and_remove_transaction_costs_mixed_execution() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config));
@ -882,11 +933,9 @@ mod tests {
}
})
.collect();
QosService::update_or_remove_transaction_costs(
qos_cost_results.iter(),
Some(&commited_status),
&bank,
);
QosService::remove_costs(qos_cost_results.iter(), Some(&commited_status), &bank);
QosService::update_costs(qos_cost_results.iter(), Some(&commited_status), &bank);
// assert the final block cost
let mut expected_final_txs_count = 0u64;