From 4d338ed88298ff9262186df7657a43a6ee860ed0 Mon Sep 17 00:00:00 2001 From: apfitzge Date: Tue, 29 Nov 2022 16:00:21 -0600 Subject: [PATCH] Bugfix/mi_remove_never_entries (#28978) --- core/src/banking_stage.rs | 76 ++++++++++++++++++++- core/src/unprocessed_transaction_storage.rs | 59 ++++++++++++---- 2 files changed, 117 insertions(+), 18 deletions(-) diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 03a4ccd8c3..73b54c6858 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -722,7 +722,7 @@ impl BankingStage { bank_creation_time, }) = bank_start { - let returned_payload = unprocessed_transaction_storage.process_packets( + reached_end_of_slot = unprocessed_transaction_storage.process_packets( working_bank.clone(), banking_stage_stats, slot_metrics_tracker, @@ -745,8 +745,6 @@ impl BankingStage { ) }, ); - - reached_end_of_slot = returned_payload.reached_end_of_slot; } else { reached_end_of_slot = true; } @@ -3726,6 +3724,78 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } + #[test] + fn test_consume_buffered_packets_sanitization_error() { + let ledger_path = get_tmp_ledger_path_auto_delete!(); + { + let (mut transactions, bank, poh_recorder, _entry_receiver, poh_simulator) = + setup_conflicting_transactions(ledger_path.path()); + let duplicate_account_key = transactions[0].message.account_keys[0]; + transactions[0] + .message + .account_keys + .push(duplicate_account_key); // corrupt transaction + let recorder = poh_recorder.read().unwrap().recorder(); + let num_conflicting_transactions = transactions.len(); + let deserialized_packets = + unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions) + .unwrap(); + assert_eq!(deserialized_packets.len(), num_conflicting_transactions); + let mut buffered_packet_batches = + UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::from_iter( + deserialized_packets.into_iter(), + num_conflicting_transactions, + ), + ThreadType::Transactions, + ); + + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); + + // When the working bank in poh_recorder is None, no packets should be processed + assert!(!poh_recorder.read().unwrap().has_bank()); + let max_tx_processing_ns = std::u128::MAX; + BankingStage::consume_buffered_packets( + max_tx_processing_ns, + &poh_recorder, + &mut buffered_packet_batches, + &None, + &gossip_vote_sender, + None::>, + &BankingStageStats::default(), + &recorder, + &QosService::new(1), + &mut LeaderSlotMetricsTracker::new(0), + None, + ); + assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions); + // When the working bank in poh_recorder is Some, all packets should be processed. + // Multi-Iterator will process them 1-by-1 if all txs are conflicting. + poh_recorder.write().unwrap().set_bank(&bank, false); + BankingStage::consume_buffered_packets( + max_tx_processing_ns, + &poh_recorder, + &mut buffered_packet_batches, + &None, + &gossip_vote_sender, + None::>, + &BankingStageStats::default(), + &recorder, + &QosService::new(1), + &mut LeaderSlotMetricsTracker::new(0), + None, + ); + assert!(buffered_packet_batches.is_empty()); + poh_recorder + .read() + .unwrap() + .is_exited + .store(true, Ordering::Relaxed); + let _ = poh_simulator.join(); + } + Blockstore::destroy(ledger_path.path()).unwrap(); + } + #[test] fn test_consume_buffered_packets_interrupted() { let ledger_path = get_tmp_ledger_path_auto_delete!(); diff --git a/core/src/unprocessed_transaction_storage.rs b/core/src/unprocessed_transaction_storage.rs index 87258faf2f..e4d15af0ab 100644 --- a/core/src/unprocessed_transaction_storage.rs +++ b/core/src/unprocessed_transaction_storage.rs @@ -19,10 +19,13 @@ use { solana_measure::measure, solana_runtime::bank::Bank, solana_sdk::{ - clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, saturating_add_assign, + clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, hash::Hash, saturating_add_assign, transaction::SanitizedTransaction, }, - std::sync::{atomic::Ordering, Arc}, + std::{ + collections::HashMap, + sync::{atomic::Ordering, Arc}, + }, }; // Step-size set to be 64, equal to the maximum batch/entry size. With the @@ -132,6 +135,7 @@ pub struct ConsumeScannerPayload<'a> { pub account_locks: ReadWriteAccountSet, pub sanitized_transactions: Vec, pub slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker, + pub message_hash_to_transaction: &'a mut HashMap, } fn consume_scan_should_process_packet( @@ -174,6 +178,9 @@ fn consume_scan_should_process_packet( ) .is_err() { + payload + .message_hash_to_transaction + .remove(packet.message_hash()); ProcessingDecision::Never } else if payload.account_locks.try_locking(message) { payload.sanitized_transactions.push(sanitized_transaction); @@ -182,6 +189,9 @@ fn consume_scan_should_process_packet( ProcessingDecision::Later } } else { + payload + .message_hash_to_transaction + .remove(packet.message_hash()); ProcessingDecision::Never } } @@ -189,6 +199,7 @@ fn consume_scan_should_process_packet( fn create_consume_multi_iterator<'a, 'b, F>( packets: &'a [Arc], slot_metrics_tracker: &'b mut LeaderSlotMetricsTracker, + message_hash_to_transaction: &'b mut HashMap, should_process_packet: F, ) -> MultiIteratorScanner<'a, Arc, ConsumeScannerPayload<'b>, F> where @@ -203,6 +214,7 @@ where account_locks: ReadWriteAccountSet::default(), sanitized_transactions: Vec::with_capacity(UNPROCESSED_BUFFER_STEP_SIZE), slot_metrics_tracker, + message_hash_to_transaction, }; MultiIteratorScanner::new( packets, @@ -333,7 +345,7 @@ impl UnprocessedTransactionStorage { banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker, processing_function: F, - ) -> ConsumeScannerPayload<'a> + ) -> bool where F: FnMut( &Vec>, @@ -417,13 +429,14 @@ impl VoteStorage { FilterForwardingResults::default() } + // returns `true` if the end of slot is reached fn process_packets<'a, F>( &mut self, bank: Arc, banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker, mut processing_function: F, - ) -> ConsumeScannerPayload<'a> + ) -> bool where F: FnMut( &Vec>, @@ -445,9 +458,13 @@ impl VoteStorage { let all_vote_packets = self .latest_unprocessed_votes .drain_unprocessed(bank.clone()); + + // vote storage does not have a message hash map, so pass in an empty one + let mut dummy_message_hash_to_transaction = HashMap::new(); let mut scanner = create_consume_multi_iterator( &all_vote_packets, slot_metrics_tracker, + &mut dummy_message_hash_to_transaction, should_process_packet, ); @@ -472,7 +489,7 @@ impl VoteStorage { } } - scanner.finalize() + scanner.finalize().reached_end_of_slot } } @@ -629,7 +646,8 @@ impl ThreadLocalUnprocessedPackets { &accepted_packet_indexes, ); - self.collect_retained_packets( + Self::collect_retained_packets( + &mut self.unprocessed_packet_batches.message_hash_to_transaction, &packets_to_forward, &Self::prepare_filtered_packet_indexes( &transaction_to_packet_indexes, @@ -807,11 +825,15 @@ impl ThreadLocalUnprocessedPackets { } fn collect_retained_packets( - &mut self, + message_hash_to_transaction: &mut HashMap, packets_to_process: &[Arc], retained_packet_indexes: &[usize], ) -> Vec> { - self.remove_non_retained_packets(packets_to_process, retained_packet_indexes); + Self::remove_non_retained_packets( + message_hash_to_transaction, + packets_to_process, + retained_packet_indexes, + ); retained_packet_indexes .iter() .map(|i| packets_to_process[*i].clone()) @@ -821,7 +843,7 @@ impl ThreadLocalUnprocessedPackets { /// remove packets from UnprocessedPacketBatches.message_hash_to_transaction after they have /// been removed from UnprocessedPacketBatches.packet_priority_queue fn remove_non_retained_packets( - &mut self, + message_hash_to_transaction: &mut HashMap, packets_to_process: &[Arc], retained_packet_indexes: &[usize], ) { @@ -831,21 +853,20 @@ impl ThreadLocalUnprocessedPackets { .chain(std::iter::once(&packets_to_process.len())), |start, end| { for processed_packet in &packets_to_process[start..end] { - self.unprocessed_packet_batches - .message_hash_to_transaction - .remove(processed_packet.message_hash()); + message_hash_to_transaction.remove(processed_packet.message_hash()); } }, ) } + // returns `true` if reached end of slot fn process_packets<'a, F>( &mut self, bank: &Bank, banking_stage_stats: &BankingStageStats, slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker, mut processing_function: F, - ) -> ConsumeScannerPayload<'a> + ) -> bool where F: FnMut( &Vec>, @@ -864,6 +885,7 @@ impl ThreadLocalUnprocessedPackets { let mut scanner = create_consume_multi_iterator( &all_packets_to_process, slot_metrics_tracker, + &mut self.unprocessed_packet_batches.message_hash_to_transaction, should_process_packet, ); @@ -875,7 +897,11 @@ impl ThreadLocalUnprocessedPackets { let retryable_packets = if let Some(retryable_transaction_indexes) = processing_function(&packets_to_process, payload) { - self.collect_retained_packets(&packets_to_process, &retryable_transaction_indexes) + Self::collect_retained_packets( + payload.message_hash_to_transaction, + &packets_to_process, + &retryable_transaction_indexes, + ) } else { packets_to_process }; @@ -883,9 +909,12 @@ impl ThreadLocalUnprocessedPackets { new_retryable_packets.extend(retryable_packets); } + let reached_end_of_slot = scanner.finalize().reached_end_of_slot; + self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets; self.verify_priority_queue(original_capacity); - scanner.finalize() + + reached_end_of_slot } /// Prepare a chunk of packets for forwarding, filter out already forwarded packets while