From f207af765e3cd1478a705c279782350f82f5ea50 Mon Sep 17 00:00:00 2001 From: Ashwin Sekar Date: Thu, 20 Oct 2022 14:10:48 -0700 Subject: [PATCH] Split out voting and banking threads in banking stage (#27931) * Split out voting and banking threads in banking stage Additionally this allows us to aggressively prune the buffer for voting threads as with the new vote state only the latest vote from each validator is necessary. * Update local cluster test to use new Vote ix * Encapsulate transaction storage filtering better * Address pr comments * Commit cargo lock change * clippy * Remove unsafe impls * pr comments * compute_sanitized_transaction -> build_sanitized_transaction * &Arc -> Arc * Move test * Refactor metrics enums * clippy --- Cargo.lock | 1 + core/benches/banking_stage.rs | 9 +- core/benches/unprocessed_packet_batches.rs | 34 +- core/src/banking_stage.rs | 1205 ++++++----------- core/src/fetch_stage.rs | 7 +- .../src/forward_packet_batches_by_accounts.rs | 47 +- core/src/immutable_deserialized_packet.rs | 30 +- core/src/latest_unprocessed_votes.rs | 40 +- core/src/leader_slot_banking_stage_metrics.rs | 73 +- core/src/unprocessed_packet_batches.rs | 66 +- core/src/unprocessed_transaction_storage.rs | 684 ++++++++-- local-cluster/tests/local_cluster_slow_1.rs | 30 +- programs/sbf/Cargo.lock | 1 + programs/vote/src/vote_transaction.rs | 30 + runtime/Cargo.toml | 1 + runtime/src/bank.rs | 35 +- sdk/program/src/clock.rs | 4 + 17 files changed, 1305 insertions(+), 992 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 34a7b912e..f4b7dd2a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6159,6 +6159,7 @@ dependencies = [ "solana-logger 1.15.0", "solana-measure", "solana-metrics", + "solana-perf", "solana-program-runtime", "solana-rayon-threadlimit", "solana-sdk 1.15.0", diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index ae0bd6b07..5ad2d81d9 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -13,6 +13,7 @@ use { leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker, qos_service::QosService, unprocessed_packet_batches::*, + unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage}, }, solana_entry::entry::{next_hash, Entry}, solana_gossip::cluster_info::{ClusterInfo, Node}, @@ -83,8 +84,10 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let transactions = vec![tx; 4194304]; let batches = transactions_to_deserialized_packets(&transactions).unwrap(); let batches_len = batches.len(); - let mut transaction_buffer = - UnprocessedPacketBatches::from_iter(batches.into_iter(), 2 * batches_len); + let mut transaction_buffer = UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::from_iter(batches.into_iter(), 2 * batches_len), + ThreadType::Transactions, + ); let (s, _r) = unbounded(); // This tests the performance of buffering packets. // If the packet buffers are copied, performance will be poor. 
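The commit message above notes that, with the new vote state, only the latest vote from each validator has to be retained, which is what lets the vote threads prune their buffers aggressively (the patch carries this in core/src/latest_unprocessed_votes.rs). The snippet below is only a minimal, self-contained sketch of that idea, assuming illustrative placeholder types: LatestVoteBuffer, VotePacket, and the validator/slot fields are not the crate's actual types.

use std::collections::HashMap;

// Illustrative stand-in for a deserialized vote packet (not the crate's type).
#[derive(Clone, Debug)]
struct VotePacket {
    validator: [u8; 32], // vote authority / node pubkey
    slot: u64,           // newest slot the vote refers to
}

// Keeps at most one (the latest) vote per validator, so the buffer is bounded
// by the number of validators rather than by packet arrival rate.
#[derive(Default)]
struct LatestVoteBuffer {
    latest: HashMap<[u8; 32], VotePacket>,
}

impl LatestVoteBuffer {
    // Insert a vote, replacing any older vote from the same validator.
    // Returns whichever vote was discarded (the stale one), if any.
    fn insert(&mut self, vote: VotePacket) -> Option<VotePacket> {
        match self.latest.get(&vote.validator) {
            Some(existing) if existing.slot >= vote.slot => Some(vote), // incoming vote is stale
            _ => self.latest.insert(vote.validator, vote),
        }
    }

    fn len(&self) -> usize {
        self.latest.len()
    }
}

fn main() {
    let mut buffer = LatestVoteBuffer::default();
    let validator = [1u8; 32];
    buffer.insert(VotePacket { validator, slot: 5 });
    buffer.insert(VotePacket { validator, slot: 9 }); // displaces the slot-5 vote
    buffer.insert(VotePacket { validator, slot: 7 }); // stale, dropped
    assert_eq!(buffer.len(), 1);
}

Because at most one entry per validator survives, a buffer of this shape stays small no matter how many vote packets arrive, which is the property the dedicated vote threads rely on.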
@@ -94,7 +97,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { std::u128::MAX, &poh_recorder, &mut transaction_buffer, - None, + &None, &s, None::>, &BankingStageStats::default(), diff --git a/core/benches/unprocessed_packet_batches.rs b/core/benches/unprocessed_packet_batches.rs index 1fa13dde7..5ba3d903b 100644 --- a/core/benches/unprocessed_packet_batches.rs +++ b/core/benches/unprocessed_packet_batches.rs @@ -6,8 +6,11 @@ extern crate test; use { rand::distributions::{Distribution, Uniform}, solana_core::{ - banking_stage::*, forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, + forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, unprocessed_packet_batches::*, + unprocessed_transaction_storage::{ + ThreadType, UnprocessedTransactionStorage, UNPROCESSED_BUFFER_STEP_SIZE, + }, }, solana_measure::measure::Measure, solana_perf::packet::{Packet, PacketBatch}, @@ -104,7 +107,7 @@ fn insert_packet_batches( #[allow(clippy::unit_arg)] fn bench_packet_clone(bencher: &mut Bencher) { let batch_count = 1000; - let packet_per_batch_count = 128; + let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE; let packet_batches: Vec = (0..batch_count) .map(|_| build_packet_batch(packet_per_batch_count, None).0) @@ -134,9 +137,9 @@ fn bench_packet_clone(bencher: &mut Bencher) { #[bench] #[ignore] fn bench_unprocessed_packet_batches_within_limit(bencher: &mut Bencher) { - let buffer_capacity = 1_000 * 128; + let buffer_capacity = 1_000 * UNPROCESSED_BUFFER_STEP_SIZE; let batch_count = 1_000; - let packet_per_batch_count = 128; + let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE; bencher.iter(|| { insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, false); @@ -148,9 +151,9 @@ fn bench_unprocessed_packet_batches_within_limit(bencher: &mut Bencher) { #[bench] #[ignore] fn bench_unprocessed_packet_batches_beyond_limit(bencher: &mut Bencher) { - let buffer_capacity = 1_000 * 128; + let buffer_capacity = 1_000 * UNPROCESSED_BUFFER_STEP_SIZE; let batch_count = 1_100; - let packet_per_batch_count = 128; + let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE; // this is the worst scenario testing: all batches are uniformly populated with packets from // priority 100..228, so in order to drop a batch, algo will have to drop all packets that has @@ -167,9 +170,9 @@ fn bench_unprocessed_packet_batches_beyond_limit(bencher: &mut Bencher) { #[bench] #[ignore] fn bench_unprocessed_packet_batches_randomized_within_limit(bencher: &mut Bencher) { - let buffer_capacity = 1_000 * 128; + let buffer_capacity = 1_000 * UNPROCESSED_BUFFER_STEP_SIZE; let batch_count = 1_000; - let packet_per_batch_count = 128; + let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE; bencher.iter(|| { insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, true); @@ -181,9 +184,9 @@ fn bench_unprocessed_packet_batches_randomized_within_limit(bencher: &mut Benche #[bench] #[ignore] fn bench_unprocessed_packet_batches_randomized_beyond_limit(bencher: &mut Bencher) { - let buffer_capacity = 1_000 * 128; + let buffer_capacity = 1_000 * UNPROCESSED_BUFFER_STEP_SIZE; let batch_count = 1_100; - let packet_per_batch_count = 128; + let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE; bencher.iter(|| { insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, true); @@ -198,7 +201,6 @@ fn buffer_iter_desc_and_forward( ) { solana_logger::setup(); let mut unprocessed_packet_batches = 
UnprocessedPacketBatches::with_capacity(buffer_max_size); - let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new_for_tests(&genesis_config); let bank_forks = BankForks::new(bank); @@ -226,13 +228,15 @@ fn buffer_iter_desc_and_forward( // forward whole buffer { + let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( + unprocessed_packet_batches, + ThreadType::Transactions, + ); let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); - let _ = BankingStage::filter_and_forward_with_account_limits( - ¤t_bank, - &mut unprocessed_packet_batches, + let _ = transaction_storage.filter_forwardable_packets_and_add_batches( + current_bank, &mut forward_packet_batches_by_accounts, - 128usize, ); } } diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 70105973d..bb27050ce 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -6,6 +6,7 @@ use { crate::{ forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, immutable_deserialized_packet::ImmutableDeserializedPacket, + latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource}, leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary}, leader_slot_banking_stage_timing_metrics::{ LeaderExecuteAndCommitTimings, RecordTransactionsTimings, @@ -14,7 +15,10 @@ use { qos_service::QosService, sigverify::SigverifyTracerPacketStats, tracer_packet_stats::TracerPacketStats, - unprocessed_packet_batches::{self, *}, + unprocessed_packet_batches::*, + unprocessed_transaction_storage::{ + ThreadType, UnprocessedTransactionStorage, UNPROCESSED_BUFFER_STEP_SIZE, + }, }, core::iter::repeat, crossbeam_channel::{ @@ -22,7 +26,6 @@ use { }, histogram::Histogram, itertools::Itertools, - min_max_heap::MinMaxHeap, solana_entry::entry::hash_transactions, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_ledger::{ @@ -33,7 +36,6 @@ use { solana_perf::{ data_budget::DataBudget, packet::{Packet, PacketBatch, PACKETS_PER_BATCH}, - perf_libs, }, solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}, solana_program_runtime::timings::ExecuteTimings, @@ -51,9 +53,10 @@ use { }, solana_sdk::{ clock::{ - Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE, MAX_TRANSACTION_FORWARDING_DELAY, - MAX_TRANSACTION_FORWARDING_DELAY_GPU, + Slot, DEFAULT_TICKS_PER_SLOT, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, + HOLD_TRANSACTIONS_SLOT_OFFSET, MAX_PROCESSING_AGE, }, + feature_set::allow_votes_to_directly_update_vote_state, pubkey::Pubkey, saturating_add_assign, timing::{duration_as_ms, timestamp, AtomicInterval}, @@ -77,10 +80,6 @@ use { }, }; -/// Transaction forwarding -pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2; -pub const HOLD_TRANSACTIONS_SLOT_OFFSET: u64 = 20; - // Fixed thread size seems to be fastest on GCP setup pub const NUM_THREADS: u32 = 6; @@ -91,7 +90,6 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 64; const NUM_VOTE_PROCESSING_THREADS: u32 = 2; const MIN_THREADS_BANKING: u32 = 1; const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING; -pub const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128; const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10); pub type BankingPacketBatch = (Vec, Option); @@ -149,7 +147,6 @@ pub struct BankingStageStats { pub(crate) dropped_duplicated_packets_count: AtomicUsize, newly_buffered_packets_count: AtomicUsize, 
current_buffered_packets_count: AtomicUsize, - current_buffered_packet_batches_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, forwarded_transaction_count: AtomicUsize, @@ -187,9 +184,6 @@ impl BankingStageStats { .load(Ordering::Relaxed) as u64 + self.newly_buffered_packets_count.load(Ordering::Relaxed) as u64 + self.current_buffered_packets_count.load(Ordering::Relaxed) as u64 - + self - .current_buffered_packet_batches_count - .load(Ordering::Relaxed) as u64 + self.rebuffered_packets_count.load(Ordering::Relaxed) as u64 + self.consumed_buffered_packets_count.load(Ordering::Relaxed) as u64 + self @@ -240,12 +234,6 @@ impl BankingStageStats { self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "current_buffered_packet_batches_count", - self.current_buffered_packet_batches_count - .swap(0, Ordering::Relaxed) as i64, - i64 - ), ( "current_buffered_packets_count", self.current_buffered_packets_count @@ -442,21 +430,57 @@ impl BankingStage { let data_budget = Arc::new(DataBudget::default()); let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize); + // Keeps track of extraneous vote transactions for the vote threads + let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let should_split_voting_threads = bank_forks + .read() + .map(|bank_forks| { + let bank = bank_forks.root_bank(); + bank.feature_set + .is_active(&allow_votes_to_directly_update_vote_state::id()) + }) + .unwrap_or(false); // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { - let (verified_receiver, forward_option) = match i { - 0 => { - // Disable forwarding of vote transactions - // from gossip. 
Note - votes can also arrive from tpu - (verified_vote_receiver.clone(), ForwardOption::NotForward) - } - 1 => ( - tpu_verified_vote_receiver.clone(), - ForwardOption::ForwardTpuVote, - ), - _ => (verified_receiver.clone(), ForwardOption::ForwardTransaction), - }; + let (verified_receiver, unprocessed_transaction_storage) = + match (i, should_split_voting_threads) { + (0, false) => ( + verified_vote_receiver.clone(), + UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::with_capacity(batch_limit), + ThreadType::Voting(VoteSource::Gossip), + ), + ), + (0, true) => ( + verified_vote_receiver.clone(), + UnprocessedTransactionStorage::new_vote_storage( + latest_unprocessed_votes.clone(), + VoteSource::Gossip, + ), + ), + (1, false) => ( + tpu_verified_vote_receiver.clone(), + UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::with_capacity(batch_limit), + ThreadType::Voting(VoteSource::Tpu), + ), + ), + (1, true) => ( + tpu_verified_vote_receiver.clone(), + UnprocessedTransactionStorage::new_vote_storage( + latest_unprocessed_votes.clone(), + VoteSource::Tpu, + ), + ), + _ => ( + verified_receiver.clone(), + UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::with_capacity(batch_limit), + ThreadType::Transactions, + ), + ), + }; let mut packet_deserializer = PacketDeserializer::new(verified_receiver); let poh_recorder = poh_recorder.clone(); @@ -476,9 +500,7 @@ impl BankingStage { &poh_recorder, &cluster_info, &mut recv_start, - forward_option, i, - batch_limit, transaction_status_sender, gossip_vote_sender, &data_budget, @@ -486,6 +508,7 @@ impl BankingStage { log_messages_bytes_limit, connection_cache, &bank_forks, + unprocessed_transaction_storage, ); }) .unwrap() @@ -589,13 +612,112 @@ impl BankingStage { (Ok(()), packet_vec_len, Some(leader_pubkey)) } + #[allow(clippy::too_many_arguments)] + fn do_process_packets( + max_tx_ingestion_ns: u128, + poh_recorder: &Arc>, + slot_metrics_tracker: &mut LeaderSlotMetricsTracker, + recorder: &TransactionRecorder, + transaction_status_sender: &Option, + gossip_vote_sender: &ReplayVoteSender, + banking_stage_stats: &BankingStageStats, + qos_service: &QosService, + log_messages_bytes_limit: Option, + consumed_buffered_packets_count: &mut usize, + rebuffered_packet_count: &mut usize, + reached_end_of_slot: &mut bool, + test_fn: &Option, + packets_to_process: &Vec>, + ) -> Option> { + // TODO: Right now we iterate through buffer and try the highest weighted transaction once + // but we should retry the highest weighted transactions more often. 
+ let (bank_start, poh_recorder_lock_time) = measure!( + poh_recorder.read().unwrap().bank_start(), + "poh_recorder.read", + ); + slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us( + poh_recorder_lock_time.as_us(), + ); + + let packets_to_process_len = packets_to_process.len(); + if let Some(BankStart { + working_bank, + bank_creation_time, + }) = bank_start + { + let (process_transactions_summary, process_packets_transactions_time) = measure!( + Self::process_packets_transactions( + &working_bank, + &bank_creation_time, + recorder, + packets_to_process.iter().map(|p| &**p), + transaction_status_sender, + gossip_vote_sender, + banking_stage_stats, + qos_service, + slot_metrics_tracker, + log_messages_bytes_limit + ), + "process_packets_transactions", + ); + slot_metrics_tracker.increment_process_packets_transactions_us( + process_packets_transactions_time.as_us(), + ); + + let ProcessTransactionsSummary { + reached_max_poh_height, + retryable_transaction_indexes, + .. + } = process_transactions_summary; + + if reached_max_poh_height + || !Bank::should_bank_still_be_processing_txs( + &bank_creation_time, + max_tx_ingestion_ns, + ) + { + *reached_end_of_slot = true; + } + + // The difference between all transactions passed to execution and the ones that + // are retryable were the ones that were either: + // 1) Committed into the block + // 2) Dropped without being committed because they had some fatal error (too old, + // duplicate signature, etc.) + // + // Note: This assumes that every packet deserializes into one transaction! + *consumed_buffered_packets_count += + packets_to_process_len.saturating_sub(retryable_transaction_indexes.len()); + + // Out of the buffered packets just retried, collect any still unprocessed + // transactions in this batch for forwarding + *rebuffered_packet_count += retryable_transaction_indexes.len(); + if let Some(test_fn) = test_fn { + test_fn(); + } + + slot_metrics_tracker + .increment_retryable_packets_count(retryable_transaction_indexes.len() as u64); + + Some(retryable_transaction_indexes) + } else if *reached_end_of_slot { + None + } else { + // mark as end-of-slot to avoid aggressively lock poh for the remaining for + // packet batches in buffer + *reached_end_of_slot = true; + + None + } + } + #[allow(clippy::too_many_arguments)] pub fn consume_buffered_packets( _my_pubkey: &Pubkey, max_tx_ingestion_ns: u128, poh_recorder: &Arc>, - buffered_packet_batches: &mut UnprocessedPacketBatches, - transaction_status_sender: Option, + unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, + transaction_status_sender: &Option, gossip_vote_sender: &ReplayVoteSender, test_fn: Option, banking_stage_stats: &BankingStageStats, @@ -607,122 +729,38 @@ impl BankingStage { ) { let mut rebuffered_packet_count = 0; let mut consumed_buffered_packets_count = 0; - let buffered_packets_len = buffered_packet_batches.len(); let mut proc_start = Measure::start("consume_buffered_process"); let mut reached_end_of_slot = false; - let mut retryable_packets = { - let capacity = buffered_packet_batches.capacity(); - std::mem::replace( - &mut buffered_packet_batches.packet_priority_queue, - MinMaxHeap::with_capacity(capacity), - ) - }; - let retryable_packets: MinMaxHeap> = retryable_packets - .drain_desc() - .chunks(num_packets_to_process_per_iteration) - .into_iter() - .flat_map(|packets_to_process| { - let packets_to_process = packets_to_process.into_iter().collect_vec(); - // TODO: Right now we iterate through buffer and try the highest 
weighted transaction once - // but we should retry the highest weighted transactions more often. - let (bank_start, poh_recorder_lock_time) = measure!( - poh_recorder.read().unwrap().bank_start(), - "poh_recorder.read", - ); - slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us( - poh_recorder_lock_time.as_us(), - ); + let num_packets_to_process = unprocessed_transaction_storage.len(); + let bank = poh_recorder.read().unwrap().bank(); - let packets_to_process_len = packets_to_process.len(); - if let Some(BankStart { - working_bank, - bank_creation_time, - }) = bank_start - { - let (process_transactions_summary, process_packets_transactions_time) = - measure!( - Self::process_packets_transactions( - &working_bank, - &bank_creation_time, - recorder, - packets_to_process.iter().map(|p| &**p), - transaction_status_sender.clone(), - gossip_vote_sender, - banking_stage_stats, - qos_service, - slot_metrics_tracker, - log_messages_bytes_limit - ), - "process_packets_transactions", - ); - slot_metrics_tracker.increment_process_packets_transactions_us( - process_packets_transactions_time.as_us(), - ); - - let ProcessTransactionsSummary { - reached_max_poh_height, - retryable_transaction_indexes, - .. - } = process_transactions_summary; - - if reached_max_poh_height - || !Bank::should_bank_still_be_processing_txs( - &bank_creation_time, - max_tx_ingestion_ns, - ) - { - reached_end_of_slot = true; - } - - // The difference between all transactions passed to execution and the ones that - // are retryable were the ones that were either: - // 1) Committed into the block - // 2) Dropped without being committed because they had some fatal error (too old, - // duplicate signature, etc.) - // - // Note: This assumes that every packet deserializes into one transaction! 
- consumed_buffered_packets_count += - packets_to_process_len.saturating_sub(retryable_transaction_indexes.len()); - - // Out of the buffered packets just retried, collect any still unprocessed - // transactions in this batch for forwarding - rebuffered_packet_count += retryable_transaction_indexes.len(); - if let Some(test_fn) = &test_fn { - test_fn(); - } - - slot_metrics_tracker.increment_retryable_packets_count( - retryable_transaction_indexes.len() as u64, - ); - - let result = retryable_transaction_indexes - .iter() - .map(|i| packets_to_process[*i].clone()) - .collect_vec(); - - // Remove the non-retryable packets, packets that were either: - // 1) Successfully processed - // 2) Failed but not retryable - Self::remove_non_retained_packets(buffered_packet_batches, &packets_to_process, &retryable_transaction_indexes); - - result - } else if reached_end_of_slot { - packets_to_process - } else { - // mark as end-of-slot to avoid aggressively lock poh for the remaining for - // packet batches in buffer - reached_end_of_slot = true; - - packets_to_process - } - }) - .collect(); - - buffered_packet_batches.packet_priority_queue = retryable_packets; + unprocessed_transaction_storage.process_packets( + bank, + num_packets_to_process_per_iteration, + |packets_to_process| { + Self::do_process_packets( + max_tx_ingestion_ns, + poh_recorder, + slot_metrics_tracker, + recorder, + transaction_status_sender, + gossip_vote_sender, + banking_stage_stats, + qos_service, + log_messages_bytes_limit, + &mut consumed_buffered_packets_count, + &mut rebuffered_packet_count, + &mut reached_end_of_slot, + &test_fn, + packets_to_process, + ) + }, + ); if reached_end_of_slot { - slot_metrics_tracker - .set_end_of_slot_unprocessed_buffer_len(buffered_packet_batches.len() as u64); + slot_metrics_tracker.set_end_of_slot_unprocessed_buffer_len( + unprocessed_transaction_storage.len() as u64, + ); // We've hit the end of this slot, no need to perform more processing, // Packet filtering will be done before forwarding. 
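In the refactored consume_buffered_packets above, the banking thread no longer drains the priority queue itself: it hands a per-chunk closure (wrapping do_process_packets) to UnprocessedTransactionStorage::process_packets, and the closure's return value drives retention. Some(retryable indexes) means re-buffer only those packets, while None (returned once the end of the slot is reached) signals that the remaining packets in the chunk stay buffered, matching what the removed inline code did. The following is only a simplified sketch of that callback contract; Storage, the u64 packets, and the odd/even retry rule are placeholders rather than the crate's real types or logic.

// Minimal sketch of the process_packets callback contract: the storage walks its
// buffer in chunks, the closure processes each chunk, and the Some/None return
// value decides what is retained.
struct Storage {
    buffer: Vec<u64>, // stand-in for prioritized packets
}

impl Storage {
    fn process_packets<F>(&mut self, chunk_size: usize, mut processor: F)
    where
        F: FnMut(&[u64]) -> Option<Vec<usize>>,
    {
        let mut retained = Vec::new();
        for chunk in self.buffer.chunks(chunk_size) {
            match processor(chunk) {
                // Keep only the retryable packets from this chunk.
                Some(retryable) => retained.extend(retryable.into_iter().map(|i| chunk[i])),
                // e.g. end of slot reached: keep the whole chunk untouched.
                None => retained.extend_from_slice(chunk),
            }
        }
        self.buffer = retained;
    }
}

fn main() {
    let mut storage = Storage { buffer: (0..10).collect() };
    let mut reached_end_of_slot = false;
    storage.process_packets(4, |chunk| {
        if reached_end_of_slot {
            return None; // retain everything once the slot is over
        }
        // Pretend only odd-valued packets are retryable and stop after one chunk.
        reached_end_of_slot = true;
        Some(
            chunk
                .iter()
                .enumerate()
                .filter(|(_, packet)| **packet % 2 == 1)
                .map(|(i, _)| i)
                .collect(),
        )
    });
    assert_eq!(storage.buffer, vec![1, 3, 4, 5, 6, 7, 8, 9]);
}

The real storage additionally walks packets in descending priority and rebuilds its priority queue from the retained set, but the Some-means-retryable / None-means-keep-all contract mirrors what the diff above relies on.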
@@ -732,17 +770,12 @@ impl BankingStage { debug!( "@{:?} done processing buffered batches: {} time: {:?}ms tx count: {} tx/s: {}", timestamp(), - buffered_packets_len, + num_packets_to_process, proc_start.as_ms(), consumed_buffered_packets_count, (consumed_buffered_packets_count as f32) / (proc_start.as_s()) ); - // Assert unprocessed queue is still consistent - assert_eq!( - buffered_packet_batches.packet_priority_queue.len(), - buffered_packet_batches.message_hash_to_transaction.len() - ); banking_stage_stats .consume_buffered_packets_elapsed .fetch_add(proc_start.as_us(), Ordering::Relaxed); @@ -793,9 +826,8 @@ impl BankingStage { socket: &UdpSocket, poh_recorder: &Arc>, cluster_info: &ClusterInfo, - buffered_packet_batches: &mut UnprocessedPacketBatches, - forward_option: &ForwardOption, - transaction_status_sender: Option, + unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, + transaction_status_sender: &Option, gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, @@ -807,6 +839,9 @@ impl BankingStage { tracer_packet_stats: &mut TracerPacketStats, bank_forks: &Arc>, ) { + if unprocessed_transaction_storage.should_not_process() { + return; + } let ((metrics_action, decision), make_decision_time) = measure!( { let bank_start; @@ -856,7 +891,7 @@ impl BankingStage { my_pubkey, max_tx_ingestion_ns, poh_recorder, - buffered_packet_batches, + unprocessed_transaction_storage, transaction_status_sender, gossip_vote_sender, None::>, @@ -875,9 +910,8 @@ impl BankingStage { BufferedPacketsDecision::Forward => { let (_, forward_time) = measure!( Self::handle_forwarding( - forward_option, cluster_info, - buffered_packet_batches, + unprocessed_transaction_storage, poh_recorder, socket, false, @@ -898,9 +932,8 @@ impl BankingStage { BufferedPacketsDecision::ForwardAndHold => { let (_, forward_and_hold_time) = measure!( Self::handle_forwarding( - forward_option, cluster_info, - buffered_packet_batches, + unprocessed_transaction_storage, poh_recorder, socket, true, @@ -923,9 +956,8 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] fn handle_forwarding( - forward_option: &ForwardOption, cluster_info: &ClusterInfo, - buffered_packet_batches: &mut UnprocessedPacketBatches, + unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, poh_recorder: &Arc>, socket: &UdpSocket, hold: bool, @@ -936,12 +968,7 @@ impl BankingStage { tracer_packet_stats: &mut TracerPacketStats, bank_forks: &Arc>, ) { - if let ForwardOption::NotForward = forward_option { - if !hold { - buffered_packet_batches.clear(); - } - return; - } + let forward_option = unprocessed_transaction_storage.forward_option(); // get current root bank from bank_forks, use it to sanitize transaction and // load all accounts from address loader; @@ -952,12 +979,11 @@ impl BankingStage { // sanitize and filter packets that are no longer valid (could be too old, a duplicate of something // already processed), then add to forwarding buffer. 
- let filter_forwarding_result = Self::filter_and_forward_with_account_limits( - ¤t_bank, - buffered_packet_batches, - &mut forward_packet_batches_by_accounts, - UNPROCESSED_BUFFER_STEP_SIZE, - ); + let filter_forwarding_result = unprocessed_transaction_storage + .filter_forwardable_packets_and_add_batches( + current_bank, + &mut forward_packet_batches_by_accounts, + ); slot_metrics_tracker.increment_transactions_from_packets_us( filter_forwarding_result.total_packet_conversion_us, ); @@ -982,7 +1008,7 @@ impl BankingStage { let (_forward_result, sucessful_forwarded_packets_count, leader_pubkey) = Self::forward_buffered_packets( connection_cache, - forward_option, + &forward_option, cluster_info, poh_recorder, socket, @@ -1021,307 +1047,17 @@ impl BankingStage { tracer_packet_stats.increment_total_cleared_from_buffer_after_forward( filter_forwarding_result.total_tracer_packets_in_buffer, ); - buffered_packet_batches.clear(); + unprocessed_transaction_storage.clear_forwarded_packets(); } } - /// Filter out packets that fail to sanitize, or are no longer valid (could be - /// too old, a duplicate of something already processed). Doing this in batches to avoid - /// checking bank's blockhash and status cache per transaction which could be bad for performance. - /// Added valid and sanitized packets to forwarding queue. - pub fn filter_and_forward_with_account_limits( - bank: &Arc, - buffered_packet_batches: &mut UnprocessedPacketBatches, - forward_buffer: &mut ForwardPacketBatchesByAccounts, - batch_size: usize, - ) -> FilterForwardingResults { - let mut total_forwardable_tracer_packets: usize = 0; - let mut total_tracer_packets_in_buffer: usize = 0; - let mut total_forwardable_packets: usize = 0; - let mut total_packet_conversion_us: u64 = 0; - let mut total_filter_packets_us: u64 = 0; - let mut dropped_tx_before_forwarding_count: usize = 0; - - let mut original_priority_queue = Self::swap_priority_queue(buffered_packet_batches); - - // indicates if `forward_buffer` still accept more packets, see details at - // `ForwardPacketBatchesByAccounts.rs`. - let mut accepting_packets = true; - // batch iterate through buffered_packet_batches in desc priority order - let retained_priority_queue: MinMaxHeap<_> = original_priority_queue - .drain_desc() - .chunks(batch_size) - .into_iter() - .flat_map(|packets_to_process| { - let packets_to_process = packets_to_process.into_iter().collect_vec(); - - // Vec of same size of `packets_to_process`, each indicates - // corresponding packet is tracer packet. 
- let tracer_packet_indexes = packets_to_process - .iter() - .map(|deserialized_packet| { - deserialized_packet - .original_packet() - .meta - .is_tracer_packet() - }) - .collect::>(); - saturating_add_assign!( - total_tracer_packets_in_buffer, - tracer_packet_indexes - .iter() - .filter(|is_tracer| **is_tracer) - .count() - ); - - if accepting_packets { - let ( - (sanitized_transactions, transaction_to_packet_indexes), - packet_conversion_time, - ): ((Vec, Vec), _) = measure!( - Self::sanitize_unforwarded_packets( - buffered_packet_batches, - &packets_to_process, - bank, - ), - "sanitize_packet", - ); - saturating_add_assign!( - total_packet_conversion_us, - packet_conversion_time.as_us() - ); - - let (forwardable_transaction_indexes, filter_packets_time) = measure!( - Self::filter_invalid_transactions(&sanitized_transactions, bank,), - "filter_packets", - ); - saturating_add_assign!(total_filter_packets_us, filter_packets_time.as_us()); - - for forwardable_transaction_index in &forwardable_transaction_indexes { - saturating_add_assign!(total_forwardable_packets, 1); - let forwardable_packet_index = - transaction_to_packet_indexes[*forwardable_transaction_index]; - if tracer_packet_indexes[forwardable_packet_index] { - saturating_add_assign!(total_forwardable_tracer_packets, 1); - } - } - - let accepted_packet_indexes = Self::add_filtered_packets_to_forward_buffer( - forward_buffer, - &packets_to_process, - &sanitized_transactions, - &transaction_to_packet_indexes, - &forwardable_transaction_indexes, - &mut dropped_tx_before_forwarding_count, - ); - accepting_packets = - accepted_packet_indexes.len() == forwardable_transaction_indexes.len(); - - UnprocessedPacketBatches::mark_accepted_packets_as_forwarded( - buffered_packet_batches, - &packets_to_process, - &accepted_packet_indexes, - ); - - Self::collect_retained_packets( - buffered_packet_batches, - &packets_to_process, - &Self::prepare_filtered_packet_indexes( - &transaction_to_packet_indexes, - &forwardable_transaction_indexes, - ), - ) - } else { - // skip sanitizing and filtering if not longer able to add more packets for forwarding - saturating_add_assign!( - dropped_tx_before_forwarding_count, - packets_to_process.len() - ); - packets_to_process - } - }) - .collect(); - - // replace packet priority queue - buffered_packet_batches.packet_priority_queue = retained_priority_queue; - - inc_new_counter_info!( - "banking_stage-dropped_tx_before_forwarding", - dropped_tx_before_forwarding_count - ); - - FilterForwardingResults { - total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, - total_packet_conversion_us, - total_filter_packets_us, - } - } - - /// Take buffered_packet_batches's priority_queue out, leave empty MinMaxHeap in its place. - fn swap_priority_queue( - buffered_packet_batches: &mut UnprocessedPacketBatches, - ) -> MinMaxHeap> { - let capacity = buffered_packet_batches.capacity(); - std::mem::replace( - &mut buffered_packet_batches.packet_priority_queue, - MinMaxHeap::with_capacity(capacity), - ) - } - - /// sanitize un-forwarded packet into SanitizedTransaction for validation and forwarding. 
- fn sanitize_unforwarded_packets( - buffered_packet_batches: &mut UnprocessedPacketBatches, - packets_to_process: &[Arc], - bank: &Arc, - ) -> (Vec, Vec) { - // Get ref of ImmutableDeserializedPacket - let deserialized_packets = packets_to_process.iter().map(|p| &**p); - let (transactions, transaction_to_packet_indexes): (Vec, Vec) = - deserialized_packets - .enumerate() - .filter_map(|(packet_index, deserialized_packet)| { - if !buffered_packet_batches.is_forwarded(deserialized_packet) { - unprocessed_packet_batches::transaction_from_deserialized_packet( - deserialized_packet, - &bank.feature_set, - bank.vote_only_bank(), - bank.as_ref(), - ) - .map(|transaction| (transaction, packet_index)) - } else { - None - } - }) - .unzip(); - - // report metrics - inc_new_counter_info!("banking_stage-packet_conversion", 1); - let unsanitized_packets_filtered_count = - packets_to_process.len().saturating_sub(transactions.len()); - inc_new_counter_info!( - "banking_stage-dropped_tx_before_forwarding", - unsanitized_packets_filtered_count - ); - - (transactions, transaction_to_packet_indexes) - } - - /// Checks sanitized transactions against bank, returns valid transaction indexes - fn filter_invalid_transactions( - transactions: &[SanitizedTransaction], - bank: &Arc, - ) -> Vec { - let filter = vec![Ok(()); transactions.len()]; - let results = Self::bank_check_transactions(bank, transactions, &filter); - // report metrics - let filtered_out_transactions_count = transactions.len().saturating_sub(results.len()); - inc_new_counter_info!( - "banking_stage-dropped_tx_before_forwarding", - filtered_out_transactions_count - ); - - results - .iter() - .enumerate() - .filter_map( - |(tx_index, (result, _))| if result.is_ok() { Some(tx_index) } else { None }, - ) - .collect_vec() - } - - fn prepare_filtered_packet_indexes( - transaction_to_packet_indexes: &[usize], - retained_transaction_indexes: &[usize], - ) -> Vec { - retained_transaction_indexes - .iter() - .map(|tx_index| transaction_to_packet_indexes[*tx_index]) - .collect_vec() - } - - fn collect_retained_packets( - buffered_packet_batches: &mut UnprocessedPacketBatches, - packets_to_process: &[Arc], - retained_packet_indexes: &[usize], - ) -> Vec> { - Self::remove_non_retained_packets( - buffered_packet_batches, - packets_to_process, - retained_packet_indexes, - ); - retained_packet_indexes - .iter() - .map(|i| packets_to_process[*i].clone()) - .collect_vec() - } - - /// remove packets from UnprocessedPacketBatches.message_hash_to_transaction after they have - /// been removed from UnprocessedPacketBatches.packet_priority_queue - fn remove_non_retained_packets( - buffered_packet_batches: &mut UnprocessedPacketBatches, - packets_to_process: &[Arc], - retained_packet_indexes: &[usize], - ) { - Self::filter_processed_packets( - retained_packet_indexes - .iter() - .chain(std::iter::once(&packets_to_process.len())), - |start, end| { - for processed_packet in &packets_to_process[start..end] { - buffered_packet_batches - .message_hash_to_transaction - .remove(processed_packet.message_hash()); - } - }, - ) - } - - /// try to add filtered forwardable and valid packets to forward buffer; - /// returns vector of packet indexes that were accepted for forwarding. 
- fn add_filtered_packets_to_forward_buffer( - forward_buffer: &mut ForwardPacketBatchesByAccounts, - packets_to_process: &[Arc], - transactions: &[SanitizedTransaction], - transaction_to_packet_indexes: &[usize], - forwardable_transaction_indexes: &[usize], - dropped_tx_before_forwarding_count: &mut usize, - ) -> Vec { - let mut added_packets_count: usize = 0; - let mut accepted_packet_indexes = Vec::with_capacity(transaction_to_packet_indexes.len()); - for forwardable_transaction_index in forwardable_transaction_indexes { - let sanitized_transaction = &transactions[*forwardable_transaction_index]; - let forwardable_packet_index = - transaction_to_packet_indexes[*forwardable_transaction_index]; - let immutable_deserialized_packet = - packets_to_process[forwardable_packet_index].clone(); - if !forward_buffer.try_add_packet(sanitized_transaction, immutable_deserialized_packet) - { - break; - } - accepted_packet_indexes.push(forwardable_packet_index); - saturating_add_assign!(added_packets_count, 1); - } - - // count the packets not being forwarded in this batch - saturating_add_assign!( - *dropped_tx_before_forwarding_count, - forwardable_transaction_indexes.len() - added_packets_count - ); - - accepted_packet_indexes - } - #[allow(clippy::too_many_arguments)] fn process_loop( packet_deserializer: &mut PacketDeserializer, poh_recorder: &Arc>, cluster_info: &ClusterInfo, recv_start: &mut Instant, - forward_option: ForwardOption, id: u32, - batch_limit: usize, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, data_budget: &DataBudget, @@ -1329,10 +1065,10 @@ impl BankingStage { log_messages_bytes_limit: Option, connection_cache: Arc, bank_forks: &Arc>, + mut unprocessed_transaction_storage: UnprocessedTransactionStorage, ) { let recorder = poh_recorder.read().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit); let mut banking_stage_stats = BankingStageStats::new(id); let mut tracer_packet_stats = TracerPacketStats::new(id); let qos_service = QosService::new(cost_model, id); @@ -1342,7 +1078,7 @@ impl BankingStage { loop { let my_pubkey = cluster_info.id(); - if !buffered_packet_batches.is_empty() + if !unprocessed_transaction_storage.is_empty() || last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD { let (_, process_buffered_packets_time) = measure!( @@ -1351,9 +1087,8 @@ impl BankingStage { &socket, poh_recorder, cluster_info, - &mut buffered_packet_batches, - &forward_option, - transaction_status_sender.clone(), + &mut unprocessed_transaction_storage, + &transaction_status_sender, &gossip_vote_sender, &banking_stage_stats, &recorder, @@ -1374,7 +1109,8 @@ impl BankingStage { tracer_packet_stats.report(1000); - let recv_timeout = if !buffered_packet_batches.is_empty() { + // Gossip thread will almost always not wait because the transaction storage will most likely not be empty + let recv_timeout = if !unprocessed_transaction_storage.is_empty() { // If there are buffered packets, run the equivalent of try_recv to try reading more // packets. 
This prevents starving BankingStage::consume_buffered_packets due to // buffered_packet_batches containing transactions that exceed the cost model for @@ -1391,7 +1127,7 @@ impl BankingStage { recv_start, recv_timeout, id, - &mut buffered_packet_batches, + &mut unprocessed_transaction_storage, &mut banking_stage_stats, &mut tracer_packet_stats, &mut slot_metrics_tracker, @@ -1470,7 +1206,7 @@ impl BankingStage { bank: &Arc, poh: &TransactionRecorder, batch: &TransactionBatch, - transaction_status_sender: Option, + transaction_status_sender: &Option, gossip_vote_sender: &ReplayVoteSender, log_messages_bytes_limit: Option, ) -> ExecuteAndCommitTransactionsOutput { @@ -1712,7 +1448,7 @@ impl BankingStage { txs: &[SanitizedTransaction], poh: &TransactionRecorder, chunk_offset: usize, - transaction_status_sender: Option, + transaction_status_sender: &Option, gossip_vote_sender: &ReplayVoteSender, qos_service: &QosService, log_messages_bytes_limit: Option, @@ -1905,7 +1641,7 @@ impl BankingStage { bank_creation_time: &Instant, transactions: &[SanitizedTransaction], poh: &TransactionRecorder, - transaction_status_sender: Option, + transaction_status_sender: &Option, gossip_vote_sender: &ReplayVoteSender, qos_service: &QosService, log_messages_bytes_limit: Option, @@ -1937,7 +1673,7 @@ impl BankingStage { &transactions[chunk_start..chunk_end], poh, chunk_start, - transaction_status_sender.clone(), + transaction_status_sender, gossip_vote_sender, qos_service, log_messages_bytes_limit, @@ -2057,35 +1793,6 @@ impl BankingStage { .collect_vec() } - /// Checks a batch of sanitized transactions again bank for age and status - fn bank_check_transactions( - bank: &Arc, - transactions: &[SanitizedTransaction], - filter: &[transaction::Result<()>], - ) -> Vec { - let mut error_counters = TransactionErrorMetrics::default(); - // The following code also checks if the blockhash for a transaction is too old - // The check accounts for - // 1. Transaction forwarding delay - // 2. 
The slot at which the next leader will actually process the transaction - // Drop the transaction if it will expire by the time the next node receives and processes it - let api = perf_libs::api(); - let max_tx_fwd_delay = if api.is_none() { - MAX_TRANSACTION_FORWARDING_DELAY - } else { - MAX_TRANSACTION_FORWARDING_DELAY_GPU - }; - - bank.check_transactions( - transactions, - filter, - (MAX_PROCESSING_AGE) - .saturating_sub(max_tx_fwd_delay) - .saturating_sub(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET as usize), - &mut error_counters, - ) - } - /// This function filters pending packets that are still valid /// # Arguments /// * `transactions` - a batch of transactions deserialized from packets @@ -2100,37 +1807,22 @@ impl BankingStage { let filter = Self::prepare_filter_for_pending_transactions(transactions.len(), pending_indexes); - let results = Self::bank_check_transactions(bank, transactions, &filter); + let results = bank.check_transactions_with_forwarding_delay( + transactions, + &filter, + FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, + ); Self::filter_valid_transaction_indexes(&results, transaction_to_packet_indexes) } - fn filter_processed_packets<'a, F>( - retryable_transaction_indexes: impl Iterator, - mut f: F, - ) where - F: FnMut(usize, usize), - { - let mut prev_retryable_index = 0; - for (i, retryable_index) in retryable_transaction_indexes.enumerate() { - let start = if i == 0 { 0 } else { prev_retryable_index + 1 }; - - let end = *retryable_index; - prev_retryable_index = *retryable_index; - - if start < end { - f(start, end) - } - } - } - #[allow(clippy::too_many_arguments)] fn process_packets_transactions<'a>( bank: &'a Arc, bank_creation_time: &Instant, poh: &'a TransactionRecorder, deserialized_packets: impl Iterator, - transaction_status_sender: Option, + transaction_status_sender: &Option, gossip_vote_sender: &'a ReplayVoteSender, banking_stage_stats: &'a BankingStageStats, qos_service: &'a QosService, @@ -2145,13 +1837,13 @@ impl BankingStage { deserialized_packets .enumerate() .filter_map(|(i, deserialized_packet)| { - unprocessed_packet_batches::transaction_from_deserialized_packet( - deserialized_packet, - &bank.feature_set, - bank.vote_only_bank(), - bank.as_ref(), - ) - .map(|transaction| (transaction, i)) + deserialized_packet + .build_sanitized_transaction( + &bank.feature_set, + bank.vote_only_bank(), + bank.as_ref(), + ) + .map(|transaction| (transaction, i)) }) .unzip(), "packet_conversion", @@ -2238,7 +1930,7 @@ impl BankingStage { recv_start: &mut Instant, recv_timeout: Duration, id: u32, - buffered_packet_batches: &mut UnprocessedPacketBatches, + unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, banking_stage_stats: &mut BankingStageStats, tracer_packet_stats: &mut TracerPacketStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, @@ -2251,7 +1943,7 @@ impl BankingStage { failed_sigverify_count, } = packet_deserializer.handle_received_packets( recv_timeout, - buffered_packet_batches.capacity() - buffered_packet_batches.len(), + unprocessed_transaction_storage.capacity() - unprocessed_transaction_storage.len(), )?; let packet_count = deserialized_packets.len(); debug!( @@ -2273,7 +1965,7 @@ impl BankingStage { let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; Self::push_unprocessed( - buffered_packet_batches, + unprocessed_transaction_storage, deserialized_packets, &mut dropped_packets_count, &mut newly_buffered_packets_count, @@ -2295,18 +1987,15 @@ impl BankingStage { banking_stage_stats 
.newly_buffered_packets_count .fetch_add(newly_buffered_packets_count, Ordering::Relaxed); - banking_stage_stats - .current_buffered_packet_batches_count - .swap(buffered_packet_batches.len(), Ordering::Relaxed); banking_stage_stats .current_buffered_packets_count - .swap(buffered_packet_batches.len(), Ordering::Relaxed); + .swap(unprocessed_transaction_storage.len(), Ordering::Relaxed); *recv_start = Instant::now(); Ok(()) } fn push_unprocessed( - unprocessed_packet_batches: &mut UnprocessedPacketBatches, + unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, deserialized_packets: Vec, dropped_packets_count: &mut usize, newly_buffered_packets_count: &mut usize, @@ -2323,20 +2012,17 @@ impl BankingStage { slot_metrics_tracker .increment_newly_buffered_packets_count(deserialized_packets.len() as u64); - let (number_of_dropped_packets, number_of_dropped_tracer_packets) = - unprocessed_packet_batches.insert_batch( - deserialized_packets - .into_iter() - .map(DeserializedPacket::from_immutable_section), - ); - - saturating_add_assign!(*dropped_packets_count, number_of_dropped_packets); - slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count( - number_of_dropped_packets as u64, + let insert_packet_batches_summary = + unprocessed_transaction_storage.insert_batch(deserialized_packets); + slot_metrics_tracker + .accumulate_insert_packet_batches_summary(&insert_packet_batches_summary); + saturating_add_assign!( + *dropped_packets_count, + insert_packet_batches_summary.total_dropped_packets() + ); + tracer_packet_stats.increment_total_exceeded_banking_stage_buffer( + insert_packet_batches_summary.dropped_tracer_packets(), ); - - tracer_packet_stats - .increment_total_exceeded_banking_stage_buffer(number_of_dropped_tracer_packets); } } @@ -2394,6 +2080,7 @@ where mod tests { use { super::*, + crate::unprocessed_packet_batches, crossbeam_channel::{unbounded, Receiver}, solana_address_lookup_table_program::state::{AddressLookupTable, LookupTableMeta}, solana_entry::entry::{next_entry, next_versioned_entry, Entry, EntrySlice}, @@ -2411,7 +2098,7 @@ mod tests { }, solana_program_runtime::timings::ProgramTiming, solana_rpc::transaction_status_service::TransactionStatusService, - solana_runtime::bank_forks::BankForks, + solana_runtime::{bank_forks::BankForks, genesis_utils::activate_feature}, solana_sdk::{ account::AccountSharedData, hash::Hash, @@ -2427,6 +2114,9 @@ mod tests { }, solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace}, solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta}, + solana_vote_program::{ + vote_state::VoteStateUpdate, vote_transaction::new_vote_state_update_transaction, + }, std::{ borrow::Cow, collections::HashSet, @@ -2434,7 +2124,6 @@ mod tests { sync::atomic::{AtomicBool, Ordering}, thread::sleep, }, - unprocessed_packet_batches::DeserializedPacket, }; fn new_test_cluster_info(contact_info: ContactInfo) -> ClusterInfo { @@ -3071,7 +2760,7 @@ mod tests { &transactions, &recorder, 0, - None, + &None, &gossip_vote_sender, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), None, @@ -3124,7 +2813,7 @@ mod tests { &transactions, &recorder, 0, - None, + &None, &gossip_vote_sender, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), None, @@ -3208,7 +2897,7 @@ mod tests { &transactions, &recorder, 0, - None, + &None, &gossip_vote_sender, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), None, @@ -3300,7 +2989,7 @@ mod tests { &transactions, &recorder, 0, - 
None, + &None, &gossip_vote_sender, &qos_service, None, @@ -3340,7 +3029,7 @@ mod tests { &transactions, &recorder, 0, - None, + &None, &gossip_vote_sender, &qos_service, None, @@ -3437,7 +3126,7 @@ mod tests { &transactions, &recorder, 0, - None, + &None, &gossip_vote_sender, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), None, @@ -3466,144 +3155,6 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } - #[test] - fn test_filter_and_forward_with_account_limits() { - solana_logger::setup(); - let GenesisConfigInfo { - genesis_config, - mint_keypair, - .. - } = create_genesis_config(10); - let current_bank = Arc::new(Bank::new_for_tests(&genesis_config)); - - let simple_transactions: Vec = (0..256) - .map(|_id| { - // packets are deserialized upon receiving, failed packets will not be - // forwarded; Therefore we need to create real packets here. - let key1 = Keypair::new(); - system_transaction::transfer( - &mint_keypair, - &key1.pubkey(), - genesis_config.rent.minimum_balance(0), - genesis_config.hash(), - ) - }) - .collect_vec(); - - let mut packets: Vec = simple_transactions - .iter() - .enumerate() - .map(|(packets_id, transaction)| { - let mut p = Packet::from_data(None, transaction).unwrap(); - p.meta.port = packets_id as u16; - p.meta.set_tracer(true); - DeserializedPacket::new(p).unwrap() - }) - .collect_vec(); - - // all packets are forwarded - { - let mut buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); - let mut forward_packet_batches_by_accounts = - ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); - - let FilterForwardingResults { - total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, - .. - } = BankingStage::filter_and_forward_with_account_limits( - ¤t_bank, - &mut buffered_packet_batches, - &mut forward_packet_batches_by_accounts, - UNPROCESSED_BUFFER_STEP_SIZE, - ); - assert_eq!(total_forwardable_packets, 256); - assert_eq!(total_tracer_packets_in_buffer, 256); - assert_eq!(total_forwardable_tracer_packets, 256); - - // packets in a batch are forwarded in arbitrary order; verify the ports match after - // sorting - let expected_ports: Vec<_> = (0..256).collect(); - let mut forwarded_ports: Vec<_> = forward_packet_batches_by_accounts - .iter_batches() - .flat_map(|batch| { - batch - .get_forwardable_packets() - .into_iter() - .map(|p| p.meta.port) - }) - .collect(); - forwarded_ports.sort_unstable(); - assert_eq!(expected_ports, forwarded_ports); - } - - // some packets are forwarded - { - let num_already_forwarded = 16; - for packet in &mut packets[0..num_already_forwarded] { - packet.forwarded = true; - } - let mut buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); - let mut forward_packet_batches_by_accounts = - ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); - let FilterForwardingResults { - total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, - .. 
- } = BankingStage::filter_and_forward_with_account_limits( - ¤t_bank, - &mut buffered_packet_batches, - &mut forward_packet_batches_by_accounts, - UNPROCESSED_BUFFER_STEP_SIZE, - ); - assert_eq!( - total_forwardable_packets, - packets.len() - num_already_forwarded - ); - assert_eq!(total_tracer_packets_in_buffer, packets.len()); - assert_eq!( - total_forwardable_tracer_packets, - packets.len() - num_already_forwarded - ); - } - - // some packets are invalid (already processed) - { - let num_already_processed = 16; - for tx in &simple_transactions[0..num_already_processed] { - assert_eq!(current_bank.process_transaction(tx), Ok(())); - } - let mut buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); - let mut forward_packet_batches_by_accounts = - ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); - let FilterForwardingResults { - total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, - .. - } = BankingStage::filter_and_forward_with_account_limits( - ¤t_bank, - &mut buffered_packet_batches, - &mut forward_packet_batches_by_accounts, - UNPROCESSED_BUFFER_STEP_SIZE, - ); - assert_eq!( - total_forwardable_packets, - packets.len() - num_already_processed - ); - assert_eq!(total_tracer_packets_in_buffer, packets.len()); - assert_eq!( - total_forwardable_tracer_packets, - packets.len() - num_already_processed - ); - } - } - #[test] fn test_process_transactions_returns_unprocessed_txs() { solana_logger::setup(); @@ -3653,7 +3204,7 @@ mod tests { &Instant::now(), &transactions, &recorder, - None, + &None, &gossip_vote_sender, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), None, @@ -3720,7 +3271,7 @@ mod tests { &Instant::now(), &transactions, &recorder, - None, + &None, &gossip_vote_sender, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), None, @@ -3948,7 +3499,7 @@ mod tests { &transactions, &recorder, 0, - Some(TransactionStatusSender { + &Some(TransactionStatusSender { sender: transaction_status_sender, }), &gossip_vote_sender, @@ -4117,7 +3668,7 @@ mod tests { &[sanitized_tx.clone()], &recorder, 0, - Some(TransactionStatusSender { + &Some(TransactionStatusSender { sender: transaction_status_sender, }), &gossip_vote_sender, @@ -4222,10 +3773,13 @@ mod tests { unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions) .unwrap(); assert_eq!(deserialized_packets.len(), num_conflicting_transactions); - let mut buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter( - deserialized_packets.into_iter(), - num_conflicting_transactions, + let mut buffered_packet_batches = + UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::from_iter( + deserialized_packets.into_iter(), + num_conflicting_transactions, + ), + ThreadType::Transactions, ); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); @@ -4238,7 +3792,7 @@ mod tests { max_tx_processing_ns, &poh_recorder, &mut buffered_packet_batches, - None, + &None, &gossip_vote_sender, None::>, &BankingStageStats::default(), @@ -4259,7 +3813,7 @@ mod tests { max_tx_processing_ns, &poh_recorder, &mut buffered_packet_batches, - None, + &None, &gossip_vote_sender, None::>, &BankingStageStats::default(), @@ -4319,10 +3873,13 @@ mod tests { .unwrap(); assert_eq!(deserialized_packets.len(), num_conflicting_transactions); let num_packets_to_process_per_iteration = 1; - let mut buffered_packet_batches: 
UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter( - deserialized_packets.clone().into_iter(), - num_conflicting_transactions, + let mut buffered_packet_batches = + UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::from_iter( + deserialized_packets.clone().into_iter(), + num_conflicting_transactions, + ), + ThreadType::Transactions, ); let all_packet_message_hashes: HashSet = buffered_packet_batches .iter() @@ -4333,7 +3890,7 @@ mod tests { std::u128::MAX, &poh_recorder_, &mut buffered_packet_batches, - None, + &None, &gossip_vote_sender, test_fn, &BankingStageStats::default(), @@ -4431,16 +3988,18 @@ mod tests { let connection_cache = ConnectionCache::default(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); for (name, data_budget, expected_num_forwarded) in test_cases { - let mut unprocessed_packet_batches: UnprocessedPacketBatches = + let unprocessed_packet_batches: UnprocessedPacketBatches = UnprocessedPacketBatches::from_iter( vec![deserialized_packet.clone()].into_iter(), 1, ); let stats = BankingStageStats::default(); BankingStage::handle_forwarding( - &ForwardOption::ForwardTransaction, &cluster_info, - &mut unprocessed_packet_batches, + &mut UnprocessedTransactionStorage::new_transaction_storage( + unprocessed_packet_batches, + ThreadType::Transactions, + ), &poh_recorder, &socket, true, @@ -4491,11 +4050,13 @@ mod tests { DeserializedPacket::new(packet).unwrap() }; - let mut unprocessed_packet_batches: UnprocessedPacketBatches = + let mut unprocessed_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( UnprocessedPacketBatches::from_iter( vec![forwarded_packet, normal_packet].into_iter(), 2, - ); + ), + ThreadType::Transactions, + ); let genesis_config_info = create_slow_genesis_config(10_000); let GenesisConfigInfo { @@ -4528,35 +4089,15 @@ mod tests { let connection_cache = ConnectionCache::default(); let test_cases = vec![ - ("not-forward", ForwardOption::NotForward, true, vec![], 2), - ( - "fwd-normal", - ForwardOption::ForwardTransaction, - true, - vec![normal_block_hash], - 2, - ), - ( - "fwd-no-op", - ForwardOption::ForwardTransaction, - true, - vec![], - 2, - ), - ( - "fwd-no-hold", - ForwardOption::ForwardTransaction, - false, - vec![], - 0, - ), + ("fwd-normal", true, vec![normal_block_hash], 2), + ("fwd-no-op", true, vec![], 2), + ("fwd-no-hold", false, vec![], 0), ]; let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases { + for (name, hold, expected_ids, expected_num_unprocessed) in test_cases { let stats = BankingStageStats::default(); BankingStage::handle_forwarding( - &forward_option, &cluster_info, &mut unprocessed_packet_batches, &poh_recorder, @@ -4691,53 +4232,139 @@ mod tests { } #[test] - fn test_filter_processed_packets() { - let retryable_indexes = [0, 1, 2, 3]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - BankingStage::filter_processed_packets(retryable_indexes.iter(), f); - assert!(non_retryable_indexes.is_empty()); + fn test_unprocessed_transaction_storage_full_send() { + solana_logger::setup(); + let GenesisConfigInfo { + mut genesis_config, + mint_keypair, + .. 
+ } = create_slow_genesis_config(10000); + activate_feature( + &mut genesis_config, + allow_votes_to_directly_update_vote_state::id(), + ); + let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); + let start_hash = bank.last_blockhash(); + let (verified_sender, verified_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); + let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + { + let blockstore = Arc::new( + Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"), + ); + let poh_config = PohConfig { + // limit tick count to avoid clearing working_bank at PohRecord then + // PohRecorderError(MaxHeightReached) at BankingStage + target_tick_count: Some(bank.max_tick_height() - 1), + ..PohConfig::default() + }; + let (exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blockstore, Some(poh_config), None); + let cluster_info = new_test_cluster_info(Node::new_localhost().info); + let cluster_info = Arc::new(cluster_info); + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); - let retryable_indexes = [0, 1, 2, 3, 5]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - BankingStage::filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(4, 5)]); + let banking_stage = BankingStage::new( + &cluster_info, + &poh_recorder, + verified_receiver, + tpu_vote_receiver, + gossip_verified_vote_receiver, + None, + gossip_vote_sender, + Arc::new(RwLock::new(CostModel::default())), + None, + Arc::new(ConnectionCache::default()), + bank_forks, + ); - let retryable_indexes = [1, 2, 3]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - BankingStage::filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(0, 1)]); + let keypairs = (0..100).map(|_| Keypair::new()).collect_vec(); + let vote_keypairs = (0..100).map(|_| Keypair::new()).collect_vec(); + for keypair in keypairs.iter() { + bank.process_transaction(&system_transaction::transfer( + &mint_keypair, + &keypair.pubkey(), + 20, + start_hash, + )) + .unwrap(); + } - let retryable_indexes = [1, 2, 3, 5]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - BankingStage::filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5)]); + // Send a bunch of votes and transfers + let tpu_votes = (0..100_usize) + .map(|i| { + new_vote_state_update_transaction( + VoteStateUpdate::from(vec![ + (0, 8), + (1, 7), + (i as u64 + 10, 6), + (i as u64 + 11, 1), + ]), + Hash::new_unique(), + &keypairs[i], + &vote_keypairs[i], + &vote_keypairs[i], + None, + ); + }) + .collect_vec(); + let gossip_votes = (0..100_usize) + .map(|i| { + new_vote_state_update_transaction( + VoteStateUpdate::from(vec![ + (0, 8), + (1, 7), + (i as u64 + 64 + 5, 6), + (i as u64 + 7, 1), + ]), + Hash::new_unique(), + &keypairs[i], + &vote_keypairs[i], + &vote_keypairs[i], + None, + ); + }) + .collect_vec(); + let txs = (0..100_usize) + .map(|i| { + system_transaction::transfer( + &keypairs[i], + &keypairs[(i + 1) % 100].pubkey(), + 10, + 
start_hash, + ); + }) + .collect_vec(); - let retryable_indexes = [1, 2, 3, 5, 8]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - BankingStage::filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]); + let tpu_packet_batches = to_packet_batches(&tpu_votes, 10); + let gossip_packet_batches = to_packet_batches(&gossip_votes, 10); + let tx_packet_batches = to_packet_batches(&txs, 10); - let retryable_indexes = [1, 2, 3, 5, 8, 8]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - BankingStage::filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]); + // Send em all + [ + (tpu_packet_batches, tpu_vote_sender.clone()), + (gossip_packet_batches, gossip_verified_vote_sender.clone()), + (tx_packet_batches, verified_sender.clone()), + ] + .into_iter() + .map(|(packet_batches, sender)| { + Builder::new() + .spawn(move || sender.send((packet_batches, None)).unwrap()) + .unwrap() + }) + .for_each(|handle| handle.join().unwrap()); + + drop(verified_sender); + drop(tpu_vote_sender); + drop(gossip_verified_vote_sender); + banking_stage.join().unwrap(); + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + Blockstore::destroy(ledger_path.path()).unwrap(); } } diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 701e4f58e..885f30ea8 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -1,16 +1,13 @@ //! The `fetch_stage` batches input from a UDP socket and sends it to a channel. use { - crate::{ - banking_stage::HOLD_TRANSACTIONS_SLOT_OFFSET, - result::{Error, Result}, - }, + crate::result::{Error, Result}, crossbeam_channel::{unbounded, RecvTimeoutError}, solana_metrics::{inc_new_counter_debug, inc_new_counter_info}, solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, solana_poh::poh_recorder::PohRecorder, solana_sdk::{ - clock::DEFAULT_TICKS_PER_SLOT, + clock::{DEFAULT_TICKS_PER_SLOT, HOLD_TRANSACTIONS_SLOT_OFFSET}, packet::{Packet, PacketFlags}, }, solana_streamer::streamer::{ diff --git a/core/src/forward_packet_batches_by_accounts.rs b/core/src/forward_packet_batches_by_accounts.rs index 9132b14a2..6d0722f7a 100644 --- a/core/src/forward_packet_batches_by_accounts.rs +++ b/core/src/forward_packet_batches_by_accounts.rs @@ -182,7 +182,7 @@ impl ForwardPacketBatchesByAccounts { mod tests { use { super::*, - crate::unprocessed_packet_batches::{self, DeserializedPacket}, + crate::unprocessed_packet_batches::DeserializedPacket, solana_runtime::transaction_priority_details::TransactionPriorityDetails, solana_sdk::{ feature_set::FeatureSet, hash::Hash, signature::Keypair, system_transaction, @@ -352,13 +352,14 @@ mod tests { // assert it is added, and buffer still accepts more packets { let packet = build_deserialized_packet_for_test(10, &hot_account, requested_cu); - let tx = unprocessed_packet_batches::transaction_from_deserialized_packet( - packet.immutable_section(), - &Arc::new(FeatureSet::default()), - false, //votes_only, - SimpleAddressLoader::Disabled, - ) - .unwrap(); + let tx = packet + .immutable_section() + .build_sanitized_transaction( + &Arc::new(FeatureSet::default()), + false, //votes_only, + SimpleAddressLoader::Disabled, + ) + .unwrap(); assert!(forward_packet_batches_by_accounts .try_add_packet(&tx, packet.immutable_section().clone())); @@ -372,13 +373,14 @@ 
mod tests { { let packet = build_deserialized_packet_for_test(100, &hot_account, 1 /*requested_cu*/); - let tx = unprocessed_packet_batches::transaction_from_deserialized_packet( - packet.immutable_section(), - &Arc::new(FeatureSet::default()), - false, //votes_only, - SimpleAddressLoader::Disabled, - ) - .unwrap(); + let tx = packet + .immutable_section() + .build_sanitized_transaction( + &Arc::new(FeatureSet::default()), + false, //votes_only, + SimpleAddressLoader::Disabled, + ) + .unwrap(); assert!(!forward_packet_batches_by_accounts .try_add_packet(&tx, packet.immutable_section().clone())); @@ -392,13 +394,14 @@ mod tests { { let packet = build_deserialized_packet_for_test(100, &other_account, 1 /*requested_cu*/); - let tx = unprocessed_packet_batches::transaction_from_deserialized_packet( - packet.immutable_section(), - &Arc::new(FeatureSet::default()), - false, //votes_only, - SimpleAddressLoader::Disabled, - ) - .unwrap(); + let tx = packet + .immutable_section() + .build_sanitized_transaction( + &Arc::new(FeatureSet::default()), + false, //votes_only, + SimpleAddressLoader::Disabled, + ) + .unwrap(); assert!(!forward_packet_batches_by_accounts .try_add_packet(&tx, packet.immutable_section().clone())); diff --git a/core/src/immutable_deserialized_packet.rs b/core/src/immutable_deserialized_packet.rs index a8cefa993..923da031e 100644 --- a/core/src/immutable_deserialized_packet.rs +++ b/core/src/immutable_deserialized_packet.rs @@ -4,14 +4,18 @@ use { GetTransactionPriorityDetails, TransactionPriorityDetails, }, solana_sdk::{ + feature_set, hash::Hash, message::Message, sanitize::SanitizeError, short_vec::decode_shortu16_len, signature::Signature, - transaction::{SanitizedVersionedTransaction, VersionedTransaction}, + transaction::{ + AddressLoader, SanitizedTransaction, SanitizedVersionedTransaction, + VersionedTransaction, + }, }, - std::{cmp::Ordering, mem::size_of}, + std::{cmp::Ordering, mem::size_of, sync::Arc}, thiserror::Error, }; @@ -94,6 +98,28 @@ impl ImmutableDeserializedPacket { pub fn compute_unit_limit(&self) -> u64 { self.priority_details.compute_unit_limit } + + // This function deserializes packets into transactions, computes the blake3 hash of transaction + // messages, and verifies secp256k1 instructions. 
+ pub fn build_sanitized_transaction( + &self, + feature_set: &Arc, + votes_only: bool, + address_loader: impl AddressLoader, + ) -> Option { + if votes_only && !self.is_simple_vote() { + return None; + } + let tx = SanitizedTransaction::try_new( + self.transaction().clone(), + *self.message_hash(), + self.is_simple_vote(), + address_loader, + ) + .ok()?; + tx.verify_precompiles(feature_set).ok()?; + Some(tx) + } } impl PartialOrd for ImmutableDeserializedPacket { diff --git a/core/src/latest_unprocessed_votes.rs b/core/src/latest_unprocessed_votes.rs index fe73582ab..2e4fe33bc 100644 --- a/core/src/latest_unprocessed_votes.rs +++ b/core/src/latest_unprocessed_votes.rs @@ -1,13 +1,11 @@ -#![allow(dead_code)] use { crate::{ forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket}, - unprocessed_packet_batches, }, itertools::Itertools, rand::{thread_rng, Rng}, - solana_perf::packet::{Packet, PacketBatch}, + solana_perf::packet::Packet, solana_runtime::bank::Bank, solana_sdk::{clock::Slot, program_utils::limited_deserialize, pubkey::Pubkey}, solana_vote_program::vote_instruction::VoteInstruction, @@ -56,7 +54,9 @@ impl LatestValidatorVotePacket { match limited_deserialize::(&instruction.data) { Ok(VoteInstruction::UpdateVoteState(vote_state_update)) - | Ok(VoteInstruction::UpdateVoteStateSwitch(vote_state_update, _)) => { + | Ok(VoteInstruction::UpdateVoteStateSwitch(vote_state_update, _)) + | Ok(VoteInstruction::CompactUpdateVoteState(vote_state_update)) + | Ok(VoteInstruction::CompactUpdateVoteStateSwitch(vote_state_update, _)) => { let &pubkey = message .message .static_account_keys() @@ -102,16 +102,6 @@ impl LatestValidatorVotePacket { } } -pub fn deserialize_packets<'a>( - packet_batch: &'a PacketBatch, - packet_indexes: &'a [usize], - vote_source: VoteSource, -) -> impl Iterator + 'a { - packet_indexes.iter().filter_map(move |packet_index| { - LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source).ok() - }) -} - // TODO: replace this with rand::seq::index::sample_weighted once we can update rand to 0.8+ // This requires updating dependencies of ed25519-dalek as rand_core is not compatible cross // version https://github.com/dalek-cryptography/ed25519-dalek/pull/214 @@ -135,7 +125,8 @@ pub(crate) fn weighted_random_order_by_stake<'a>( pubkey_with_weight.into_iter().map(|(_, pubkey)| pubkey) } -pub struct VoteBatchInsertionMetrics { +#[derive(Default, Debug)] +pub(crate) struct VoteBatchInsertionMetrics { pub(crate) num_dropped_gossip: usize, pub(crate) num_dropped_tpu: usize, } @@ -164,7 +155,7 @@ impl LatestUnprocessedVotes { self.len() == 0 } - pub fn insert_batch( + pub(crate) fn insert_batch( &self, votes: impl Iterator, ) -> VoteBatchInsertionMetrics { @@ -259,9 +250,8 @@ impl LatestUnprocessedVotes { let mut vote = lock.write().unwrap(); if !vote.is_vote_taken() && !vote.is_forwarded() { let deserialized_vote_packet = vote.vote.as_ref().unwrap().clone(); - if let Some(sanitized_vote_transaction) = - unprocessed_packet_batches::transaction_from_deserialized_packet( - &deserialized_vote_packet, + if let Some(sanitized_vote_transaction) = deserialized_vote_packet + .build_sanitized_transaction( &bank.feature_set, bank.vote_only_bank(), bank.as_ref(), @@ -329,7 +319,7 @@ mod tests { super::*, itertools::Itertools, rand::{thread_rng, Rng}, - solana_perf::packet::{Packet, PacketFlags}, + solana_perf::packet::{Packet, PacketBatch, PacketFlags}, solana_runtime::{ 
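The new ImmutableDeserializedPacket::build_sanitized_transaction method replaces the old free function unprocessed_packet_batches::transaction_from_deserialized_packet, so callers go straight from a packet to an optional SanitizedTransaction. A minimal sketch of a call site under that assumption; the helper name sanitize_packets is illustrative only and not part of the patch:

    use {
        crate::immutable_deserialized_packet::ImmutableDeserializedPacket,
        solana_sdk::{
            feature_set::FeatureSet,
            transaction::{SanitizedTransaction, SimpleAddressLoader},
        },
        std::sync::Arc,
    };

    // Keep only packets that sanitize and pass precompile verification; with
    // votes_only = true, anything that is not a simple vote packet is dropped as well.
    fn sanitize_packets(
        packets: &[ImmutableDeserializedPacket],
        feature_set: &Arc<FeatureSet>,
        votes_only: bool,
    ) -> Vec<SanitizedTransaction> {
        packets
            .iter()
            .filter_map(|packet| {
                packet.build_sanitized_transaction(
                    feature_set,
                    votes_only,
                    SimpleAddressLoader::Disabled,
                )
            })
            .collect()
    }

Real call sites pass the bank's feature set and the bank itself as the address loader, as the updated forwarding and latest_unprocessed_votes code above shows.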
bank::Bank, genesis_utils::{self, ValidatorVoteKeypairs}, @@ -361,6 +351,16 @@ mod tests { LatestValidatorVotePacket::new(packet, vote_source).unwrap() } + fn deserialize_packets<'a>( + packet_batch: &'a PacketBatch, + packet_indexes: &'a [usize], + vote_source: VoteSource, + ) -> impl Iterator + 'a { + packet_indexes.iter().filter_map(move |packet_index| { + LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source).ok() + }) + } + #[test] fn test_deserialize_vote_packets() { let keypairs = ValidatorVoteKeypairs::new_rand(); diff --git a/core/src/leader_slot_banking_stage_metrics.rs b/core/src/leader_slot_banking_stage_metrics.rs index ed556991e..f2efc6db8 100644 --- a/core/src/leader_slot_banking_stage_metrics.rs +++ b/core/src/leader_slot_banking_stage_metrics.rs @@ -1,5 +1,8 @@ use { - crate::leader_slot_banking_stage_timing_metrics::*, + crate::{ + leader_slot_banking_stage_timing_metrics::*, + unprocessed_transaction_storage::InsertPacketBatchSummary, + }, solana_poh::poh_recorder::BankStart, solana_runtime::transaction_error_metrics::*, solana_sdk::{clock::Slot, saturating_add_assign}, @@ -270,6 +273,8 @@ pub(crate) struct LeaderSlotMetrics { transaction_error_metrics: TransactionErrorMetrics, + vote_packet_count_metrics: VotePacketCountMetrics, + timing_metrics: LeaderSlotTimingMetrics, // Used by tests to check if the `self.report()` method was called @@ -283,6 +288,7 @@ impl LeaderSlotMetrics { slot, packet_count_metrics: LeaderSlotPacketCountMetrics::new(), transaction_error_metrics: TransactionErrorMetrics::new(), + vote_packet_count_metrics: VotePacketCountMetrics::new(), timing_metrics: LeaderSlotTimingMetrics::new(bank_creation_time), is_reported: false, } @@ -294,6 +300,7 @@ impl LeaderSlotMetrics { self.timing_metrics.report(self.id, self.slot); self.transaction_error_metrics.report(self.id, self.slot); self.packet_count_metrics.report(self.id, self.slot); + self.vote_packet_count_metrics.report(self.id, self.slot); } /// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None @@ -310,6 +317,33 @@ impl LeaderSlotMetrics { } } +// Metrics describing vote tx packets that were processed in the tpu vote thread as well as +// extraneous votes that were filtered out +#[derive(Debug, Default)] +pub(crate) struct VotePacketCountMetrics { + // How many votes ingested from gossip were dropped + dropped_gossip_votes: u64, + + // How many votes ingested from tpu were dropped + dropped_tpu_votes: u64, +} + +impl VotePacketCountMetrics { + fn new() -> Self { + Self { ..Self::default() } + } + + fn report(&self, id: u32, slot: Slot) { + datapoint_info!( + "banking_stage-vote_packet_counts", + ("id", id, i64), + ("slot", slot, i64), + ("dropped_gossip_votes", self.dropped_gossip_votes, i64), + ("dropped_tpu_votes", self.dropped_tpu_votes, i64) + ); + } +} + #[derive(Debug)] pub(crate) enum MetricsTrackerAction { Noop, @@ -498,6 +532,21 @@ impl LeaderSlotMetricsTracker { } } + pub(crate) fn accumulate_insert_packet_batches_summary( + &mut self, + insert_packet_batches_summary: &InsertPacketBatchSummary, + ) { + self.increment_exceeded_buffer_limit_dropped_packets_count( + insert_packet_batches_summary.total_dropped_packets() as u64, + ); + self.increment_dropped_gossip_vote_count( + insert_packet_batches_summary.dropped_gossip_packets() as u64, + ); + self.increment_dropped_tpu_vote_count( + insert_packet_batches_summary.dropped_tpu_packets() as u64 + ); + } + pub(crate) fn accumulate_transaction_errors( &mut self, error_metrics: 
&TransactionErrorMetrics, @@ -780,6 +829,28 @@ impl LeaderSlotMetricsTracker { ); } } + + pub(crate) fn increment_dropped_gossip_vote_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .vote_packet_count_metrics + .dropped_gossip_votes, + count + ); + } + } + + pub(crate) fn increment_dropped_tpu_vote_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .vote_packet_count_metrics + .dropped_tpu_votes, + count + ); + } + } } #[cfg(test)] diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index a72ba3dff..420cc14e8 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -3,11 +3,7 @@ use { min_max_heap::MinMaxHeap, solana_perf::packet::{Packet, PacketBatch}, solana_runtime::transaction_priority_details::TransactionPriorityDetails, - solana_sdk::{ - feature_set, - hash::Hash, - transaction::{AddressLoader, SanitizedTransaction, Transaction}, - }, + solana_sdk::{hash::Hash, transaction::Transaction}, std::{ cmp::Ordering, collections::{hash_map::Entry, HashMap}, @@ -74,6 +70,12 @@ impl Ord for DeserializedPacket { } } +#[derive(Debug)] +pub struct PacketBatchInsertionMetrics { + pub(crate) num_dropped_packets: usize, + pub(crate) num_dropped_tracer_packets: usize, +} + /// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store /// PacketBatch's received from sigverify. Banking thread continuously scans the buffer /// to pick proper packets to add to the block. @@ -115,7 +117,7 @@ impl UnprocessedPacketBatches { pub fn insert_batch( &mut self, deserialized_packets: impl Iterator, - ) -> (usize, usize) { + ) -> PacketBatchInsertionMetrics { let mut num_dropped_packets = 0; let mut num_dropped_tracer_packets = 0; for deserialized_packet in deserialized_packets { @@ -131,7 +133,10 @@ impl UnprocessedPacketBatches { } } } - (num_dropped_packets, num_dropped_tracer_packets) + PacketBatchInsertionMetrics { + num_dropped_packets, + num_dropped_tracer_packets, + } } /// Pushes a new `deserialized_packet` into the unprocessed packet batches if it does not already @@ -283,7 +288,7 @@ impl UnprocessedPacketBatches { } pub fn mark_accepted_packets_as_forwarded( - buffered_packet_batches: &mut UnprocessedPacketBatches, + &mut self, packets_to_process: &[Arc], accepted_packet_indexes: &[usize], ) { @@ -291,7 +296,7 @@ impl UnprocessedPacketBatches { .iter() .for_each(|accepted_packet_index| { let accepted_packet = packets_to_process[*accepted_packet_index].clone(); - if let Some(deserialized_packet) = buffered_packet_batches + if let Some(deserialized_packet) = self .message_hash_to_transaction .get_mut(accepted_packet.message_hash()) { @@ -322,30 +327,6 @@ pub fn transactions_to_deserialized_packets( .collect() } -// This function deserializes packets into transactions, computes the blake3 hash of transaction -// messages, and verifies secp256k1 instructions. A list of sanitized transactions are returned -// with their packet indexes. 
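Both buffer flavors now report their insertions through a shared summary: ThreadLocalUnprocessedPackets produces PacketBatchInsertionMetrics, VoteStorage produces VoteBatchInsertionMetrics, and either converts into InsertPacketBatchSummary for the leader slot metrics. A hedged sketch of the glue, assuming it runs inside the core crate; the function buffer_and_record is illustrative, not code from the patch:

    use crate::{
        immutable_deserialized_packet::ImmutableDeserializedPacket,
        leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker,
        unprocessed_transaction_storage::UnprocessedTransactionStorage,
    };

    // Buffer one batch of deserialized packets and fold the insertion summary into the
    // per-slot metrics, whichever storage flavor is behind `storage`.
    fn buffer_and_record(
        storage: &mut UnprocessedTransactionStorage,
        slot_metrics_tracker: &mut LeaderSlotMetricsTracker,
        deserialized_packets: Vec<ImmutableDeserializedPacket>,
    ) {
        let summary = storage.insert_batch(deserialized_packets);
        // The vote flavor reports dropped gossip/TPU votes, the transaction flavor reports
        // dropped (tracer) packets; the tracker splits the counts back out into the
        // exceeded-buffer-limit and vote_packet_count_metrics fields.
        slot_metrics_tracker.accumulate_insert_packet_batches_summary(&summary);
    }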
-#[allow(clippy::needless_collect)] -pub fn transaction_from_deserialized_packet( - deserialized_packet: &ImmutableDeserializedPacket, - feature_set: &Arc, - votes_only: bool, - address_loader: impl AddressLoader, -) -> Option { - if votes_only && !deserialized_packet.is_simple_vote() { - return None; - } - let tx = SanitizedTransaction::try_new( - deserialized_packet.transaction().clone(), - *deserialized_packet.message_hash(), - deserialized_packet.is_simple_vote(), - address_loader, - ) - .ok()?; - tx.verify_precompiles(feature_set).ok()?; - Some(tx) -} - #[cfg(test)] mod tests { use { @@ -357,6 +338,7 @@ mod tests { transaction::{SimpleAddressLoader, Transaction}, }, solana_vote_program::vote_transaction, + std::sync::Arc, }; fn simple_deserialized_packet() -> DeserializedPacket { @@ -529,8 +511,7 @@ mod tests { let mut votes_only = false; let txs = packet_vector.iter().filter_map(|tx| { - transaction_from_deserialized_packet( - tx.immutable_section(), + tx.immutable_section().build_sanitized_transaction( &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, @@ -540,8 +521,7 @@ mod tests { votes_only = true; let txs = packet_vector.iter().filter_map(|tx| { - transaction_from_deserialized_packet( - tx.immutable_section(), + tx.immutable_section().build_sanitized_transaction( &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, @@ -560,8 +540,7 @@ mod tests { let mut votes_only = false; let txs = packet_vector.iter().filter_map(|tx| { - transaction_from_deserialized_packet( - tx.immutable_section(), + tx.immutable_section().build_sanitized_transaction( &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, @@ -571,8 +550,7 @@ mod tests { votes_only = true; let txs = packet_vector.iter().filter_map(|tx| { - transaction_from_deserialized_packet( - tx.immutable_section(), + tx.immutable_section().build_sanitized_transaction( &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, @@ -591,8 +569,7 @@ mod tests { let mut votes_only = false; let txs = packet_vector.iter().filter_map(|tx| { - transaction_from_deserialized_packet( - tx.immutable_section(), + tx.immutable_section().build_sanitized_transaction( &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, @@ -602,8 +579,7 @@ mod tests { votes_only = true; let txs = packet_vector.iter().filter_map(|tx| { - transaction_from_deserialized_packet( - tx.immutable_section(), + tx.immutable_section().build_sanitized_transaction( &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, diff --git a/core/src/unprocessed_transaction_storage.rs b/core/src/unprocessed_transaction_storage.rs index 6642da00a..9c6f8438a 100644 --- a/core/src/unprocessed_transaction_storage.rs +++ b/core/src/unprocessed_transaction_storage.rs @@ -1,22 +1,28 @@ -#![allow(dead_code)] use { crate::{ - banking_stage::{self, BankingStage, FilterForwardingResults, ForwardOption}, + banking_stage::{FilterForwardingResults, ForwardOption}, forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, immutable_deserialized_packet::ImmutableDeserializedPacket, latest_unprocessed_votes::{ - self, LatestUnprocessedVotes, LatestValidatorVotePacket, VoteBatchInsertionMetrics, + LatestUnprocessedVotes, LatestValidatorVotePacket, VoteBatchInsertionMetrics, VoteSource, }, - unprocessed_packet_batches::{self, DeserializedPacket, UnprocessedPacketBatches}, + unprocessed_packet_batches::{ + DeserializedPacket, PacketBatchInsertionMetrics, 
UnprocessedPacketBatches, + }, }, itertools::Itertools, min_max_heap::MinMaxHeap, - solana_perf::packet::PacketBatch, + solana_measure::measure, solana_runtime::bank::Bank, + solana_sdk::{ + clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, saturating_add_assign, + transaction::SanitizedTransaction, + }, std::sync::Arc, }; +pub const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128; const MAX_STAKED_VALIDATORS: usize = 10_000; #[derive(Debug)] @@ -43,12 +49,54 @@ pub enum ThreadType { Transactions, } -#[derive(Debug, Default)] -pub struct InsertPacketBatchesSummary { - pub(crate) num_dropped_packets: usize, - pub(crate) num_dropped_gossip_vote_packets: usize, - pub(crate) num_dropped_tpu_vote_packets: usize, - pub(crate) num_dropped_tracer_packets: usize, +#[derive(Debug)] +pub(crate) enum InsertPacketBatchSummary { + VoteBatchInsertionMetrics(VoteBatchInsertionMetrics), + PacketBatchInsertionMetrics(PacketBatchInsertionMetrics), +} + +impl InsertPacketBatchSummary { + pub fn total_dropped_packets(&self) -> usize { + match self { + Self::VoteBatchInsertionMetrics(metrics) => { + metrics.num_dropped_gossip + metrics.num_dropped_tpu + } + Self::PacketBatchInsertionMetrics(metrics) => metrics.num_dropped_packets, + } + } + + pub fn dropped_gossip_packets(&self) -> usize { + match self { + Self::VoteBatchInsertionMetrics(metrics) => metrics.num_dropped_gossip, + _ => 0, + } + } + + pub fn dropped_tpu_packets(&self) -> usize { + match self { + Self::VoteBatchInsertionMetrics(metrics) => metrics.num_dropped_tpu, + _ => 0, + } + } + + pub fn dropped_tracer_packets(&self) -> usize { + match self { + Self::PacketBatchInsertionMetrics(metrics) => metrics.num_dropped_tracer_packets, + _ => 0, + } + } +} + +impl From for InsertPacketBatchSummary { + fn from(metrics: VoteBatchInsertionMetrics) -> Self { + Self::VoteBatchInsertionMetrics(metrics) + } +} + +impl From for InsertPacketBatchSummary { + fn from(metrics: PacketBatchInsertionMetrics) -> Self { + Self::PacketBatchInsertionMetrics(metrics) + } } fn filter_processed_packets<'a, F>( @@ -145,33 +193,17 @@ impl UnprocessedTransactionStorage { } } - pub fn deserialize_and_insert_batch( + pub(crate) fn insert_batch( &mut self, - packet_batch: &PacketBatch, - packet_indexes: &[usize], - ) -> InsertPacketBatchesSummary { + deserialized_packets: Vec, + ) -> InsertPacketBatchSummary { match self { Self::VoteStorage(vote_storage) => { - let VoteBatchInsertionMetrics { - num_dropped_gossip, - num_dropped_tpu, - } = vote_storage.deserialize_and_insert_batch(packet_batch, packet_indexes); - InsertPacketBatchesSummary { - num_dropped_packets: num_dropped_gossip + num_dropped_tpu, - num_dropped_gossip_vote_packets: num_dropped_gossip, - num_dropped_tpu_vote_packets: num_dropped_tpu, - ..InsertPacketBatchesSummary::default() - } - } - Self::LocalTransactionStorage(transaction_storage) => { - let (num_dropped_packets, num_dropped_tracer_packets) = - transaction_storage.deserialize_and_insert_batch(packet_batch, packet_indexes); - InsertPacketBatchesSummary { - num_dropped_packets, - num_dropped_tracer_packets, - ..InsertPacketBatchesSummary::default() - } + InsertPacketBatchSummary::from(vote_storage.insert_batch(deserialized_packets)) } + Self::LocalTransactionStorage(transaction_storage) => InsertPacketBatchSummary::from( + transaction_storage.insert_batch(deserialized_packets), + ), } } @@ -241,17 +273,22 @@ impl VoteStorage { self.latest_unprocessed_votes.clear_forwarded_packets(); } - fn deserialize_and_insert_batch( + fn insert_batch( &mut self, - 
packet_batch: &PacketBatch, - packet_indexes: &[usize], + deserialized_packets: Vec, ) -> VoteBatchInsertionMetrics { self.latest_unprocessed_votes - .insert_batch(latest_unprocessed_votes::deserialize_packets( - packet_batch, - packet_indexes, - self.vote_source, - )) + .insert_batch( + deserialized_packets + .into_iter() + .filter_map(|deserialized_packet| { + LatestValidatorVotePacket::new_from_immutable( + Arc::new(deserialized_packet), + self.vote_source, + ) + .ok() + }), + ) } fn filter_forwardable_packets_and_add_batches( @@ -345,13 +382,14 @@ impl ThreadLocalUnprocessedPackets { self.unprocessed_packet_batches.clear(); } - fn deserialize_and_insert_batch( + fn insert_batch( &mut self, - packet_batch: &PacketBatch, - packet_indexes: &[usize], - ) -> (usize, usize) { + deserialized_packets: Vec, + ) -> PacketBatchInsertionMetrics { self.unprocessed_packet_batches.insert_batch( - unprocessed_packet_batches::deserialize_packets(packet_batch, packet_indexes), + deserialized_packets + .into_iter() + .map(DeserializedPacket::from_immutable_section), ) } @@ -360,11 +398,298 @@ impl ThreadLocalUnprocessedPackets { bank: Arc, forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, ) -> FilterForwardingResults { - BankingStage::filter_and_forward_with_account_limits( - &bank, - &mut self.unprocessed_packet_batches, + self.filter_and_forward_with_account_limits( + bank, forward_packet_batches_by_accounts, - banking_stage::UNPROCESSED_BUFFER_STEP_SIZE, + UNPROCESSED_BUFFER_STEP_SIZE, + ) + } + + /// Filter out packets that fail to sanitize, or are no longer valid (could be + /// too old, a duplicate of something already processed). Doing this in batches to avoid + /// checking bank's blockhash and status cache per transaction which could be bad for performance. + /// Added valid and sanitized packets to forwarding queue. + fn filter_and_forward_with_account_limits( + &mut self, + bank: Arc, + forward_buffer: &mut ForwardPacketBatchesByAccounts, + batch_size: usize, + ) -> FilterForwardingResults { + let mut total_forwardable_tracer_packets: usize = 0; + let mut total_tracer_packets_in_buffer: usize = 0; + let mut total_forwardable_packets: usize = 0; + let mut total_packet_conversion_us: u64 = 0; + let mut total_filter_packets_us: u64 = 0; + let mut dropped_tx_before_forwarding_count: usize = 0; + + let mut original_priority_queue = self.swap_priority_queue(); + + // indicates if `forward_buffer` still accept more packets, see details at + // `ForwardPacketBatchesByAccounts.rs`. + let mut accepting_packets = true; + // batch iterate through self.unprocessed_packet_batches in desc priority order + let retained_priority_queue: MinMaxHeap> = + original_priority_queue + .drain_desc() + .chunks(batch_size) + .into_iter() + .flat_map(|packets_to_process| { + let packets_to_process = packets_to_process.into_iter().collect_vec(); + + // Vec of same size of `packets_to_process`, each indicates + // corresponding packet is tracer packet. 
+ let tracer_packet_indexes = packets_to_process + .iter() + .map(|deserialized_packet| { + deserialized_packet + .original_packet() + .meta + .is_tracer_packet() + }) + .collect::>(); + saturating_add_assign!( + total_tracer_packets_in_buffer, + tracer_packet_indexes + .iter() + .filter(|is_tracer| **is_tracer) + .count() + ); + + if accepting_packets { + let ( + (sanitized_transactions, transaction_to_packet_indexes), + packet_conversion_time, + ): ((Vec, Vec), _) = measure!( + self.sanitize_unforwarded_packets(&packets_to_process, &bank,), + "sanitize_packet", + ); + saturating_add_assign!( + total_packet_conversion_us, + packet_conversion_time.as_us() + ); + + let (forwardable_transaction_indexes, filter_packets_time) = measure!( + Self::filter_invalid_transactions(&sanitized_transactions, &bank,), + "filter_packets", + ); + saturating_add_assign!( + total_filter_packets_us, + filter_packets_time.as_us() + ); + + for forwardable_transaction_index in &forwardable_transaction_indexes { + saturating_add_assign!(total_forwardable_packets, 1); + let forwardable_packet_index = + transaction_to_packet_indexes[*forwardable_transaction_index]; + if tracer_packet_indexes[forwardable_packet_index] { + saturating_add_assign!(total_forwardable_tracer_packets, 1); + } + } + + let accepted_packet_indexes = Self::add_filtered_packets_to_forward_buffer( + forward_buffer, + &packets_to_process, + &sanitized_transactions, + &transaction_to_packet_indexes, + &forwardable_transaction_indexes, + &mut dropped_tx_before_forwarding_count, + ); + accepting_packets = + accepted_packet_indexes.len() == forwardable_transaction_indexes.len(); + + self.unprocessed_packet_batches + .mark_accepted_packets_as_forwarded( + &packets_to_process, + &accepted_packet_indexes, + ); + + self.collect_retained_packets( + &packets_to_process, + &Self::prepare_filtered_packet_indexes( + &transaction_to_packet_indexes, + &forwardable_transaction_indexes, + ), + ) + } else { + // skip sanitizing and filtering if not longer able to add more packets for forwarding + saturating_add_assign!( + dropped_tx_before_forwarding_count, + packets_to_process.len() + ); + packets_to_process + } + }) + .collect(); + + // replace packet priority queue + self.unprocessed_packet_batches.packet_priority_queue = retained_priority_queue; + + inc_new_counter_info!( + "banking_stage-dropped_tx_before_forwarding", + dropped_tx_before_forwarding_count + ); + + FilterForwardingResults { + total_forwardable_packets, + total_tracer_packets_in_buffer, + total_forwardable_tracer_packets, + total_packet_conversion_us, + total_filter_packets_us, + } + } + + /// Take self.unprocessed_packet_batches's priority_queue out, leave empty MinMaxHeap in its place. + fn swap_priority_queue(&mut self) -> MinMaxHeap> { + let capacity = self.unprocessed_packet_batches.capacity(); + std::mem::replace( + &mut self.unprocessed_packet_batches.packet_priority_queue, + MinMaxHeap::with_capacity(capacity), + ) + } + + /// sanitize un-forwarded packet into SanitizedTransaction for validation and forwarding. 
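filter_and_forward_with_account_limits and the retry-processing path later in this file share one pattern: swap_priority_queue takes the whole MinMaxHeap out with std::mem::replace, the packets are drained in descending priority order and handled in batch_size chunks, and only the survivors of each chunk are collected into the heap that goes back into the buffer. A simplified, self-contained illustration of that pattern, with std's BinaryHeap and plain u64 priorities standing in for the crate's MinMaxHeap of packets; the batch size and the keep-even-numbers rule are arbitrary:

    use std::{collections::BinaryHeap, mem};

    // Drain in priority order, process in fixed-size chunks, keep only what the
    // processing function says to retain, then reinstall the survivors as the queue.
    fn process_in_batches(
        queue: &mut BinaryHeap<u64>,
        batch_size: usize,
        mut process: impl FnMut(&[u64]) -> Vec<u64>,
    ) {
        // Same trick as swap_priority_queue: take the queue, leave an empty one behind.
        let mut drained: Vec<u64> = mem::take(queue).into_sorted_vec();
        drained.reverse(); // descending priority, like drain_desc()

        let mut retained = BinaryHeap::new();
        for chunk in drained.chunks(batch_size) {
            retained.extend(process(chunk));
        }
        *queue = retained;
    }

    fn main() {
        let mut queue: BinaryHeap<u64> = (1..=10).collect();
        // Arbitrary rule for the illustration: only even priorities survive processing.
        process_in_batches(&mut queue, 4, |chunk| {
            chunk.iter().copied().filter(|p| p % 2 == 0).collect()
        });
        assert_eq!(queue.into_sorted_vec(), vec![2, 4, 6, 8, 10]);
    }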
+ fn sanitize_unforwarded_packets( + &mut self, + packets_to_process: &[Arc], + bank: &Arc, + ) -> (Vec, Vec) { + // Get ref of ImmutableDeserializedPacket + let deserialized_packets = packets_to_process.iter().map(|p| &**p); + let (transactions, transaction_to_packet_indexes): (Vec, Vec) = + deserialized_packets + .enumerate() + .filter_map(|(packet_index, deserialized_packet)| { + if !self + .unprocessed_packet_batches + .is_forwarded(deserialized_packet) + { + deserialized_packet + .build_sanitized_transaction( + &bank.feature_set, + bank.vote_only_bank(), + bank.as_ref(), + ) + .map(|transaction| (transaction, packet_index)) + } else { + None + } + }) + .unzip(); + + // report metrics + inc_new_counter_info!("banking_stage-packet_conversion", 1); + let unsanitized_packets_filtered_count = + packets_to_process.len().saturating_sub(transactions.len()); + inc_new_counter_info!( + "banking_stage-dropped_tx_before_forwarding", + unsanitized_packets_filtered_count + ); + + (transactions, transaction_to_packet_indexes) + } + + /// Checks sanitized transactions against bank, returns valid transaction indexes + fn filter_invalid_transactions( + transactions: &[SanitizedTransaction], + bank: &Arc, + ) -> Vec { + let filter = vec![Ok(()); transactions.len()]; + let results = bank.check_transactions_with_forwarding_delay( + transactions, + &filter, + FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, + ); + // report metrics + let filtered_out_transactions_count = transactions.len().saturating_sub(results.len()); + inc_new_counter_info!( + "banking_stage-dropped_tx_before_forwarding", + filtered_out_transactions_count + ); + + results + .iter() + .enumerate() + .filter_map( + |(tx_index, (result, _))| if result.is_ok() { Some(tx_index) } else { None }, + ) + .collect_vec() + } + + fn prepare_filtered_packet_indexes( + transaction_to_packet_indexes: &[usize], + retained_transaction_indexes: &[usize], + ) -> Vec { + retained_transaction_indexes + .iter() + .map(|tx_index| transaction_to_packet_indexes[*tx_index]) + .collect_vec() + } + + /// try to add filtered forwardable and valid packets to forward buffer; + /// returns vector of packet indexes that were accepted for forwarding. 
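sanitize_unforwarded_packets and prepare_filtered_packet_indexes juggle two index spaces: positions in packets_to_process (packet indexes) and positions in the sanitized subset (transaction indexes), with transaction_to_packet_indexes as the bridge between them. A self-contained illustration of that bookkeeping, with short strings standing in for packets and an empty string standing in for a packet that fails to sanitize; none of these names come from the patch:

    fn main() {
        // Packet index space: five "packets", two of which fail to sanitize (empty here).
        let packets = ["alpha", "", "bravo", "charlie", ""];

        // filter_map + unzip, as in sanitize_unforwarded_packets: the surviving
        // "transactions" and, in parallel, the packet index each one came from.
        let (transactions, transaction_to_packet_indexes): (Vec<&str>, Vec<usize>) = packets
            .iter()
            .enumerate()
            .filter_map(|(packet_index, packet)| {
                (!packet.is_empty()).then_some((*packet, packet_index))
            })
            .unzip();
        assert_eq!(transactions, vec!["alpha", "bravo", "charlie"]);
        assert_eq!(transaction_to_packet_indexes, vec![0, 2, 3]);

        // Say the bank-side filter keeps transaction indexes 0 and 2 ("alpha", "charlie");
        // translating through the bridge recovers the packet indexes to retain, exactly
        // what prepare_filtered_packet_indexes does.
        let retained_transaction_indexes = [0usize, 2];
        let retained_packet_indexes: Vec<usize> = retained_transaction_indexes
            .iter()
            .map(|tx_index| transaction_to_packet_indexes[*tx_index])
            .collect();
        assert_eq!(retained_packet_indexes, vec![0, 3]);
    }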
+ fn add_filtered_packets_to_forward_buffer( + forward_buffer: &mut ForwardPacketBatchesByAccounts, + packets_to_process: &[Arc], + transactions: &[SanitizedTransaction], + transaction_to_packet_indexes: &[usize], + forwardable_transaction_indexes: &[usize], + dropped_tx_before_forwarding_count: &mut usize, + ) -> Vec { + let mut added_packets_count: usize = 0; + let mut accepted_packet_indexes = Vec::with_capacity(transaction_to_packet_indexes.len()); + for forwardable_transaction_index in forwardable_transaction_indexes { + let sanitized_transaction = &transactions[*forwardable_transaction_index]; + let forwardable_packet_index = + transaction_to_packet_indexes[*forwardable_transaction_index]; + let immutable_deserialized_packet = + packets_to_process[forwardable_packet_index].clone(); + if !forward_buffer.try_add_packet(sanitized_transaction, immutable_deserialized_packet) + { + break; + } + accepted_packet_indexes.push(forwardable_packet_index); + saturating_add_assign!(added_packets_count, 1); + } + + // count the packets not being forwarded in this batch + saturating_add_assign!( + *dropped_tx_before_forwarding_count, + forwardable_transaction_indexes.len() - added_packets_count + ); + + accepted_packet_indexes + } + + fn collect_retained_packets( + &mut self, + packets_to_process: &[Arc], + retained_packet_indexes: &[usize], + ) -> Vec> { + self.remove_non_retained_packets(packets_to_process, retained_packet_indexes); + retained_packet_indexes + .iter() + .map(|i| packets_to_process[*i].clone()) + .collect_vec() + } + + /// remove packets from UnprocessedPacketBatches.message_hash_to_transaction after they have + /// been removed from UnprocessedPacketBatches.packet_priority_queue + fn remove_non_retained_packets( + &mut self, + packets_to_process: &[Arc], + retained_packet_indexes: &[usize], + ) { + filter_processed_packets( + retained_packet_indexes + .iter() + .chain(std::iter::once(&packets_to_process.len())), + |start, end| { + for processed_packet in &packets_to_process[start..end] { + self.unprocessed_packet_batches + .message_hash_to_transaction + .remove(processed_packet.message_hash()); + } + }, ) } @@ -372,13 +697,7 @@ impl ThreadLocalUnprocessedPackets { where F: FnMut(&Vec>) -> Option>, { - let mut retryable_packets = { - let capacity = self.unprocessed_packet_batches.capacity(); - std::mem::replace( - &mut self.unprocessed_packet_batches.packet_priority_queue, - MinMaxHeap::with_capacity(capacity), - ) - }; + let mut retryable_packets = self.swap_priority_queue(); let retryable_packets: MinMaxHeap> = retryable_packets .drain_desc() .chunks(batch_size) @@ -388,25 +707,10 @@ impl ThreadLocalUnprocessedPackets { if let Some(retryable_transaction_indexes) = processing_function(&packets_to_process) { - // Remove the non-retryable packets, packets that were either: - // 1) Successfully processed - // 2) Failed but not retryable - filter_processed_packets( - retryable_transaction_indexes - .iter() - .chain(std::iter::once(&packets_to_process.len())), - |start, end| { - for processed_packet in &packets_to_process[start..end] { - self.unprocessed_packet_batches - .message_hash_to_transaction - .remove(processed_packet.message_hash()); - } - }, - ); - retryable_transaction_indexes - .iter() - .map(|i| packets_to_process[*i].clone()) - .collect_vec() + self.collect_retained_packets( + &packets_to_process, + &retryable_transaction_indexes, + ) } else { packets_to_process } @@ -427,7 +731,21 @@ impl ThreadLocalUnprocessedPackets { #[cfg(test)] mod tests { - use super::*; + use { + 
super::*, + solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, + solana_perf::packet::{Packet, PacketFlags}, + solana_sdk::{ + hash::Hash, + signature::{Keypair, Signer}, + system_transaction, + transaction::Transaction, + }, + solana_vote_program::{ + vote_state::VoteStateUpdate, vote_transaction::new_vote_state_update_transaction, + }, + std::error::Error, + }; #[test] fn test_filter_processed_packets() { @@ -479,4 +797,214 @@ mod tests { filter_processed_packets(retryable_indexes.iter(), f); assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]); } + + #[test] + fn test_filter_and_forward_with_account_limits() { + solana_logger::setup(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(10); + let current_bank = Arc::new(Bank::new_for_tests(&genesis_config)); + + let simple_transactions: Vec = (0..256) + .map(|_id| { + // packets are deserialized upon receiving, failed packets will not be + // forwarded; Therefore we need to create real packets here. + let key1 = Keypair::new(); + system_transaction::transfer( + &mint_keypair, + &key1.pubkey(), + genesis_config.rent.minimum_balance(0), + genesis_config.hash(), + ) + }) + .collect_vec(); + + let mut packets: Vec = simple_transactions + .iter() + .enumerate() + .map(|(packets_id, transaction)| { + let mut p = Packet::from_data(None, transaction).unwrap(); + p.meta.port = packets_id as u16; + p.meta.set_tracer(true); + DeserializedPacket::new(p).unwrap() + }) + .collect_vec(); + + // all packets are forwarded + { + let buffered_packet_batches: UnprocessedPacketBatches = + UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); + let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( + buffered_packet_batches, + ThreadType::Transactions, + ); + let mut forward_packet_batches_by_accounts = + ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); + + let FilterForwardingResults { + total_forwardable_packets, + total_tracer_packets_in_buffer, + total_forwardable_tracer_packets, + .. + } = transaction_storage.filter_forwardable_packets_and_add_batches( + current_bank.clone(), + &mut forward_packet_batches_by_accounts, + ); + assert_eq!(total_forwardable_packets, 256); + assert_eq!(total_tracer_packets_in_buffer, 256); + assert_eq!(total_forwardable_tracer_packets, 256); + + // packets in a batch are forwarded in arbitrary order; verify the ports match after + // sorting + let expected_ports: Vec<_> = (0..256).collect(); + let mut forwarded_ports: Vec<_> = forward_packet_batches_by_accounts + .iter_batches() + .flat_map(|batch| { + batch + .get_forwardable_packets() + .into_iter() + .map(|p| p.meta.port) + }) + .collect(); + forwarded_ports.sort_unstable(); + assert_eq!(expected_ports, forwarded_ports); + } + + // some packets are forwarded + { + let num_already_forwarded = 16; + for packet in &mut packets[0..num_already_forwarded] { + packet.forwarded = true; + } + let buffered_packet_batches: UnprocessedPacketBatches = + UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); + let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( + buffered_packet_batches, + ThreadType::Transactions, + ); + let mut forward_packet_batches_by_accounts = + ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); + let FilterForwardingResults { + total_forwardable_packets, + total_tracer_packets_in_buffer, + total_forwardable_tracer_packets, + .. 
+ } = transaction_storage.filter_forwardable_packets_and_add_batches( + current_bank.clone(), + &mut forward_packet_batches_by_accounts, + ); + assert_eq!( + total_forwardable_packets, + packets.len() - num_already_forwarded + ); + assert_eq!(total_tracer_packets_in_buffer, packets.len()); + assert_eq!( + total_forwardable_tracer_packets, + packets.len() - num_already_forwarded + ); + } + + // some packets are invalid (already processed) + { + let num_already_processed = 16; + for tx in &simple_transactions[0..num_already_processed] { + assert_eq!(current_bank.process_transaction(tx), Ok(())); + } + let buffered_packet_batches: UnprocessedPacketBatches = + UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); + let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( + buffered_packet_batches, + ThreadType::Transactions, + ); + let mut forward_packet_batches_by_accounts = + ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); + let FilterForwardingResults { + total_forwardable_packets, + total_tracer_packets_in_buffer, + total_forwardable_tracer_packets, + .. + } = transaction_storage.filter_forwardable_packets_and_add_batches( + current_bank, + &mut forward_packet_batches_by_accounts, + ); + assert_eq!( + total_forwardable_packets, + packets.len() - num_already_processed + ); + assert_eq!(total_tracer_packets_in_buffer, packets.len()); + assert_eq!( + total_forwardable_tracer_packets, + packets.len() - num_already_processed + ); + } + } + + #[test] + fn test_unprocessed_transaction_storage_insert() -> Result<(), Box> { + let keypair = Keypair::new(); + let vote_keypair = Keypair::new(); + let pubkey = solana_sdk::pubkey::new_rand(); + + let small_transfer = Packet::from_data( + None, + system_transaction::transfer(&keypair, &pubkey, 1, Hash::new_unique()), + )?; + let mut vote = Packet::from_data( + None, + new_vote_state_update_transaction( + VoteStateUpdate::default(), + Hash::new_unique(), + &keypair, + &vote_keypair, + &vote_keypair, + None, + ), + )?; + vote.meta.flags.set(PacketFlags::SIMPLE_VOTE_TX, true); + let big_transfer = Packet::from_data( + None, + system_transaction::transfer(&keypair, &pubkey, 1000000, Hash::new_unique()), + )?; + + for thread_type in [ + ThreadType::Transactions, + ThreadType::Voting(VoteSource::Gossip), + ThreadType::Voting(VoteSource::Tpu), + ] { + let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::with_capacity(100), + thread_type, + ); + transaction_storage.insert_batch(vec![ + ImmutableDeserializedPacket::new(small_transfer.clone(), None)?, + ImmutableDeserializedPacket::new(vote.clone(), None)?, + ImmutableDeserializedPacket::new(big_transfer.clone(), None)?, + ]); + let deserialized_packets = transaction_storage + .iter() + .map(|packet| packet.immutable_section().original_packet().clone()) + .collect_vec(); + assert_eq!(3, deserialized_packets.len()); + assert!(deserialized_packets.contains(&small_transfer)); + assert!(deserialized_packets.contains(&vote)); + assert!(deserialized_packets.contains(&big_transfer)); + } + + for vote_source in [VoteSource::Gossip, VoteSource::Tpu] { + let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( + Arc::new(LatestUnprocessedVotes::new()), + vote_source, + ); + transaction_storage.insert_batch(vec![ + ImmutableDeserializedPacket::new(small_transfer.clone(), None)?, + ImmutableDeserializedPacket::new(vote.clone(), None)?, + 
ImmutableDeserializedPacket::new(big_transfer.clone(), None)?, + ]); + assert_eq!(1, transaction_storage.len()); + } + Ok(()) + } } diff --git a/local-cluster/tests/local_cluster_slow_1.rs b/local-cluster/tests/local_cluster_slow_1.rs index 90e4f9bd4..3e63d1bc0 100644 --- a/local-cluster/tests/local_cluster_slow_1.rs +++ b/local-cluster/tests/local_cluster_slow_1.rs @@ -30,6 +30,7 @@ use { hash::Hash, pubkey::Pubkey, signature::Signer, + vote::state::VoteStateUpdate, }, solana_streamer::socket::SocketAddrSpace, solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction}, @@ -541,22 +542,31 @@ fn test_duplicate_shreds_broadcast_leader() { // root by this validator, but we're not concerned with lockout violations // by this validator so it's fine. let leader_blockstore = open_blockstore(&bad_leader_ledger_path); - let mut vote_slots: Vec = AncestorIterator::new_inclusive( + let mut vote_slots: Vec<(Slot, u32)> = AncestorIterator::new_inclusive( latest_vote_slot, &leader_blockstore, ) .take(MAX_LOCKOUT_HISTORY) + .zip(1..) .collect(); vote_slots.reverse(); - let vote_tx = vote_transaction::new_vote_transaction( - vote_slots, - vote_hash, - leader_vote_tx.message.recent_blockhash, - &node_keypair, - &vote_keypair, - &vote_keypair, - None, - ); + let mut vote = VoteStateUpdate::from(vote_slots); + let root = AncestorIterator::new_inclusive( + latest_vote_slot, + &leader_blockstore, + ) + .nth(MAX_LOCKOUT_HISTORY); + vote.root = root; + vote.hash = vote_hash; + let vote_tx = + vote_transaction::new_compact_vote_state_update_transaction( + vote, + leader_vote_tx.message.recent_blockhash, + &node_keypair, + &vote_keypair, + &vote_keypair, + None, + ); gossip_vote_index += 1; gossip_vote_index %= MAX_LOCKOUT_HISTORY; cluster_info.push_vote_at_index(vote_tx, gossip_vote_index as u8) diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index af6b921d2..02dfe920c 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5088,6 +5088,7 @@ dependencies = [ "solana-frozen-abi-macro 1.15.0", "solana-measure", "solana-metrics", + "solana-perf", "solana-program-runtime", "solana-rayon-threadlimit", "solana-sdk 1.15.0", diff --git a/programs/vote/src/vote_transaction.rs b/programs/vote/src/vote_transaction.rs index 43cdbb11d..48316daff 100644 --- a/programs/vote/src/vote_transaction.rs +++ b/programs/vote/src/vote_transaction.rs @@ -72,3 +72,33 @@ pub fn new_vote_state_update_transaction( vote_tx.partial_sign(&[authorized_voter_keypair], blockhash); vote_tx } + +pub fn new_compact_vote_state_update_transaction( + vote_state_update: VoteStateUpdate, + blockhash: Hash, + node_keypair: &Keypair, + vote_keypair: &Keypair, + authorized_voter_keypair: &Keypair, + switch_proof_hash: Option, +) -> Transaction { + let vote_ix = if let Some(switch_proof_hash) = switch_proof_hash { + vote::instruction::compact_update_vote_state_switch( + &vote_keypair.pubkey(), + &authorized_voter_keypair.pubkey(), + vote_state_update, + switch_proof_hash, + ) + } else { + vote::instruction::compact_update_vote_state( + &vote_keypair.pubkey(), + &authorized_voter_keypair.pubkey(), + vote_state_update, + ) + }; + + let mut vote_tx = Transaction::new_with_payer(&[vote_ix], Some(&node_keypair.pubkey())); + + vote_tx.partial_sign(&[node_keypair], blockhash); + vote_tx.partial_sign(&[authorized_voter_keypair], blockhash); + vote_tx +} diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 0c1fff057..54a8cc34c 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -49,6 +49,7 @@ solana-frozen-abi 
= { path = "../frozen-abi", version = "=1.15.0" } solana-frozen-abi-macro = { path = "../frozen-abi/macro", version = "=1.15.0" } solana-measure = { path = "../measure", version = "=1.15.0" } solana-metrics = { path = "../metrics", version = "=1.15.0" } +solana-perf = { path = "../perf", version = "=1.15.0" } solana-program-runtime = { path = "../program-runtime", version = "=1.15.0" } solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.15.0" } solana-sdk = { path = "../sdk", version = "=1.15.0" } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index fe2e1be44..ff42f6175 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -83,6 +83,7 @@ use { }, solana_measure::{measure, measure::Measure}, solana_metrics::{inc_new_counter_debug, inc_new_counter_info}, + solana_perf::perf_libs, solana_program_runtime::{ accounts_data_meter::MAX_ACCOUNTS_DATA_LEN, compute_budget::{self, ComputeBudget}, @@ -105,7 +106,7 @@ use { clock::{ BankId, Epoch, Slot, SlotCount, SlotIndex, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, INITIAL_RENT_EPOCH, MAX_PROCESSING_AGE, MAX_TRANSACTION_FORWARDING_DELAY, - SECONDS_PER_DAY, + MAX_TRANSACTION_FORWARDING_DELAY_GPU, SECONDS_PER_DAY, }, ed25519_program, epoch_info::EpochInfo, @@ -141,7 +142,7 @@ use { sysvar::{self, Sysvar, SysvarId}, timing::years_as_slots, transaction::{ - MessageHash, Result, SanitizedTransaction, Transaction, TransactionError, + self, MessageHash, Result, SanitizedTransaction, Transaction, TransactionError, TransactionVerificationMode, VersionedTransaction, MAX_TX_ACCOUNT_LOCKS, }, transaction_context::{ @@ -7736,6 +7737,36 @@ impl Bank { .epoch_accounts_hash_manager .try_get_epoch_accounts_hash() } + + /// Checks a batch of sanitized transactions again bank for age and status + pub fn check_transactions_with_forwarding_delay( + &self, + transactions: &[SanitizedTransaction], + filter: &[transaction::Result<()>], + forward_transactions_to_leader_at_slot_offset: u64, + ) -> Vec { + let mut error_counters = TransactionErrorMetrics::default(); + // The following code also checks if the blockhash for a transaction is too old + // The check accounts for + // 1. Transaction forwarding delay + // 2. The slot at which the next leader will actually process the transaction + // Drop the transaction if it will expire by the time the next node receives and processes it + let api = perf_libs::api(); + let max_tx_fwd_delay = if api.is_none() { + MAX_TRANSACTION_FORWARDING_DELAY + } else { + MAX_TRANSACTION_FORWARDING_DELAY_GPU + }; + + self.check_transactions( + transactions, + filter, + (MAX_PROCESSING_AGE) + .saturating_sub(max_tx_fwd_delay) + .saturating_sub(forward_transactions_to_leader_at_slot_offset as usize), + &mut error_counters, + ) + } } /// Compute how much an account has changed size. This function is useful when the data size delta diff --git a/sdk/program/src/clock.rs b/sdk/program/src/clock.rs index 408fdebe2..a28a73510 100644 --- a/sdk/program/src/clock.rs +++ b/sdk/program/src/clock.rs @@ -108,6 +108,10 @@ pub const MAX_TRANSACTION_FORWARDING_DELAY_GPU: usize = 2; /// More delay is expected if CUDA is not enabled (as signature verification takes longer) pub const MAX_TRANSACTION_FORWARDING_DELAY: usize = 6; +/// Transaction forwarding, which leader to forward to and how long to hold +pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2; +pub const HOLD_TRANSACTIONS_SLOT_OFFSET: u64 = 20; + /// The unit of time given to a leader for encoding a block. /// /// It is some some number of _ticks_ long.