QoS refactor: Allow pre-filtering (#31542)

This commit is contained in:
Andrew Fitzgerald 2023-05-12 08:53:22 -07:00 committed by GitHub
parent c5905f525c
commit 2c869ef778
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 225 additions and 203 deletions

View File

@ -394,18 +394,24 @@ impl Consumer {
chunk_offset: usize,
) -> ProcessTransactionBatchOutput {
let (
(transaction_costs, transactions_qos_results, cost_model_throttled_transactions_count),
(transaction_qos_cost_results, cost_model_throttled_transactions_count),
cost_model_us,
) = measure_us!(self
.qos_service
.select_and_accumulate_transaction_costs(bank, txs));
) = measure_us!(self.qos_service.select_and_accumulate_transaction_costs(
bank,
txs,
std::iter::repeat(Ok(())) // no filtering before QoS
));
// Only lock accounts for those transactions are selected for the block;
// Once accounts are locked, other threads cannot encode transactions that will modify the
// same account state
let (batch, lock_us) = measure_us!(
bank.prepare_sanitized_batch_with_results(txs, transactions_qos_results.iter())
);
let (batch, lock_us) = measure_us!(bank.prepare_sanitized_batch_with_results(
txs,
transaction_qos_cost_results.iter().map(|r| match r {
Ok(_cost) => Ok(()),
Err(err) => Err(err.clone()),
})
));
// retryable_txs includes AccountInUse, WouldExceedMaxBlockCostLimit
// WouldExceedMaxAccountCostLimit, WouldExceedMaxVoteCostLimit
@ -424,8 +430,7 @@ impl Consumer {
} = execute_and_commit_transactions_output;
QosService::update_or_remove_transaction_costs(
transaction_costs.iter(),
transactions_qos_results.iter(),
transaction_qos_cost_results.iter(),
commit_transactions_result.as_ref().ok(),
bank,
);

View File

@ -84,50 +84,57 @@ impl QosService {
}
}
/// Calculate cost of transactions, determine which ones to include in the slot, and
/// accumulate costs in the cost tracker.
/// Returns a vector of transaction costs, a vector of results indicating which transactions
/// were selected, and the number of transactions that were *NOT* selected.
/// Calculate cost of transactions, if not already filtered out, determine which ones to
/// include in the slot, and accumulate costs in the cost tracker.
/// Returns a vector of results containing selected transaction costs, and the number of
/// transactions that were *NOT* selected.
pub fn select_and_accumulate_transaction_costs(
&self,
bank: &Bank,
transactions: &[SanitizedTransaction],
) -> (Vec<TransactionCost>, Vec<transaction::Result<()>>, usize) {
pre_results: impl Iterator<Item = transaction::Result<()>>,
) -> (Vec<transaction::Result<TransactionCost>>, usize) {
let transaction_costs =
self.compute_transaction_costs(&bank.feature_set, transactions.iter());
let (transactions_qos_results, num_included) =
self.select_transactions_per_cost(transactions.iter(), transaction_costs.iter(), bank);
self.compute_transaction_costs(&bank.feature_set, transactions.iter(), pre_results);
let (transactions_qos_cost_results, num_included) = self.select_transactions_per_cost(
transactions.iter(),
transaction_costs.into_iter(),
bank,
);
self.accumulate_estimated_transaction_costs(&Self::accumulate_batched_transaction_costs(
transaction_costs.iter(),
transactions_qos_results.iter(),
transactions_qos_cost_results.iter(),
));
let cost_model_throttled_transactions_count =
transactions.len().saturating_sub(num_included);
(
transaction_costs,
transactions_qos_results,
transactions_qos_cost_results,
cost_model_throttled_transactions_count,
)
}
// invoke cost_model to calculate cost for the given list of transactions
// invoke cost_model to calculate cost for the given list of transactions that have not
// been filtered out already.
fn compute_transaction_costs<'a>(
&self,
feature_set: &FeatureSet,
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
) -> Vec<TransactionCost> {
pre_results: impl Iterator<Item = transaction::Result<()>>,
) -> Vec<transaction::Result<TransactionCost>> {
let mut compute_cost_time = Measure::start("compute_cost_time");
let txs_costs: Vec<_> = transactions
.map(|tx| {
let cost = CostModel::calculate_cost(tx, feature_set);
debug!(
"transaction {:?}, cost {:?}, cost sum {}",
tx,
cost,
cost.sum()
);
cost
.zip(pre_results)
.map(|(tx, pre_result)| {
pre_result.map(|()| {
let cost = CostModel::calculate_cost(tx, feature_set);
debug!(
"transaction {:?}, cost {:?}, cost sum {}",
tx,
cost,
cost.sum()
);
cost
})
})
.collect();
compute_cost_time.stop();
@ -148,27 +155,34 @@ impl QosService {
fn select_transactions_per_cost<'a>(
&self,
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
transactions_costs: impl Iterator<Item = &'a TransactionCost>,
transactions_costs: impl Iterator<Item = transaction::Result<TransactionCost>>,
bank: &Bank,
) -> (Vec<transaction::Result<()>>, usize) {
) -> (Vec<transaction::Result<TransactionCost>>, usize) {
let mut cost_tracking_time = Measure::start("cost_tracking_time");
let mut cost_tracker = bank.write_cost_tracker().unwrap();
let mut num_included = 0;
let select_results = transactions
.zip(transactions_costs)
.map(|(tx, cost)| match cost_tracker.try_add(cost) {
Ok(current_block_cost) => {
debug!("slot {:?}, transaction {:?}, cost {:?}, fit into current block, current block cost {}", bank.slot(), tx, cost, current_block_cost);
self.metrics.stats.selected_txs_count.fetch_add(1, Ordering::Relaxed);
num_included += 1;
Ok(())
},
Err(e) => {
debug!("slot {:?}, transaction {:?}, cost {:?}, not fit into current block, '{:?}'", bank.slot(), tx, cost, e);
Err(TransactionError::from(e))
let select_results = transactions.zip(transactions_costs)
.map(|(tx, cost)| {
match cost {
Ok(cost) => {
match cost_tracker.try_add(&cost) {
Ok(current_block_cost) => {
debug!("slot {:?}, transaction {:?}, cost {:?}, fit into current block, current block cost {}", bank.slot(), tx, cost, current_block_cost);
self.metrics.stats.selected_txs_count.fetch_add(1, Ordering::Relaxed);
num_included += 1;
Ok(cost)
},
Err(e) => {
debug!("slot {:?}, transaction {:?}, cost {:?}, not fit into current block, '{:?}'", bank.slot(), tx, cost, e);
Err(TransactionError::from(e))
}
}
},
Err(e) => Err(e),
}
})
.collect();
cost_tracking_time.stop();
self.metrics
.stats
@ -182,65 +196,54 @@ impl QosService {
/// Otherwise remove the cost from the cost tracker, therefore preventing cost_tracker
/// being inflated with unsuccessfully executed transactions.
pub fn update_or_remove_transaction_costs<'a>(
transaction_costs: impl Iterator<Item = &'a TransactionCost>,
transaction_qos_results: impl Iterator<Item = &'a transaction::Result<()>>,
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: Option<&Vec<CommitTransactionDetails>>,
bank: &Arc<Bank>,
) {
match transaction_committed_status {
Some(transaction_committed_status) => Self::update_transaction_costs(
transaction_costs,
transaction_qos_results,
transaction_cost_results,
transaction_committed_status,
bank,
),
None => {
Self::remove_transaction_costs(transaction_costs, transaction_qos_results, bank)
}
None => Self::remove_transaction_costs(transaction_cost_results, bank),
}
}
fn update_transaction_costs<'a>(
transaction_costs: impl Iterator<Item = &'a TransactionCost>,
transaction_qos_results: impl Iterator<Item = &'a transaction::Result<()>>,
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
transaction_committed_status: &Vec<CommitTransactionDetails>,
bank: &Arc<Bank>,
) {
let mut cost_tracker = bank.write_cost_tracker().unwrap();
transaction_costs
.zip(transaction_qos_results)
transaction_cost_results
.zip(transaction_committed_status)
.for_each(
|((tx_cost, qos_inclusion_result), transaction_committed_details)| {
// Only transactions that the qos service included have to be
// checked for update
if qos_inclusion_result.is_ok() {
match transaction_committed_details {
CommitTransactionDetails::Committed { compute_units } => {
cost_tracker.update_execution_cost(tx_cost, *compute_units)
}
CommitTransactionDetails::NotCommitted => cost_tracker.remove(tx_cost),
.for_each(|(tx_cost, transaction_committed_details)| {
// Only transactions that the qos service included have to be
// checked for update
if let Ok(tx_cost) = tx_cost {
match transaction_committed_details {
CommitTransactionDetails::Committed { compute_units } => {
cost_tracker.update_execution_cost(tx_cost, *compute_units)
}
CommitTransactionDetails::NotCommitted => cost_tracker.remove(tx_cost),
}
},
);
}
});
}
fn remove_transaction_costs<'a>(
transaction_costs: impl Iterator<Item = &'a TransactionCost>,
transaction_qos_results: impl Iterator<Item = &'a transaction::Result<()>>,
transaction_cost_results: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
bank: &Arc<Bank>,
) {
let mut cost_tracker = bank.write_cost_tracker().unwrap();
transaction_costs.zip(transaction_qos_results).for_each(
|(tx_cost, qos_inclusion_result)| {
// Only transactions that the qos service included have to be
// removed
if qos_inclusion_result.is_ok() {
cost_tracker.remove(tx_cost);
}
},
);
transaction_cost_results.for_each(|tx_cost| {
// Only transactions that the qos service included have to be
// removed
if let Ok(tx_cost) = tx_cost {
cost_tracker.remove(tx_cost);
}
});
}
// metrics are reported by bank slot
@ -341,81 +344,78 @@ impl QosService {
// rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and
// execution_cost from the batch of transactions selected for block.
fn accumulate_batched_transaction_costs<'a>(
transactions_costs: impl Iterator<Item = &'a TransactionCost>,
transaction_results: impl Iterator<Item = &'a transaction::Result<()>>,
transactions_costs: impl Iterator<Item = &'a transaction::Result<TransactionCost>>,
) -> BatchedTransactionDetails {
let mut batched_transaction_details = BatchedTransactionDetails::default();
transactions_costs
.zip(transaction_results)
.for_each(|(cost, result)| match result {
Ok(_) => {
saturating_add_assign!(
batched_transaction_details.costs.batched_signature_cost,
cost.signature_cost
);
saturating_add_assign!(
batched_transaction_details.costs.batched_write_lock_cost,
cost.write_lock_cost
);
saturating_add_assign!(
batched_transaction_details.costs.batched_data_bytes_cost,
cost.data_bytes_cost
);
transactions_costs.for_each(|cost| match cost {
Ok(cost) => {
saturating_add_assign!(
batched_transaction_details.costs.batched_signature_cost,
cost.signature_cost
);
saturating_add_assign!(
batched_transaction_details.costs.batched_write_lock_cost,
cost.write_lock_cost
);
saturating_add_assign!(
batched_transaction_details.costs.batched_data_bytes_cost,
cost.data_bytes_cost
);
saturating_add_assign!(
batched_transaction_details
.costs
.batched_builtins_execute_cost,
cost.builtins_execution_cost
);
saturating_add_assign!(
batched_transaction_details.costs.batched_bpf_execute_cost,
cost.bpf_execution_cost
);
}
Err(transaction_error) => match transaction_error {
TransactionError::WouldExceedMaxBlockCostLimit => {
saturating_add_assign!(
batched_transaction_details
.costs
.batched_builtins_execute_cost,
cost.builtins_execution_cost
);
saturating_add_assign!(
batched_transaction_details.costs.batched_bpf_execute_cost,
cost.bpf_execution_cost
.errors
.batched_retried_txs_per_block_limit_count,
1
);
}
Err(transaction_error) => match transaction_error {
TransactionError::WouldExceedMaxBlockCostLimit => {
saturating_add_assign!(
batched_transaction_details
.errors
.batched_retried_txs_per_block_limit_count,
1
);
}
TransactionError::WouldExceedMaxVoteCostLimit => {
saturating_add_assign!(
batched_transaction_details
.errors
.batched_retried_txs_per_vote_limit_count,
1
);
}
TransactionError::WouldExceedMaxAccountCostLimit => {
saturating_add_assign!(
batched_transaction_details
.errors
.batched_retried_txs_per_account_limit_count,
1
);
}
TransactionError::WouldExceedAccountDataBlockLimit => {
saturating_add_assign!(
batched_transaction_details
.errors
.batched_retried_txs_per_account_data_block_limit_count,
1
);
}
TransactionError::WouldExceedAccountDataTotalLimit => {
saturating_add_assign!(
batched_transaction_details
.errors
.batched_dropped_txs_per_account_data_total_limit_count,
1
);
}
_ => {}
},
});
TransactionError::WouldExceedMaxVoteCostLimit => {
saturating_add_assign!(
batched_transaction_details
.errors
.batched_retried_txs_per_vote_limit_count,
1
);
}
TransactionError::WouldExceedMaxAccountCostLimit => {
saturating_add_assign!(
batched_transaction_details
.errors
.batched_retried_txs_per_account_limit_count,
1
);
}
TransactionError::WouldExceedAccountDataBlockLimit => {
saturating_add_assign!(
batched_transaction_details
.errors
.batched_retried_txs_per_account_data_block_limit_count,
1
);
}
TransactionError::WouldExceedAccountDataTotalLimit => {
saturating_add_assign!(
batched_transaction_details
.errors
.batched_dropped_txs_per_account_data_total_limit_count,
1
);
}
_ => {}
},
});
batched_transaction_details
}
@ -671,8 +671,11 @@ mod tests {
let txs = vec![transfer_tx.clone(), vote_tx.clone(), vote_tx, transfer_tx];
let qos_service = QosService::new(1);
let txs_costs =
qos_service.compute_transaction_costs(&FeatureSet::all_enabled(), txs.iter());
let txs_costs = qos_service.compute_transaction_costs(
&FeatureSet::all_enabled(),
txs.iter(),
std::iter::repeat(Ok(())),
);
// verify the size of txs_costs and its contents
assert_eq!(txs_costs.len(), txs.len());
@ -681,7 +684,7 @@ mod tests {
.enumerate()
.map(|(index, cost)| {
assert_eq!(
cost.sum(),
cost.as_ref().unwrap().sum(),
CostModel::calculate_cost(&txs[index], &FeatureSet::all_enabled()).sum()
);
})
@ -717,8 +720,11 @@ mod tests {
let txs = vec![transfer_tx.clone(), vote_tx.clone(), transfer_tx, vote_tx];
let qos_service = QosService::new(1);
let txs_costs =
qos_service.compute_transaction_costs(&FeatureSet::all_enabled(), txs.iter());
let txs_costs = qos_service.compute_transaction_costs(
&FeatureSet::all_enabled(),
txs.iter(),
std::iter::repeat(Ok(())),
);
// set cost tracker limit to fit 1 transfer tx and 1 vote tx
let cost_limit = transfer_tx_cost + vote_tx_cost;
@ -726,7 +732,7 @@ mod tests {
.unwrap()
.set_limits(cost_limit, cost_limit, cost_limit);
let (results, num_selected) =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank);
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.into_iter(), &bank);
assert_eq!(num_selected, 2);
// verify that first transfer tx and first vote are allowed
@ -758,26 +764,32 @@ mod tests {
// assert all tx_costs should be applied to cost_tracker if all execution_results are all committed
{
let qos_service = QosService::new(1);
let txs_costs =
qos_service.compute_transaction_costs(&FeatureSet::all_enabled(), txs.iter());
let total_txs_cost: u64 = txs_costs.iter().map(|cost| cost.sum()).sum();
let (qos_results, _num_included) =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank);
let txs_costs = qos_service.compute_transaction_costs(
&FeatureSet::all_enabled(),
txs.iter(),
std::iter::repeat(Ok(())),
);
let total_txs_cost: u64 = txs_costs
.iter()
.map(|cost| cost.as_ref().unwrap().sum())
.sum();
let (qos_cost_results, _num_included) =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.into_iter(), &bank);
assert_eq!(
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
// all transactions are committed with actual units more than estimated
let commited_status: Vec<CommitTransactionDetails> = txs_costs
let commited_status: Vec<CommitTransactionDetails> = qos_cost_results
.iter()
.map(|tx_cost| CommitTransactionDetails::Committed {
compute_units: tx_cost.bpf_execution_cost + execute_units_adjustment,
compute_units: tx_cost.as_ref().unwrap().bpf_execution_cost
+ execute_units_adjustment,
})
.collect();
let final_txs_cost = total_txs_cost + execute_units_adjustment * transaction_count;
QosService::update_or_remove_transaction_costs(
txs_costs.iter(),
qos_results.iter(),
qos_cost_results.iter(),
Some(&commited_status),
&bank,
);
@ -812,21 +824,22 @@ mod tests {
// assert all tx_costs should be removed from cost_tracker if all execution_results are all Not Committed
{
let qos_service = QosService::new(1);
let txs_costs =
qos_service.compute_transaction_costs(&FeatureSet::all_enabled(), txs.iter());
let total_txs_cost: u64 = txs_costs.iter().map(|cost| cost.sum()).sum();
let (qos_results, _num_included) =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank);
let txs_costs = qos_service.compute_transaction_costs(
&FeatureSet::all_enabled(),
txs.iter(),
std::iter::repeat(Ok(())),
);
let total_txs_cost: u64 = txs_costs
.iter()
.map(|cost| cost.as_ref().unwrap().sum())
.sum();
let (qos_cost_results, _num_included) =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.into_iter(), &bank);
assert_eq!(
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
QosService::update_or_remove_transaction_costs(
txs_costs.iter(),
qos_results.iter(),
None,
&bank,
);
QosService::update_or_remove_transaction_costs(qos_cost_results.iter(), None, &bank);
assert_eq!(0, bank.read_cost_tracker().unwrap().block_cost());
assert_eq!(0, bank.read_cost_tracker().unwrap().transaction_count());
}
@ -853,17 +866,23 @@ mod tests {
// assert only commited tx_costs are applied cost_tracker
{
let qos_service = QosService::new(1);
let txs_costs =
qos_service.compute_transaction_costs(&FeatureSet::all_enabled(), txs.iter());
let total_txs_cost: u64 = txs_costs.iter().map(|cost| cost.sum()).sum();
let (qos_results, _num_included) =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.iter(), &bank);
let txs_costs = qos_service.compute_transaction_costs(
&FeatureSet::all_enabled(),
txs.iter(),
std::iter::repeat(Ok(())),
);
let total_txs_cost: u64 = txs_costs
.iter()
.map(|cost| cost.as_ref().unwrap().sum())
.sum();
let (qos_cost_results, _num_included) =
qos_service.select_transactions_per_cost(txs.iter(), txs_costs.into_iter(), &bank);
assert_eq!(
total_txs_cost,
bank.read_cost_tracker().unwrap().block_cost()
);
// Half of transactions are not committed, the rest with cost adjustment
let commited_status: Vec<CommitTransactionDetails> = txs_costs
let commited_status: Vec<CommitTransactionDetails> = qos_cost_results
.iter()
.enumerate()
.map(|(n, tx_cost)| {
@ -871,14 +890,14 @@ mod tests {
CommitTransactionDetails::NotCommitted
} else {
CommitTransactionDetails::Committed {
compute_units: tx_cost.bpf_execution_cost + execute_units_adjustment,
compute_units: tx_cost.as_ref().unwrap().bpf_execution_cost
+ execute_units_adjustment,
}
}
})
.collect();
QosService::update_or_remove_transaction_costs(
txs_costs.iter(),
qos_results.iter(),
qos_cost_results.iter(),
Some(&commited_status),
&bank,
);
@ -886,10 +905,11 @@ mod tests {
// assert the final block cost
let mut expected_final_txs_count = 0u64;
let mut expected_final_block_cost = 0u64;
txs_costs.iter().enumerate().for_each(|(n, cost)| {
qos_cost_results.iter().enumerate().for_each(|(n, cost)| {
if n % 2 != 0 {
expected_final_txs_count += 1;
expected_final_block_cost += cost.sum() + execute_units_adjustment;
expected_final_block_cost +=
cost.as_ref().unwrap().sum() + execute_units_adjustment;
}
});
assert_eq!(
@ -912,20 +932,17 @@ mod tests {
let bpf_execution_cost = 10;
let num_txs = 4;
let tx_costs: Vec<_> = (0..num_txs)
.map(|_| TransactionCost {
signature_cost,
write_lock_cost,
data_bytes_cost,
builtins_execution_cost,
bpf_execution_cost,
..TransactionCost::default()
})
.collect();
let tx_results: Vec<_> = (0..num_txs)
let tx_cost_results: Vec<_> = (0..num_txs)
.map(|n| {
if n % 2 == 0 {
Ok(())
Ok(TransactionCost {
signature_cost,
write_lock_cost,
data_bytes_cost,
builtins_execution_cost,
bpf_execution_cost,
..TransactionCost::default()
})
} else {
Err(TransactionError::WouldExceedMaxBlockCostLimit)
}
@ -938,7 +955,7 @@ mod tests {
let expected_builtins_execution_costs = builtins_execution_cost * (num_txs / 2);
let expected_bpf_execution_costs = bpf_execution_cost * (num_txs / 2);
let batched_transaction_details =
QosService::accumulate_batched_transaction_costs(tx_costs.iter(), tx_results.iter());
QosService::accumulate_batched_transaction_costs(tx_cost_results.iter());
assert_eq!(
expected_signatures,
batched_transaction_details.costs.batched_signature_cost

View File

@ -1233,14 +1233,14 @@ impl Accounts {
pub fn lock_accounts_with_results<'a>(
&self,
txs: impl Iterator<Item = &'a SanitizedTransaction>,
results: impl Iterator<Item = &'a Result<()>>,
results: impl Iterator<Item = Result<()>>,
tx_account_lock_limit: usize,
) -> Vec<Result<()>> {
let tx_account_locks_results: Vec<Result<_>> = txs
.zip(results)
.map(|(tx, result)| match result {
Ok(()) => tx.get_account_locks(tx_account_lock_limit),
Err(err) => Err(err.clone()),
Err(err) => Err(err),
})
.collect();
self.lock_accounts_inner(tx_account_locks_results)
@ -3162,7 +3162,7 @@ mod tests {
let results = accounts.lock_accounts_with_results(
txs.iter(),
qos_results.iter(),
qos_results.into_iter(),
MAX_TX_ACCOUNT_LOCKS,
);

View File

@ -3765,7 +3765,7 @@ impl Bank {
pub fn prepare_sanitized_batch_with_results<'a, 'b>(
&'a self,
transactions: &'b [SanitizedTransaction],
transaction_results: impl Iterator<Item = &'b Result<()>>,
transaction_results: impl Iterator<Item = Result<()>>,
) -> TransactionBatch<'a, 'b> {
// this lock_results could be: Ok, AccountInUse, WouldExceedBlockMaxLimit or WouldExceedAccountMaxLimit
let tx_account_lock_limit = self.get_transaction_account_lock_limit();