BankingStage Consumer: test_buffered_packets* reworking (#30615)

* refactor test_consume_buffered_packets_interrupted without test_fn

* Fix comment

* Also check retries

* Add retryable test case
This commit is contained in:
Andrew Fitzgerald 2023-03-09 09:13:04 -08:00 committed by GitHub
parent 76eef7ed0d
commit b0112a5f43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 99 additions and 59 deletions

View File

@ -719,7 +719,7 @@ mod tests {
atomic::{AtomicBool, AtomicU64},
RwLock,
},
thread::{Builder, JoinHandle},
thread::JoinHandle,
},
};
@ -1729,13 +1729,35 @@ mod tests {
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
poh_recorder.write().unwrap().set_bank(&bank, false);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
let banking_stage_stats = BankingStageStats::default();
consumer.consume_buffered_packets(
&bank_start,
&mut buffered_packet_batches,
&BankingStageStats::default(),
&banking_stage_stats,
&mut LeaderSlotMetricsTracker::new(0),
);
// Check that all packets were processed without retrying
assert!(buffered_packet_batches.is_empty());
assert_eq!(
banking_stage_stats
.consumed_buffered_packets_count
.load(Ordering::Relaxed),
num_conflicting_transactions
);
assert_eq!(
banking_stage_stats
.rebuffered_packets_count
.load(Ordering::Relaxed),
0
);
// Use bank to check the number of entries (batches)
assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1);
assert_eq!(
bank_start.working_bank.transaction_entries_count(),
num_conflicting_transactions as u64
);
poh_recorder
.read()
.unwrap()
@ -1801,73 +1823,91 @@ mod tests {
}
#[test]
fn test_consume_buffered_packets_interrupted() {
fn test_consume_buffered_packets_retryable() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (continue_sender, continue_receiver) = unbounded();
let (finished_packet_sender, finished_packet_receiver) = unbounded();
let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let test_fn: Option<Box<dyn Fn() + Send>> = Some(Box::new(move || {
finished_packet_sender.send(()).unwrap();
continue_receiver.recv().unwrap();
}));
// When the poh recorder has a bank, it should process all buffered packets.
let recorder = poh_recorder.read().unwrap().recorder();
let num_conflicting_transactions = transactions.len();
poh_recorder.write().unwrap().set_bank(&bank, false);
let poh_recorder_ = poh_recorder.clone();
let recorder = poh_recorder_.read().unwrap().recorder();
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
let deserialized_packets =
unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions)
.unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let retryable_packet = deserialized_packets[0].clone();
let mut buffered_packet_batches =
UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(
deserialized_packets.into_iter(),
num_conflicting_transactions,
),
ThreadType::Transactions,
);
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, test_fn);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None);
// Start up thread to process the banks
let t_consume = Builder::new()
.name("consume-buffered-packets".to_string())
.spawn(move || {
let num_conflicting_transactions = transactions.len();
let deserialized_packets =
unprocessed_packet_batches::transactions_to_deserialized_packets(
&transactions,
)
.unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let mut buffered_packet_batches =
UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(
deserialized_packets.into_iter(),
num_conflicting_transactions,
),
ThreadType::Transactions,
);
consumer.consume_buffered_packets(
&bank_start,
&mut buffered_packet_batches,
&BankingStageStats::default(),
&mut LeaderSlotMetricsTracker::new(0),
);
// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
assert!(!poh_recorder.read().unwrap().has_bank());
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed
// except except for retryable errors. Manually take the lock of a transaction to
// simulate another thread processing a transaction with that lock.
poh_recorder.write().unwrap().set_bank(&bank, false);
let bank_start = poh_recorder.read().unwrap().bank_start().unwrap();
// Check everything is correct. All valid packets should be processed.
assert!(buffered_packet_batches.is_empty());
})
.unwrap();
let lock_account = transactions[0].message.account_keys[1];
let manual_lock_tx =
SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer(
&Keypair::new(),
&lock_account,
1,
bank.last_blockhash(),
));
let _ = bank_start.working_bank.accounts().lock_accounts(
std::iter::once(&manual_lock_tx),
bank_start.working_bank.get_transaction_account_lock_limit(),
);
// Should be calling `test_fn` for each non-conflicting batch.
// In this case each batch is of size 1.
for i in 0..num_conflicting_transactions {
finished_packet_receiver.recv().unwrap();
if i + 1 == num_conflicting_transactions {
poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
}
continue_sender.send(()).unwrap();
}
t_consume.join().unwrap();
let banking_stage_stats = BankingStageStats::default();
consumer.consume_buffered_packets(
&bank_start,
&mut buffered_packet_batches,
&banking_stage_stats,
&mut LeaderSlotMetricsTracker::new(0),
);
// Check that all but 1 transaction was processed. And that it was rebuffered.
assert_eq!(buffered_packet_batches.len(), 1);
assert_eq!(
buffered_packet_batches.iter().next().unwrap(),
&retryable_packet
);
assert_eq!(
banking_stage_stats
.consumed_buffered_packets_count
.load(Ordering::Relaxed),
num_conflicting_transactions - 1,
);
assert_eq!(
banking_stage_stats
.rebuffered_packets_count
.load(Ordering::Relaxed),
1
);
// Use bank to check the number of entries (batches)
assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1);
assert_eq!(
bank_start.working_bank.transaction_entries_count(),
num_conflicting_transactions as u64 - 1
);
poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(ledger_path.path()).unwrap();