purge duplicated bank prioritization fee from cache (#33062)

* purge duplicated bank prioritization fee from cache

* add test for purge dup bank

* Added metrics counts to monitor anomalies

* fix a flaky test
This commit is contained in:
Tao Zhu 2023-09-11 10:08:55 -05:00 committed by GitHub
parent 527a4bbf00
commit 4f4ce69f5f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 277 additions and 107 deletions

View File

@ -197,7 +197,7 @@ impl OptimisticallyConfirmedBankTracker {
); );
// finalize block's minimum prioritization fee cache for this bank // finalize block's minimum prioritization fee cache for this bank
prioritization_fee_cache.finalize_priority_fee(bank.slot()); prioritization_fee_cache.finalize_priority_fee(bank.slot(), bank.bank_id());
} }
} else if bank.slot() > bank_forks.read().unwrap().root() { } else if bank.slot() > bank_forks.read().unwrap().root() {
pending_optimistically_confirmed_banks.insert(bank.slot()); pending_optimistically_confirmed_banks.insert(bank.slot());

View File

@ -8658,6 +8658,7 @@ pub mod tests {
0 0
); );
let slot0 = rpc.working_bank().slot(); let slot0 = rpc.working_bank().slot();
let bank0_id = rpc.working_bank().bank_id();
let account0 = Pubkey::new_unique(); let account0 = Pubkey::new_unique();
let account1 = Pubkey::new_unique(); let account1 = Pubkey::new_unique();
let account2 = Pubkey::new_unique(); let account2 = Pubkey::new_unique();
@ -8677,7 +8678,7 @@ pub mod tests {
]; ];
rpc.update_prioritization_fee_cache(transactions); rpc.update_prioritization_fee_cache(transactions);
let cache = rpc.get_prioritization_fee_cache(); let cache = rpc.get_prioritization_fee_cache();
cache.finalize_priority_fee(slot0); cache.finalize_priority_fee(slot0, bank0_id);
wait_for_cache_blocks(cache, 1); wait_for_cache_blocks(cache, 1);
let request = create_test_request("getRecentPrioritizationFees", None); let request = create_test_request("getRecentPrioritizationFees", None);
@ -8721,6 +8722,7 @@ pub mod tests {
rpc.advance_bank_to_confirmed_slot(1); rpc.advance_bank_to_confirmed_slot(1);
let slot1 = rpc.working_bank().slot(); let slot1 = rpc.working_bank().slot();
let bank1_id = rpc.working_bank().bank_id();
let price1 = 11; let price1 = 11;
let transactions = vec![ let transactions = vec![
Transaction::new_unsigned(Message::new( Transaction::new_unsigned(Message::new(
@ -8737,7 +8739,7 @@ pub mod tests {
]; ];
rpc.update_prioritization_fee_cache(transactions); rpc.update_prioritization_fee_cache(transactions);
let cache = rpc.get_prioritization_fee_cache(); let cache = rpc.get_prioritization_fee_cache();
cache.finalize_priority_fee(slot1); cache.finalize_priority_fee(slot1, bank1_id);
wait_for_cache_blocks(cache, 2); wait_for_cache_blocks(cache, 2);
let request = create_test_request("getRecentPrioritizationFees", None); let request = create_test_request("getRecentPrioritizationFees", None);

View File

@ -19,6 +19,9 @@ struct PrioritizationFeeMetrics {
// Count of transactions that have zero prioritization fee. // Count of transactions that have zero prioritization fee.
non_prioritized_transactions_count: u64, non_prioritized_transactions_count: u64,
// Count of attempted update on finalized PrioritizationFee
attempted_update_on_finalized_fee_count: u64,
// Total prioritization fees included in this slot. // Total prioritization fees included in this slot.
total_prioritization_fee: u64, total_prioritization_fee: u64,
@ -41,6 +44,10 @@ impl PrioritizationFeeMetrics {
saturating_add_assign!(self.total_update_elapsed_us, val); saturating_add_assign!(self.total_update_elapsed_us, val);
} }
fn increment_attempted_update_on_finalized_fee_count(&mut self, val: u64) {
saturating_add_assign!(self.attempted_update_on_finalized_fee_count, val);
}
fn update_prioritization_fee(&mut self, fee: u64) { fn update_prioritization_fee(&mut self, fee: u64) {
if fee == 0 { if fee == 0 {
saturating_add_assign!(self.non_prioritized_transactions_count, 1); saturating_add_assign!(self.non_prioritized_transactions_count, 1);
@ -82,6 +89,11 @@ impl PrioritizationFeeMetrics {
self.non_prioritized_transactions_count as i64, self.non_prioritized_transactions_count as i64,
i64 i64
), ),
(
"attempted_update_on_finalized_fee_count",
self.attempted_update_on_finalized_fee_count as i64,
i64
),
( (
"total_prioritization_fee", "total_prioritization_fee",
self.total_prioritization_fee as i64, self.total_prioritization_fee as i64,
@ -106,6 +118,7 @@ impl PrioritizationFeeMetrics {
} }
} }
#[derive(Debug)]
pub enum PrioritizationFeeError { pub enum PrioritizationFeeError {
// Not able to get account locks from sanitized transaction, which is required to update block // Not able to get account locks from sanitized transaction, which is required to update block
// minimum fees. // minimum fees.
@ -159,22 +172,27 @@ impl PrioritizationFee {
) -> Result<(), PrioritizationFeeError> { ) -> Result<(), PrioritizationFeeError> {
let (_, update_time) = measure!( let (_, update_time) = measure!(
{ {
if transaction_fee < self.min_transaction_fee { if !self.is_finalized {
self.min_transaction_fee = transaction_fee; if transaction_fee < self.min_transaction_fee {
} self.min_transaction_fee = transaction_fee;
}
for write_account in writable_accounts.iter() { for write_account in writable_accounts.iter() {
self.min_writable_account_fees self.min_writable_account_fees
.entry(*write_account) .entry(*write_account)
.and_modify(|write_lock_fee| { .and_modify(|write_lock_fee| {
*write_lock_fee = std::cmp::min(*write_lock_fee, transaction_fee) *write_lock_fee = std::cmp::min(*write_lock_fee, transaction_fee)
}) })
.or_insert(transaction_fee); .or_insert(transaction_fee);
} }
self.metrics self.metrics
.accumulate_total_prioritization_fee(transaction_fee); .accumulate_total_prioritization_fee(transaction_fee);
self.metrics.update_prioritization_fee(transaction_fee); self.metrics.update_prioritization_fee(transaction_fee);
} else {
self.metrics
.increment_attempted_update_on_finalized_fee_count(1);
}
}, },
"update_time", "update_time",
); );

View File

@ -4,17 +4,20 @@ use {
transaction_priority_details::GetTransactionPriorityDetails, transaction_priority_details::GetTransactionPriorityDetails,
}, },
crossbeam_channel::{unbounded, Receiver, Sender}, crossbeam_channel::{unbounded, Receiver, Sender},
dashmap::DashMap,
log::*, log::*,
lru::LruCache, lru::LruCache,
solana_measure::measure, solana_measure::measure,
solana_sdk::{ solana_sdk::{
clock::Slot, pubkey::Pubkey, saturating_add_assign, transaction::SanitizedTransaction, clock::{BankId, Slot},
pubkey::Pubkey,
transaction::SanitizedTransaction,
}, },
std::{ std::{
collections::HashMap, collections::HashMap,
sync::{ sync::{
atomic::{AtomicU64, Ordering}, atomic::{AtomicU64, Ordering},
Arc, Mutex, RwLock, Arc, RwLock,
}, },
thread::{Builder, JoinHandle}, thread::{Builder, JoinHandle},
}, },
@ -30,15 +33,15 @@ struct PrioritizationFeeCacheMetrics {
// Count of transactions that successfully updated each slot's prioritization fee cache. // Count of transactions that successfully updated each slot's prioritization fee cache.
successful_transaction_update_count: AtomicU64, successful_transaction_update_count: AtomicU64,
// Count of duplicated banks being purged
purged_duplicated_bank_count: AtomicU64,
// Accumulated time spent on tracking prioritization fee for each slot. // Accumulated time spent on tracking prioritization fee for each slot.
total_update_elapsed_us: AtomicU64, total_update_elapsed_us: AtomicU64,
// Accumulated time spent on acquiring cache write lock. // Accumulated time spent on acquiring cache write lock.
total_cache_lock_elapsed_us: AtomicU64, total_cache_lock_elapsed_us: AtomicU64,
// Accumulated time spent on acquiring each block entry's lock..
total_entry_lock_elapsed_us: AtomicU64,
// Accumulated time spent on updating block prioritization fees. // Accumulated time spent on updating block prioritization fees.
total_entry_update_elapsed_us: AtomicU64, total_entry_update_elapsed_us: AtomicU64,
@ -52,6 +55,11 @@ impl PrioritizationFeeCacheMetrics {
.fetch_add(val, Ordering::Relaxed); .fetch_add(val, Ordering::Relaxed);
} }
fn accumulate_total_purged_duplicated_bank_count(&self, val: u64) {
self.purged_duplicated_bank_count
.fetch_add(val, Ordering::Relaxed);
}
fn accumulate_total_update_elapsed_us(&self, val: u64) { fn accumulate_total_update_elapsed_us(&self, val: u64) {
self.total_update_elapsed_us self.total_update_elapsed_us
.fetch_add(val, Ordering::Relaxed); .fetch_add(val, Ordering::Relaxed);
@ -62,11 +70,6 @@ impl PrioritizationFeeCacheMetrics {
.fetch_add(val, Ordering::Relaxed); .fetch_add(val, Ordering::Relaxed);
} }
fn accumulate_total_entry_lock_elapsed_us(&self, val: u64) {
self.total_entry_lock_elapsed_us
.fetch_add(val, Ordering::Relaxed);
}
fn accumulate_total_entry_update_elapsed_us(&self, val: u64) { fn accumulate_total_entry_update_elapsed_us(&self, val: u64) {
self.total_entry_update_elapsed_us self.total_entry_update_elapsed_us
.fetch_add(val, Ordering::Relaxed); .fetch_add(val, Ordering::Relaxed);
@ -87,6 +90,11 @@ impl PrioritizationFeeCacheMetrics {
.swap(0, Ordering::Relaxed) as i64, .swap(0, Ordering::Relaxed) as i64,
i64 i64
), ),
(
"purged_duplicated_bank_count",
self.purged_duplicated_bank_count.swap(0, Ordering::Relaxed) as i64,
i64
),
( (
"total_update_elapsed_us", "total_update_elapsed_us",
self.total_update_elapsed_us.swap(0, Ordering::Relaxed) as i64, self.total_update_elapsed_us.swap(0, Ordering::Relaxed) as i64,
@ -97,11 +105,6 @@ impl PrioritizationFeeCacheMetrics {
self.total_cache_lock_elapsed_us.swap(0, Ordering::Relaxed) as i64, self.total_cache_lock_elapsed_us.swap(0, Ordering::Relaxed) as i64,
i64 i64
), ),
(
"total_entry_lock_elapsed_us",
self.total_entry_lock_elapsed_us.swap(0, Ordering::Relaxed) as i64,
i64
),
( (
"total_entry_update_elapsed_us", "total_entry_update_elapsed_us",
self.total_entry_update_elapsed_us self.total_entry_update_elapsed_us
@ -121,20 +124,26 @@ impl PrioritizationFeeCacheMetrics {
enum CacheServiceUpdate { enum CacheServiceUpdate {
TransactionUpdate { TransactionUpdate {
slot: Slot, slot: Slot,
bank_id: BankId,
transaction_fee: u64, transaction_fee: u64,
writable_accounts: Arc<Vec<Pubkey>>, writable_accounts: Arc<Vec<Pubkey>>,
}, },
BankFrozen { BankFinalized {
slot: Slot, slot: Slot,
bank_id: BankId,
}, },
Exit, Exit,
} }
/// Potentially there are more than one bank that updates Prioritization Fee
/// for a slot. The updates are tracked and finalized by bank_id.
type SlotPrioritizationFee = DashMap<BankId, PrioritizationFee>;
/// Stores up to MAX_NUM_RECENT_BLOCKS recent block's prioritization fee, /// Stores up to MAX_NUM_RECENT_BLOCKS recent block's prioritization fee,
/// A separate internal thread `service_thread` handles additional tasks when a bank is frozen, /// A separate internal thread `service_thread` handles additional tasks when a bank is frozen,
/// and collecting stats and reporting metrics. /// and collecting stats and reporting metrics.
pub struct PrioritizationFeeCache { pub struct PrioritizationFeeCache {
cache: Arc<RwLock<LruCache<Slot, Arc<Mutex<PrioritizationFee>>>>>, cache: Arc<RwLock<LruCache<Slot, Arc<SlotPrioritizationFee>>>>,
service_thread: Option<JoinHandle<()>>, service_thread: Option<JoinHandle<()>>,
sender: Sender<CacheServiceUpdate>, sender: Sender<CacheServiceUpdate>,
metrics: Arc<PrioritizationFeeCacheMetrics>, metrics: Arc<PrioritizationFeeCacheMetrics>,
@ -184,14 +193,14 @@ impl PrioritizationFeeCache {
/// Get prioritization fee entry, create new entry if necessary /// Get prioritization fee entry, create new entry if necessary
fn get_prioritization_fee( fn get_prioritization_fee(
cache: Arc<RwLock<LruCache<Slot, Arc<Mutex<PrioritizationFee>>>>>, cache: Arc<RwLock<LruCache<Slot, Arc<SlotPrioritizationFee>>>>,
slot: &Slot, slot: &Slot,
) -> Arc<Mutex<PrioritizationFee>> { ) -> Arc<SlotPrioritizationFee> {
let mut cache = cache.write().unwrap(); let mut cache = cache.write().unwrap();
match cache.get(slot) { match cache.get(slot) {
Some(entry) => Arc::clone(entry), Some(entry) => Arc::clone(entry),
None => { None => {
let entry = Arc::new(Mutex::new(PrioritizationFee::default())); let entry = Arc::new(SlotPrioritizationFee::default());
cache.put(*slot, Arc::clone(&entry)); cache.put(*slot, Arc::clone(&entry));
entry entry
} }
@ -202,7 +211,6 @@ impl PrioritizationFeeCache {
/// transactions have both valid priority_detail and account_locks will be used to update /// transactions have both valid priority_detail and account_locks will be used to update
/// fee_cache asynchronously. /// fee_cache asynchronously.
pub fn update<'a>(&self, bank: &Bank, txs: impl Iterator<Item = &'a SanitizedTransaction>) { pub fn update<'a>(&self, bank: &Bank, txs: impl Iterator<Item = &'a SanitizedTransaction>) {
let mut successful_transaction_update_count: u64 = 0;
let (_, send_updates_time) = measure!( let (_, send_updates_time) = measure!(
{ {
for sanitized_transaction in txs { for sanitized_transaction in txs {
@ -241,6 +249,7 @@ impl PrioritizationFeeCache {
self.sender self.sender
.send(CacheServiceUpdate::TransactionUpdate { .send(CacheServiceUpdate::TransactionUpdate {
slot: bank.slot(), slot: bank.slot(),
bank_id: bank.bank_id(),
transaction_fee: priority_details.priority, transaction_fee: priority_details.priority,
writable_accounts, writable_accounts,
}) })
@ -250,7 +259,6 @@ impl PrioritizationFeeCache {
err err
); );
}); });
saturating_add_assign!(successful_transaction_update_count, 1)
} }
}, },
"send_updates", "send_updates",
@ -258,15 +266,13 @@ impl PrioritizationFeeCache {
self.metrics self.metrics
.accumulate_total_update_elapsed_us(send_updates_time.as_us()); .accumulate_total_update_elapsed_us(send_updates_time.as_us());
self.metrics
.accumulate_successful_transaction_update_count(successful_transaction_update_count);
} }
/// Finalize prioritization fee when it's bank is completely replayed from blockstore, /// Finalize prioritization fee when it's bank is completely replayed from blockstore,
/// by pruning irrelevant accounts to save space, and marking its availability for queries. /// by pruning irrelevant accounts to save space, and marking its availability for queries.
pub fn finalize_priority_fee(&self, slot: Slot) { pub fn finalize_priority_fee(&self, slot: Slot, bank_id: BankId) {
self.sender self.sender
.send(CacheServiceUpdate::BankFrozen { slot }) .send(CacheServiceUpdate::BankFinalized { slot, bank_id })
.unwrap_or_else(|err| { .unwrap_or_else(|err| {
warn!( warn!(
"prioritization fee cache signalling bank frozen failed: {:?}", "prioritization fee cache signalling bank frozen failed: {:?}",
@ -278,53 +284,76 @@ impl PrioritizationFeeCache {
/// Internal function is invoked by worker thread to update slot's minimum prioritization fee, /// Internal function is invoked by worker thread to update slot's minimum prioritization fee,
/// Cache lock contends here. /// Cache lock contends here.
fn update_cache( fn update_cache(
cache: Arc<RwLock<LruCache<Slot, Arc<Mutex<PrioritizationFee>>>>>, cache: Arc<RwLock<LruCache<Slot, Arc<SlotPrioritizationFee>>>>,
slot: &Slot, slot: &Slot,
bank_id: &BankId,
transaction_fee: u64, transaction_fee: u64,
writable_accounts: Arc<Vec<Pubkey>>, writable_accounts: Arc<Vec<Pubkey>>,
metrics: Arc<PrioritizationFeeCacheMetrics>, metrics: Arc<PrioritizationFeeCacheMetrics>,
) { ) {
let (block_prioritization_fee, cache_lock_time) = let (slot_prioritization_fee, cache_lock_time) =
measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time"); measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time");
let (mut block_prioritization_fee, entry_lock_time) =
measure!(block_prioritization_fee.lock().unwrap(), "entry_lock_time");
let (_, entry_update_time) = measure!( let (_, entry_update_time) = measure!(
block_prioritization_fee.update(transaction_fee, &writable_accounts), {
let mut block_prioritization_fee = slot_prioritization_fee
.entry(*bank_id)
.or_insert(PrioritizationFee::default());
block_prioritization_fee.update(transaction_fee, &writable_accounts)
},
"entry_update_time" "entry_update_time"
); );
metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us());
metrics.accumulate_total_entry_lock_elapsed_us(entry_lock_time.as_us());
metrics.accumulate_total_entry_update_elapsed_us(entry_update_time.as_us()); metrics.accumulate_total_entry_update_elapsed_us(entry_update_time.as_us());
metrics.accumulate_successful_transaction_update_count(1);
} }
fn finalize_slot( fn finalize_slot(
cache: Arc<RwLock<LruCache<Slot, Arc<Mutex<PrioritizationFee>>>>>, cache: Arc<RwLock<LruCache<Slot, Arc<SlotPrioritizationFee>>>>,
slot: &Slot, slot: &Slot,
bank_id: &BankId,
metrics: Arc<PrioritizationFeeCacheMetrics>, metrics: Arc<PrioritizationFeeCacheMetrics>,
) { ) {
let (block_prioritization_fee, cache_lock_time) = let (slot_prioritization_fee, cache_lock_time) =
measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time"); measure!(Self::get_prioritization_fee(cache, slot), "cache_lock_time");
let (mut block_prioritization_fee, entry_lock_time) =
measure!(block_prioritization_fee.lock().unwrap(), "entry_lock_time");
// prune cache by evicting write account entry from prioritization fee if its fee is less // prune cache by evicting write account entry from prioritization fee if its fee is less
// or equal to block's minimum transaction fee, because they are irrelevant in calculating // or equal to block's minimum transaction fee, because they are irrelevant in calculating
// block minimum fee. // block minimum fee.
let (_, slot_finalize_time) = measure!( let (result, slot_finalize_time) = measure!(
block_prioritization_fee.mark_block_completed(), {
let pre_purge_bank_count = slot_prioritization_fee.len() as u64;
slot_prioritization_fee.retain(|id, _| id == bank_id);
let post_purge_bank_count = slot_prioritization_fee.len() as u64;
metrics.accumulate_total_purged_duplicated_bank_count(
pre_purge_bank_count.saturating_sub(post_purge_bank_count),
);
if post_purge_bank_count == 0 {
warn!("Prioritization fee cache unexpected finalized on non-existing bank. slot {slot} bank id {bank_id}");
}
let mut block_prioritization_fee = slot_prioritization_fee
.entry(*bank_id)
.or_insert(PrioritizationFee::default());
let result = block_prioritization_fee.mark_block_completed();
block_prioritization_fee.report_metrics(*slot);
result
},
"slot_finalize_time" "slot_finalize_time"
); );
block_prioritization_fee.report_metrics(*slot);
metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us()); metrics.accumulate_total_cache_lock_elapsed_us(cache_lock_time.as_us());
metrics.accumulate_total_entry_lock_elapsed_us(entry_lock_time.as_us());
metrics.accumulate_total_block_finalize_elapsed_us(slot_finalize_time.as_us()); metrics.accumulate_total_block_finalize_elapsed_us(slot_finalize_time.as_us());
if let Err(err) = result {
error!(
"Unsuccessful finalizing slot {slot}, bank ID {bank_id}: {:?}",
err
);
}
} }
fn service_loop( fn service_loop(
cache: Arc<RwLock<LruCache<Slot, Arc<Mutex<PrioritizationFee>>>>>, cache: Arc<RwLock<LruCache<Slot, Arc<SlotPrioritizationFee>>>>,
receiver: Receiver<CacheServiceUpdate>, receiver: Receiver<CacheServiceUpdate>,
metrics: Arc<PrioritizationFeeCacheMetrics>, metrics: Arc<PrioritizationFeeCacheMetrics>,
) { ) {
@ -332,17 +361,19 @@ impl PrioritizationFeeCache {
match update { match update {
CacheServiceUpdate::TransactionUpdate { CacheServiceUpdate::TransactionUpdate {
slot, slot,
bank_id,
transaction_fee, transaction_fee,
writable_accounts, writable_accounts,
} => Self::update_cache( } => Self::update_cache(
cache.clone(), cache.clone(),
&slot, &slot,
&bank_id,
transaction_fee, transaction_fee,
writable_accounts, writable_accounts,
metrics.clone(), metrics.clone(),
), ),
CacheServiceUpdate::BankFrozen { slot } => { CacheServiceUpdate::BankFinalized { slot, bank_id } => {
Self::finalize_slot(cache.clone(), &slot, metrics.clone()); Self::finalize_slot(cache.clone(), &slot, &bank_id, metrics.clone());
metrics.report(slot); metrics.report(slot);
} }
@ -359,7 +390,11 @@ impl PrioritizationFeeCache {
.read() .read()
.unwrap() .unwrap()
.iter() .iter()
.filter(|(_slot, prioritization_fee)| prioritization_fee.lock().unwrap().is_finalized()) .filter(|(_slot, slot_prioritization_fee)| {
slot_prioritization_fee
.iter()
.any(|prioritization_fee| prioritization_fee.is_finalized())
})
.count() .count()
} }
@ -368,21 +403,24 @@ impl PrioritizationFeeCache {
.read() .read()
.unwrap() .unwrap()
.iter() .iter()
.filter_map(|(slot, prioritization_fee)| { .filter_map(|(slot, slot_prioritization_fee)| {
let prioritization_fee_read = prioritization_fee.lock().unwrap(); slot_prioritization_fee
prioritization_fee_read.is_finalized().then(|| { .iter()
let mut fee = prioritization_fee_read .find_map(|prioritization_fee| {
.get_min_transaction_fee() prioritization_fee.is_finalized().then(|| {
.unwrap_or_default(); let mut fee = prioritization_fee
for account_key in account_keys { .get_min_transaction_fee()
if let Some(account_fee) = .unwrap_or_default();
prioritization_fee_read.get_writable_account_fee(account_key) for account_key in account_keys {
{ if let Some(account_fee) =
fee = std::cmp::max(fee, account_fee); prioritization_fee.get_writable_account_fee(account_key)
} {
} fee = std::cmp::max(fee, account_fee);
Some((*slot, fee)) }
}) }
Some((*slot, fee))
})
})
}) })
.flatten() .flatten()
.collect() .collect()
@ -427,21 +465,22 @@ mod tests {
fn sync_update<'a>( fn sync_update<'a>(
prioritization_fee_cache: &PrioritizationFeeCache, prioritization_fee_cache: &PrioritizationFeeCache,
bank: Arc<Bank>, bank: Arc<Bank>,
txs: impl Iterator<Item = &'a SanitizedTransaction>, txs: impl Iterator<Item = &'a SanitizedTransaction> + ExactSizeIterator,
) { ) {
let expected_update_count = prioritization_fee_cache
.metrics
.successful_transaction_update_count
.load(Ordering::Relaxed)
.saturating_add(txs.len() as u64);
prioritization_fee_cache.update(&bank, txs); prioritization_fee_cache.update(&bank, txs);
let block_fee = PrioritizationFeeCache::get_prioritization_fee( // wait till expected number of transaction updates have occurred...
prioritization_fee_cache.cache.clone(), while prioritization_fee_cache
&bank.slot(), .metrics
); .successful_transaction_update_count
.load(Ordering::Relaxed)
// wait till update is done != expected_update_count
while block_fee
.lock()
.unwrap()
.get_min_transaction_fee()
.is_none()
{ {
std::thread::sleep(std::time::Duration::from_millis(100)); std::thread::sleep(std::time::Duration::from_millis(100));
} }
@ -451,15 +490,19 @@ mod tests {
fn sync_finalize_priority_fee_for_test( fn sync_finalize_priority_fee_for_test(
prioritization_fee_cache: &PrioritizationFeeCache, prioritization_fee_cache: &PrioritizationFeeCache,
slot: Slot, slot: Slot,
bank_id: BankId,
) { ) {
prioritization_fee_cache.finalize_priority_fee(slot); prioritization_fee_cache.finalize_priority_fee(slot, bank_id);
let fee = PrioritizationFeeCache::get_prioritization_fee( let fee = PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(), prioritization_fee_cache.cache.clone(),
&slot, &slot,
); );
// wait till finalization is done // wait till finalization is done
while !fee.lock().unwrap().is_finalized() { while !fee
.get(&bank_id)
.map_or(false, |block_fee| block_fee.is_finalized())
{
std::thread::sleep(std::time::Duration::from_millis(100)); std::thread::sleep(std::time::Duration::from_millis(100));
} }
} }
@ -490,7 +533,7 @@ mod tests {
let slot = bank.slot(); let slot = bank.slot();
let prioritization_fee_cache = PrioritizationFeeCache::default(); let prioritization_fee_cache = PrioritizationFeeCache::default();
sync_update(&prioritization_fee_cache, bank, txs.iter()); sync_update(&prioritization_fee_cache, bank.clone(), txs.iter());
// assert block minimum fee and account a, b, c fee accordingly // assert block minimum fee and account a, b, c fee accordingly
{ {
@ -498,7 +541,7 @@ mod tests {
prioritization_fee_cache.cache.clone(), prioritization_fee_cache.cache.clone(),
&slot, &slot,
); );
let fee = fee.lock().unwrap(); let fee = fee.get(&bank.bank_id()).unwrap();
assert_eq!(2, fee.get_min_transaction_fee().unwrap()); assert_eq!(2, fee.get_min_transaction_fee().unwrap());
assert_eq!(2, fee.get_writable_account_fee(&write_account_a).unwrap()); assert_eq!(2, fee.get_writable_account_fee(&write_account_a).unwrap());
assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap());
@ -511,12 +554,12 @@ mod tests {
// assert after prune, account a and c should be removed from cache to save space // assert after prune, account a and c should be removed from cache to save space
{ {
sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot); sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank.bank_id());
let fee = PrioritizationFeeCache::get_prioritization_fee( let fee = PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(), prioritization_fee_cache.cache.clone(),
&slot, &slot,
); );
let fee = fee.lock().unwrap(); let fee = fee.get(&bank.bank_id()).unwrap();
assert_eq!(2, fee.get_min_transaction_fee().unwrap()); assert_eq!(2, fee.get_min_transaction_fee().unwrap());
assert!(fee.get_writable_account_fee(&write_account_a).is_none()); assert!(fee.get_writable_account_fee(&write_account_a).is_none());
assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap()); assert_eq!(5, fee.get_writable_account_fee(&write_account_b).unwrap());
@ -532,20 +575,22 @@ mod tests {
prioritization_fee_cache.cache.clone(), prioritization_fee_cache.cache.clone(),
&1 &1
) )
.lock() .entry(1)
.unwrap() .or_default()
.mark_block_completed() .mark_block_completed()
.is_ok()); .is_ok());
assert!(PrioritizationFeeCache::get_prioritization_fee( assert!(PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(), prioritization_fee_cache.cache.clone(),
&2 &2
) )
.lock() .entry(2)
.unwrap() .or_default()
.mark_block_completed() .mark_block_completed()
.is_ok()); .is_ok());
// add slot 3 entry to cache, but not finalize it // add slot 3 entry to cache, but not finalize it
PrioritizationFeeCache::get_prioritization_fee(prioritization_fee_cache.cache.clone(), &3); PrioritizationFeeCache::get_prioritization_fee(prioritization_fee_cache.cache.clone(), &3)
.entry(3)
.or_default();
// assert available block count should be 2 finalized blocks // assert available block count should be 2 finalized blocks
assert_eq!(2, prioritization_fee_cache.available_block_count()); assert_eq!(2, prioritization_fee_cache.available_block_count());
@ -603,7 +648,7 @@ mod tests {
&Pubkey::new_unique(), &Pubkey::new_unique(),
), ),
]; ];
sync_update(&prioritization_fee_cache, bank1, txs.iter()); sync_update(&prioritization_fee_cache, bank1.clone(), txs.iter());
// before block is marked as completed // before block is marked as completed
assert!(prioritization_fee_cache assert!(prioritization_fee_cache
.get_prioritization_fees(&[]) .get_prioritization_fees(&[])
@ -624,7 +669,7 @@ mod tests {
.get_prioritization_fees(&[write_account_a, write_account_b, write_account_c]) .get_prioritization_fees(&[write_account_a, write_account_b, write_account_c])
.is_empty()); .is_empty());
// after block is completed // after block is completed
sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 1); sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 1, bank1.bank_id());
assert_eq!( assert_eq!(
hashmap_of(vec![(1, 1)]), hashmap_of(vec![(1, 1)]),
prioritization_fee_cache.get_prioritization_fees(&[]) prioritization_fee_cache.get_prioritization_fees(&[])
@ -666,7 +711,7 @@ mod tests {
&Pubkey::new_unique(), &Pubkey::new_unique(),
), ),
]; ];
sync_update(&prioritization_fee_cache, bank2, txs.iter()); sync_update(&prioritization_fee_cache, bank2.clone(), txs.iter());
// before block is marked as completed // before block is marked as completed
assert_eq!( assert_eq!(
hashmap_of(vec![(1, 1)]), hashmap_of(vec![(1, 1)]),
@ -698,7 +743,7 @@ mod tests {
]) ])
); );
// after block is completed // after block is completed
sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 2); sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 2, bank2.bank_id());
assert_eq!( assert_eq!(
hashmap_of(vec![(2, 3), (1, 1)]), hashmap_of(vec![(2, 3), (1, 1)]),
prioritization_fee_cache.get_prioritization_fees(&[]), prioritization_fee_cache.get_prioritization_fees(&[]),
@ -740,7 +785,7 @@ mod tests {
&Pubkey::new_unique(), &Pubkey::new_unique(),
), ),
]; ];
sync_update(&prioritization_fee_cache, bank3, txs.iter()); sync_update(&prioritization_fee_cache, bank3.clone(), txs.iter());
// before block is marked as completed // before block is marked as completed
assert_eq!( assert_eq!(
hashmap_of(vec![(2, 3), (1, 1)]), hashmap_of(vec![(2, 3), (1, 1)]),
@ -772,7 +817,7 @@ mod tests {
]), ]),
); );
// after block is completed // after block is completed
sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 3); sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 3, bank3.bank_id());
assert_eq!( assert_eq!(
hashmap_of(vec![(3, 5), (2, 3), (1, 1)]), hashmap_of(vec![(3, 5), (2, 3), (1, 1)]),
prioritization_fee_cache.get_prioritization_fees(&[]), prioritization_fee_cache.get_prioritization_fees(&[]),
@ -804,4 +849,109 @@ mod tests {
); );
} }
} }
#[test]
fn test_purge_duplicated_bank() {
// duplicated bank can exists for same slot before OC.
// prioritization_fee_cache should only have data from OC-ed bank
solana_logger::setup();
let write_account_a = Pubkey::new_unique();
let write_account_b = Pubkey::new_unique();
let write_account_c = Pubkey::new_unique();
let GenesisConfigInfo { genesis_config, .. } = create_genesis_config(10_000);
let bank0 = Bank::new_for_benches(&genesis_config);
let bank_forks = BankForks::new(bank0);
let bank = bank_forks.working_bank();
let collector = solana_sdk::pubkey::new_rand();
let slot: Slot = 999;
let bank1 = Arc::new(Bank::new_from_parent(bank.clone(), &collector, slot));
let bank2 = Arc::new(Bank::new_from_parent(bank, &collector, slot));
let prioritization_fee_cache = PrioritizationFeeCache::default();
// Assert after add transactions for bank1 of slot 1
{
let txs = vec![
build_sanitized_transaction_for_test(2, &write_account_a, &write_account_b),
build_sanitized_transaction_for_test(
1,
&Pubkey::new_unique(),
&Pubkey::new_unique(),
),
];
sync_update(&prioritization_fee_cache, bank1.clone(), txs.iter());
let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(),
&slot,
);
assert_eq!(1, slot_prioritization_fee.len());
assert!(slot_prioritization_fee.contains_key(&bank1.bank_id()));
}
// Assert after add transactions for bank2 of slot 1
{
let txs = vec![
build_sanitized_transaction_for_test(4, &write_account_b, &write_account_c),
build_sanitized_transaction_for_test(
3,
&Pubkey::new_unique(),
&Pubkey::new_unique(),
),
];
sync_update(&prioritization_fee_cache, bank2.clone(), txs.iter());
let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(),
&slot,
);
assert_eq!(2, slot_prioritization_fee.len());
assert!(slot_prioritization_fee.contains_key(&bank1.bank_id()));
assert!(slot_prioritization_fee.contains_key(&bank2.bank_id()));
}
// Assert after finalize with bank1 of slot 1,
{
sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank1.bank_id());
let slot_prioritization_fee = PrioritizationFeeCache::get_prioritization_fee(
prioritization_fee_cache.cache.clone(),
&slot,
);
assert_eq!(1, slot_prioritization_fee.len());
assert!(slot_prioritization_fee.contains_key(&bank1.bank_id()));
// and data available for query are from bank1
assert_eq!(
hashmap_of(vec![(slot, 1)]),
prioritization_fee_cache.get_prioritization_fees(&[])
);
assert_eq!(
hashmap_of(vec![(slot, 2)]),
prioritization_fee_cache.get_prioritization_fees(&[write_account_a])
);
assert_eq!(
hashmap_of(vec![(slot, 2)]),
prioritization_fee_cache.get_prioritization_fees(&[write_account_b])
);
assert_eq!(
hashmap_of(vec![(slot, 1)]),
prioritization_fee_cache.get_prioritization_fees(&[write_account_c])
);
assert_eq!(
hashmap_of(vec![(slot, 2)]),
prioritization_fee_cache
.get_prioritization_fees(&[write_account_a, write_account_b])
);
assert_eq!(
hashmap_of(vec![(slot, 2)]),
prioritization_fee_cache.get_prioritization_fees(&[
write_account_a,
write_account_b,
write_account_c
])
);
}
}
} }