refactor: move more BankingStage cost_model stuff into qos_service (#28840)
This commit is contained in:
parent
53a579bed1
commit
88e6ea37d9
|
@ -48,7 +48,7 @@ use {
|
|||
},
|
||||
bank_forks::BankForks,
|
||||
bank_utils,
|
||||
cost_model::{CostModel, TransactionCost},
|
||||
cost_model::CostModel,
|
||||
transaction_batch::TransactionBatch,
|
||||
transaction_error_metrics::TransactionErrorMetrics,
|
||||
vote_sender_types::ReplayVoteSender,
|
||||
|
@ -1499,22 +1499,10 @@ impl BankingStage {
|
|||
qos_service: &QosService,
|
||||
log_messages_bytes_limit: Option<usize>,
|
||||
) -> ProcessTransactionBatchOutput {
|
||||
let mut cost_model_time = Measure::start("cost_model");
|
||||
|
||||
let transaction_costs = qos_service.compute_transaction_costs(txs.iter());
|
||||
|
||||
let (transactions_qos_results, num_included) =
|
||||
qos_service.select_transactions_per_cost(txs.iter(), transaction_costs.iter(), bank);
|
||||
|
||||
let cost_model_throttled_transactions_count = txs.len().saturating_sub(num_included);
|
||||
|
||||
qos_service.accumulate_estimated_transaction_costs(
|
||||
&Self::accumulate_batched_transaction_costs(
|
||||
transaction_costs.iter(),
|
||||
transactions_qos_results.iter(),
|
||||
),
|
||||
);
|
||||
cost_model_time.stop();
|
||||
let (
|
||||
(transaction_costs, transactions_qos_results, cost_model_throttled_transactions_count),
|
||||
cost_model_time,
|
||||
) = measure!(qos_service.select_and_accumulate_transaction_costs(bank, txs));
|
||||
|
||||
// Only lock accounts for those transactions are selected for the block;
|
||||
// Once accounts are locked, other threads cannot encode transactions that will modify the
|
||||
|
@ -1582,87 +1570,6 @@ impl BankingStage {
|
|||
}
|
||||
}
|
||||
|
||||
// rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and
|
||||
// execution_cost from the batch of transactions selected for block.
|
||||
fn accumulate_batched_transaction_costs<'a>(
|
||||
transactions_costs: impl Iterator<Item = &'a TransactionCost>,
|
||||
transaction_results: impl Iterator<Item = &'a transaction::Result<()>>,
|
||||
) -> BatchedTransactionDetails {
|
||||
let mut batched_transaction_details = BatchedTransactionDetails::default();
|
||||
transactions_costs
|
||||
.zip(transaction_results)
|
||||
.for_each(|(cost, result)| match result {
|
||||
Ok(_) => {
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details.costs.batched_signature_cost,
|
||||
cost.signature_cost
|
||||
);
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details.costs.batched_write_lock_cost,
|
||||
cost.write_lock_cost
|
||||
);
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details.costs.batched_data_bytes_cost,
|
||||
cost.data_bytes_cost
|
||||
);
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details
|
||||
.costs
|
||||
.batched_builtins_execute_cost,
|
||||
cost.builtins_execution_cost
|
||||
);
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details.costs.batched_bpf_execute_cost,
|
||||
cost.bpf_execution_cost
|
||||
);
|
||||
}
|
||||
Err(transaction_error) => match transaction_error {
|
||||
TransactionError::WouldExceedMaxBlockCostLimit => {
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details
|
||||
.errors
|
||||
.batched_retried_txs_per_block_limit_count,
|
||||
1
|
||||
);
|
||||
}
|
||||
TransactionError::WouldExceedMaxVoteCostLimit => {
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details
|
||||
.errors
|
||||
.batched_retried_txs_per_vote_limit_count,
|
||||
1
|
||||
);
|
||||
}
|
||||
TransactionError::WouldExceedMaxAccountCostLimit => {
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details
|
||||
.errors
|
||||
.batched_retried_txs_per_account_limit_count,
|
||||
1
|
||||
);
|
||||
}
|
||||
TransactionError::WouldExceedAccountDataBlockLimit => {
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details
|
||||
.errors
|
||||
.batched_retried_txs_per_account_data_block_limit_count,
|
||||
1
|
||||
);
|
||||
}
|
||||
TransactionError::WouldExceedAccountDataTotalLimit => {
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details
|
||||
.errors
|
||||
.batched_dropped_txs_per_account_data_total_limit_count,
|
||||
1
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
});
|
||||
batched_transaction_details
|
||||
}
|
||||
|
||||
fn accumulate_execute_units_and_time(execute_timings: &ExecuteTimings) -> (u64, u64) {
|
||||
let (units, times): (Vec<_>, Vec<_>) = execute_timings
|
||||
.details
|
||||
|
@ -4119,66 +4026,6 @@ mod tests {
|
|||
Blockstore::destroy(ledger_path.path()).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_accumulate_batched_transaction_costs() {
|
||||
let signature_cost = 1;
|
||||
let write_lock_cost = 2;
|
||||
let data_bytes_cost = 3;
|
||||
let builtins_execution_cost = 4;
|
||||
let bpf_execution_cost = 10;
|
||||
let num_txs = 4;
|
||||
|
||||
let tx_costs: Vec<_> = (0..num_txs)
|
||||
.map(|_| TransactionCost {
|
||||
signature_cost,
|
||||
write_lock_cost,
|
||||
data_bytes_cost,
|
||||
builtins_execution_cost,
|
||||
bpf_execution_cost,
|
||||
..TransactionCost::default()
|
||||
})
|
||||
.collect();
|
||||
let tx_results: Vec<_> = (0..num_txs)
|
||||
.map(|n| {
|
||||
if n % 2 == 0 {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(TransactionError::WouldExceedMaxBlockCostLimit)
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
// should only accumulate half of the costs that are OK
|
||||
let expected_signatures = signature_cost * (num_txs / 2);
|
||||
let expected_write_locks = write_lock_cost * (num_txs / 2);
|
||||
let expected_data_bytes = data_bytes_cost * (num_txs / 2);
|
||||
let expected_builtins_execution_costs = builtins_execution_cost * (num_txs / 2);
|
||||
let expected_bpf_execution_costs = bpf_execution_cost * (num_txs / 2);
|
||||
let batched_transaction_details =
|
||||
BankingStage::accumulate_batched_transaction_costs(tx_costs.iter(), tx_results.iter());
|
||||
assert_eq!(
|
||||
expected_signatures,
|
||||
batched_transaction_details.costs.batched_signature_cost
|
||||
);
|
||||
assert_eq!(
|
||||
expected_write_locks,
|
||||
batched_transaction_details.costs.batched_write_lock_cost
|
||||
);
|
||||
assert_eq!(
|
||||
expected_data_bytes,
|
||||
batched_transaction_details.costs.batched_data_bytes_cost
|
||||
);
|
||||
assert_eq!(
|
||||
expected_builtins_execution_costs,
|
||||
batched_transaction_details
|
||||
.costs
|
||||
.batched_builtins_execute_cost
|
||||
);
|
||||
assert_eq!(
|
||||
expected_bpf_execution_costs,
|
||||
batched_transaction_details.costs.batched_bpf_execute_cost
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_accumulate_execute_units_and_time() {
|
||||
let mut execute_timings = ExecuteTimings::default();
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
//! Provides logic and functions to allow a Leader to prioritize
|
||||
//! how transactions are included in blocks, and optimize those blocks.
|
||||
//!
|
||||
|
||||
use {
|
||||
crate::banking_stage::{BatchedTransactionDetails, CommitTransactionDetails},
|
||||
crossbeam_channel::{unbounded, Receiver, Sender},
|
||||
|
@ -13,6 +14,7 @@ use {
|
|||
},
|
||||
solana_sdk::{
|
||||
clock::Slot,
|
||||
saturating_add_assign,
|
||||
transaction::{self, SanitizedTransaction, TransactionError},
|
||||
},
|
||||
std::{
|
||||
|
@ -88,8 +90,34 @@ impl QosService {
|
|||
}
|
||||
}
|
||||
|
||||
/// Calculate cost of transactions, determine which ones to include in the slot, and
|
||||
/// accumulate costs in the cost tracker.
|
||||
/// Returns a vector of transaction costs, a vector of results indicating which transactions
|
||||
/// were selected, and the number of transactions that were *NOT* selected.
|
||||
pub fn select_and_accumulate_transaction_costs(
|
||||
&self,
|
||||
bank: &Bank,
|
||||
transactions: &[SanitizedTransaction],
|
||||
) -> (Vec<TransactionCost>, Vec<transaction::Result<()>>, usize) {
|
||||
let transaction_costs = self.compute_transaction_costs(transactions.iter());
|
||||
let (transactions_qos_results, num_included) =
|
||||
self.select_transactions_per_cost(transactions.iter(), transaction_costs.iter(), bank);
|
||||
self.accumulate_estimated_transaction_costs(&Self::accumulate_batched_transaction_costs(
|
||||
transaction_costs.iter(),
|
||||
transactions_qos_results.iter(),
|
||||
));
|
||||
let cost_model_throttled_transactions_count =
|
||||
transactions.len().saturating_sub(num_included);
|
||||
|
||||
(
|
||||
transaction_costs,
|
||||
transactions_qos_results,
|
||||
cost_model_throttled_transactions_count,
|
||||
)
|
||||
}
|
||||
|
||||
// invoke cost_model to calculate cost for the given list of transactions
|
||||
pub fn compute_transaction_costs<'a>(
|
||||
fn compute_transaction_costs<'a>(
|
||||
&self,
|
||||
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
|
||||
) -> Vec<TransactionCost> {
|
||||
|
@ -122,11 +150,11 @@ impl QosService {
|
|||
/// Given a list of transactions and their costs, this function returns a corresponding
|
||||
/// list of Results that indicate if a transaction is selected to be included in the current block,
|
||||
/// and a count of the number of transactions that would fit in the block
|
||||
pub fn select_transactions_per_cost<'a>(
|
||||
fn select_transactions_per_cost<'a>(
|
||||
&self,
|
||||
transactions: impl Iterator<Item = &'a SanitizedTransaction>,
|
||||
transactions_costs: impl Iterator<Item = &'a TransactionCost>,
|
||||
bank: &Arc<Bank>,
|
||||
bank: &Bank,
|
||||
) -> (Vec<transaction::Result<()>>, usize) {
|
||||
let mut cost_tracking_time = Measure::start("cost_tracking_time");
|
||||
let mut cost_tracker = bank.write_cost_tracker().unwrap();
|
||||
|
@ -243,7 +271,7 @@ impl QosService {
|
|||
.unwrap_or_else(|err| warn!("qos service report metrics failed: {:?}", err));
|
||||
}
|
||||
|
||||
pub fn accumulate_estimated_transaction_costs(
|
||||
fn accumulate_estimated_transaction_costs(
|
||||
&self,
|
||||
batched_transaction_details: &BatchedTransactionDetails,
|
||||
) {
|
||||
|
@ -331,6 +359,87 @@ impl QosService {
|
|||
.fetch_add(micro_sec, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
// rollup transaction cost details, eg signature_cost, write_lock_cost, data_bytes_cost and
|
||||
// execution_cost from the batch of transactions selected for block.
|
||||
fn accumulate_batched_transaction_costs<'a>(
|
||||
transactions_costs: impl Iterator<Item = &'a TransactionCost>,
|
||||
transaction_results: impl Iterator<Item = &'a transaction::Result<()>>,
|
||||
) -> BatchedTransactionDetails {
|
||||
let mut batched_transaction_details = BatchedTransactionDetails::default();
|
||||
transactions_costs
|
||||
.zip(transaction_results)
|
||||
.for_each(|(cost, result)| match result {
|
||||
Ok(_) => {
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details.costs.batched_signature_cost,
|
||||
cost.signature_cost
|
||||
);
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details.costs.batched_write_lock_cost,
|
||||
cost.write_lock_cost
|
||||
);
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details.costs.batched_data_bytes_cost,
|
||||
cost.data_bytes_cost
|
||||
);
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details
|
||||
.costs
|
||||
.batched_builtins_execute_cost,
|
||||
cost.builtins_execution_cost
|
||||
);
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details.costs.batched_bpf_execute_cost,
|
||||
cost.bpf_execution_cost
|
||||
);
|
||||
}
|
||||
Err(transaction_error) => match transaction_error {
|
||||
TransactionError::WouldExceedMaxBlockCostLimit => {
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details
|
||||
.errors
|
||||
.batched_retried_txs_per_block_limit_count,
|
||||
1
|
||||
);
|
||||
}
|
||||
TransactionError::WouldExceedMaxVoteCostLimit => {
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details
|
||||
.errors
|
||||
.batched_retried_txs_per_vote_limit_count,
|
||||
1
|
||||
);
|
||||
}
|
||||
TransactionError::WouldExceedMaxAccountCostLimit => {
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details
|
||||
.errors
|
||||
.batched_retried_txs_per_account_limit_count,
|
||||
1
|
||||
);
|
||||
}
|
||||
TransactionError::WouldExceedAccountDataBlockLimit => {
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details
|
||||
.errors
|
||||
.batched_retried_txs_per_account_data_block_limit_count,
|
||||
1
|
||||
);
|
||||
}
|
||||
TransactionError::WouldExceedAccountDataTotalLimit => {
|
||||
saturating_add_assign!(
|
||||
batched_transaction_details
|
||||
.errors
|
||||
.batched_dropped_txs_per_account_data_total_limit_count,
|
||||
1
|
||||
);
|
||||
}
|
||||
_ => {}
|
||||
},
|
||||
});
|
||||
batched_transaction_details
|
||||
}
|
||||
|
||||
fn reporting_loop(
|
||||
running_flag: Arc<AtomicBool>,
|
||||
metrics: Arc<QosServiceMetrics>,
|
||||
|
@ -814,4 +923,64 @@ mod tests {
|
|||
);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_accumulate_batched_transaction_costs() {
|
||||
let signature_cost = 1;
|
||||
let write_lock_cost = 2;
|
||||
let data_bytes_cost = 3;
|
||||
let builtins_execution_cost = 4;
|
||||
let bpf_execution_cost = 10;
|
||||
let num_txs = 4;
|
||||
|
||||
let tx_costs: Vec<_> = (0..num_txs)
|
||||
.map(|_| TransactionCost {
|
||||
signature_cost,
|
||||
write_lock_cost,
|
||||
data_bytes_cost,
|
||||
builtins_execution_cost,
|
||||
bpf_execution_cost,
|
||||
..TransactionCost::default()
|
||||
})
|
||||
.collect();
|
||||
let tx_results: Vec<_> = (0..num_txs)
|
||||
.map(|n| {
|
||||
if n % 2 == 0 {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(TransactionError::WouldExceedMaxBlockCostLimit)
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
// should only accumulate half of the costs that are OK
|
||||
let expected_signatures = signature_cost * (num_txs / 2);
|
||||
let expected_write_locks = write_lock_cost * (num_txs / 2);
|
||||
let expected_data_bytes = data_bytes_cost * (num_txs / 2);
|
||||
let expected_builtins_execution_costs = builtins_execution_cost * (num_txs / 2);
|
||||
let expected_bpf_execution_costs = bpf_execution_cost * (num_txs / 2);
|
||||
let batched_transaction_details =
|
||||
QosService::accumulate_batched_transaction_costs(tx_costs.iter(), tx_results.iter());
|
||||
assert_eq!(
|
||||
expected_signatures,
|
||||
batched_transaction_details.costs.batched_signature_cost
|
||||
);
|
||||
assert_eq!(
|
||||
expected_write_locks,
|
||||
batched_transaction_details.costs.batched_write_lock_cost
|
||||
);
|
||||
assert_eq!(
|
||||
expected_data_bytes,
|
||||
batched_transaction_details.costs.batched_data_bytes_cost
|
||||
);
|
||||
assert_eq!(
|
||||
expected_builtins_execution_costs,
|
||||
batched_transaction_details
|
||||
.costs
|
||||
.batched_builtins_execute_cost
|
||||
);
|
||||
assert_eq!(
|
||||
expected_bpf_execution_costs,
|
||||
batched_transaction_details.costs.batched_bpf_execute_cost
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue