Allow banking_stage to update prioritization_fee_cache (#30853)

* Allow banking_stage to update prioritization_fee_cache

* Update core/src/banking_stage/committer.rs

Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>

* move use to top

---------

Co-authored-by: Andrew Fitzgerald <apfitzge@gmail.com>
This commit is contained in:
Tao Zhu 2023-03-23 19:05:54 -05:00 committed by GitHub
parent a36e8f559c
commit 52e63e2ffa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 88 additions and 14 deletions

View File

@ -20,7 +20,9 @@ use {
solana_measure::measure::Measure, solana_measure::measure::Measure,
solana_perf::packet::{to_packet_batches, PacketBatch}, solana_perf::packet::{to_packet_batches, PacketBatch},
solana_poh::poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry}, solana_poh::poh_recorder::{create_test_recorder, PohRecorder, WorkingBankEntry},
solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_runtime::{
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
},
solana_sdk::{ solana_sdk::{
compute_budget::ComputeBudgetInstruction, compute_budget::ComputeBudgetInstruction,
hash::Hash, hash::Hash,
@ -450,6 +452,7 @@ fn main() {
None, None,
Arc::new(connection_cache), Arc::new(connection_cache),
bank_forks.clone(), bank_forks.clone(),
&Arc::new(PrioritizationFeeCache::new(0u64)),
); );
poh_recorder.write().unwrap().set_bank(&bank, false); poh_recorder.write().unwrap().set_bank(&bank, false);

View File

@ -29,7 +29,9 @@ use {
}, },
solana_perf::{packet::to_packet_batches, test_tx::test_tx}, solana_perf::{packet::to_packet_batches, test_tx::test_tx},
solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry}, solana_poh::poh_recorder::{create_test_recorder, WorkingBankEntry},
solana_runtime::{bank::Bank, bank_forks::BankForks}, solana_runtime::{
bank::Bank, bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
},
solana_sdk::{ solana_sdk::{
genesis_config::GenesisConfig, genesis_config::GenesisConfig,
hash::Hash, hash::Hash,
@ -93,7 +95,7 @@ fn bench_consume_buffered(bencher: &mut Bencher) {
ThreadType::Transactions, ThreadType::Transactions,
); );
let (s, _r) = unbounded(); let (s, _r) = unbounded();
let committer = Committer::new(None, s); let committer = Committer::new(None, s, Arc::new(PrioritizationFeeCache::new(0u64)));
let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
// This tests the performance of buffering packets. // This tests the performance of buffering packets.
// If the packet buffers are copied, performance will be poor. // If the packet buffers are copied, performance will be poor.
@ -288,6 +290,7 @@ fn bench_banking(bencher: &mut Bencher, tx_type: TransactionType) {
None, None,
Arc::new(ConnectionCache::default()), Arc::new(ConnectionCache::default()),
bank_forks, bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
); );
poh_recorder.write().unwrap().set_bank(&bank, false); poh_recorder.write().unwrap().set_bank(&bank, false);

View File

