Batch filtering invalid transactions before forwarding (#26798)

- Batch-filter invalid transactions (those that fail to sanitize, are too old, or have already been processed) before forwarding
- Combine packet filtering and forwarding so they share sanitized transactions
- Remove `iter_desc`, which is no longer needed
- Add a method that shares the logic of removing packets from the buffer after they have been removed from the MinMaxHeap
- Add test coverage for `forward_packet_batches_by_accounts`
- Rebase and resolve conflicts
Tao Zhu 2022-09-29 16:33:40 -05:00 committed by GitHub
parent 80c0173452
commit 82e65593ee
6 changed files with 620 additions and 267 deletions
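For orientation, the following is a minimal, self-contained sketch of the batch filter-then-forward flow this commit introduces. All names in it (Pkt, ForwardBuffer, sanitize_batch, check_batch, filter_and_forward) are hypothetical stand-ins built on std types, not the Solana APIs changed below; the real code drains the MinMaxHeap in descending priority order, sanitizes and bank-checks each chunk of UNPROCESSED_BUFFER_STEP_SIZE packets, and stops adding to the forward buffer once it reports it can no longer accept packets.

use std::collections::BinaryHeap;

#[derive(Clone, PartialEq, Eq, PartialOrd, Ord)]
struct Pkt {
    priority: u64,
    id: u64,
}

struct ForwardBuffer {
    capacity: usize,
    accepted: Vec<Pkt>,
}

impl ForwardBuffer {
    // Returns false once the buffer cannot take any more packets.
    fn try_add(&mut self, pkt: Pkt) -> bool {
        if self.accepted.len() < self.capacity {
            self.accepted.push(pkt);
            true
        } else {
            false
        }
    }
}

// Stand-ins for the per-batch sanitize and bank-validity checks; each returns
// the indexes of packets that survive that step.
fn sanitize_batch(batch: &[Pkt]) -> Vec<usize> {
    (0..batch.len()).collect()
}

fn check_batch(_batch: &[Pkt], sanitized: &[usize]) -> Vec<usize> {
    sanitized.to_vec()
}

// Drain the priority queue max-first, run the checks once per fixed-size batch,
// push accepted packets into the forward buffer until it stops accepting, and
// rebuild the queue from the packets that stay buffered.
fn filter_and_forward(
    queue: &mut BinaryHeap<Pkt>,
    forward_buffer: &mut ForwardBuffer,
    batch_size: usize,
) -> usize {
    let mut accepting = true;
    let mut forwarded = 0;
    let mut retained = Vec::with_capacity(queue.len());

    let mut drained = Vec::with_capacity(queue.len());
    while let Some(pkt) = queue.pop() {
        drained.push(pkt);
    }
    for batch in drained.chunks(batch_size) {
        if !accepting {
            // Forward buffer is full: keep these packets buffered, skip the checks.
            retained.extend_from_slice(batch);
            continue;
        }
        let sanitized = sanitize_batch(batch);
        let valid = check_batch(batch, &sanitized);
        for &i in &valid {
            if accepting && forward_buffer.try_add(batch[i].clone()) {
                forwarded += 1;
            } else {
                accepting = false;
            }
            // Valid packets stay buffered either way; invalid ones are dropped here.
            retained.push(batch[i].clone());
        }
    }
    *queue = retained.into_iter().collect();
    forwarded
}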

View File

@ -21,7 +21,10 @@ use {
test::Bencher,
};
fn build_packet_batch(packet_per_batch_count: usize) -> (PacketBatch, Vec<usize>) {
fn build_packet_batch(
packet_per_batch_count: usize,
recent_blockhash: Option<Hash>,
) -> (PacketBatch, Vec<usize>) {
let packet_batch = PacketBatch::new(
(0..packet_per_batch_count)
.map(|sender_stake| {
@ -29,7 +32,7 @@ fn build_packet_batch(packet_per_batch_count: usize) -> (PacketBatch, Vec<usize>
&Keypair::new(),
&solana_sdk::pubkey::new_rand(),
1,
Hash::new_unique(),
recent_blockhash.unwrap_or_else(Hash::new_unique),
);
let mut packet = Packet::from_data(None, &tx).unwrap();
packet.meta.sender_stake = sender_stake as u64;
@ -42,7 +45,10 @@ fn build_packet_batch(packet_per_batch_count: usize) -> (PacketBatch, Vec<usize>
(packet_batch, packet_indexes)
}
fn build_randomized_packet_batch(packet_per_batch_count: usize) -> (PacketBatch, Vec<usize>) {
fn build_randomized_packet_batch(
packet_per_batch_count: usize,
recent_blockhash: Option<Hash>,
) -> (PacketBatch, Vec<usize>) {
let mut rng = rand::thread_rng();
let distribution = Uniform::from(0..200_000);
@ -53,7 +59,7 @@ fn build_randomized_packet_batch(packet_per_batch_count: usize) -> (PacketBatch,
&Keypair::new(),
&solana_sdk::pubkey::new_rand(),
1,
Hash::new_unique(),
recent_blockhash.unwrap_or_else(Hash::new_unique),
);
let mut packet = Packet::from_data(None, &tx).unwrap();
let sender_stake = distribution.sample(&mut rng);
@ -79,9 +85,9 @@ fn insert_packet_batches(
let mut timer = Measure::start("insert_batch");
(0..batch_count).for_each(|_| {
let (packet_batch, packet_indexes) = if randomize {
build_randomized_packet_batch(packet_per_batch_count)
build_randomized_packet_batch(packet_per_batch_count, None)
} else {
build_packet_batch(packet_per_batch_count)
build_packet_batch(packet_per_batch_count, None)
};
let deserialized_packets = deserialize_packets(&packet_batch, &packet_indexes);
unprocessed_packet_batches.insert_batch(deserialized_packets);
@ -101,7 +107,7 @@ fn bench_packet_clone(bencher: &mut Bencher) {
let packet_per_batch_count = 128;
let packet_batches: Vec<PacketBatch> = (0..batch_count)
.map(|_| build_packet_batch(packet_per_batch_count).0)
.map(|_| build_packet_batch(packet_per_batch_count, None).0)
.collect();
bencher.iter(|| {
@ -184,13 +190,6 @@ fn bench_unprocessed_packet_batches_randomized_beyond_limit(bencher: &mut Benche
});
}
fn build_bank_forks_for_test() -> Arc<RwLock<BankForks>> {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new(bank);
Arc::new(RwLock::new(bank_forks))
}
fn buffer_iter_desc_and_forward(
buffer_max_size: usize,
batch_count: usize,
@ -200,14 +199,19 @@ fn buffer_iter_desc_and_forward(
solana_logger::setup();
let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(buffer_max_size);
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new(bank);
let bank_forks = Arc::new(RwLock::new(bank_forks));
let current_bank = bank_forks.read().unwrap().root_bank();
// fill buffer
{
let mut timer = Measure::start("fill_buffer");
(0..batch_count).for_each(|_| {
let (packet_batch, packet_indexes) = if randomize {
build_randomized_packet_batch(packet_per_batch_count)
build_randomized_packet_batch(packet_per_batch_count, Some(genesis_config.hash()))
} else {
build_packet_batch(packet_per_batch_count)
build_packet_batch(packet_per_batch_count, Some(genesis_config.hash()))
};
let deserialized_packets = deserialize_packets(&packet_batch, &packet_indexes);
unprocessed_packet_batches.insert_batch(deserialized_packets);
@ -222,27 +226,13 @@ fn buffer_iter_desc_and_forward(
// forward whole buffer
{
let mut timer = Measure::start("forward_time");
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new_with_default_batch_limits(
build_bank_forks_for_test().read().unwrap().root_bank(),
);
// iter_desc buffer
let filter_forwarding_results = BankingStage::filter_valid_packets_for_forwarding(
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
let _ = BankingStage::filter_and_forward_with_account_limits(
&current_bank,
&mut unprocessed_packet_batches,
&mut forward_packet_batches_by_accounts,
);
timer.stop();
let batched_filter_forwarding_results: usize = forward_packet_batches_by_accounts
.iter_batches()
.map(|forward_batch| forward_batch.len())
.sum();
log::info!(
"filter_forwarding_results {:?}, batched_forwardable packets {}, elapsed {}",
filter_forwarding_results,
batched_filter_forwarding_results,
timer.as_us()
128usize,
);
}
}

View File

@ -92,7 +92,7 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 64;
const NUM_VOTE_PROCESSING_THREADS: u32 = 2;
const MIN_THREADS_BANKING: u32 = 1;
const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING;
const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128;
pub const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128;
const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10);
pub type BankingPacketBatch = (Vec<PacketBatch>, Option<SigverifyTracerPacketStats>);
@ -384,6 +384,8 @@ pub struct FilterForwardingResults {
pub(crate) total_forwardable_packets: usize,
pub(crate) total_tracer_packets_in_buffer: usize,
pub(crate) total_forwardable_tracer_packets: usize,
pub(crate) total_packet_conversion_us: u64,
pub(crate) total_filter_packets_us: u64,
}
impl BankingStage {
@ -493,66 +495,6 @@ impl BankingStage {
Self { bank_thread_hdls }
}
// filter forwardable Rc<immutable_deserialized_packet>s that:
// 1. are not forwarded, and
// 2. in priority order from max to min, and
// 3. not exceeding account bucket limit
// returns forwarded packets count
pub fn filter_valid_packets_for_forwarding(
buffered_packet_batches: &mut UnprocessedPacketBatches,
forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts,
) -> FilterForwardingResults {
let mut total_forwardable_tracer_packets: usize = 0;
let mut total_tracer_packets_in_buffer: usize = 0;
let mut total_forwardable_packets: usize = 0;
let mut dropped_tx_before_forwarding_count: usize = 0;
let filter_forwardable_packet = |deserialized_packet: &mut DeserializedPacket| -> bool {
let mut result = true;
let is_tracer_packet = deserialized_packet
.immutable_section()
.original_packet()
.meta
.is_tracer_packet();
if is_tracer_packet {
saturating_add_assign!(total_tracer_packets_in_buffer, 1);
}
if !deserialized_packet.forwarded {
saturating_add_assign!(total_forwardable_packets, 1);
if is_tracer_packet {
saturating_add_assign!(total_forwardable_tracer_packets, 1);
}
result = forward_packet_batches_by_accounts
.add_packet(deserialized_packet.immutable_section().clone());
if !result {
saturating_add_assign!(dropped_tx_before_forwarding_count, 1);
}
}
result
};
// Iterates buffered packets from high priority to low, places each packet into
// forwarding account buckets by calling `forward_packet_batches_by_accounts.add_packet()`.
// Iteration stops as soon as `add_packet()` returns false when a packet fails to fit into
// buckets, ignoring remaining lower priority packets that could fit.
// The motivation of this is during bot spamming, buffer is likely to be filled with
// transactions have higher priority and write to same account(s), other lower priority
// transactions will not make into buffer, therefore it shall exit as soon as first
// transaction failed to fit in forwarding buckets.
buffered_packet_batches.iter_desc(filter_forwardable_packet);
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
dropped_tx_before_forwarding_count
);
FilterForwardingResults {
total_forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
}
}
/// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns
/// the number of successfully forwarded packets in second part of tuple
fn forward_buffered_packets<'a>(
@ -762,18 +704,7 @@ impl BankingStage {
// Remove the non-retryable packets, packets that were either:
// 1) Successfully processed
// 2) Failed but not retryable
Self::filter_processed_packets(
retryable_transaction_indexes
.iter()
.chain(std::iter::once(&packets_to_process.len())),
|start, end| {
for processed_packet in &packets_to_process[start..end] {
buffered_packet_batches
.message_hash_to_transaction
.remove(processed_packet.message_hash());
}
},
);
Self::remove_non_retained_packets(buffered_packet_batches, &packets_to_process, &retryable_transaction_indexes);
result
} else if reached_end_of_slot {
@ -795,7 +726,7 @@ impl BankingStage {
.set_end_of_slot_unprocessed_buffer_len(buffered_packet_batches.len() as u64);
// We've hit the end of this slot, no need to perform more processing,
// Packet filtering will be done at `forward_packet_batches_by_accounts.add_packet()`
// Packet filtering will be done before forwarding.
}
proc_start.stop();
@ -1016,12 +947,32 @@ impl BankingStage {
// get current root bank from bank_forks, use it to sanitize transaction and
// load all accounts from address loader;
let current_bank = bank_forks.read().unwrap().root_bank();
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new_with_default_batch_limits(current_bank);
let filter_forwarding_result = Self::filter_valid_packets_for_forwarding(
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
// sanitize and filter packets that are no longer valid (could be too old, a duplicate of something
// already processed), then add to forwarding buffer.
let filter_forwarding_result = Self::filter_and_forward_with_account_limits(
&current_bank,
buffered_packet_batches,
&mut forward_packet_batches_by_accounts,
UNPROCESSED_BUFFER_STEP_SIZE,
);
slot_metrics_tracker.increment_transactions_from_packets_us(
filter_forwarding_result.total_packet_conversion_us,
);
banking_stage_stats.packet_conversion_elapsed.fetch_add(
filter_forwarding_result.total_packet_conversion_us,
Ordering::Relaxed,
);
banking_stage_stats
.filter_pending_packets_elapsed
.fetch_add(
filter_forwarding_result.total_filter_packets_us,
Ordering::Relaxed,
);
forward_packet_batches_by_accounts
.iter_batches()
.filter(|&batch| !batch.is_empty())
@ -1079,6 +1030,289 @@ impl BankingStage {
}
}
/// Filters out packets that fail to sanitize, or are no longer valid (could be
/// too old, or a duplicate of something already processed). This is done in batches to avoid
/// checking the bank's blockhash and status cache per transaction, which would hurt performance.
/// Adds valid, sanitized packets to the forwarding queue.
pub fn filter_and_forward_with_account_limits(
bank: &Arc<Bank>,
buffered_packet_batches: &mut UnprocessedPacketBatches,
forward_buffer: &mut ForwardPacketBatchesByAccounts,
batch_size: usize,
) -> FilterForwardingResults {
let mut total_forwardable_tracer_packets: usize = 0;
let mut total_tracer_packets_in_buffer: usize = 0;
let mut total_forwardable_packets: usize = 0;
let mut total_packet_conversion_us: u64 = 0;
let mut total_filter_packets_us: u64 = 0;
let mut dropped_tx_before_forwarding_count: usize = 0;
let mut original_priority_queue = Self::swap_priority_queue(buffered_packet_batches);
// indicates if `forward_buffer` still accepts more packets; see details in
// `forward_packet_batches_by_accounts.rs`.
let mut accepting_packets = true;
// batch iterate through buffered_packet_batches in desc priority order
let retained_priority_queue: MinMaxHeap<Rc<ImmutableDeserializedPacket>> =
original_priority_queue
.drain_desc()
.chunks(batch_size)
.into_iter()
.flat_map(|packets_to_process| {
let packets_to_process = packets_to_process.into_iter().collect_vec();
// Vec<bool> of the same size as `packets_to_process`; each element indicates whether the
// corresponding packet is a tracer packet.
let tracer_packet_indexes = packets_to_process
.iter()
.map(|deserialized_packet| {
deserialized_packet
.original_packet()
.meta
.is_tracer_packet()
})
.collect::<Vec<_>>();
saturating_add_assign!(
total_tracer_packets_in_buffer,
tracer_packet_indexes
.iter()
.filter(|is_tracer| **is_tracer)
.count()
);
if accepting_packets {
let (
(sanitized_transactions, transaction_to_packet_indexes),
packet_conversion_time,
): ((Vec<SanitizedTransaction>, Vec<usize>), _) = measure!(
Self::sanitize_unforwarded_packets(
buffered_packet_batches,
&packets_to_process,
bank,
),
"sanitize_packet",
);
saturating_add_assign!(
total_packet_conversion_us,
packet_conversion_time.as_us()
);
let (forwardable_transaction_indexes, filter_packets_time) = measure!(
Self::filter_invalid_transactions(&sanitized_transactions, bank,),
"filter_packets",
);
saturating_add_assign!(
total_filter_packets_us,
filter_packets_time.as_us()
);
for forwardable_transaction_index in &forwardable_transaction_indexes {
saturating_add_assign!(total_forwardable_packets, 1);
let forwardable_packet_index =
transaction_to_packet_indexes[*forwardable_transaction_index];
if tracer_packet_indexes[forwardable_packet_index] {
saturating_add_assign!(total_forwardable_tracer_packets, 1);
}
}
accepting_packets = Self::add_filtered_packets_to_forward_buffer(
forward_buffer,
&packets_to_process,
&sanitized_transactions,
&transaction_to_packet_indexes,
&forwardable_transaction_indexes,
&mut dropped_tx_before_forwarding_count,
);
Self::collect_retained_packets(
buffered_packet_batches,
&packets_to_process,
&Self::prepare_filtered_packet_indexes(
&transaction_to_packet_indexes,
&forwardable_transaction_indexes,
),
)
} else {
// skip sanitizing and filtering if no longer able to add more packets for forwarding
saturating_add_assign!(
dropped_tx_before_forwarding_count,
packets_to_process.len()
);
packets_to_process
}
})
.collect();
// replace packet priority queue
buffered_packet_batches.packet_priority_queue = retained_priority_queue;
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
dropped_tx_before_forwarding_count
);
FilterForwardingResults {
total_forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
total_packet_conversion_us,
total_filter_packets_us,
}
}
/// Take buffered_packet_batches's priority_queue out, leave empty MinMaxHeap in its place.
fn swap_priority_queue(
buffered_packet_batches: &mut UnprocessedPacketBatches,
) -> MinMaxHeap<Rc<ImmutableDeserializedPacket>> {
let capacity = buffered_packet_batches.capacity();
std::mem::replace(
&mut buffered_packet_batches.packet_priority_queue,
MinMaxHeap::with_capacity(capacity),
)
}
/// Sanitizes un-forwarded packets into SanitizedTransactions for validation and forwarding.
fn sanitize_unforwarded_packets(
buffered_packet_batches: &mut UnprocessedPacketBatches,
packets_to_process: &[Rc<ImmutableDeserializedPacket>],
bank: &Arc<Bank>,
) -> (Vec<SanitizedTransaction>, Vec<usize>) {
// Get ref of ImmutableDeserializedPacket
let deserialized_packets = packets_to_process.iter().map(|p| &**p);
let (transactions, transaction_to_packet_indexes): (Vec<SanitizedTransaction>, Vec<usize>) =
deserialized_packets
.enumerate()
.filter_map(|(packet_index, deserialized_packet)| {
if !buffered_packet_batches.is_forwarded(deserialized_packet) {
unprocessed_packet_batches::transaction_from_deserialized_packet(
deserialized_packet,
&bank.feature_set,
bank.vote_only_bank(),
bank.as_ref(),
)
.map(|transaction| (transaction, packet_index))
} else {
None
}
})
.unzip();
// report metrics
inc_new_counter_info!("banking_stage-packet_conversion", 1);
let unsanitized_packets_filtered_count =
packets_to_process.len().saturating_sub(transactions.len());
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
unsanitized_packets_filtered_count
);
(transactions, transaction_to_packet_indexes)
}
/// Checks sanitized transactions against bank, returns valid transaction indexes
fn filter_invalid_transactions(
transactions: &[SanitizedTransaction],
bank: &Arc<Bank>,
) -> Vec<usize> {
let filter = vec![Ok(()); transactions.len()];
let results = Self::bank_check_transactions(bank, transactions, &filter);
// report metrics
let filtered_out_transactions_count = transactions.len().saturating_sub(results.len());
inc_new_counter_info!(
"banking_stage-dropped_tx_before_forwarding",
filtered_out_transactions_count
);
results
.iter()
.enumerate()
.filter_map(
|(tx_index, (result, _))| if result.is_ok() { Some(tx_index) } else { None },
)
.collect_vec()
}
fn prepare_filtered_packet_indexes(
transaction_to_packet_indexes: &[usize],
retained_transaction_indexes: &[usize],
) -> Vec<usize> {
retained_transaction_indexes
.iter()
.map(|tx_index| transaction_to_packet_indexes[*tx_index])
.collect_vec()
}
fn collect_retained_packets(
buffered_packet_batches: &mut UnprocessedPacketBatches,
packets_to_process: &[Rc<ImmutableDeserializedPacket>],
retained_packet_indexes: &[usize],
) -> Vec<Rc<ImmutableDeserializedPacket>> {
Self::remove_non_retained_packets(
buffered_packet_batches,
packets_to_process,
retained_packet_indexes,
);
retained_packet_indexes
.iter()
.map(|i| packets_to_process[*i].clone())
.collect_vec()
}
/// remove packets from UnprocessedPacketBatches.message_hash_to_transaction after they have
/// been removed from UnprocessedPacketBatches.packet_priority_queue
fn remove_non_retained_packets(
buffered_packet_batches: &mut UnprocessedPacketBatches,
packets_to_process: &[Rc<ImmutableDeserializedPacket>],
retained_packet_indexes: &[usize],
) {
Self::filter_processed_packets(
retained_packet_indexes
.iter()
.chain(std::iter::once(&packets_to_process.len())),
|start, end| {
for processed_packet in &packets_to_process[start..end] {
buffered_packet_batches
.message_hash_to_transaction
.remove(processed_packet.message_hash());
}
},
)
}
/// Tries to add the filtered, forwardable and valid packets to the forward buffer;
/// returns whether forward_buffer is still accepting packets.
fn add_filtered_packets_to_forward_buffer(
forward_buffer: &mut ForwardPacketBatchesByAccounts,
packets_to_process: &[Rc<ImmutableDeserializedPacket>],
transactions: &[SanitizedTransaction],
transaction_to_packet_indexes: &[usize],
retained_transaction_indexes: &[usize],
dropped_tx_before_forwarding_count: &mut usize,
) -> bool {
let mut added_packets_count: usize = 0;
let mut accepting_packets = true;
for retained_transaction_index in retained_transaction_indexes {
let sanitized_transaction = &transactions[*retained_transaction_index];
let immutable_deserialized_packet = packets_to_process
[transaction_to_packet_indexes[*retained_transaction_index]]
.clone();
accepting_packets =
forward_buffer.try_add_packet(sanitized_transaction, immutable_deserialized_packet);
if !accepting_packets {
break;
}
saturating_add_assign!(added_packets_count, 1);
}
// count the packets not being forwarded in this batch
saturating_add_assign!(
*dropped_tx_before_forwarding_count,
retained_transaction_indexes.len() - added_packets_count
);
accepting_packets
}
#[allow(clippy::too_many_arguments)]
fn process_loop(
packet_deserializer: &mut PacketDeserializer,
@ -1798,8 +2032,8 @@ impl BankingStage {
}
}
// This function creates a filter of transaction results with Ok() for every pending
// transaction. The non-pending transactions are marked with TransactionError
/// This function creates a filter of transaction results with Ok() for every pending
/// transaction. The non-pending transactions are marked with TransactionError
fn prepare_filter_for_pending_transactions(
transactions_len: usize,
pending_tx_indexes: &[usize],
@ -1809,8 +2043,8 @@ impl BankingStage {
mask
}
// This function returns a vector containing index of all valid transactions. A valid
// transaction has result Ok() as the value
/// This function returns a vector containing index of all valid transactions. A valid
/// transaction has result Ok() as the value
fn filter_valid_transaction_indexes(
valid_txs: &[TransactionCheckResult],
transaction_indexes: &[usize],
@ -1823,6 +2057,35 @@ impl BankingStage {
.collect_vec()
}
/// Checks a batch of sanitized transactions against the bank for age and status
fn bank_check_transactions(
bank: &Arc<Bank>,
transactions: &[SanitizedTransaction],
filter: &[transaction::Result<()>],
) -> Vec<TransactionCheckResult> {
let mut error_counters = TransactionErrorMetrics::default();
// The following code also checks if the blockhash for a transaction is too old
// The check accounts for
// 1. Transaction forwarding delay
// 2. The slot at which the next leader will actually process the transaction
// Drop the transaction if it will expire by the time the next node receives and processes it
let api = perf_libs::api();
let max_tx_fwd_delay = if api.is_none() {
MAX_TRANSACTION_FORWARDING_DELAY
} else {
MAX_TRANSACTION_FORWARDING_DELAY_GPU
};
bank.check_transactions(
transactions,
filter,
(MAX_PROCESSING_AGE)
.saturating_sub(max_tx_fwd_delay)
.saturating_sub(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET as usize),
&mut error_counters,
)
}
/// This function filters pending packets that are still valid
/// # Arguments
/// * `transactions` - a batch of transactions deserialized from packets
@ -1837,27 +2100,7 @@ impl BankingStage {
let filter =
Self::prepare_filter_for_pending_transactions(transactions.len(), pending_indexes);
let mut error_counters = TransactionErrorMetrics::default();
// The following code also checks if the blockhash for a transaction is too old
// The check accounts for
// 1. Transaction forwarding delay
// 2. The slot at which the next leader will actually process the transaction
// Drop the transaction if it will expire by the time the next node receives and processes it
let api = perf_libs::api();
let max_tx_fwd_delay = if api.is_none() {
MAX_TRANSACTION_FORWARDING_DELAY
} else {
MAX_TRANSACTION_FORWARDING_DELAY_GPU
};
let results = bank.check_transactions(
transactions,
&filter,
(MAX_PROCESSING_AGE)
.saturating_sub(max_tx_fwd_delay)
.saturating_sub(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET as usize),
&mut error_counters,
);
let results = Self::bank_check_transactions(bank, transactions, &filter);
Self::filter_valid_transaction_indexes(&results, transaction_to_packet_indexes)
}
@ -3224,23 +3467,35 @@ mod tests {
}
#[test]
fn test_filter_valid_packets() {
fn test_filter_and_forward_with_account_limits() {
solana_logger::setup();
let GenesisConfigInfo { genesis_config, .. } = create_slow_genesis_config(10);
let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config);
let bank_forks = Arc::new(RwLock::new(BankForks::new(bank)));
let current_bank = bank_forks.read().unwrap().root_bank();
let GenesisConfigInfo {
genesis_config,
mint_keypair,
..
} = create_genesis_config(10);
let current_bank = Arc::new(Bank::new_for_tests(&genesis_config));
let mut packets: Vec<DeserializedPacket> = (0..256)
.map(|packets_id| {
let simple_transactions: Vec<Transaction> = (0..256)
.map(|_id| {
// packets are deserialized upon receiving; failed packets will not be
// forwarded, therefore we need to create real packets here.
let keypair = Keypair::new();
let pubkey = solana_sdk::pubkey::new_rand();
let blockhash = Hash::new_unique();
let transaction = system_transaction::transfer(&keypair, &pubkey, 1, blockhash);
let mut p = Packet::from_data(None, &transaction).unwrap();
p.meta.port = packets_id;
let key1 = Keypair::new();
system_transaction::transfer(
&mint_keypair,
&key1.pubkey(),
genesis_config.rent.minimum_balance(0),
genesis_config.hash(),
)
})
.collect_vec();
let mut packets: Vec<DeserializedPacket> = simple_transactions
.iter()
.enumerate()
.map(|(packets_id, transaction)| {
let mut p = Packet::from_data(None, transaction).unwrap();
p.meta.port = packets_id as u16;
p.meta.set_tracer(true);
DeserializedPacket::new(p).unwrap()
})
@ -3251,15 +3506,18 @@ mod tests {
let mut buffered_packet_batches: UnprocessedPacketBatches =
UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len());
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new(current_bank.clone(), 1, 2);
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
let FilterForwardingResults {
total_forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
} = BankingStage::filter_valid_packets_for_forwarding(
..
} = BankingStage::filter_and_forward_with_account_limits(
&current_bank,
&mut buffered_packet_batches,
&mut forward_packet_batches_by_accounts,
UNPROCESSED_BUFFER_STEP_SIZE,
);
assert_eq!(total_forwardable_packets, 256);
assert_eq!(total_tracer_packets_in_buffer, 256);
@ -3290,14 +3548,17 @@ mod tests {
let mut buffered_packet_batches: UnprocessedPacketBatches =
UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len());
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new(current_bank, 1, 2);
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
let FilterForwardingResults {
total_forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
} = BankingStage::filter_valid_packets_for_forwarding(
..
} = BankingStage::filter_and_forward_with_account_limits(
&current_bank,
&mut buffered_packet_batches,
&mut forward_packet_batches_by_accounts,
UNPROCESSED_BUFFER_STEP_SIZE,
);
assert_eq!(
total_forwardable_packets,
@ -3309,6 +3570,38 @@ mod tests {
packets.len() - num_already_forwarded
);
}
// some packets are invalid (already processed)
{
let num_already_processed = 16;
for tx in &simple_transactions[0..num_already_processed] {
assert_eq!(current_bank.process_transaction(tx), Ok(()));
}
let mut buffered_packet_batches: UnprocessedPacketBatches =
UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len());
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
let FilterForwardingResults {
total_forwardable_packets,
total_tracer_packets_in_buffer,
total_forwardable_tracer_packets,
..
} = BankingStage::filter_and_forward_with_account_limits(
&current_bank,
&mut buffered_packet_batches,
&mut forward_packet_batches_by_accounts,
UNPROCESSED_BUFFER_STEP_SIZE,
);
assert_eq!(
total_forwardable_packets,
packets.len() - num_already_processed
);
assert_eq!(total_tracer_packets_in_buffer, packets.len());
assert_eq!(
total_forwardable_tracer_packets,
packets.len() - num_already_processed
);
}
}
#[test]

View File

@ -1,15 +1,12 @@
use {
crate::{
immutable_deserialized_packet::ImmutableDeserializedPacket, unprocessed_packet_batches,
},
crate::immutable_deserialized_packet::ImmutableDeserializedPacket,
solana_perf::packet::Packet,
solana_runtime::{
bank::Bank,
block_cost_limits,
cost_tracker::{CostTracker, CostTrackerError},
},
solana_sdk::pubkey::Pubkey,
std::{rc::Rc, sync::Arc},
solana_sdk::{pubkey::Pubkey, transaction::SanitizedTransaction},
std::rc::Rc,
};
/// `ForwardBatch` to have half of default cost_tracker limits, as smaller batch
@ -97,44 +94,40 @@ impl ForwardBatch {
/// transactions that saturate those highly demanded accounts.
#[derive(Debug)]
pub struct ForwardPacketBatchesByAccounts {
// Need a `bank` to load all accounts for VersionedTransaction. Currently
// using current rooted bank for it.
pub(crate) current_bank: Arc<Bank>,
// Forwardable packets are staged in number of batches, each batch is limited
// by cost_tracker on both account limit and block limits. Those limits are
// set as `limit_ratio` of regular block limits to facilitate quicker iteration.
forward_batches: Vec<ForwardBatch>,
// Valid packets are iterated from high priority to low, and are only added into
// the forwarding account buckets via `try_add_packet()` while this flag is true.
// The motivation is that during bot spamming the buffer is likely to be filled with
// high-priority transactions that write to the same account(s), while other lower-priority
// transactions will not make it into the buffer; exiting as soon as the first transaction
// fails to fit in the forwarding buckets saves checking similar transactions.
accepting_packets: bool,
}
impl ForwardPacketBatchesByAccounts {
pub fn new_with_default_batch_limits(current_bank: Arc<Bank>) -> Self {
Self::new(
current_bank,
FORWARDED_BLOCK_COMPUTE_RATIO,
DEFAULT_NUMBER_OF_BATCHES,
)
pub fn new_with_default_batch_limits() -> Self {
Self::new(FORWARDED_BLOCK_COMPUTE_RATIO, DEFAULT_NUMBER_OF_BATCHES)
}
pub fn new(current_bank: Arc<Bank>, limit_ratio: u32, number_of_batches: u32) -> Self {
pub fn new(limit_ratio: u32, number_of_batches: u32) -> Self {
let forward_batches = (0..number_of_batches)
.map(|_| ForwardBatch::new(limit_ratio))
.collect();
Self {
current_bank,
forward_batches,
accepting_packets: true,
}
}
pub fn add_packet(&mut self, packet: Rc<ImmutableDeserializedPacket>) -> bool {
// do not forward packet that cannot be sanitized
if let Some(sanitized_transaction) =
unprocessed_packet_batches::transaction_from_deserialized_packet(
&packet,
&self.current_bank.feature_set,
self.current_bank.vote_only_bank(),
self.current_bank.as_ref(),
)
{
pub fn try_add_packet(
&mut self,
sanitized_transaction: &SanitizedTransaction,
packet: Rc<ImmutableDeserializedPacket>,
) -> bool {
if self.accepting_packets {
// get write_lock_accounts
let message = sanitized_transaction.message();
let write_lock_accounts: Vec<_> = message
@ -154,10 +147,10 @@ impl ForwardPacketBatchesByAccounts {
let requested_cu = packet.compute_unit_limit();
// try to fill into forward batches
self.add_packet_to_batches(&write_lock_accounts, requested_cu, packet)
} else {
false
self.accepting_packets =
self.add_packet_to_batches(&write_lock_accounts, requested_cu, packet);
}
self.accepting_packets
}
pub fn iter_batches(&self) -> impl Iterator<Item = &ForwardBatch> {
@ -189,34 +182,22 @@ impl ForwardPacketBatchesByAccounts {
mod tests {
use {
super::*,
crate::unprocessed_packet_batches::DeserializedPacket,
solana_runtime::{
bank::Bank,
bank_forks::BankForks,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
transaction_priority_details::TransactionPriorityDetails,
crate::unprocessed_packet_batches::{self, DeserializedPacket},
solana_runtime::transaction_priority_details::TransactionPriorityDetails,
solana_sdk::{
feature_set::FeatureSet, hash::Hash, signature::Keypair, system_transaction,
transaction::SimpleAddressLoader,
},
solana_sdk::{hash::Hash, signature::Keypair, system_transaction},
std::sync::RwLock,
std::sync::Arc,
};
fn build_bank_forks_for_test() -> Arc<RwLock<BankForks>> {
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank = Bank::new_for_tests(&genesis_config);
let bank_forks = BankForks::new(bank);
Arc::new(RwLock::new(bank_forks))
}
fn build_deserialized_packet_for_test(
priority: u64,
write_to_account: &Pubkey,
compute_unit_limit: u64,
) -> DeserializedPacket {
let tx = system_transaction::transfer(
&Keypair::new(),
&solana_sdk::pubkey::new_rand(),
1,
Hash::new_unique(),
);
let tx =
system_transaction::transfer(&Keypair::new(), write_to_account, 1, Hash::new_unique());
let packet = Packet::from_data(None, &tx).unwrap();
DeserializedPacket::new_with_priority_details(
packet,
@ -239,7 +220,7 @@ mod tests {
let mut forward_batch = ForwardBatch::new(limit_ratio);
let write_lock_accounts = vec![Pubkey::new_unique(), Pubkey::new_unique()];
let packet = build_deserialized_packet_for_test(10, requested_cu);
let packet = build_deserialized_packet_for_test(10, &write_lock_accounts[1], requested_cu);
// first packet will be successful
assert!(forward_batch
.try_add(
@ -261,7 +242,7 @@ mod tests {
}
#[test]
fn test_add_packet_to_batches() {
fn test_try_add_packet_to_batches() {
solana_logger::setup();
// set test batch limit to be 1 millionth of regular block limit
let limit_ratio = 1_000_000u32;
@ -270,11 +251,8 @@ mod tests {
let requested_cu =
block_cost_limits::MAX_WRITABLE_ACCOUNT_UNITS.saturating_div(limit_ratio as u64);
let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new(
build_bank_forks_for_test().read().unwrap().root_bank(),
limit_ratio,
number_of_batches,
);
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new(limit_ratio, number_of_batches);
// initially both batches are empty
{
@ -286,11 +264,13 @@ mod tests {
let hot_account = solana_sdk::pubkey::new_rand();
let other_account = solana_sdk::pubkey::new_rand();
let packet_high_priority = build_deserialized_packet_for_test(10, requested_cu);
let packet_low_priority = build_deserialized_packet_for_test(0, requested_cu);
let packet_high_priority =
build_deserialized_packet_for_test(10, &hot_account, requested_cu);
let packet_low_priority =
build_deserialized_packet_for_test(0, &other_account, requested_cu);
// with 4 packets, first 3 write to same hot_account with higher priority,
// the 4th write to other_account with lower priority;
// assert the 1st and 4th fit in fist batch, the 2nd in 2nd batch and 3rd will be dropped.
// assert the 1st and 4th fit in first batch, the 2nd in 2nd batch and 3rd will be dropped.
// 1st high-priority packet added to 1st batch
{
@ -343,4 +323,88 @@ mod tests {
assert!(batches.next().is_none());
}
}
#[test]
fn test_try_add_packet() {
solana_logger::setup();
// set test batch limit to be 1 millionth of regular block limit
let limit_ratio = 1_000_000u32;
let number_of_batches = 1;
// set requested_cu to be batch account limit
let requested_cu =
block_cost_limits::MAX_WRITABLE_ACCOUNT_UNITS.saturating_div(limit_ratio as u64);
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new(limit_ratio, number_of_batches);
let hot_account = solana_sdk::pubkey::new_rand();
let other_account = solana_sdk::pubkey::new_rand();
// assert initially batch is empty, and accepting new packets
{
let mut batches = forward_packet_batches_by_accounts.iter_batches();
assert_eq!(0, batches.next().unwrap().len());
assert!(batches.next().is_none());
assert!(forward_packet_batches_by_accounts.accepting_packets);
}
// build a packet that would take up the hot account limit, add it to the batches;
// assert it is added, and the buffer still accepts more packets
{
let packet = build_deserialized_packet_for_test(10, &hot_account, requested_cu);
let tx = unprocessed_packet_batches::transaction_from_deserialized_packet(
packet.immutable_section(),
&Arc::new(FeatureSet::default()),
false, //votes_only,
SimpleAddressLoader::Disabled,
)
.unwrap();
assert!(forward_packet_batches_by_accounts
.try_add_packet(&tx, packet.immutable_section().clone()));
let mut batches = forward_packet_batches_by_accounts.iter_batches();
assert_eq!(1, batches.next().unwrap().len());
assert!(forward_packet_batches_by_accounts.accepting_packets);
}
// build a small packet that writes to the hot account, try adding it to the batches;
// assert it is not added, and the buffer no longer accepts packets
{
let packet =
build_deserialized_packet_for_test(100, &hot_account, 1 /*requested_cu*/);
let tx = unprocessed_packet_batches::transaction_from_deserialized_packet(
packet.immutable_section(),
&Arc::new(FeatureSet::default()),
false, //votes_only,
SimpleAddressLoader::Disabled,
)
.unwrap();
assert!(!forward_packet_batches_by_accounts
.try_add_packet(&tx, packet.immutable_section().clone()));
let mut batches = forward_packet_batches_by_accounts.iter_batches();
assert_eq!(1, batches.next().unwrap().len());
assert!(!forward_packet_batches_by_accounts.accepting_packets);
}
// build a small packet that writes to the other account, try adding it to the batches;
// assert it is not added because the buffer no longer accepts any packets
{
let packet =
build_deserialized_packet_for_test(100, &other_account, 1 /*requested_cu*/);
let tx = unprocessed_packet_batches::transaction_from_deserialized_packet(
packet.immutable_section(),
&Arc::new(FeatureSet::default()),
false, //votes_only,
SimpleAddressLoader::Disabled,
)
.unwrap();
assert!(!forward_packet_batches_by_accounts
.try_add_packet(&tx, packet.immutable_section().clone()));
let mut batches = forward_packet_batches_by_accounts.iter_batches();
assert_eq!(1, batches.next().unwrap().len());
assert!(!forward_packet_batches_by_accounts.accepting_packets);
}
}
}

View File

@ -3,6 +3,7 @@ use {
crate::{
forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket},
unprocessed_packet_batches,
},
itertools::Itertools,
rand::{thread_rng, Rng},
@ -243,11 +244,12 @@ impl LatestUnprocessedVotes {
/// Votes from validators with 0 stakes are ignored
pub fn get_and_insert_forwardable_packets(
&self,
bank: Arc<Bank>,
forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts,
) -> usize {
let mut continue_forwarding = true;
let pubkeys_by_stake = weighted_random_order_by_stake(
&forward_packet_batches_by_accounts.current_bank,
&bank,
self.latest_votes_per_pubkey.read().unwrap().keys(),
)
.collect_vec();
@ -260,16 +262,29 @@ impl LatestUnprocessedVotes {
if let Some(lock) = self.get_entry(pubkey) {
let mut vote = lock.write().unwrap();
if !vote.is_vote_taken() && !vote.is_forwarded() {
if forward_packet_batches_by_accounts
.add_packet(vote.vote.as_ref().unwrap().clone())
let deserialized_vote_packet = vote.vote.as_ref().unwrap().clone();
if let Some(sanitized_vote_transaction) =
unprocessed_packet_batches::transaction_from_deserialized_packet(
&deserialized_vote_packet,
&bank.feature_set,
bank.vote_only_bank(),
bank.as_ref(),
)
{
vote.forwarded = true;
if forward_packet_batches_by_accounts.try_add_packet(
&sanitized_vote_transaction,
deserialized_vote_packet,
) {
vote.forwarded = true;
} else {
// To match behavior of regular transactions we stop
// forwarding votes as soon as one fails
continue_forwarding = false;
}
return true;
} else {
// To match behavior of regular transactions we stop
// forwarding votes as soon as one fails
continue_forwarding = false;
return false;
}
return true;
}
}
false
@ -573,10 +588,9 @@ mod tests {
#[test]
fn test_forwardable_packets() {
let latest_unprocessed_votes = LatestUnprocessedVotes::new();
let bank = Arc::new(Bank::default_for_tests());
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new_with_default_batch_limits(Arc::new(
Bank::default_for_tests(),
));
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
let keypair_a = ValidatorVoteKeypairs::new_rand();
let keypair_b = ValidatorVoteKeypairs::new_rand();
@ -588,7 +602,7 @@ mod tests {
// Don't forward 0 stake accounts
let forwarded = latest_unprocessed_votes
.get_and_insert_forwardable_packets(&mut forward_packet_batches_by_accounts);
.get_and_insert_forwardable_packets(bank, &mut forward_packet_batches_by_accounts);
assert_eq!(0, forwarded);
assert_eq!(
0,
@ -606,11 +620,13 @@ mod tests {
.genesis_config;
let bank = Bank::new_for_tests(&config);
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new_with_default_batch_limits(Arc::new(bank));
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
// Don't forward votes from gossip
let forwarded = latest_unprocessed_votes
.get_and_insert_forwardable_packets(&mut forward_packet_batches_by_accounts);
let forwarded = latest_unprocessed_votes.get_and_insert_forwardable_packets(
Arc::new(bank),
&mut forward_packet_batches_by_accounts,
);
assert_eq!(0, forwarded);
assert_eq!(
@ -629,11 +645,13 @@ mod tests {
.genesis_config;
let bank = Arc::new(Bank::new_for_tests(&config));
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new_with_default_batch_limits(bank.clone());
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
// Forward from TPU
let forwarded = latest_unprocessed_votes
.get_and_insert_forwardable_packets(&mut forward_packet_batches_by_accounts);
let forwarded = latest_unprocessed_votes.get_and_insert_forwardable_packets(
bank.clone(),
&mut forward_packet_batches_by_accounts,
);
assert_eq!(1, forwarded);
assert_eq!(
@ -646,9 +664,9 @@ mod tests {
// Don't forward again
let mut forward_packet_batches_by_accounts =
ForwardPacketBatchesByAccounts::new_with_default_batch_limits(bank);
ForwardPacketBatchesByAccounts::new_with_default_batch_limits();
let forwarded = latest_unprocessed_votes
.get_and_insert_forwardable_packets(&mut forward_packet_batches_by_accounts);
.get_and_insert_forwardable_packets(bank, &mut forward_packet_batches_by_accounts);
assert_eq!(0, forwarded);
assert_eq!(

View File

@ -164,34 +164,6 @@ impl UnprocessedPacketBatches {
self.message_hash_to_transaction.iter_mut().map(|(_k, v)| v)
}
/// Iterates DeserializedPackets in descending priority (max-first) order,
/// calls FnMut for each DeserializedPacket.
pub fn iter_desc<F>(&mut self, mut f: F)
where
F: FnMut(&mut DeserializedPacket) -> bool,
{
let mut packet_priority_queue_clone = self.packet_priority_queue.clone();
for immutable_packet in packet_priority_queue_clone.drain_desc() {
match self
.message_hash_to_transaction
.entry(*immutable_packet.message_hash())
{
Entry::Vacant(_vacant_entry) => {
panic!(
"entry {} must exist to be consistent with `packet_priority_queue`",
immutable_packet.message_hash()
);
}
Entry::Occupied(mut occupied_entry) => {
if !f(occupied_entry.get_mut()) {
return;
}
}
}
}
}
pub fn retain<F>(&mut self, mut f: F)
where
F: FnMut(&mut DeserializedPacket) -> bool,
@ -304,6 +276,12 @@ impl UnprocessedPacketBatches {
pub fn capacity(&self) -> usize {
self.packet_priority_queue.capacity()
}
pub fn is_forwarded(&self, immutable_packet: &ImmutableDeserializedPacket) -> bool {
self.message_hash_to_transaction
.get(immutable_packet.message_hash())
.map_or(true, |p| p.forwarded)
}
}
pub fn deserialize_packets<'a>(
@ -340,7 +318,6 @@ pub fn transaction_from_deserialized_packet(
if votes_only && !deserialized_packet.is_simple_vote() {
return None;
}
let tx = SanitizedTransaction::try_new(
deserialized_packet.transaction().clone(),
*deserialized_packet.message_hash(),

View File

@ -1,7 +1,7 @@
#![allow(dead_code)]
use {
crate::{
banking_stage::{BankingStage, FilterForwardingResults, ForwardOption},
banking_stage::{self, BankingStage, FilterForwardingResults, ForwardOption},
forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
immutable_deserialized_packet::ImmutableDeserializedPacket,
latest_unprocessed_votes::{
@ -177,13 +177,20 @@ impl UnprocessedTransactionStorage {
pub fn filter_forwardable_packets_and_add_batches(
&mut self,
bank: Arc<Bank>,
forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts,
) -> FilterForwardingResults {
match self {
Self::LocalTransactionStorage(transaction_storage) => transaction_storage
.filter_forwardable_packets_and_add_batches(forward_packet_batches_by_accounts),
.filter_forwardable_packets_and_add_batches(
bank,
forward_packet_batches_by_accounts,
),
Self::VoteStorage(vote_storage) => vote_storage
.filter_forwardable_packets_and_add_batches(forward_packet_batches_by_accounts),
.filter_forwardable_packets_and_add_batches(
bank,
forward_packet_batches_by_accounts,
),
}
}
@ -249,12 +256,13 @@ impl VoteStorage {
fn filter_forwardable_packets_and_add_batches(
&mut self,
bank: Arc<Bank>,
forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts,
) -> FilterForwardingResults {
if matches!(self.vote_source, VoteSource::Tpu) {
let total_forwardable_packets = self
.latest_unprocessed_votes
.get_and_insert_forwardable_packets(forward_packet_batches_by_accounts);
.get_and_insert_forwardable_packets(bank, forward_packet_batches_by_accounts);
return FilterForwardingResults {
total_forwardable_packets,
..FilterForwardingResults::default()
@ -349,11 +357,14 @@ impl ThreadLocalUnprocessedPackets {
fn filter_forwardable_packets_and_add_batches(
&mut self,
bank: Arc<Bank>,
forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts,
) -> FilterForwardingResults {
BankingStage::filter_valid_packets_for_forwarding(
BankingStage::filter_and_forward_with_account_limits(
&bank,
&mut self.unprocessed_packet_batches,
forward_packet_batches_by_accounts,
banking_stage::UNPROCESSED_BUFFER_STEP_SIZE,
)
}