From 4de14e530b09cceba10cf47d91478af800011d38 Mon Sep 17 00:00:00 2001
From: Pankaj Garg
Date: Thu, 3 Feb 2022 10:00:27 -0800
Subject: [PATCH] Optimize batching of transactions during replay for parallel
 processing

Compute the cost of each transaction with the cost model and, when the total
cost is high enough to make rebatching worthwhile, repartition the already
locked transactions into roughly equal-cost batches (one per replay thread)
before dispatching them for parallel execution. The existing execute_batches()
is renamed to execute_batches_internal() and is invoked with either the
rebatched transactions or the original batches.

---
 ledger/src/blockstore_processor.rs | 77 +++++++++++++++++++++++++++++-
 1 file changed, 76 insertions(+), 1 deletion(-)

diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs
index 12ee159c7..0ec91c3ef 100644
--- a/ledger/src/blockstore_processor.rs
+++ b/ledger/src/blockstore_processor.rs
@@ -28,6 +28,7 @@ use {
         bank_utils,
         block_cost_limits::*,
         commitment::VOTE_THRESHOLD_SIZE,
+        cost_model::CostModel,
         snapshot_config::SnapshotConfig,
         snapshot_package::{AccountsPackageSender, SnapshotType},
         snapshot_utils::{self, BankFromArchiveTimings},
@@ -53,6 +54,7 @@ use {
         collect_token_balances, TransactionTokenBalancesSet,
     },
     std::{
+        borrow::Cow,
         cell::RefCell,
         collections::{HashMap, HashSet},
         path::PathBuf,
@@ -247,7 +249,7 @@ fn execute_batch(
     first_err.map(|(result, _)| result).unwrap_or(Ok(()))
 }
 
-fn execute_batches(
+fn execute_batches_internal(
     bank: &Arc<Bank>,
     batches: &[TransactionBatch],
     entry_callback: Option<&ProcessCallback>,
@@ -290,6 +292,79 @@ fn execute_batches(
     first_err(&results)
 }
 
+fn execute_batches(
+    bank: &Arc<Bank>,
+    batches: &[TransactionBatch],
+    entry_callback: Option<&ProcessCallback>,
+    transaction_status_sender: Option<&TransactionStatusSender>,
+    replay_vote_sender: Option<&ReplayVoteSender>,
+    timings: &mut ExecuteTimings,
+    cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
+) -> Result<()> {
+    let lock_results = batches
+        .iter()
+        .flat_map(|batch| batch.lock_results().clone())
+        .collect::<Vec<_>>();
+    let sanitized_txs = batches
+        .iter()
+        .flat_map(|batch| batch.sanitized_transactions().to_vec())
+        .collect::<Vec<_>>();
+
+    let cost_model = CostModel::new();
+    let mut minimal_tx_cost = u64::MAX;
+    let total_cost: u64 = sanitized_txs
+        .iter()
+        .map(|tx| {
+            let cost = cost_model.calculate_cost(tx).sum();
+            minimal_tx_cost = std::cmp::min(minimal_tx_cost, cost);
+            cost
+        })
+        .sum();
+
+    let target_batch_count = get_thread_count() as u64;
+
+    if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) {
+        let target_batch_cost = total_cost / target_batch_count;
+        let mut batch_cost = 0;
+        let mut index = 0;
+        let mut tx_batches: Vec<TransactionBatch> = vec![];
+        let mut slice_range = 0..0;
+        while index < sanitized_txs.len() {
+            batch_cost += cost_model.calculate_cost(&sanitized_txs[index]).sum();
+            index += 1;
+            if batch_cost >= target_batch_cost || sanitized_txs.len() == index {
+                slice_range.end = index;
+                let txs = &sanitized_txs[slice_range.clone()];
+                let results = &lock_results[slice_range.clone()];
+                let tx_batch = TransactionBatch::new(results.to_vec(), bank, Cow::from(txs));
+                slice_range.start = index;
+                tx_batches.push(tx_batch);
+                batch_cost = 0;
+            }
+        }
+
+        execute_batches_internal(
+            bank,
+            &tx_batches,
+            entry_callback,
+            transaction_status_sender,
+            replay_vote_sender,
+            timings,
+            cost_capacity_meter,
+        )
+    } else {
+        execute_batches_internal(
+            bank,
+            batches,
+            entry_callback,
+            transaction_status_sender,
+            replay_vote_sender,
+            timings,
+            cost_capacity_meter,
+        )
+    }
+}
+
 /// Process an ordered list of entries in parallel
 /// 1. In order lock accounts for each entry while the lock succeeds, up to a Tick entry
 /// 2. Process the locked group in parallel