@ -27,7 +27,10 @@ use {
solana_measure::{measure, measure_us}, solana_measure::{measure, measure_us},
solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH}, solana_perf::{data_budget::DataBudget, packet::PACKETS_PER_BATCH},
solana_poh::poh_recorder::PohRecorder, solana_poh::poh_recorder::PohRecorder,
solana_runtime::{bank_forks::BankForks, vote_sender_types::ReplayVoteSender}, solana_runtime::{
bank_forks::BankForks, prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{feature_set::allow_votes_to_directly_update_vote_state, timing::AtomicInterval}, solana_sdk::{feature_set::allow_votes_to_directly_update_vote_state, timing::AtomicInterval},
std::{ std::{
cmp, env, cmp, env,
@ -290,6 +293,7 @@ impl BankingStage {
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>, connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
) -> Self { ) -> Self {
Self::new_num_threads( Self::new_num_threads(
cluster_info, cluster_info,
@ -303,6 +307,7 @@ impl BankingStage {
log_messages_bytes_limit, log_messages_bytes_limit,
connection_cache, connection_cache,
bank_forks, bank_forks,
prioritization_fee_cache,
) )
} }
@ -319,6 +324,7 @@ impl BankingStage {
log_messages_bytes_limit: Option<usize>, log_messages_bytes_limit: Option<usize>,
connection_cache: Arc<ConnectionCache>, connection_cache: Arc<ConnectionCache>,
bank_forks: Arc<RwLock<BankForks>>, bank_forks: Arc<RwLock<BankForks>>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
) -> Self { ) -> Self {
assert!(num_threads >= MIN_TOTAL_THREADS); assert!(num_threads >= MIN_TOTAL_THREADS);
// Single thread to generate entries from many banks. // Single thread to generate entries from many banks.
@ -385,6 +391,7 @@ impl BankingStage {
let committer = Committer::new( let committer = Committer::new(
transaction_status_sender.clone(), transaction_status_sender.clone(),
replay_vote_sender.clone(), replay_vote_sender.clone(),
prioritization_fee_cache.clone(),
); );
let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone()); let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
let forwarder = Forwarder::new( let forwarder = Forwarder::new(
@ -640,6 +647,7 @@ mod tests {
None, None,
Arc::new(ConnectionCache::default()), Arc::new(ConnectionCache::default()),
bank_forks, bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
); );
drop(non_vote_sender); drop(non_vote_sender);
drop(tpu_vote_sender); drop(tpu_vote_sender);
@ -695,6 +703,7 @@ mod tests {
None, None,
Arc::new(ConnectionCache::default()), Arc::new(ConnectionCache::default()),
bank_forks, bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
); );
trace!("sending bank"); trace!("sending bank");
drop(non_vote_sender); drop(non_vote_sender);
@ -775,6 +784,7 @@ mod tests {
None, None,
Arc::new(ConnectionCache::default()), Arc::new(ConnectionCache::default()),
bank_forks, bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
); );
// fund another account so we can send 2 good transactions in a single batch. // fund another account so we can send 2 good transactions in a single batch.
@ -936,6 +946,7 @@ mod tests {
None, None,
Arc::new(ConnectionCache::default()), Arc::new(ConnectionCache::default()),
bank_forks, bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
); );
// wait for banking_stage to eat the packets // wait for banking_stage to eat the packets
@ -1129,6 +1140,7 @@ mod tests {
None, None,
Arc::new(ConnectionCache::default()), Arc::new(ConnectionCache::default()),
bank_forks, bank_forks,
&Arc::new(PrioritizationFeeCache::new(0u64)),
); );
let keypairs = (0..100).map(|_| Keypair::new()).collect_vec(); let keypairs = (0..100).map(|_| Keypair::new()).collect_vec();

View File

@ -1,5 +1,6 @@
use { use {
crate::leader_slot_banking_stage_timing_metrics::LeaderExecuteAndCommitTimings, crate::leader_slot_banking_stage_timing_metrics::LeaderExecuteAndCommitTimings,
itertools::Itertools,
solana_ledger::{ solana_ledger::{
blockstore_processor::TransactionStatusSender, token_balances::collect_token_balances, blockstore_processor::TransactionStatusSender, token_balances::collect_token_balances,
}, },
@ -11,6 +12,7 @@ use {
TransactionResults, TransactionResults,
}, },
bank_utils, bank_utils,
prioritization_fee_cache::PrioritizationFeeCache,
transaction_batch::TransactionBatch, transaction_batch::TransactionBatch,
vote_sender_types::ReplayVoteSender, vote_sender_types::ReplayVoteSender,
}, },
@ -37,16 +39,19 @@ pub(super) struct PreBalanceInfo {
pub struct Committer { pub struct Committer {
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender, replay_vote_sender: ReplayVoteSender,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
} }
impl Committer { impl Committer {
pub fn new( pub fn new(
transaction_status_sender: Option<TransactionStatusSender>, transaction_status_sender: Option<TransactionStatusSender>,
replay_vote_sender: ReplayVoteSender, replay_vote_sender: ReplayVoteSender,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> Self { ) -> Self {
Self { Self {
transaction_status_sender, transaction_status_sender,
replay_vote_sender, replay_vote_sender,
prioritization_fee_cache,
} }
} }
@ -77,6 +82,12 @@ impl Committer {
let (last_blockhash, lamports_per_signature) = let (last_blockhash, lamports_per_signature) =
bank.last_blockhash_and_lamports_per_signature(); bank.last_blockhash_and_lamports_per_signature();
let executed_transactions = execution_results
.iter()
.zip(batch.sanitized_transactions())
.filter_map(|(execution_result, tx)| execution_result.was_executed().then_some(tx))
.collect_vec();
let (tx_results, commit_time_us) = measure_us!(bank.commit_transactions( let (tx_results, commit_time_us) = measure_us!(bank.commit_transactions(
batch.sanitized_transactions(), batch.sanitized_transactions(),
loaded_transactions, loaded_transactions,
@ -119,6 +130,8 @@ impl Committer {
pre_balance_info, pre_balance_info,
starting_transaction_index, starting_transaction_index,
); );
self.prioritization_fee_cache
.update(bank.clone(), executed_transactions.into_iter());
}); });
execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_us; execute_and_commit_timings.find_and_send_votes_us = find_and_send_votes_us;
(commit_time_us, commit_transaction_statuses) (commit_time_us, commit_transaction_statuses)

View File

@ -694,6 +694,7 @@ mod tests {
solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry}, solana_poh::poh_recorder::{PohRecorder, WorkingBankEntry},
solana_program_runtime::timings::ProgramTiming, solana_program_runtime::timings::ProgramTiming,
solana_rpc::transaction_status_service::TransactionStatusService, solana_rpc::transaction_status_service::TransactionStatusService,
solana_runtime::prioritization_fee_cache::PrioritizationFeeCache,
solana_sdk::{ solana_sdk::{
account::AccountSharedData, account::AccountSharedData,
instruction::InstructionError, instruction::InstructionError,
@ -751,7 +752,11 @@ mod tests {
let poh_simulator = simulate_poh(record_receiver, &poh_recorder); let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender); let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let process_transactions_summary = let process_transactions_summary =
consumer.process_transactions(&bank, &Instant::now(), &transactions); consumer.process_transactions(&bank, &Instant::now(), &transactions);
@ -891,7 +896,11 @@ mod tests {
poh_recorder.write().unwrap().set_bank(&bank, false); poh_recorder.write().unwrap().set_bank(&bank, false);
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender); let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let process_transactions_batch_output = let process_transactions_batch_output =
@ -1014,7 +1023,11 @@ mod tests {
poh_recorder.write().unwrap().set_bank(&bank, false); poh_recorder.write().unwrap().set_bank(&bank, false);
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender); let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let process_transactions_batch_output = let process_transactions_batch_output =
@ -1082,7 +1095,11 @@ mod tests {
poh_recorder.write().unwrap().set_bank(&bank, false); poh_recorder.write().unwrap().set_bank(&bank, false);
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender); let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let get_block_cost = || bank.read_cost_tracker().unwrap().block_cost(); let get_block_cost = || bank.read_cost_tracker().unwrap().block_cost();
@ -1200,7 +1217,11 @@ mod tests {
let poh_simulator = simulate_poh(record_receiver, &poh_recorder); let poh_simulator = simulate_poh(record_receiver, &poh_recorder);
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender); let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
let process_transactions_batch_output = let process_transactions_batch_output =
@ -1393,7 +1414,11 @@ mod tests {
let poh_simulator = simulate_poh(record_receiver, &Arc::new(RwLock::new(poh_recorder))); let poh_simulator = simulate_poh(record_receiver, &Arc::new(RwLock::new(poh_recorder)));
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender); let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder.clone(), QosService::new(1), None); let consumer = Consumer::new(committer, recorder.clone(), QosService::new(1), None);
let process_transactions_summary = let process_transactions_summary =
@ -1517,6 +1542,7 @@ mod tests {
sender: transaction_status_sender, sender: transaction_status_sender,
}), }),
replay_vote_sender, replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
); );
let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
@ -1654,6 +1680,7 @@ mod tests {
sender: transaction_status_sender, sender: transaction_status_sender,
}), }),
replay_vote_sender, replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
); );
let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
@ -1712,7 +1739,11 @@ mod tests {
); );
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender); let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)
@ -1788,7 +1819,11 @@ mod tests {
); );
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender); let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
// When the working bank in poh_recorder is None, no packets should be processed // When the working bank in poh_recorder is None, no packets should be processed
@ -1838,7 +1873,11 @@ mod tests {
); );
let (replay_vote_sender, _replay_vote_receiver) = unbounded(); let (replay_vote_sender, _replay_vote_receiver) = unbounded();
let committer = Committer::new(None, replay_vote_sender); let committer = Committer::new(
None,
replay_vote_sender,
Arc::new(PrioritizationFeeCache::new(0u64)),
);
let consumer = Consumer::new(committer, recorder, QosService::new(1), None); let consumer = Consumer::new(committer, recorder, QosService::new(1), None);
// When the working bank in poh_recorder is None, no packets should be processed (consume will not be called) // When the working bank in poh_recorder is None, no packets should be processed (consume will not be called)

