From 52e63e2ffaa0cfae34682d1c7124efdd2e6dac54 Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Thu, 23 Mar 2023 19:05:54 -0500 Subject: [PATCH] Allow banking_stage to update prioritization_fee_cache (#30853) * Allow banking_stage to update prioritization_fee_cache * Update core/src/banking_stage/committer.rs Co-authored-by: Andrew Fitzgerald * move use to top --------- Co-authored-by: Andrew Fitzgerald --- banking-bench/src/main.rs | 5 ++- core/benches/banking_stage.rs | 7 +++- core/src/banking_stage.rs | 14 ++++++- core/src/banking_stage/committer.rs | 13 +++++++ core/src/banking_stage/consumer.rs | 57 ++++++++++++++++++++++++----- core/src/tpu.rs | 3 ++ core/src/validator.rs | 3 +- 7 files changed, 88 insertions(+), 14 deletions(-) diff --git a/banking-bench/src/main.rs b/banking-bench/src/main.rs index 5d55c85b6e..cdfb4989c7 100644 --- a/banking-bench/src/main.rs +++ b/banking-bench/src/main.rs @@ -20,7 +20,9 @@ use { solana_measure::measure::Measure, solana_perf::packet::{to_packet_batches, PacketBatch}, solana_poh::poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry}, - solana_runtime::{bank::Bank, bank_forks::BankForks}, + solana_runtime::{ + bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, + }, solana_sdk::{ compute_budget::ComputeBudgetInstruction, hash::Hash, @@ -450,6 +452,7 @@ fn main() { None, Arc::new(connection_cache), bank_forks.clone(), + &Arc::new(PrioritizationFeeCache::new(0u64)), ); poh_recorder.write().unwrap().set_bank(&bank, false); diff --git a/core/benches/banking_stage.rs b/core/benches/banking_stage.rs index a370a3868b..a35bd723a9 100644 --- a/core/benches/banking_stage.rs +++ b/core/benches/banking_stage.rs @@ -29,7 +29,9 @@ use { }, solana_perf::{packet::to_packet_batches, test_tx::test_tx}, solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry}, - solana_runtime::{bank::Bank, bank_forks::BankForks}, + solana_runtime::{ + bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, + }, solana_sdk::{ genesis_config::GenesisConfig, hash::Hash, @@ -93,7 +95,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) { ThreadType::Transactions, ); let (s, _r) = unbounded(); - let committer = Committer::new(None, s); + let committer = Committer::new(None, s, Arc::new(PrioritizationFeeCache::new(0u64))); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); // This tests the performance of buffering packets. // If the packet buffers are copied, performance will be poor. @@ -288,6 +290,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) { None, Arc::new(ConnectionCache::default()), bank_forks, + &Arc::new(PrioritizationFeeCache::new(0u64)), ); poh_recorder.write().unwrap().set_bank(&bank, false); diff --git a/core/src/banking_stage.rs b/core/src/banking_stage.rs index 48469aa4ad..52ea4d3b87 100644 --- a/core/src/banking_stage.rs +++ b/core/src/banking_stage.rs @@ -27,7 +27,10 @@ use { solana_measure::{measure, measure_us}, solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH}, solana_poh::poh_recorder::PohRecorder, - solana_runtime::{bank_forks::BankForks, vote_sender_types::ReplayVoteSender}, + solana_runtime::{ + bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache, + vote_sender_types::ReplayVoteSender, + }, solana_sdk::{feature_set::allow_votes_to_directly_update_vote_state, timing::AtomicInterval}, std::{ cmp, env, @@ -290,6 +293,7 @@ impl BankingStage { log_messages_bytes_limit: Option, connection_cache: Arc, bank_forks: Arc>, + prioritization_fee_cache: &Arc, ) -> Self { Self::new_num_threads( cluster_info, @@ -303,6 +307,7 @@ impl BankingStage { log_messages_bytes_limit, connection_cache, bank_forks, + prioritization_fee_cache, ) } @@ -319,6 +324,7 @@ impl BankingStage { log_messages_bytes_limit: Option, connection_cache: Arc, bank_forks: Arc>, + prioritization_fee_cache: &Arc, ) -> Self { assert!(num_threads >= MIN_TOTAL_THREADS); // Single thread to generate entries from many banks. @@ -385,6 +391,7 @@ impl BankingStage { let committer = Committer::new( transaction_status_sender.clone(), replay_vote_sender.clone(), + prioritization_fee_cache.clone(), ); let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); let forwarder = Forwarder::new( @@ -640,6 +647,7 @@ mod tests { None, Arc::new(ConnectionCache::default()), bank_forks, + &Arc::new(PrioritizationFeeCache::new(0u64)), ); drop(non_vote_sender); drop(tpu_vote_sender); @@ -695,6 +703,7 @@ mod tests { None, Arc::new(ConnectionCache::default()), bank_forks, + &Arc::new(PrioritizationFeeCache::new(0u64)), ); trace!("sending bank"); drop(non_vote_sender); @@ -775,6 +784,7 @@ mod tests { None, Arc::new(ConnectionCache::default()), bank_forks, + &Arc::new(PrioritizationFeeCache::new(0u64)), ); // fund another account so we can send 2 good transactions in a single batch. @@ -936,6 +946,7 @@ mod tests { None, Arc::new(ConnectionCache::default()), bank_forks, + &Arc::new(PrioritizationFeeCache::new(0u64)), ); // wait for banking_stage to eat the packets @@ -1129,6 +1140,7 @@ mod tests { None, Arc::new(ConnectionCache::default()), bank_forks, + &Arc::new(PrioritizationFeeCache::new(0u64)), ); let keypairs = (0..100).map(|_| Keypair::new()).collect_vec(); diff --git a/core/src/banking_stage/committer.rs b/core/src/banking_stage/committer.rs index bc7256da23..a80b350b87 100644 --- a/core/src/banking_stage/committer.rs +++ b/core/src/banking_stage/committer.rs @@ -1,5 +1,6 @@ use { crate::leader_slot_banking_stage_timing_metrics::LeaderExecuteAndCommitTimings, + itertools::Itertools, solana_ledger::{ blockstore_processor::TransactionStatusSender, token_balances::collect_token_balances, }, @@ -11,6 +12,7 @@ use { TransactionResults, }, bank_utils, + prioritization_fee_cache::PrioritizationFeeCache, transaction_batch::TransactionBatch, vote_sender_types::ReplayVoteSender, }, @@ -37,16 +39,19 @@ pub(super) struct PreBalanceInfo { pub struct Committer { transaction_status_sender: Option, replay_vote_sender: ReplayVoteSender, + prioritization_fee_cache: Arc, } impl Committer { pub fn new( transaction_status_sender: Option, replay_vote_sender: ReplayVoteSender, + prioritization_fee_cache: Arc, ) -> Self { Self { transaction_status_sender, replay_vote_sender, + prioritization_fee_cache, } } @@ -77,6 +82,12 @@ impl Committer { let (last_blockhash, lamports_per_signature) = bank.last_blockhash_and_lamports_per_signature(); + let executed_transactions = execution_results + .iter() + .zip(batch.sanitized_transactions()) + .filter_map(|(execution_result, tx)| execution_result.was_executed().then_some(tx)) + .collect_vec(); + let (tx_results, commit_time_us) = measure_us!(bank.commit_transactions( batch.sanitized_transactions(), loaded_transactions, @@ -119,6 +130,8 @@ impl Committer { pre_balance_info, starting_transaction_index, ); + self.prioritization_fee_cache + .update(bank.clone(), executed_transactions.into_iter()); }); execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_us; (commit_time_us, commit_transaction_statuses) diff --git a/core/src/banking_stage/consumer.rs b/core/src/banking_stage/consumer.rs index 71436ae9e0..ee53170881 100644 --- a/core/src/banking_stage/consumer.rs +++ b/core/src/banking_stage/consumer.rs @@ -694,6 +694,7 @@ mod tests { solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry}, solana_program_runtime::timings::ProgramTiming, solana_rpc::transaction_status_service::TransactionStatusService, + solana_runtime::prioritization_fee_cache::PrioritizationFeeCache, solana_sdk::{ account::AccountSharedData, instruction::InstructionError, @@ -751,7 +752,11 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new(None, replay_vote_sender); + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), + ); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let process_transactions_summary = consumer.process_transactions(&bank, &Instant::now(), &transactions); @@ -891,7 +896,11 @@ mod tests { poh_recorder.write().unwrap().set_bank(&bank, false); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new(None, replay_vote_sender); + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), + ); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let process_transactions_batch_output = @@ -1014,7 +1023,11 @@ mod tests { poh_recorder.write().unwrap().set_bank(&bank, false); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new(None, replay_vote_sender); + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), + ); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let process_transactions_batch_output = @@ -1082,7 +1095,11 @@ mod tests { poh_recorder.write().unwrap().set_bank(&bank, false); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new(None, replay_vote_sender); + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), + ); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let get_block_cost = || bank.read_cost_tracker().unwrap().block_cost(); @@ -1200,7 +1217,11 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &poh_recorder); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new(None, replay_vote_sender); + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), + ); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let process_transactions_batch_output = @@ -1393,7 +1414,11 @@ mod tests { let poh_simulator = simulate_poh(record_receiver, &Arc::new(RwLock::new(poh_recorder))); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new(None, replay_vote_sender); + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), + ); let consumer = Consumer::new(committer, recorder.clone(), QosService::new(1), None); let process_transactions_summary = @@ -1517,6 +1542,7 @@ mod tests { sender: transaction_status_sender, }), replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), ); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); @@ -1654,6 +1680,7 @@ mod tests { sender: transaction_status_sender, }), replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), ); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); @@ -1712,7 +1739,11 @@ mod tests { ); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new(None, replay_vote_sender); + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), + ); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) @@ -1788,7 +1819,11 @@ mod tests { ); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new(None, replay_vote_sender); + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), + ); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); // When the working bank in poh_recorder is None, no packets should be processed @@ -1838,7 +1873,11 @@ mod tests { ); let (replay_vote_sender, _replay_vote_receiver) = unbounded(); - let committer = Committer::new(None, replay_vote_sender); + let committer = Committer::new( + None, + replay_vote_sender, + Arc::new(PrioritizationFeeCache::new(0u64)), + ); let consumer = Consumer::new(committer, recorder, QosService::new(1), None); // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) diff --git a/core/src/tpu.rs b/core/src/tpu.rs index a86c0b470a..ff792d91f0 100644 --- a/core/src/tpu.rs +++ b/core/src/tpu.rs @@ -28,6 +28,7 @@ use { }, solana_runtime::{ bank_forks::BankForks, + prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender}, }, solana_sdk::{pubkey::Pubkey, signature::Keypair}, @@ -102,6 +103,7 @@ impl Tpu { banking_tracer: Arc, tracer_thread_hdl: TracerThread, tpu_enable_udp: bool, + prioritization_fee_cache: &Arc, ) -> Self { let TpuSockets { transactions: transactions_sockets, @@ -245,6 +247,7 @@ impl Tpu { log_messages_bytes_limit, connection_cache.clone(), bank_forks.clone(), + prioritization_fee_cache, ); let broadcast_stage = broadcast_type.new_broadcast_stage( diff --git a/core/src/validator.rs b/core/src/validator.rs index e518592120..eb9b85f6ba 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -867,7 +867,7 @@ impl Validator { }; // block min prioritization fee cache should be readable by RPC, and writable by validator - // (for now, by replay stage) + // (by both replay stage and banking stage) let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default()); let rpc_override_health_check = Arc::new(AtomicBool::new(false)); @@ -1156,6 +1156,7 @@ impl Validator { banking_tracer, tracer_thread, tpu_enable_udp, + &prioritization_fee_cache, ); datapoint_info!(