Refactor: SchedulerController tests call receive then schedule (#34836)

Andrew Fitzgerald 2024-01-18 22:49:11 -08:00 committed by GitHub
parent 7470c3d68b
commit 6976b750a9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 38 additions and 50 deletions


@@ -596,8 +596,6 @@ mod tests {
tempfile::TempDir,
};
-const TEST_TIMEOUT: Duration = Duration::from_millis(1000);
fn create_channels<T>(num: usize) -> (Vec<Sender<T>>, Vec<Receiver<T>>) {
(0..num).map(|_| unbounded()).unzip()
}
@@ -703,10 +701,27 @@ mod tests {
Arc::new((packet_batch, None))
}
+// Helper function to let test receive and then schedule packets.
+// The order of operations here is convenient for testing, but does not
+// match the order of operations in the actual scheduler.
+// The actual scheduler will process immediately after the decision,
+// in order to keep the decision as recent as possible for processing.
+// In the tests, the decision will not become stale, so it is more convenient
+// to receive first and then schedule.
+fn test_receive_then_schedule(scheduler_controller: &mut SchedulerController) {
+let decision = scheduler_controller
+.decision_maker
+.make_consume_or_forward_decision();
+assert!(matches!(decision, BufferedPacketsDecision::Consume(_)));
+assert!(scheduler_controller.receive_completed().is_ok());
+assert!(scheduler_controller.receive_and_buffer_packets(&decision));
+assert!(scheduler_controller.process_transactions(&decision).is_ok());
+}
#[test]
#[should_panic(expected = "batch id 0 is not being tracked")]
fn test_unexpected_batch_id() {
-let (test_frame, central_scheduler_banking_stage) = create_test_frame(1);
+let (test_frame, scheduler_controller) = create_test_frame(1);
let TestFrame {
finished_consume_work_sender,
..
@@ -724,12 +739,12 @@ mod tests {
})
.unwrap();
-central_scheduler_banking_stage.run().unwrap();
+scheduler_controller.run().unwrap();
}
#[test]
fn test_schedule_consume_single_threaded_no_conflicts() {
-let (test_frame, central_scheduler_banking_stage) = create_test_frame(1);
+let (test_frame, mut scheduler_controller) = create_test_frame(1);
let TestFrame {
bank,
mint_keypair,
@@ -743,7 +758,6 @@ mod tests {
.write()
.unwrap()
.set_bank_for_test(bank.clone());
-let scheduler_thread = std::thread::spawn(move || central_scheduler_banking_stage.run());
// Send packet batch to the scheduler - should do nothing until we become the leader.
let tx1 = create_and_fund_prioritized_transfer(
@@ -772,9 +786,8 @@ mod tests {
.send(to_banking_packet_batch(&txs))
.unwrap();
-let consume_work = consume_work_receivers[0]
-.recv_timeout(TEST_TIMEOUT)
-.unwrap();
+test_receive_then_schedule(&mut scheduler_controller);
+let consume_work = consume_work_receivers[0].try_recv().unwrap();
assert_eq!(consume_work.ids.len(), 2);
assert_eq!(consume_work.transactions.len(), 2);
let message_hashes = consume_work
@@ -783,14 +796,11 @@ mod tests {
.map(|tx| tx.message_hash())
.collect_vec();
assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]);
-drop(test_frame);
-let _ = scheduler_thread.join();
}
#[test]
fn test_schedule_consume_single_threaded_conflict() {
-let (test_frame, central_scheduler_banking_stage) = create_test_frame(1);
+let (test_frame, mut scheduler_controller) = create_test_frame(1);
let TestFrame {
bank,
mint_keypair,
@@ -804,7 +814,6 @@ mod tests {
.write()
.unwrap()
.set_bank_for_test(bank.clone());
-let scheduler_thread = std::thread::spawn(move || central_scheduler_banking_stage.run());
let pk = Pubkey::new_unique();
let tx1 = create_and_fund_prioritized_transfer(
@@ -834,12 +843,9 @@ mod tests {
.unwrap();
// We expect 2 batches to be scheduled
+test_receive_then_schedule(&mut scheduler_controller);
let consume_works = (0..2)
-.map(|_| {
-consume_work_receivers[0]
-.recv_timeout(TEST_TIMEOUT)
-.unwrap()
-})
+.map(|_| consume_work_receivers[0].try_recv().unwrap())
.collect_vec();
let num_txs_per_batch = consume_works.iter().map(|cw| cw.ids.len()).collect_vec();
@@ -849,14 +855,11 @@ mod tests {
.collect_vec();
assert_eq!(num_txs_per_batch, vec![1; 2]);
assert_eq!(message_hashes, vec![&tx2_hash, &tx1_hash]);
-drop(test_frame);
-let _ = scheduler_thread.join();
}
#[test]
fn test_schedule_consume_single_threaded_multi_batch() {
-let (test_frame, central_scheduler_banking_stage) = create_test_frame(1);
+let (test_frame, mut scheduler_controller) = create_test_frame(1);
let TestFrame {
bank,
mint_keypair,
@@ -866,7 +869,6 @@ mod tests {
..
} = &test_frame;
-let scheduler_thread = std::thread::spawn(move || central_scheduler_banking_stage.run());
poh_recorder
.write()
.unwrap()
@@ -908,26 +910,20 @@ mod tests {
.unwrap();
// We expect 4 batches to be scheduled
+test_receive_then_schedule(&mut scheduler_controller);
let consume_works = (0..4)
-.map(|_| {
-consume_work_receivers[0]
-.recv_timeout(TEST_TIMEOUT)
-.unwrap()
-})
+.map(|_| consume_work_receivers[0].try_recv().unwrap())
.collect_vec();
assert_eq!(
consume_works.iter().map(|cw| cw.ids.len()).collect_vec(),
vec![TARGET_NUM_TRANSACTIONS_PER_BATCH; 4]
);
-drop(test_frame);
-let _ = scheduler_thread.join();
}
#[test]
fn test_schedule_consume_simple_thread_selection() {
-let (test_frame, central_scheduler_banking_stage) = create_test_frame(2);
+let (test_frame, mut scheduler_controller) = create_test_frame(2);
let TestFrame {
bank,
mint_keypair,
@@ -941,7 +937,6 @@ mod tests {
.write()
.unwrap()
.set_bank_for_test(bank.clone());
-let scheduler_thread = std::thread::spawn(move || central_scheduler_banking_stage.run());
// Send 4 transactions w/o conflicts. 2 should be scheduled on each thread
let txs = (0..4)
@@ -972,15 +967,17 @@ mod tests {
.into_iter()
.map(|i| txs[i].message().hash())
.collect_vec();
+test_receive_then_schedule(&mut scheduler_controller);
let t0_actual = consume_work_receivers[0]
-.recv_timeout(TEST_TIMEOUT)
+.try_recv()
.unwrap()
.transactions
.iter()
.map(|tx| *tx.message_hash())
.collect_vec();
let t1_actual = consume_work_receivers[1]
-.recv_timeout(TEST_TIMEOUT)
+.try_recv()
.unwrap()
.transactions
.iter()
@@ -989,14 +986,11 @@ mod tests {
assert_eq!(t0_actual, t0_expected);
assert_eq!(t1_actual, t1_expected);
-drop(test_frame);
-let _ = scheduler_thread.join();
}
#[test]
fn test_schedule_consume_retryable() {
-let (test_frame, central_scheduler_banking_stage) = create_test_frame(1);
+let (test_frame, mut scheduler_controller) = create_test_frame(1);
let TestFrame {
bank,
mint_keypair,
@@ -1011,7 +1005,6 @@ mod tests {
.write()
.unwrap()
.set_bank_for_test(bank.clone());
-let scheduler_thread = std::thread::spawn(move || central_scheduler_banking_stage.run());
// Send packet batch to the scheduler - should do nothing until we become the leader.
let tx1 = create_and_fund_prioritized_transfer(
@@ -1040,9 +1033,8 @@ mod tests {
.send(to_banking_packet_batch(&txs))
.unwrap();
-let consume_work = consume_work_receivers[0]
-.recv_timeout(TEST_TIMEOUT)
-.unwrap();
+test_receive_then_schedule(&mut scheduler_controller);
+let consume_work = consume_work_receivers[0].try_recv().unwrap();
assert_eq!(consume_work.ids.len(), 2);
assert_eq!(consume_work.transactions.len(), 2);
let message_hashes = consume_work
@@ -1061,9 +1053,8 @@ mod tests {
.unwrap();
// Transaction should be rescheduled
-let consume_work = consume_work_receivers[0]
-.recv_timeout(TEST_TIMEOUT)
-.unwrap();
+test_receive_then_schedule(&mut scheduler_controller);
+let consume_work = consume_work_receivers[0].try_recv().unwrap();
assert_eq!(consume_work.ids.len(), 1);
assert_eq!(consume_work.transactions.len(), 1);
let message_hashes = consume_work
@@ -1072,8 +1063,5 @@ mod tests {
.map(|tx| tx.message_hash())
.collect_vec();
assert_eq!(message_hashes, vec![&tx1_hash]);
-drop(test_frame);
-let _ = scheduler_thread.join();
}
}
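
The new helper's comment explains why the tests receive before scheduling while the production loop processes immediately after making its decision. Below is a minimal sketch of that contrast, using a hypothetical Controller trait as a stand-in for SchedulerController; the production loop itself is not part of this diff, so the exact placement of the receive calls there is an assumption.

// Sketch only, not part of this commit: it contrasts the two orderings
// described in the helper's comment. `Controller` is a hypothetical stand-in
// for SchedulerController; the real production loop is not shown in this diff,
// so its shape (especially where the receive calls sit) is an assumption.
pub enum Decision {
    Consume,
    Forward,
}

pub trait Controller {
    fn make_consume_or_forward_decision(&mut self) -> Decision;
    fn receive_completed(&mut self);
    fn receive_and_buffer_packets(&mut self, decision: &Decision) -> bool;
    fn process_transactions(&mut self, decision: &Decision);
}

// Production-style ordering per the comment: process immediately after the
// decision so it stays as recent as possible, then handle receives.
pub fn production_iteration<C: Controller>(c: &mut C) -> bool {
    let decision = c.make_consume_or_forward_decision();
    c.process_transactions(&decision);
    c.receive_completed();
    c.receive_and_buffer_packets(&decision)
}

// Test-style ordering, mirroring test_receive_then_schedule: receive first,
// then schedule. The decision cannot go stale because the tests drive the
// controller synchronously instead of spawning it on its own thread.
pub fn test_iteration<C: Controller>(c: &mut C) -> bool {
    let decision = c.make_consume_or_forward_decision();
    c.receive_completed();
    let still_connected = c.receive_and_buffer_packets(&decision);
    c.process_transactions(&decision);
    still_connected
}

Driving the controller synchronously is also what lets the tests replace recv_timeout(TEST_TIMEOUT) with try_recv(): by the time a test reads from consume_work_receivers, the work has already been scheduled, so no waiting is needed and the TEST_TIMEOUT constant can be removed.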