View File

@ -28,6 +28,7 @@ use {
}, },
solana_runtime::{ solana_runtime::{
bank_forks::BankForks, bank_forks::BankForks,
prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender}, vote_sender_types::{ReplayVoteReceiver, ReplayVoteSender},
}, },
solana_sdk::{pubkey::Pubkey, signature::Keypair}, solana_sdk::{pubkey::Pubkey, signature::Keypair},
@ -102,6 +103,7 @@ impl Tpu {
banking_tracer: Arc<BankingTracer>, banking_tracer: Arc<BankingTracer>,
tracer_thread_hdl: TracerThread, tracer_thread_hdl: TracerThread,
tpu_enable_udp: bool, tpu_enable_udp: bool,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
) -> Self { ) -> Self {
let TpuSockets { let TpuSockets {
transactions: transactions_sockets, transactions: transactions_sockets,
@ -245,6 +247,7 @@ impl Tpu {
log_messages_bytes_limit, log_messages_bytes_limit,
connection_cache.clone(), connection_cache.clone(),
bank_forks.clone(), bank_forks.clone(),
prioritization_fee_cache,
); );
let broadcast_stage = broadcast_type.new_broadcast_stage( let broadcast_stage = broadcast_type.new_broadcast_stage(

View File

@ -867,7 +867,7 @@ impl Validator {
}; };
// block min prioritization fee cache should be readable by RPC, and writable by validator // block min prioritization fee cache should be readable by RPC, and writable by validator
// (for now, by replay stage) // (by both replay stage and banking stage)
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default()); let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());
let rpc_override_health_check = Arc::new(AtomicBool::new(false)); let rpc_override_health_check = Arc::new(AtomicBool::new(false));
@ -1156,6 +1156,7 @@ impl Validator {
banking_tracer, banking_tracer,
tracer_thread, tracer_thread,
tpu_enable_udp, tpu_enable_udp,
&prioritization_fee_cache,
); );
datapoint_info!( datapoint_info!(