diff --git a/Cargo.lock b/Cargo.lock index 34a7b912e8..f4b7dd2a67 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6159,6 +6159,7 @@ dependencies = [ "solana-logger 1.15.0", "solana-measure", "solana-metrics", + "solana-perf", "solana-program-runtime", "solana-rayon-threadlimit", "solana-sdk 1.15.0", diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index ae0bd6b07b..5ad2d81d94 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -13,6 +13,7 @@ use { leader_slot_banking_stage_metrics::LeaderSlotMetricsTracker, qos_service::QosService, unprocessed_packet_batches::*, + unprocessed_transaction_storage::{ThreadType, UnprocessedTransactionStorage}, }, solana_entry::entry::{next_hash, Entry}, solana_gossip::cluster_info::{ClusterInfo, Node}, @@ -83,8 +84,10 @@ fn bench_consume_buffered(bencher: &mut Bencher) { let transactions = vec![tx; 4194304]; let batches = transactions_to_deserialized_packets(&transactions).unwrap(); let batches_len = batches.len(); - let mut transaction_buffer = - UnprocessedPacketBatches::from_iter(batches.into_iter(), 2 * batches_len); + let mut transaction_buffer = UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::from_iter(batches.into_iter(), 2 * batches_len), + ThreadType::Transactions, + ); let (s, _r) = unbounded(); // This tests the performance of buffering packets. // If the packet buffers are copied, performance will be poor. @@ -94,7 +97,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { std::u128::MAX, &poh_recorder, &mut transaction_buffer, - None, + &None, &s, None::>, &BankingStageStats::default(), diff --git a/core/benches/unprocessed_packet_batches.rs b/core/benches/unprocessed_packet_batches.rs index 1fa13dde7e..5ba3d903b5 100644 --- a/core/benches/unprocessed_packet_batches.rs +++ b/core/benches/unprocessed_packet_batches.rs @@ -6,8 +6,11 @@ extern crate test; use { rand::distributions::{Distribution, Uniform}, solana_core::{ - banking_stage::*, forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, + forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, unprocessed_packet_batches::*, + unprocessed_transaction_storage::{ + ThreadType, UnprocessedTransactionStorage, UNPROCESSED_BUFFER_STEP_SIZE, + }, }, solana_measure::measure::Measure, solana_perf::packet::{Packet, PacketBatch}, @@ -104,7 +107,7 @@ fn insert_packet_batches( #[allow(clippy::unit_arg)] fn bench_packet_clone(bencher: &mut Bencher) { let batch_count = 1000; - let packet_per_batch_count = 128; + let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE; let packet_batches: Vec = (0..batch_count) .map(|_| build_packet_batch(packet_per_batch_count, None).0) @@ -134,9 +137,9 @@ fn bench_packet_clone(bencher: &mut Bencher) { #[bench] #[ignore] fn bench_unprocessed_packet_batches_within_limit(bencher: &mut Bencher) { - let buffer_capacity = 1_000 * 128; + let buffer_capacity = 1_000 * UNPROCESSED_BUFFER_STEP_SIZE; let batch_count = 1_000; - let packet_per_batch_count = 128; + let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE; bencher.iter(|| { insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, false); @@ -148,9 +151,9 @@ fn bench_unprocessed_packet_batches_within_limit(bencher: &mut Bencher) { #[bench] #[ignore] fn bench_unprocessed_packet_batches_beyond_limit(bencher: &mut Bencher) { - let buffer_capacity = 1_000 * 128; + let buffer_capacity = 1_000 * UNPROCESSED_BUFFER_STEP_SIZE; let batch_count = 1_100; - let packet_per_batch_count = 128; + let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE; // this is the worst scenario testing: all batches are uniformly populated with packets from // priority 100..228, so in order to drop a batch, algo will have to drop all packets that has @@ -167,9 +170,9 @@ fn bench_unprocessed_packet_batches_beyond_limit(bencher: &mut Bencher) { #[bench] #[ignore] fn bench_unprocessed_packet_batches_randomized_within_limit(bencher: &mut Bencher) { - let buffer_capacity = 1_000 * 128; + let buffer_capacity = 1_000 * UNPROCESSED_BUFFER_STEP_SIZE; let batch_count = 1_000; - let packet_per_batch_count = 128; + let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE; bencher.iter(|| { insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, true); @@ -181,9 +184,9 @@ fn bench_unprocessed_packet_batches_randomized_within_limit(bencher: &mut Benche #[bench] #[ignore] fn bench_unprocessed_packet_batches_randomized_beyond_limit(bencher: &mut Bencher) { - let buffer_capacity = 1_000 * 128; + let buffer_capacity = 1_000 * UNPROCESSED_BUFFER_STEP_SIZE; let batch_count = 1_100; - let packet_per_batch_count = 128; + let packet_per_batch_count = UNPROCESSED_BUFFER_STEP_SIZE; bencher.iter(|| { insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, true); @@ -198,7 +201,6 @@ fn buffer_iter_desc_and_forward( ) { solana_logger::setup(); let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(buffer_max_size); - let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); let bank = Bank::new_for_tests(&genesis_config); let bank_forks = BankForks::new(bank); @@ -226,13 +228,15 @@ fn buffer_iter_desc_and_forward( // forward whole buffer { + let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( + unprocessed_packet_batches, + ThreadType::Transactions, + ); let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); - let _ = BankingStage::filter_and_forward_with_account_limits( - ¤t_bank, - &mut unprocessed_packet_batches, + let _ = transaction_storage.filter_forwardable_packets_and_add_batches( + current_bank, &mut forward_packet_batches_by_accounts, - 128usize, ); } } diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 70105973de..bb27050ce3 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -6,6 +6,7 @@ use { crate::{ forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, immutable_deserialized_packet::ImmutableDeserializedPacket, + latest_unprocessed_votes::{LatestUnprocessedVotes, VoteSource}, leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary}, leader_slot_banking_stage_timing_metrics::{ LeaderExecuteAndCommitTimings, RecordTransactionsTimings, @@ -14,7 +15,10 @@ use { qos_service::QosService, sigverify::SigverifyTracerPacketStats, tracer_packet_stats::TracerPacketStats, - unprocessed_packet_batches::{self, *}, + unprocessed_packet_batches::*, + unprocessed_transaction_storage::{ + ThreadType, UnprocessedTransactionStorage, UNPROCESSED_BUFFER_STEP_SIZE, + }, }, core::iter::repeat, crossbeam_channel::{ @@ -22,7 +26,6 @@ use { }, histogram::Histogram, itertools::Itertools, - min_max_heap::MinMaxHeap, solana_entry::entry::hash_transactions, solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_ledger::{ @@ -33,7 +36,6 @@ use { solana_perf::{ data_budget::DataBudget, packet::{Packet, PacketBatch, PACKETS_PER_BATCH}, - perf_libs, }, solana_poh::poh_recorder::{BankStart, PohRecorder, PohRecorderError, TransactionRecorder}, solana_program_runtime::timings::ExecuteTimings, @@ -51,9 +53,10 @@ use { }, solana_sdk::{ clock::{ - Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE, MAX_TRANSACTION_FORWARDING_DELAY, - MAX_TRANSACTION_FORWARDING_DELAY_GPU, + Slot, DEFAULT_TICKS_PER_SLOT, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, + HOLD_TRANSACTIONS_SLOT_OFFSET, MAX_PROCESSING_AGE, }, + feature_set::allow_votes_to_directly_update_vote_state, pubkey::Pubkey, saturating_add_assign, timing::{duration_as_ms, timestamp, AtomicInterval}, @@ -77,10 +80,6 @@ use { }, }; -/// Transaction forwarding -pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2; -pub const HOLD_TRANSACTIONS_SLOT_OFFSET: u64 = 20; - // Fixed thread size seems to be fastest on GCP setup pub const NUM_THREADS: u32 = 6; @@ -91,7 +90,6 @@ const MAX_NUM_TRANSACTIONS_PER_BATCH: usize = 64; const NUM_VOTE_PROCESSING_THREADS: u32 = 2; const MIN_THREADS_BANKING: u32 = 1; const MIN_TOTAL_THREADS: u32 = NUM_VOTE_PROCESSING_THREADS + MIN_THREADS_BANKING; -pub const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128; const SLOT_BOUNDARY_CHECK_PERIOD: Duration = Duration::from_millis(10); pub type BankingPacketBatch = (Vec, Option); @@ -149,7 +147,6 @@ pub struct BankingStageStats { pub(crate) dropped_duplicated_packets_count: AtomicUsize, newly_buffered_packets_count: AtomicUsize, current_buffered_packets_count: AtomicUsize, - current_buffered_packet_batches_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, forwarded_transaction_count: AtomicUsize, @@ -187,9 +184,6 @@ impl BankingStageStats { .load(Ordering::Relaxed) as u64 + self.newly_buffered_packets_count.load(Ordering::Relaxed) as u64 + self.current_buffered_packets_count.load(Ordering::Relaxed) as u64 - + self - .current_buffered_packet_batches_count - .load(Ordering::Relaxed) as u64 + self.rebuffered_packets_count.load(Ordering::Relaxed) as u64 + self.consumed_buffered_packets_count.load(Ordering::Relaxed) as u64 + self @@ -240,12 +234,6 @@ impl BankingStageStats { self.newly_buffered_packets_count.swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "current_buffered_packet_batches_count", - self.current_buffered_packet_batches_count - .swap(0, Ordering::Relaxed) as i64, - i64 - ), ( "current_buffered_packets_count", self.current_buffered_packets_count @@ -442,21 +430,57 @@ impl BankingStage { let data_budget = Arc::new(DataBudget::default()); let batch_limit = TOTAL_BUFFERED_PACKETS / ((num_threads - NUM_VOTE_PROCESSING_THREADS) as usize); + // Keeps track of extraneous vote transactions for the vote threads + let latest_unprocessed_votes = Arc::new(LatestUnprocessedVotes::new()); + let should_split_voting_threads = bank_forks + .read() + .map(|bank_forks| { + let bank = bank_forks.root_bank(); + bank.feature_set + .is_active(&allow_votes_to_directly_update_vote_state::id()) + }) + .unwrap_or(false); // Many banks that process transactions in parallel. let bank_thread_hdls: Vec> = (0..num_threads) .map(|i| { - let (verified_receiver, forward_option) = match i { - 0 => { - // Disable forwarding of vote transactions - // from gossip. Note - votes can also arrive from tpu - (verified_vote_receiver.clone(), ForwardOption::NotForward) - } - 1 => ( - tpu_verified_vote_receiver.clone(), - ForwardOption::ForwardTpuVote, - ), - _ => (verified_receiver.clone(), ForwardOption::ForwardTransaction), - }; + let (verified_receiver, unprocessed_transaction_storage) = + match (i, should_split_voting_threads) { + (0, false) => ( + verified_vote_receiver.clone(), + UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::with_capacity(batch_limit), + ThreadType::Voting(VoteSource::Gossip), + ), + ), + (0, true) => ( + verified_vote_receiver.clone(), + UnprocessedTransactionStorage::new_vote_storage( + latest_unprocessed_votes.clone(), + VoteSource::Gossip, + ), + ), + (1, false) => ( + tpu_verified_vote_receiver.clone(), + UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::with_capacity(batch_limit), + ThreadType::Voting(VoteSource::Tpu), + ), + ), + (1, true) => ( + tpu_verified_vote_receiver.clone(), + UnprocessedTransactionStorage::new_vote_storage( + latest_unprocessed_votes.clone(), + VoteSource::Tpu, + ), + ), + _ => ( + verified_receiver.clone(), + UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::with_capacity(batch_limit), + ThreadType::Transactions, + ), + ), + }; let mut packet_deserializer = PacketDeserializer::new(verified_receiver); let poh_recorder = poh_recorder.clone(); @@ -476,9 +500,7 @@ impl BankingStage { &poh_recorder, &cluster_info, &mut recv_start, - forward_option, i, - batch_limit, transaction_status_sender, gossip_vote_sender, &data_budget, @@ -486,6 +508,7 @@ impl BankingStage { log_messages_bytes_limit, connection_cache, &bank_forks, + unprocessed_transaction_storage, ); }) .unwrap() @@ -589,13 +612,112 @@ impl BankingStage { (Ok(()), packet_vec_len, Some(leader_pubkey)) } + #[allow(clippy::too_many_arguments)] + fn do_process_packets( + max_tx_ingestion_ns: u128, + poh_recorder: &Arc>, + slot_metrics_tracker: &mut LeaderSlotMetricsTracker, + recorder: &TransactionRecorder, + transaction_status_sender: &Option, + gossip_vote_sender: &ReplayVoteSender, + banking_stage_stats: &BankingStageStats, + qos_service: &QosService, + log_messages_bytes_limit: Option, + consumed_buffered_packets_count: &mut usize, + rebuffered_packet_count: &mut usize, + reached_end_of_slot: &mut bool, + test_fn: &Option, + packets_to_process: &Vec>, + ) -> Option> { + // TODO: Right now we iterate through buffer and try the highest weighted transaction once + // but we should retry the highest weighted transactions more often. + let (bank_start, poh_recorder_lock_time) = measure!( + poh_recorder.read().unwrap().bank_start(), + "poh_recorder.read", + ); + slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us( + poh_recorder_lock_time.as_us(), + ); + + let packets_to_process_len = packets_to_process.len(); + if let Some(BankStart { + working_bank, + bank_creation_time, + }) = bank_start + { + let (process_transactions_summary, process_packets_transactions_time) = measure!( + Self::process_packets_transactions( + &working_bank, + &bank_creation_time, + recorder, + packets_to_process.iter().map(|p| &**p), + transaction_status_sender, + gossip_vote_sender, + banking_stage_stats, + qos_service, + slot_metrics_tracker, + log_messages_bytes_limit + ), + "process_packets_transactions", + ); + slot_metrics_tracker.increment_process_packets_transactions_us( + process_packets_transactions_time.as_us(), + ); + + let ProcessTransactionsSummary { + reached_max_poh_height, + retryable_transaction_indexes, + .. + } = process_transactions_summary; + + if reached_max_poh_height + || !Bank::should_bank_still_be_processing_txs( + &bank_creation_time, + max_tx_ingestion_ns, + ) + { + *reached_end_of_slot = true; + } + + // The difference between all transactions passed to execution and the ones that + // are retryable were the ones that were either: + // 1) Committed into the block + // 2) Dropped without being committed because they had some fatal error (too old, + // duplicate signature, etc.) + // + // Note: This assumes that every packet deserializes into one transaction! + *consumed_buffered_packets_count += + packets_to_process_len.saturating_sub(retryable_transaction_indexes.len()); + + // Out of the buffered packets just retried, collect any still unprocessed + // transactions in this batch for forwarding + *rebuffered_packet_count += retryable_transaction_indexes.len(); + if let Some(test_fn) = test_fn { + test_fn(); + } + + slot_metrics_tracker + .increment_retryable_packets_count(retryable_transaction_indexes.len() as u64); + + Some(retryable_transaction_indexes) + } else if *reached_end_of_slot { + None + } else { + // mark as end-of-slot to avoid aggressively lock poh for the remaining for + // packet batches in buffer + *reached_end_of_slot = true; + + None + } + } + #[allow(clippy::too_many_arguments)] pub fn consume_buffered_packets( _my_pubkey: &Pubkey, max_tx_ingestion_ns: u128, poh_recorder: &Arc>, - buffered_packet_batches: &mut UnprocessedPacketBatches, - transaction_status_sender: Option, + unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, + transaction_status_sender: &Option, gossip_vote_sender: &ReplayVoteSender, test_fn: Option, banking_stage_stats: &BankingStageStats, @@ -607,122 +729,38 @@ impl BankingStage { ) { let mut rebuffered_packet_count = 0; let mut consumed_buffered_packets_count = 0; - let buffered_packets_len = buffered_packet_batches.len(); let mut proc_start = Measure::start("consume_buffered_process"); let mut reached_end_of_slot = false; - let mut retryable_packets = { - let capacity = buffered_packet_batches.capacity(); - std::mem::replace( - &mut buffered_packet_batches.packet_priority_queue, - MinMaxHeap::with_capacity(capacity), - ) - }; - let retryable_packets: MinMaxHeap> = retryable_packets - .drain_desc() - .chunks(num_packets_to_process_per_iteration) - .into_iter() - .flat_map(|packets_to_process| { - let packets_to_process = packets_to_process.into_iter().collect_vec(); - // TODO: Right now we iterate through buffer and try the highest weighted transaction once - // but we should retry the highest weighted transactions more often. - let (bank_start, poh_recorder_lock_time) = measure!( - poh_recorder.read().unwrap().bank_start(), - "poh_recorder.read", - ); - slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us( - poh_recorder_lock_time.as_us(), - ); + let num_packets_to_process = unprocessed_transaction_storage.len(); + let bank = poh_recorder.read().unwrap().bank(); - let packets_to_process_len = packets_to_process.len(); - if let Some(BankStart { - working_bank, - bank_creation_time, - }) = bank_start - { - let (process_transactions_summary, process_packets_transactions_time) = - measure!( - Self::process_packets_transactions( - &working_bank, - &bank_creation_time, - recorder, - packets_to_process.iter().map(|p| &**p), - transaction_status_sender.clone(), - gossip_vote_sender, - banking_stage_stats, - qos_service, - slot_metrics_tracker, - log_messages_bytes_limit - ), - "process_packets_transactions", - ); - slot_metrics_tracker.increment_process_packets_transactions_us( - process_packets_transactions_time.as_us(), - ); - - let ProcessTransactionsSummary { - reached_max_poh_height, - retryable_transaction_indexes, - .. - } = process_transactions_summary; - - if reached_max_poh_height - || !Bank::should_bank_still_be_processing_txs( - &bank_creation_time, - max_tx_ingestion_ns, - ) - { - reached_end_of_slot = true; - } - - // The difference between all transactions passed to execution and the ones that - // are retryable were the ones that were either: - // 1) Committed into the block - // 2) Dropped without being committed because they had some fatal error (too old, - // duplicate signature, etc.) - // - // Note: This assumes that every packet deserializes into one transaction! - consumed_buffered_packets_count += - packets_to_process_len.saturating_sub(retryable_transaction_indexes.len()); - - // Out of the buffered packets just retried, collect any still unprocessed - // transactions in this batch for forwarding - rebuffered_packet_count += retryable_transaction_indexes.len(); - if let Some(test_fn) = &test_fn { - test_fn(); - } - - slot_metrics_tracker.increment_retryable_packets_count( - retryable_transaction_indexes.len() as u64, - ); - - let result = retryable_transaction_indexes - .iter() - .map(|i| packets_to_process[*i].clone()) - .collect_vec(); - - // Remove the non-retryable packets, packets that were either: - // 1) Successfully processed - // 2) Failed but not retryable - Self::remove_non_retained_packets(buffered_packet_batches, &packets_to_process, &retryable_transaction_indexes); - - result - } else if reached_end_of_slot { - packets_to_process - } else { - // mark as end-of-slot to avoid aggressively lock poh for the remaining for - // packet batches in buffer - reached_end_of_slot = true; - - packets_to_process - } - }) - .collect(); - - buffered_packet_batches.packet_priority_queue = retryable_packets; + unprocessed_transaction_storage.process_packets( + bank, + num_packets_to_process_per_iteration, + |packets_to_process| { + Self::do_process_packets( + max_tx_ingestion_ns, + poh_recorder, + slot_metrics_tracker, + recorder, + transaction_status_sender, + gossip_vote_sender, + banking_stage_stats, + qos_service, + log_messages_bytes_limit, + &mut consumed_buffered_packets_count, + &mut rebuffered_packet_count, + &mut reached_end_of_slot, + &test_fn, + packets_to_process, + ) + }, + ); if reached_end_of_slot { - slot_metrics_tracker - .set_end_of_slot_unprocessed_buffer_len(buffered_packet_batches.len() as u64); + slot_metrics_tracker.set_end_of_slot_unprocessed_buffer_len( + unprocessed_transaction_storage.len() as u64, + ); // We've hit the end of this slot, no need to perform more processing, // Packet filtering will be done before forwarding. @@ -732,17 +770,12 @@ impl BankingStage { debug!( "@{:?} done processing buffered batches: {} time: {:?}ms tx count: {} tx/s: {}", timestamp(), - buffered_packets_len, + num_packets_to_process, proc_start.as_ms(), consumed_buffered_packets_count, (consumed_buffered_packets_count as f32) / (proc_start.as_s()) ); - // Assert unprocessed queue is still consistent - assert_eq!( - buffered_packet_batches.packet_priority_queue.len(), - buffered_packet_batches.message_hash_to_transaction.len() - ); banking_stage_stats .consume_buffered_packets_elapsed .fetch_add(proc_start.as_us(), Ordering::Relaxed); @@ -793,9 +826,8 @@ impl BankingStage { socket: &UdpSocket, poh_recorder: &Arc>, cluster_info: &ClusterInfo, - buffered_packet_batches: &mut UnprocessedPacketBatches, - forward_option: &ForwardOption, - transaction_status_sender: Option, + unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, + transaction_status_sender: &Option, gossip_vote_sender: &ReplayVoteSender, banking_stage_stats: &BankingStageStats, recorder: &TransactionRecorder, @@ -807,6 +839,9 @@ impl BankingStage { tracer_packet_stats: &mut TracerPacketStats, bank_forks: &Arc>, ) { + if unprocessed_transaction_storage.should_not_process() { + return; + } let ((metrics_action, decision), make_decision_time) = measure!( { let bank_start; @@ -856,7 +891,7 @@ impl BankingStage { my_pubkey, max_tx_ingestion_ns, poh_recorder, - buffered_packet_batches, + unprocessed_transaction_storage, transaction_status_sender, gossip_vote_sender, None::>, @@ -875,9 +910,8 @@ impl BankingStage { BufferedPacketsDecision::Forward => { let (_, forward_time) = measure!( Self::handle_forwarding( - forward_option, cluster_info, - buffered_packet_batches, + unprocessed_transaction_storage, poh_recorder, socket, false, @@ -898,9 +932,8 @@ impl BankingStage { BufferedPacketsDecision::ForwardAndHold => { let (_, forward_and_hold_time) = measure!( Self::handle_forwarding( - forward_option, cluster_info, - buffered_packet_batches, + unprocessed_transaction_storage, poh_recorder, socket, true, @@ -923,9 +956,8 @@ impl BankingStage { #[allow(clippy::too_many_arguments)] fn handle_forwarding( - forward_option: &ForwardOption, cluster_info: &ClusterInfo, - buffered_packet_batches: &mut UnprocessedPacketBatches, + unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, poh_recorder: &Arc>, socket: &UdpSocket, hold: bool, @@ -936,12 +968,7 @@ impl BankingStage { tracer_packet_stats: &mut TracerPacketStats, bank_forks: &Arc>, ) { - if let ForwardOption::NotForward = forward_option { - if !hold { - buffered_packet_batches.clear(); - } - return; - } + let forward_option = unprocessed_transaction_storage.forward_option(); // get current root bank from bank_forks, use it to sanitize transaction and // load all accounts from address loader; @@ -952,12 +979,11 @@ impl BankingStage { // sanitize and filter packets that are no longer valid (could be too old, a duplicate of something // already processed), then add to forwarding buffer. - let filter_forwarding_result = Self::filter_and_forward_with_account_limits( - ¤t_bank, - buffered_packet_batches, - &mut forward_packet_batches_by_accounts, - UNPROCESSED_BUFFER_STEP_SIZE, - ); + let filter_forwarding_result = unprocessed_transaction_storage + .filter_forwardable_packets_and_add_batches( + current_bank, + &mut forward_packet_batches_by_accounts, + ); slot_metrics_tracker.increment_transactions_from_packets_us( filter_forwarding_result.total_packet_conversion_us, ); @@ -982,7 +1008,7 @@ impl BankingStage { let (_forward_result, sucessful_forwarded_packets_count, leader_pubkey) = Self::forward_buffered_packets( connection_cache, - forward_option, + &forward_option, cluster_info, poh_recorder, socket, @@ -1021,307 +1047,17 @@ impl BankingStage { tracer_packet_stats.increment_total_cleared_from_buffer_after_forward( filter_forwarding_result.total_tracer_packets_in_buffer, ); - buffered_packet_batches.clear(); + unprocessed_transaction_storage.clear_forwarded_packets(); } } - /// Filter out packets that fail to sanitize, or are no longer valid (could be - /// too old, a duplicate of something already processed). Doing this in batches to avoid - /// checking bank's blockhash and status cache per transaction which could be bad for performance. - /// Added valid and sanitized packets to forwarding queue. - pub fn filter_and_forward_with_account_limits( - bank: &Arc, - buffered_packet_batches: &mut UnprocessedPacketBatches, - forward_buffer: &mut ForwardPacketBatchesByAccounts, - batch_size: usize, - ) -> FilterForwardingResults { - let mut total_forwardable_tracer_packets: usize = 0; - let mut total_tracer_packets_in_buffer: usize = 0; - let mut total_forwardable_packets: usize = 0; - let mut total_packet_conversion_us: u64 = 0; - let mut total_filter_packets_us: u64 = 0; - let mut dropped_tx_before_forwarding_count: usize = 0; - - let mut original_priority_queue = Self::swap_priority_queue(buffered_packet_batches); - - // indicates if `forward_buffer` still accept more packets, see details at - // `ForwardPacketBatchesByAccounts.rs`. - let mut accepting_packets = true; - // batch iterate through buffered_packet_batches in desc priority order - let retained_priority_queue: MinMaxHeap<_> = original_priority_queue - .drain_desc() - .chunks(batch_size) - .into_iter() - .flat_map(|packets_to_process| { - let packets_to_process = packets_to_process.into_iter().collect_vec(); - - // Vec of same size of `packets_to_process`, each indicates - // corresponding packet is tracer packet. - let tracer_packet_indexes = packets_to_process - .iter() - .map(|deserialized_packet| { - deserialized_packet - .original_packet() - .meta - .is_tracer_packet() - }) - .collect::>(); - saturating_add_assign!( - total_tracer_packets_in_buffer, - tracer_packet_indexes - .iter() - .filter(|is_tracer| **is_tracer) - .count() - ); - - if accepting_packets { - let ( - (sanitized_transactions, transaction_to_packet_indexes), - packet_conversion_time, - ): ((Vec, Vec), _) = measure!( - Self::sanitize_unforwarded_packets( - buffered_packet_batches, - &packets_to_process, - bank, - ), - "sanitize_packet", - ); - saturating_add_assign!( - total_packet_conversion_us, - packet_conversion_time.as_us() - ); - - let (forwardable_transaction_indexes, filter_packets_time) = measure!( - Self::filter_invalid_transactions(&sanitized_transactions, bank,), - "filter_packets", - ); - saturating_add_assign!(total_filter_packets_us, filter_packets_time.as_us()); - - for forwardable_transaction_index in &forwardable_transaction_indexes { - saturating_add_assign!(total_forwardable_packets, 1); - let forwardable_packet_index = - transaction_to_packet_indexes[*forwardable_transaction_index]; - if tracer_packet_indexes[forwardable_packet_index] { - saturating_add_assign!(total_forwardable_tracer_packets, 1); - } - } - - let accepted_packet_indexes = Self::add_filtered_packets_to_forward_buffer( - forward_buffer, - &packets_to_process, - &sanitized_transactions, - &transaction_to_packet_indexes, - &forwardable_transaction_indexes, - &mut dropped_tx_before_forwarding_count, - ); - accepting_packets = - accepted_packet_indexes.len() == forwardable_transaction_indexes.len(); - - UnprocessedPacketBatches::mark_accepted_packets_as_forwarded( - buffered_packet_batches, - &packets_to_process, - &accepted_packet_indexes, - ); - - Self::collect_retained_packets( - buffered_packet_batches, - &packets_to_process, - &Self::prepare_filtered_packet_indexes( - &transaction_to_packet_indexes, - &forwardable_transaction_indexes, - ), - ) - } else { - // skip sanitizing and filtering if not longer able to add more packets for forwarding - saturating_add_assign!( - dropped_tx_before_forwarding_count, - packets_to_process.len() - ); - packets_to_process - } - }) - .collect(); - - // replace packet priority queue - buffered_packet_batches.packet_priority_queue = retained_priority_queue; - - inc_new_counter_info!( - "banking_stage-dropped_tx_before_forwarding", - dropped_tx_before_forwarding_count - ); - - FilterForwardingResults { - total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, - total_packet_conversion_us, - total_filter_packets_us, - } - } - - /// Take buffered_packet_batches's priority_queue out, leave empty MinMaxHeap in its place. - fn swap_priority_queue( - buffered_packet_batches: &mut UnprocessedPacketBatches, - ) -> MinMaxHeap> { - let capacity = buffered_packet_batches.capacity(); - std::mem::replace( - &mut buffered_packet_batches.packet_priority_queue, - MinMaxHeap::with_capacity(capacity), - ) - } - - /// sanitize un-forwarded packet into SanitizedTransaction for validation and forwarding. - fn sanitize_unforwarded_packets( - buffered_packet_batches: &mut UnprocessedPacketBatches, - packets_to_process: &[Arc], - bank: &Arc, - ) -> (Vec, Vec) { - // Get ref of ImmutableDeserializedPacket - let deserialized_packets = packets_to_process.iter().map(|p| &**p); - let (transactions, transaction_to_packet_indexes): (Vec, Vec) = - deserialized_packets - .enumerate() - .filter_map(|(packet_index, deserialized_packet)| { - if !buffered_packet_batches.is_forwarded(deserialized_packet) { - unprocessed_packet_batches::transaction_from_deserialized_packet( - deserialized_packet, - &bank.feature_set, - bank.vote_only_bank(), - bank.as_ref(), - ) - .map(|transaction| (transaction, packet_index)) - } else { - None - } - }) - .unzip(); - - // report metrics - inc_new_counter_info!("banking_stage-packet_conversion", 1); - let unsanitized_packets_filtered_count = - packets_to_process.len().saturating_sub(transactions.len()); - inc_new_counter_info!( - "banking_stage-dropped_tx_before_forwarding", - unsanitized_packets_filtered_count - ); - - (transactions, transaction_to_packet_indexes) - } - - /// Checks sanitized transactions against bank, returns valid transaction indexes - fn filter_invalid_transactions( - transactions: &[SanitizedTransaction], - bank: &Arc, - ) -> Vec { - let filter = vec![Ok(()); transactions.len()]; - let results = Self::bank_check_transactions(bank, transactions, &filter); - // report metrics - let filtered_out_transactions_count = transactions.len().saturating_sub(results.len()); - inc_new_counter_info!( - "banking_stage-dropped_tx_before_forwarding", - filtered_out_transactions_count - ); - - results - .iter() - .enumerate() - .filter_map( - |(tx_index, (result, _))| if result.is_ok() { Some(tx_index) } else { None }, - ) - .collect_vec() - } - - fn prepare_filtered_packet_indexes( - transaction_to_packet_indexes: &[usize], - retained_transaction_indexes: &[usize], - ) -> Vec { - retained_transaction_indexes - .iter() - .map(|tx_index| transaction_to_packet_indexes[*tx_index]) - .collect_vec() - } - - fn collect_retained_packets( - buffered_packet_batches: &mut UnprocessedPacketBatches, - packets_to_process: &[Arc], - retained_packet_indexes: &[usize], - ) -> Vec> { - Self::remove_non_retained_packets( - buffered_packet_batches, - packets_to_process, - retained_packet_indexes, - ); - retained_packet_indexes - .iter() - .map(|i| packets_to_process[*i].clone()) - .collect_vec() - } - - /// remove packets from UnprocessedPacketBatches.message_hash_to_transaction after they have - /// been removed from UnprocessedPacketBatches.packet_priority_queue - fn remove_non_retained_packets( - buffered_packet_batches: &mut UnprocessedPacketBatches, - packets_to_process: &[Arc], - retained_packet_indexes: &[usize], - ) { - Self::filter_processed_packets( - retained_packet_indexes - .iter() - .chain(std::iter::once(&packets_to_process.len())), - |start, end| { - for processed_packet in &packets_to_process[start..end] { - buffered_packet_batches - .message_hash_to_transaction - .remove(processed_packet.message_hash()); - } - }, - ) - } - - /// try to add filtered forwardable and valid packets to forward buffer; - /// returns vector of packet indexes that were accepted for forwarding. - fn add_filtered_packets_to_forward_buffer( - forward_buffer: &mut ForwardPacketBatchesByAccounts, - packets_to_process: &[Arc], - transactions: &[SanitizedTransaction], - transaction_to_packet_indexes: &[usize], - forwardable_transaction_indexes: &[usize], - dropped_tx_before_forwarding_count: &mut usize, - ) -> Vec { - let mut added_packets_count: usize = 0; - let mut accepted_packet_indexes = Vec::with_capacity(transaction_to_packet_indexes.len()); - for forwardable_transaction_index in forwardable_transaction_indexes { - let sanitized_transaction = &transactions[*forwardable_transaction_index]; - let forwardable_packet_index = - transaction_to_packet_indexes[*forwardable_transaction_index]; - let immutable_deserialized_packet = - packets_to_process[forwardable_packet_index].clone(); - if !forward_buffer.try_add_packet(sanitized_transaction, immutable_deserialized_packet) - { - break; - } - accepted_packet_indexes.push(forwardable_packet_index); - saturating_add_assign!(added_packets_count, 1); - } - - // count the packets not being forwarded in this batch - saturating_add_assign!( - *dropped_tx_before_forwarding_count, - forwardable_transaction_indexes.len() - added_packets_count - ); - - accepted_packet_indexes - } - #[allow(clippy::too_many_arguments)] fn process_loop( packet_deserializer: &mut PacketDeserializer, poh_recorder: &Arc>, cluster_info: &ClusterInfo, recv_start: &mut Instant, - forward_option: ForwardOption, id: u32, - batch_limit: usize, transaction_status_sender: Option, gossip_vote_sender: ReplayVoteSender, data_budget: &DataBudget, @@ -1329,10 +1065,10 @@ impl BankingStage { log_messages_bytes_limit: Option, connection_cache: Arc, bank_forks: &Arc>, + mut unprocessed_transaction_storage: UnprocessedTransactionStorage, ) { let recorder = poh_recorder.read().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - let mut buffered_packet_batches = UnprocessedPacketBatches::with_capacity(batch_limit); let mut banking_stage_stats = BankingStageStats::new(id); let mut tracer_packet_stats = TracerPacketStats::new(id); let qos_service = QosService::new(cost_model, id); @@ -1342,7 +1078,7 @@ impl BankingStage { loop { let my_pubkey = cluster_info.id(); - if !buffered_packet_batches.is_empty() + if !unprocessed_transaction_storage.is_empty() || last_metrics_update.elapsed() >= SLOT_BOUNDARY_CHECK_PERIOD { let (_, process_buffered_packets_time) = measure!( @@ -1351,9 +1087,8 @@ impl BankingStage { &socket, poh_recorder, cluster_info, - &mut buffered_packet_batches, - &forward_option, - transaction_status_sender.clone(), + &mut unprocessed_transaction_storage, + &transaction_status_sender, &gossip_vote_sender, &banking_stage_stats, &recorder, @@ -1374,7 +1109,8 @@ impl BankingStage { tracer_packet_stats.report(1000); - let recv_timeout = if !buffered_packet_batches.is_empty() { + // Gossip thread will almost always not wait because the transaction storage will most likely not be empty + let recv_timeout = if !unprocessed_transaction_storage.is_empty() { // If there are buffered packets, run the equivalent of try_recv to try reading more // packets. This prevents starving BankingStage::consume_buffered_packets due to // buffered_packet_batches containing transactions that exceed the cost model for @@ -1391,7 +1127,7 @@ impl BankingStage { recv_start, recv_timeout, id, - &mut buffered_packet_batches, + &mut unprocessed_transaction_storage, &mut banking_stage_stats, &mut tracer_packet_stats, &mut slot_metrics_tracker, @@ -1470,7 +1206,7 @@ impl BankingStage { bank: &Arc, poh: &TransactionRecorder, batch: &TransactionBatch, - transaction_status_sender: Option, + transaction_status_sender: &Option, gossip_vote_sender: &ReplayVoteSender, log_messages_bytes_limit: Option, ) -> ExecuteAndCommitTransactionsOutput { @@ -1712,7 +1448,7 @@ impl BankingStage { txs: &[SanitizedTransaction], poh: &TransactionRecorder, chunk_offset: usize, - transaction_status_sender: Option, + transaction_status_sender: &Option, gossip_vote_sender: &ReplayVoteSender, qos_service: &QosService, log_messages_bytes_limit: Option, @@ -1905,7 +1641,7 @@ impl BankingStage { bank_creation_time: &Instant, transactions: &[SanitizedTransaction], poh: &TransactionRecorder, - transaction_status_sender: Option, + transaction_status_sender: &Option, gossip_vote_sender: &ReplayVoteSender, qos_service: &QosService, log_messages_bytes_limit: Option, @@ -1937,7 +1673,7 @@ impl BankingStage { &transactions[chunk_start..chunk_end], poh, chunk_start, - transaction_status_sender.clone(), + transaction_status_sender, gossip_vote_sender, qos_service, log_messages_bytes_limit, @@ -2057,35 +1793,6 @@ impl BankingStage { .collect_vec() } - /// Checks a batch of sanitized transactions again bank for age and status - fn bank_check_transactions( - bank: &Arc, - transactions: &[SanitizedTransaction], - filter: &[transaction::Result<()>], - ) -> Vec { - let mut error_counters = TransactionErrorMetrics::default(); - // The following code also checks if the blockhash for a transaction is too old - // The check accounts for - // 1. Transaction forwarding delay - // 2. The slot at which the next leader will actually process the transaction - // Drop the transaction if it will expire by the time the next node receives and processes it - let api = perf_libs::api(); - let max_tx_fwd_delay = if api.is_none() { - MAX_TRANSACTION_FORWARDING_DELAY - } else { - MAX_TRANSACTION_FORWARDING_DELAY_GPU - }; - - bank.check_transactions( - transactions, - filter, - (MAX_PROCESSING_AGE) - .saturating_sub(max_tx_fwd_delay) - .saturating_sub(FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET as usize), - &mut error_counters, - ) - } - /// This function filters pending packets that are still valid /// # Arguments /// * `transactions` - a batch of transactions deserialized from packets @@ -2100,37 +1807,22 @@ impl BankingStage { let filter = Self::prepare_filter_for_pending_transactions(transactions.len(), pending_indexes); - let results = Self::bank_check_transactions(bank, transactions, &filter); + let results = bank.check_transactions_with_forwarding_delay( + transactions, + &filter, + FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, + ); Self::filter_valid_transaction_indexes(&results, transaction_to_packet_indexes) } - fn filter_processed_packets<'a, F>( - retryable_transaction_indexes: impl Iterator, - mut f: F, - ) where - F: FnMut(usize, usize), - { - let mut prev_retryable_index = 0; - for (i, retryable_index) in retryable_transaction_indexes.enumerate() { - let start = if i == 0 { 0 } else { prev_retryable_index + 1 }; - - let end = *retryable_index; - prev_retryable_index = *retryable_index; - - if start < end { - f(start, end) - } - } - } - #[allow(clippy::too_many_arguments)] fn process_packets_transactions<'a>( bank: &'a Arc, bank_creation_time: &Instant, poh: &'a TransactionRecorder, deserialized_packets: impl Iterator, - transaction_status_sender: Option, + transaction_status_sender: &Option, gossip_vote_sender: &'a ReplayVoteSender, banking_stage_stats: &'a BankingStageStats, qos_service: &'a QosService, @@ -2145,13 +1837,13 @@ impl BankingStage { deserialized_packets .enumerate() .filter_map(|(i, deserialized_packet)| { - unprocessed_packet_batches::transaction_from_deserialized_packet( - deserialized_packet, - &bank.feature_set, - bank.vote_only_bank(), - bank.as_ref(), - ) - .map(|transaction| (transaction, i)) + deserialized_packet + .build_sanitized_transaction( + &bank.feature_set, + bank.vote_only_bank(), + bank.as_ref(), + ) + .map(|transaction| (transaction, i)) }) .unzip(), "packet_conversion", @@ -2238,7 +1930,7 @@ impl BankingStage { recv_start: &mut Instant, recv_timeout: Duration, id: u32, - buffered_packet_batches: &mut UnprocessedPacketBatches, + unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, banking_stage_stats: &mut BankingStageStats, tracer_packet_stats: &mut TracerPacketStats, slot_metrics_tracker: &mut LeaderSlotMetricsTracker, @@ -2251,7 +1943,7 @@ impl BankingStage { failed_sigverify_count, } = packet_deserializer.handle_received_packets( recv_timeout, - buffered_packet_batches.capacity() - buffered_packet_batches.len(), + unprocessed_transaction_storage.capacity() - unprocessed_transaction_storage.len(), )?; let packet_count = deserialized_packets.len(); debug!( @@ -2273,7 +1965,7 @@ impl BankingStage { let mut dropped_packets_count = 0; let mut newly_buffered_packets_count = 0; Self::push_unprocessed( - buffered_packet_batches, + unprocessed_transaction_storage, deserialized_packets, &mut dropped_packets_count, &mut newly_buffered_packets_count, @@ -2295,18 +1987,15 @@ impl BankingStage { banking_stage_stats .newly_buffered_packets_count .fetch_add(newly_buffered_packets_count, Ordering::Relaxed); - banking_stage_stats - .current_buffered_packet_batches_count - .swap(buffered_packet_batches.len(), Ordering::Relaxed); banking_stage_stats .current_buffered_packets_count - .swap(buffered_packet_batches.len(), Ordering::Relaxed); + .swap(unprocessed_transaction_storage.len(), Ordering::Relaxed); *recv_start = Instant::now(); Ok(()) } fn push_unprocessed( - unprocessed_packet_batches: &mut UnprocessedPacketBatches, + unprocessed_transaction_storage: &mut UnprocessedTransactionStorage, deserialized_packets: Vec, dropped_packets_count: &mut usize, newly_buffered_packets_count: &mut usize, @@ -2323,20 +2012,17 @@ impl BankingStage { slot_metrics_tracker .increment_newly_buffered_packets_count(deserialized_packets.len() as u64); - let (number_of_dropped_packets, number_of_dropped_tracer_packets) = - unprocessed_packet_batches.insert_batch( - deserialized_packets - .into_iter() - .map(DeserializedPacket::from_immutable_section), - ); - - saturating_add_assign!(*dropped_packets_count, number_of_dropped_packets); - slot_metrics_tracker.increment_exceeded_buffer_limit_dropped_packets_count( - number_of_dropped_packets as u64, + let insert_packet_batches_summary = + unprocessed_transaction_storage.insert_batch(deserialized_packets); + slot_metrics_tracker + .accumulate_insert_packet_batches_summary(&insert_packet_batches_summary); + saturating_add_assign!( + *dropped_packets_count, + insert_packet_batches_summary.total_dropped_packets() + ); + tracer_packet_stats.increment_total_exceeded_banking_stage_buffer( + insert_packet_batches_summary.dropped_tracer_packets(), ); - - tracer_packet_stats - .increment_total_exceeded_banking_stage_buffer(number_of_dropped_tracer_packets); } } @@ -2394,6 +2080,7 @@ where mod tests { use { super::*, + crate::unprocessed_packet_batches, crossbeam_channel::{unbounded, Receiver}, solana_address_lookup_table_program::state::{AddressLookupTable, LookupTableMeta}, solana_entry::entry::{next_entry, next_versioned_entry, Entry, EntrySlice}, @@ -2411,7 +2098,7 @@ mod tests { }, solana_program_runtime::timings::ProgramTiming, solana_rpc::transaction_status_service::TransactionStatusService, - solana_runtime::bank_forks::BankForks, + solana_runtime::{bank_forks::BankForks, genesis_utils::activate_feature}, solana_sdk::{ account::AccountSharedData, hash::Hash, @@ -2427,6 +2114,9 @@ mod tests { }, solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace}, solana_transaction_status::{TransactionStatusMeta, VersionedTransactionWithStatusMeta}, + solana_vote_program::{ + vote_state::VoteStateUpdate, vote_transaction::new_vote_state_update_transaction, + }, std::{ borrow::Cow, collections::HashSet, @@ -2434,7 +2124,6 @@ mod tests { sync::atomic::{AtomicBool, Ordering}, thread::sleep, }, - unprocessed_packet_batches::DeserializedPacket, }; fn new_test_cluster_info(contact_info: ContactInfo) -> ClusterInfo { @@ -3071,7 +2760,7 @@ mod tests { &transactions, &recorder, 0, - None, + &None, &gossip_vote_sender, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), None, @@ -3124,7 +2813,7 @@ mod tests { &transactions, &recorder, 0, - None, + &None, &gossip_vote_sender, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), None, @@ -3208,7 +2897,7 @@ mod tests { &transactions, &recorder, 0, - None, + &None, &gossip_vote_sender, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), None, @@ -3300,7 +2989,7 @@ mod tests { &transactions, &recorder, 0, - None, + &None, &gossip_vote_sender, &qos_service, None, @@ -3340,7 +3029,7 @@ mod tests { &transactions, &recorder, 0, - None, + &None, &gossip_vote_sender, &qos_service, None, @@ -3437,7 +3126,7 @@ mod tests { &transactions, &recorder, 0, - None, + &None, &gossip_vote_sender, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), None, @@ -3466,144 +3155,6 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } - #[test] - fn test_filter_and_forward_with_account_limits() { - solana_logger::setup(); - let GenesisConfigInfo { - genesis_config, - mint_keypair, - .. - } = create_genesis_config(10); - let current_bank = Arc::new(Bank::new_for_tests(&genesis_config)); - - let simple_transactions: Vec = (0..256) - .map(|_id| { - // packets are deserialized upon receiving, failed packets will not be - // forwarded; Therefore we need to create real packets here. - let key1 = Keypair::new(); - system_transaction::transfer( - &mint_keypair, - &key1.pubkey(), - genesis_config.rent.minimum_balance(0), - genesis_config.hash(), - ) - }) - .collect_vec(); - - let mut packets: Vec = simple_transactions - .iter() - .enumerate() - .map(|(packets_id, transaction)| { - let mut p = Packet::from_data(None, transaction).unwrap(); - p.meta.port = packets_id as u16; - p.meta.set_tracer(true); - DeserializedPacket::new(p).unwrap() - }) - .collect_vec(); - - // all packets are forwarded - { - let mut buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); - let mut forward_packet_batches_by_accounts = - ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); - - let FilterForwardingResults { - total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, - .. - } = BankingStage::filter_and_forward_with_account_limits( - ¤t_bank, - &mut buffered_packet_batches, - &mut forward_packet_batches_by_accounts, - UNPROCESSED_BUFFER_STEP_SIZE, - ); - assert_eq!(total_forwardable_packets, 256); - assert_eq!(total_tracer_packets_in_buffer, 256); - assert_eq!(total_forwardable_tracer_packets, 256); - - // packets in a batch are forwarded in arbitrary order; verify the ports match after - // sorting - let expected_ports: Vec<_> = (0..256).collect(); - let mut forwarded_ports: Vec<_> = forward_packet_batches_by_accounts - .iter_batches() - .flat_map(|batch| { - batch - .get_forwardable_packets() - .into_iter() - .map(|p| p.meta.port) - }) - .collect(); - forwarded_ports.sort_unstable(); - assert_eq!(expected_ports, forwarded_ports); - } - - // some packets are forwarded - { - let num_already_forwarded = 16; - for packet in &mut packets[0..num_already_forwarded] { - packet.forwarded = true; - } - let mut buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); - let mut forward_packet_batches_by_accounts = - ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); - let FilterForwardingResults { - total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, - .. - } = BankingStage::filter_and_forward_with_account_limits( - ¤t_bank, - &mut buffered_packet_batches, - &mut forward_packet_batches_by_accounts, - UNPROCESSED_BUFFER_STEP_SIZE, - ); - assert_eq!( - total_forwardable_packets, - packets.len() - num_already_forwarded - ); - assert_eq!(total_tracer_packets_in_buffer, packets.len()); - assert_eq!( - total_forwardable_tracer_packets, - packets.len() - num_already_forwarded - ); - } - - // some packets are invalid (already processed) - { - let num_already_processed = 16; - for tx in &simple_transactions[0..num_already_processed] { - assert_eq!(current_bank.process_transaction(tx), Ok(())); - } - let mut buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); - let mut forward_packet_batches_by_accounts = - ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); - let FilterForwardingResults { - total_forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, - .. - } = BankingStage::filter_and_forward_with_account_limits( - ¤t_bank, - &mut buffered_packet_batches, - &mut forward_packet_batches_by_accounts, - UNPROCESSED_BUFFER_STEP_SIZE, - ); - assert_eq!( - total_forwardable_packets, - packets.len() - num_already_processed - ); - assert_eq!(total_tracer_packets_in_buffer, packets.len()); - assert_eq!( - total_forwardable_tracer_packets, - packets.len() - num_already_processed - ); - } - } - #[test] fn test_process_transactions_returns_unprocessed_txs() { solana_logger::setup(); @@ -3653,7 +3204,7 @@ mod tests { &Instant::now(), &transactions, &recorder, - None, + &None, &gossip_vote_sender, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), None, @@ -3720,7 +3271,7 @@ mod tests { &Instant::now(), &transactions, &recorder, - None, + &None, &gossip_vote_sender, &QosService::new(Arc::new(RwLock::new(CostModel::default())), 1), None, @@ -3948,7 +3499,7 @@ mod tests { &transactions, &recorder, 0, - Some(TransactionStatusSender { + &Some(TransactionStatusSender { sender: transaction_status_sender, }), &gossip_vote_sender, @@ -4117,7 +3668,7 @@ mod tests { &[sanitized_tx.clone()], &recorder, 0, - Some(TransactionStatusSender { + &Some(TransactionStatusSender { sender: transaction_status_sender, }), &gossip_vote_sender, @@ -4222,10 +3773,13 @@ mod tests { unprocessed_packet_batches::transactions_to_deserialized_packets(&transactions) .unwrap(); assert_eq!(deserialized_packets.len(), num_conflicting_transactions); - let mut buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter( - deserialized_packets.into_iter(), - num_conflicting_transactions, + let mut buffered_packet_batches = + UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::from_iter( + deserialized_packets.into_iter(), + num_conflicting_transactions, + ), + ThreadType::Transactions, ); let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); @@ -4238,7 +3792,7 @@ mod tests { max_tx_processing_ns, &poh_recorder, &mut buffered_packet_batches, - None, + &None, &gossip_vote_sender, None::>, &BankingStageStats::default(), @@ -4259,7 +3813,7 @@ mod tests { max_tx_processing_ns, &poh_recorder, &mut buffered_packet_batches, - None, + &None, &gossip_vote_sender, None::>, &BankingStageStats::default(), @@ -4319,10 +3873,13 @@ mod tests { .unwrap(); assert_eq!(deserialized_packets.len(), num_conflicting_transactions); let num_packets_to_process_per_iteration = 1; - let mut buffered_packet_batches: UnprocessedPacketBatches = - UnprocessedPacketBatches::from_iter( - deserialized_packets.clone().into_iter(), - num_conflicting_transactions, + let mut buffered_packet_batches = + UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::from_iter( + deserialized_packets.clone().into_iter(), + num_conflicting_transactions, + ), + ThreadType::Transactions, ); let all_packet_message_hashes: HashSet = buffered_packet_batches .iter() @@ -4333,7 +3890,7 @@ mod tests { std::u128::MAX, &poh_recorder_, &mut buffered_packet_batches, - None, + &None, &gossip_vote_sender, test_fn, &BankingStageStats::default(), @@ -4431,16 +3988,18 @@ mod tests { let connection_cache = ConnectionCache::default(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); for (name, data_budget, expected_num_forwarded) in test_cases { - let mut unprocessed_packet_batches: UnprocessedPacketBatches = + let unprocessed_packet_batches: UnprocessedPacketBatches = UnprocessedPacketBatches::from_iter( vec![deserialized_packet.clone()].into_iter(), 1, ); let stats = BankingStageStats::default(); BankingStage::handle_forwarding( - &ForwardOption::ForwardTransaction, &cluster_info, - &mut unprocessed_packet_batches, + &mut UnprocessedTransactionStorage::new_transaction_storage( + unprocessed_packet_batches, + ThreadType::Transactions, + ), &poh_recorder, &socket, true, @@ -4491,11 +4050,13 @@ mod tests { DeserializedPacket::new(packet).unwrap() }; - let mut unprocessed_packet_batches: UnprocessedPacketBatches = + let mut unprocessed_packet_batches = UnprocessedTransactionStorage::new_transaction_storage( UnprocessedPacketBatches::from_iter( vec![forwarded_packet, normal_packet].into_iter(), 2, - ); + ), + ThreadType::Transactions, + ); let genesis_config_info = create_slow_genesis_config(10_000); let GenesisConfigInfo { @@ -4528,35 +4089,15 @@ mod tests { let connection_cache = ConnectionCache::default(); let test_cases = vec![ - ("not-forward", ForwardOption::NotForward, true, vec![], 2), - ( - "fwd-normal", - ForwardOption::ForwardTransaction, - true, - vec![normal_block_hash], - 2, - ), - ( - "fwd-no-op", - ForwardOption::ForwardTransaction, - true, - vec![], - 2, - ), - ( - "fwd-no-hold", - ForwardOption::ForwardTransaction, - false, - vec![], - 0, - ), + ("fwd-normal", true, vec![normal_block_hash], 2), + ("fwd-no-op", true, vec![], 2), + ("fwd-no-hold", false, vec![], 0), ]; let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); - for (name, forward_option, hold, expected_ids, expected_num_unprocessed) in test_cases { + for (name, hold, expected_ids, expected_num_unprocessed) in test_cases { let stats = BankingStageStats::default(); BankingStage::handle_forwarding( - &forward_option, &cluster_info, &mut unprocessed_packet_batches, &poh_recorder, @@ -4691,53 +4232,139 @@ mod tests { } #[test] - fn test_filter_processed_packets() { - let retryable_indexes = [0, 1, 2, 3]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - BankingStage::filter_processed_packets(retryable_indexes.iter(), f); - assert!(non_retryable_indexes.is_empty()); + fn test_unprocessed_transaction_storage_full_send() { + solana_logger::setup(); + let GenesisConfigInfo { + mut genesis_config, + mint_keypair, + .. + } = create_slow_genesis_config(10000); + activate_feature( + &mut genesis_config, + allow_votes_to_directly_update_vote_state::id(), + ); + let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); + let start_hash = bank.last_blockhash(); + let (verified_sender, verified_receiver) = unbounded(); + let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); + let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); + let ledger_path = get_tmp_ledger_path_auto_delete!(); + { + let blockstore = Arc::new( + Blockstore::open(ledger_path.path()) + .expect("Expected to be able to open database ledger"), + ); + let poh_config = PohConfig { + // limit tick count to avoid clearing working_bank at PohRecord then + // PohRecorderError(MaxHeightReached) at BankingStage + target_tick_count: Some(bank.max_tick_height() - 1), + ..PohConfig::default() + }; + let (exit, poh_recorder, poh_service, _entry_receiver) = + create_test_recorder(&bank, &blockstore, Some(poh_config), None); + let cluster_info = new_test_cluster_info(Node::new_localhost().info); + let cluster_info = Arc::new(cluster_info); + let (gossip_vote_sender, _gossip_vote_receiver) = unbounded(); - let retryable_indexes = [0, 1, 2, 3, 5]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - BankingStage::filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(4, 5)]); + let banking_stage = BankingStage::new( + &cluster_info, + &poh_recorder, + verified_receiver, + tpu_vote_receiver, + gossip_verified_vote_receiver, + None, + gossip_vote_sender, + Arc::new(RwLock::new(CostModel::default())), + None, + Arc::new(ConnectionCache::default()), + bank_forks, + ); - let retryable_indexes = [1, 2, 3]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - BankingStage::filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(0, 1)]); + let keypairs = (0..100).map(|_| Keypair::new()).collect_vec(); + let vote_keypairs = (0..100).map(|_| Keypair::new()).collect_vec(); + for keypair in keypairs.iter() { + bank.process_transaction(&system_transaction::transfer( + &mint_keypair, + &keypair.pubkey(), + 20, + start_hash, + )) + .unwrap(); + } - let retryable_indexes = [1, 2, 3, 5]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - BankingStage::filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5)]); + // Send a bunch of votes and transfers + let tpu_votes = (0..100_usize) + .map(|i| { + new_vote_state_update_transaction( + VoteStateUpdate::from(vec![ + (0, 8), + (1, 7), + (i as u64 + 10, 6), + (i as u64 + 11, 1), + ]), + Hash::new_unique(), + &keypairs[i], + &vote_keypairs[i], + &vote_keypairs[i], + None, + ); + }) + .collect_vec(); + let gossip_votes = (0..100_usize) + .map(|i| { + new_vote_state_update_transaction( + VoteStateUpdate::from(vec![ + (0, 8), + (1, 7), + (i as u64 + 64 + 5, 6), + (i as u64 + 7, 1), + ]), + Hash::new_unique(), + &keypairs[i], + &vote_keypairs[i], + &vote_keypairs[i], + None, + ); + }) + .collect_vec(); + let txs = (0..100_usize) + .map(|i| { + system_transaction::transfer( + &keypairs[i], + &keypairs[(i + 1) % 100].pubkey(), + 10, + start_hash, + ); + }) + .collect_vec(); - let retryable_indexes = [1, 2, 3, 5, 8]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - BankingStage::filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]); + let tpu_packet_batches = to_packet_batches(&tpu_votes, 10); + let gossip_packet_batches = to_packet_batches(&gossip_votes, 10); + let tx_packet_batches = to_packet_batches(&txs, 10); - let retryable_indexes = [1, 2, 3, 5, 8, 8]; - let mut non_retryable_indexes = vec![]; - let f = |start, end| { - non_retryable_indexes.push((start, end)); - }; - BankingStage::filter_processed_packets(retryable_indexes.iter(), f); - assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]); + // Send em all + [ + (tpu_packet_batches, tpu_vote_sender.clone()), + (gossip_packet_batches, gossip_verified_vote_sender.clone()), + (tx_packet_batches, verified_sender.clone()), + ] + .into_iter() + .map(|(packet_batches, sender)| { + Builder::new() + .spawn(move || sender.send((packet_batches, None)).unwrap()) + .unwrap() + }) + .for_each(|handle| handle.join().unwrap()); + + drop(verified_sender); + drop(tpu_vote_sender); + drop(gossip_verified_vote_sender); + banking_stage.join().unwrap(); + exit.store(true, Ordering::Relaxed); + poh_service.join().unwrap(); + } + Blockstore::destroy(ledger_path.path()).unwrap(); } } diff --git a/core/src/fetch_stage.rs b/core/src/fetch_stage.rs index 701e4f58e8..885f30ea88 100644 --- a/core/src/fetch_stage.rs +++ b/core/src/fetch_stage.rs @@ -1,16 +1,13 @@ //! The `fetch_stage` batches input from a UDP socket and sends it to a channel. use { - crate::{ - banking_stage::HOLD_TRANSACTIONS_SLOT_OFFSET, - result::{Error, Result}, - }, + crate::result::{Error, Result}, crossbeam_channel::{unbounded, RecvTimeoutError}, solana_metrics::{inc_new_counter_debug, inc_new_counter_info}, solana_perf::{packet::PacketBatchRecycler, recycler::Recycler}, solana_poh::poh_recorder::PohRecorder, solana_sdk::{ - clock::DEFAULT_TICKS_PER_SLOT, + clock::{DEFAULT_TICKS_PER_SLOT, HOLD_TRANSACTIONS_SLOT_OFFSET}, packet::{Packet, PacketFlags}, }, solana_streamer::streamer::{ diff --git a/core/src/forward_packet_batches_by_accounts.rs b/core/src/forward_packet_batches_by_accounts.rs index 9132b14a21..6d0722f7a2 100644 --- a/core/src/forward_packet_batches_by_accounts.rs +++ b/core/src/forward_packet_batches_by_accounts.rs @@ -182,7 +182,7 @@ impl ForwardPacketBatchesByAccounts { mod tests { use { super::*, - crate::unprocessed_packet_batches::{self, DeserializedPacket}, + crate::unprocessed_packet_batches::DeserializedPacket, solana_runtime::transaction_priority_details::TransactionPriorityDetails, solana_sdk::{ feature_set::FeatureSet, hash::Hash, signature::Keypair, system_transaction, @@ -352,13 +352,14 @@ mod tests { // assert it is added, and buffer still accepts more packets { let packet = build_deserialized_packet_for_test(10, &hot_account, requested_cu); - let tx = unprocessed_packet_batches::transaction_from_deserialized_packet( - packet.immutable_section(), - &Arc::new(FeatureSet::default()), - false, //votes_only, - SimpleAddressLoader::Disabled, - ) - .unwrap(); + let tx = packet + .immutable_section() + .build_sanitized_transaction( + &Arc::new(FeatureSet::default()), + false, //votes_only, + SimpleAddressLoader::Disabled, + ) + .unwrap(); assert!(forward_packet_batches_by_accounts .try_add_packet(&tx, packet.immutable_section().clone())); @@ -372,13 +373,14 @@ mod tests { { let packet = build_deserialized_packet_for_test(100, &hot_account, 1 /*requested_cu*/); - let tx = unprocessed_packet_batches::transaction_from_deserialized_packet( - packet.immutable_section(), - &Arc::new(FeatureSet::default()), - false, //votes_only, - SimpleAddressLoader::Disabled, - ) - .unwrap(); + let tx = packet + .immutable_section() + .build_sanitized_transaction( + &Arc::new(FeatureSet::default()), + false, //votes_only, + SimpleAddressLoader::Disabled, + ) + .unwrap(); assert!(!forward_packet_batches_by_accounts .try_add_packet(&tx, packet.immutable_section().clone())); @@ -392,13 +394,14 @@ mod tests { { let packet = build_deserialized_packet_for_test(100, &other_account, 1 /*requested_cu*/); - let tx = unprocessed_packet_batches::transaction_from_deserialized_packet( - packet.immutable_section(), - &Arc::new(FeatureSet::default()), - false, //votes_only, - SimpleAddressLoader::Disabled, - ) - .unwrap(); + let tx = packet + .immutable_section() + .build_sanitized_transaction( + &Arc::new(FeatureSet::default()), + false, //votes_only, + SimpleAddressLoader::Disabled, + ) + .unwrap(); assert!(!forward_packet_batches_by_accounts .try_add_packet(&tx, packet.immutable_section().clone())); diff --git a/core/src/immutable_deserialized_packet.rs b/core/src/immutable_deserialized_packet.rs index a8cefa993d..923da031e6 100644 --- a/core/src/immutable_deserialized_packet.rs +++ b/core/src/immutable_deserialized_packet.rs @@ -4,14 +4,18 @@ use { GetTransactionPriorityDetails, TransactionPriorityDetails, }, solana_sdk::{ + feature_set, hash::Hash, message::Message, sanitize::SanitizeError, short_vec::decode_shortu16_len, signature::Signature, - transaction::{SanitizedVersionedTransaction, VersionedTransaction}, + transaction::{ + AddressLoader, SanitizedTransaction, SanitizedVersionedTransaction, + VersionedTransaction, + }, }, - std::{cmp::Ordering, mem::size_of}, + std::{cmp::Ordering, mem::size_of, sync::Arc}, thiserror::Error, }; @@ -94,6 +98,28 @@ impl ImmutableDeserializedPacket { pub fn compute_unit_limit(&self) -> u64 { self.priority_details.compute_unit_limit } + + // This function deserializes packets into transactions, computes the blake3 hash of transaction + // messages, and verifies secp256k1 instructions. + pub fn build_sanitized_transaction( + &self, + feature_set: &Arc, + votes_only: bool, + address_loader: impl AddressLoader, + ) -> Option { + if votes_only && !self.is_simple_vote() { + return None; + } + let tx = SanitizedTransaction::try_new( + self.transaction().clone(), + *self.message_hash(), + self.is_simple_vote(), + address_loader, + ) + .ok()?; + tx.verify_precompiles(feature_set).ok()?; + Some(tx) + } } impl PartialOrd for ImmutableDeserializedPacket { diff --git a/core/src/latest_unprocessed_votes.rs b/core/src/latest_unprocessed_votes.rs index fe73582abd..2e4fe33bc6 100644 --- a/core/src/latest_unprocessed_votes.rs +++ b/core/src/latest_unprocessed_votes.rs @@ -1,13 +1,11 @@ -#![allow(dead_code)] use { crate::{ forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket}, - unprocessed_packet_batches, }, itertools::Itertools, rand::{thread_rng, Rng}, - solana_perf::packet::{Packet, PacketBatch}, + solana_perf::packet::Packet, solana_runtime::bank::Bank, solana_sdk::{clock::Slot, program_utils::limited_deserialize, pubkey::Pubkey}, solana_vote_program::vote_instruction::VoteInstruction, @@ -56,7 +54,9 @@ impl LatestValidatorVotePacket { match limited_deserialize::(&instruction.data) { Ok(VoteInstruction::UpdateVoteState(vote_state_update)) - | Ok(VoteInstruction::UpdateVoteStateSwitch(vote_state_update, _)) => { + | Ok(VoteInstruction::UpdateVoteStateSwitch(vote_state_update, _)) + | Ok(VoteInstruction::CompactUpdateVoteState(vote_state_update)) + | Ok(VoteInstruction::CompactUpdateVoteStateSwitch(vote_state_update, _)) => { let &pubkey = message .message .static_account_keys() @@ -102,16 +102,6 @@ impl LatestValidatorVotePacket { } } -pub fn deserialize_packets<'a>( - packet_batch: &'a PacketBatch, - packet_indexes: &'a [usize], - vote_source: VoteSource, -) -> impl Iterator + 'a { - packet_indexes.iter().filter_map(move |packet_index| { - LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source).ok() - }) -} - // TODO: replace this with rand::seq::index::sample_weighted once we can update rand to 0.8+ // This requires updating dependencies of ed25519-dalek as rand_core is not compatible cross // version https://github.com/dalek-cryptography/ed25519-dalek/pull/214 @@ -135,7 +125,8 @@ pub(crate) fn weighted_random_order_by_stake<'a>( pubkey_with_weight.into_iter().map(|(_, pubkey)| pubkey) } -pub struct VoteBatchInsertionMetrics { +#[derive(Default, Debug)] +pub(crate) struct VoteBatchInsertionMetrics { pub(crate) num_dropped_gossip: usize, pub(crate) num_dropped_tpu: usize, } @@ -164,7 +155,7 @@ impl LatestUnprocessedVotes { self.len() == 0 } - pub fn insert_batch( + pub(crate) fn insert_batch( &self, votes: impl Iterator, ) -> VoteBatchInsertionMetrics { @@ -259,9 +250,8 @@ impl LatestUnprocessedVotes { let mut vote = lock.write().unwrap(); if !vote.is_vote_taken() && !vote.is_forwarded() { let deserialized_vote_packet = vote.vote.as_ref().unwrap().clone(); - if let Some(sanitized_vote_transaction) = - unprocessed_packet_batches::transaction_from_deserialized_packet( - &deserialized_vote_packet, + if let Some(sanitized_vote_transaction) = deserialized_vote_packet + .build_sanitized_transaction( &bank.feature_set, bank.vote_only_bank(), bank.as_ref(), @@ -329,7 +319,7 @@ mod tests { super::*, itertools::Itertools, rand::{thread_rng, Rng}, - solana_perf::packet::{Packet, PacketFlags}, + solana_perf::packet::{Packet, PacketBatch, PacketFlags}, solana_runtime::{ bank::Bank, genesis_utils::{self, ValidatorVoteKeypairs}, @@ -361,6 +351,16 @@ mod tests { LatestValidatorVotePacket::new(packet, vote_source).unwrap() } + fn deserialize_packets<'a>( + packet_batch: &'a PacketBatch, + packet_indexes: &'a [usize], + vote_source: VoteSource, + ) -> impl Iterator + 'a { + packet_indexes.iter().filter_map(move |packet_index| { + LatestValidatorVotePacket::new(packet_batch[*packet_index].clone(), vote_source).ok() + }) + } + #[test] fn test_deserialize_vote_packets() { let keypairs = ValidatorVoteKeypairs::new_rand(); diff --git a/core/src/leader_slot_banking_stage_metrics.rs b/core/src/leader_slot_banking_stage_metrics.rs index ed556991e1..f2efc6db8c 100644 --- a/core/src/leader_slot_banking_stage_metrics.rs +++ b/core/src/leader_slot_banking_stage_metrics.rs @@ -1,5 +1,8 @@ use { - crate::leader_slot_banking_stage_timing_metrics::*, + crate::{ + leader_slot_banking_stage_timing_metrics::*, + unprocessed_transaction_storage::InsertPacketBatchSummary, + }, solana_poh::poh_recorder::BankStart, solana_runtime::transaction_error_metrics::*, solana_sdk::{clock::Slot, saturating_add_assign}, @@ -270,6 +273,8 @@ pub(crate) struct LeaderSlotMetrics { transaction_error_metrics: TransactionErrorMetrics, + vote_packet_count_metrics: VotePacketCountMetrics, + timing_metrics: LeaderSlotTimingMetrics, // Used by tests to check if the `self.report()` method was called @@ -283,6 +288,7 @@ impl LeaderSlotMetrics { slot, packet_count_metrics: LeaderSlotPacketCountMetrics::new(), transaction_error_metrics: TransactionErrorMetrics::new(), + vote_packet_count_metrics: VotePacketCountMetrics::new(), timing_metrics: LeaderSlotTimingMetrics::new(bank_creation_time), is_reported: false, } @@ -294,6 +300,7 @@ impl LeaderSlotMetrics { self.timing_metrics.report(self.id, self.slot); self.transaction_error_metrics.report(self.id, self.slot); self.packet_count_metrics.report(self.id, self.slot); + self.vote_packet_count_metrics.report(self.id, self.slot); } /// Returns `Some(self.slot)` if the metrics have been reported, otherwise returns None @@ -310,6 +317,33 @@ impl LeaderSlotMetrics { } } +// Metrics describing vote tx packets that were processed in the tpu vote thread as well as +// extraneous votes that were filtered out +#[derive(Debug, Default)] +pub(crate) struct VotePacketCountMetrics { + // How many votes ingested from gossip were dropped + dropped_gossip_votes: u64, + + // How many votes ingested from tpu were dropped + dropped_tpu_votes: u64, +} + +impl VotePacketCountMetrics { + fn new() -> Self { + Self { ..Self::default() } + } + + fn report(&self, id: u32, slot: Slot) { + datapoint_info!( + "banking_stage-vote_packet_counts", + ("id", id, i64), + ("slot", slot, i64), + ("dropped_gossip_votes", self.dropped_gossip_votes, i64), + ("dropped_tpu_votes", self.dropped_tpu_votes, i64) + ); + } +} + #[derive(Debug)] pub(crate) enum MetricsTrackerAction { Noop, @@ -498,6 +532,21 @@ impl LeaderSlotMetricsTracker { } } + pub(crate) fn accumulate_insert_packet_batches_summary( + &mut self, + insert_packet_batches_summary: &InsertPacketBatchSummary, + ) { + self.increment_exceeded_buffer_limit_dropped_packets_count( + insert_packet_batches_summary.total_dropped_packets() as u64, + ); + self.increment_dropped_gossip_vote_count( + insert_packet_batches_summary.dropped_gossip_packets() as u64, + ); + self.increment_dropped_tpu_vote_count( + insert_packet_batches_summary.dropped_tpu_packets() as u64 + ); + } + pub(crate) fn accumulate_transaction_errors( &mut self, error_metrics: &TransactionErrorMetrics, @@ -780,6 +829,28 @@ impl LeaderSlotMetricsTracker { ); } } + + pub(crate) fn increment_dropped_gossip_vote_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .vote_packet_count_metrics + .dropped_gossip_votes, + count + ); + } + } + + pub(crate) fn increment_dropped_tpu_vote_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .vote_packet_count_metrics + .dropped_tpu_votes, + count + ); + } + } } #[cfg(test)] diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index a72ba3dffb..420cc14e8d 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -3,11 +3,7 @@ use { min_max_heap::MinMaxHeap, solana_perf::packet::{Packet, PacketBatch}, solana_runtime::transaction_priority_details::TransactionPriorityDetails, - solana_sdk::{ - feature_set, - hash::Hash, - transaction::{AddressLoader, SanitizedTransaction, Transaction}, - }, + solana_sdk::{hash::Hash, transaction::Transaction}, std::{ cmp::Ordering, collections::{hash_map::Entry, HashMap}, @@ -74,6 +70,12 @@ impl Ord for DeserializedPacket { } } +#[derive(Debug)] +pub struct PacketBatchInsertionMetrics { + pub(crate) num_dropped_packets: usize, + pub(crate) num_dropped_tracer_packets: usize, +} + /// Currently each banking_stage thread has a `UnprocessedPacketBatches` buffer to store /// PacketBatch's received from sigverify. Banking thread continuously scans the buffer /// to pick proper packets to add to the block. @@ -115,7 +117,7 @@ impl UnprocessedPacketBatches { pub fn insert_batch( &mut self, deserialized_packets: impl Iterator, - ) -> (usize, usize) { + ) -> PacketBatchInsertionMetrics { let mut num_dropped_packets = 0; let mut num_dropped_tracer_packets = 0; for deserialized_packet in deserialized_packets { @@ -131,7 +133,10 @@ impl UnprocessedPacketBatches { } } } - (num_dropped_packets, num_dropped_tracer_packets) + PacketBatchInsertionMetrics { + num_dropped_packets, + num_dropped_tracer_packets, + } } /// Pushes a new `deserialized_packet` into the unprocessed packet batches if it does not already @@ -283,7 +288,7 @@ impl UnprocessedPacketBatches { } pub fn mark_accepted_packets_as_forwarded( - buffered_packet_batches: &mut UnprocessedPacketBatches, + &mut self, packets_to_process: &[Arc], accepted_packet_indexes: &[usize], ) { @@ -291,7 +296,7 @@ impl UnprocessedPacketBatches { .iter() .for_each(|accepted_packet_index| { let accepted_packet = packets_to_process[*accepted_packet_index].clone(); - if let Some(deserialized_packet) = buffered_packet_batches + if let Some(deserialized_packet) = self .message_hash_to_transaction .get_mut(accepted_packet.message_hash()) { @@ -322,30 +327,6 @@ pub fn transactions_to_deserialized_packets( .collect() } -// This function deserializes packets into transactions, computes the blake3 hash of transaction -// messages, and verifies secp256k1 instructions. A list of sanitized transactions are returned -// with their packet indexes. -#[allow(clippy::needless_collect)] -pub fn transaction_from_deserialized_packet( - deserialized_packet: &ImmutableDeserializedPacket, - feature_set: &Arc, - votes_only: bool, - address_loader: impl AddressLoader, -) -> Option { - if votes_only && !deserialized_packet.is_simple_vote() { - return None; - } - let tx = SanitizedTransaction::try_new( - deserialized_packet.transaction().clone(), - *deserialized_packet.message_hash(), - deserialized_packet.is_simple_vote(), - address_loader, - ) - .ok()?; - tx.verify_precompiles(feature_set).ok()?; - Some(tx) -} - #[cfg(test)] mod tests { use { @@ -357,6 +338,7 @@ mod tests { transaction::{SimpleAddressLoader, Transaction}, }, solana_vote_program::vote_transaction, + std::sync::Arc, }; fn simple_deserialized_packet() -> DeserializedPacket { @@ -529,8 +511,7 @@ mod tests { let mut votes_only = false; let txs = packet_vector.iter().filter_map(|tx| { - transaction_from_deserialized_packet( - tx.immutable_section(), + tx.immutable_section().build_sanitized_transaction( &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, @@ -540,8 +521,7 @@ mod tests { votes_only = true; let txs = packet_vector.iter().filter_map(|tx| { - transaction_from_deserialized_packet( - tx.immutable_section(), + tx.immutable_section().build_sanitized_transaction( &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, @@ -560,8 +540,7 @@ mod tests { let mut votes_only = false; let txs = packet_vector.iter().filter_map(|tx| { - transaction_from_deserialized_packet( - tx.immutable_section(), + tx.immutable_section().build_sanitized_transaction( &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, @@ -571,8 +550,7 @@ mod tests { votes_only = true; let txs = packet_vector.iter().filter_map(|tx| { - transaction_from_deserialized_packet( - tx.immutable_section(), + tx.immutable_section().build_sanitized_transaction( &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, @@ -591,8 +569,7 @@ mod tests { let mut votes_only = false; let txs = packet_vector.iter().filter_map(|tx| { - transaction_from_deserialized_packet( - tx.immutable_section(), + tx.immutable_section().build_sanitized_transaction( &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, @@ -602,8 +579,7 @@ mod tests { votes_only = true; let txs = packet_vector.iter().filter_map(|tx| { - transaction_from_deserialized_packet( - tx.immutable_section(), + tx.immutable_section().build_sanitized_transaction( &Arc::new(FeatureSet::default()), votes_only, SimpleAddressLoader::Disabled, diff --git a/core/src/unprocessed_transaction_storage.rs b/core/src/unprocessed_transaction_storage.rs index 6642da00a4..9c6f8438af 100644 --- a/core/src/unprocessed_transaction_storage.rs +++ b/core/src/unprocessed_transaction_storage.rs @@ -1,22 +1,28 @@ -#![allow(dead_code)] use { crate::{ - banking_stage::{self, BankingStage, FilterForwardingResults, ForwardOption}, + banking_stage::{FilterForwardingResults, ForwardOption}, forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, immutable_deserialized_packet::ImmutableDeserializedPacket, latest_unprocessed_votes::{ - self, LatestUnprocessedVotes, LatestValidatorVotePacket, VoteBatchInsertionMetrics, + LatestUnprocessedVotes, LatestValidatorVotePacket, VoteBatchInsertionMetrics, VoteSource, }, - unprocessed_packet_batches::{self, DeserializedPacket, UnprocessedPacketBatches}, + unprocessed_packet_batches::{ + DeserializedPacket, PacketBatchInsertionMetrics, UnprocessedPacketBatches, + }, }, itertools::Itertools, min_max_heap::MinMaxHeap, - solana_perf::packet::PacketBatch, + solana_measure::measure, solana_runtime::bank::Bank, + solana_sdk::{ + clock::FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, saturating_add_assign, + transaction::SanitizedTransaction, + }, std::sync::Arc, }; +pub const UNPROCESSED_BUFFER_STEP_SIZE: usize = 128; const MAX_STAKED_VALIDATORS: usize = 10_000; #[derive(Debug)] @@ -43,12 +49,54 @@ pub enum ThreadType { Transactions, } -#[derive(Debug, Default)] -pub struct InsertPacketBatchesSummary { - pub(crate) num_dropped_packets: usize, - pub(crate) num_dropped_gossip_vote_packets: usize, - pub(crate) num_dropped_tpu_vote_packets: usize, - pub(crate) num_dropped_tracer_packets: usize, +#[derive(Debug)] +pub(crate) enum InsertPacketBatchSummary { + VoteBatchInsertionMetrics(VoteBatchInsertionMetrics), + PacketBatchInsertionMetrics(PacketBatchInsertionMetrics), +} + +impl InsertPacketBatchSummary { + pub fn total_dropped_packets(&self) -> usize { + match self { + Self::VoteBatchInsertionMetrics(metrics) => { + metrics.num_dropped_gossip + metrics.num_dropped_tpu + } + Self::PacketBatchInsertionMetrics(metrics) => metrics.num_dropped_packets, + } + } + + pub fn dropped_gossip_packets(&self) -> usize { + match self { + Self::VoteBatchInsertionMetrics(metrics) => metrics.num_dropped_gossip, + _ => 0, + } + } + + pub fn dropped_tpu_packets(&self) -> usize { + match self { + Self::VoteBatchInsertionMetrics(metrics) => metrics.num_dropped_tpu, + _ => 0, + } + } + + pub fn dropped_tracer_packets(&self) -> usize { + match self { + Self::PacketBatchInsertionMetrics(metrics) => metrics.num_dropped_tracer_packets, + _ => 0, + } + } +} + +impl From for InsertPacketBatchSummary { + fn from(metrics: VoteBatchInsertionMetrics) -> Self { + Self::VoteBatchInsertionMetrics(metrics) + } +} + +impl From for InsertPacketBatchSummary { + fn from(metrics: PacketBatchInsertionMetrics) -> Self { + Self::PacketBatchInsertionMetrics(metrics) + } } fn filter_processed_packets<'a, F>( @@ -145,33 +193,17 @@ impl UnprocessedTransactionStorage { } } - pub fn deserialize_and_insert_batch( + pub(crate) fn insert_batch( &mut self, - packet_batch: &PacketBatch, - packet_indexes: &[usize], - ) -> InsertPacketBatchesSummary { + deserialized_packets: Vec, + ) -> InsertPacketBatchSummary { match self { Self::VoteStorage(vote_storage) => { - let VoteBatchInsertionMetrics { - num_dropped_gossip, - num_dropped_tpu, - } = vote_storage.deserialize_and_insert_batch(packet_batch, packet_indexes); - InsertPacketBatchesSummary { - num_dropped_packets: num_dropped_gossip + num_dropped_tpu, - num_dropped_gossip_vote_packets: num_dropped_gossip, - num_dropped_tpu_vote_packets: num_dropped_tpu, - ..InsertPacketBatchesSummary::default() - } - } - Self::LocalTransactionStorage(transaction_storage) => { - let (num_dropped_packets, num_dropped_tracer_packets) = - transaction_storage.deserialize_and_insert_batch(packet_batch, packet_indexes); - InsertPacketBatchesSummary { - num_dropped_packets, - num_dropped_tracer_packets, - ..InsertPacketBatchesSummary::default() - } + InsertPacketBatchSummary::from(vote_storage.insert_batch(deserialized_packets)) } + Self::LocalTransactionStorage(transaction_storage) => InsertPacketBatchSummary::from( + transaction_storage.insert_batch(deserialized_packets), + ), } } @@ -241,17 +273,22 @@ impl VoteStorage { self.latest_unprocessed_votes.clear_forwarded_packets(); } - fn deserialize_and_insert_batch( + fn insert_batch( &mut self, - packet_batch: &PacketBatch, - packet_indexes: &[usize], + deserialized_packets: Vec, ) -> VoteBatchInsertionMetrics { self.latest_unprocessed_votes - .insert_batch(latest_unprocessed_votes::deserialize_packets( - packet_batch, - packet_indexes, - self.vote_source, - )) + .insert_batch( + deserialized_packets + .into_iter() + .filter_map(|deserialized_packet| { + LatestValidatorVotePacket::new_from_immutable( + Arc::new(deserialized_packet), + self.vote_source, + ) + .ok() + }), + ) } fn filter_forwardable_packets_and_add_batches( @@ -345,13 +382,14 @@ impl ThreadLocalUnprocessedPackets { self.unprocessed_packet_batches.clear(); } - fn deserialize_and_insert_batch( + fn insert_batch( &mut self, - packet_batch: &PacketBatch, - packet_indexes: &[usize], - ) -> (usize, usize) { + deserialized_packets: Vec, + ) -> PacketBatchInsertionMetrics { self.unprocessed_packet_batches.insert_batch( - unprocessed_packet_batches::deserialize_packets(packet_batch, packet_indexes), + deserialized_packets + .into_iter() + .map(DeserializedPacket::from_immutable_section), ) } @@ -360,11 +398,298 @@ impl ThreadLocalUnprocessedPackets { bank: Arc, forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, ) -> FilterForwardingResults { - BankingStage::filter_and_forward_with_account_limits( - &bank, - &mut self.unprocessed_packet_batches, + self.filter_and_forward_with_account_limits( + bank, forward_packet_batches_by_accounts, - banking_stage::UNPROCESSED_BUFFER_STEP_SIZE, + UNPROCESSED_BUFFER_STEP_SIZE, + ) + } + + /// Filter out packets that fail to sanitize, or are no longer valid (could be + /// too old, a duplicate of something already processed). Doing this in batches to avoid + /// checking bank's blockhash and status cache per transaction which could be bad for performance. + /// Added valid and sanitized packets to forwarding queue. + fn filter_and_forward_with_account_limits( + &mut self, + bank: Arc, + forward_buffer: &mut ForwardPacketBatchesByAccounts, + batch_size: usize, + ) -> FilterForwardingResults { + let mut total_forwardable_tracer_packets: usize = 0; + let mut total_tracer_packets_in_buffer: usize = 0; + let mut total_forwardable_packets: usize = 0; + let mut total_packet_conversion_us: u64 = 0; + let mut total_filter_packets_us: u64 = 0; + let mut dropped_tx_before_forwarding_count: usize = 0; + + let mut original_priority_queue = self.swap_priority_queue(); + + // indicates if `forward_buffer` still accept more packets, see details at + // `ForwardPacketBatchesByAccounts.rs`. + let mut accepting_packets = true; + // batch iterate through self.unprocessed_packet_batches in desc priority order + let retained_priority_queue: MinMaxHeap> = + original_priority_queue + .drain_desc() + .chunks(batch_size) + .into_iter() + .flat_map(|packets_to_process| { + let packets_to_process = packets_to_process.into_iter().collect_vec(); + + // Vec of same size of `packets_to_process`, each indicates + // corresponding packet is tracer packet. + let tracer_packet_indexes = packets_to_process + .iter() + .map(|deserialized_packet| { + deserialized_packet + .original_packet() + .meta + .is_tracer_packet() + }) + .collect::>(); + saturating_add_assign!( + total_tracer_packets_in_buffer, + tracer_packet_indexes + .iter() + .filter(|is_tracer| **is_tracer) + .count() + ); + + if accepting_packets { + let ( + (sanitized_transactions, transaction_to_packet_indexes), + packet_conversion_time, + ): ((Vec, Vec), _) = measure!( + self.sanitize_unforwarded_packets(&packets_to_process, &bank,), + "sanitize_packet", + ); + saturating_add_assign!( + total_packet_conversion_us, + packet_conversion_time.as_us() + ); + + let (forwardable_transaction_indexes, filter_packets_time) = measure!( + Self::filter_invalid_transactions(&sanitized_transactions, &bank,), + "filter_packets", + ); + saturating_add_assign!( + total_filter_packets_us, + filter_packets_time.as_us() + ); + + for forwardable_transaction_index in &forwardable_transaction_indexes { + saturating_add_assign!(total_forwardable_packets, 1); + let forwardable_packet_index = + transaction_to_packet_indexes[*forwardable_transaction_index]; + if tracer_packet_indexes[forwardable_packet_index] { + saturating_add_assign!(total_forwardable_tracer_packets, 1); + } + } + + let accepted_packet_indexes = Self::add_filtered_packets_to_forward_buffer( + forward_buffer, + &packets_to_process, + &sanitized_transactions, + &transaction_to_packet_indexes, + &forwardable_transaction_indexes, + &mut dropped_tx_before_forwarding_count, + ); + accepting_packets = + accepted_packet_indexes.len() == forwardable_transaction_indexes.len(); + + self.unprocessed_packet_batches + .mark_accepted_packets_as_forwarded( + &packets_to_process, + &accepted_packet_indexes, + ); + + self.collect_retained_packets( + &packets_to_process, + &Self::prepare_filtered_packet_indexes( + &transaction_to_packet_indexes, + &forwardable_transaction_indexes, + ), + ) + } else { + // skip sanitizing and filtering if not longer able to add more packets for forwarding + saturating_add_assign!( + dropped_tx_before_forwarding_count, + packets_to_process.len() + ); + packets_to_process + } + }) + .collect(); + + // replace packet priority queue + self.unprocessed_packet_batches.packet_priority_queue = retained_priority_queue; + + inc_new_counter_info!( + "banking_stage-dropped_tx_before_forwarding", + dropped_tx_before_forwarding_count + ); + + FilterForwardingResults { + total_forwardable_packets, + total_tracer_packets_in_buffer, + total_forwardable_tracer_packets, + total_packet_conversion_us, + total_filter_packets_us, + } + } + + /// Take self.unprocessed_packet_batches's priority_queue out, leave empty MinMaxHeap in its place. + fn swap_priority_queue(&mut self) -> MinMaxHeap> { + let capacity = self.unprocessed_packet_batches.capacity(); + std::mem::replace( + &mut self.unprocessed_packet_batches.packet_priority_queue, + MinMaxHeap::with_capacity(capacity), + ) + } + + /// sanitize un-forwarded packet into SanitizedTransaction for validation and forwarding. + fn sanitize_unforwarded_packets( + &mut self, + packets_to_process: &[Arc], + bank: &Arc, + ) -> (Vec, Vec) { + // Get ref of ImmutableDeserializedPacket + let deserialized_packets = packets_to_process.iter().map(|p| &**p); + let (transactions, transaction_to_packet_indexes): (Vec, Vec) = + deserialized_packets + .enumerate() + .filter_map(|(packet_index, deserialized_packet)| { + if !self + .unprocessed_packet_batches + .is_forwarded(deserialized_packet) + { + deserialized_packet + .build_sanitized_transaction( + &bank.feature_set, + bank.vote_only_bank(), + bank.as_ref(), + ) + .map(|transaction| (transaction, packet_index)) + } else { + None + } + }) + .unzip(); + + // report metrics + inc_new_counter_info!("banking_stage-packet_conversion", 1); + let unsanitized_packets_filtered_count = + packets_to_process.len().saturating_sub(transactions.len()); + inc_new_counter_info!( + "banking_stage-dropped_tx_before_forwarding", + unsanitized_packets_filtered_count + ); + + (transactions, transaction_to_packet_indexes) + } + + /// Checks sanitized transactions against bank, returns valid transaction indexes + fn filter_invalid_transactions( + transactions: &[SanitizedTransaction], + bank: &Arc, + ) -> Vec { + let filter = vec![Ok(()); transactions.len()]; + let results = bank.check_transactions_with_forwarding_delay( + transactions, + &filter, + FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET, + ); + // report metrics + let filtered_out_transactions_count = transactions.len().saturating_sub(results.len()); + inc_new_counter_info!( + "banking_stage-dropped_tx_before_forwarding", + filtered_out_transactions_count + ); + + results + .iter() + .enumerate() + .filter_map( + |(tx_index, (result, _))| if result.is_ok() { Some(tx_index) } else { None }, + ) + .collect_vec() + } + + fn prepare_filtered_packet_indexes( + transaction_to_packet_indexes: &[usize], + retained_transaction_indexes: &[usize], + ) -> Vec { + retained_transaction_indexes + .iter() + .map(|tx_index| transaction_to_packet_indexes[*tx_index]) + .collect_vec() + } + + /// try to add filtered forwardable and valid packets to forward buffer; + /// returns vector of packet indexes that were accepted for forwarding. + fn add_filtered_packets_to_forward_buffer( + forward_buffer: &mut ForwardPacketBatchesByAccounts, + packets_to_process: &[Arc], + transactions: &[SanitizedTransaction], + transaction_to_packet_indexes: &[usize], + forwardable_transaction_indexes: &[usize], + dropped_tx_before_forwarding_count: &mut usize, + ) -> Vec { + let mut added_packets_count: usize = 0; + let mut accepted_packet_indexes = Vec::with_capacity(transaction_to_packet_indexes.len()); + for forwardable_transaction_index in forwardable_transaction_indexes { + let sanitized_transaction = &transactions[*forwardable_transaction_index]; + let forwardable_packet_index = + transaction_to_packet_indexes[*forwardable_transaction_index]; + let immutable_deserialized_packet = + packets_to_process[forwardable_packet_index].clone(); + if !forward_buffer.try_add_packet(sanitized_transaction, immutable_deserialized_packet) + { + break; + } + accepted_packet_indexes.push(forwardable_packet_index); + saturating_add_assign!(added_packets_count, 1); + } + + // count the packets not being forwarded in this batch + saturating_add_assign!( + *dropped_tx_before_forwarding_count, + forwardable_transaction_indexes.len() - added_packets_count + ); + + accepted_packet_indexes + } + + fn collect_retained_packets( + &mut self, + packets_to_process: &[Arc], + retained_packet_indexes: &[usize], + ) -> Vec> { + self.remove_non_retained_packets(packets_to_process, retained_packet_indexes); + retained_packet_indexes + .iter() + .map(|i| packets_to_process[*i].clone()) + .collect_vec() + } + + /// remove packets from UnprocessedPacketBatches.message_hash_to_transaction after they have + /// been removed from UnprocessedPacketBatches.packet_priority_queue + fn remove_non_retained_packets( + &mut self, + packets_to_process: &[Arc], + retained_packet_indexes: &[usize], + ) { + filter_processed_packets( + retained_packet_indexes + .iter() + .chain(std::iter::once(&packets_to_process.len())), + |start, end| { + for processed_packet in &packets_to_process[start..end] { + self.unprocessed_packet_batches + .message_hash_to_transaction + .remove(processed_packet.message_hash()); + } + }, ) } @@ -372,13 +697,7 @@ impl ThreadLocalUnprocessedPackets { where F: FnMut(&Vec>) -> Option>, { - let mut retryable_packets = { - let capacity = self.unprocessed_packet_batches.capacity(); - std::mem::replace( - &mut self.unprocessed_packet_batches.packet_priority_queue, - MinMaxHeap::with_capacity(capacity), - ) - }; + let mut retryable_packets = self.swap_priority_queue(); let retryable_packets: MinMaxHeap> = retryable_packets .drain_desc() .chunks(batch_size) @@ -388,25 +707,10 @@ impl ThreadLocalUnprocessedPackets { if let Some(retryable_transaction_indexes) = processing_function(&packets_to_process) { - // Remove the non-retryable packets, packets that were either: - // 1) Successfully processed - // 2) Failed but not retryable - filter_processed_packets( - retryable_transaction_indexes - .iter() - .chain(std::iter::once(&packets_to_process.len())), - |start, end| { - for processed_packet in &packets_to_process[start..end] { - self.unprocessed_packet_batches - .message_hash_to_transaction - .remove(processed_packet.message_hash()); - } - }, - ); - retryable_transaction_indexes - .iter() - .map(|i| packets_to_process[*i].clone()) - .collect_vec() + self.collect_retained_packets( + &packets_to_process, + &retryable_transaction_indexes, + ) } else { packets_to_process } @@ -427,7 +731,21 @@ impl ThreadLocalUnprocessedPackets { #[cfg(test)] mod tests { - use super::*; + use { + super::*, + solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo}, + solana_perf::packet::{Packet, PacketFlags}, + solana_sdk::{ + hash::Hash, + signature::{Keypair, Signer}, + system_transaction, + transaction::Transaction, + }, + solana_vote_program::{ + vote_state::VoteStateUpdate, vote_transaction::new_vote_state_update_transaction, + }, + std::error::Error, + }; #[test] fn test_filter_processed_packets() { @@ -479,4 +797,214 @@ mod tests { filter_processed_packets(retryable_indexes.iter(), f); assert_eq!(non_retryable_indexes, vec![(0, 1), (4, 5), (6, 8)]); } + + #[test] + fn test_filter_and_forward_with_account_limits() { + solana_logger::setup(); + let GenesisConfigInfo { + genesis_config, + mint_keypair, + .. + } = create_genesis_config(10); + let current_bank = Arc::new(Bank::new_for_tests(&genesis_config)); + + let simple_transactions: Vec = (0..256) + .map(|_id| { + // packets are deserialized upon receiving, failed packets will not be + // forwarded; Therefore we need to create real packets here. + let key1 = Keypair::new(); + system_transaction::transfer( + &mint_keypair, + &key1.pubkey(), + genesis_config.rent.minimum_balance(0), + genesis_config.hash(), + ) + }) + .collect_vec(); + + let mut packets: Vec = simple_transactions + .iter() + .enumerate() + .map(|(packets_id, transaction)| { + let mut p = Packet::from_data(None, transaction).unwrap(); + p.meta.port = packets_id as u16; + p.meta.set_tracer(true); + DeserializedPacket::new(p).unwrap() + }) + .collect_vec(); + + // all packets are forwarded + { + let buffered_packet_batches: UnprocessedPacketBatches = + UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); + let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( + buffered_packet_batches, + ThreadType::Transactions, + ); + let mut forward_packet_batches_by_accounts = + ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); + + let FilterForwardingResults { + total_forwardable_packets, + total_tracer_packets_in_buffer, + total_forwardable_tracer_packets, + .. + } = transaction_storage.filter_forwardable_packets_and_add_batches( + current_bank.clone(), + &mut forward_packet_batches_by_accounts, + ); + assert_eq!(total_forwardable_packets, 256); + assert_eq!(total_tracer_packets_in_buffer, 256); + assert_eq!(total_forwardable_tracer_packets, 256); + + // packets in a batch are forwarded in arbitrary order; verify the ports match after + // sorting + let expected_ports: Vec<_> = (0..256).collect(); + let mut forwarded_ports: Vec<_> = forward_packet_batches_by_accounts + .iter_batches() + .flat_map(|batch| { + batch + .get_forwardable_packets() + .into_iter() + .map(|p| p.meta.port) + }) + .collect(); + forwarded_ports.sort_unstable(); + assert_eq!(expected_ports, forwarded_ports); + } + + // some packets are forwarded + { + let num_already_forwarded = 16; + for packet in &mut packets[0..num_already_forwarded] { + packet.forwarded = true; + } + let buffered_packet_batches: UnprocessedPacketBatches = + UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); + let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( + buffered_packet_batches, + ThreadType::Transactions, + ); + let mut forward_packet_batches_by_accounts = + ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); + let FilterForwardingResults { + total_forwardable_packets, + total_tracer_packets_in_buffer, + total_forwardable_tracer_packets, + .. + } = transaction_storage.filter_forwardable_packets_and_add_batches( + current_bank.clone(), + &mut forward_packet_batches_by_accounts, + ); + assert_eq!( + total_forwardable_packets, + packets.len() - num_already_forwarded + ); + assert_eq!(total_tracer_packets_in_buffer, packets.len()); + assert_eq!( + total_forwardable_tracer_packets, + packets.len() - num_already_forwarded + ); + } + + // some packets are invalid (already processed) + { + let num_already_processed = 16; + for tx in &simple_transactions[0..num_already_processed] { + assert_eq!(current_bank.process_transaction(tx), Ok(())); + } + let buffered_packet_batches: UnprocessedPacketBatches = + UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); + let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( + buffered_packet_batches, + ThreadType::Transactions, + ); + let mut forward_packet_batches_by_accounts = + ForwardPacketBatchesByAccounts::new_with_default_batch_limits(); + let FilterForwardingResults { + total_forwardable_packets, + total_tracer_packets_in_buffer, + total_forwardable_tracer_packets, + .. + } = transaction_storage.filter_forwardable_packets_and_add_batches( + current_bank, + &mut forward_packet_batches_by_accounts, + ); + assert_eq!( + total_forwardable_packets, + packets.len() - num_already_processed + ); + assert_eq!(total_tracer_packets_in_buffer, packets.len()); + assert_eq!( + total_forwardable_tracer_packets, + packets.len() - num_already_processed + ); + } + } + + #[test] + fn test_unprocessed_transaction_storage_insert() -> Result<(), Box> { + let keypair = Keypair::new(); + let vote_keypair = Keypair::new(); + let pubkey = solana_sdk::pubkey::new_rand(); + + let small_transfer = Packet::from_data( + None, + system_transaction::transfer(&keypair, &pubkey, 1, Hash::new_unique()), + )?; + let mut vote = Packet::from_data( + None, + new_vote_state_update_transaction( + VoteStateUpdate::default(), + Hash::new_unique(), + &keypair, + &vote_keypair, + &vote_keypair, + None, + ), + )?; + vote.meta.flags.set(PacketFlags::SIMPLE_VOTE_TX, true); + let big_transfer = Packet::from_data( + None, + system_transaction::transfer(&keypair, &pubkey, 1000000, Hash::new_unique()), + )?; + + for thread_type in [ + ThreadType::Transactions, + ThreadType::Voting(VoteSource::Gossip), + ThreadType::Voting(VoteSource::Tpu), + ] { + let mut transaction_storage = UnprocessedTransactionStorage::new_transaction_storage( + UnprocessedPacketBatches::with_capacity(100), + thread_type, + ); + transaction_storage.insert_batch(vec![ + ImmutableDeserializedPacket::new(small_transfer.clone(), None)?, + ImmutableDeserializedPacket::new(vote.clone(), None)?, + ImmutableDeserializedPacket::new(big_transfer.clone(), None)?, + ]); + let deserialized_packets = transaction_storage + .iter() + .map(|packet| packet.immutable_section().original_packet().clone()) + .collect_vec(); + assert_eq!(3, deserialized_packets.len()); + assert!(deserialized_packets.contains(&small_transfer)); + assert!(deserialized_packets.contains(&vote)); + assert!(deserialized_packets.contains(&big_transfer)); + } + + for vote_source in [VoteSource::Gossip, VoteSource::Tpu] { + let mut transaction_storage = UnprocessedTransactionStorage::new_vote_storage( + Arc::new(LatestUnprocessedVotes::new()), + vote_source, + ); + transaction_storage.insert_batch(vec![ + ImmutableDeserializedPacket::new(small_transfer.clone(), None)?, + ImmutableDeserializedPacket::new(vote.clone(), None)?, + ImmutableDeserializedPacket::new(big_transfer.clone(), None)?, + ]); + assert_eq!(1, transaction_storage.len()); + } + Ok(()) + } } diff --git a/local-cluster/tests/local_cluster_slow_1.rs b/local-cluster/tests/local_cluster_slow_1.rs index 90e4f9bd4c..3e63d1bc0d 100644 --- a/local-cluster/tests/local_cluster_slow_1.rs +++ b/local-cluster/tests/local_cluster_slow_1.rs @@ -30,6 +30,7 @@ use { hash::Hash, pubkey::Pubkey, signature::Signer, + vote::state::VoteStateUpdate, }, solana_streamer::socket::SocketAddrSpace, solana_vote_program::{vote_state::MAX_LOCKOUT_HISTORY, vote_transaction}, @@ -541,22 +542,31 @@ fn test_duplicate_shreds_broadcast_leader() { // root by this validator, but we're not concerned with lockout violations // by this validator so it's fine. let leader_blockstore = open_blockstore(&bad_leader_ledger_path); - let mut vote_slots: Vec = AncestorIterator::new_inclusive( + let mut vote_slots: Vec<(Slot, u32)> = AncestorIterator::new_inclusive( latest_vote_slot, &leader_blockstore, ) .take(MAX_LOCKOUT_HISTORY) + .zip(1..) .collect(); vote_slots.reverse(); - let vote_tx = vote_transaction::new_vote_transaction( - vote_slots, - vote_hash, - leader_vote_tx.message.recent_blockhash, - &node_keypair, - &vote_keypair, - &vote_keypair, - None, - ); + let mut vote = VoteStateUpdate::from(vote_slots); + let root = AncestorIterator::new_inclusive( + latest_vote_slot, + &leader_blockstore, + ) + .nth(MAX_LOCKOUT_HISTORY); + vote.root = root; + vote.hash = vote_hash; + let vote_tx = + vote_transaction::new_compact_vote_state_update_transaction( + vote, + leader_vote_tx.message.recent_blockhash, + &node_keypair, + &vote_keypair, + &vote_keypair, + None, + ); gossip_vote_index += 1; gossip_vote_index %= MAX_LOCKOUT_HISTORY; cluster_info.push_vote_at_index(vote_tx, gossip_vote_index as u8) diff --git a/programs/sbf/Cargo.lock b/programs/sbf/Cargo.lock index af6b921d23..02dfe920cc 100644 --- a/programs/sbf/Cargo.lock +++ b/programs/sbf/Cargo.lock @@ -5088,6 +5088,7 @@ dependencies = [ "solana-frozen-abi-macro 1.15.0", "solana-measure", "solana-metrics", + "solana-perf", "solana-program-runtime", "solana-rayon-threadlimit", "solana-sdk 1.15.0", diff --git a/programs/vote/src/vote_transaction.rs b/programs/vote/src/vote_transaction.rs index 43cdbb11d6..48316daff6 100644 --- a/programs/vote/src/vote_transaction.rs +++ b/programs/vote/src/vote_transaction.rs @@ -72,3 +72,33 @@ pub fn new_vote_state_update_transaction( vote_tx.partial_sign(&[authorized_voter_keypair], blockhash); vote_tx } + +pub fn new_compact_vote_state_update_transaction( + vote_state_update: VoteStateUpdate, + blockhash: Hash, + node_keypair: &Keypair, + vote_keypair: &Keypair, + authorized_voter_keypair: &Keypair, + switch_proof_hash: Option, +) -> Transaction { + let vote_ix = if let Some(switch_proof_hash) = switch_proof_hash { + vote::instruction::compact_update_vote_state_switch( + &vote_keypair.pubkey(), + &authorized_voter_keypair.pubkey(), + vote_state_update, + switch_proof_hash, + ) + } else { + vote::instruction::compact_update_vote_state( + &vote_keypair.pubkey(), + &authorized_voter_keypair.pubkey(), + vote_state_update, + ) + }; + + let mut vote_tx = Transaction::new_with_payer(&[vote_ix], Some(&node_keypair.pubkey())); + + vote_tx.partial_sign(&[node_keypair], blockhash); + vote_tx.partial_sign(&[authorized_voter_keypair], blockhash); + vote_tx +} diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 0c1fff0578..54a8cc34ca 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -49,6 +49,7 @@ solana-frozen-abi = { path = "../frozen-abi", version = "=1.15.0" } solana-frozen-abi-macro = { path = "../frozen-abi/macro", version = "=1.15.0" } solana-measure = { path = "../measure", version = "=1.15.0" } solana-metrics = { path = "../metrics", version = "=1.15.0" } +solana-perf = { path = "../perf", version = "=1.15.0" } solana-program-runtime = { path = "../program-runtime", version = "=1.15.0" } solana-rayon-threadlimit = { path = "../rayon-threadlimit", version = "=1.15.0" } solana-sdk = { path = "../sdk", version = "=1.15.0" } diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index fe2e1be443..ff42f61759 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -83,6 +83,7 @@ use { }, solana_measure::{measure, measure::Measure}, solana_metrics::{inc_new_counter_debug, inc_new_counter_info}, + solana_perf::perf_libs, solana_program_runtime::{ accounts_data_meter::MAX_ACCOUNTS_DATA_LEN, compute_budget::{self, ComputeBudget}, @@ -105,7 +106,7 @@ use { clock::{ BankId, Epoch, Slot, SlotCount, SlotIndex, UnixTimestamp, DEFAULT_TICKS_PER_SECOND, INITIAL_RENT_EPOCH, MAX_PROCESSING_AGE, MAX_TRANSACTION_FORWARDING_DELAY, - SECONDS_PER_DAY, + MAX_TRANSACTION_FORWARDING_DELAY_GPU, SECONDS_PER_DAY, }, ed25519_program, epoch_info::EpochInfo, @@ -141,7 +142,7 @@ use { sysvar::{self, Sysvar, SysvarId}, timing::years_as_slots, transaction::{ - MessageHash, Result, SanitizedTransaction, Transaction, TransactionError, + self, MessageHash, Result, SanitizedTransaction, Transaction, TransactionError, TransactionVerificationMode, VersionedTransaction, MAX_TX_ACCOUNT_LOCKS, }, transaction_context::{ @@ -7736,6 +7737,36 @@ impl Bank { .epoch_accounts_hash_manager .try_get_epoch_accounts_hash() } + + /// Checks a batch of sanitized transactions again bank for age and status + pub fn check_transactions_with_forwarding_delay( + &self, + transactions: &[SanitizedTransaction], + filter: &[transaction::Result<()>], + forward_transactions_to_leader_at_slot_offset: u64, + ) -> Vec { + let mut error_counters = TransactionErrorMetrics::default(); + // The following code also checks if the blockhash for a transaction is too old + // The check accounts for + // 1. Transaction forwarding delay + // 2. The slot at which the next leader will actually process the transaction + // Drop the transaction if it will expire by the time the next node receives and processes it + let api = perf_libs::api(); + let max_tx_fwd_delay = if api.is_none() { + MAX_TRANSACTION_FORWARDING_DELAY + } else { + MAX_TRANSACTION_FORWARDING_DELAY_GPU + }; + + self.check_transactions( + transactions, + filter, + (MAX_PROCESSING_AGE) + .saturating_sub(max_tx_fwd_delay) + .saturating_sub(forward_transactions_to_leader_at_slot_offset as usize), + &mut error_counters, + ) + } } /// Compute how much an account has changed size. This function is useful when the data size delta diff --git a/sdk/program/src/clock.rs b/sdk/program/src/clock.rs index 408fdebe2e..a28a735109 100644 --- a/sdk/program/src/clock.rs +++ b/sdk/program/src/clock.rs @@ -108,6 +108,10 @@ pub const MAX_TRANSACTION_FORWARDING_DELAY_GPU: usize = 2; /// More delay is expected if CUDA is not enabled (as signature verification takes longer) pub const MAX_TRANSACTION_FORWARDING_DELAY: usize = 6; +/// Transaction forwarding, which leader to forward to and how long to hold +pub const FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET: u64 = 2; +pub const HOLD_TRANSACTIONS_SLOT_OFFSET: u64 = 20; + /// The unit of time given to a leader for encoding a block. /// /// It is some some number of _ticks_ long.