Refactor transaction processing in banking stage (#24336)

* Refactor transaction processing in banking stage

* feedback

* more feedback
This commit is contained in:
Justin Starry 2022-04-21 21:06:26 +08:00 committed by GitHub
parent d5127abf46
commit 02bfb85c16
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 167 additions and 154 deletions

View File

@ -32,7 +32,6 @@ use {
accounts_db::ErrorCounters,
bank::{
Bank, LoadAndExecuteTransactionsOutput, TransactionBalancesSet, TransactionCheckResult,
TransactionExecutionResult,
},
bank_utils,
cost_model::{CostModel, TransactionCost},
@ -48,7 +47,9 @@ use {
pubkey::Pubkey,
saturating_add_assign,
timing::{duration_as_ms, timestamp, AtomicInterval},
transaction::{self, AddressLoader, SanitizedTransaction, TransactionError},
transaction::{
self, AddressLoader, SanitizedTransaction, TransactionError, VersionedTransaction,
},
transport::TransportError,
},
solana_transaction_status::token_balances::{
@ -95,12 +96,10 @@ struct RecordTransactionsSummary {
// Metrics describing how time was spent recording transactions
record_transactions_timings: RecordTransactionsTimings,
// Result of trying to record the transactions into the PoH stream
result: Result<usize, PohRecorderError>,
// Transactions that failed record, and are retryable
retryable_indexes: Vec<usize>,
result: Result<(), PohRecorderError>,
}
#[derive(Debug)]
#[derive(Clone, Debug, PartialEq)]
pub enum CommitTransactionDetails {
Committed { compute_units: u64 },
NotCommitted,
@ -1071,52 +1070,21 @@ impl BankingStage {
#[allow(clippy::match_wild_err_arm)]
fn record_transactions(
bank_slot: Slot,
txs: &[SanitizedTransaction],
execution_results: &[TransactionExecutionResult],
transactions: Vec<VersionedTransaction>,
recorder: &TransactionRecorder,
) -> RecordTransactionsSummary {
let mut record_transactions_timings = RecordTransactionsTimings::default();
let (
(processed_transactions, processed_transactions_indexes),
execution_results_to_transactions_time,
): ((Vec<_>, Vec<_>), Measure) = Measure::this(
|_| {
execution_results
.iter()
.zip(txs)
.enumerate()
.filter_map(|(i, (execution_result, tx))| {
if execution_result.was_executed() {
Some((tx.to_versioned_transaction(), i))
} else {
None
}
})
.unzip()
},
(),
" execution_results_to_transactions",
);
record_transactions_timings.execution_results_to_transactions_us =
execution_results_to_transactions_time.as_us();
let num_to_commit = processed_transactions.len();
debug!("num_to_commit: {} ", num_to_commit);
// unlock all the accounts with errors which are filtered by the above `filter_map`
if !processed_transactions.is_empty() {
if !transactions.is_empty() {
let num_to_record = transactions.len();
inc_new_counter_info!("banking_stage-record_count", 1);
inc_new_counter_info!("banking_stage-record_transactions", num_to_commit);
inc_new_counter_info!("banking_stage-record_transactions", num_to_record);
let (hash, hash_time) = Measure::this(
|_| hash_transactions(&processed_transactions[..]),
(),
"hash",
);
let (hash, hash_time) = Measure::this(|_| hash_transactions(&transactions), (), "hash");
record_transactions_timings.hash_us = hash_time.as_us();
// record and unlock will unlock all the successful transactions
let (res, poh_record_time) = Measure::this(
|_| recorder.record(bank_slot, hash, processed_transactions),
|_| recorder.record(bank_slot, hash, transactions),
(),
"hash",
);
@ -1128,14 +1096,11 @@ impl BankingStage {
inc_new_counter_info!("banking_stage-max_height_reached", 1);
inc_new_counter_info!(
"banking_stage-max_height_reached_num_to_commit",
num_to_commit
num_to_record
);
// If record errors, add all the committable transactions (the ones
// we just attempted to record) as retryable
return RecordTransactionsSummary {
record_transactions_timings,
result: Err(PohRecorderError::MaxHeightReached),
retryable_indexes: processed_transactions_indexes,
};
}
Err(e) => panic!("Poh recorder returned unexpected error: {:?}", e),
@ -1144,8 +1109,7 @@ impl BankingStage {
RecordTransactionsSummary {
record_transactions_timings,
result: (Ok(num_to_commit)),
retryable_indexes: vec![],
result: Ok(()),
}
}
@ -1210,77 +1174,73 @@ impl BankingStage {
..
} = load_and_execute_transactions_output;
let mut transactions_execute_and_record_status: Vec<_> = execution_results
let transactions_attempted_execution_count = execution_results.len();
let (executed_transactions, execution_results_to_transactions_time): (Vec<_>, Measure) =
Measure::this(
|_| {
execution_results
.iter()
.map(|execution_result| match execution_result {
TransactionExecutionResult::Executed(details) => {
CommitTransactionDetails::Committed {
compute_units: details.executed_units,
}
}
TransactionExecutionResult::NotExecuted { .. } => {
CommitTransactionDetails::NotCommitted
.zip(batch.sanitized_transactions())
.filter_map(|(execution_result, tx)| {
if execution_result.was_executed() {
Some(tx.to_versioned_transaction())
} else {
None
}
})
.collect();
.collect()
},
(),
"execution_results_to_transactions",
);
let (freeze_lock, freeze_lock_time) =
Measure::this(|_| bank.freeze_lock(), (), "freeze_lock");
execute_and_commit_timings.freeze_lock_us = freeze_lock_time.as_us();
let (record_transactions_summary, record_time) = Measure::this(
|_| {
Self::record_transactions(
bank.slot(),
batch.sanitized_transactions(),
&execution_results,
poh,
)
},
|_| Self::record_transactions(bank.slot(), executed_transactions, poh),
(),
"record_transactions",
);
execute_and_commit_timings.record_us = record_time.as_us();
let RecordTransactionsSummary {
result: commit_transactions_result,
retryable_indexes: retryable_record_transaction_indexes,
result: record_transactions_result,
record_transactions_timings,
} = record_transactions_summary;
execute_and_commit_timings.record_transactions_timings = record_transactions_timings;
execute_and_commit_timings.record_transactions_timings = RecordTransactionsTimings {
execution_results_to_transactions_us: execution_results_to_transactions_time.as_us(),
..record_transactions_timings
};
// mark transactions that were executed but not recorded
retryable_record_transaction_indexes.iter().for_each(|i| {
transactions_execute_and_record_status[*i] = CommitTransactionDetails::NotCommitted;
});
inc_new_counter_info!(
"banking_stage-record_transactions_num_to_commit",
*commit_transactions_result.as_ref().unwrap_or(&0)
);
if let Err(recorder_err) = record_transactions_result {
inc_new_counter_info!(
"banking_stage-record_transactions_retryable_record_txs",
retryable_record_transaction_indexes.len()
executed_transactions_count
);
retryable_transaction_indexes.extend(retryable_record_transaction_indexes);
let transactions_attempted_execution_count = execution_results.len();
if let Err(e) = commit_transactions_result {
retryable_transaction_indexes.extend(execution_results.iter().enumerate().filter_map(
|(index, execution_result)| execution_result.was_executed().then(|| index),
));
return ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count,
executed_transactions_count,
executed_with_successful_result_count,
retryable_transaction_indexes,
commit_transactions_result: Err(e),
commit_transactions_result: Err(recorder_err),
execute_and_commit_timings,
};
}
let sanitized_txs = batch.sanitized_transactions();
let committed_transaction_count = commit_transactions_result.unwrap();
// Note: `committed_transaction_count` should equal `executed_transactions_count`, since
// every executed transaction should have been recorded into the Poh stream if the record
// was successful (there's no partial records).
let commit_time_us = if committed_transaction_count != 0 {
let (commit_time_us, commit_transaction_statuses) = if executed_transactions_count != 0 {
inc_new_counter_info!(
"banking_stage-record_transactions_num_to_commit",
executed_transactions_count
);
let (tx_results, commit_time) = Measure::this(
|_| {
bank.commit_transactions(
@ -1301,6 +1261,17 @@ impl BankingStage {
let commit_time_us = commit_time.as_us();
execute_and_commit_timings.commit_us = commit_time_us;
let commit_transaction_statuses = tx_results
.execution_results
.iter()
.map(|execution_result| match execution_result.details() {
Some(details) => CommitTransactionDetails::Committed {
compute_units: details.executed_units,
},
None => CommitTransactionDetails::NotCommitted,
})
.collect();
let (_, find_and_send_votes_time) = Measure::this(
|_| {
bank_utils::find_and_send_votes(
@ -1330,9 +1301,12 @@ impl BankingStage {
"find_and_send_votes",
);
execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_time.as_us();
commit_time_us
(commit_time_us, commit_transaction_statuses)
} else {
0
(
0,
vec![CommitTransactionDetails::NotCommitted; execution_results.len()],
)
};
drop(freeze_lock);
@ -1351,12 +1325,17 @@ impl BankingStage {
execute_and_commit_timings.execute_timings,
);
debug_assert_eq!(
commit_transaction_statuses.len(),
transactions_attempted_execution_count
);
ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count,
executed_transactions_count,
executed_with_successful_result_count,
retryable_transaction_indexes,
commit_transactions_result: Ok(transactions_execute_and_record_status),
commit_transactions_result: Ok(commit_transaction_statuses),
execute_and_commit_timings,
}
}
@ -2137,7 +2116,6 @@ mod tests {
},
solana_program_runtime::timings::ProgramTiming,
solana_rpc::transaction_status_service::TransactionStatusService,
solana_runtime::bank::TransactionExecutionDetails,
solana_sdk::{
account::AccountSharedData,
hash::Hash,
@ -2148,7 +2126,6 @@ mod tests {
},
poh_config::PohConfig,
signature::{Keypair, Signer},
system_instruction::SystemError,
system_transaction,
transaction::{
MessageHash, SimpleAddressLoader, Transaction, TransactionError,
@ -2175,17 +2152,6 @@ mod tests {
)
}
fn new_execution_result(status: Result<(), TransactionError>) -> TransactionExecutionResult {
TransactionExecutionResult::Executed(TransactionExecutionDetails {
status,
log_messages: None,
inner_instructions: None,
durable_nonce_fee: None,
return_data: None,
executed_units: 0u64,
})
}
#[test]
fn test_banking_stage_shutdown1() {
let genesis_config = create_genesis_config(2).genesis_config;
@ -2568,56 +2534,22 @@ mod tests {
let keypair2 = Keypair::new();
let pubkey2 = solana_sdk::pubkey::new_rand();
let txs = sanitize_transactions(vec![
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash()),
system_transaction::transfer(&keypair2, &pubkey2, 1, genesis_config.hash()),
]);
let txs = vec![
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash())
.into(),
system_transaction::transfer(&keypair2, &pubkey2, 1, genesis_config.hash()).into(),
];
let mut results = vec![new_execution_result(Ok(())); 2];
let _ = BankingStage::record_transactions(bank.slot(), &txs, &results, &recorder);
let _ = BankingStage::record_transactions(bank.slot(), txs.clone(), &recorder);
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
assert_eq!(entry.transactions.len(), txs.len());
// InstructionErrors should still be recorded
results[0] = new_execution_result(Err(TransactionError::InstructionError(
1,
SystemError::ResultWithNegativeLamports.into(),
)));
let RecordTransactionsSummary {
result,
retryable_indexes,
..
} = BankingStage::record_transactions(bank.slot(), &txs, &results, &recorder);
result.unwrap();
assert!(retryable_indexes.is_empty());
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
assert_eq!(entry.transactions.len(), txs.len());
// Other TransactionErrors should not be recorded
results[0] = TransactionExecutionResult::NotExecuted(TransactionError::AccountNotFound);
let RecordTransactionsSummary {
result,
retryable_indexes,
..
} = BankingStage::record_transactions(bank.slot(), &txs, &results, &recorder);
result.unwrap();
assert!(retryable_indexes.is_empty());
let (_bank, (entry, _tick_height)) = entry_receiver.recv().unwrap();
assert_eq!(entry.transactions.len(), txs.len() - 1);
assert_eq!(entry.transactions, txs);
// Once bank is set to a new bank (setting bank.slot() + 1 in record_transactions),
// record_transactions should throw MaxHeightReached and return the set of retryable
// txs
// record_transactions should throw MaxHeightReached
let next_slot = bank.slot() + 1;
let RecordTransactionsSummary {
result,
retryable_indexes,
..
} = BankingStage::record_transactions(next_slot, &txs, &results, &recorder);
let RecordTransactionsSummary { result, .. } =
BankingStage::record_transactions(next_slot, txs, &recorder);
assert_matches!(result, Err(PohRecorderError::MaxHeightReached));
// The first result was an error so it's filtered out. The second result was Ok(),
// so it should be marked as retryable
assert_eq!(retryable_indexes, vec![1]);
// Should receive nothing from PohRecorder b/c record failed
assert!(entry_receiver.try_recv().is_err());
@ -2914,6 +2846,87 @@ mod tests {
Blockstore::destroy(ledger_path.path()).unwrap();
}
#[test]
fn test_bank_process_and_record_transactions_all_unexecuted() {
solana_logger::setup();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_slow_genesis_config(10_000);
let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config));
let pubkey = solana_sdk::pubkey::new_rand();
let transactions = {
let mut tx =
system_transaction::transfer(&mint_keypair, &pubkey, 1, genesis_config.hash());
// Add duplicate account key
tx.message.account_keys.push(pubkey);
sanitize_transactions(vec![tx])
};
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let blockstore = Blockstore::open(ledger_path.path())
.expect("Expected to be able to open database ledger");
let (poh_recorder, _entry_receiver, record_receiver) = PohRecorder::new(
bank.tick_height(),
bank.last_blockhash(),
bank.clone(),
Some((4, 4)),
bank.ticks_per_slot(),
&pubkey,
&Arc::new(blockstore),
&Arc::new(LeaderScheduleCache::new_from_bank(&bank)),
&Arc::new(PohConfig::default()),
Arc::new(AtomicBool::default()),
);
let recorder = poh_recorder.recorder();
let poh_recorder = Arc::new(Mutex::new(poh_recorder));
let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
poh_recorder.lock().unwrap().set_bank(&bank);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
let process_transactions_batch_output = BankingStage::process_and_record_transactions(
&bank,
&transactions,
&recorder,
0,
None,
&gossip_vote_sender,
&QosService::new(Arc::new(RwLock::new(CostModel::default())), 1),
);
let ExecuteAndCommitTransactionsOutput {
transactions_attempted_execution_count,
executed_transactions_count,
executed_with_successful_result_count,
commit_transactions_result,
retryable_transaction_indexes,
..
} = process_transactions_batch_output.execute_and_commit_transactions_output;
assert_eq!(transactions_attempted_execution_count, 1);
assert_eq!(executed_transactions_count, 0);
assert_eq!(executed_with_successful_result_count, 0);
assert!(retryable_transaction_indexes.is_empty());
assert_eq!(
commit_transactions_result.ok(),
Some(vec![CommitTransactionDetails::NotCommitted; 1])
);
poh_recorder
.lock()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(ledger_path.path()).unwrap();
}
#[test]
fn test_bank_process_and_record_transactions_cost_tracker() {
solana_logger::setup();