From c1d89ad749283d6b4edb4bcd2834736900fe43ce Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Tue, 5 Jul 2022 23:24:58 -0500 Subject: [PATCH] forward packets by prioritization in desc order (#25406) - Forward packets by prioritization in desc order - Add support of cost-tracking by transaction requested compute units - Hook up account buckets to forwarder - Add metrics for forwardable batches count - Remove redundant invalid packets filtering at end of slot since forwarder will do the same when batch forwardable packets - Add bench test for forwarding --- banking-bench/src/main.rs | 20 +- core/benches/banking_stage.rs | 6 +- core/benches/unprocessed_packet_batches.rs | 86 ++- core/src/banking_stage.rs | 621 +++++++----------- .../src/forward_packet_batches_by_accounts.rs | 343 ++++++++++ core/src/leader_slot_banking_stage_metrics.rs | 44 +- ...eader_slot_banking_stage_timing_metrics.rs | 8 - core/src/lib.rs | 1 + core/src/tpu.rs | 1 + core/src/unprocessed_packet_batches.rs | 206 +++++- runtime/src/cost_tracker.rs | 76 ++- 11 files changed, 952 insertions(+), 460 deletions(-) create mode 100644 core/src/forward_packet_batches_by_accounts.rs diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index e3a910079d..a3c9019ea9 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -249,8 +249,8 @@ fn main() { let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let bank0 = Bank::new_for_benches(&genesis_config); - let mut bank_forks = BankForks::new(bank0); - let mut bank = bank_forks.working_bank(); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank0))); + let mut bank = bank_forks.read().unwrap().working_bank(); // set cost tracker limits to MAX so it will not filter out TXs bank.write_cost_tracker() @@ -357,6 +357,7 @@ fn main() { replay_vote_sender, Arc::new(RwLock::new(CostModel::default())), Arc::new(connection_cache), + bank_forks.clone(), ); poh_recorder.write().unwrap().set_bank(&bank, false); @@ -428,8 +429,8 @@ fn main() { new_bank_time.stop(); let mut insert_time = Measure::start("insert_time"); - bank_forks.insert(new_bank); - bank = bank_forks.working_bank(); + bank_forks.write().unwrap().insert(new_bank); + bank = bank_forks.read().unwrap().working_bank(); insert_time.stop(); // set cost tracker limits to MAX so it will not filter out TXs @@ -443,7 +444,10 @@ fn main() { assert!(poh_recorder.read().unwrap().bank().is_some()); if bank.slot() > 32 { leader_schedule_cache.set_root(&bank); - bank_forks.set_root(root, &AbsRequestSender::default(), None); + bank_forks + .write() + .unwrap() + .set_root(root, &AbsRequestSender::default(), None); root += 1; } debug!( @@ -476,7 +480,11 @@ fn main() { } } } - let txs_processed = bank_forks.working_bank().transaction_count(); + let txs_processed = bank_forks + .read() + .unwrap() + .working_bank() + .transaction_count(); debug!("processed: {} base: {}", txs_processed, base_tx_count); eprintln!( "{{'name': 'banking_bench_total', 'median': '{:.2}'}}", diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index 2562ad3789..fc01bdf1ac 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -25,7 +25,7 @@ use { }, solana_perf::{packet::to_packet_batches, test_tx::test_tx}, solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry}, - solana_runtime::{bank::Bank, cost_model::CostModel}, + solana_runtime::{bank::Bank, bank_forks::BankForks, 
cost_model::CostModel}, solana_sdk::{ genesis_config::GenesisConfig, hash::Hash, @@ -170,7 +170,8 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { let mut bank = Bank::new_for_benches(&genesis_config); // Allow arbitrary transaction processing time for the purposes of this bench bank.ns_per_slot = u128::MAX; - let bank = Arc::new(bank); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = bank_forks.read().unwrap().get(0).unwrap(); // set cost tracker limits to MAX so it will not filter out TXs bank.write_cost_tracker() @@ -232,6 +233,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { s, Arc::new(RwLock::new(CostModel::default())), Arc::new(ConnectionCache::default()), + bank_forks, ); poh_recorder.write().unwrap().set_bank(&bank, false); diff --git a/core/benches/unprocessed_packet_batches.rs b/core/benches/unprocessed_packet_batches.rs index 1e5caf0130..11a2e46c6d 100644 --- a/core/benches/unprocessed_packet_batches.rs +++ b/core/benches/unprocessed_packet_batches.rs @@ -5,10 +5,19 @@ extern crate test; use { rand::distributions::{Distribution, Uniform}, - solana_core::unprocessed_packet_batches::*, + solana_core::{ + banking_stage::*, forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, + unprocessed_packet_batches::*, + }, solana_measure::measure::Measure, solana_perf::packet::{Packet, PacketBatch}, + solana_runtime::{ + bank::Bank, + bank_forks::BankForks, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }, solana_sdk::{hash::Hash, signature::Keypair, system_transaction}, + std::sync::{Arc, RwLock}, test::Bencher, }; @@ -174,3 +183,78 @@ fn bench_unprocessed_packet_batches_randomized_beyond_limit(bencher: &mut Benche insert_packet_batches(buffer_capacity, batch_count, packet_per_batch_count, true); }); } + +fn build_bank_forks_for_test() -> Arc> { + let GenesisConfigInfo { genesis_config, .. 
} = create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = BankForks::new(bank); + Arc::new(RwLock::new(bank_forks)) +} + +fn buffer_iter_desc_and_forward( + buffer_max_size: usize, + batch_count: usize, + packet_per_batch_count: usize, + randomize: bool, +) { + solana_logger::setup(); + let mut unprocessed_packet_batches = UnprocessedPacketBatches::with_capacity(buffer_max_size); + + // fill buffer + { + let mut timer = Measure::start("fill_buffer"); + (0..batch_count).for_each(|_| { + let (packet_batch, packet_indexes) = if randomize { + build_randomized_packet_batch(packet_per_batch_count) + } else { + build_packet_batch(packet_per_batch_count) + }; + let deserialized_packets = deserialize_packets(&packet_batch, &packet_indexes); + unprocessed_packet_batches.insert_batch(deserialized_packets); + }); + timer.stop(); + log::info!( + "inserted {} batch, elapsed {}", + buffer_max_size, + timer.as_us() + ); + } + + // forward whole buffer + { + let mut timer = Measure::start("forward_time"); + let mut forward_packet_batches_by_accounts = + ForwardPacketBatchesByAccounts::new_with_default_batch_limits( + build_bank_forks_for_test().read().unwrap().root_bank(), + ); + // iter_desc buffer + let filter_forwarding_results = BankingStage::filter_valid_packets_for_forwarding( + &mut unprocessed_packet_batches, + &mut forward_packet_batches_by_accounts, + ); + timer.stop(); + + let batched_filter_forwarding_results: usize = forward_packet_batches_by_accounts + .iter_batches() + .map(|forward_batch| forward_batch.len()) + .sum(); + log::info!( + "filter_forwarding_results {:?}, batched_forwardable packets {}, elapsed {}", + filter_forwarding_results, + batched_filter_forwarding_results, + timer.as_us() + ); + } +} + +#[bench] +#[ignore] +fn bench_forwarding_unprocessed_packet_batches(bencher: &mut Bencher) { + let batch_count = 1_000; + let packet_per_batch_count = 64; + let buffer_capacity = batch_count * packet_per_batch_count; + + bencher.iter(|| { + buffer_iter_desc_and_forward(buffer_capacity, batch_count, packet_per_batch_count, true); + }); +} diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 8a3f4e3a9a..48e213dcde 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -3,6 +3,7 @@ //! can do its processing in parallel with signature verification on the GPU. 
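
For orientation before the large diff to this file: forwarding no longer collects raw `&Packet`s out of the buffer; the stage drains the priority queue in descending fee-priority order into account- and block-cost-limited batches, then sends each batch separately. A minimal sketch of the resulting control flow, built only from items introduced in this patch (error handling, metrics, and the actual send are elided):

    use {
        crate::{
            banking_stage::BankingStage,
            forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
            unprocessed_packet_batches::UnprocessedPacketBatches,
        },
        solana_runtime::bank_forks::BankForks,
        std::sync::{Arc, RwLock},
    };

    fn forward_flow(
        buffered: &mut UnprocessedPacketBatches,
        bank_forks: &Arc<RwLock<BankForks>>,
    ) {
        // Sanitize and bucket packets against the current root bank.
        let root_bank = bank_forks.read().unwrap().root_bank();
        let mut batches =
            ForwardPacketBatchesByAccounts::new_with_default_batch_limits(root_bank);
        // Drain the buffer max-priority-first into cost-limited batches.
        let _results = BankingStage::filter_valid_packets_for_forwarding(buffered, &mut batches);
        for batch in batches.iter_batches().filter(|b| !b.is_empty()) {
            // Each non-empty batch is then sent to the upcoming leader's TPU-forwards address.
            let _packets = batch.get_forwardable_packets();
        }
    }
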
use { crate::{ + forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts, leader_slot_banking_stage_metrics::{LeaderSlotMetricsTracker, ProcessTransactionsSummary}, leader_slot_banking_stage_timing_metrics::{ LeaderExecuteAndCommitTimings, RecordTransactionsTimings, @@ -37,6 +38,7 @@ use { Bank, CommitTransactionCounts, LoadAndExecuteTransactionsOutput, TransactionBalancesSet, TransactionCheckResult, }, + bank_forks::BankForks, bank_utils, cost_model::{CostModel, TransactionCost}, transaction_batch::TransactionBatch, @@ -48,13 +50,10 @@ use { Slot, DEFAULT_TICKS_PER_SLOT, MAX_PROCESSING_AGE, MAX_TRANSACTION_FORWARDING_DELAY, MAX_TRANSACTION_FORWARDING_DELAY_GPU, }, - feature_set, pubkey::Pubkey, saturating_add_assign, timing::{duration_as_ms, timestamp, AtomicInterval}, - transaction::{ - self, AddressLoader, SanitizedTransaction, TransactionError, VersionedTransaction, - }, + transaction::{self, SanitizedTransaction, TransactionError, VersionedTransaction}, transport::TransportError, }, solana_streamer::sendmmsg::batch_send, @@ -151,7 +150,6 @@ pub struct BankingStageStats { current_buffered_packet_batches_count: AtomicUsize, rebuffered_packets_count: AtomicUsize, consumed_buffered_packets_count: AtomicUsize, - end_of_slot_filtered_invalid_count: AtomicUsize, forwarded_transaction_count: AtomicUsize, forwarded_vote_count: AtomicUsize, batch_packet_indexes_len: Histogram, @@ -162,7 +160,6 @@ pub struct BankingStageStats { handle_retryable_packets_elapsed: AtomicU64, filter_pending_packets_elapsed: AtomicU64, packet_conversion_elapsed: AtomicU64, - unprocessed_packet_conversion_elapsed: AtomicU64, transaction_processing_elapsed: AtomicU64, } @@ -204,9 +201,6 @@ impl BankingStageStats { .load(Ordering::Relaxed) + self.filter_pending_packets_elapsed.load(Ordering::Relaxed) + self.packet_conversion_elapsed.load(Ordering::Relaxed) - + self - .unprocessed_packet_conversion_elapsed - .load(Ordering::Relaxed) + self.transaction_processing_elapsed.load(Ordering::Relaxed) + self.forwarded_transaction_count.load(Ordering::Relaxed) as u64 + self.forwarded_vote_count.load(Ordering::Relaxed) as u64 @@ -267,12 +261,6 @@ impl BankingStageStats { .swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "end_of_slot_filtered_invalid_count", - self.end_of_slot_filtered_invalid_count - .swap(0, Ordering::Relaxed) as i64, - i64 - ), ( "forwarded_transaction_count", self.forwarded_transaction_count.swap(0, Ordering::Relaxed) as i64, @@ -312,12 +300,6 @@ impl BankingStageStats { self.packet_conversion_elapsed.swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "unprocessed_packet_conversion_elapsed", - self.unprocessed_packet_conversion_elapsed - .swap(0, Ordering::Relaxed) as i64, - i64 - ), ( "transaction_processing_elapsed", self.transaction_processing_elapsed @@ -374,12 +356,6 @@ pub struct BatchedTransactionErrorDetails { pub batched_dropped_txs_per_account_data_total_limit_count: u64, } -#[derive(Debug, Default)] -struct EndOfSlot { - next_slot_leader: Option, - working_bank: Option>, -} - /// Stores the stage's thread handle and output receiver. 
pub struct BankingStage { bank_thread_hdls: Vec>, @@ -400,8 +376,9 @@ pub enum ForwardOption { ForwardTransaction, } -struct FilterForwardingResults<'a> { - forwardable_packets: Vec<&'a Packet>, +#[derive(Debug)] +pub struct FilterForwardingResults { + total_forwardable_packets: usize, total_tracer_packets_in_buffer: usize, total_forwardable_tracer_packets: usize, } @@ -409,6 +386,7 @@ struct FilterForwardingResults<'a> { impl BankingStage { /// Create the stage using `bank`. Exit when `verified_receiver` is dropped. #[allow(clippy::new_ret_no_self)] + #[allow(clippy::too_many_arguments)] pub fn new( cluster_info: &Arc, poh_recorder: &Arc>, @@ -419,6 +397,7 @@ impl BankingStage { gossip_vote_sender: ReplayVoteSender, cost_model: Arc>, connection_cache: Arc, + bank_forks: Arc>, ) -> Self { Self::new_num_threads( cluster_info, @@ -431,6 +410,7 @@ impl BankingStage { gossip_vote_sender, cost_model, connection_cache, + bank_forks, ) } @@ -446,6 +426,7 @@ impl BankingStage { gossip_vote_sender: ReplayVoteSender, cost_model: Arc>, connection_cache: Arc, + bank_forks: Arc>, ) -> Self { assert!(num_threads >= MIN_TOTAL_THREADS); // Single thread to generate entries from many banks. @@ -478,6 +459,7 @@ impl BankingStage { let data_budget = data_budget.clone(); let cost_model = cost_model.clone(); let connection_cache = connection_cache.clone(); + let bank_forks = bank_forks.clone(); Builder::new() .name(format!("solana-banking-stage-tx-{}", i)) .spawn(move || { @@ -494,6 +476,7 @@ impl BankingStage { &data_budget, cost_model, connection_cache, + &bank_forks, ); }) .unwrap() @@ -502,32 +485,53 @@ impl BankingStage { Self { bank_thread_hdls } } - fn filter_valid_packets_for_forwarding<'a>( - deserialized_packets: impl Iterator, - ) -> FilterForwardingResults<'a> { - let mut total_forwardable_tracer_packets = 0; - let mut total_tracer_packets_in_buffer = 0; + // filter forwardable Rcs that: + // 1. are not forwarded, and + // 2. in priority order from max to min, and + // 3. 
not exceeding account bucket limit + // returns forwarded packets count + pub fn filter_valid_packets_for_forwarding( + buffered_packet_batches: &mut UnprocessedPacketBatches, + forward_packet_batches_by_accounts: &mut ForwardPacketBatchesByAccounts, + ) -> FilterForwardingResults { + let mut total_forwardable_tracer_packets: usize = 0; + let mut total_tracer_packets_in_buffer: usize = 0; + let mut total_forwardable_packets: usize = 0; + let mut dropped_tx_before_forwarding_count: usize = 0; + + let filter_forwardable_packet = |deserialized_packet: &mut DeserializedPacket| -> bool { + let mut result = true; + let is_tracer_packet = deserialized_packet + .immutable_section() + .original_packet() + .meta + .is_tracer_packet(); + if is_tracer_packet { + saturating_add_assign!(total_tracer_packets_in_buffer, 1); + } + if !deserialized_packet.forwarded { + saturating_add_assign!(total_forwardable_packets, 1); + if is_tracer_packet { + saturating_add_assign!(total_forwardable_tracer_packets, 1); + } + result = forward_packet_batches_by_accounts + .add_packet(deserialized_packet.immutable_section().clone()); + if !result { + saturating_add_assign!(dropped_tx_before_forwarding_count, 1); + } + } + result + }; + + buffered_packet_batches.iter_desc(filter_forwardable_packet); + + inc_new_counter_info!( + "banking_stage-dropped_tx_before_forwarding", + dropped_tx_before_forwarding_count + ); + FilterForwardingResults { - forwardable_packets: deserialized_packets - .filter_map(|deserialized_packet| { - let is_tracer_packet = deserialized_packet - .immutable_section() - .original_packet() - .meta - .is_tracer_packet(); - if is_tracer_packet { - total_tracer_packets_in_buffer += 1; - } - if !deserialized_packet.forwarded { - if is_tracer_packet { - total_forwardable_tracer_packets += 1; - } - Some(deserialized_packet.immutable_section().original_packet()) - } else { - None - } - }) - .collect(), + total_forwardable_packets, total_tracer_packets_in_buffer, total_forwardable_tracer_packets, } @@ -535,19 +539,22 @@ impl BankingStage { /// Forwards all valid, unprocessed packets in the buffer, up to a rate limit. Returns /// the number of successfully forwarded packets in second part of tuple - fn forward_buffered_packets( + fn forward_buffered_packets<'a>( connection_cache: &ConnectionCache, forward_option: &ForwardOption, cluster_info: &ClusterInfo, poh_recorder: &Arc>, socket: &UdpSocket, - filter_forwarding_results: &FilterForwardingResults, + forwardable_packets: impl Iterator, data_budget: &DataBudget, banking_stage_stats: &BankingStageStats, - tracer_packet_stats: &mut TracerPacketStats, - ) -> (std::result::Result<(), TransportError>, usize) { + ) -> ( + std::result::Result<(), TransportError>, + usize, + Option, + ) { let leader_and_addr = match forward_option { - ForwardOption::NotForward => return (Ok(()), 0), + ForwardOption::NotForward => return (Ok(()), 0, None), ForwardOption::ForwardTransaction => { next_leader_tpu_forwards(cluster_info, poh_recorder) } @@ -556,20 +563,9 @@ impl BankingStage { }; let (leader_pubkey, addr) = match leader_and_addr { Some(leader_and_addr) => leader_and_addr, - None => return (Ok(()), 0), + None => return (Ok(()), 0, None), }; - let FilterForwardingResults { - forwardable_packets, - total_forwardable_tracer_packets, - .. 
- } = filter_forwarding_results; - - tracer_packet_stats.increment_total_forwardable_tracer_packets( - *total_forwardable_tracer_packets, - leader_pubkey, - ); - const INTERVAL_MS: u64 = 100; const MAX_BYTES_PER_SECOND: usize = 10_000 * 1200; const MAX_BYTES_PER_INTERVAL: usize = MAX_BYTES_PER_SECOND * INTERVAL_MS as usize / 1000; @@ -582,7 +578,6 @@ impl BankingStage { }); let packet_vec: Vec<_> = forwardable_packets - .iter() .filter_map(|p| { if !p.meta.forwarded() && data_budget.take(p.meta.size) { Some(p.data(..)?.to_vec()) @@ -629,16 +624,16 @@ impl BankingStage { if let Err(err) = res { inc_new_counter_info!("banking_stage-forward_packets-failed-batches", 1); - return (Err(err), 0); + return (Err(err), 0, Some(leader_pubkey)); } } - (Ok(()), packet_vec_len) + (Ok(()), packet_vec_len, Some(leader_pubkey)) } #[allow(clippy::too_many_arguments)] pub fn consume_buffered_packets( - my_pubkey: &Pubkey, + _my_pubkey: &Pubkey, max_tx_ingestion_ns: u128, poh_recorder: &Arc>, buffered_packet_batches: &mut UnprocessedPacketBatches, @@ -655,7 +650,7 @@ impl BankingStage { let mut consumed_buffered_packets_count = 0; let buffered_packets_len = buffered_packet_batches.len(); let mut proc_start = Measure::start("consume_buffered_process"); - let mut reached_end_of_slot: Option = None; + let mut reached_end_of_slot = false; let mut retryable_packets = MinMaxHeap::with_capacity(buffered_packet_batches.capacity()); std::mem::swap( @@ -717,13 +712,10 @@ impl BankingStage { ) { let poh_recorder_lock_time = { - let (poh_recorder_locked, poh_recorder_lock_time) = + let (_poh_recorder_locked, poh_recorder_lock_time) = measure!(poh_recorder.read().unwrap(), "poh_recorder.read"); - reached_end_of_slot = Some(EndOfSlot { - next_slot_leader: poh_recorder_locked.next_slot_leader(), - working_bank: Some(working_bank), - }); + reached_end_of_slot = true; poh_recorder_lock_time }; @@ -776,19 +768,16 @@ impl BankingStage { ); result - } else if reached_end_of_slot.is_some() { + } else if reached_end_of_slot { packets_to_process } else { // mark as end-of-slot to avoid aggressively lock poh for the remaining for // packet batches in buffer let poh_recorder_lock_time = { - let (poh_recorder_locked, poh_recorder_lock_time) = + let (_poh_recorder_locked, poh_recorder_lock_time) = measure!(poh_recorder.read().unwrap(), "poh_recorder.read"); - reached_end_of_slot = Some(EndOfSlot { - next_slot_leader: poh_recorder_locked.next_slot_leader(), - working_bank: None, - }); + reached_end_of_slot = true; poh_recorder_lock_time }; slot_metrics_tracker.increment_consume_buffered_packets_poh_recorder_lock_us( @@ -805,43 +794,12 @@ impl BankingStage { &mut buffered_packet_batches.packet_priority_queue, ); - if let Some(end_of_slot) = &reached_end_of_slot { + if reached_end_of_slot { slot_metrics_tracker .set_end_of_slot_unprocessed_buffer_len(buffered_packet_batches.len() as u64); // We've hit the end of this slot, no need to perform more processing, - // just filter the remaining packets for the invalid (e.g. 
too old) ones - // if the working_bank is available - let mut end_of_slot_filtering_time = Measure::start("end_of_slot_filtering"); - // TODO: This doesn't have to be done at the end of every slot, can instead - // hold multiple unbuffered queues without merging them - - // TODO: update this here to filter the rest of the packets remaining - // TODO: this needs to be done even if there is no end_of_slot.working_bank - // to put retryable packets back in buffer - let end_of_slot_filtered_invalid_count = - Self::filter_unprocessed_packets_at_end_of_slot( - &end_of_slot.working_bank, - buffered_packet_batches, - my_pubkey, - end_of_slot.next_slot_leader, - banking_stage_stats, - ); - - inc_new_counter_info!( - "banking_stage-dropped_tx_before_forwarding", - end_of_slot_filtered_invalid_count - ); - slot_metrics_tracker.increment_end_of_slot_filtered_invalid_count( - end_of_slot_filtered_invalid_count as u64, - ); - banking_stage_stats - .end_of_slot_filtered_invalid_count - .fetch_add(end_of_slot_filtered_invalid_count, Ordering::Relaxed); - - end_of_slot_filtering_time.stop(); - slot_metrics_tracker - .increment_end_of_slot_filtering_us(end_of_slot_filtering_time.as_us()); + // Packet filtering will be done at `forward_packet_batches_by_accounts.add_packet()` } proc_start.stop(); @@ -920,6 +878,7 @@ impl BankingStage { slot_metrics_tracker: &mut LeaderSlotMetricsTracker, connection_cache: &ConnectionCache, tracer_packet_stats: &mut TracerPacketStats, + bank_forks: &Arc>, ) { let ((metrics_action, decision), make_decision_time) = measure!( { @@ -999,6 +958,7 @@ impl BankingStage { banking_stage_stats, connection_cache, tracer_packet_stats, + bank_forks, ), "forward", ); @@ -1021,6 +981,7 @@ impl BankingStage { banking_stage_stats, connection_cache, tracer_packet_stats, + bank_forks, ), "forward_and_hold", ); @@ -1045,6 +1006,7 @@ impl BankingStage { banking_stage_stats: &BankingStageStats, connection_cache: &ConnectionCache, tracer_packet_stats: &mut TracerPacketStats, + bank_forks: &Arc>, ) { if let ForwardOption::NotForward = forward_option { if !hold { @@ -1053,43 +1015,65 @@ impl BankingStage { return; } - let filter_forwarding_result = - Self::filter_valid_packets_for_forwarding(buffered_packet_batches.iter()); - - let forwardable_packets_len = filter_forwarding_result.forwardable_packets.len(); - let (_forward_result, sucessful_forwarded_packets_count) = Self::forward_buffered_packets( - connection_cache, - forward_option, - cluster_info, - poh_recorder, - socket, - &filter_forwarding_result, - data_budget, - banking_stage_stats, - tracer_packet_stats, + // get current root bank from bank_forks, use it to sanitize transaction and + // load all accounts from address loader; + let current_bank = bank_forks.read().unwrap().root_bank(); + let mut forward_packet_batches_by_accounts = + ForwardPacketBatchesByAccounts::new_with_default_batch_limits(current_bank); + let filter_forwarding_result = Self::filter_valid_packets_for_forwarding( + buffered_packet_batches, + &mut forward_packet_batches_by_accounts, ); - let failed_forwarded_packets_count = - forwardable_packets_len.saturating_sub(sucessful_forwarded_packets_count); + forward_packet_batches_by_accounts + .iter_batches() + .filter(|&batch| !batch.is_empty()) + .for_each(|forward_batch| { + slot_metrics_tracker.increment_forwardable_batches_count(1); - if failed_forwarded_packets_count > 0 { - slot_metrics_tracker - .increment_failed_forwarded_packets_count(failed_forwarded_packets_count as u64); - 
slot_metrics_tracker.increment_packet_batch_forward_failure_count(1); - } + let batched_forwardable_packets_count = forward_batch.len(); + let (_forward_result, sucessful_forwarded_packets_count, leader_pubkey) = + Self::forward_buffered_packets( + connection_cache, + forward_option, + cluster_info, + poh_recorder, + socket, + forward_batch.get_forwardable_packets(), + data_budget, + banking_stage_stats, + ); - if sucessful_forwarded_packets_count > 0 { - slot_metrics_tracker.increment_successful_forwarded_packets_count( - sucessful_forwarded_packets_count as u64, - ); - } + if let Some(leader_pubkey) = leader_pubkey { + tracer_packet_stats.increment_total_forwardable_tracer_packets( + filter_forwarding_result.total_forwardable_tracer_packets, + leader_pubkey, + ); + } + let failed_forwarded_packets_count = batched_forwardable_packets_count + .saturating_sub(sucessful_forwarded_packets_count); + + if failed_forwarded_packets_count > 0 { + slot_metrics_tracker.increment_failed_forwarded_packets_count( + failed_forwarded_packets_count as u64, + ); + slot_metrics_tracker.increment_packet_batch_forward_failure_count(1); + } + + if sucessful_forwarded_packets_count > 0 { + slot_metrics_tracker.increment_successful_forwarded_packets_count( + sucessful_forwarded_packets_count as u64, + ); + } + }); if hold { for deserialized_packet in buffered_packet_batches.iter_mut() { deserialized_packet.forwarded = true; } } else { - slot_metrics_tracker - .increment_cleared_from_buffer_after_forward_count(forwardable_packets_len as u64); + slot_metrics_tracker.increment_cleared_from_buffer_after_forward_count( + filter_forwarding_result.total_forwardable_packets as u64, + ); tracer_packet_stats.increment_total_cleared_from_buffer_after_forward( filter_forwarding_result.total_tracer_packets_in_buffer, ); @@ -1111,6 +1095,7 @@ impl BankingStage { data_budget: &DataBudget, cost_model: Arc>, connection_cache: Arc, + bank_forks: &Arc>, ) { let recorder = poh_recorder.read().unwrap().recorder(); let socket = UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -1144,6 +1129,7 @@ impl BankingStage { &mut slot_metrics_tracker, &connection_cache, &mut tracer_packet_stats, + bank_forks, ), "process_buffered_packets", ); @@ -1831,31 +1817,6 @@ impl BankingStage { .collect_vec() } - // This function deserializes packets into transactions, computes the blake3 hash of transaction - // messages, and verifies secp256k1 instructions. A list of sanitized transactions are returned - // with their packet indexes. 
- #[allow(clippy::needless_collect)] - fn transaction_from_deserialized_packet( - deserialized_packet: &ImmutableDeserializedPacket, - feature_set: &Arc, - votes_only: bool, - address_loader: impl AddressLoader, - ) -> Option { - if votes_only && !deserialized_packet.is_simple_vote() { - return None; - } - - let tx = SanitizedTransaction::try_new( - deserialized_packet.transaction().clone(), - *deserialized_packet.message_hash(), - deserialized_packet.is_simple_vote(), - address_loader, - ) - .ok()?; - tx.verify_precompiles(feature_set).ok()?; - Some(tx) - } - /// This function filters pending packets that are still valid /// # Arguments /// * `transactions` - a batch of transactions deserialized from packets @@ -1934,7 +1895,7 @@ impl BankingStage { deserialized_packets .enumerate() .filter_map(|(i, deserialized_packet)| { - Self::transaction_from_deserialized_packet( + unprocessed_packet_batches::transaction_from_deserialized_packet( deserialized_packet, &bank.feature_set, bank.vote_only_bank(), @@ -2019,54 +1980,6 @@ impl BankingStage { process_transactions_summary } - // Returns the number of packets that were filtered out for - // no longer being valid (could be too old, a duplicate of something - // already processed, etc.) - fn filter_unprocessed_packets_at_end_of_slot( - bank: &Option>, - unprocessed_packets: &mut UnprocessedPacketBatches, - my_pubkey: &Pubkey, - next_leader: Option, - banking_stage_stats: &BankingStageStats, - ) -> usize { - // Check if we are the next leader. If so, let's not filter the packets - // as we'll filter it again while processing the packets. - // Filtering helps if we were going to forward the packets to some other node - let will_still_be_leader = next_leader - .map(|next_leader| next_leader == *my_pubkey) - .unwrap_or(false); - let should_filter_unprocessed_packets = !will_still_be_leader && bank.is_some(); - let original_unprocessed_packets_len = unprocessed_packets.len(); - - if should_filter_unprocessed_packets { - // If `should_filter_unprocessed_packets` is true, then the bank - // must be `Some` - let bank = bank.as_ref().unwrap(); - let mut unprocessed_packet_conversion_time = - Measure::start("unprocessed_packet_conversion"); - - let should_retain = |deserialized_packet: &mut DeserializedPacket| { - Self::transaction_from_deserialized_packet( - deserialized_packet.immutable_section(), - &bank.feature_set, - bank.vote_only_bank(), - bank.as_ref(), - ) - .is_some() - }; - unprocessed_packets.retain(should_retain); - unprocessed_packet_conversion_time.stop(); - banking_stage_stats - .unprocessed_packet_conversion_elapsed - .fetch_add( - unprocessed_packet_conversion_time.as_us(), - Ordering::Relaxed, - ); - } - - original_unprocessed_packets_len.saturating_sub(unprocessed_packets.len()) - } - fn generate_packet_indexes(vers: &PacketBatch) -> Vec { vers.iter() .enumerate() @@ -2298,6 +2211,7 @@ mod tests { }, solana_program_runtime::timings::ProgramTiming, solana_rpc::transaction_status_service::TransactionStatusService, + solana_runtime::bank_forks::BankForks, solana_sdk::{ account::AccountSharedData, hash::Hash, @@ -2309,14 +2223,10 @@ mod tests { poh_config::PohConfig, signature::{Keypair, Signer}, system_transaction, - transaction::{ - MessageHash, SimpleAddressLoader, Transaction, TransactionError, - VersionedTransaction, - }, + transaction::{MessageHash, Transaction, TransactionError, VersionedTransaction}, }, solana_streamer::{recvmmsg::recv_mmsg, socket::SocketAddrSpace}, solana_transaction_status::{TransactionStatusMeta, 
VersionedTransactionWithStatusMeta}, - solana_vote_program::vote_transaction, std::{ borrow::Cow, collections::HashSet, @@ -2338,7 +2248,9 @@ mod tests { #[test] fn test_banking_stage_shutdown1() { let genesis_config = create_genesis_config(2).genesis_config; - let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)); + let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let (verified_sender, verified_receiver) = unbounded(); let (gossip_verified_vote_sender, gossip_verified_vote_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); @@ -2364,6 +2276,7 @@ mod tests { gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), Arc::new(ConnectionCache::default()), + bank_forks, ); drop(verified_sender); drop(gossip_verified_vote_sender); @@ -2383,7 +2296,9 @@ mod tests { } = create_genesis_config(2); genesis_config.ticks_per_slot = 4; let num_extra_ticks = 2; - let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)); + let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); @@ -2414,6 +2329,7 @@ mod tests { gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), Arc::new(ConnectionCache::default()), + bank_forks, ); trace!("sending bank"); drop(verified_sender); @@ -2456,7 +2372,9 @@ mod tests { mint_keypair, .. } = create_slow_genesis_config(10); - let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)); + let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let start_hash = bank.last_blockhash(); let (verified_sender, verified_receiver) = unbounded(); let (tpu_vote_sender, tpu_vote_receiver) = unbounded(); @@ -2489,6 +2407,7 @@ mod tests { gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), Arc::new(ConnectionCache::default()), + bank_forks, ); // fund another account so we can send 2 good transactions in a single batch. @@ -2615,7 +2534,9 @@ mod tests { let entry_receiver = { // start a banking_stage to eat verified receiver - let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(&genesis_config)); + let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let blockstore = Arc::new( Blockstore::open(ledger_path.path()) .expect("Expected to be able to open database ledger"), @@ -2641,6 +2562,7 @@ mod tests { gossip_vote_sender, Arc::new(RwLock::new(CostModel::default())), Arc::new(ConnectionCache::default()), + bank_forks, ); // wait for banking_stage to eat the packets @@ -3337,6 +3259,11 @@ mod tests { #[test] fn test_filter_valid_packets() { solana_logger::setup(); + let GenesisConfigInfo { genesis_config, .. 
} = create_slow_genesis_config(10); + let bank = Bank::new_no_wallclock_throttle_for_tests(&genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let current_bank = bank_forks.read().unwrap().root_bank(); + let mut packets: Vec = (0..256) .map(|packets_id| { // packets are deserialized upon receiving, failed packets will not be @@ -3352,43 +3279,69 @@ mod tests { }) .collect_vec(); - let FilterForwardingResults { - forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, - } = BankingStage::filter_valid_packets_for_forwarding(packets.iter()); - assert_eq!(forwardable_packets.len(), 256); - assert_eq!(total_tracer_packets_in_buffer, 256); - assert_eq!(total_forwardable_tracer_packets, 256); + // all packets are forwarded + { + let mut buffered_packet_batches: UnprocessedPacketBatches = + UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); + let mut forward_packet_batches_by_accounts = + ForwardPacketBatchesByAccounts::new(current_bank.clone(), 1, 2); - // packets in a batch are forwarded in arbitrary order; verify the ports match after - // sorting - let expected_ports: Vec<_> = (0..256).collect(); - let mut forwarded_ports: Vec<_> = forwardable_packets - .into_iter() - .map(|p| p.meta.port) - .collect(); - forwarded_ports.sort_unstable(); - assert_eq!(expected_ports, forwarded_ports); + let FilterForwardingResults { + total_forwardable_packets, + total_tracer_packets_in_buffer, + total_forwardable_tracer_packets, + } = BankingStage::filter_valid_packets_for_forwarding( + &mut buffered_packet_batches, + &mut forward_packet_batches_by_accounts, + ); + assert_eq!(total_forwardable_packets, 256); + assert_eq!(total_tracer_packets_in_buffer, 256); + assert_eq!(total_forwardable_tracer_packets, 256); - let num_already_forwarded = 16; - for packet in &mut packets[0..num_already_forwarded] { - packet.forwarded = true; + // packets in a batch are forwarded in arbitrary order; verify the ports match after + // sorting + let expected_ports: Vec<_> = (0..256).collect(); + let mut forwarded_ports: Vec<_> = forward_packet_batches_by_accounts + .iter_batches() + .flat_map(|batch| { + batch + .get_forwardable_packets() + .into_iter() + .map(|p| p.meta.port) + }) + .collect(); + forwarded_ports.sort_unstable(); + assert_eq!(expected_ports, forwarded_ports); + } + + // some packets are forwarded + { + let num_already_forwarded = 16; + for packet in &mut packets[0..num_already_forwarded] { + packet.forwarded = true; + } + let mut buffered_packet_batches: UnprocessedPacketBatches = + UnprocessedPacketBatches::from_iter(packets.clone().into_iter(), packets.len()); + let mut forward_packet_batches_by_accounts = + ForwardPacketBatchesByAccounts::new(current_bank, 1, 2); + let FilterForwardingResults { + total_forwardable_packets, + total_tracer_packets_in_buffer, + total_forwardable_tracer_packets, + } = BankingStage::filter_valid_packets_for_forwarding( + &mut buffered_packet_batches, + &mut forward_packet_batches_by_accounts, + ); + assert_eq!( + total_forwardable_packets, + packets.len() - num_already_forwarded + ); + assert_eq!(total_tracer_packets_in_buffer, packets.len()); + assert_eq!( + total_forwardable_tracer_packets, + packets.len() - num_already_forwarded + ); } - let FilterForwardingResults { - forwardable_packets, - total_tracer_packets_in_buffer, - total_forwardable_tracer_packets, - } = BankingStage::filter_valid_packets_for_forwarding(packets.iter()); - assert_eq!( - forwardable_packets.len(), - 
packets.len() - num_already_forwarded - ); - assert_eq!(total_tracer_packets_in_buffer, packets.len()); - assert_eq!( - total_forwardable_tracer_packets, - packets.len() - num_already_forwarded - ); } #[test] @@ -4164,7 +4117,9 @@ mod tests { .. } = &genesis_config_info; - let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config)); + let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -4211,6 +4166,7 @@ mod tests { &stats, &connection_cache, &mut TracerPacketStats::new(0), + &bank_forks, ); recv_socket @@ -4263,7 +4219,9 @@ mod tests { validator_pubkey, .. } = &genesis_config_info; - let bank = Arc::new(Bank::new_no_wallclock_throttle_for_tests(genesis_config)); + let bank = Bank::new_no_wallclock_throttle_for_tests(genesis_config); + let bank_forks = Arc::new(RwLock::new(BankForks::new(bank))); + let bank = Arc::new(bank_forks.read().unwrap().get(0).unwrap()); let ledger_path = get_tmp_ledger_path_auto_delete!(); { let blockstore = Arc::new( @@ -4325,6 +4283,7 @@ mod tests { &stats, &connection_cache, &mut TracerPacketStats::new(0), + &bank_forks, ); recv_socket @@ -4360,134 +4319,6 @@ mod tests { Blockstore::destroy(ledger_path.path()).unwrap(); } - #[cfg(test)] - fn make_test_packets( - transactions: Vec, - vote_indexes: Vec, - ) -> Vec { - let capacity = transactions.len(); - let mut packet_vector = Vec::with_capacity(capacity); - for tx in transactions.iter() { - packet_vector.push(Packet::from_data(None, &tx).unwrap()); - } - for index in vote_indexes.iter() { - packet_vector[*index].meta.flags |= PacketFlags::SIMPLE_VOTE_TX; - } - - packet_vector - .into_iter() - .map(|p| DeserializedPacket::new(p).unwrap()) - .collect() - } - - #[test] - fn test_transaction_from_deserialized_packet() { - use solana_sdk::feature_set::FeatureSet; - let keypair = Keypair::new(); - let transfer_tx = - system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default()); - let vote_tx = vote_transaction::new_vote_transaction( - vec![42], - Hash::default(), - Hash::default(), - &keypair, - &keypair, - &keypair, - None, - ); - - // packets with no votes - { - let vote_indexes = vec![]; - let packet_vector = - make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes); - - let mut votes_only = false; - let txs = packet_vector.iter().filter_map(|tx| { - BankingStage::transaction_from_deserialized_packet( - tx.immutable_section(), - &Arc::new(FeatureSet::default()), - votes_only, - SimpleAddressLoader::Disabled, - ) - }); - assert_eq!(2, txs.count()); - - votes_only = true; - let txs = packet_vector.iter().filter_map(|tx| { - BankingStage::transaction_from_deserialized_packet( - tx.immutable_section(), - &Arc::new(FeatureSet::default()), - votes_only, - SimpleAddressLoader::Disabled, - ) - }); - assert_eq!(0, txs.count()); - } - - // packets with some votes - { - let vote_indexes = vec![0, 2]; - let packet_vector = make_test_packets( - vec![vote_tx.clone(), transfer_tx, vote_tx.clone()], - vote_indexes, - ); - - let mut votes_only = false; - let txs = packet_vector.iter().filter_map(|tx| { - BankingStage::transaction_from_deserialized_packet( - tx.immutable_section(), - &Arc::new(FeatureSet::default()), - votes_only, - SimpleAddressLoader::Disabled, - ) - }); - assert_eq!(3, txs.count()); - - votes_only = true; - let txs = 
packet_vector.iter().filter_map(|tx| { - BankingStage::transaction_from_deserialized_packet( - tx.immutable_section(), - &Arc::new(FeatureSet::default()), - votes_only, - SimpleAddressLoader::Disabled, - ) - }); - assert_eq!(2, txs.count()); - } - - // packets with all votes - { - let vote_indexes = vec![0, 1, 2]; - let packet_vector = make_test_packets( - vec![vote_tx.clone(), vote_tx.clone(), vote_tx], - vote_indexes, - ); - - let mut votes_only = false; - let txs = packet_vector.iter().filter_map(|tx| { - BankingStage::transaction_from_deserialized_packet( - tx.immutable_section(), - &Arc::new(FeatureSet::default()), - votes_only, - SimpleAddressLoader::Disabled, - ) - }); - assert_eq!(3, txs.count()); - - votes_only = true; - let txs = packet_vector.iter().filter_map(|tx| { - BankingStage::transaction_from_deserialized_packet( - tx.immutable_section(), - &Arc::new(FeatureSet::default()), - votes_only, - SimpleAddressLoader::Disabled, - ) - }); - assert_eq!(3, txs.count()); - } - } - #[test] fn test_accumulate_batched_transaction_costs() { let signature_cost = 1; diff --git a/core/src/forward_packet_batches_by_accounts.rs b/core/src/forward_packet_batches_by_accounts.rs new file mode 100644 index 0000000000..889747ece1 --- /dev/null +++ b/core/src/forward_packet_batches_by_accounts.rs @@ -0,0 +1,343 @@ +use { + crate::unprocessed_packet_batches::{self, ImmutableDeserializedPacket}, + solana_perf::packet::Packet, + solana_runtime::{ + bank::Bank, + block_cost_limits, + cost_tracker::{CostTracker, CostTrackerError}, + }, + solana_sdk::pubkey::Pubkey, + std::{rc::Rc, sync::Arc}, +}; + +/// `ForwardBatch` to have half of default cost_tracker limits, as smaller batch +/// allows better granularity in composing forwarding transactions; e.g., +/// transactions in each batch are potentially more evenly distributed across accounts. +const FORWARDED_BLOCK_COMPUTE_RATIO: u32 = 2; +/// this number divided by`FORWARDED_BLOCK_COMPUTE_RATIO` is the total blocks to forward. +/// To accommodate transactions without `compute_budget` instruction, which will +/// have default 200_000 compute units, it has 100 batches as default to forward +/// up to 12_000 such transaction. (120 such transactions fill up a batch, 100 +/// batches allows 12_000 transactions) +const DEFAULT_NUMBER_OF_BATCHES: u32 = 100; + +/// `ForwardBatch` represents one forwardable batch of transactions with a +/// limited number of total compute units +#[derive(Debug)] +pub struct ForwardBatch { + cost_tracker: CostTracker, + // `forwardable_packets` keeps forwardable packets in a vector in its + // original fee prioritized order + forwardable_packets: Vec>, +} + +impl Default for ForwardBatch { + /// default ForwardBatch has cost_tracker with default limits + fn default() -> Self { + Self::new(1) + } +} + +impl ForwardBatch { + /// `ForwardBatch` keeps forwardable packets in a vector in its original fee prioritized order, + /// Number of packets are limited by `cost_tracker` with customized `limit_ratio` to lower + /// (when `limit_ratio` > 1) `cost_tracker`'s default limits. + /// Lower limits yield smaller batch for forwarding. 
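
As a worked check of the sizing comments above (MAX_BLOCK_UNITS is assumed here to be 48_000_000, its mainline value at the time; the patch itself only references it through `block_cost_limits`):

    // Sketch: capacity implied by FORWARDED_BLOCK_COMPUTE_RATIO = 2 and
    // DEFAULT_NUMBER_OF_BATCHES = 100, assuming MAX_BLOCK_UNITS = 48_000_000.
    const ASSUMED_MAX_BLOCK_UNITS: u64 = 48_000_000;
    const CUS_PER_BATCH: u64 = ASSUMED_MAX_BLOCK_UNITS / 2; // 24_000_000 per ForwardBatch
    const DEFAULT_CUS_PER_TX: u64 = 200_000; // tx without a compute_budget instruction
    const TXS_PER_BATCH: u64 = CUS_PER_BATCH / DEFAULT_CUS_PER_TX; // = 120
    const TOTAL_DEFAULT_CU_TXS: u64 = TXS_PER_BATCH * 100; // = 12_000 across 100 batches
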
+ fn new(limit_ratio: u32) -> Self { + let mut cost_tracker = CostTracker::default(); + cost_tracker.set_limits( + block_cost_limits::MAX_WRITABLE_ACCOUNT_UNITS.saturating_div(limit_ratio as u64), + block_cost_limits::MAX_BLOCK_UNITS.saturating_div(limit_ratio as u64), + block_cost_limits::MAX_VOTE_UNITS.saturating_div(limit_ratio as u64), + ); + Self { + cost_tracker, + forwardable_packets: Vec::default(), + } + } + + fn try_add( + &mut self, + write_lock_accounts: &[Pubkey], + compute_units: u64, + immutable_packet: Rc, + ) -> Result { + let res = self.cost_tracker.try_add_requested_cus( + write_lock_accounts, + compute_units, + immutable_packet.is_simple_vote(), + ); + + if res.is_ok() { + self.forwardable_packets.push(immutable_packet); + } + res + } + + pub fn get_forwardable_packets(&self) -> impl Iterator { + self.forwardable_packets + .iter() + .map(|immutable_packet| immutable_packet.original_packet()) + } + + pub fn len(&self) -> usize { + self.forwardable_packets.len() + } + + pub fn is_empty(&self) -> bool { + self.forwardable_packets.is_empty() + } +} + +/// To avoid forward queue being saturated by transactions for single hot account, +/// the forwarder will group and send prioritized transactions by account limit +/// to allow transactions on non-congested accounts to be forwarded alongside higher fee +/// transactions that saturate those highly demanded accounts. +#[derive(Debug)] +pub struct ForwardPacketBatchesByAccounts { + // Need a `bank` to load all accounts for VersionedTransaction. Currently + // using current rooted bank for it. + current_bank: Arc, + // Forwardable packets are staged in number of batches, each batch is limited + // by cost_tracker on both account limit and block limits. Those limits are + // set as `limit_ratio` of regular block limits to facilitate quicker iteration. + forward_batches: Vec, +} + +impl ForwardPacketBatchesByAccounts { + pub fn new_with_default_batch_limits(current_bank: Arc) -> Self { + Self::new( + current_bank, + FORWARDED_BLOCK_COMPUTE_RATIO, + DEFAULT_NUMBER_OF_BATCHES, + ) + } + + pub fn new(current_bank: Arc, limit_ratio: u32, number_of_batches: u32) -> Self { + let forward_batches = (0..number_of_batches) + .map(|_| ForwardBatch::new(limit_ratio)) + .collect(); + Self { + current_bank, + forward_batches, + } + } + + pub fn add_packet(&mut self, packet: Rc) -> bool { + // do not forward packet that cannot be sanitized + if let Some(sanitized_transaction) = + unprocessed_packet_batches::transaction_from_deserialized_packet( + &packet, + &self.current_bank.feature_set, + self.current_bank.vote_only_bank(), + self.current_bank.as_ref(), + ) + { + // get write_lock_accounts + let message = sanitized_transaction.message(); + let write_lock_accounts: Vec<_> = message + .account_keys() + .iter() + .enumerate() + .filter_map(|(i, account_key)| { + if message.is_writable(i) { + Some(*account_key) + } else { + None + } + }) + .collect(); + + // get requested CUs + let requested_cu = packet.compute_unit_limit(); + + // try to fill into forward batches + self.add_packet_to_batches(&write_lock_accounts, requested_cu, packet) + } else { + false + } + } + + pub fn iter_batches(&self) -> impl Iterator { + self.forward_batches.iter() + } + + /// transaction will try to be filled into 'batches', if can't fit into first batch + /// due to cost_tracker (eg., exceeding account limit or block limit), it will try + /// next batch until either being added to one of 'bucket' or not being forwarded. 
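
Before the cascade routine itself, a usage sketch of the public surface above (hypothetical caller; `packets` stands for the Rc handles drained from the unprocessed buffer):

    use {
        crate::{
            forward_packet_batches_by_accounts::ForwardPacketBatchesByAccounts,
            unprocessed_packet_batches::ImmutableDeserializedPacket,
        },
        solana_runtime::bank::Bank,
        std::{rc::Rc, sync::Arc},
    };

    // Hypothetical caller: stage packets for forwarding, return how many were accepted.
    fn stage_packets(
        root_bank: Arc<Bank>,
        packets: Vec<Rc<ImmutableDeserializedPacket>>,
    ) -> usize {
        let mut by_accounts =
            ForwardPacketBatchesByAccounts::new_with_default_batch_limits(root_bank);
        packets
            .into_iter()
            // add_packet() sanitizes against the root bank, extracts the write-locked
            // accounts and the requested compute-unit limit, then first-fit cascades
            // the packet into the batches; `false` means the packet is dropped.
            .filter(|packet| by_accounts.add_packet(packet.clone()))
            .count()
    }
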
+ fn add_packet_to_batches( + &mut self, + write_lock_accounts: &[Pubkey], + compute_units: u64, + immutable_packet: Rc, + ) -> bool { + for forward_batch in self.forward_batches.iter_mut() { + if forward_batch + .try_add(write_lock_accounts, compute_units, immutable_packet.clone()) + .is_ok() + { + return true; + } + } + false + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + crate::unprocessed_packet_batches::{DeserializedPacket, TransactionPriorityDetails}, + solana_runtime::{ + bank::Bank, + bank_forks::BankForks, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }, + solana_sdk::{hash::Hash, signature::Keypair, system_transaction}, + std::sync::RwLock, + }; + + fn build_bank_forks_for_test() -> Arc> { + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank = Bank::new_for_tests(&genesis_config); + let bank_forks = BankForks::new(bank); + Arc::new(RwLock::new(bank_forks)) + } + + fn build_deserialized_packet_for_test( + priority: u64, + compute_unit_limit: u64, + ) -> DeserializedPacket { + let tx = system_transaction::transfer( + &Keypair::new(), + &solana_sdk::pubkey::new_rand(), + 1, + Hash::new_unique(), + ); + let packet = Packet::from_data(None, &tx).unwrap(); + DeserializedPacket::new_with_priority_details( + packet, + TransactionPriorityDetails { + priority, + compute_unit_limit, + }, + ) + .unwrap() + } + + #[test] + fn test_try_add_to_forward_batch() { + // set test batch limit to be 1 millionth of regular block limit + let limit_ratio = 1_000_000u32; + // set requested_cu to be half of batch account limit + let requested_cu = + block_cost_limits::MAX_WRITABLE_ACCOUNT_UNITS.saturating_div(limit_ratio as u64); + + let mut forward_batch = ForwardBatch::new(limit_ratio); + + let write_lock_accounts = vec![Pubkey::new_unique(), Pubkey::new_unique()]; + let packet = build_deserialized_packet_for_test(10, requested_cu); + // first packet will be successful + assert!(forward_batch + .try_add( + &write_lock_accounts, + requested_cu, + packet.immutable_section().clone() + ) + .is_ok()); + assert_eq!(1, forward_batch.forwardable_packets.len()); + // second packet will hit account limit, therefore not added + assert!(forward_batch + .try_add( + &write_lock_accounts, + requested_cu, + packet.immutable_section().clone() + ) + .is_err()); + assert_eq!(1, forward_batch.forwardable_packets.len()); + } + + #[test] + fn test_add_packet_to_batches() { + solana_logger::setup(); + // set test batch limit to be 1 millionth of regular block limit + let limit_ratio = 1_000_000u32; + let number_of_batches = 2; + // set requested_cu to be half of batch account limit + let requested_cu = + block_cost_limits::MAX_WRITABLE_ACCOUNT_UNITS.saturating_div(limit_ratio as u64); + + let mut forward_packet_batches_by_accounts = ForwardPacketBatchesByAccounts::new( + build_bank_forks_for_test().read().unwrap().root_bank(), + limit_ratio, + number_of_batches, + ); + + // initially both batches are empty + { + let mut batches = forward_packet_batches_by_accounts.iter_batches(); + assert_eq!(0, batches.next().unwrap().len()); + assert_eq!(0, batches.next().unwrap().len()); + assert!(batches.next().is_none()); + } + + let hot_account = solana_sdk::pubkey::new_rand(); + let other_account = solana_sdk::pubkey::new_rand(); + let packet_high_priority = build_deserialized_packet_for_test(10, requested_cu); + let packet_low_priority = build_deserialized_packet_for_test(0, requested_cu); + // with 4 packets, first 3 write to same hot_account with higher priority, + // 
the 4th writes to other_account with lower priority;
+        // assert the 1st and 4th fit in the first batch, the 2nd in the 2nd batch, and the 3rd will be dropped.
+
+        // 1st high-priority packet added to 1st batch
+        {
+            forward_packet_batches_by_accounts.add_packet_to_batches(
+                &[hot_account],
+                requested_cu,
+                packet_high_priority.immutable_section().clone(),
+            );
+            let mut batches = forward_packet_batches_by_accounts.iter_batches();
+            assert_eq!(1, batches.next().unwrap().len());
+            assert_eq!(0, batches.next().unwrap().len());
+            assert!(batches.next().is_none());
+        }
+
+        // 2nd high-priority packet added to 2nd batch
+        {
+            forward_packet_batches_by_accounts.add_packet_to_batches(
+                &[hot_account],
+                requested_cu,
+                packet_high_priority.immutable_section().clone(),
+            );
+            let mut batches = forward_packet_batches_by_accounts.iter_batches();
+            assert_eq!(1, batches.next().unwrap().len());
+            assert_eq!(1, batches.next().unwrap().len());
+        }
+
+        // 3rd high-priority packet not included in forwarding
+        {
+            forward_packet_batches_by_accounts.add_packet_to_batches(
+                &[hot_account],
+                requested_cu,
+                packet_high_priority.immutable_section().clone(),
+            );
+            let mut batches = forward_packet_batches_by_accounts.iter_batches();
+            assert_eq!(1, batches.next().unwrap().len());
+            assert_eq!(1, batches.next().unwrap().len());
+            assert!(batches.next().is_none());
+        }
+
+        // 4th lower-priority packet added to 1st batch, since it writes to the uncontended account
+        {
+            forward_packet_batches_by_accounts.add_packet_to_batches(
+                &[other_account],
+                requested_cu,
+                packet_low_priority.immutable_section().clone(),
+            );
+            let mut batches = forward_packet_batches_by_accounts.iter_batches();
+            assert_eq!(2, batches.next().unwrap().len());
+            assert_eq!(1, batches.next().unwrap().len());
+            assert!(batches.next().is_none());
+        }
+    }
+}
diff --git a/core/src/leader_slot_banking_stage_metrics.rs b/core/src/leader_slot_banking_stage_metrics.rs
index 2307b2f0e5..06e767ed35 100644
--- a/core/src/leader_slot_banking_stage_metrics.rs
+++ b/core/src/leader_slot_banking_stage_metrics.rs
@@ -122,8 +122,9 @@ struct LeaderSlotPacketCountMetrics {
     // total number of valid unprocessed packets in the buffer that were removed after being forwarded
     cleared_from_buffer_after_forward_count: u64,

-    // total number of packets removed at the end of the slot due to being too old, duplicate, etc.
-    end_of_slot_filtered_invalid_count: u64,
+    // total number of forwardable batches that were attempted for forwarding.
A forwardable batch + // is defined in `ForwardPacketBatchesByAccounts` in `forward_packet_batches_by_accounts.rs` + forwardable_batches_count: u64, } impl LeaderSlotPacketCountMetrics { @@ -222,8 +223,8 @@ impl LeaderSlotPacketCountMetrics { i64 ), ( - "end_of_slot_filtered_invalid_count", - self.end_of_slot_filtered_invalid_count as i64, + "forwardable_batches_count", + self.forwardable_batches_count as i64, i64 ), ( @@ -573,6 +574,17 @@ impl LeaderSlotMetricsTracker { } } + pub(crate) fn increment_forwardable_batches_count(&mut self, count: u64) { + if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { + saturating_add_assign!( + leader_slot_metrics + .packet_count_metrics + .forwardable_batches_count, + count + ); + } + } + pub(crate) fn increment_retryable_packets_count(&mut self, count: u64) { if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { saturating_add_assign!( @@ -584,17 +596,6 @@ impl LeaderSlotMetricsTracker { } } - pub(crate) fn increment_end_of_slot_filtered_invalid_count(&mut self, count: u64) { - if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { - saturating_add_assign!( - leader_slot_metrics - .packet_count_metrics - .end_of_slot_filtered_invalid_count, - count - ); - } - } - pub(crate) fn set_end_of_slot_unprocessed_buffer_len(&mut self, len: u64) { if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { leader_slot_metrics @@ -684,19 +685,6 @@ impl LeaderSlotMetricsTracker { } } - // Consuming buffered packets timing metrics - pub(crate) fn increment_end_of_slot_filtering_us(&mut self, us: u64) { - if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { - saturating_add_assign!( - leader_slot_metrics - .timing_metrics - .consume_buffered_packets_timings - .end_of_slot_filtering_us, - us - ); - } - } - pub(crate) fn increment_consume_buffered_packets_poh_recorder_lock_us(&mut self, us: u64) { if let Some(leader_slot_metrics) = &mut self.leader_slot_metrics { saturating_add_assign!( diff --git a/core/src/leader_slot_banking_stage_timing_metrics.rs b/core/src/leader_slot_banking_stage_timing_metrics.rs index a56f8a754a..2005bc2df0 100644 --- a/core/src/leader_slot_banking_stage_timing_metrics.rs +++ b/core/src/leader_slot_banking_stage_timing_metrics.rs @@ -227,9 +227,6 @@ pub(crate) struct ConsumeBufferedPacketsTimings { // Time spent grabbing poh recorder lock pub poh_recorder_lock_us: u64, - // Time spent filtering invalid packets after leader slot has ended - pub end_of_slot_filtering_us: u64, - // Time spent processing transactions pub process_packets_transactions_us: u64, } @@ -245,11 +242,6 @@ impl ConsumeBufferedPacketsTimings { self.poh_recorder_lock_us as i64, i64 ), - ( - "end_of_slot_filtering_us", - self.end_of_slot_filtering_us as i64, - i64 - ), ( "process_packets_transactions_us", self.process_packets_transactions_us as i64, diff --git a/core/src/lib.rs b/core/src/lib.rs index b47a5f125e..53d580a03a 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -27,6 +27,7 @@ pub mod duplicate_repair_status; pub mod fetch_stage; pub mod find_packet_sender_stake_stage; pub mod fork_choice; +pub mod forward_packet_batches_by_accounts; pub mod gen_keys; pub mod heaviest_subtree_fork_choice; pub mod latest_validator_votes_for_frozen_banks; diff --git a/core/src/tpu.rs b/core/src/tpu.rs index 72acd127db..41b3f434e3 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -230,6 +230,7 @@ impl Tpu { replay_vote_sender, cost_model.clone(), connection_cache.clone(), + bank_forks.clone(), ); let 
broadcast_stage = broadcast_type.new_broadcast_stage( diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index 22e8d9cce9..e5e14e346d 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -3,18 +3,23 @@ use { solana_perf::packet::{Packet, PacketBatch}, solana_program_runtime::compute_budget::ComputeBudget, solana_sdk::{ + feature_set, hash::Hash, message::{Message, SanitizedVersionedMessage}, sanitize::SanitizeError, short_vec::decode_shortu16_len, signature::Signature, - transaction::{SanitizedVersionedTransaction, Transaction, VersionedTransaction}, + transaction::{ + AddressLoader, SanitizedTransaction, SanitizedVersionedTransaction, Transaction, + VersionedTransaction, + }, }, std::{ cmp::Ordering, collections::{hash_map::Entry, HashMap}, mem::size_of, rc::Rc, + sync::Arc, }, thiserror::Error, }; @@ -36,8 +41,8 @@ pub enum DeserializedPacketError { #[derive(Debug, PartialEq, Eq)] pub struct TransactionPriorityDetails { - priority: u64, - compute_unit_limit: u64, + pub priority: u64, + pub compute_unit_limit: u64, } #[derive(Debug, PartialEq, Eq)] @@ -93,7 +98,7 @@ impl DeserializedPacket { } #[cfg(test)] - fn new_with_priority_details( + pub fn new_with_priority_details( packet: Packet, priority_details: TransactionPriorityDetails, ) -> Result { @@ -254,12 +259,40 @@ impl UnprocessedPacketBatches { self.message_hash_to_transaction.iter_mut().map(|(_k, v)| v) } + /// Iterates DeserializedPackets in descending priority (max-first) order, + /// calls FnMut for each DeserializedPacket. + pub fn iter_desc(&mut self, mut f: F) + where + F: FnMut(&mut DeserializedPacket) -> bool, + { + let mut packet_priority_queue_clone = self.packet_priority_queue.clone(); + + for immutable_packet in packet_priority_queue_clone.drain_desc() { + match self + .message_hash_to_transaction + .entry(*immutable_packet.message_hash()) + { + Entry::Vacant(_vacant_entry) => { + panic!( + "entry {} must exist to be consistent with `packet_priority_queue`", + immutable_packet.message_hash() + ); + } + Entry::Occupied(mut occupied_entry) => { + if !f(occupied_entry.get_mut()) { + return; + } + } + } + } + } + pub fn retain(&mut self, mut f: F) where F: FnMut(&mut DeserializedPacket) -> bool, { // TODO: optimize this only when number of packets - // with oudated blockhash is high + // with outdated blockhash is high let new_packet_priority_queue: MinMaxHeap> = self .packet_priority_queue .drain() @@ -415,14 +448,45 @@ pub fn transactions_to_deserialized_packets( .collect() } +// This function deserializes packets into transactions, computes the blake3 hash of transaction +// messages, and verifies secp256k1 instructions. A list of sanitized transactions are returned +// with their packet indexes. 
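
This helper moves here from a private method on `BankingStage` so that the consumer path and the new forwarder can share it. A minimal call sketch, with votes_only disabled and `SimpleAddressLoader::Disabled`, mirroring the tests below:

    use {
        crate::unprocessed_packet_batches::{self, DeserializedPacket},
        solana_sdk::{feature_set::FeatureSet, transaction::SimpleAddressLoader},
        std::sync::Arc,
    };

    // Sketch: sanitize one buffered packet via the shared helper.
    fn sanitize_one(packet: &DeserializedPacket) -> bool {
        unprocessed_packet_batches::transaction_from_deserialized_packet(
            packet.immutable_section(),
            &Arc::new(FeatureSet::default()),
            false, // votes_only
            SimpleAddressLoader::Disabled,
        )
        .is_some()
    }
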
+#[allow(clippy::needless_collect)]
+pub fn transaction_from_deserialized_packet(
+    deserialized_packet: &ImmutableDeserializedPacket,
+    feature_set: &Arc<feature_set::FeatureSet>,
+    votes_only: bool,
+    address_loader: impl AddressLoader,
+) -> Option<SanitizedTransaction> {
+    if votes_only && !deserialized_packet.is_simple_vote() {
+        return None;
+    }
+
+    let tx = SanitizedTransaction::try_new(
+        deserialized_packet.transaction().clone(),
+        *deserialized_packet.message_hash(),
+        deserialized_packet.is_simple_vote(),
+        address_loader,
+    )
+    .ok()?;
+    tx.verify_precompiles(feature_set).ok()?;
+    Some(tx)
+}
+
 #[cfg(test)]
 mod tests {
     use {
         super::*,
+        solana_perf::packet::PacketFlags,
         solana_sdk::{
-            compute_budget::ComputeBudgetInstruction, message::VersionedMessage, pubkey::Pubkey,
-            signature::Keypair, system_instruction, system_transaction,
+            compute_budget::ComputeBudgetInstruction,
+            message::VersionedMessage,
+            pubkey::Pubkey,
+            signature::{Keypair, Signer},
+            system_instruction, system_transaction,
+            transaction::{SimpleAddressLoader, Transaction},
         },
+        solana_vote_program::vote_transaction,
         std::net::IpAddr,
     };
 
@@ -622,4 +686,132 @@ mod tests {
             })
         );
     }
+
+    #[cfg(test)]
+    fn make_test_packets(
+        transactions: Vec<Transaction>,
+        vote_indexes: Vec<usize>,
+    ) -> Vec<DeserializedPacket> {
+        let capacity = transactions.len();
+        let mut packet_vector = Vec::with_capacity(capacity);
+        for tx in transactions.iter() {
+            packet_vector.push(Packet::from_data(None, &tx).unwrap());
+        }
+        for index in vote_indexes.iter() {
+            packet_vector[*index].meta.flags |= PacketFlags::SIMPLE_VOTE_TX;
+        }
+
+        packet_vector
+            .into_iter()
+            .map(|p| DeserializedPacket::new(p).unwrap())
+            .collect()
+    }
+
+    #[test]
+    fn test_transaction_from_deserialized_packet() {
+        use solana_sdk::feature_set::FeatureSet;
+        let keypair = Keypair::new();
+        let transfer_tx =
+            system_transaction::transfer(&keypair, &keypair.pubkey(), 1, Hash::default());
+        let vote_tx = vote_transaction::new_vote_transaction(
+            vec![42],
+            Hash::default(),
+            Hash::default(),
+            &keypair,
+            &keypair,
+            &keypair,
+            None,
+        );
+
+        // packets with no votes
+        {
+            let vote_indexes = vec![];
+            let packet_vector =
+                make_test_packets(vec![transfer_tx.clone(), transfer_tx.clone()], vote_indexes);
+
+            let mut votes_only = false;
+            let txs = packet_vector.iter().filter_map(|tx| {
+                transaction_from_deserialized_packet(
+                    tx.immutable_section(),
+                    &Arc::new(FeatureSet::default()),
+                    votes_only,
+                    SimpleAddressLoader::Disabled,
+                )
+            });
+            assert_eq!(2, txs.count());
+
+            votes_only = true;
+            let txs = packet_vector.iter().filter_map(|tx| {
+                transaction_from_deserialized_packet(
+                    tx.immutable_section(),
+                    &Arc::new(FeatureSet::default()),
+                    votes_only,
+                    SimpleAddressLoader::Disabled,
+                )
+            });
+            assert_eq!(0, txs.count());
+        }
+
+        // packets with some votes
+        {
+            let vote_indexes = vec![0, 2];
+            let packet_vector = make_test_packets(
+                vec![vote_tx.clone(), transfer_tx, vote_tx.clone()],
+                vote_indexes,
+            );
+
+            let mut votes_only = false;
+            let txs = packet_vector.iter().filter_map(|tx| {
+                transaction_from_deserialized_packet(
+                    tx.immutable_section(),
+                    &Arc::new(FeatureSet::default()),
+                    votes_only,
+                    SimpleAddressLoader::Disabled,
+                )
+            });
+            assert_eq!(3, txs.count());
+
+            votes_only = true;
+            let txs = packet_vector.iter().filter_map(|tx| {
+                transaction_from_deserialized_packet(
+                    tx.immutable_section(),
+                    &Arc::new(FeatureSet::default()),
+                    votes_only,
+                    SimpleAddressLoader::Disabled,
+                )
+            });
+            assert_eq!(2, txs.count());
+        }
+
+        // packets with all votes
+        {
+            let vote_indexes = vec![0, 1, 2];
+            let packet_vector = make_test_packets(
+                vec![vote_tx.clone(), vote_tx.clone(), vote_tx],
+                vote_indexes,
+            );
+
+            let mut votes_only = false;
+            let txs = packet_vector.iter().filter_map(|tx| {
+                transaction_from_deserialized_packet(
+                    tx.immutable_section(),
+                    &Arc::new(FeatureSet::default()),
+                    votes_only,
+                    SimpleAddressLoader::Disabled,
+                )
+            });
+            assert_eq!(3, txs.count());
+
+            votes_only = true;
+            let txs = packet_vector.iter().filter_map(|tx| {
+                transaction_from_deserialized_packet(
+                    tx.immutable_section(),
+                    &Arc::new(FeatureSet::default()),
+                    votes_only,
+                    SimpleAddressLoader::Disabled,
+                )
+            });
+            assert_eq!(3, txs.count());
+        }
+    }
 }
diff --git a/runtime/src/cost_tracker.rs b/runtime/src/cost_tracker.rs
index 05f35ce9d1..a1d779a8a5 100644
--- a/runtime/src/cost_tracker.rs
+++ b/runtime/src/cost_tracker.rs
@@ -77,7 +77,7 @@ impl CostTracker {
         }
     }
 
-    // bench tests needs to reset limits
+    /// Allows limits set during construction to be adjusted (e.g. by bench tests).
     pub fn set_limits(
         &mut self,
         account_cost_limit: u64,
@@ -95,6 +95,18 @@
         Ok(self.block_cost)
     }
 
+    /// Tracks cost using the transaction's requested compute units instead of its calculated cost.
+    pub fn try_add_requested_cus(
+        &mut self,
+        write_lock_accounts: &[Pubkey],
+        requested_cus: u64,
+        is_vote: bool,
+    ) -> Result<u64, CostTrackerError> {
+        self.would_fit_internal(write_lock_accounts.iter(), requested_cus, is_vote, 0)?;
+        self.add_transaction_cost_internal(write_lock_accounts.iter(), requested_cus, is_vote, 0);
+        Ok(self.block_cost)
+    }
+
     pub fn update_execution_cost(
         &mut self,
         estimated_tx_cost: &TransactionCost,
@@ -165,9 +177,22 @@
     }
 
     fn would_fit(&self, tx_cost: &TransactionCost) -> Result<(), CostTrackerError> {
-        let writable_accounts = &tx_cost.writable_accounts;
-        let cost = tx_cost.sum();
-        let vote_cost = if tx_cost.is_simple_vote { cost } else { 0 };
+        self.would_fit_internal(
+            tx_cost.writable_accounts.iter(),
+            tx_cost.sum(),
+            tx_cost.is_simple_vote,
+            tx_cost.account_data_size,
+        )
+    }
+
+    fn would_fit_internal<'a>(
+        &self,
+        write_lock_accounts: impl Iterator<Item = &'a Pubkey>,
+        cost: u64,
+        is_vote: bool,
+        account_data_size: u64,
+    ) -> Result<(), CostTrackerError> {
+        let vote_cost = if is_vote { cost } else { 0 };
 
         // check against the total package cost
         if self.block_cost.saturating_add(cost) > self.block_cost_limit {
@@ -186,9 +211,7 @@
         // NOTE: Check if the total accounts data size is exceeded *before* the block accounts data
         // size. This way, transactions are not unnecessarily retried.
-        let account_data_size = self
-            .account_data_size
-            .saturating_add(tx_cost.account_data_size);
+        let account_data_size = self.account_data_size.saturating_add(account_data_size);
         if let Some(account_data_size_limit) = self.account_data_size_limit {
             if account_data_size > account_data_size_limit {
                 return Err(CostTrackerError::WouldExceedAccountDataTotalLimit);
             }
         }
@@ -200,7 +223,7 @@
         }
 
         // check each account against account_cost_limit,
-        for account_key in writable_accounts.iter() {
+        for account_key in write_lock_accounts {
             match self.cost_by_writable_accounts.get(account_key) {
                 Some(chained_cost) => {
                     if chained_cost.saturating_add(cost) > self.account_cost_limit {
@@ -217,9 +240,23 @@
     }
 
     fn add_transaction_cost(&mut self, tx_cost: &TransactionCost) {
-        let cost = tx_cost.sum();
-        self.add_transaction_execution_cost(tx_cost, cost);
-        saturating_add_assign!(self.account_data_size, tx_cost.account_data_size);
+        self.add_transaction_cost_internal(
+            tx_cost.writable_accounts.iter(),
+            tx_cost.sum(),
+            tx_cost.is_simple_vote,
+            tx_cost.account_data_size,
+        )
+    }
+
+    fn add_transaction_cost_internal<'a>(
+        &mut self,
+        write_lock_accounts: impl Iterator<Item = &'a Pubkey>,
+        cost: u64,
+        is_vote: bool,
+        account_data_size: u64,
+    ) {
+        self.add_transaction_execution_cost_internal(write_lock_accounts, is_vote, cost);
+        saturating_add_assign!(self.account_data_size, account_data_size);
         saturating_add_assign!(self.transaction_count, 1);
     }
 
@@ -234,7 +271,20 @@
     /// Apply additional actual execution units to cost_tracker
     fn add_transaction_execution_cost(&mut self, tx_cost: &TransactionCost, adjustment: u64) {
-        for account_key in tx_cost.writable_accounts.iter() {
+        self.add_transaction_execution_cost_internal(
+            tx_cost.writable_accounts.iter(),
+            tx_cost.is_simple_vote,
+            adjustment,
+        )
+    }
+
+    fn add_transaction_execution_cost_internal<'a>(
+        &mut self,
+        write_lock_accounts: impl Iterator<Item = &'a Pubkey>,
+        is_vote: bool,
+        adjustment: u64,
+    ) {
+        for account_key in write_lock_accounts {
             let account_cost = self
                 .cost_by_writable_accounts
                 .entry(*account_key)
@@ -242,7 +292,7 @@ impl CostTracker {
             *account_cost = account_cost.saturating_add(adjustment);
         }
         self.block_cost = self.block_cost.saturating_add(adjustment);
-        if tx_cost.is_simple_vote {
+        if is_vote {
             self.vote_cost = self.vote_cost.saturating_add(adjustment);
         }
     }
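
Note on `UnprocessedPacketBatches::iter_desc`: this is the hook that lets the forwarder visit buffered packets from highest to lowest priority, stopping as soon as the callback returns false. A minimal sketch of the call pattern, under stated assumptions: `buffer` is an already-populated buffer, `priority()` is the accessor on `ImmutableDeserializedPacket` backing the priority queue, and `top_priorities` is a made-up helper, none of which appear verbatim in this patch.

    use solana_core::unprocessed_packet_batches::UnprocessedPacketBatches;

    // Collect the priorities of the top `n` buffered packets, highest first.
    fn top_priorities(buffer: &mut UnprocessedPacketBatches, n: usize) -> Vec<u64> {
        let mut priorities = Vec::with_capacity(n);
        buffer.iter_desc(|packet| {
            priorities.push(packet.immutable_section().priority());
            // Returning false ends the iteration early once we have enough.
            priorities.len() < n
        });
        priorities
    }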
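
The `try_add_requested_cus` addition is what allows cost tracking before execution: when batching forwardable packets, only the compute units a transaction requested are known, not what it will actually consume. A minimal sketch of the intended call pattern (illustrative; it assumes `CostTracker::default()` yields a tracker with default limits, and the account and compute-unit values are made up):

    use solana_runtime::cost_tracker::CostTracker;
    use solana_sdk::pubkey::Pubkey;

    fn main() {
        let mut cost_tracker = CostTracker::default();
        let write_locked_accounts = vec![Pubkey::new_unique()];
        // On success this returns the updated block cost; on failure, the
        // error names the limit that the transaction would exceed.
        match cost_tracker.try_add_requested_cus(&write_locked_accounts, 200_000, false) {
            Ok(block_cost) => println!("fits; block cost is now {}", block_cost),
            Err(err) => println!("exceeds a limit, not forwardable: {:?}", err),
        }
    }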