From 8bb039d08d3a90eef0b51c9ca9ed0ad3f020eaa2 Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Wed, 31 Aug 2022 08:00:55 -0500 Subject: [PATCH] collect min prioritization fees when replaying sanitized transactions (#26709) * Collect blocks' minimum prioritization fees when replaying sanitized transactions * Limits block min-fee metrics reporting to top 10 writable accounts * Add service thread to asynchronously update and finalize prioritization fee cache * Add bench test for prioritization_fee_cache Co-authored-by: Tyera Eulberg --- Cargo.lock | 1 + .../src/forward_packet_batches_by_accounts.rs | 6 +- core/src/immutable_deserialized_packet.rs | 4 +- core/src/lib.rs | 1 - core/src/replay_stage.rs | 20 +- core/src/tvu.rs | 6 +- core/src/unprocessed_packet_batches.rs | 6 +- core/src/validator.rs | 6 + ledger/src/blockstore_processor.rs | 17 + programs/bpf/Cargo.lock | 1 + runtime/Cargo.toml | 4 + runtime/benches/prioritization_fee_cache.rs | 113 +++ runtime/src/lib.rs | 3 + runtime/src/prioritization_fee.rs | 324 +++++++ runtime/src/prioritization_fee_cache.rs | 811 ++++++++++++++++++ .../src/transaction_priority_details.rs | 0 16 files changed, 1310 insertions(+), 13 deletions(-) create mode 100644 runtime/benches/prioritization_fee_cache.rs create mode 100644 runtime/src/prioritization_fee.rs create mode 100644 runtime/src/prioritization_fee_cache.rs rename {core => runtime}/src/transaction_priority_details.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 1bde67e7d1..e389650fc6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6110,6 +6110,7 @@ dependencies = [ "lazy_static", "libsecp256k1", "log", + "lru", "lz4", "memmap2", "num-derive", diff --git a/core/src/forward_packet_batches_by_accounts.rs b/core/src/forward_packet_batches_by_accounts.rs index ccd367349a..3bd761a4cd 100644 --- a/core/src/forward_packet_batches_by_accounts.rs +++ b/core/src/forward_packet_batches_by_accounts.rs @@ -189,14 +189,12 @@ impl ForwardPacketBatchesByAccounts { mod tests { use { super::*, - crate::{ - transaction_priority_details::TransactionPriorityDetails, - unprocessed_packet_batches::DeserializedPacket, - }, + crate::unprocessed_packet_batches::DeserializedPacket, solana_runtime::{ bank::Bank, bank_forks::BankForks, genesis_utils::{create_genesis_config, GenesisConfigInfo}, + transaction_priority_details::TransactionPriorityDetails, }, solana_sdk::{hash::Hash, signature::Keypair, system_transaction}, std::sync::RwLock, diff --git a/core/src/immutable_deserialized_packet.rs b/core/src/immutable_deserialized_packet.rs index a54f64916c..0a12fcad44 100644 --- a/core/src/immutable_deserialized_packet.rs +++ b/core/src/immutable_deserialized_packet.rs @@ -1,8 +1,8 @@ use { - crate::transaction_priority_details::{ + solana_perf::packet::Packet, + solana_runtime::transaction_priority_details::{ GetTransactionPriorityDetails, TransactionPriorityDetails, }, - solana_perf::packet::Packet, solana_sdk::{ hash::Hash, message::Message, diff --git a/core/src/lib.rs b/core/src/lib.rs index 4133f08152..47eaa8f6ce 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -69,7 +69,6 @@ mod tower1_7_14; pub mod tower_storage; pub mod tpu; pub mod tracer_packet_stats; -pub mod transaction_priority_details; pub mod tree_diff; pub mod tvu; pub mod unfrozen_gossip_verified_vote_hashes; diff --git a/core/src/replay_stage.rs b/core/src/replay_stage.rs index 796601c32c..44f2ef5c72 100644 --- a/core/src/replay_stage.rs +++ b/core/src/replay_stage.rs @@ -57,6 +57,7 @@ use { bank::{Bank, NewBankOptions}, bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY}, commitment::BlockCommitmentCache, + prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender, }, solana_sdk::{ @@ -398,6 +399,7 @@ impl ReplayStage { drop_bank_sender: Sender>>, block_metadata_notifier: Option, log_messages_bytes_limit: Option, + prioritization_fee_cache: Arc, ) -> Result { let mut tower = if let Some(process_blockstore) = maybe_process_blockstore { let tower = process_blockstore.process_to_create_tower()?; @@ -531,7 +533,8 @@ impl ReplayStage { block_metadata_notifier.clone(), &mut replay_timing, log_messages_bytes_limit, - replay_slots_concurrently + replay_slots_concurrently, + &prioritization_fee_cache, ); replay_active_banks_time.stop(); @@ -1716,6 +1719,7 @@ impl ReplayStage { replay_vote_sender: &ReplayVoteSender, verify_recyclers: &VerifyRecyclers, log_messages_bytes_limit: Option, + prioritization_fee_cache: &PrioritizationFeeCache, ) -> result::Result { let mut w_replay_stats = replay_stats.write().unwrap(); let mut w_replay_progress = replay_progress.write().unwrap(); @@ -1735,6 +1739,7 @@ impl ReplayStage { verify_recyclers, false, log_messages_bytes_limit, + prioritization_fee_cache, )?; let tx_count_after = w_replay_progress.num_txs; let tx_count = tx_count_after - tx_count_before; @@ -2237,6 +2242,7 @@ impl ReplayStage { replay_timing: &mut ReplayTiming, log_messages_bytes_limit: Option, active_bank_slots: &[Slot], + prioritization_fee_cache: &PrioritizationFeeCache, ) -> Vec { // Make mutable shared structures thread safe. let progress = RwLock::new(progress); @@ -2312,6 +2318,7 @@ impl ReplayStage { &replay_vote_sender.clone(), &verify_recyclers.clone(), log_messages_bytes_limit, + prioritization_fee_cache, ); replay_blockstore_time.stop(); replay_result.replay_result = Some(blockstore_result); @@ -2342,6 +2349,7 @@ impl ReplayStage { replay_timing: &mut ReplayTiming, log_messages_bytes_limit: Option, bank_slot: Slot, + prioritization_fee_cache: &PrioritizationFeeCache, ) -> ReplaySlotFromBlockstore { let mut replay_result = ReplaySlotFromBlockstore { is_slot_dead: false, @@ -2391,6 +2399,7 @@ impl ReplayStage { &replay_vote_sender.clone(), &verify_recyclers.clone(), log_messages_bytes_limit, + prioritization_fee_cache, ); replay_blockstore_time.stop(); replay_result.replay_result = Some(blockstore_result); @@ -2422,6 +2431,7 @@ impl ReplayStage { ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender, block_metadata_notifier: Option, replay_result_vec: &[ReplaySlotFromBlockstore], + prioritization_fee_cache: &PrioritizationFeeCache, ) -> bool { // TODO: See if processing of blockstore replay results and bank completion can be made thread safe. let mut did_complete_bank = false; @@ -2489,6 +2499,9 @@ impl ReplayStage { warn!("cost_update_sender failed sending bank stats: {:?}", err) }); + // finalize block's minimum prioritization fee cache for this bank + prioritization_fee_cache.finalize_priority_fee(bank.slot()); + assert_ne!(bank.hash(), Hash::default()); // Needs to be updated before `check_slot_agrees_with_cluster()` so that // any updates in `check_slot_agrees_with_cluster()` on fork choice take @@ -2607,6 +2620,7 @@ impl ReplayStage { replay_timing: &mut ReplayTiming, log_messages_bytes_limit: Option, replay_slots_concurrently: bool, + prioritization_fee_cache: &PrioritizationFeeCache, ) -> bool /* completed a bank */ { let active_bank_slots = bank_forks.read().unwrap().active_bank_slots(); let num_active_banks = active_bank_slots.len(); @@ -2629,6 +2643,7 @@ impl ReplayStage { replay_timing, log_messages_bytes_limit, &active_bank_slots, + prioritization_fee_cache, ) } else { active_bank_slots @@ -2646,6 +2661,7 @@ impl ReplayStage { replay_timing, log_messages_bytes_limit, *bank_slot, + prioritization_fee_cache, ) }) .collect() @@ -2672,6 +2688,7 @@ impl ReplayStage { ancestor_hashes_replay_update_sender, block_metadata_notifier, &replay_result_vec, + prioritization_fee_cache, ) } else { false @@ -4162,6 +4179,7 @@ pub(crate) mod tests { &replay_vote_sender, &VerifyRecyclers::default(), None, + &PrioritizationFeeCache::new(0u64), ); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests( diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 95a7255b8f..5e4adce7a3 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -42,7 +42,7 @@ use { solana_runtime::{ accounts_background_service::AbsRequestSender, bank_forks::BankForks, commitment::BlockCommitmentCache, cost_model::CostModel, - vote_sender_types::ReplayVoteSender, + prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender, }, solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair}, solana_tpu_client::connection_cache::ConnectionCache, @@ -130,6 +130,7 @@ impl Tvu { accounts_background_request_sender: AbsRequestSender, log_messages_bytes_limit: Option, connection_cache: &Arc, + prioritization_fee_cache: &Arc, ) -> Result { let TvuSockets { repair: repair_socket, @@ -290,6 +291,7 @@ impl Tvu { drop_bank_sender, block_metadata_notifier, log_messages_bytes_limit, + prioritization_fee_cache.clone(), )?; let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| { @@ -403,6 +405,7 @@ pub mod tests { let (_, gossip_confirmed_slots_receiver) = unbounded(); let bank_forks = Arc::new(RwLock::new(bank_forks)); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); + let _ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); let tvu = Tvu::new( &vote_keypair.pubkey(), Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])), @@ -452,6 +455,7 @@ pub mod tests { AbsRequestSender::default(), None, &Arc::new(ConnectionCache::default()), + &_ignored_prioritization_fee_cache, ) .expect("assume success"); exit.store(true, Ordering::Relaxed); diff --git a/core/src/unprocessed_packet_batches.rs b/core/src/unprocessed_packet_batches.rs index 82ff8f092a..490ab29c41 100644 --- a/core/src/unprocessed_packet_batches.rs +++ b/core/src/unprocessed_packet_batches.rs @@ -1,10 +1,8 @@ use { - crate::{ - immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket}, - transaction_priority_details::TransactionPriorityDetails, - }, + crate::immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket}, min_max_heap::MinMaxHeap, solana_perf::packet::{Packet, PacketBatch}, + solana_runtime::transaction_priority_details::TransactionPriorityDetails, solana_sdk::{ feature_set, hash::Hash, diff --git a/core/src/validator.rs b/core/src/validator.rs index a60bbb3916..d52bdbcd10 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -78,6 +78,7 @@ use { commitment::BlockCommitmentCache, cost_model::CostModel, hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, + prioritization_fee_cache::PrioritizationFeeCache, runtime_config::RuntimeConfig, snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, @@ -771,6 +772,10 @@ impl Validator { false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)), }; + // block min prioritization fee cache should be readable by RPC, and writable by validator + // (for now, by replay stage) + let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default()); + let rpc_override_health_check = Arc::new(AtomicBool::new(false)); let ( json_rpc_service, @@ -996,6 +1001,7 @@ impl Validator { accounts_background_request_sender, config.runtime_config.log_messages_bytes_limit, &connection_cache, + &prioritization_fee_cache, )?; let tpu = Tpu::new( diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 21755ffbfb..29660d2fae 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -31,6 +31,7 @@ use { block_cost_limits::*, commitment::VOTE_THRESHOLD_SIZE, cost_model::CostModel, + prioritization_fee_cache::PrioritizationFeeCache, runtime_config::RuntimeConfig, transaction_batch::TransactionBatch, vote_account::VoteAccountsHashMap, @@ -516,6 +517,7 @@ pub fn process_entries_for_tests( }) .collect(); + let _ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); let result = process_entries_with_callback( bank, &mut replay_entries, @@ -526,6 +528,7 @@ pub fn process_entries_for_tests( &mut confirmation_timing, Arc::new(RwLock::new(BlockCostCapacityMeter::default())), None, + &_ignored_prioritization_fee_cache, ); debug!("process_entries: {:?}", confirmation_timing); @@ -544,6 +547,7 @@ fn process_entries_with_callback( confirmation_timing: &mut ConfirmationTiming, cost_capacity_meter: Arc>, log_messages_bytes_limit: Option, + prioritization_fee_cache: &PrioritizationFeeCache, ) -> Result<()> { // accumulator for entries that can be processed in parallel let mut batches = vec![]; @@ -606,6 +610,9 @@ fn process_entries_with_callback( batch, transaction_indexes, }); + // entry is scheduled to be processed, transactions in it can be used to + // update prioritization fee cache asynchronously. + prioritization_fee_cache.update(bank.clone(), transactions.iter()); // done with this entry break; } @@ -954,6 +961,8 @@ fn confirm_full_slot( ) -> result::Result<(), BlockstoreProcessorError> { let mut confirmation_timing = ConfirmationTiming::default(); let skip_verification = !opts.poh_verify; + let _ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64); + confirm_slot( blockstore, bank, @@ -966,6 +975,7 @@ fn confirm_full_slot( recyclers, opts.allow_dead_slots, opts.runtime_config.log_messages_bytes_limit, + &_ignored_prioritization_fee_cache, )?; timing.accumulate(&confirmation_timing.execute_timings); @@ -1092,6 +1102,7 @@ pub fn confirm_slot( recyclers: &VerifyRecyclers, allow_dead_slots: bool, log_messages_bytes_limit: Option, + prioritization_fee_cache: &PrioritizationFeeCache, ) -> result::Result<(), BlockstoreProcessorError> { let slot = bank.slot(); @@ -1120,6 +1131,7 @@ pub fn confirm_slot( entry_callback, recyclers, log_messages_bytes_limit, + prioritization_fee_cache, ) } @@ -1135,6 +1147,7 @@ fn confirm_slot_entries( entry_callback: Option<&ProcessCallback>, recyclers: &VerifyRecyclers, log_messages_bytes_limit: Option, + prioritization_fee_cache: &PrioritizationFeeCache, ) -> result::Result<(), BlockstoreProcessorError> { let slot = bank.slot(); let (entries, num_shreds, slot_full) = slot_entries_load_result; @@ -1236,6 +1249,7 @@ fn confirm_slot_entries( timing, cost_capacity_meter, log_messages_bytes_limit, + prioritization_fee_cache, ) .map_err(BlockstoreProcessorError::from); replay_elapsed.stop(); @@ -4061,6 +4075,7 @@ pub mod tests { None, &VerifyRecyclers::default(), None, + &PrioritizationFeeCache::new(0u64), ) } @@ -4204,6 +4219,7 @@ pub mod tests { None, &VerifyRecyclers::default(), None, + &PrioritizationFeeCache::new(0u64), ) .unwrap(); assert_eq!(progress.num_txs, 2); @@ -4249,6 +4265,7 @@ pub mod tests { None, &VerifyRecyclers::default(), None, + &PrioritizationFeeCache::new(0u64), ) .unwrap(); assert_eq!(progress.num_txs, 5); diff --git a/programs/bpf/Cargo.lock b/programs/bpf/Cargo.lock index 3188fe1cdc..9fc28ac10b 100644 --- a/programs/bpf/Cargo.lock +++ b/programs/bpf/Cargo.lock @@ -5453,6 +5453,7 @@ dependencies = [ "itertools", "lazy_static", "log", + "lru", "lz4", "memmap2", "num-derive", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 6a0c54fbbe..251fc0ef1c 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -27,6 +27,7 @@ index_list = "0.2.7" itertools = "0.10.3" lazy_static = "1.4.0" log = "0.4.17" +lru = "0.7.7" lz4 = "1.24.0" memmap2 = "0.5.3" num-derive = { version = "0.3" } @@ -78,3 +79,6 @@ targets = ["x86_64-unknown-linux-gnu"] [build-dependencies] rustc_version = "0.4" + +[[bench]] +name = "prioritization_fee_cache" diff --git a/runtime/benches/prioritization_fee_cache.rs b/runtime/benches/prioritization_fee_cache.rs new file mode 100644 index 0000000000..e04e783b99 --- /dev/null +++ b/runtime/benches/prioritization_fee_cache.rs @@ -0,0 +1,113 @@ +#![feature(test)] +extern crate test; + +use { + rand::{thread_rng, Rng}, + solana_runtime::{ + bank::Bank, + bank_forks::BankForks, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + prioritization_fee_cache::*, + }, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, + message::Message, + pubkey::Pubkey, + system_instruction, + transaction::{SanitizedTransaction, Transaction}, + }, + std::sync::Arc, + test::Bencher, +}; +const TRANSFER_TRANSACTION_COMPUTE_UNIT: u32 = 200; + +fn build_sanitized_transaction( + compute_unit_price: u64, + signer_account: &Pubkey, + write_account: &Pubkey, +) -> SanitizedTransaction { + let transfer_lamports = 1; + let transaction = Transaction::new_unsigned(Message::new( + &[ + system_instruction::transfer(signer_account, write_account, transfer_lamports), + ComputeBudgetInstruction::set_compute_unit_limit(TRANSFER_TRANSACTION_COMPUTE_UNIT), + ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price), + ], + Some(signer_account), + )); + + SanitizedTransaction::try_from_legacy_transaction(transaction).unwrap() +} + +#[bench] +#[ignore] +fn bench_process_transactions_single_slot(bencher: &mut Bencher) { + let prioritization_fee_cache = PrioritizationFeeCache::default(); + + let bank = Arc::new(Bank::default_for_tests()); + + // build test transactions + let transactions: Vec<_> = (0..5000) + .map(|n| { + let compute_unit_price = n % 7; + build_sanitized_transaction( + compute_unit_price, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + ) + }) + .collect(); + + bencher.iter(|| { + prioritization_fee_cache.update(bank.clone(), transactions.iter()); + }); +} + +fn process_transactions_multiple_slots(banks: &[Arc], num_slots: usize, num_threads: usize) { + let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default()); + + let pool = rayon::ThreadPoolBuilder::new() + .num_threads(num_threads) + .build() + .unwrap(); + + // each threads updates a slot a batch of 50 transactions, for 100 times + for _ in 0..100 { + pool.install(|| { + let transactions: Vec<_> = (0..50) + .map(|n| { + let compute_unit_price = n % 7; + build_sanitized_transaction( + compute_unit_price, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + ) + }) + .collect(); + + let index = thread_rng().gen_range(0, num_slots); + + prioritization_fee_cache.update(banks[index].clone(), transactions.iter()); + }) + } +} + +#[bench] +#[ignore] +fn bench_process_transactions_multiple_slots(bencher: &mut Bencher) { + const NUM_SLOTS: usize = 5; + const NUM_THREADS: usize = 3; + + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank0 = Bank::new_for_benches(&genesis_config); + let bank_forks = BankForks::new(bank0); + let bank = bank_forks.working_bank(); + let collector = solana_sdk::pubkey::new_rand(); + let banks = (1..=NUM_SLOTS) + .map(|n| Arc::new(Bank::new_from_parent(&bank, &collector, n as u64))) + .collect::>(); + + bencher.iter(|| { + process_transactions_multiple_slots(&banks, NUM_SLOTS, NUM_THREADS); + }); +} diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 16a3f78e6d..15cc059f9a 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -47,6 +47,8 @@ pub mod loader_utils; pub mod message_processor; pub mod non_circulating_supply; mod nonce_keyed_account; +pub mod prioritization_fee; +pub mod prioritization_fee_cache; mod pubkey_bins; mod read_only_accounts_cache; pub mod rent_collector; @@ -73,6 +75,7 @@ mod storable_accounts; mod system_instruction_processor; pub mod transaction_batch; pub mod transaction_error_metrics; +pub mod transaction_priority_details; mod verify_accounts_hash_in_background; pub mod vote_account; pub mod vote_parser; diff --git a/runtime/src/prioritization_fee.rs b/runtime/src/prioritization_fee.rs new file mode 100644 index 0000000000..3c22509027 --- /dev/null +++ b/runtime/src/prioritization_fee.rs @@ -0,0 +1,324 @@ +use { + solana_measure::measure, + solana_sdk::{clock::Slot, pubkey::Pubkey, saturating_add_assign}, + std::collections::HashMap, +}; + +#[derive(Debug, Default)] +struct PrioritizationFeeMetrics { + // Count of writable accounts in slot + total_writable_accounts_count: u64, + + // Count of writeable accounts with a minimum prioritization fee higher than the minimum transaction + // fee for this slot. + relevant_writable_accounts_count: u64, + + // Total prioritization fees included in this slot. + total_prioritization_fee: u64, + + // Accumulated time spent on tracking prioritization fee for each slot. + total_update_elapsed_us: u64, +} + +impl PrioritizationFeeMetrics { + fn accumulate_total_prioritization_fee(&mut self, val: u64) { + saturating_add_assign!(self.total_prioritization_fee, val); + } + + fn accumulate_total_update_elapsed_us(&mut self, val: u64) { + saturating_add_assign!(self.total_update_elapsed_us, val); + } + + fn report(&self, slot: Slot) { + datapoint_info!( + "block_prioritization_fee", + ("slot", slot as i64, i64), + ( + "total_writable_accounts_count", + self.total_writable_accounts_count as i64, + i64 + ), + ( + "relevant_writable_accounts_count", + self.relevant_writable_accounts_count as i64, + i64 + ), + ( + "total_prioritization_fee", + self.total_prioritization_fee as i64, + i64 + ), + ( + "total_update_elapsed_us", + self.total_update_elapsed_us as i64, + i64 + ), + ); + } +} + +pub enum PrioritizationFeeError { + // Not able to get account locks from sanitized transaction, which is required to update block + // minimum fees. + FailGetTransactionAccountLocks, + + // Not able to read priority details, including compute-unit price, from transaction. + // Compute-unit price is required to update block minimum fees. + FailGetTransactionPriorityDetails, + + // Block is already finalized, trying to finalize it again is usually unexpected + BlockIsAlreadyFinalized, +} + +/// Block minimum prioritization fee stats, includes the minimum prioritization fee for a transaction in this +/// block; and the minimum fee for each writable account in all transactions in this block. The only relevant +/// write account minimum fees are those greater than the block minimum transaction fee, because the minimum fee needed to land +/// a transaction is determined by Max( min_transaction_fee, min_writable_account_fees(key), ...) +#[derive(Debug)] +pub struct PrioritizationFee { + // The minimum prioritization fee of transactions that landed in this block. + min_transaction_fee: u64, + + // The minimum prioritization fee of each writable account in transactions in this block. + min_writable_account_fees: HashMap, + + // Default to `false`, set to `true` when a block is completed, therefore the minimum fees recorded + // are finalized, and can be made available for use (e.g., RPC query) + is_finalized: bool, + + // slot prioritization fee metrics + metrics: PrioritizationFeeMetrics, +} + +impl Default for PrioritizationFee { + fn default() -> Self { + PrioritizationFee { + min_transaction_fee: u64::MAX, + min_writable_account_fees: HashMap::new(), + is_finalized: false, + metrics: PrioritizationFeeMetrics::default(), + } + } +} + +impl PrioritizationFee { + /// Update self for minimum transaction fee in the block and minimum fee for each writable account. + pub fn update( + &mut self, + transaction_fee: u64, + writable_accounts: &[Pubkey], + ) -> Result<(), PrioritizationFeeError> { + let (_, update_time) = measure!( + { + if transaction_fee < self.min_transaction_fee { + self.min_transaction_fee = transaction_fee; + } + + for write_account in writable_accounts.iter() { + self.min_writable_account_fees + .entry(*write_account) + .and_modify(|write_lock_fee| { + *write_lock_fee = std::cmp::min(*write_lock_fee, transaction_fee) + }) + .or_insert(transaction_fee); + } + + self.metrics + .accumulate_total_prioritization_fee(transaction_fee); + }, + "update_time", + ); + + self.metrics + .accumulate_total_update_elapsed_us(update_time.as_us()); + Ok(()) + } + + /// Accounts that have minimum fees lesser or equal to the minimum fee in the block are redundant, they are + /// removed to reduce memory footprint when mark_block_completed() is called. + fn prune_irrelevant_writable_accounts(&mut self) { + self.metrics.total_writable_accounts_count = self.get_writable_accounts_count() as u64; + self.min_writable_account_fees + .retain(|_, account_fee| account_fee > &mut self.min_transaction_fee); + self.metrics.relevant_writable_accounts_count = self.get_writable_accounts_count() as u64; + } + + pub fn mark_block_completed(&mut self) -> Result<(), PrioritizationFeeError> { + if self.is_finalized { + return Err(PrioritizationFeeError::BlockIsAlreadyFinalized); + } + self.prune_irrelevant_writable_accounts(); + self.is_finalized = true; + Ok(()) + } + + pub fn get_min_transaction_fee(&self) -> Option { + (self.min_transaction_fee != u64::MAX).then_some(self.min_transaction_fee) + } + + pub fn get_writable_account_fee(&self, key: &Pubkey) -> Option { + self.min_writable_account_fees.get(key).copied() + } + + pub fn get_writable_account_fees(&self) -> impl Iterator { + self.min_writable_account_fees.iter() + } + + pub fn get_writable_accounts_count(&self) -> usize { + self.min_writable_account_fees.len() + } + + pub fn is_finalized(&self) -> bool { + self.is_finalized + } + + pub fn report_metrics(&self, slot: Slot) { + self.metrics.report(slot); + + // report this slot's min_transaction_fee and top 10 min_writable_account_fees + let min_transaction_fee = self.get_min_transaction_fee().unwrap_or(0); + let mut accounts_fees: Vec<_> = self.get_writable_account_fees().collect(); + accounts_fees.sort_by(|lh, rh| rh.1.cmp(lh.1)); + datapoint_info!( + "block_min_prioritization_fee", + ("slot", slot as i64, i64), + ("entity", "block", String), + ("min_prioritization_fee", min_transaction_fee as i64, i64), + ); + for (account_key, fee) in accounts_fees.iter().take(10) { + datapoint_info!( + "block_min_prioritization_fee", + ("slot", slot as i64, i64), + ("entity", account_key.to_string(), String), + ("min_prioritization_fee", **fee as i64, i64), + ); + } + } +} + +#[cfg(test)] +mod tests { + use {super::*, solana_sdk::pubkey::Pubkey}; + + #[test] + fn test_update_prioritization_fee() { + solana_logger::setup(); + let write_account_a = Pubkey::new_unique(); + let write_account_b = Pubkey::new_unique(); + let write_account_c = Pubkey::new_unique(); + + let mut prioritization_fee = PrioritizationFee::default(); + assert!(prioritization_fee.get_min_transaction_fee().is_none()); + + // Assert for 1st transaction + // [fee, write_accounts...] --> [block, account_a, account_b, account_c] + // ----------------------------------------------------------------------- + // [5, a, b ] --> [5, 5, 5, nil ] + { + assert!(prioritization_fee + .update(5, &[write_account_a, write_account_b]) + .is_ok()); + assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); + assert_eq!( + 5, + prioritization_fee + .get_writable_account_fee(&write_account_a) + .unwrap() + ); + assert_eq!( + 5, + prioritization_fee + .get_writable_account_fee(&write_account_b) + .unwrap() + ); + assert!(prioritization_fee + .get_writable_account_fee(&write_account_c) + .is_none()); + } + + // Assert for second transaction: + // [fee, write_accounts...] --> [block, account_a, account_b, account_c] + // ----------------------------------------------------------------------- + // [9, b, c ] --> [5, 5, 5, 9 ] + { + assert!(prioritization_fee + .update(9, &[write_account_b, write_account_c]) + .is_ok()); + assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); + assert_eq!( + 5, + prioritization_fee + .get_writable_account_fee(&write_account_a) + .unwrap() + ); + assert_eq!( + 5, + prioritization_fee + .get_writable_account_fee(&write_account_b) + .unwrap() + ); + assert_eq!( + 9, + prioritization_fee + .get_writable_account_fee(&write_account_c) + .unwrap() + ); + } + + // Assert for third transaction: + // [fee, write_accounts...] --> [block, account_a, account_b, account_c] + // ----------------------------------------------------------------------- + // [2, a, c ] --> [2, 2, 5, 2 ] + { + assert!(prioritization_fee + .update(2, &[write_account_a, write_account_c]) + .is_ok()); + assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap()); + assert_eq!( + 2, + prioritization_fee + .get_writable_account_fee(&write_account_a) + .unwrap() + ); + assert_eq!( + 5, + prioritization_fee + .get_writable_account_fee(&write_account_b) + .unwrap() + ); + assert_eq!( + 2, + prioritization_fee + .get_writable_account_fee(&write_account_c) + .unwrap() + ); + } + + // assert after prune, account a and c should be removed from cache to save space + { + prioritization_fee.prune_irrelevant_writable_accounts(); + assert_eq!(1, prioritization_fee.min_writable_account_fees.len()); + assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap()); + assert!(prioritization_fee + .get_writable_account_fee(&write_account_a) + .is_none()); + assert_eq!( + 5, + prioritization_fee + .get_writable_account_fee(&write_account_b) + .unwrap() + ); + assert!(prioritization_fee + .get_writable_account_fee(&write_account_c) + .is_none()); + } + } + + #[test] + fn test_mark_block_completed() { + let mut prioritization_fee = PrioritizationFee::default(); + + assert!(prioritization_fee.mark_block_completed().is_ok()); + assert!(prioritization_fee.mark_block_completed().is_err()); + } +} diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs new file mode 100644 index 0000000000..3ecf11cdc6 --- /dev/null +++ b/runtime/src/prioritization_fee_cache.rs @@ -0,0 +1,811 @@ +use { + crate::{ + bank::Bank, prioritization_fee::*, + transaction_priority_details::GetTransactionPriorityDetails, + }, + crossbeam_channel::{unbounded, Receiver, Sender}, + log::*, + lru::LruCache, + solana_measure::measure, + solana_sdk::{ + clock::Slot, pubkey::Pubkey, saturating_add_assign, transaction::SanitizedTransaction, + }, + std::{ + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, Mutex, RwLock, + }, + thread::{Builder, JoinHandle}, + }, +}; + +/// The maximum number of blocks to keep in `PrioritizationFeeCache`, ie. +/// the amount of history generally desired to estimate the prioritization fee needed to +/// land a transaction in the current block. +const MAX_NUM_RECENT_BLOCKS: u64 = 150; + +#[derive(Debug, Default)] +struct PrioritizationFeeCacheMetrics { + // Count of transactions that successfully updated each slot's prioritization fee cache. + successful_transaction_update_count: AtomicU64, + + // Accumulated time spent on tracking prioritization fee for each slot. + total_update_elapsed_us: AtomicU64, + + // Accumulated time spent on acquiring cache write lock. + total_cache_lock_elapsed_us: AtomicU64, + + // Accumulated time spent on acquiring each block entry's lock.. + total_entry_lock_elapsed_us: AtomicU64, + + // Accumulated time spent on updating block prioritization fees. + total_entry_update_elapsed_us: AtomicU64, + + // Accumulated time spent on finalizing block prioritization fees. + total_block_finalize_elapsed_us: AtomicU64, +} + +impl PrioritizationFeeCacheMetrics { + fn accumulate_successful_transaction_update_count(&self, val: u64) { + self.successful_transaction_update_count + .fetch_add(val, Ordering::Relaxed); + } + + fn accumulate_total_update_elapsed_us(&self, val: u64) { + self.total_update_elapsed_us + .fetch_add(val, Ordering::Relaxed); + } + + fn accumulate_total_cache_lock_elapsed_us(&self, val: u64) { + self.total_cache_lock_elapsed_us + .fetch_add(val, Ordering::Relaxed); + } + + fn accumulate_total_entry_lock_elapsed_us(&self, val: u64) { + self.total_entry_lock_elapsed_us + .fetch_add(val, Ordering::Relaxed); + } + + fn accumulate_total_entry_update_elapsed_us(&self, val: u64) { + self.total_entry_update_elapsed_us + .fetch_add(val, Ordering::Relaxed); + } + + fn accumulate_total_block_finalize_elapsed_us(&self, val: u64) { + self.total_block_finalize_elapsed_us + .fetch_add(val, Ordering::Relaxed); + } + + fn report(&self, slot: Slot) { + datapoint_info!( + "block_prioritization_fee_counters", + ("slot", slot as i64, i64), + ( + "successful_transaction_update_count", + self.successful_transaction_update_count + .swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "total_update_elapsed_us", + self.total_update_elapsed_us.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "total_cache_lock_elapsed_us", + self.total_cache_lock_elapsed_us.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "total_entry_lock_elapsed_us", + self.total_entry_lock_elapsed_us.swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "total_entry_update_elapsed_us", + self.total_entry_update_elapsed_us + .swap(0, Ordering::Relaxed) as i64, + i64 + ), + ( + "total_block_finalize_elapsed_us", + self.total_block_finalize_elapsed_us + .swap(0, Ordering::Relaxed) as i64, + i64 + ), + ); + } +} + +enum CacheServiceUpdate { + TransactionUpdate { + slot: Slot, + transaction_fee: u64, + writable_accounts: Arc>, + }, + BankFrozen { + slot: Slot, + }, + Exit, +} + +/// Stores up to MAX_NUM_RECENT_BLOCKS recent block's prioritization fee, +/// A separate internal thread `service_thread` handles additional tasks when a bank is frozen, +/// and collecting stats and reporting metrics. +pub struct PrioritizationFeeCache { + cache: Arc>>>>, + service_thread: Option>, + sender: Sender, + metrics: Arc, +} + +impl Default for PrioritizationFeeCache { + fn default() -> Self { + Self::new(MAX_NUM_RECENT_BLOCKS) + } +} + +impl Drop for PrioritizationFeeCache { + fn drop(&mut self) { + let _ = self.sender.send(CacheServiceUpdate::Exit); + self.service_thread + .take() + .unwrap() + .join() + .expect("Prioritization fee cache servicing thread failed to join"); + } +} + +impl PrioritizationFeeCache { + pub fn new(capacity: u64) -> Self { + let metrics = Arc::new(PrioritizationFeeCacheMetrics::default()); + let (sender, receiver) = unbounded(); + let cache = Arc::new(RwLock::new(LruCache::new(capacity as usize))); + + let cache_clone = cache.clone(); + let metrics_clone = metrics.clone(); + let service_thread = Some( + Builder::new() + .name("prioritization-fee-cache-servicing-thread".to_string()) + .spawn(move || { + Self::service_loop(cache_clone, receiver, metrics_clone); + }) + .unwrap(), + ); + + PrioritizationFeeCache { + cache, + service_thread, + sender, + metrics, + } + } + + /// Get prioritization fee entry, create new entry if necessary + fn get_prioritization_fee( + cache: Arc>>>>, + slot: &Slot, + ) -> Arc> { + let mut cache = cache.write().unwrap(); + match cache.get(slot) { + Some(entry) => Arc::clone(entry), + None => { + let entry = Arc::new(Mutex::new(PrioritizationFee::default())); + cache.put(*slot, Arc::clone(&entry)); + entry + } + } + } + + /// Update with a list of transactions' tx_priority_details and tx_account_locks; Only + /// transactions have both valid priority_detail and account_locks will be used to update + /// fee_cache asynchronously. + pub fn update<'a>(&self, bank: Arc, txs: impl Iterator) { + let mut successful_transaction_update_count: u64 = 0; + let (_, send_updates_time) = measure!( + { + for sanitized_transaction in txs { + let priority_details = sanitized_transaction.get_transaction_priority_details(); + let account_locks = sanitized_transaction + .get_account_locks(bank.get_transaction_account_lock_limit()); + + if priority_details.is_none() || account_locks.is_err() { + continue; + } + + let writable_accounts = Arc::new( + account_locks + .unwrap() + .writable + .iter() + .map(|key| **key) + .collect::>(), + ); + + self.sender + .send(CacheServiceUpdate::TransactionUpdate { + slot: bank.slot(), + transaction_fee: priority_details.unwrap().priority, + writable_accounts, + }) + .unwrap_or_else(|err| { + warn!( + "prioritization fee cache transaction updates failed: {:?}", + err + ); + }); + saturating_add_assign!(successful_transaction_update_count, 1) + } + }, + "send_updates", + ); + + self.metrics + .accumulate_total_update_elapsed_us(send_updates_time.as_us()); + self.metrics + .accumulate_successful_transaction_update_count(successful_transaction_update_count); + } + + /// Finalize prioritization fee when it's bank is completely replayed from blockstore, + /// by pruning irrelevant accounts to save space, and marking its availability for queries. + pub fn finalize_priority_fee(&self, slot: Slot) { + self.sender + .send(CacheServiceUpdate::BankFrozen { slot }) + .unwrap_or_else(|err| { + warn!( + "prioritization fee cache signalling bank frozen failed: {:?}", + err + ) + }); + } + + /// Internal function is invoked by worker thread to update slot's minimum prioritization fee, + /// Cache lock contends here. + fn update_cache( + cache: Arc>>>>, + slot: &Slot, + transaction_fee: u64, + writable_accounts: Arc>, + metrics: Arc, + ) { + let (block_prioritization_fee, cache_lock_time) = + measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time"); + + let (mut block_prioritization_fee, entry_lock_time) = + measure!(block_prioritization_fee.lock().unwrap(), "entry_lock_time"); + + let (_, entry_update_time) = measure!( + block_prioritization_fee.update(transaction_fee, &writable_accounts), + "entry_update_time" + ); + metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); + metrics.accumulate_total_entry_lock_elapsed_us(entry_lock_time.as_us()); + metrics.accumulate_total_entry_update_elapsed_us(entry_update_time.as_us()); + } + + fn finalize_slot( + cache: Arc>>>>, + slot: &Slot, + metrics: Arc, + ) { + let (block_prioritization_fee, cache_lock_time) = + measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time"); + + let (mut block_prioritization_fee, entry_lock_time) = + measure!(block_prioritization_fee.lock().unwrap(), "entry_lock_time"); + + // prune cache by evicting write account entry from prioritization fee if its fee is less + // or equal to block's minimum transaction fee, because they are irrelevant in calculating + // block minimum fee. + let (_, slot_finalize_time) = measure!( + block_prioritization_fee.mark_block_completed(), + "slot_finalize_time" + ); + block_prioritization_fee.report_metrics(*slot); + metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); + metrics.accumulate_total_entry_lock_elapsed_us(entry_lock_time.as_us()); + metrics.accumulate_total_block_finalize_elapsed_us(slot_finalize_time.as_us()); + } + + fn service_loop( + cache: Arc>>>>, + receiver: Receiver, + metrics: Arc, + ) { + for update in receiver.iter() { + match update { + CacheServiceUpdate::TransactionUpdate { + slot, + transaction_fee, + writable_accounts, + } => Self::update_cache( + cache.clone(), + &slot, + transaction_fee, + writable_accounts, + metrics.clone(), + ), + CacheServiceUpdate::BankFrozen { slot } => { + Self::finalize_slot(cache.clone(), &slot, metrics.clone()); + + metrics.report(slot); + } + CacheServiceUpdate::Exit => { + break; + } + } + } + } + + /// Returns number of blocks that have finalized minimum fees collection + pub fn available_block_count(&self) -> usize { + self.cache + .read() + .unwrap() + .iter() + .filter(|(_slot, prioritization_fee)| prioritization_fee.lock().unwrap().is_finalized()) + .count() + } + + /// Query block minimum fees from finalized blocks in cache, + /// Returns a vector of fee; call site can use it to produce + /// average, or top 5% etc. + pub fn get_prioritization_fees(&self) -> Vec { + self.cache + .read() + .unwrap() + .iter() + .filter_map(|(_slot, prioritization_fee)| { + let prioritization_fee_read = prioritization_fee.lock().unwrap(); + prioritization_fee_read + .is_finalized() + .then(|| prioritization_fee_read.get_min_transaction_fee()) + }) + .flatten() + .collect() + } + + /// Query given account minimum fees from finalized blocks in cache, + /// Returns a vector of fee; call site can use it to produce + /// average, or top 5% etc. + pub fn get_account_prioritization_fees(&self, account_key: &Pubkey) -> Vec { + self.cache + .read() + .unwrap() + .iter() + .filter_map(|(_slot, prioritization_fee)| { + let prioritization_fee_read = prioritization_fee.lock().unwrap(); + prioritization_fee_read + .is_finalized() + .then(|| prioritization_fee_read.get_writable_account_fee(account_key)) + }) + .flatten() + .collect() + } +} + +#[cfg(test)] +mod tests { + use { + super::*, + crate::{ + bank::Bank, + bank_forks::BankForks, + genesis_utils::{create_genesis_config, GenesisConfigInfo}, + }, + solana_sdk::{ + compute_budget::ComputeBudgetInstruction, + message::Message, + pubkey::Pubkey, + system_instruction, + transaction::{SanitizedTransaction, Transaction}, + }, + }; + + fn build_sanitized_transaction_for_test( + compute_unit_price: u64, + signer_account: &Pubkey, + write_account: &Pubkey, + ) -> SanitizedTransaction { + let transaction = Transaction::new_unsigned(Message::new( + &[ + system_instruction::transfer(signer_account, write_account, 1), + ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price), + ], + Some(signer_account), + )); + + SanitizedTransaction::try_from_legacy_transaction(transaction).unwrap() + } + + // update fee cache is asynchronous, this test helper blocks until update is completed. + fn sync_update<'a>( + prioritization_fee_cache: &mut PrioritizationFeeCache, + bank: Arc, + txs: impl Iterator, + ) { + prioritization_fee_cache.update(bank.clone(), txs); + + let block_fee = PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &bank.slot(), + ); + + // wait till update is done + while block_fee + .lock() + .unwrap() + .get_min_transaction_fee() + .is_none() + { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + } + + // finalization is asynchronous, this test helper blocks until finalization is completed. + fn sync_finalize_priority_fee_for_test( + prioritization_fee_cache: &mut PrioritizationFeeCache, + slot: Slot, + ) { + prioritization_fee_cache.finalize_priority_fee(slot); + let fee = PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &slot, + ); + + // wait till finalization is done + while !fee.lock().unwrap().is_finalized() { + std::thread::sleep(std::time::Duration::from_millis(100)); + } + } + + #[test] + fn test_prioritization_fee_cache_update() { + solana_logger::setup(); + let write_account_a = Pubkey::new_unique(); + let write_account_b = Pubkey::new_unique(); + let write_account_c = Pubkey::new_unique(); + + // Set up test with 3 transactions, in format of [fee, write-accounts...], + // Shall expect fee cache is updated in following sequence: + // transaction block minimum prioritization fee cache + // [fee, write_accounts...] --> [block, account_a, account_b, account_c] + // ----------------------------------------------------------------------- + // [5, a, b ] --> [5, 5, 5, nil ] + // [9, b, c ] --> [5, 5, 5, 9 ] + // [2, a, c ] --> [2, 2, 5, 2 ] + // + let txs = vec![ + build_sanitized_transaction_for_test(5, &write_account_a, &write_account_b), + build_sanitized_transaction_for_test(9, &write_account_b, &write_account_c), + build_sanitized_transaction_for_test(2, &write_account_a, &write_account_c), + ]; + + let bank = Arc::new(Bank::default_for_tests()); + let slot = bank.slot(); + + let mut prioritization_fee_cache = PrioritizationFeeCache::default(); + prioritization_fee_cache.update(bank, txs.iter()); + + // assert block minimum fee and account a, b, c fee accordingly + { + let fee = PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &slot, + ); + let fee = fee.lock().unwrap(); + assert_eq!(2, fee.get_min_transaction_fee().unwrap()); + assert_eq!(2, fee.get_writable_account_fee(&write_account_a).unwrap()); + assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); + assert_eq!(2, fee.get_writable_account_fee(&write_account_c).unwrap()); + // assert unknown account d fee + assert!(fee + .get_writable_account_fee(&Pubkey::new_unique()) + .is_none()); + } + + // assert after prune, account a and c should be removed from cache to save space + { + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, slot); + let fee = PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &slot, + ); + let fee = fee.lock().unwrap(); + assert_eq!(2, fee.get_min_transaction_fee().unwrap()); + assert!(fee.get_writable_account_fee(&write_account_a).is_none()); + assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); + assert!(fee.get_writable_account_fee(&write_account_c).is_none()); + } + } + + #[test] + fn test_available_block_count() { + let prioritization_fee_cache = PrioritizationFeeCache::default(); + + assert!(PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &1 + ) + .lock() + .unwrap() + .mark_block_completed() + .is_ok()); + assert!(PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &2 + ) + .lock() + .unwrap() + .mark_block_completed() + .is_ok()); + // add slot 3 entry to cache, but not finalize it + PrioritizationFeeCache::get_prioritization_fee(prioritization_fee_cache.cache.clone(), &3); + + // assert available block count should be 2 finalized blocks + assert_eq!(2, prioritization_fee_cache.available_block_count()); + } + + fn assert_vec_eq(expected: &mut Vec, actual: &mut Vec) { + expected.sort_unstable(); + actual.sort_unstable(); + assert_eq!(expected, actual); + } + + #[test] + fn test_get_prioritization_fees() { + solana_logger::setup(); + let write_account_a = Pubkey::new_unique(); + let write_account_b = Pubkey::new_unique(); + let write_account_c = Pubkey::new_unique(); + + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank0 = Bank::new_for_benches(&genesis_config); + let bank_forks = BankForks::new(bank0); + let bank = bank_forks.working_bank(); + let collector = solana_sdk::pubkey::new_rand(); + let bank1 = Arc::new(Bank::new_from_parent(&bank, &collector, 1)); + let bank2 = Arc::new(Bank::new_from_parent(&bank, &collector, 2)); + let bank3 = Arc::new(Bank::new_from_parent(&bank, &collector, 3)); + + let mut prioritization_fee_cache = PrioritizationFeeCache::default(); + + // Assert no minimum fee from empty cache + assert!(prioritization_fee_cache + .get_prioritization_fees() + .is_empty()); + + // Assert after add one transaction for slot 1 + { + let txs = vec![build_sanitized_transaction_for_test( + 5, + &write_account_a, + &write_account_b, + )]; + sync_update(&mut prioritization_fee_cache, bank1.clone(), txs.iter()); + assert_eq!( + 5, + PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &bank1.slot() + ) + .lock() + .unwrap() + .get_min_transaction_fee() + .unwrap() + ); + // before block is marked as completed + assert!(prioritization_fee_cache + .get_prioritization_fees() + .is_empty()); + // after block is completed + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank1.slot()); + assert_eq!(vec![5], prioritization_fee_cache.get_prioritization_fees()); + } + + // Assert after add one transaction for slot 2 + { + let txs = vec![build_sanitized_transaction_for_test( + 9, + &write_account_b, + &write_account_c, + )]; + sync_update(&mut prioritization_fee_cache, bank2.clone(), txs.iter()); + assert_eq!( + 9, + PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &bank2.slot() + ) + .lock() + .unwrap() + .get_min_transaction_fee() + .unwrap() + ); + // before block is marked as completed + assert_eq!(vec![5], prioritization_fee_cache.get_prioritization_fees()); + // after block is completed + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank2.slot()); + assert_vec_eq( + &mut vec![5, 9], + &mut prioritization_fee_cache.get_prioritization_fees(), + ); + } + + // Assert after add one transaction for slot 3 + { + let txs = vec![build_sanitized_transaction_for_test( + 2, + &write_account_a, + &write_account_c, + )]; + sync_update(&mut prioritization_fee_cache, bank3.clone(), txs.iter()); + assert_eq!( + 2, + PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &bank3.slot() + ) + .lock() + .unwrap() + .get_min_transaction_fee() + .unwrap() + ); + // before block is marked as completed + assert_vec_eq( + &mut vec![5, 9], + &mut prioritization_fee_cache.get_prioritization_fees(), + ); + // after block is completed + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank3.slot()); + assert_vec_eq( + &mut vec![5, 9, 2], + &mut prioritization_fee_cache.get_prioritization_fees(), + ); + } + } + + #[test] + fn test_get_account_prioritization_fees() { + solana_logger::setup(); + let write_account_a = Pubkey::new_unique(); + let write_account_b = Pubkey::new_unique(); + let write_account_c = Pubkey::new_unique(); + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank0 = Bank::new_for_benches(&genesis_config); + let bank_forks = BankForks::new(bank0); + let bank = bank_forks.working_bank(); + let collector = solana_sdk::pubkey::new_rand(); + let bank1 = Arc::new(Bank::new_from_parent(&bank, &collector, 1)); + let bank2 = Arc::new(Bank::new_from_parent(&bank, &collector, 2)); + let bank3 = Arc::new(Bank::new_from_parent(&bank, &collector, 3)); + + let mut prioritization_fee_cache = PrioritizationFeeCache::default(); + + // Assert no minimum fee from empty cache + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_a) + .is_empty()); + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_b) + .is_empty()); + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_c) + .is_empty()); + + // Assert after add one transaction for slot 1 + { + let txs = vec![ + build_sanitized_transaction_for_test(5, &write_account_a, &write_account_b), + build_sanitized_transaction_for_test( + 0, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + ), + ]; + prioritization_fee_cache.update(bank1.clone(), txs.iter()); + // before block is marked as completed + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_a) + .is_empty()); + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_b) + .is_empty()); + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_c) + .is_empty()); + // after block is completed + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank1.slot()); + assert_eq!( + vec![5], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_a) + ); + assert_eq!( + vec![5], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_b) + ); + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_c) + .is_empty()); + } + + // Assert after add one transaction for slot 2 + { + let txs = vec![ + build_sanitized_transaction_for_test(9, &write_account_b, &write_account_c), + build_sanitized_transaction_for_test( + 0, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + ), + ]; + prioritization_fee_cache.update(bank2.clone(), txs.iter()); + // before block is marked as completed + assert_eq!( + vec![5], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_a) + ); + assert_eq!( + vec![5], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_b) + ); + assert!(prioritization_fee_cache + .get_account_prioritization_fees(&write_account_c) + .is_empty()); + // after block is completed + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank2.slot()); + assert_eq!( + vec![5], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_a) + ); + assert_vec_eq( + &mut vec![5, 9], + &mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_b), + ); + assert_eq!( + vec![9], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_c) + ); + } + + // Assert after add one transaction for slot 3 + { + let txs = vec![ + build_sanitized_transaction_for_test(2, &write_account_a, &write_account_c), + build_sanitized_transaction_for_test( + 0, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + ), + ]; + prioritization_fee_cache.update(bank3.clone(), txs.iter()); + // before block is marked as completed + assert_eq!( + vec![5], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_a) + ); + assert_vec_eq( + &mut vec![5, 9], + &mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_b), + ); + assert_eq!( + vec![9], + prioritization_fee_cache.get_account_prioritization_fees(&write_account_c) + ); + // after block is completed + sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank3.slot()); + assert_vec_eq( + &mut vec![5, 2], + &mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_a), + ); + assert_vec_eq( + &mut vec![5, 9], + &mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_b), + ); + assert_vec_eq( + &mut vec![9, 2], + &mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_c), + ); + } + } +} diff --git a/core/src/transaction_priority_details.rs b/runtime/src/transaction_priority_details.rs similarity index 100% rename from core/src/transaction_priority_details.rs rename to runtime/src/transaction_priority_details.rs