refactor to consolidate info into single return field

This commit is contained in:
Tao Zhu 2022-04-12 20:16:57 -05:00 committed by Tao Zhu
parent 9dadfb2e2c
commit 6bc6384f8e
2 changed files with 54 additions and 23 deletions

View File

@ -115,10 +115,8 @@ pub struct ExecuteAndCommitTransactionsOutput {
// A result that indicates whether transactions were successfully // A result that indicates whether transactions were successfully
// committed into the Poh stream. If so, the result tells us // committed into the Poh stream. If so, the result tells us
// how many such transactions were committed // how many such transactions were committed
commit_transactions_result: Result<(), PohRecorderError>, commit_transactions_result: Result<Vec<bool>, PohRecorderError>,
execute_and_commit_timings: LeaderExecuteAndCommitTimings, execute_and_commit_timings: LeaderExecuteAndCommitTimings,
// True if transaction was-executed()
transactions_execute_and_record_status: Vec<bool>,
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]
@ -1259,7 +1257,6 @@ impl BankingStage {
retryable_transaction_indexes, retryable_transaction_indexes,
commit_transactions_result: Err(e), commit_transactions_result: Err(e),
execute_and_commit_timings, execute_and_commit_timings,
transactions_execute_and_record_status,
}; };
} }
@ -1344,9 +1341,8 @@ impl BankingStage {
executed_transactions_count, executed_transactions_count,
executed_with_successful_result_count, executed_with_successful_result_count,
retryable_transaction_indexes, retryable_transaction_indexes,
commit_transactions_result: Ok(()), commit_transactions_result: Ok(transactions_execute_and_record_status),
execute_and_commit_timings, execute_and_commit_timings,
transactions_execute_and_record_status,
} }
} }
@ -1403,14 +1399,14 @@ impl BankingStage {
let ExecuteAndCommitTransactionsOutput { let ExecuteAndCommitTransactionsOutput {
ref mut retryable_transaction_indexes, ref mut retryable_transaction_indexes,
ref execute_and_commit_timings, ref execute_and_commit_timings,
ref transactions_execute_and_record_status, ref commit_transactions_result,
.. ..
} = execute_and_commit_transactions_output; } = execute_and_commit_transactions_output;
QosService::update_or_remove_transaction_costs( QosService::update_or_remove_transaction_costs(
transaction_costs.iter(), transaction_costs.iter(),
transactions_qos_results.iter(), transactions_qos_results.iter(),
transactions_execute_and_record_status.iter(), commit_transactions_result.as_ref().ok(),
bank, bank,
); );

View File

@ -177,18 +177,37 @@ impl QosService {
pub fn update_or_remove_transaction_costs<'a>( pub fn update_or_remove_transaction_costs<'a>(
transaction_costs: impl Iterator<Item = &'a TransactionCost>, transaction_costs: impl Iterator<Item = &'a TransactionCost>,
transaction_qos_results: impl Iterator<Item = &'a transaction::Result<()>>, transaction_qos_results: impl Iterator<Item = &'a transaction::Result<()>>,
transaction_executed_status: impl Iterator<Item = &'a bool>, transaction_commited_status: Option<&Vec<bool>>,
bank: &Arc<Bank>,
) {
match transaction_commited_status {
Some(transaction_commited_status) => Self::update_transaction_costs(
transaction_costs,
transaction_qos_results,
transaction_commited_status,
bank,
),
None => {
Self::remove_transaction_costs(transaction_costs, transaction_qos_results, bank)
}
}
}
fn update_transaction_costs<'a>(
transaction_costs: impl Iterator<Item = &'a TransactionCost>,
transaction_qos_results: impl Iterator<Item = &'a transaction::Result<()>>,
transaction_commited_status: &Vec<bool>,
bank: &Arc<Bank>, bank: &Arc<Bank>,
) { ) {
let mut cost_tracker = bank.write_cost_tracker().unwrap(); let mut cost_tracker = bank.write_cost_tracker().unwrap();
transaction_costs transaction_costs
.zip(transaction_qos_results) .zip(transaction_qos_results)
.zip(transaction_executed_status) .zip(transaction_commited_status)
.for_each(|((tx_cost, qos_inclusion_result), executed_status)| { .for_each(|((tx_cost, qos_inclusion_result), commited_status)| {
// Only transactions that the qos service included have to be // Only transactions that the qos service included have to be
// checked for remove or update/commit // checked for update
if qos_inclusion_result.is_ok() { if qos_inclusion_result.is_ok() {
if *executed_status { if *commited_status {
cost_tracker.update_execution_cost(tx_cost, None); cost_tracker.update_execution_cost(tx_cost, None);
} else { } else {
cost_tracker.remove(tx_cost); cost_tracker.remove(tx_cost);
@ -197,6 +216,23 @@ impl QosService {
}); });
} }
fn remove_transaction_costs<'a>(
transaction_costs: impl Iterator<Item = &'a TransactionCost>,
transaction_qos_results: impl Iterator<Item = &'a transaction::Result<()>>,
bank: &Arc<Bank>,
) {
let mut cost_tracker = bank.write_cost_tracker().unwrap();
transaction_costs.zip(transaction_qos_results).for_each(
|(tx_cost, qos_inclusion_result)| {
// Only transactions that the qos service included have to be
// removed
if qos_inclusion_result.is_ok() {
cost_tracker.remove(tx_cost);
}
},
);
}
// metrics are reported by bank slot // metrics are reported by bank slot
pub fn report_metrics(&self, bank: Arc<Bank>) { pub fn report_metrics(&self, bank: Arc<Bank>) {
self.report_sender self.report_sender
@ -596,7 +632,7 @@ mod tests {
} }
#[test] #[test]
fn test_update_or_remove_transaction_costs_executed() { fn test_update_or_remove_transaction_costs_commited() {
solana_logger::setup(); solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config)); let bank = Arc::new(Bank::new_for_tests(&genesis_config));
@ -612,7 +648,7 @@ mod tests {
.map(|_| transfer_tx.clone()) .map(|_| transfer_tx.clone())
.collect(); .collect();
// assert all tx_costs should be applied to cost_tracker if all execution_results are all Executed // assert all tx_costs should be applied to cost_tracker if all execution_results are all commited
{ {
let qos_service = QosService::new(Arc::new(RwLock::new(CostModel::default())), 1); let qos_service = QosService::new(Arc::new(RwLock::new(CostModel::default())), 1);
let txs_costs = qos_service.compute_transaction_costs(txs.iter()); let txs_costs = qos_service.compute_transaction_costs(txs.iter());
@ -623,11 +659,11 @@ mod tests {
total_txs_costs, total_txs_costs,
bank.read_cost_tracker().unwrap().block_cost() bank.read_cost_tracker().unwrap().block_cost()
); );
let executed_status: Vec<bool> = (0..transaction_count).map(|_| true).collect(); let commited_status: Vec<bool> = (0..transaction_count).map(|_| true).collect();
QosService::update_or_remove_transaction_costs( QosService::update_or_remove_transaction_costs(
txs_costs.iter(), txs_costs.iter(),
qos_results.iter(), qos_results.iter(),
executed_status.iter(), Some(&commited_status),
&bank, &bank,
); );
assert_eq!( assert_eq!(
@ -642,7 +678,7 @@ mod tests {
} }
#[test] #[test]
fn test_update_or_remove_transaction_costs_not_executed() { fn test_update_or_remove_transaction_costs_not_commited() {
solana_logger::setup(); solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10); let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10);
let bank = Arc::new(Bank::new_for_tests(&genesis_config)); let bank = Arc::new(Bank::new_for_tests(&genesis_config));
@ -664,11 +700,10 @@ mod tests {
let txs_costs = qos_service.compute_transaction_costs(txs.iter()); let txs_costs = qos_service.compute_transaction_costs(txs.iter());
let (qos_results, _num_included) = let (qos_results, _num_included) =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank); qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank);
let executed_status: Vec<bool> = (0..transaction_count).map(|_| false).collect();
QosService::update_or_remove_transaction_costs( QosService::update_or_remove_transaction_costs(
txs_costs.iter(), txs_costs.iter(),
qos_results.iter(), qos_results.iter(),
executed_status.iter(), None,
&bank, &bank,
); );
assert_eq!(0, bank.read_cost_tracker().unwrap().block_cost()); assert_eq!(0, bank.read_cost_tracker().unwrap().block_cost());
@ -693,17 +728,17 @@ mod tests {
.map(|_| transfer_tx.clone()) .map(|_| transfer_tx.clone())
.collect(); .collect();
// assert only executed tx_costs are applied cost_tracker // assert only commited tx_costs are applied cost_tracker
{ {
let qos_service = QosService::new(Arc::new(RwLock::new(CostModel::default())), 1); let qos_service = QosService::new(Arc::new(RwLock::new(CostModel::default())), 1);
let txs_costs = qos_service.compute_transaction_costs(txs.iter()); let txs_costs = qos_service.compute_transaction_costs(txs.iter());
let (qos_results, _num_included) = let (qos_results, _num_included) =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank); qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank);
let executed_status: Vec<bool> = (0..transaction_count).map(|n| n != 0).collect(); let commited_status: Vec<bool> = (0..transaction_count).map(|n| n != 0).collect();
QosService::update_or_remove_transaction_costs( QosService::update_or_remove_transaction_costs(
txs_costs.iter(), txs_costs.iter(),
qos_results.iter(), qos_results.iter(),
executed_status.iter(), Some(&commited_status),
&bank, &bank,
); );
let expected_committed_units: u64 = txs_costs let expected_committed_units: u64 = txs_costs