TransactionScheduler: pre_lock_filter (#34488)
This commit is contained in:
parent
e877eef6c6
commit
d00c9a45b2
|
@ -46,10 +46,17 @@ impl PrioGraphScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
/// Schedule transactions from the given `TransactionStateContainer` to be consumed by the
|
||||
/// worker threads. Returns summary of scheduling, or an error.
|
||||
/// `filter` is used to filter out transactions that should be skipped and dropped, and
|
||||
/// should set `false` for transactions that should be dropped, and `true` otherwise.
|
||||
/// Schedule transactions from the given `TransactionStateContainer` to be
|
||||
/// consumed by the worker threads. Returns summary of scheduling, or an
|
||||
/// error.
|
||||
/// `pre_graph_filter` is used to filter out transactions that should be
|
||||
/// skipped and dropped before insertion to the prio-graph. This fn should
|
||||
/// set `false` for transactions that should be dropped, and `true`
|
||||
/// otherwise.
|
||||
/// `pre_lock_filter` is used to filter out transactions after they have
|
||||
/// made it to the top of the prio-graph, and immediately before locks are
|
||||
/// checked and taken. This fn should return `true` for transactions that
|
||||
/// should be scheduled, and `false` otherwise.
|
||||
///
|
||||
/// Uses a `PrioGraph` to perform look-ahead during the scheduling of transactions.
|
||||
/// This, combined with internal tracking of threads' in-flight transactions, allows
|
||||
|
@ -58,7 +65,8 @@ impl PrioGraphScheduler {
|
|||
pub(crate) fn schedule(
|
||||
&mut self,
|
||||
container: &mut TransactionStateContainer,
|
||||
filter: impl Fn(&[&SanitizedTransaction], &mut [bool]),
|
||||
pre_graph_filter: impl Fn(&[&SanitizedTransaction], &mut [bool]),
|
||||
pre_lock_filter: impl Fn(&SanitizedTransaction) -> bool,
|
||||
) -> Result<SchedulingSummary, SchedulerError> {
|
||||
let num_threads = self.consume_work_senders.len();
|
||||
let mut batches = Batches::new(num_threads);
|
||||
|
@ -100,7 +108,8 @@ impl PrioGraphScheduler {
|
|||
txs.push(&transaction.transaction);
|
||||
});
|
||||
|
||||
let (_, filter_us) = measure_us!(filter(&txs, &mut filter_array[..chunk_size]));
|
||||
let (_, filter_us) =
|
||||
measure_us!(pre_graph_filter(&txs, &mut filter_array[..chunk_size]));
|
||||
saturating_add_assign!(total_filter_time_us, filter_us);
|
||||
|
||||
for (id, filter_result) in ids.iter().zip(&filter_array[..chunk_size]) {
|
||||
|
@ -148,6 +157,10 @@ impl PrioGraphScheduler {
|
|||
};
|
||||
|
||||
let transaction = &transaction_state.transaction_ttl().transaction;
|
||||
if !pre_lock_filter(transaction) {
|
||||
container.remove_by_id(&id.id);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check if this transaction conflicts with any blocked transactions
|
||||
if !blocking_locks.check_locks(transaction.message()) {
|
||||
|
@ -601,10 +614,14 @@ mod tests {
|
|||
.unzip()
|
||||
}
|
||||
|
||||
fn test_filter(_txs: &[&SanitizedTransaction], results: &mut [bool]) {
|
||||
fn test_pre_graph_filter(_txs: &[&SanitizedTransaction], results: &mut [bool]) {
|
||||
results.fill(true);
|
||||
}
|
||||
|
||||
fn test_pre_lock_filter(_tx: &SanitizedTransaction) -> bool {
|
||||
true
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_schedule_disconnected_channel() {
|
||||
let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1);
|
||||
|
@ -612,7 +629,7 @@ mod tests {
|
|||
|
||||
drop(work_receivers); // explicitly drop receivers
|
||||
assert_matches!(
|
||||
scheduler.schedule(&mut container, test_filter),
|
||||
scheduler.schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter),
|
||||
Err(SchedulerError::DisconnectedSendChannel(_))
|
||||
);
|
||||
}
|
||||
|
@ -625,7 +642,9 @@ mod tests {
|
|||
(&Keypair::new(), &[Pubkey::new_unique()], 2, 2),
|
||||
]);
|
||||
|
||||
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
|
||||
let scheduling_summary = scheduler
|
||||
.schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter)
|
||||
.unwrap();
|
||||
assert_eq!(scheduling_summary.num_scheduled, 2);
|
||||
assert_eq!(scheduling_summary.num_unschedulable, 0);
|
||||
assert_eq!(collect_work(&work_receivers[0]).1, vec![txids!([1, 0])]);
|
||||
|
@ -640,7 +659,9 @@ mod tests {
|
|||
(&Keypair::new(), &[pubkey], 1, 2),
|
||||
]);
|
||||
|
||||
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
|
||||
let scheduling_summary = scheduler
|
||||
.schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter)
|
||||
.unwrap();
|
||||
assert_eq!(scheduling_summary.num_scheduled, 2);
|
||||
assert_eq!(scheduling_summary.num_unschedulable, 0);
|
||||
assert_eq!(
|
||||
|
@ -658,7 +679,9 @@ mod tests {
|
|||
);
|
||||
|
||||
// expect 4 full batches to be scheduled
|
||||
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
|
||||
let scheduling_summary = scheduler
|
||||
.schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter)
|
||||
.unwrap();
|
||||
assert_eq!(
|
||||
scheduling_summary.num_scheduled,
|
||||
4 * TARGET_NUM_TRANSACTIONS_PER_BATCH
|
||||
|
@ -678,7 +701,9 @@ mod tests {
|
|||
let mut container =
|
||||
create_container((0..4).map(|i| (Keypair::new(), [Pubkey::new_unique()], 1, i)));
|
||||
|
||||
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
|
||||
let scheduling_summary = scheduler
|
||||
.schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter)
|
||||
.unwrap();
|
||||
assert_eq!(scheduling_summary.num_scheduled, 4);
|
||||
assert_eq!(scheduling_summary.num_unschedulable, 0);
|
||||
assert_eq!(collect_work(&work_receivers[0]).1, [txids!([3, 1])]);
|
||||
|
@ -710,7 +735,9 @@ mod tests {
|
|||
// fact they eventually join means that the scheduler will schedule them
|
||||
// onto the same thread to avoid causing [4], which conflicts with both
|
||||
// chains, to be un-schedulable.
|
||||
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
|
||||
let scheduling_summary = scheduler
|
||||
.schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter)
|
||||
.unwrap();
|
||||
assert_eq!(scheduling_summary.num_scheduled, 5);
|
||||
assert_eq!(scheduling_summary.num_unschedulable, 0);
|
||||
assert_eq!(
|
||||
|
@ -751,7 +778,9 @@ mod tests {
|
|||
// Because the look-ahead window is shortened to a size of 4, the scheduler does
|
||||
// not have knowledge of the joining at transaction [4] until after [0] and [1]
|
||||
// have been scheduled.
|
||||
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
|
||||
let scheduling_summary = scheduler
|
||||
.schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter)
|
||||
.unwrap();
|
||||
assert_eq!(scheduling_summary.num_scheduled, 4);
|
||||
assert_eq!(scheduling_summary.num_unschedulable, 2);
|
||||
let (thread_0_work, thread_0_ids) = collect_work(&work_receivers[0]);
|
||||
|
@ -762,7 +791,9 @@ mod tests {
|
|||
);
|
||||
|
||||
// Cannot schedule even on next pass because of lock conflicts
|
||||
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
|
||||
let scheduling_summary = scheduler
|
||||
.schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter)
|
||||
.unwrap();
|
||||
assert_eq!(scheduling_summary.num_scheduled, 0);
|
||||
assert_eq!(scheduling_summary.num_unschedulable, 2);
|
||||
|
||||
|
@ -774,7 +805,9 @@ mod tests {
|
|||
})
|
||||
.unwrap();
|
||||
scheduler.receive_completed(&mut container).unwrap();
|
||||
let scheduling_summary = scheduler.schedule(&mut container, test_filter).unwrap();
|
||||
let scheduling_summary = scheduler
|
||||
.schedule(&mut container, test_pre_graph_filter, test_pre_lock_filter)
|
||||
.unwrap();
|
||||
assert_eq!(scheduling_summary.num_scheduled, 2);
|
||||
assert_eq!(scheduling_summary.num_unschedulable, 0);
|
||||
|
||||
|
@ -783,4 +816,29 @@ mod tests {
|
|||
[txids!([4]), txids!([5])]
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_schedule_pre_lock_filter() {
|
||||
let (mut scheduler, work_receivers, _finished_work_sender) = create_test_frame(1);
|
||||
let pubkey = Pubkey::new_unique();
|
||||
let keypair = Keypair::new();
|
||||
let mut container = create_container([
|
||||
(&Keypair::new(), &[pubkey], 1, 1),
|
||||
(&keypair, &[pubkey], 1, 2),
|
||||
(&Keypair::new(), &[pubkey], 1, 3),
|
||||
]);
|
||||
|
||||
// 2nd transaction should be filtered out and dropped before locking.
|
||||
let pre_lock_filter =
|
||||
|tx: &SanitizedTransaction| tx.message().fee_payer() != &keypair.pubkey();
|
||||
let scheduling_summary = scheduler
|
||||
.schedule(&mut container, test_pre_graph_filter, pre_lock_filter)
|
||||
.unwrap();
|
||||
assert_eq!(scheduling_summary.num_scheduled, 2);
|
||||
assert_eq!(scheduling_summary.num_unschedulable, 0);
|
||||
assert_eq!(
|
||||
collect_work(&work_receivers[0]).1,
|
||||
vec![txids!([2]), txids!([0])]
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -114,12 +114,13 @@ impl SchedulerController {
|
|||
) -> Result<(), SchedulerError> {
|
||||
match decision {
|
||||
BufferedPacketsDecision::Consume(bank_start) => {
|
||||
let (scheduling_summary, schedule_time_us) =
|
||||
measure_us!(self
|
||||
.scheduler
|
||||
.schedule(&mut self.container, |txs, results| {
|
||||
Self::pre_scheduling_filter(txs, results, &bank_start.working_bank)
|
||||
})?);
|
||||
let (scheduling_summary, schedule_time_us) = measure_us!(self.scheduler.schedule(
|
||||
&mut self.container,
|
||||
|txs, results| {
|
||||
Self::pre_graph_filter(txs, results, &bank_start.working_bank)
|
||||
},
|
||||
|_| true // no pre-lock filter for now
|
||||
)?);
|
||||
saturating_add_assign!(
|
||||
self.count_metrics.num_scheduled,
|
||||
scheduling_summary.num_scheduled
|
||||
|
@ -152,11 +153,7 @@ impl SchedulerController {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn pre_scheduling_filter(
|
||||
transactions: &[&SanitizedTransaction],
|
||||
results: &mut [bool],
|
||||
bank: &Bank,
|
||||
) {
|
||||
fn pre_graph_filter(transactions: &[&SanitizedTransaction], results: &mut [bool], bank: &Bank) {
|
||||
let lock_results = vec![Ok(()); transactions.len()];
|
||||
let mut error_counters = TransactionErrorMetrics::default();
|
||||
let check_results = bank.check_transactions(
|
||||
|
|
Loading…
Reference in New Issue