From b0112a5f43fd142ce1d7eebc39e9034aa49e9c2f Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Thu, 9 Mar 2023 09:13:04 -0800 Subject: [PATCH] BankingStage Consumer: test_buffered_packets* reworking (#30615) * refactor test_consume_buffered_packets_interrupted without test_fn * Fix comment * Also check retries * Add retryable test case --- core/src/banking_stage/consumer.rs | 158 ++++++++++++++++++----------- 1 file changed, 99 insertions(+), 59 deletions(-) diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 8dee3c0658..0a6d3ca41f 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -719,7 +719,7 @@ mod tests { atomic::{AtomicBool, AtomicU64}, RwLock, }, - thread::{Builder, JoinHandle}, + thread::JoinHandle, }, }; @@ -1729,13 +1729,35 @@ mod tests { // Multi-Iterator will process them 1-by-1 if all txs are conflicting. poh_recorder.write().unwrap().set_bank(&bank, false); let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); + let banking_stage_stats = BankingStageStats::default(); consumer.consume_buffered_packets( &bank_start, &mut buffered_packet_batches, - &BankingStageStats::default(), + &banking_stage_stats, &mut LeaderSlotMetricsTracker::new(0), ); + + // Check that all packets were processed without retrying assert!(buffered_packet_batches.is_empty()); + assert_eq!( + banking_stage_stats + .consumed_buffered_packets_count + .load(Ordering::Relaxed), + num_conflicting_transactions + ); + assert_eq!( + banking_stage_stats + .rebuffered_packets_count + .load(Ordering::Relaxed), + 0 + ); + // Use bank to check the number of entries (batches) + assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1); + assert_eq!( + bank_start.working_bank.transaction_entries_count(), + num_conflicting_transactions as u64 + ); + poh_recorder .read() .unwrap() @@ -1801,73 +1823,91 @@ mod tests { } #[test] - fn test_consume_buffered_packets_interrupted() { + fn test_consume_buffered_packets_retryable() { let ledger_path = get_tmp_ledger_path_auto_delete!(); { - let (continue_sender, continue_receiver) = unbounded(); - let (finished_packet_sender, finished_packet_receiver) = unbounded(); let (transactions, bank, poh_recorder, _entry_receiver, poh_simulator) = setup_conflicting_transactions(ledger_path.path()); - - let test_fn: Option> = Some(Box::new(move || { - finished_packet_sender.send(()).unwrap(); - continue_receiver.recv().unwrap(); - })); - // When the poh recorder has a bank, it should process all buffered packets. + let recorder = poh_recorder.read().unwrap().recorder(); let num_conflicting_transactions = transactions.len(); - poh_recorder.write().unwrap().set_bank(&bank, false); - let poh_recorder_ = poh_recorder.clone(); - let recorder = poh_recorder_.read().unwrap().recorder(); - let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); + let deserialized_packets = + unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions) + .unwrap(); + assert_eq!(deserialized_packets.len(), num_conflicting_transactions); + let retryable_packet = deserialized_packets[0].clone(); + let mut buffered_packet_batches = + UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::from_iter( + deserialized_packets.into_iter(), + num_conflicting_transactions, + ), + ThreadType::Transactions, + ); + let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let committer = Committer::new(None, replay_vote_sender); - let consumer = Consumer::new(committer, recorder, QosService::new(1), None, test_fn); + let consumer = Consumer::new(committer, recorder, QosService::new(1), None, None); - // Start up thread to process the banks - let t_consume = Builder::new() - .name("consume-buffered-packets".to_string()) - .spawn(move || { - let num_conflicting_transactions = transactions.len(); - let deserialized_packets = - unprocessed_packet_batches::transactions_to_deserialized_packets( - &transactions, - ) - .unwrap(); - assert_eq!(deserialized_packets.len(), num_conflicting_transactions); - let mut buffered_packet_batches = - UnprocessedTransactionStorage::new_transaction_storage( - UnprocessedPacketBatches::from_iter( - deserialized_packets.into_iter(), - num_conflicting_transactions, - ), - ThreadType::Transactions, - ); - consumer.consume_buffered_packets( - &bank_start, - &mut buffered_packet_batches, - &BankingStageStats::default(), - &mut LeaderSlotMetricsTracker::new(0), - ); + // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) + assert!(!poh_recorder.read().unwrap().has_bank()); + assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); + // When the working bank in poh_recorder is Some, all packets should be processed + // except except for retryable errors. Manually take the lock of a transaction to + // simulate another thread processing a transaction with that lock. + poh_recorder.write().unwrap().set_bank(&bank, false); + let bank_start = poh_recorder.read().unwrap().bank_start().unwrap(); - // Check everything is correct. All valid packets should be processed. - assert!(buffered_packet_batches.is_empty()); - }) - .unwrap(); + let lock_account = transactions[0].message.account_keys[1]; + let manual_lock_tx = + SanitizedTransaction::from_transaction_for_tests(system_transaction::transfer( + &Keypair::new(), + &lock_account, + 1, + bank.last_blockhash(), + )); + let _ = bank_start.working_bank.accounts().lock_accounts( + std::iter::once(&manual_lock_tx), + bank_start.working_bank.get_transaction_account_lock_limit(), + ); - // Should be calling `test_fn` for each non-conflicting batch. - // In this case each batch is of size 1. - for i in 0..num_conflicting_transactions { - finished_packet_receiver.recv().unwrap(); - if i + 1 == num_conflicting_transactions { - poh_recorder - .read() - .unwrap() - .is_exited - .store(true, Ordering::Relaxed); - } - continue_sender.send(()).unwrap(); - } - t_consume.join().unwrap(); + let banking_stage_stats = BankingStageStats::default(); + consumer.consume_buffered_packets( + &bank_start, + &mut buffered_packet_batches, + &banking_stage_stats, + &mut LeaderSlotMetricsTracker::new(0), + ); + + // Check that all but 1 transaction was processed. And that it was rebuffered. + assert_eq!(buffered_packet_batches.len(), 1); + assert_eq!( + buffered_packet_batches.iter().next().unwrap(), + &retryable_packet + ); + assert_eq!( + banking_stage_stats + .consumed_buffered_packets_count + .load(Ordering::Relaxed), + num_conflicting_transactions - 1, + ); + assert_eq!( + banking_stage_stats + .rebuffered_packets_count + .load(Ordering::Relaxed), + 1 + ); + // Use bank to check the number of entries (batches) + assert_eq!(bank_start.working_bank.transactions_per_entry_max(), 1); + assert_eq!( + bank_start.working_bank.transaction_entries_count(), + num_conflicting_transactions as u64 - 1 + ); + + poh_recorder + .read() + .unwrap() + .is_exited + .store(true, Ordering::Relaxed); let _ = poh_simulator.join(); } Blockstore::destroy(ledger_path.path()).unwrap();