collect min prioritization fees when replaying sanitized transactions (#26709)

* Collect blocks' minimum prioritization fees when replaying sanitized transactions

* Limits block min-fee metrics reporting to top 10 writable accounts

* Add service thread to asynchronously update and finalize prioritization fee cache

* Add bench test for prioritization_fee_cache

Co-authored-by: Tyera Eulberg <teulberg@gmail.com>
This commit is contained in:
Tao Zhu 2022-08-31 08:00:55 -05:00 committed by GitHub
parent 292b2a1bfe
commit 8bb039d08d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1310 additions and 13 deletions

1
Cargo.lock generated
View File

@ -6110,6 +6110,7 @@ dependencies = [
"lazy_static",
"libsecp256k1",
"log",
"lru",
"lz4",
"memmap2",
"num-derive",

View File

@ -189,14 +189,12 @@ impl ForwardPacketBatchesByAccounts {
mod tests {
use {
super::*,
crate::{
transaction_priority_details::TransactionPriorityDetails,
unprocessed_packet_batches::DeserializedPacket,
},
crate::unprocessed_packet_batches::DeserializedPacket,
solana_runtime::{
bank::Bank,
bank_forks::BankForks,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
transaction_priority_details::TransactionPriorityDetails,
},
solana_sdk::{hash::Hash, signature::Keypair, system_transaction},
std::sync::RwLock,

View File

@ -1,8 +1,8 @@
use {
crate::transaction_priority_details::{
solana_perf::packet::Packet,
solana_runtime::transaction_priority_details::{
GetTransactionPriorityDetails, TransactionPriorityDetails,
},
solana_perf::packet::Packet,
solana_sdk::{
hash::Hash,
message::Message,

View File

@ -69,7 +69,6 @@ mod tower1_7_14;
pub mod tower_storage;
pub mod tpu;
pub mod tracer_packet_stats;
pub mod transaction_priority_details;
pub mod tree_diff;
pub mod tvu;
pub mod unfrozen_gossip_verified_vote_hashes;

View File

@ -57,6 +57,7 @@ use {
bank::{Bank, NewBankOptions},
bank_forks::{BankForks, MAX_ROOT_DISTANCE_FOR_VOTE_ONLY},
commitment::BlockCommitmentCache,
prioritization_fee_cache::PrioritizationFeeCache,
vote_sender_types::ReplayVoteSender,
},
solana_sdk::{
@ -398,6 +399,7 @@ impl ReplayStage {
drop_bank_sender: Sender<Vec<Arc<Bank>>>,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: Arc<PrioritizationFeeCache>,
) -> Result<Self, String> {
let mut tower = if let Some(process_blockstore) = maybe_process_blockstore {
let tower = process_blockstore.process_to_create_tower()?;
@ -531,7 +533,8 @@ impl ReplayStage {
block_metadata_notifier.clone(),
&mut replay_timing,
log_messages_bytes_limit,
replay_slots_concurrently
replay_slots_concurrently,
&prioritization_fee_cache,
);
replay_active_banks_time.stop();
@ -1716,6 +1719,7 @@ impl ReplayStage {
replay_vote_sender: &ReplayVoteSender,
verify_recyclers: &VerifyRecyclers,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> result::Result<usize, BlockstoreProcessorError> {
let mut w_replay_stats = replay_stats.write().unwrap();
let mut w_replay_progress = replay_progress.write().unwrap();
@ -1735,6 +1739,7 @@ impl ReplayStage {
verify_recyclers,
false,
log_messages_bytes_limit,
prioritization_fee_cache,
)?;
let tx_count_after = w_replay_progress.num_txs;
let tx_count = tx_count_after - tx_count_before;
@ -2237,6 +2242,7 @@ impl ReplayStage {
replay_timing: &mut ReplayTiming,
log_messages_bytes_limit: Option<usize>,
active_bank_slots: &[Slot],
prioritization_fee_cache: &PrioritizationFeeCache,
) -> Vec<ReplaySlotFromBlockstore> {
// Make mutable shared structures thread safe.
let progress = RwLock::new(progress);
@ -2312,6 +2318,7 @@ impl ReplayStage {
&replay_vote_sender.clone(),
&verify_recyclers.clone(),
log_messages_bytes_limit,
prioritization_fee_cache,
);
replay_blockstore_time.stop();
replay_result.replay_result = Some(blockstore_result);
@ -2342,6 +2349,7 @@ impl ReplayStage {
replay_timing: &mut ReplayTiming,
log_messages_bytes_limit: Option<usize>,
bank_slot: Slot,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> ReplaySlotFromBlockstore {
let mut replay_result = ReplaySlotFromBlockstore {
is_slot_dead: false,
@ -2391,6 +2399,7 @@ impl ReplayStage {
&replay_vote_sender.clone(),
&verify_recyclers.clone(),
log_messages_bytes_limit,
prioritization_fee_cache,
);
replay_blockstore_time.stop();
replay_result.replay_result = Some(blockstore_result);
@ -2422,6 +2431,7 @@ impl ReplayStage {
ancestor_hashes_replay_update_sender: &AncestorHashesReplayUpdateSender,
block_metadata_notifier: Option<BlockMetadataNotifierLock>,
replay_result_vec: &[ReplaySlotFromBlockstore],
prioritization_fee_cache: &PrioritizationFeeCache,
) -> bool {
// TODO: See if processing of blockstore replay results and bank completion can be made thread safe.
let mut did_complete_bank = false;
@ -2489,6 +2499,9 @@ impl ReplayStage {
warn!("cost_update_sender failed sending bank stats: {:?}", err)
});
// finalize block's minimum prioritization fee cache for this bank
prioritization_fee_cache.finalize_priority_fee(bank.slot());
assert_ne!(bank.hash(), Hash::default());
// Needs to be updated before `check_slot_agrees_with_cluster()` so that
// any updates in `check_slot_agrees_with_cluster()` on fork choice take
@ -2607,6 +2620,7 @@ impl ReplayStage {
replay_timing: &mut ReplayTiming,
log_messages_bytes_limit: Option<usize>,
replay_slots_concurrently: bool,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> bool /* completed a bank */ {
let active_bank_slots = bank_forks.read().unwrap().active_bank_slots();
let num_active_banks = active_bank_slots.len();
@ -2629,6 +2643,7 @@ impl ReplayStage {
replay_timing,
log_messages_bytes_limit,
&active_bank_slots,
prioritization_fee_cache,
)
} else {
active_bank_slots
@ -2646,6 +2661,7 @@ impl ReplayStage {
replay_timing,
log_messages_bytes_limit,
*bank_slot,
prioritization_fee_cache,
)
})
.collect()
@ -2672,6 +2688,7 @@ impl ReplayStage {
ancestor_hashes_replay_update_sender,
block_metadata_notifier,
&replay_result_vec,
prioritization_fee_cache,
)
} else {
false
@ -4162,6 +4179,7 @@ pub(crate) mod tests {
&replay_vote_sender,
&VerifyRecyclers::default(),
None,
&PrioritizationFeeCache::new(0u64),
);
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let rpc_subscriptions = Arc::new(RpcSubscriptions::new_for_tests(

View File

@ -42,7 +42,7 @@ use {
solana_runtime::{
accounts_background_service::AbsRequestSender, bank_forks::BankForks,
commitment::BlockCommitmentCache, cost_model::CostModel,
vote_sender_types::ReplayVoteSender,
prioritization_fee_cache::PrioritizationFeeCache, vote_sender_types::ReplayVoteSender,
},
solana_sdk::{clock::Slot, pubkey::Pubkey, signature::Keypair},
solana_tpu_client::connection_cache::ConnectionCache,
@ -130,6 +130,7 @@ impl Tvu {
accounts_background_request_sender: AbsRequestSender,
log_messages_bytes_limit: Option<usize>,
connection_cache: &Arc<ConnectionCache>,
prioritization_fee_cache: &Arc<PrioritizationFeeCache>,
) -> Result<Self, String> {
let TvuSockets {
repair: repair_socket,
@ -290,6 +291,7 @@ impl Tvu {
drop_bank_sender,
block_metadata_notifier,
log_messages_bytes_limit,
prioritization_fee_cache.clone(),
)?;
let ledger_cleanup_service = tvu_config.max_ledger_shreds.map(|max_ledger_shreds| {
@ -403,6 +405,7 @@ pub mod tests {
let (_, gossip_confirmed_slots_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(bank_forks));
let max_complete_transaction_status_slot = Arc::new(AtomicU64::default());
let _ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let tvu = Tvu::new(
&vote_keypair.pubkey(),
Arc::new(RwLock::new(vec![Arc::new(vote_keypair)])),
@ -452,6 +455,7 @@ pub mod tests {
AbsRequestSender::default(),
None,
&Arc::new(ConnectionCache::default()),
&_ignored_prioritization_fee_cache,
)
.expect("assume success");
exit.store(true, Ordering::Relaxed);

View File

@ -1,10 +1,8 @@
use {
crate::{
immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket},
transaction_priority_details::TransactionPriorityDetails,
},
crate::immutable_deserialized_packet::{DeserializedPacketError, ImmutableDeserializedPacket},
min_max_heap::MinMaxHeap,
solana_perf::packet::{Packet, PacketBatch},
solana_runtime::transaction_priority_details::TransactionPriorityDetails,
solana_sdk::{
feature_set,
hash::Hash,

View File

@ -78,6 +78,7 @@ use {
commitment::BlockCommitmentCache,
cost_model::CostModel,
hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE},
prioritization_fee_cache::PrioritizationFeeCache,
runtime_config::RuntimeConfig,
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
@ -771,6 +772,10 @@ impl Validator {
false => Arc::new(ConnectionCache::with_udp(tpu_connection_pool_size)),
};
// block min prioritization fee cache should be readable by RPC, and writable by validator
// (for now, by replay stage)
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());
let rpc_override_health_check = Arc::new(AtomicBool::new(false));
let (
json_rpc_service,
@ -996,6 +1001,7 @@ impl Validator {
accounts_background_request_sender,
config.runtime_config.log_messages_bytes_limit,
&connection_cache,
&prioritization_fee_cache,
)?;
let tpu = Tpu::new(

View File

@ -31,6 +31,7 @@ use {
block_cost_limits::*,
commitment::VOTE_THRESHOLD_SIZE,
cost_model::CostModel,
prioritization_fee_cache::PrioritizationFeeCache,
runtime_config::RuntimeConfig,
transaction_batch::TransactionBatch,
vote_account::VoteAccountsHashMap,
@ -516,6 +517,7 @@ pub fn process_entries_for_tests(
})
.collect();
let _ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
let result = process_entries_with_callback(
bank,
&mut replay_entries,
@ -526,6 +528,7 @@ pub fn process_entries_for_tests(
&mut confirmation_timing,
Arc::new(RwLock::new(BlockCostCapacityMeter::default())),
None,
&_ignored_prioritization_fee_cache,
);
debug!("process_entries: {:?}", confirmation_timing);
@ -544,6 +547,7 @@ fn process_entries_with_callback(
confirmation_timing: &mut ConfirmationTiming,
cost_capacity_meter: Arc<RwLock<BlockCostCapacityMeter>>,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> Result<()> {
// accumulator for entries that can be processed in parallel
let mut batches = vec![];
@ -606,6 +610,9 @@ fn process_entries_with_callback(
batch,
transaction_indexes,
});
// entry is scheduled to be processed, transactions in it can be used to
// update prioritization fee cache asynchronously.
prioritization_fee_cache.update(bank.clone(), transactions.iter());
// done with this entry
break;
}
@ -954,6 +961,8 @@ fn confirm_full_slot(
) -> result::Result<(), BlockstoreProcessorError> {
let mut confirmation_timing = ConfirmationTiming::default();
let skip_verification = !opts.poh_verify;
let _ignored_prioritization_fee_cache = PrioritizationFeeCache::new(0u64);
confirm_slot(
blockstore,
bank,
@ -966,6 +975,7 @@ fn confirm_full_slot(
recyclers,
opts.allow_dead_slots,
opts.runtime_config.log_messages_bytes_limit,
&_ignored_prioritization_fee_cache,
)?;
timing.accumulate(&confirmation_timing.execute_timings);
@ -1092,6 +1102,7 @@ pub fn confirm_slot(
recyclers: &VerifyRecyclers,
allow_dead_slots: bool,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> result::Result<(), BlockstoreProcessorError> {
let slot = bank.slot();
@ -1120,6 +1131,7 @@ pub fn confirm_slot(
entry_callback,
recyclers,
log_messages_bytes_limit,
prioritization_fee_cache,
)
}
@ -1135,6 +1147,7 @@ fn confirm_slot_entries(
entry_callback: Option<&ProcessCallback>,
recyclers: &VerifyRecyclers,
log_messages_bytes_limit: Option<usize>,
prioritization_fee_cache: &PrioritizationFeeCache,
) -> result::Result<(), BlockstoreProcessorError> {
let slot = bank.slot();
let (entries, num_shreds, slot_full) = slot_entries_load_result;
@ -1236,6 +1249,7 @@ fn confirm_slot_entries(
timing,
cost_capacity_meter,
log_messages_bytes_limit,
prioritization_fee_cache,
)
.map_err(BlockstoreProcessorError::from);
replay_elapsed.stop();
@ -4061,6 +4075,7 @@ pub mod tests {
None,
&VerifyRecyclers::default(),
None,
&PrioritizationFeeCache::new(0u64),
)
}
@ -4204,6 +4219,7 @@ pub mod tests {
None,
&VerifyRecyclers::default(),
None,
&PrioritizationFeeCache::new(0u64),
)
.unwrap();
assert_eq!(progress.num_txs, 2);
@ -4249,6 +4265,7 @@ pub mod tests {
None,
&VerifyRecyclers::default(),
None,
&PrioritizationFeeCache::new(0u64),
)
.unwrap();
assert_eq!(progress.num_txs, 5);

View File

@ -5453,6 +5453,7 @@ dependencies = [
"itertools",
"lazy_static",
"log",
"lru",
"lz4",
"memmap2",
"num-derive",

View File

@ -27,6 +27,7 @@ index_list = "0.2.7"
itertools = "0.10.3"
lazy_static = "1.4.0"
log = "0.4.17"
lru = "0.7.7"
lz4 = "1.24.0"
memmap2 = "0.5.3"
num-derive = { version = "0.3" }
@ -78,3 +79,6 @@ targets = ["x86_64-unknown-linux-gnu"]
[build-dependencies]
rustc_version = "0.4"
[[bench]]
name = "prioritization_fee_cache"

View File

@ -0,0 +1,113 @@
#![feature(test)]
extern crate test;
use {
rand::{thread_rng, Rng},
solana_runtime::{
bank::Bank,
bank_forks::BankForks,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
prioritization_fee_cache::*,
},
solana_sdk::{
compute_budget::ComputeBudgetInstruction,
message::Message,
pubkey::Pubkey,
system_instruction,
transaction::{SanitizedTransaction, Transaction},
},
std::sync::Arc,
test::Bencher,
};
const TRANSFER_TRANSACTION_COMPUTE_UNIT: u32 = 200;
fn build_sanitized_transaction(
compute_unit_price: u64,
signer_account: &Pubkey,
write_account: &Pubkey,
) -> SanitizedTransaction {
let transfer_lamports = 1;
let transaction = Transaction::new_unsigned(Message::new(
&[
system_instruction::transfer(signer_account, write_account, transfer_lamports),
ComputeBudgetInstruction::set_compute_unit_limit(TRANSFER_TRANSACTION_COMPUTE_UNIT),
ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price),
],
Some(signer_account),
));
SanitizedTransaction::try_from_legacy_transaction(transaction).unwrap()
}
#[bench]
#[ignore]
fn bench_process_transactions_single_slot(bencher: &mut Bencher) {
let prioritization_fee_cache = PrioritizationFeeCache::default();
let bank = Arc::new(Bank::default_for_tests());
// build test transactions
let transactions: Vec<_> = (0..5000)
.map(|n| {
let compute_unit_price = n % 7;
build_sanitized_transaction(
compute_unit_price,
&Pubkey::new_unique(),
&Pubkey::new_unique(),
)
})
.collect();
bencher.iter(|| {
prioritization_fee_cache.update(bank.clone(), transactions.iter());
});
}
fn process_transactions_multiple_slots(banks: &[Arc<Bank>], num_slots: usize, num_threads: usize) {
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::default());
let pool = rayon::ThreadPoolBuilder::new()
.num_threads(num_threads)
.build()
.unwrap();
// each threads updates a slot a batch of 50 transactions, for 100 times
for _ in 0..100 {
pool.install(|| {
let transactions: Vec<_> = (0..50)
.map(|n| {
let compute_unit_price = n % 7;
build_sanitized_transaction(
compute_unit_price,
&Pubkey::new_unique(),
&Pubkey::new_unique(),
)
})
.collect();
let index = thread_rng().gen_range(0, num_slots);
prioritization_fee_cache.update(banks[index].clone(), transactions.iter());
})
}
}
#[bench]
#[ignore]
fn bench_process_transactions_multiple_slots(bencher: &mut Bencher) {
const NUM_SLOTS: usize = 5;
const NUM_THREADS: usize = 3;
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank0 = Bank::new_for_benches(&genesis_config);
let bank_forks = BankForks::new(bank0);
let bank = bank_forks.working_bank();
let collector = solana_sdk::pubkey::new_rand();
let banks = (1..=NUM_SLOTS)
.map(|n| Arc::new(Bank::new_from_parent(&bank, &collector, n as u64)))
.collect::<Vec<_>>();
bencher.iter(|| {
process_transactions_multiple_slots(&banks, NUM_SLOTS, NUM_THREADS);
});
}

View File

@ -47,6 +47,8 @@ pub mod loader_utils;
pub mod message_processor;
pub mod non_circulating_supply;
mod nonce_keyed_account;
pub mod prioritization_fee;
pub mod prioritization_fee_cache;
mod pubkey_bins;
mod read_only_accounts_cache;
pub mod rent_collector;
@ -73,6 +75,7 @@ mod storable_accounts;
mod system_instruction_processor;
pub mod transaction_batch;
pub mod transaction_error_metrics;
pub mod transaction_priority_details;
mod verify_accounts_hash_in_background;
pub mod vote_account;
pub mod vote_parser;

View File

@ -0,0 +1,324 @@
use {
solana_measure::measure,
solana_sdk::{clock::Slot, pubkey::Pubkey, saturating_add_assign},
std::collections::HashMap,
};
#[derive(Debug, Default)]
struct PrioritizationFeeMetrics {
// Count of writable accounts in slot
total_writable_accounts_count: u64,
// Count of writeable accounts with a minimum prioritization fee higher than the minimum transaction
// fee for this slot.
relevant_writable_accounts_count: u64,
// Total prioritization fees included in this slot.
total_prioritization_fee: u64,
// Accumulated time spent on tracking prioritization fee for each slot.
total_update_elapsed_us: u64,
}
impl PrioritizationFeeMetrics {
fn accumulate_total_prioritization_fee(&mut self, val: u64) {
saturating_add_assign!(self.total_prioritization_fee, val);
}
fn accumulate_total_update_elapsed_us(&mut self, val: u64) {
saturating_add_assign!(self.total_update_elapsed_us, val);
}
fn report(&self, slot: Slot) {
datapoint_info!(
"block_prioritization_fee",
("slot", slot as i64, i64),
(
"total_writable_accounts_count",
self.total_writable_accounts_count as i64,
i64
),
(
"relevant_writable_accounts_count",
self.relevant_writable_accounts_count as i64,
i64
),
(
"total_prioritization_fee",
self.total_prioritization_fee as i64,
i64
),
(
"total_update_elapsed_us",
self.total_update_elapsed_us as i64,
i64
),
);
}
}
pub enum PrioritizationFeeError {
// Not able to get account locks from sanitized transaction, which is required to update block
// minimum fees.
FailGetTransactionAccountLocks,
// Not able to read priority details, including compute-unit price, from transaction.
// Compute-unit price is required to update block minimum fees.
FailGetTransactionPriorityDetails,
// Block is already finalized, trying to finalize it again is usually unexpected
BlockIsAlreadyFinalized,
}
/// Block minimum prioritization fee stats, includes the minimum prioritization fee for a transaction in this
/// block; and the minimum fee for each writable account in all transactions in this block. The only relevant
/// write account minimum fees are those greater than the block minimum transaction fee, because the minimum fee needed to land
/// a transaction is determined by Max( min_transaction_fee, min_writable_account_fees(key), ...)
#[derive(Debug)]
pub struct PrioritizationFee {
// The minimum prioritization fee of transactions that landed in this block.
min_transaction_fee: u64,
// The minimum prioritization fee of each writable account in transactions in this block.
min_writable_account_fees: HashMap<Pubkey, u64>,
// Default to `false`, set to `true` when a block is completed, therefore the minimum fees recorded
// are finalized, and can be made available for use (e.g., RPC query)
is_finalized: bool,
// slot prioritization fee metrics
metrics: PrioritizationFeeMetrics,
}
impl Default for PrioritizationFee {
fn default() -> Self {
PrioritizationFee {
min_transaction_fee: u64::MAX,
min_writable_account_fees: HashMap::new(),
is_finalized: false,
metrics: PrioritizationFeeMetrics::default(),
}
}
}
impl PrioritizationFee {
/// Update self for minimum transaction fee in the block and minimum fee for each writable account.
pub fn update(
&mut self,
transaction_fee: u64,
writable_accounts: &[Pubkey],
) -> Result<(), PrioritizationFeeError> {
let (_, update_time) = measure!(
{
if transaction_fee < self.min_transaction_fee {
self.min_transaction_fee = transaction_fee;
}
for write_account in writable_accounts.iter() {
self.min_writable_account_fees
.entry(*write_account)
.and_modify(|write_lock_fee| {
*write_lock_fee = std::cmp::min(*write_lock_fee, transaction_fee)
})
.or_insert(transaction_fee);
}
self.metrics
.accumulate_total_prioritization_fee(transaction_fee);
},
"update_time",
);
self.metrics
.accumulate_total_update_elapsed_us(update_time.as_us());
Ok(())
}
/// Accounts that have minimum fees lesser or equal to the minimum fee in the block are redundant, they are
/// removed to reduce memory footprint when mark_block_completed() is called.
fn prune_irrelevant_writable_accounts(&mut self) {
self.metrics.total_writable_accounts_count = self.get_writable_accounts_count() as u64;
self.min_writable_account_fees
.retain(|_, account_fee| account_fee > &mut self.min_transaction_fee);
self.metrics.relevant_writable_accounts_count = self.get_writable_accounts_count() as u64;
}
pub fn mark_block_completed(&mut self) -> Result<(), PrioritizationFeeError> {
if self.is_finalized {
return Err(PrioritizationFeeError::BlockIsAlreadyFinalized);
}
self.prune_irrelevant_writable_accounts();
self.is_finalized = true;
Ok(())
}
pub fn get_min_transaction_fee(&self) -> Option<u64> {
(self.min_transaction_fee != u64::MAX).then_some(self.min_transaction_fee)
}
pub fn get_writable_account_fee(&self, key: &Pubkey) -> Option<u64> {
self.min_writable_account_fees.get(key).copied()
}
pub fn get_writable_account_fees(&self) -> impl Iterator<Item = (&Pubkey, &u64)> {
self.min_writable_account_fees.iter()
}
pub fn get_writable_accounts_count(&self) -> usize {
self.min_writable_account_fees.len()
}
pub fn is_finalized(&self) -> bool {
self.is_finalized
}
pub fn report_metrics(&self, slot: Slot) {
self.metrics.report(slot);
// report this slot's min_transaction_fee and top 10 min_writable_account_fees
let min_transaction_fee = self.get_min_transaction_fee().unwrap_or(0);
let mut accounts_fees: Vec<_> = self.get_writable_account_fees().collect();
accounts_fees.sort_by(|lh, rh| rh.1.cmp(lh.1));
datapoint_info!(
"block_min_prioritization_fee",
("slot", slot as i64, i64),
("entity", "block", String),
("min_prioritization_fee", min_transaction_fee as i64, i64),
);
for (account_key, fee) in accounts_fees.iter().take(10) {
datapoint_info!(
"block_min_prioritization_fee",
("slot", slot as i64, i64),
("entity", account_key.to_string(), String),
("min_prioritization_fee", **fee as i64, i64),
);
}
}
}
#[cfg(test)]
mod tests {
use {super::*, solana_sdk::pubkey::Pubkey};
#[test]
fn test_update_prioritization_fee() {
solana_logger::setup();
let write_account_a = Pubkey::new_unique();
let write_account_b = Pubkey::new_unique();
let write_account_c = Pubkey::new_unique();
let mut prioritization_fee = PrioritizationFee::default();
assert!(prioritization_fee.get_min_transaction_fee().is_none());
// Assert for 1st transaction
// [fee, write_accounts...] --> [block, account_a, account_b, account_c]
// -----------------------------------------------------------------------
// [5, a, b ] --> [5, 5, 5, nil ]
{
assert!(prioritization_fee
.update(5, &[write_account_a, write_account_b])
.is_ok());
assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap());
assert_eq!(
5,
prioritization_fee
.get_writable_account_fee(&write_account_a)
.unwrap()
);
assert_eq!(
5,
prioritization_fee
.get_writable_account_fee(&write_account_b)
.unwrap()
);
assert!(prioritization_fee
.get_writable_account_fee(&write_account_c)
.is_none());
}
// Assert for second transaction:
// [fee, write_accounts...] --> [block, account_a, account_b, account_c]
// -----------------------------------------------------------------------
// [9, b, c ] --> [5, 5, 5, 9 ]
{
assert!(prioritization_fee
.update(9, &[write_account_b, write_account_c])
.is_ok());
assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap());
assert_eq!(
5,
prioritization_fee
.get_writable_account_fee(&write_account_a)
.unwrap()
);
assert_eq!(
5,
prioritization_fee
.get_writable_account_fee(&write_account_b)
.unwrap()
);
assert_eq!(
9,
prioritization_fee
.get_writable_account_fee(&write_account_c)
.unwrap()
);
}
// Assert for third transaction:
// [fee, write_accounts...] --> [block, account_a, account_b, account_c]
// -----------------------------------------------------------------------
// [2, a, c ] --> [2, 2, 5, 2 ]
{
assert!(prioritization_fee
.update(2, &[write_account_a, write_account_c])
.is_ok());
assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap());
assert_eq!(
2,
prioritization_fee
.get_writable_account_fee(&write_account_a)
.unwrap()
);
assert_eq!(
5,
prioritization_fee
.get_writable_account_fee(&write_account_b)
.unwrap()
);
assert_eq!(
2,
prioritization_fee
.get_writable_account_fee(&write_account_c)
.unwrap()
);
}
// assert after prune, account a and c should be removed from cache to save space
{
prioritization_fee.prune_irrelevant_writable_accounts();
assert_eq!(1, prioritization_fee.min_writable_account_fees.len());
assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap());
assert!(prioritization_fee
.get_writable_account_fee(&write_account_a)
.is_none());
assert_eq!(
5,
prioritization_fee
.get_writable_account_fee(&write_account_b)
.unwrap()
);
assert!(prioritization_fee
.get_writable_account_fee(&write_account_c)
.is_none());
}
}
#[test]
fn test_mark_block_completed() {
let mut prioritization_fee = PrioritizationFee::default();
assert!(prioritization_fee.mark_block_completed().is_ok());
assert!(prioritization_fee.mark_block_completed().is_err());
}
}

View File

@ -0,0 +1,811 @@
use {
crate::{
bank::Bank, prioritization_fee::*,
transaction_priority_details::GetTransactionPriorityDetails,
},
crossbeam_channel::{unbounded, Receiver, Sender},
log::*,
lru::LruCache,
solana_measure::measure,
solana_sdk::{
clock::Slot, pubkey::Pubkey, saturating_add_assign, transaction::SanitizedTransaction,
},
std::{
sync::{
atomic::{AtomicU64, Ordering},
Arc, Mutex, RwLock,
},
thread::{Builder, JoinHandle},
},
};
/// The maximum number of blocks to keep in `PrioritizationFeeCache`, ie.
/// the amount of history generally desired to estimate the prioritization fee needed to
/// land a transaction in the current block.
const MAX_NUM_RECENT_BLOCKS: u64 = 150;
#[derive(Debug, Default)]
struct PrioritizationFeeCacheMetrics {
// Count of transactions that successfully updated each slot's prioritization fee cache.
successful_transaction_update_count: AtomicU64,
// Accumulated time spent on tracking prioritization fee for each slot.
total_update_elapsed_us: AtomicU64,
// Accumulated time spent on acquiring cache write lock.
total_cache_lock_elapsed_us: AtomicU64,
// Accumulated time spent on acquiring each block entry's lock..
total_entry_lock_elapsed_us: AtomicU64,
// Accumulated time spent on updating block prioritization fees.
total_entry_update_elapsed_us: AtomicU64,
// Accumulated time spent on finalizing block prioritization fees.
total_block_finalize_elapsed_us: AtomicU64,
}
impl PrioritizationFeeCacheMetrics {
fn accumulate_successful_transaction_update_count(&self, val: u64) {
self.successful_transaction_update_count
.fetch_add(val, Ordering::Relaxed);
}
fn accumulate_total_update_elapsed_us(&self, val: u64) {
self.total_update_elapsed_us
.fetch_add(val, Ordering::Relaxed);
}
fn accumulate_total_cache_lock_elapsed_us(&self, val: u64) {
self.total_cache_lock_elapsed_us
.fetch_add(val, Ordering::Relaxed);
}
fn accumulate_total_entry_lock_elapsed_us(&self, val: u64) {
self.total_entry_lock_elapsed_us
.fetch_add(val, Ordering::Relaxed);
}
fn accumulate_total_entry_update_elapsed_us(&self, val: u64) {
self.total_entry_update_elapsed_us
.fetch_add(val, Ordering::Relaxed);
}
fn accumulate_total_block_finalize_elapsed_us(&self, val: u64) {
self.total_block_finalize_elapsed_us
.fetch_add(val, Ordering::Relaxed);
}
fn report(&self, slot: Slot) {
datapoint_info!(
"block_prioritization_fee_counters",
("slot", slot as i64, i64),
(
"successful_transaction_update_count",
self.successful_transaction_update_count
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_update_elapsed_us",
self.total_update_elapsed_us.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_cache_lock_elapsed_us",
self.total_cache_lock_elapsed_us.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_entry_lock_elapsed_us",
self.total_entry_lock_elapsed_us.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_entry_update_elapsed_us",
self.total_entry_update_elapsed_us
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"total_block_finalize_elapsed_us",
self.total_block_finalize_elapsed_us
.swap(0, Ordering::Relaxed) as i64,
i64
),
);
}
}
enum CacheServiceUpdate {
TransactionUpdate {
slot: Slot,
transaction_fee: u64,
writable_accounts: Arc<Vec<Pubkey>>,
},
BankFrozen {
slot: Slot,
},
Exit,
}
/// Stores up to MAX_NUM_RECENT_BLOCKS recent block's prioritization fee,
/// A separate internal thread `service_thread` handles additional tasks when a bank is frozen,
/// and collecting stats and reporting metrics.
pub struct PrioritizationFeeCache {
cache: Arc<RwLock<LruCache<Slot, Arc<Mutex<PrioritizationFee>>>>>,
service_thread: Option<JoinHandle<()>>,
sender: Sender<CacheServiceUpdate>,
metrics: Arc<PrioritizationFeeCacheMetrics>,
}
impl Default for PrioritizationFeeCache {
fn default() -> Self {
Self::new(MAX_NUM_RECENT_BLOCKS)
}
}
impl Drop for PrioritizationFeeCache {
fn drop(&mut self) {
let _ = self.sender.send(CacheServiceUpdate::Exit);
self.service_thread
.take()
.unwrap()
.join()
.expect("Prioritization fee cache servicing thread failed to join");
}
}
impl PrioritizationFeeCache {
pub fn new(capacity: u64) -> Self {
let metrics = Arc::new(PrioritizationFeeCacheMetrics::default());
let (sender, receiver) = unbounded();
let cache = Arc::new(RwLock::new(LruCache::new(capacity as usize)));
let cache_clone = cache.clone();
let metrics_clone = metrics.clone();
let service_thread = Some(
Builder::new()
.name("prioritization-fee-cache-servicing-thread".to_string())
.spawn(move || {
Self::service_loop(cache_clone, receiver, metrics_clone);
})
.unwrap(),
);
PrioritizationFeeCache {
cache,
service_thread,
sender,
metrics,
}
}
/// Get prioritization fee entry, create new entry if necessary
fn get_prioritization_fee(
cache: Arc<RwLock<LruCache<Slot, Arc<Mutex<PrioritizationFee>>>>>,
slot: &Slot,
) -> Arc<Mutex<PrioritizationFee>> {
let mut cache = cache.write().unwrap();
match cache.get(slot) {
Some(entry) => Arc::clone(entry),
None => {
let entry = Arc::new(Mutex::new(PrioritizationFee::default()));
cache.put(*slot, Arc::clone(&entry));
entry
}
}
}
/// Update with a list of transactions' tx_priority_details and tx_account_locks; Only
/// transactions have both valid priority_detail and account_locks will be used to update
/// fee_cache asynchronously.
pub fn update<'a>(&self, bank: Arc<Bank>, txs: impl Iterator<Item = &'a SanitizedTransaction>) {
let mut successful_transaction_update_count: u64 = 0;
let (_, send_updates_time) = measure!(
{
for sanitized_transaction in txs {
let priority_details = sanitized_transaction.get_transaction_priority_details();
let account_locks = sanitized_transaction
.get_account_locks(bank.get_transaction_account_lock_limit());
if priority_details.is_none() || account_locks.is_err() {
continue;
}
let writable_accounts = Arc::new(
account_locks
.unwrap()
.writable
.iter()
.map(|key| **key)
.collect::<Vec<_>>(),
);
self.sender
.send(CacheServiceUpdate::TransactionUpdate {
slot: bank.slot(),
transaction_fee: priority_details.unwrap().priority,
writable_accounts,
})
.unwrap_or_else(|err| {
warn!(
"prioritization fee cache transaction updates failed: {:?}",
err
);
});
saturating_add_assign!(successful_transaction_update_count, 1)
}
},
"send_updates",
);
self.metrics
.accumulate_total_update_elapsed_us(send_updates_time.as_us());
self.metrics
.accumulate_successful_transaction_update_count(successful_transaction_update_count);
}
/// Finalize prioritization fee when it's bank is completely replayed from blockstore,
/// by pruning irrelevant accounts to save space, and marking its availability for queries.
pub fn finalize_priority_fee(&self, slot: Slot) {
self.sender
.send(CacheServiceUpdate::BankFrozen { slot })
.unwrap_or_else(|err| {
warn!(
"prioritization fee cache signalling bank frozen failed: {:?}",
err
)
});
}
/// Internal function is invoked by worker thread to update slot's minimum prioritization fee,
/// Cache lock contends here.
fn update_cache(
cache: Arc<RwLock<LruCache<Slot, Arc<Mutex<PrioritizationFee>>>>>,
slot: &Slot,
transaction_fee: u64,
writable_accounts: Arc<Vec<Pubkey>>,
metrics: Arc<PrioritizationFeeCacheMetrics>,
) {
let (block_prioritization_fee, cache_lock_time) =
measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time");
let (mut block_prioritization_fee, entry_lock_time) =
measure!(block_prioritization_fee.lock().unwrap(), "entry_lock_time");
let (_, entry_update_time) = measure!(
block_prioritization_fee.update(transaction_fee, &writable_accounts),
"entry_update_time"
);
metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us());
metrics.accumulate_total_entry_lock_elapsed_us(entry_lock_time.as_us());
metrics.accumulate_total_entry_update_elapsed_us(entry_update_time.as_us());
}
fn finalize_slot(
cache: Arc<RwLock<LruCache<Slot, Arc<Mutex<PrioritizationFee>>>>>,
slot: &Slot,
metrics: Arc<PrioritizationFeeCacheMetrics>,
) {
let (block_prioritization_fee, cache_lock_time) =
measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time");
let (mut block_prioritization_fee, entry_lock_time) =
measure!(block_prioritization_fee.lock().unwrap(), "entry_lock_time");
// prune cache by evicting write account entry from prioritization fee if its fee is less
// or equal to block's minimum transaction fee, because they are irrelevant in calculating
// block minimum fee.
let (_, slot_finalize_time) = measure!(
block_prioritization_fee.mark_block_completed(),
"slot_finalize_time"
);
block_prioritization_fee.report_metrics(*slot);
metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us());
metrics.accumulate_total_entry_lock_elapsed_us(entry_lock_time.as_us());
metrics.accumulate_total_block_finalize_elapsed_us(slot_finalize_time.as_us());
}
fn service_loop(
cache: Arc<RwLock<LruCache<Slot, Arc<Mutex<PrioritizationFee>>>>>,
receiver: Receiver<CacheServiceUpdate>,
metrics: Arc<PrioritizationFeeCacheMetrics>,
) {
for update in receiver.iter() {
match update {
CacheServiceUpdate::TransactionUpdate {
slot,
transaction_fee,
writable_accounts,
} => Self::update_cache(
cache.clone(),
&slot,
transaction_fee,
writable_accounts,
metrics.clone(),
),
CacheServiceUpdate::BankFrozen { slot } => {
Self::finalize_slot(cache.clone(), &slot, metrics.clone());
metrics.report(slot);
}
CacheServiceUpdate::Exit => {
break;
}
}
}
}
/// Returns number of blocks that have finalized minimum fees collection
pub fn available_block_count(&self) -> usize {
self.cache
.read()
.unwrap()
.iter()
.filter(|(_slot, prioritization_fee)| prioritization_fee.lock().unwrap().is_finalized())
.count()
}
/// Query block minimum fees from finalized blocks in cache,
/// Returns a vector of fee; call site can use it to produce
/// average, or top 5% etc.
pub fn get_prioritization_fees(&self) -> Vec<u64> {
self.cache
.read()
.unwrap()
.iter()
.filter_map(|(_slot, prioritization_fee)| {
let prioritization_fee_read = prioritization_fee.lock().unwrap();
prioritization_fee_read
.is_finalized()
.then(|| prioritization_fee_read.get_min_transaction_fee())
})
.flatten()
.collect()
}
/// Query given account minimum fees from finalized blocks in cache,
/// Returns a vector of fee; call site can use it to produce
/// average, or top 5% etc.
pub fn get_account_prioritization_fees(&self, account_key: &Pubkey) -> Vec<u64> {
self.cache
.read()
.unwrap()
.iter()
.filter_map(|(_slot, prioritization_fee)| {
let prioritization_fee_read = prioritization_fee.lock().unwrap();
prioritization_fee_read
.is_finalized()
.then(|| prioritization_fee_read.get_writable_account_fee(account_key))
})
.flatten()
.collect()
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::{
bank::Bank,
bank_forks::BankForks,
genesis_utils::{create_genesis_config, GenesisConfigInfo},
},
solana_sdk::{
compute_budget::ComputeBudgetInstruction,
message::Message,
pubkey::Pubkey,
system_instruction,
transaction::{SanitizedTransaction, Transaction},
},
};
fn build_sanitized_transaction_for_test(
compute_unit_price: u64,
signer_account: &Pubkey,
write_account: &Pubkey,
) -> SanitizedTransaction {
let transaction = Transaction::new_unsigned(Message::new(
&[
system_instruction::transfer(signer_account, write_account, 1),
ComputeBudgetInstruction::set_compute_unit_price(compute_unit_price),
],
Some(signer_account),
));
SanitizedTransaction::try_from_legacy_transaction(transaction).unwrap()
}
// update fee cache is asynchronous, this test helper blocks until update is completed.
fn sync_update<'a>(
prioritization_fee_cache: &mut PrioritizationFeeCache,
bank: Arc<Bank>,
txs: impl Iterator<Item = &'a SanitizedTransaction>,
) {
prioritization_fee_cache.update(bank.clone(), txs);
let block_fee = PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(),
&bank.slot(),
);
// wait till update is done
while block_fee
.lock()
.unwrap()
.get_min_transaction_fee()
.is_none()
{
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
// finalization is asynchronous, this test helper blocks until finalization is completed.
fn sync_finalize_priority_fee_for_test(
prioritization_fee_cache: &mut PrioritizationFeeCache,
slot: Slot,
) {
prioritization_fee_cache.finalize_priority_fee(slot);
let fee = PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(),
&slot,
);
// wait till finalization is done
while !fee.lock().unwrap().is_finalized() {
std::thread::sleep(std::time::Duration::from_millis(100));
}
}
#[test]
fn test_prioritization_fee_cache_update() {
solana_logger::setup();
let write_account_a = Pubkey::new_unique();
let write_account_b = Pubkey::new_unique();
let write_account_c = Pubkey::new_unique();
// Set up test with 3 transactions, in format of [fee, write-accounts...],
// Shall expect fee cache is updated in following sequence:
// transaction block minimum prioritization fee cache
// [fee, write_accounts...] --> [block, account_a, account_b, account_c]
// -----------------------------------------------------------------------
// [5, a, b ] --> [5, 5, 5, nil ]
// [9, b, c ] --> [5, 5, 5, 9 ]
// [2, a, c ] --> [2, 2, 5, 2 ]
//
let txs = vec![
build_sanitized_transaction_for_test(5, &write_account_a, &write_account_b),
build_sanitized_transaction_for_test(9, &write_account_b, &write_account_c),
build_sanitized_transaction_for_test(2, &write_account_a, &write_account_c),
];
let bank = Arc::new(Bank::default_for_tests());
let slot = bank.slot();
let mut prioritization_fee_cache = PrioritizationFeeCache::default();
prioritization_fee_cache.update(bank, txs.iter());
// assert block minimum fee and account a, b, c fee accordingly
{
let fee = PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(),
&slot,
);
let fee = fee.lock().unwrap();
assert_eq!(2, fee.get_min_transaction_fee().unwrap());
assert_eq!(2, fee.get_writable_account_fee(&write_account_a).unwrap());
assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap());
assert_eq!(2, fee.get_writable_account_fee(&write_account_c).unwrap());
// assert unknown account d fee
assert!(fee
.get_writable_account_fee(&Pubkey::new_unique())
.is_none());
}
// assert after prune, account a and c should be removed from cache to save space
{
sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, slot);
let fee = PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(),
&slot,
);
let fee = fee.lock().unwrap();
assert_eq!(2, fee.get_min_transaction_fee().unwrap());
assert!(fee.get_writable_account_fee(&write_account_a).is_none());
assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap());
assert!(fee.get_writable_account_fee(&write_account_c).is_none());
}
}
#[test]
fn test_available_block_count() {
let prioritization_fee_cache = PrioritizationFeeCache::default();
assert!(PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(),
&1
)
.lock()
.unwrap()
.mark_block_completed()
.is_ok());
assert!(PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(),
&2
)
.lock()
.unwrap()
.mark_block_completed()
.is_ok());
// add slot 3 entry to cache, but not finalize it
PrioritizationFeeCache::get_prioritization_fee(prioritization_fee_cache.cache.clone(), &3);
// assert available block count should be 2 finalized blocks
assert_eq!(2, prioritization_fee_cache.available_block_count());
}
fn assert_vec_eq(expected: &mut Vec<u64>, actual: &mut Vec<u64>) {
expected.sort_unstable();
actual.sort_unstable();
assert_eq!(expected, actual);
}
#[test]
fn test_get_prioritization_fees() {
solana_logger::setup();
let write_account_a = Pubkey::new_unique();
let write_account_b = Pubkey::new_unique();
let write_account_c = Pubkey::new_unique();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank0 = Bank::new_for_benches(&genesis_config);
let bank_forks = BankForks::new(bank0);
let bank = bank_forks.working_bank();
let collector = solana_sdk::pubkey::new_rand();
let bank1 = Arc::new(Bank::new_from_parent(&bank, &collector, 1));
let bank2 = Arc::new(Bank::new_from_parent(&bank, &collector, 2));
let bank3 = Arc::new(Bank::new_from_parent(&bank, &collector, 3));
let mut prioritization_fee_cache = PrioritizationFeeCache::default();
// Assert no minimum fee from empty cache
assert!(prioritization_fee_cache
.get_prioritization_fees()
.is_empty());
// Assert after add one transaction for slot 1
{
let txs = vec![build_sanitized_transaction_for_test(
5,
&write_account_a,
&write_account_b,
)];
sync_update(&mut prioritization_fee_cache, bank1.clone(), txs.iter());
assert_eq!(
5,
PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(),
&bank1.slot()
)
.lock()
.unwrap()
.get_min_transaction_fee()
.unwrap()
);
// before block is marked as completed
assert!(prioritization_fee_cache
.get_prioritization_fees()
.is_empty());
// after block is completed
sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank1.slot());
assert_eq!(vec![5], prioritization_fee_cache.get_prioritization_fees());
}
// Assert after add one transaction for slot 2
{
let txs = vec![build_sanitized_transaction_for_test(
9,
&write_account_b,
&write_account_c,
)];
sync_update(&mut prioritization_fee_cache, bank2.clone(), txs.iter());
assert_eq!(
9,
PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(),
&bank2.slot()
)
.lock()
.unwrap()
.get_min_transaction_fee()
.unwrap()
);
// before block is marked as completed
assert_eq!(vec![5], prioritization_fee_cache.get_prioritization_fees());
// after block is completed
sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank2.slot());
assert_vec_eq(
&mut vec![5, 9],
&mut prioritization_fee_cache.get_prioritization_fees(),
);
}
// Assert after add one transaction for slot 3
{
let txs = vec![build_sanitized_transaction_for_test(
2,
&write_account_a,
&write_account_c,
)];
sync_update(&mut prioritization_fee_cache, bank3.clone(), txs.iter());
assert_eq!(
2,
PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(),
&bank3.slot()
)
.lock()
.unwrap()
.get_min_transaction_fee()
.unwrap()
);
// before block is marked as completed
assert_vec_eq(
&mut vec![5, 9],
&mut prioritization_fee_cache.get_prioritization_fees(),
);
// after block is completed
sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank3.slot());
assert_vec_eq(
&mut vec![5, 9, 2],
&mut prioritization_fee_cache.get_prioritization_fees(),
);
}
}
#[test]
fn test_get_account_prioritization_fees() {
solana_logger::setup();
let write_account_a = Pubkey::new_unique();
let write_account_b = Pubkey::new_unique();
let write_account_c = Pubkey::new_unique();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank0 = Bank::new_for_benches(&genesis_config);
let bank_forks = BankForks::new(bank0);
let bank = bank_forks.working_bank();
let collector = solana_sdk::pubkey::new_rand();
let bank1 = Arc::new(Bank::new_from_parent(&bank, &collector, 1));
let bank2 = Arc::new(Bank::new_from_parent(&bank, &collector, 2));
let bank3 = Arc::new(Bank::new_from_parent(&bank, &collector, 3));
let mut prioritization_fee_cache = PrioritizationFeeCache::default();
// Assert no minimum fee from empty cache
assert!(prioritization_fee_cache
.get_account_prioritization_fees(&write_account_a)
.is_empty());
assert!(prioritization_fee_cache
.get_account_prioritization_fees(&write_account_b)
.is_empty());
assert!(prioritization_fee_cache
.get_account_prioritization_fees(&write_account_c)
.is_empty());
// Assert after add one transaction for slot 1
{
let txs = vec![
build_sanitized_transaction_for_test(5, &write_account_a, &write_account_b),
build_sanitized_transaction_for_test(
0,
&Pubkey::new_unique(),
&Pubkey::new_unique(),
),
];
prioritization_fee_cache.update(bank1.clone(), txs.iter());
// before block is marked as completed
assert!(prioritization_fee_cache
.get_account_prioritization_fees(&write_account_a)
.is_empty());
assert!(prioritization_fee_cache
.get_account_prioritization_fees(&write_account_b)
.is_empty());
assert!(prioritization_fee_cache
.get_account_prioritization_fees(&write_account_c)
.is_empty());
// after block is completed
sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank1.slot());
assert_eq!(
vec![5],
prioritization_fee_cache.get_account_prioritization_fees(&write_account_a)
);
assert_eq!(
vec![5],
prioritization_fee_cache.get_account_prioritization_fees(&write_account_b)
);
assert!(prioritization_fee_cache
.get_account_prioritization_fees(&write_account_c)
.is_empty());
}
// Assert after add one transaction for slot 2
{
let txs = vec![
build_sanitized_transaction_for_test(9, &write_account_b, &write_account_c),
build_sanitized_transaction_for_test(
0,
&Pubkey::new_unique(),
&Pubkey::new_unique(),
),
];
prioritization_fee_cache.update(bank2.clone(), txs.iter());
// before block is marked as completed
assert_eq!(
vec![5],
prioritization_fee_cache.get_account_prioritization_fees(&write_account_a)
);
assert_eq!(
vec![5],
prioritization_fee_cache.get_account_prioritization_fees(&write_account_b)
);
assert!(prioritization_fee_cache
.get_account_prioritization_fees(&write_account_c)
.is_empty());
// after block is completed
sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank2.slot());
assert_eq!(
vec![5],
prioritization_fee_cache.get_account_prioritization_fees(&write_account_a)
);
assert_vec_eq(
&mut vec![5, 9],
&mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_b),
);
assert_eq!(
vec![9],
prioritization_fee_cache.get_account_prioritization_fees(&write_account_c)
);
}
// Assert after add one transaction for slot 3
{
let txs = vec![
build_sanitized_transaction_for_test(2, &write_account_a, &write_account_c),
build_sanitized_transaction_for_test(
0,
&Pubkey::new_unique(),
&Pubkey::new_unique(),
),
];
prioritization_fee_cache.update(bank3.clone(), txs.iter());
// before block is marked as completed
assert_eq!(
vec![5],
prioritization_fee_cache.get_account_prioritization_fees(&write_account_a)
);
assert_vec_eq(
&mut vec![5, 9],
&mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_b),
);
assert_eq!(
vec![9],
prioritization_fee_cache.get_account_prioritization_fees(&write_account_c)
);
// after block is completed
sync_finalize_priority_fee_for_test(&mut prioritization_fee_cache, bank3.slot());
assert_vec_eq(
&mut vec![5, 2],
&mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_a),
);
assert_vec_eq(
&mut vec![5, 9],
&mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_b),
);
assert_vec_eq(
&mut vec![9, 2],
&mut prioritization_fee_cache.get_account_prioritization_fees(&write_account_c),
);
}
}
}