check is_forwarded packet earlier (#28159)
* check and filter is_forwarded packet earlier * review fix: renaming; and rebase
This commit is contained in:
parent
f69e847137
commit
e5ae0b3371
|
@ -440,33 +440,24 @@ impl ThreadLocalUnprocessedPackets {
|
|||
.chunks(batch_size)
|
||||
.into_iter()
|
||||
.flat_map(|packets_to_process| {
|
||||
let packets_to_process = packets_to_process.into_iter().collect_vec();
|
||||
|
||||
// Vec<bool> of same size of `packets_to_process`, each indicates
|
||||
// corresponding packet is tracer packet.
|
||||
let tracer_packet_indexes = packets_to_process
|
||||
.iter()
|
||||
.map(|deserialized_packet| {
|
||||
deserialized_packet
|
||||
.original_packet()
|
||||
.meta
|
||||
.is_tracer_packet()
|
||||
})
|
||||
.collect::<Vec<_>>();
|
||||
saturating_add_assign!(
|
||||
total_tracer_packets_in_buffer,
|
||||
tracer_packet_indexes
|
||||
.iter()
|
||||
.filter(|is_tracer| **is_tracer)
|
||||
.count()
|
||||
// Only prcoess packets not yet forwarded
|
||||
let (forwarded_packets, packets_to_forward, is_tracer_packet) = self
|
||||
.prepare_packets_to_forward(
|
||||
packets_to_process,
|
||||
&mut total_tracer_packets_in_buffer,
|
||||
);
|
||||
|
||||
[
|
||||
forwarded_packets,
|
||||
if accepting_packets {
|
||||
let (
|
||||
(sanitized_transactions, transaction_to_packet_indexes),
|
||||
packet_conversion_time,
|
||||
): ((Vec<SanitizedTransaction>, Vec<usize>), _) = measure!(
|
||||
self.sanitize_unforwarded_packets(&packets_to_process, &bank,),
|
||||
): (
|
||||
(Vec<SanitizedTransaction>, Vec<usize>),
|
||||
_,
|
||||
) = measure!(
|
||||
self.sanitize_unforwarded_packets(&packets_to_forward, &bank),
|
||||
"sanitize_packet",
|
||||
);
|
||||
saturating_add_assign!(
|
||||
|
@ -475,7 +466,7 @@ impl ThreadLocalUnprocessedPackets {
|
|||
);
|
||||
|
||||
let (forwardable_transaction_indexes, filter_packets_time) = measure!(
|
||||
Self::filter_invalid_transactions(&sanitized_transactions, &bank,),
|
||||
Self::filter_invalid_transactions(&sanitized_transactions, &bank),
|
||||
"filter_packets",
|
||||
);
|
||||
saturating_add_assign!(
|
||||
|
@ -487,30 +478,31 @@ impl ThreadLocalUnprocessedPackets {
|
|||
saturating_add_assign!(total_forwardable_packets, 1);
|
||||
let forwardable_packet_index =
|
||||
transaction_to_packet_indexes[*forwardable_transaction_index];
|
||||
if tracer_packet_indexes[forwardable_packet_index] {
|
||||
if is_tracer_packet[forwardable_packet_index] {
|
||||
saturating_add_assign!(total_forwardable_tracer_packets, 1);
|
||||
}
|
||||
}
|
||||
|
||||
let accepted_packet_indexes = Self::add_filtered_packets_to_forward_buffer(
|
||||
let accepted_packet_indexes =
|
||||
Self::add_filtered_packets_to_forward_buffer(
|
||||
forward_buffer,
|
||||
&packets_to_process,
|
||||
&packets_to_forward,
|
||||
&sanitized_transactions,
|
||||
&transaction_to_packet_indexes,
|
||||
&forwardable_transaction_indexes,
|
||||
&mut dropped_tx_before_forwarding_count,
|
||||
);
|
||||
accepting_packets =
|
||||
accepted_packet_indexes.len() == forwardable_transaction_indexes.len();
|
||||
accepting_packets = accepted_packet_indexes.len()
|
||||
== forwardable_transaction_indexes.len();
|
||||
|
||||
self.unprocessed_packet_batches
|
||||
.mark_accepted_packets_as_forwarded(
|
||||
&packets_to_process,
|
||||
&packets_to_forward,
|
||||
&accepted_packet_indexes,
|
||||
);
|
||||
|
||||
self.collect_retained_packets(
|
||||
&packets_to_process,
|
||||
&packets_to_forward,
|
||||
&Self::prepare_filtered_packet_indexes(
|
||||
&transaction_to_packet_indexes,
|
||||
&forwardable_transaction_indexes,
|
||||
|
@ -520,10 +512,12 @@ impl ThreadLocalUnprocessedPackets {
|
|||
// skip sanitizing and filtering if not longer able to add more packets for forwarding
|
||||
saturating_add_assign!(
|
||||
dropped_tx_before_forwarding_count,
|
||||
packets_to_process.len()
|
||||
packets_to_forward.len()
|
||||
);
|
||||
packets_to_process
|
||||
}
|
||||
packets_to_forward
|
||||
},
|
||||
]
|
||||
.concat()
|
||||
}),
|
||||
);
|
||||
|
||||
|
@ -531,6 +525,14 @@ impl ThreadLocalUnprocessedPackets {
|
|||
self.unprocessed_packet_batches.packet_priority_queue = new_priority_queue;
|
||||
self.verify_priority_queue(original_capacity);
|
||||
|
||||
// Assert unprocessed queue is still consistent
|
||||
assert_eq!(
|
||||
self.unprocessed_packet_batches.packet_priority_queue.len(),
|
||||
self.unprocessed_packet_batches
|
||||
.message_hash_to_transaction
|
||||
.len()
|
||||
);
|
||||
|
||||
inc_new_counter_info!(
|
||||
"banking_stage-dropped_tx_before_forwarding",
|
||||
dropped_tx_before_forwarding_count
|
||||
|
@ -582,10 +584,6 @@ impl ThreadLocalUnprocessedPackets {
|
|||
deserialized_packets
|
||||
.enumerate()
|
||||
.filter_map(|(packet_index, deserialized_packet)| {
|
||||
if !self
|
||||
.unprocessed_packet_batches
|
||||
.is_forwarded(deserialized_packet)
|
||||
{
|
||||
deserialized_packet
|
||||
.build_sanitized_transaction(
|
||||
&bank.feature_set,
|
||||
|
@ -593,9 +591,6 @@ impl ThreadLocalUnprocessedPackets {
|
|||
bank.as_ref(),
|
||||
)
|
||||
.map(|transaction| (transaction, packet_index))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.unzip();
|
||||
|
||||
|
@ -746,6 +741,45 @@ impl ThreadLocalUnprocessedPackets {
|
|||
self.unprocessed_packet_batches.packet_priority_queue = new_retryable_packets;
|
||||
self.verify_priority_queue(original_capacity);
|
||||
}
|
||||
|
||||
/// Prepare a chunk of packets for forwarding, filter out already forwarded packets while
|
||||
/// counting tracers.
|
||||
/// Returns Vec of unforwarded packets, and Vec<bool> of same size each indicates corresponding
|
||||
/// packet is tracer packet.
|
||||
fn prepare_packets_to_forward(
|
||||
&self,
|
||||
packets_to_forward: impl Iterator<Item = Arc<ImmutableDeserializedPacket>>,
|
||||
total_tracer_packets_in_buffer: &mut usize,
|
||||
) -> (
|
||||
Vec<Arc<ImmutableDeserializedPacket>>,
|
||||
Vec<Arc<ImmutableDeserializedPacket>>,
|
||||
Vec<bool>,
|
||||
) {
|
||||
let mut forwarded_packets: Vec<Arc<ImmutableDeserializedPacket>> = vec![];
|
||||
let (forwardable_packets, is_tracer_packet) = packets_to_forward
|
||||
.into_iter()
|
||||
.filter_map(|immutable_deserialized_packet| {
|
||||
let is_tracer_packet = immutable_deserialized_packet
|
||||
.original_packet()
|
||||
.meta
|
||||
.is_tracer_packet();
|
||||
if is_tracer_packet {
|
||||
saturating_add_assign!(*total_tracer_packets_in_buffer, 1);
|
||||
}
|
||||
if !self
|
||||
.unprocessed_packet_batches
|
||||
.is_forwarded(&immutable_deserialized_packet)
|
||||
{
|
||||
Some((immutable_deserialized_packet, is_tracer_packet))
|
||||
} else {
|
||||
forwarded_packets.push(immutable_deserialized_packet);
|
||||
None
|
||||
}
|
||||
})
|
||||
.unzip();
|
||||
|
||||
(forwarded_packets, forwardable_packets, is_tracer_packet)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
@ -1026,4 +1060,123 @@ mod tests {
|
|||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_prepare_packets_to_forward() {
|
||||
solana_logger::setup();
|
||||
let GenesisConfigInfo {
|
||||
genesis_config,
|
||||
mint_keypair,
|
||||
..
|
||||
} = create_genesis_config(10);
|
||||
|
||||
let simple_transactions: Vec<Transaction> = (0..256)
|
||||
.map(|_id| {
|
||||
// packets are deserialized upon receiving, failed packets will not be
|
||||
// forwarded; Therefore we need to create real packets here.
|
||||
let key1 = Keypair::new();
|
||||
system_transaction::transfer(
|
||||
&mint_keypair,
|
||||
&key1.pubkey(),
|
||||
genesis_config.rent.minimum_balance(0),
|
||||
genesis_config.hash(),
|
||||
)
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
let mut packets: Vec<DeserializedPacket> = simple_transactions
|
||||
.iter()
|
||||
.enumerate()
|
||||
.map(|(packets_id, transaction)| {
|
||||
let mut p = Packet::from_data(None, transaction).unwrap();
|
||||
p.meta.port = packets_id as u16;
|
||||
p.meta.set_tracer(true);
|
||||
DeserializedPacket::new(p).unwrap()
|
||||
})
|
||||
.collect_vec();
|
||||
|
||||
// test preparing buffered packets for forwarding
|
||||
let test_prepareing_buffered_packets_for_forwarding =
|
||||
|buffered_packet_batches: UnprocessedPacketBatches| -> (usize, usize, usize) {
|
||||
let mut total_tracer_packets_in_buffer: usize = 0;
|
||||
let mut total_packets_to_forward: usize = 0;
|
||||
let mut total_tracer_packets_to_forward: usize = 0;
|
||||
|
||||
let mut unprocessed_transactions = ThreadLocalUnprocessedPackets {
|
||||
unprocessed_packet_batches: buffered_packet_batches,
|
||||
thread_type: ThreadType::Transactions,
|
||||
};
|
||||
|
||||
let mut original_priority_queue = unprocessed_transactions.take_priority_queue();
|
||||
let _ = original_priority_queue
|
||||
.drain_desc()
|
||||
.chunks(128usize)
|
||||
.into_iter()
|
||||
.flat_map(|packets_to_process| {
|
||||
let (_, packets_to_forward, is_tracer_packet) = unprocessed_transactions
|
||||
.prepare_packets_to_forward(
|
||||
packets_to_process,
|
||||
&mut total_tracer_packets_in_buffer,
|
||||
);
|
||||
total_packets_to_forward += packets_to_forward.len();
|
||||
total_tracer_packets_to_forward += is_tracer_packet.len();
|
||||
packets_to_forward
|
||||
})
|
||||
.collect::<MinMaxHeap<Arc<ImmutableDeserializedPacket>>>();
|
||||
(
|
||||
total_tracer_packets_in_buffer,
|
||||
total_packets_to_forward,
|
||||
total_tracer_packets_to_forward,
|
||||
)
|
||||
};
|
||||
|
||||
// all tracer packets are forwardable
|
||||
{
|
||||
let buffered_packet_batches: UnprocessedPacketBatches =
|
||||
UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len());
|
||||
let (
|
||||
total_tracer_packets_in_buffer,
|
||||
total_packets_to_forward,
|
||||
total_tracer_packets_to_forward,
|
||||
) = test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
|
||||
assert_eq!(total_tracer_packets_in_buffer, 256);
|
||||
assert_eq!(total_packets_to_forward, 256);
|
||||
assert_eq!(total_tracer_packets_to_forward, 256);
|
||||
}
|
||||
|
||||
// some packets are forwarded
|
||||
{
|
||||
let num_already_forwarded = 16;
|
||||
for packet in &mut packets[0..num_already_forwarded] {
|
||||
packet.forwarded = true;
|
||||
}
|
||||
let buffered_packet_batches: UnprocessedPacketBatches =
|
||||
UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len());
|
||||
let (
|
||||
total_tracer_packets_in_buffer,
|
||||
total_packets_to_forward,
|
||||
total_tracer_packets_to_forward,
|
||||
) = test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
|
||||
assert_eq!(total_tracer_packets_in_buffer, 256);
|
||||
assert_eq!(total_packets_to_forward, 256 - num_already_forwarded);
|
||||
assert_eq!(total_tracer_packets_to_forward, 256 - num_already_forwarded);
|
||||
}
|
||||
|
||||
// all packets are forwarded
|
||||
{
|
||||
for packet in &mut packets {
|
||||
packet.forwarded = true;
|
||||
}
|
||||
let buffered_packet_batches: UnprocessedPacketBatches =
|
||||
UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len());
|
||||
let (
|
||||
total_tracer_packets_in_buffer,
|
||||
total_packets_to_forward,
|
||||
total_tracer_packets_to_forward,
|
||||
) = test_prepareing_buffered_packets_for_forwarding(buffered_packet_batches);
|
||||
assert_eq!(total_tracer_packets_in_buffer, 256);
|
||||
assert_eq!(total_packets_to_forward, 0);
|
||||
assert_eq!(total_tracer_packets_to_forward, 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue