updates to address review feedback

This commit is contained in:
Pankaj Garg 2022-02-05 11:46:09 -08:00
parent dfef68f985
commit c5d8560cdb
1 changed files with 29 additions and 36 deletions

View File

@ -312,57 +312,50 @@ fn execute_batches(
let cost_model = CostModel::new();
let mut minimal_tx_cost = u64::MAX;
let total_cost: u64 = sanitized_txs
let mut total_cost: u64 = 0;
let tx_costs = sanitized_txs
.iter()
.map(|tx| {
let cost = cost_model.calculate_cost(tx).sum();
minimal_tx_cost = std::cmp::min(minimal_tx_cost, cost);
total_cost = total_cost.saturating_add(cost);
cost
})
.sum();
.collect::<Vec<_>>();
let target_batch_count = get_thread_count() as u64;
if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) {
let mut tx_batches: Vec<TransactionBatch> = vec![];
let rebatched_txs = if total_cost > target_batch_count.saturating_mul(minimal_tx_cost) {
let target_batch_cost = total_cost / target_batch_count;
let mut batch_cost = 0;
let mut index = 0;
let mut tx_batches: Vec<TransactionBatch> = vec![];
let mut slice_range = 0..0;
while index < sanitized_txs.len() {
batch_cost += cost_model.calculate_cost(&sanitized_txs[index]).sum();
index += 1;
if batch_cost >= target_batch_cost || sanitized_txs.len() == index {
slice_range.end = index;
let txs = &sanitized_txs[slice_range.clone()];
let results = &lock_results[slice_range.clone()];
let mut batch_cost: u64 = 0;
let mut slice_start = 0;
tx_costs.into_iter().enumerate().for_each(|(index, cost)| {
let next_index = index + 1;
batch_cost = batch_cost.saturating_add(cost);
if batch_cost >= target_batch_cost || next_index == sanitized_txs.len() {
let txs = &sanitized_txs[slice_start..=index];
let results = &lock_results[slice_start..=index];
let tx_batch = TransactionBatch::new(results.to_vec(), bank, Cow::from(txs));
slice_range.start = index;
slice_start = next_index;
tx_batches.push(tx_batch);
batch_cost = 0;
}
}
execute_batches_internal(
bank,
&tx_batches,
entry_callback,
transaction_status_sender,
replay_vote_sender,
timings,
cost_capacity_meter,
)
});
&tx_batches[..]
} else {
execute_batches_internal(
bank,
batches,
entry_callback,
transaction_status_sender,
replay_vote_sender,
timings,
cost_capacity_meter,
)
}
batches
};
execute_batches_internal(
bank,
rebatched_txs,
entry_callback,
transaction_status_sender,
replay_vote_sender,
timings,
cost_capacity_meter,
)
}
/// Process an ordered list of entries in parallel