Bugfix/mi_remove_never_entries (#28978)

This commit is contained in:
apfitzge 2022-11-29 16:00:21 -06:00 committed by GitHub
parent 19d86bd2b1
commit 4d338ed882
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 117 additions and 18 deletions

View File

@ -722,7 +722,7 @@ impl BankingStage {
bank_creation_time, bank_creation_time,
}) = bank_start }) = bank_start
{ {
let returned_payload = unprocessed_transaction_storage.process_packets( reached_end_of_slot = unprocessed_transaction_storage.process_packets(
working_bank.clone(), working_bank.clone(),
banking_stage_stats, banking_stage_stats,
slot_metrics_tracker, slot_metrics_tracker,
@ -745,8 +745,6 @@ impl BankingStage {
) )
}, },
); );
reached_end_of_slot = returned_payload.reached_end_of_slot;
} else { } else {
reached_end_of_slot = true; reached_end_of_slot = true;
} }
@ -3726,6 +3724,78 @@ mod tests {
Blockstore::destroy(ledger_path.path()).unwrap(); Blockstore::destroy(ledger_path.path()).unwrap();
} }
#[test]
fn test_consume_buffered_packets_sanitization_error() {
let ledger_path = get_tmp_ledger_path_auto_delete!();
{
let (mut transactions, bank, poh_recorder, _entry_receiver, poh_simulator) =
setup_conflicting_transactions(ledger_path.path());
let duplicate_account_key = transactions[0].message.account_keys[0];
transactions[0]
.message
.account_keys
.push(duplicate_account_key); // corrupt transaction
let recorder = poh_recorder.read().unwrap().recorder();
let num_conflicting_transactions = transactions.len();
let deserialized_packets =
unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions)
.unwrap();
assert_eq!(deserialized_packets.len(), num_conflicting_transactions);
let mut buffered_packet_batches =
UnprocessedTransactionStorage::new_transaction_storage(
UnprocessedPacketBatches::from_iter(
deserialized_packets.into_iter(),
num_conflicting_transactions,
),
ThreadType::Transactions,
);
let (gossip_vote_sender, _gossip_vote_receiver) = unbounded();
// When the working bank in poh_recorder is None, no packets should be processed
assert!(!poh_recorder.read().unwrap().has_bank());
let max_tx_processing_ns = std::u128::MAX;
BankingStage::consume_buffered_packets(
max_tx_processing_ns,
&poh_recorder,
&mut buffered_packet_batches,
&None,
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&QosService::new(1),
&mut LeaderSlotMetricsTracker::new(0),
None,
);
assert_eq!(buffered_packet_batches.len(), num_conflicting_transactions);
// When the working bank in poh_recorder is Some, all packets should be processed.
// Multi-Iterator will process them 1-by-1 if all txs are conflicting.
poh_recorder.write().unwrap().set_bank(&bank, false);
BankingStage::consume_buffered_packets(
max_tx_processing_ns,
&poh_recorder,
&mut buffered_packet_batches,
&None,
&gossip_vote_sender,
None::<Box<dyn Fn()>>,
&BankingStageStats::default(),
&recorder,
&QosService::new(1),
&mut LeaderSlotMetricsTracker::new(0),
None,
);
assert!(buffered_packet_batches.is_empty());
poh_recorder
.read()
.unwrap()
.is_exited
.store(true, Ordering::Relaxed);
let _ = poh_simulator.join();
}
Blockstore::destroy(ledger_path.path()).unwrap();
}
#[test] #[test]
fn test_consume_buffered_packets_interrupted() { fn test_consume_buffered_packets_interrupted() {
let ledger_path = get_tmp_ledger_path_auto_delete!(); let ledger_path = get_tmp_ledger_path_auto_delete!();

View File

@ -19,10 +19,13 @@ use {
solana_measure::measure, solana_measure::measure,
solana_runtime::bank::Bank, solana_runtime::bank::Bank,
solana_sdk::{ solana_sdk::{
clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, saturating_add_assign, clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, hash::Hash, saturating_add_assign,
transaction::SanitizedTransaction, transaction::SanitizedTransaction,
}, },
std::sync::{atomic::Ordering, Arc}, std::{
collections::HashMap,
sync::{atomic::Ordering, Arc},
},
}; };
// Step-size set to be 64, equal to the maximum batch/entry size. With the // Step-size set to be 64, equal to the maximum batch/entry size. With the
@ -132,6 +135,7 @@ pub struct ConsumeScannerPayload<'a> {
pub account_locks: ReadWriteAccountSet, pub account_locks: ReadWriteAccountSet,
pub sanitized_transactions: Vec<SanitizedTransaction>, pub sanitized_transactions: Vec<SanitizedTransaction>,
pub slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker, pub slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker,
pub message_hash_to_transaction: &'a mut HashMap<Hash, DeserializedPacket>,
} }
fn consume_scan_should_process_packet( fn consume_scan_should_process_packet(
@ -174,6 +178,9 @@ fn consume_scan_should_process_packet(
) )
.is_err() .is_err()
{ {
payload
.message_hash_to_transaction
.remove(packet.message_hash());
ProcessingDecision::Never ProcessingDecision::Never
} else if payload.account_locks.try_locking(message) { } else if payload.account_locks.try_locking(message) {
payload.sanitized_transactions.push(sanitized_transaction); payload.sanitized_transactions.push(sanitized_transaction);
@ -182,6 +189,9 @@ fn consume_scan_should_process_packet(
ProcessingDecision::Later ProcessingDecision::Later
} }
} else { } else {
payload
.message_hash_to_transaction
.remove(packet.message_hash());
ProcessingDecision::Never ProcessingDecision::Never
} }
} }
@ -189,6 +199,7 @@ fn consume_scan_should_process_packet(
fn create_consume_multi_iterator<'a, 'b, F>( fn create_consume_multi_iterator<'a, 'b, F>(
packets: &'a [Arc<ImmutableDeserializedPacket>], packets: &'a [Arc<ImmutableDeserializedPacket>],
slot_metrics_tracker: &'b mut LeaderSlotMetricsTracker, slot_metrics_tracker: &'b mut LeaderSlotMetricsTracker,
message_hash_to_transaction: &'b mut HashMap<Hash, DeserializedPacket>,
should_process_packet: F, should_process_packet: F,
) -> MultiIteratorScanner<'a, Arc<ImmutableDeserializedPacket>, ConsumeScannerPayload<'b>, F> ) -> MultiIteratorScanner<'a, Arc<ImmutableDeserializedPacket>, ConsumeScannerPayload<'b>, F>
where where
@ -203,6 +214,7 @@ where
account_locks: ReadWriteAccountSet::default(), account_locks: ReadWriteAccountSet::default(),
sanitized_transactions: Vec::with_capacity(UNPROCESSED_BUFFER_STEP_SIZE), sanitized_transactions: Vec::with_capacity(UNPROCESSED_BUFFER_STEP_SIZE),
slot_metrics_tracker, slot_metrics_tracker,
message_hash_to_transaction,
}; };
MultiIteratorScanner::new( MultiIteratorScanner::new(
packets, packets,
@ -333,7 +345,7 @@ impl UnprocessedTransactionStorage {
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker, slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker,
processing_function: F, processing_function: F,
) -> ConsumeScannerPayload<'a> ) -> bool
where where
F: FnMut( F: FnMut(
&Vec<Arc<ImmutableDeserializedPacket>>, &Vec<Arc<ImmutableDeserializedPacket>>,
@ -417,13 +429,14 @@ impl VoteStorage {
FilterForwardingResults::default() FilterForwardingResults::default()
} }
// returns `true` if the end of slot is reached
fn process_packets<'a, F>( fn process_packets<'a, F>(
&mut self, &mut self,
bank: Arc<Bank>, bank: Arc<Bank>,
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker, slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker,
mut processing_function: F, mut processing_function: F,
) -> ConsumeScannerPayload<'a> ) -> bool
where where
F: FnMut( F: FnMut(
&Vec<Arc<ImmutableDeserializedPacket>>, &Vec<Arc<ImmutableDeserializedPacket>>,
@ -445,9 +458,13 @@ impl VoteStorage {
let all_vote_packets = self let all_vote_packets = self
.latest_unprocessed_votes .latest_unprocessed_votes
.drain_unprocessed(bank.clone()); .drain_unprocessed(bank.clone());
// vote storage does not have a message hash map, so pass in an empty one
let mut dummy_message_hash_to_transaction = HashMap::new();
let mut scanner = create_consume_multi_iterator( let mut scanner = create_consume_multi_iterator(
&all_vote_packets, &all_vote_packets,
slot_metrics_tracker, slot_metrics_tracker,
&mut dummy_message_hash_to_transaction,
should_process_packet, should_process_packet,
); );
@ -472,7 +489,7 @@ impl VoteStorage {
} }
} }
scanner.finalize() scanner.finalize().reached_end_of_slot
} }
} }
@ -629,7 +646,8 @@ impl ThreadLocalUnprocessedPackets {
&accepted_packet_indexes, &accepted_packet_indexes,
); );
self.collect_retained_packets( Self::collect_retained_packets(
&mut self.unprocessed_packet_batches.message_hash_to_transaction,
&packets_to_forward, &packets_to_forward,
&Self::prepare_filtered_packet_indexes( &Self::prepare_filtered_packet_indexes(
&transaction_to_packet_indexes, &transaction_to_packet_indexes,
@ -807,11 +825,15 @@ impl ThreadLocalUnprocessedPackets {
} }
fn collect_retained_packets( fn collect_retained_packets(
&mut self, message_hash_to_transaction: &mut HashMap<Hash, DeserializedPacket>,
packets_to_process: &[Arc<ImmutableDeserializedPacket>], packets_to_process: &[Arc<ImmutableDeserializedPacket>],
retained_packet_indexes: &[usize], retained_packet_indexes: &[usize],
) -> Vec<Arc<ImmutableDeserializedPacket>> { ) -> Vec<Arc<ImmutableDeserializedPacket>> {
self.remove_non_retained_packets(packets_to_process, retained_packet_indexes); Self::remove_non_retained_packets(
message_hash_to_transaction,
packets_to_process,
retained_packet_indexes,
);
retained_packet_indexes retained_packet_indexes
.iter() .iter()
.map(|i| packets_to_process[*i].clone()) .map(|i| packets_to_process[*i].clone())
@ -821,7 +843,7 @@ impl ThreadLocalUnprocessedPackets {
/// remove packets from UnprocessedPacketBatches.message_hash_to_transaction after they have /// remove packets from UnprocessedPacketBatches.message_hash_to_transaction after they have
/// been removed from UnprocessedPacketBatches.packet_priority_queue /// been removed from UnprocessedPacketBatches.packet_priority_queue
fn remove_non_retained_packets( fn remove_non_retained_packets(
&mut self, message_hash_to_transaction: &mut HashMap<Hash, DeserializedPacket>,
packets_to_process: &[Arc<ImmutableDeserializedPacket>], packets_to_process: &[Arc<ImmutableDeserializedPacket>],
retained_packet_indexes: &[usize], retained_packet_indexes: &[usize],
) { ) {
@ -831,21 +853,20 @@ impl ThreadLocalUnprocessedPackets {
.chain(std::iter::once(&packets_to_process.len())), .chain(std::iter::once(&packets_to_process.len())),
|start, end| { |start, end| {
for processed_packet in &packets_to_process[start..end] { for processed_packet in &packets_to_process[start..end] {
self.unprocessed_packet_batches message_hash_to_transaction.remove(processed_packet.message_hash());
.message_hash_to_transaction
.remove(processed_packet.message_hash());
} }
}, },
) )
} }
// returns `true` if reached end of slot
fn process_packets<'a, F>( fn process_packets<'a, F>(
&mut self, &mut self,
bank: &Bank, bank: &Bank,
banking_stage_stats: &BankingStageStats, banking_stage_stats: &BankingStageStats,
slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker, slot_metrics_tracker: &'a mut LeaderSlotMetricsTracker,
mut processing_function: F, mut processing_function: F,
) -> ConsumeScannerPayload<'a> ) -> bool
where where
F: FnMut( F: FnMut(
&Vec<Arc<ImmutableDeserializedPacket>>, &Vec<Arc<ImmutableDeserializedPacket>>,
@ -864,6 +885,7 @@ impl ThreadLocalUnprocessedPackets {
let mut scanner = create_consume_multi_iterator( let mut scanner = create_consume_multi_iterator(
&all_packets_to_process, &all_packets_to_process,
slot_metrics_tracker, slot_metrics_tracker,
&mut self.unprocessed_packet_batches.message_hash_to_transaction,
should_process_packet, should_process_packet,
); );
@ -875,7 +897,11 @@ impl ThreadLocalUnprocessedPackets {
let retryable_packets = if let Some(retryable_transaction_indexes) = let retryable_packets = if let Some(retryable_transaction_indexes) =
processing_function(&packets_to_process, payload) processing_function(&packets_to_process, payload)
{ {
self.collect_retained_packets(&packets_to_process, &retryable_transaction_indexes) Self::collect_retained_packets(
payload.message_hash_to_transaction,
&packets_to_process,
&retryable_transaction_indexes,
)
} else { } else {
packets_to_process packets_to_process
}; };
@ -883,9 +909,12 @@ impl ThreadLocalUnprocessedPackets {
new_retryable_packets.extend(retryable_packets); new_retryable_packets.extend(retryable_packets);
} }
let reached_end_of_slot = scanner.finalize().reached_end_of_slot;
self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets; self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets;
self.verify_priority_queue(original_capacity); self.verify_priority_queue(original_capacity);
scanner.finalize()
reached_end_of_slot
} }
/// Prepare a chunk of packets for forwarding, filter out already forwarded packets while /// Prepare a chunk of packets for forwarding, filter out already forwarded packets while