Optimize batching of transactions during replay for parallel processing
This commit is contained in:
parent
37afdd1a65
commit
4de14e530b
|
@ -28,6 +28,7 @@ use {
|
|||
bank_utils,
|
||||
block_cost_limits::*,
|
||||
commitment::VOTE_THRESHOLD_SIZE,
|
||||
cost_model::CostModel,
|
||||
snapshot_config::SnapshotConfig,
|
||||
snapshot_package::{AccountsPackageSender, SnapshotType},
|
||||
snapshot_utils::{self, BankFromArchiveTimings},
|
||||
|
@ -53,6 +54,7 @@ use {
|
|||
collect_token_balances, TransactionTokenBalancesSet,
|
||||
},
|
||||
std::{
|
||||
borrow::Cow,
|
||||
cell::RefCell,
|
||||
collections::{HashMap, HashSet},
|
||||
path::PathBuf,
|
||||
|
@ -247,7 +249,7 @@ fn execute_batch(
|
|||
first_err.map(|(result, _)| result).unwrap_or(Ok(()))
|
||||
}
|
||||
|
||||
fn execute_batches(
|
||||
fn execute_batches_internal(
|
||||
bank: &Arc<Bank>,
|
||||
batches: &[TransactionBatch],
|
||||
entry_callback: Option<&ProcessCallback>,
|
||||
|
@ -290,6 +292,79 @@ fn execute_batches(
|
|||
first_err(&results)
|
||||
}
|
||||
|
||||
fn execute_batches(
|
||||
bank: &Arc<Bank>,
|
||||
batches: &[TransactionBatch],
|
||||
entry_callback: Option<&ProcessCallback>,
|
||||
transaction_status_sender: Option<&TransactionStatusSender>,
|
||||
replay_vote_sender: Option<&ReplayVoteSender>,
|
||||
timings: &mut ExecuteTimings,
|
||||
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
|
||||
) -> Result<()> {
|
||||
let lock_results = batches
|
||||
.iter()
|
||||
.flat_map(|batch| batch.lock_results().clone())
|
||||
.collect::<Vec<_>>();
|
||||
let sanitized_txs = batches
|
||||
.iter()
|
||||
.flat_map(|batch| batch.sanitized_transactions().to_vec())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
let cost_model = CostModel::new();
|
||||
let mut minimal_tx_cost = u64::MAX;
|
||||
let total_cost: u64 = sanitized_txs
|
||||
.iter()
|
||||
.map(|tx| {
|
||||
let cost = cost_model.calculate_cost(tx).sum();
|
||||
minimal_tx_cost = std::cmp::min(minimal_tx_cost, cost);
|
||||
cost
|
||||
})
|
||||
.sum();
|
||||
|
||||
let target_batch_count = get_thread_count() as u64;
|
||||
|
||||
if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) {
|
||||
let target_batch_cost = total_cost / target_batch_count;
|
||||
let mut batch_cost = 0;
|
||||
let mut index = 0;
|
||||
let mut tx_batches: Vec<TransactionBatch> = vec![];
|
||||
let mut slice_range = 0..0;
|
||||
while index < sanitized_txs.len() {
|
||||
batch_cost += cost_model.calculate_cost(&sanitized_txs[index]).sum();
|
||||
index += 1;
|
||||
if batch_cost >= target_batch_cost || sanitized_txs.len() == index {
|
||||
slice_range.end = index;
|
||||
let txs = &sanitized_txs[slice_range.clone()];
|
||||
let results = &lock_results[slice_range.clone()];
|
||||
let tx_batch = TransactionBatch::new(results.to_vec(), bank, Cow::from(txs));
|
||||
slice_range.start = index;
|
||||
tx_batches.push(tx_batch);
|
||||
batch_cost = 0;
|
||||
}
|
||||
}
|
||||
|
||||
execute_batches_internal(
|
||||
bank,
|
||||
&tx_batches,
|
||||
entry_callback,
|
||||
transaction_status_sender,
|
||||
replay_vote_sender,
|
||||
timings,
|
||||
cost_capacity_meter,
|
||||
)
|
||||
} else {
|
||||
execute_batches_internal(
|
||||
bank,
|
||||
&batches,
|
||||
entry_callback,
|
||||
transaction_status_sender,
|
||||
replay_vote_sender,
|
||||
timings,
|
||||
cost_capacity_meter,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/// Process an ordered list of entries in parallel
|
||||
/// 1. In order lock accounts for each entry while the lock succeeds, up to a Tick entry
|
||||
/// 2. Process the locked group in parallel
|
||||
|
|
Loading…
Reference in New Issue