Correctly mark packets as forwarded (#28161)

Only mark packets accepted for forwarding as `forwarded`
This commit is contained in:
Tao Zhu 2022-10-07 11:50:57 -05:00 committed by GitHub
parent 16853acf35
commit 50985f79a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 20 deletions

View File

@ -1014,11 +1014,7 @@ impl BankingStage {
} }
}); });
if hold { if !hold {
for deserialized_packet in buffered_packet_batches.iter_mut() {
deserialized_packet.forwarded = true;
}
} else {
slot_metrics_tracker.increment_cleared_from_buffer_after_forward_count( slot_metrics_tracker.increment_cleared_from_buffer_after_forward_count(
filter_forwarding_result.total_forwardable_packets as u64, filter_forwarding_result.total_forwardable_packets as u64,
); );
@ -1110,7 +1106,7 @@ impl BankingStage {
} }
} }
accepting_packets = Self::add_filtered_packets_to_forward_buffer( let accepted_packet_indexes = Self::add_filtered_packets_to_forward_buffer(
forward_buffer, forward_buffer,
&packets_to_process, &packets_to_process,
&sanitized_transactions, &sanitized_transactions,
@ -1118,6 +1114,14 @@ impl BankingStage {
&forwardable_transaction_indexes, &forwardable_transaction_indexes,
&mut dropped_tx_before_forwarding_count, &mut dropped_tx_before_forwarding_count,
); );
accepting_packets =
accepted_packet_indexes.len() == forwardable_transaction_indexes.len();
UnprocessedPacketBatches::mark_accepted_packets_as_forwarded(
buffered_packet_batches,
&packets_to_process,
&accepted_packet_indexes,
);
Self::collect_retained_packets( Self::collect_retained_packets(
buffered_packet_batches, buffered_packet_batches,
@ -1275,37 +1279,38 @@ impl BankingStage {
} }
/// try to add filtered forwardable and valid packets to forward buffer; /// try to add filtered forwardable and valid packets to forward buffer;
/// returns if forward_buffer is still accepting packets, and how many packets added. /// returns vector of packet indexes that were accepted for forwarding.
fn add_filtered_packets_to_forward_buffer( fn add_filtered_packets_to_forward_buffer(
forward_buffer: &mut ForwardPacketBatchesByAccounts, forward_buffer: &mut ForwardPacketBatchesByAccounts,
packets_to_process: &[Arc<ImmutableDeserializedPacket>], packets_to_process: &[Arc<ImmutableDeserializedPacket>],
transactions: &[SanitizedTransaction], transactions: &[SanitizedTransaction],
transaction_to_packet_indexes: &[usize], transaction_to_packet_indexes: &[usize],
retained_transaction_indexes: &[usize], forwardable_transaction_indexes: &[usize],
dropped_tx_before_forwarding_count: &mut usize, dropped_tx_before_forwarding_count: &mut usize,
) -> bool { ) -> Vec<usize> {
let mut added_packets_count: usize = 0; let mut added_packets_count: usize = 0;
let mut accepting_packets = true; let mut accepted_packet_indexes = Vec::with_capacity(transaction_to_packet_indexes.len());
for retained_transaction_index in retained_transaction_indexes { for forwardable_transaction_index in forwardable_transaction_indexes {
let sanitized_transaction = &transactions[*retained_transaction_index]; let sanitized_transaction = &transactions[*forwardable_transaction_index];
let immutable_deserialized_packet = packets_to_process let forwardable_packet_index =
[transaction_to_packet_indexes[*retained_transaction_index]] transaction_to_packet_indexes[*forwardable_transaction_index];
.clone(); let immutable_deserialized_packet =
accepting_packets = packets_to_process[forwardable_packet_index].clone();
forward_buffer.try_add_packet(sanitized_transaction, immutable_deserialized_packet); if !forward_buffer.try_add_packet(sanitized_transaction, immutable_deserialized_packet)
if !accepting_packets { {
break; break;
} }
accepted_packet_indexes.push(forwardable_packet_index);
saturating_add_assign!(added_packets_count, 1); saturating_add_assign!(added_packets_count, 1);
} }
// count the packets not being forwarded in this batch // count the packets not being forwarded in this batch
saturating_add_assign!( saturating_add_assign!(
*dropped_tx_before_forwarding_count, *dropped_tx_before_forwarding_count,
retained_transaction_indexes.len() - added_packets_count forwardable_transaction_indexes.len() - added_packets_count
); );
accepting_packets accepted_packet_indexes
} }
#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_arguments)]

View File

@ -281,6 +281,24 @@ impl UnprocessedPacketBatches {
.get(immutable_packet.message_hash()) .get(immutable_packet.message_hash())
.map_or(true, |p| p.forwarded) .map_or(true, |p| p.forwarded)
} }
pub fn mark_accepted_packets_as_forwarded(
buffered_packet_batches: &mut UnprocessedPacketBatches,
packets_to_process: &[Arc<ImmutableDeserializedPacket>],
accepted_packet_indexes: &[usize],
) {
accepted_packet_indexes
.iter()
.for_each(|accepted_packet_index| {
let accepted_packet = packets_to_process[*accepted_packet_index].clone();
if let Some(deserialized_packet) = buffered_packet_batches
.message_hash_to_transaction
.get_mut(accepted_packet.message_hash())
{
deserialized_packet.forwarded = true;
}
});
}
} }
pub fn deserialize_packets<'a>( pub fn deserialize_packets<'a>(