From 4f4ce69f5f7cd25183cca771034e190148dc39f6 Mon Sep 17 00:00:00 2001 From: Tao Zhu <82401714+taozhu-chicago@users.noreply.github.com> Date: Mon, 11 Sep 2023 10:08:55 -0500 Subject: [PATCH] purge duplicated bank prioritization fee from cache (#33062) * purge duplicated bank prioritization fee from cache * add test for purge dup bank * Added metrics counts to monitor anomalies * fix a flaky test --- .../optimistically_confirmed_bank_tracker.rs | 2 +- rpc/src/rpc.rs | 6 +- runtime/src/prioritization_fee.rs | 46 ++- runtime/src/prioritization_fee_cache.rs | 330 +++++++++++++----- 4 files changed, 277 insertions(+), 107 deletions(-) diff --git a/rpc/src/optimistically_confirmed_bank_tracker.rs b/rpc/src/optimistically_confirmed_bank_tracker.rs index 3d3643b44..3179e5709 100644 --- a/rpc/src/optimistically_confirmed_bank_tracker.rs +++ b/rpc/src/optimistically_confirmed_bank_tracker.rs @@ -197,7 +197,7 @@ impl OptimisticallyConfirmedBankTracker { ); // finalize block's minimum prioritization fee cache for this bank - prioritization_fee_cache.finalize_priority_fee(bank.slot()); + prioritization_fee_cache.finalize_priority_fee(bank.slot(), bank.bank_id()); } } else if bank.slot() > bank_forks.read().unwrap().root() { pending_optimistically_confirmed_banks.insert(bank.slot()); diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 3e284b1f9..90fd6a2a2 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -8658,6 +8658,7 @@ pub mod tests { 0 ); let slot0 = rpc.working_bank().slot(); + let bank0_id = rpc.working_bank().bank_id(); let account0 = Pubkey::new_unique(); let account1 = Pubkey::new_unique(); let account2 = Pubkey::new_unique(); @@ -8677,7 +8678,7 @@ pub mod tests { ]; rpc.update_prioritization_fee_cache(transactions); let cache = rpc.get_prioritization_fee_cache(); - cache.finalize_priority_fee(slot0); + cache.finalize_priority_fee(slot0, bank0_id); wait_for_cache_blocks(cache, 1); let request = create_test_request("getRecentPrioritizationFees", None); @@ -8721,6 +8722,7 @@ pub mod tests { rpc.advance_bank_to_confirmed_slot(1); let slot1 = rpc.working_bank().slot(); + let bank1_id = rpc.working_bank().bank_id(); let price1 = 11; let transactions = vec![ Transaction::new_unsigned(Message::new( @@ -8737,7 +8739,7 @@ pub mod tests { ]; rpc.update_prioritization_fee_cache(transactions); let cache = rpc.get_prioritization_fee_cache(); - cache.finalize_priority_fee(slot1); + cache.finalize_priority_fee(slot1, bank1_id); wait_for_cache_blocks(cache, 2); let request = create_test_request("getRecentPrioritizationFees", None); diff --git a/runtime/src/prioritization_fee.rs b/runtime/src/prioritization_fee.rs index a7d28f11f..bb5f7632c 100644 --- a/runtime/src/prioritization_fee.rs +++ b/runtime/src/prioritization_fee.rs @@ -19,6 +19,9 @@ struct PrioritizationFeeMetrics { // Count of transactions that have zero prioritization fee. non_prioritized_transactions_count: u64, + // Count of attempted update on finalized PrioritizationFee + attempted_update_on_finalized_fee_count: u64, + // Total prioritization fees included in this slot. total_prioritization_fee: u64, @@ -41,6 +44,10 @@ impl PrioritizationFeeMetrics { saturating_add_assign!(self.total_update_elapsed_us, val); } + fn increment_attempted_update_on_finalized_fee_count(&mut self, val: u64) { + saturating_add_assign!(self.attempted_update_on_finalized_fee_count, val); + } + fn update_prioritization_fee(&mut self, fee: u64) { if fee == 0 { saturating_add_assign!(self.non_prioritized_transactions_count, 1); @@ -82,6 +89,11 @@ impl PrioritizationFeeMetrics { self.non_prioritized_transactions_count as i64, i64 ), + ( + "attempted_update_on_finalized_fee_count", + self.attempted_update_on_finalized_fee_count as i64, + i64 + ), ( "total_prioritization_fee", self.total_prioritization_fee as i64, @@ -106,6 +118,7 @@ impl PrioritizationFeeMetrics { } } +#[derive(Debug)] pub enum PrioritizationFeeError { // Not able to get account locks from sanitized transaction, which is required to update block // minimum fees. @@ -159,22 +172,27 @@ impl PrioritizationFee { ) -> Result<(), PrioritizationFeeError> { let (_, update_time) = measure!( { - if transaction_fee < self.min_transaction_fee { - self.min_transaction_fee = transaction_fee; - } + if !self.is_finalized { + if transaction_fee < self.min_transaction_fee { + self.min_transaction_fee = transaction_fee; + } - for write_account in writable_accounts.iter() { - self.min_writable_account_fees - .entry(*write_account) - .and_modify(|write_lock_fee| { - *write_lock_fee = std::cmp::min(*write_lock_fee, transaction_fee) - }) - .or_insert(transaction_fee); - } + for write_account in writable_accounts.iter() { + self.min_writable_account_fees + .entry(*write_account) + .and_modify(|write_lock_fee| { + *write_lock_fee = std::cmp::min(*write_lock_fee, transaction_fee) + }) + .or_insert(transaction_fee); + } - self.metrics - .accumulate_total_prioritization_fee(transaction_fee); - self.metrics.update_prioritization_fee(transaction_fee); + self.metrics + .accumulate_total_prioritization_fee(transaction_fee); + self.metrics.update_prioritization_fee(transaction_fee); + } else { + self.metrics + .increment_attempted_update_on_finalized_fee_count(1); + } }, "update_time", ); diff --git a/runtime/src/prioritization_fee_cache.rs b/runtime/src/prioritization_fee_cache.rs index 78f6214bb..e1005bd63 100644 --- a/runtime/src/prioritization_fee_cache.rs +++ b/runtime/src/prioritization_fee_cache.rs @@ -4,17 +4,20 @@ use { transaction_priority_details::GetTransactionPriorityDetails, }, crossbeam_channel::{unbounded, Receiver, Sender}, + dashmap::DashMap, log::*, lru::LruCache, solana_measure::measure, solana_sdk::{ - clock::Slot, pubkey::Pubkey, saturating_add_assign, transaction::SanitizedTransaction, + clock::{BankId, Slot}, + pubkey::Pubkey, + transaction::SanitizedTransaction, }, std::{ collections::HashMap, sync::{ atomic::{AtomicU64, Ordering}, - Arc, Mutex, RwLock, + Arc, RwLock, }, thread::{Builder, JoinHandle}, }, @@ -30,15 +33,15 @@ struct PrioritizationFeeCacheMetrics { // Count of transactions that successfully updated each slot's prioritization fee cache. successful_transaction_update_count: AtomicU64, + // Count of duplicated banks being purged + purged_duplicated_bank_count: AtomicU64, + // Accumulated time spent on tracking prioritization fee for each slot. total_update_elapsed_us: AtomicU64, // Accumulated time spent on acquiring cache write lock. total_cache_lock_elapsed_us: AtomicU64, - // Accumulated time spent on acquiring each block entry's lock.. - total_entry_lock_elapsed_us: AtomicU64, - // Accumulated time spent on updating block prioritization fees. total_entry_update_elapsed_us: AtomicU64, @@ -52,6 +55,11 @@ impl PrioritizationFeeCacheMetrics { .fetch_add(val, Ordering::Relaxed); } + fn accumulate_total_purged_duplicated_bank_count(&self, val: u64) { + self.purged_duplicated_bank_count + .fetch_add(val, Ordering::Relaxed); + } + fn accumulate_total_update_elapsed_us(&self, val: u64) { self.total_update_elapsed_us .fetch_add(val, Ordering::Relaxed); @@ -62,11 +70,6 @@ impl PrioritizationFeeCacheMetrics { .fetch_add(val, Ordering::Relaxed); } - fn accumulate_total_entry_lock_elapsed_us(&self, val: u64) { - self.total_entry_lock_elapsed_us - .fetch_add(val, Ordering::Relaxed); - } - fn accumulate_total_entry_update_elapsed_us(&self, val: u64) { self.total_entry_update_elapsed_us .fetch_add(val, Ordering::Relaxed); @@ -87,6 +90,11 @@ impl PrioritizationFeeCacheMetrics { .swap(0, Ordering::Relaxed) as i64, i64 ), + ( + "purged_duplicated_bank_count", + self.purged_duplicated_bank_count.swap(0, Ordering::Relaxed) as i64, + i64 + ), ( "total_update_elapsed_us", self.total_update_elapsed_us.swap(0, Ordering::Relaxed) as i64, @@ -97,11 +105,6 @@ impl PrioritizationFeeCacheMetrics { self.total_cache_lock_elapsed_us.swap(0, Ordering::Relaxed) as i64, i64 ), - ( - "total_entry_lock_elapsed_us", - self.total_entry_lock_elapsed_us.swap(0, Ordering::Relaxed) as i64, - i64 - ), ( "total_entry_update_elapsed_us", self.total_entry_update_elapsed_us @@ -121,20 +124,26 @@ impl PrioritizationFeeCacheMetrics { enum CacheServiceUpdate { TransactionUpdate { slot: Slot, + bank_id: BankId, transaction_fee: u64, writable_accounts: Arc>, }, - BankFrozen { + BankFinalized { slot: Slot, + bank_id: BankId, }, Exit, } +/// Potentially there are more than one bank that updates Prioritization Fee +/// for a slot. The updates are tracked and finalized by bank_id. +type SlotPrioritizationFee = DashMap; + /// Stores up to MAX_NUM_RECENT_BLOCKS recent block's prioritization fee, /// A separate internal thread `service_thread` handles additional tasks when a bank is frozen, /// and collecting stats and reporting metrics. pub struct PrioritizationFeeCache { - cache: Arc>>>>, + cache: Arc>>>, service_thread: Option>, sender: Sender, metrics: Arc, @@ -184,14 +193,14 @@ impl PrioritizationFeeCache { /// Get prioritization fee entry, create new entry if necessary fn get_prioritization_fee( - cache: Arc>>>>, + cache: Arc>>>, slot: &Slot, - ) -> Arc> { + ) -> Arc { let mut cache = cache.write().unwrap(); match cache.get(slot) { Some(entry) => Arc::clone(entry), None => { - let entry = Arc::new(Mutex::new(PrioritizationFee::default())); + let entry = Arc::new(SlotPrioritizationFee::default()); cache.put(*slot, Arc::clone(&entry)); entry } @@ -202,7 +211,6 @@ impl PrioritizationFeeCache { /// transactions have both valid priority_detail and account_locks will be used to update /// fee_cache asynchronously. pub fn update<'a>(&self, bank: &Bank, txs: impl Iterator) { - let mut successful_transaction_update_count: u64 = 0; let (_, send_updates_time) = measure!( { for sanitized_transaction in txs { @@ -241,6 +249,7 @@ impl PrioritizationFeeCache { self.sender .send(CacheServiceUpdate::TransactionUpdate { slot: bank.slot(), + bank_id: bank.bank_id(), transaction_fee: priority_details.priority, writable_accounts, }) @@ -250,7 +259,6 @@ impl PrioritizationFeeCache { err ); }); - saturating_add_assign!(successful_transaction_update_count, 1) } }, "send_updates", @@ -258,15 +266,13 @@ impl PrioritizationFeeCache { self.metrics .accumulate_total_update_elapsed_us(send_updates_time.as_us()); - self.metrics - .accumulate_successful_transaction_update_count(successful_transaction_update_count); } /// Finalize prioritization fee when it's bank is completely replayed from blockstore, /// by pruning irrelevant accounts to save space, and marking its availability for queries. - pub fn finalize_priority_fee(&self, slot: Slot) { + pub fn finalize_priority_fee(&self, slot: Slot, bank_id: BankId) { self.sender - .send(CacheServiceUpdate::BankFrozen { slot }) + .send(CacheServiceUpdate::BankFinalized { slot, bank_id }) .unwrap_or_else(|err| { warn!( "prioritization fee cache signalling bank frozen failed: {:?}", @@ -278,53 +284,76 @@ impl PrioritizationFeeCache { /// Internal function is invoked by worker thread to update slot's minimum prioritization fee, /// Cache lock contends here. fn update_cache( - cache: Arc>>>>, + cache: Arc>>>, slot: &Slot, + bank_id: &BankId, transaction_fee: u64, writable_accounts: Arc>, metrics: Arc, ) { - let (block_prioritization_fee, cache_lock_time) = + let (slot_prioritization_fee, cache_lock_time) = measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time"); - let (mut block_prioritization_fee, entry_lock_time) = - measure!(block_prioritization_fee.lock().unwrap(), "entry_lock_time"); - let (_, entry_update_time) = measure!( - block_prioritization_fee.update(transaction_fee, &writable_accounts), + { + let mut block_prioritization_fee = slot_prioritization_fee + .entry(*bank_id) + .or_insert(PrioritizationFee::default()); + block_prioritization_fee.update(transaction_fee, &writable_accounts) + }, "entry_update_time" ); metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); - metrics.accumulate_total_entry_lock_elapsed_us(entry_lock_time.as_us()); metrics.accumulate_total_entry_update_elapsed_us(entry_update_time.as_us()); + metrics.accumulate_successful_transaction_update_count(1); } fn finalize_slot( - cache: Arc>>>>, + cache: Arc>>>, slot: &Slot, + bank_id: &BankId, metrics: Arc, ) { - let (block_prioritization_fee, cache_lock_time) = + let (slot_prioritization_fee, cache_lock_time) = measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time"); - let (mut block_prioritization_fee, entry_lock_time) = - measure!(block_prioritization_fee.lock().unwrap(), "entry_lock_time"); - // prune cache by evicting write account entry from prioritization fee if its fee is less // or equal to block's minimum transaction fee, because they are irrelevant in calculating // block minimum fee. - let (_, slot_finalize_time) = measure!( - block_prioritization_fee.mark_block_completed(), + let (result, slot_finalize_time) = measure!( + { + let pre_purge_bank_count = slot_prioritization_fee.len() as u64; + slot_prioritization_fee.retain(|id, _| id == bank_id); + let post_purge_bank_count = slot_prioritization_fee.len() as u64; + metrics.accumulate_total_purged_duplicated_bank_count( + pre_purge_bank_count.saturating_sub(post_purge_bank_count), + ); + if post_purge_bank_count == 0 { + warn!("Prioritization fee cache unexpected finalized on non-existing bank. slot {slot} bank id {bank_id}"); + } + + let mut block_prioritization_fee = slot_prioritization_fee + .entry(*bank_id) + .or_insert(PrioritizationFee::default()); + let result = block_prioritization_fee.mark_block_completed(); + block_prioritization_fee.report_metrics(*slot); + result + }, "slot_finalize_time" ); - block_prioritization_fee.report_metrics(*slot); metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); - metrics.accumulate_total_entry_lock_elapsed_us(entry_lock_time.as_us()); metrics.accumulate_total_block_finalize_elapsed_us(slot_finalize_time.as_us()); + + if let Err(err) = result { + error!( + "Unsuccessful finalizing slot {slot}, bank ID {bank_id}: {:?}", + err + ); + } } fn service_loop( - cache: Arc>>>>, + cache: Arc>>>, receiver: Receiver, metrics: Arc, ) { @@ -332,17 +361,19 @@ impl PrioritizationFeeCache { match update { CacheServiceUpdate::TransactionUpdate { slot, + bank_id, transaction_fee, writable_accounts, } => Self::update_cache( cache.clone(), &slot, + &bank_id, transaction_fee, writable_accounts, metrics.clone(), ), - CacheServiceUpdate::BankFrozen { slot } => { - Self::finalize_slot(cache.clone(), &slot, metrics.clone()); + CacheServiceUpdate::BankFinalized { slot, bank_id } => { + Self::finalize_slot(cache.clone(), &slot, &bank_id, metrics.clone()); metrics.report(slot); } @@ -359,7 +390,11 @@ impl PrioritizationFeeCache { .read() .unwrap() .iter() - .filter(|(_slot, prioritization_fee)| prioritization_fee.lock().unwrap().is_finalized()) + .filter(|(_slot, slot_prioritization_fee)| { + slot_prioritization_fee + .iter() + .any(|prioritization_fee| prioritization_fee.is_finalized()) + }) .count() } @@ -368,21 +403,24 @@ impl PrioritizationFeeCache { .read() .unwrap() .iter() - .filter_map(|(slot, prioritization_fee)| { - let prioritization_fee_read = prioritization_fee.lock().unwrap(); - prioritization_fee_read.is_finalized().then(|| { - let mut fee = prioritization_fee_read - .get_min_transaction_fee() - .unwrap_or_default(); - for account_key in account_keys { - if let Some(account_fee) = - prioritization_fee_read.get_writable_account_fee(account_key) - { - fee = std::cmp::max(fee, account_fee); - } - } - Some((*slot, fee)) - }) + .filter_map(|(slot, slot_prioritization_fee)| { + slot_prioritization_fee + .iter() + .find_map(|prioritization_fee| { + prioritization_fee.is_finalized().then(|| { + let mut fee = prioritization_fee + .get_min_transaction_fee() + .unwrap_or_default(); + for account_key in account_keys { + if let Some(account_fee) = + prioritization_fee.get_writable_account_fee(account_key) + { + fee = std::cmp::max(fee, account_fee); + } + } + Some((*slot, fee)) + }) + }) }) .flatten() .collect() @@ -427,21 +465,22 @@ mod tests { fn sync_update<'a>( prioritization_fee_cache: &PrioritizationFeeCache, bank: Arc, - txs: impl Iterator, + txs: impl Iterator + ExactSizeIterator, ) { + let expected_update_count = prioritization_fee_cache + .metrics + .successful_transaction_update_count + .load(Ordering::Relaxed) + .saturating_add(txs.len() as u64); + prioritization_fee_cache.update(&bank, txs); - let block_fee = PrioritizationFeeCache::get_prioritization_fee( - prioritization_fee_cache.cache.clone(), - &bank.slot(), - ); - - // wait till update is done - while block_fee - .lock() - .unwrap() - .get_min_transaction_fee() - .is_none() + // wait till expected number of transaction updates have occurred... + while prioritization_fee_cache + .metrics + .successful_transaction_update_count + .load(Ordering::Relaxed) + != expected_update_count { std::thread::sleep(std::time::Duration::from_millis(100)); } @@ -451,15 +490,19 @@ mod tests { fn sync_finalize_priority_fee_for_test( prioritization_fee_cache: &PrioritizationFeeCache, slot: Slot, + bank_id: BankId, ) { - prioritization_fee_cache.finalize_priority_fee(slot); + prioritization_fee_cache.finalize_priority_fee(slot, bank_id); let fee = PrioritizationFeeCache::get_prioritization_fee( prioritization_fee_cache.cache.clone(), &slot, ); // wait till finalization is done - while !fee.lock().unwrap().is_finalized() { + while !fee + .get(&bank_id) + .map_or(false, |block_fee| block_fee.is_finalized()) + { std::thread::sleep(std::time::Duration::from_millis(100)); } } @@ -490,7 +533,7 @@ mod tests { let slot = bank.slot(); let prioritization_fee_cache = PrioritizationFeeCache::default(); - sync_update(&prioritization_fee_cache, bank, txs.iter()); + sync_update(&prioritization_fee_cache, bank.clone(), txs.iter()); // assert block minimum fee and account a, b, c fee accordingly { @@ -498,7 +541,7 @@ mod tests { prioritization_fee_cache.cache.clone(), &slot, ); - let fee = fee.lock().unwrap(); + let fee = fee.get(&bank.bank_id()).unwrap(); assert_eq!(2, fee.get_min_transaction_fee().unwrap()); assert_eq!(2, fee.get_writable_account_fee(&write_account_a).unwrap()); assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); @@ -511,12 +554,12 @@ mod tests { // assert after prune, account a and c should be removed from cache to save space { - sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot); + sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank.bank_id()); let fee = PrioritizationFeeCache::get_prioritization_fee( prioritization_fee_cache.cache.clone(), &slot, ); - let fee = fee.lock().unwrap(); + let fee = fee.get(&bank.bank_id()).unwrap(); assert_eq!(2, fee.get_min_transaction_fee().unwrap()); assert!(fee.get_writable_account_fee(&write_account_a).is_none()); assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); @@ -532,20 +575,22 @@ mod tests { prioritization_fee_cache.cache.clone(), &1 ) - .lock() - .unwrap() + .entry(1) + .or_default() .mark_block_completed() .is_ok()); assert!(PrioritizationFeeCache::get_prioritization_fee( prioritization_fee_cache.cache.clone(), &2 ) - .lock() - .unwrap() + .entry(2) + .or_default() .mark_block_completed() .is_ok()); // add slot 3 entry to cache, but not finalize it - PrioritizationFeeCache::get_prioritization_fee(prioritization_fee_cache.cache.clone(), &3); + PrioritizationFeeCache::get_prioritization_fee(prioritization_fee_cache.cache.clone(), &3) + .entry(3) + .or_default(); // assert available block count should be 2 finalized blocks assert_eq!(2, prioritization_fee_cache.available_block_count()); @@ -603,7 +648,7 @@ mod tests { &Pubkey::new_unique(), ), ]; - sync_update(&prioritization_fee_cache, bank1, txs.iter()); + sync_update(&prioritization_fee_cache, bank1.clone(), txs.iter()); // before block is marked as completed assert!(prioritization_fee_cache .get_prioritization_fees(&[]) @@ -624,7 +669,7 @@ mod tests { .get_prioritization_fees(&[write_account_a, write_account_b, write_account_c]) .is_empty()); // after block is completed - sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 1); + sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 1, bank1.bank_id()); assert_eq!( hashmap_of(vec![(1, 1)]), prioritization_fee_cache.get_prioritization_fees(&[]) @@ -666,7 +711,7 @@ mod tests { &Pubkey::new_unique(), ), ]; - sync_update(&prioritization_fee_cache, bank2, txs.iter()); + sync_update(&prioritization_fee_cache, bank2.clone(), txs.iter()); // before block is marked as completed assert_eq!( hashmap_of(vec![(1, 1)]), @@ -698,7 +743,7 @@ mod tests { ]) ); // after block is completed - sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 2); + sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 2, bank2.bank_id()); assert_eq!( hashmap_of(vec![(2, 3), (1, 1)]), prioritization_fee_cache.get_prioritization_fees(&[]), @@ -740,7 +785,7 @@ mod tests { &Pubkey::new_unique(), ), ]; - sync_update(&prioritization_fee_cache, bank3, txs.iter()); + sync_update(&prioritization_fee_cache, bank3.clone(), txs.iter()); // before block is marked as completed assert_eq!( hashmap_of(vec![(2, 3), (1, 1)]), @@ -772,7 +817,7 @@ mod tests { ]), ); // after block is completed - sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 3); + sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 3, bank3.bank_id()); assert_eq!( hashmap_of(vec![(3, 5), (2, 3), (1, 1)]), prioritization_fee_cache.get_prioritization_fees(&[]), @@ -804,4 +849,109 @@ mod tests { ); } } + + #[test] + fn test_purge_duplicated_bank() { + // duplicated bank can exists for same slot before OC. + // prioritization_fee_cache should only have data from OC-ed bank + solana_logger::setup(); + let write_account_a = Pubkey::new_unique(); + let write_account_b = Pubkey::new_unique(); + let write_account_c = Pubkey::new_unique(); + + let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000); + let bank0 = Bank::new_for_benches(&genesis_config); + let bank_forks = BankForks::new(bank0); + let bank = bank_forks.working_bank(); + let collector = solana_sdk::pubkey::new_rand(); + let slot: Slot = 999; + let bank1 = Arc::new(Bank::new_from_parent(bank.clone(), &collector, slot)); + let bank2 = Arc::new(Bank::new_from_parent(bank, &collector, slot)); + + let prioritization_fee_cache = PrioritizationFeeCache::default(); + + // Assert after add transactions for bank1 of slot 1 + { + let txs = vec![ + build_sanitized_transaction_for_test(2, &write_account_a, &write_account_b), + build_sanitized_transaction_for_test( + 1, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + ), + ]; + sync_update(&prioritization_fee_cache, bank1.clone(), txs.iter()); + + let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &slot, + ); + assert_eq!(1, slot_prioritization_fee.len()); + assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); + } + + // Assert after add transactions for bank2 of slot 1 + { + let txs = vec![ + build_sanitized_transaction_for_test(4, &write_account_b, &write_account_c), + build_sanitized_transaction_for_test( + 3, + &Pubkey::new_unique(), + &Pubkey::new_unique(), + ), + ]; + sync_update(&prioritization_fee_cache, bank2.clone(), txs.iter()); + + let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &slot, + ); + assert_eq!(2, slot_prioritization_fee.len()); + assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); + assert!(slot_prioritization_fee.contains_key(&bank2.bank_id())); + } + + // Assert after finalize with bank1 of slot 1, + { + sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank1.bank_id()); + + let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee( + prioritization_fee_cache.cache.clone(), + &slot, + ); + assert_eq!(1, slot_prioritization_fee.len()); + assert!(slot_prioritization_fee.contains_key(&bank1.bank_id())); + + // and data available for query are from bank1 + assert_eq!( + hashmap_of(vec![(slot, 1)]), + prioritization_fee_cache.get_prioritization_fees(&[]) + ); + assert_eq!( + hashmap_of(vec![(slot, 2)]), + prioritization_fee_cache.get_prioritization_fees(&[write_account_a]) + ); + assert_eq!( + hashmap_of(vec![(slot, 2)]), + prioritization_fee_cache.get_prioritization_fees(&[write_account_b]) + ); + assert_eq!( + hashmap_of(vec![(slot, 1)]), + prioritization_fee_cache.get_prioritization_fees(&[write_account_c]) + ); + assert_eq!( + hashmap_of(vec![(slot, 2)]), + prioritization_fee_cache + .get_prioritization_fees(&[write_account_a, write_account_b]) + ); + assert_eq!( + hashmap_of(vec![(slot, 2)]), + prioritization_fee_cache.get_prioritization_fees(&[ + write_account_a, + write_account_b, + write_account_c + ]) + ); + } + } }