prioritization fee cache: remove lru crate (#30)

This commit is contained in:
Kirill Fomichev 2024-03-27 13:09:17 -04:00 committed by GitHub
parent cfd5b71b28
commit ba9c25c41e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 56 additions and 65 deletions

1
Cargo.lock generated
View File

@ -6994,7 +6994,6 @@ dependencies = [
"libc", "libc",
"libsecp256k1", "libsecp256k1",
"log", "log",
"lru",
"lz4", "lz4",
"memmap2", "memmap2",
"memoffset 0.9.0", "memoffset 0.9.0",

View File

@ -5681,7 +5681,6 @@ dependencies = [
"lazy_static", "lazy_static",
"libc", "libc",
"log", "log",
"lru",
"lz4", "lz4",
"memmap2", "memmap2",
"mockall", "mockall",

View File

@ -30,7 +30,6 @@ itertools = { workspace = true }
lazy_static = { workspace = true } lazy_static = { workspace = true }
libc = { workspace = true } libc = { workspace = true }
log = { workspace = true } log = { workspace = true }
lru = { workspace = true }
lz4 = { workspace = true } lz4 = { workspace = true }
memmap2 = { workspace = true } memmap2 = { workspace = true }
mockall = { workspace = true } mockall = { workspace = true }

View File

@ -165,11 +165,7 @@ impl Default for PrioritizationFee {
impl PrioritizationFee { impl PrioritizationFee {
/// Update self for minimum transaction fee in the block and minimum fee for each writable account. /// Update self for minimum transaction fee in the block and minimum fee for each writable account.
pub fn update( pub fn update(&mut self, transaction_fee: u64, writable_accounts: Vec<Pubkey>) {
&mut self,
transaction_fee: u64,
writable_accounts: Vec<Pubkey>,
) -> Result<(), PrioritizationFeeError> {
let (_, update_time) = measure!( let (_, update_time) = measure!(
{ {
if !self.is_finalized { if !self.is_finalized {
@ -199,7 +195,6 @@ impl PrioritizationFee {
self.metrics self.metrics
.accumulate_total_update_elapsed_us(update_time.as_us()); .accumulate_total_update_elapsed_us(update_time.as_us());
Ok(())
} }
/// Accounts that have minimum fees lesser or equal to the minimum fee in the block are redundant, they are /// Accounts that have minimum fees lesser or equal to the minimum fee in the block are redundant, they are
@ -283,9 +278,7 @@ mod tests {
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
// [5, a, b ] --> [5, 5, 5, nil ] // [5, a, b ] --> [5, 5, 5, nil ]
{ {
assert!(prioritization_fee prioritization_fee.update(5, vec![write_account_a, write_account_b]);
.update(5, vec![write_account_a, write_account_b])
.is_ok());
assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap());
assert_eq!( assert_eq!(
5, 5,
@ -309,9 +302,7 @@ mod tests {
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
// [9, b, c ] --> [5, 5, 5, 9 ] // [9, b, c ] --> [5, 5, 5, 9 ]
{ {
assert!(prioritization_fee prioritization_fee.update(9, vec![write_account_b, write_account_c]);
.update(9, vec![write_account_b, write_account_c])
.is_ok());
assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!(5, prioritization_fee.get_min_transaction_fee().unwrap());
assert_eq!( assert_eq!(
5, 5,
@ -338,9 +329,7 @@ mod tests {
// ----------------------------------------------------------------------- // -----------------------------------------------------------------------
// [2, a, c ] --> [2, 2, 5, 2 ] // [2, a, c ] --> [2, 2, 5, 2 ]
{ {
assert!(prioritization_fee prioritization_fee.update(2, vec![write_account_a, write_account_c]);
.update(2, vec![write_account_a, write_account_c])
.is_ok());
assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap()); assert_eq!(2, prioritization_fee.get_min_transaction_fee().unwrap());
assert_eq!( assert_eq!(
2, 2,

View File

@ -2,7 +2,6 @@ use {
crate::{bank::Bank, compute_budget_details::GetComputeBudgetDetails, prioritization_fee::*}, crate::{bank::Bank, compute_budget_details::GetComputeBudgetDetails, prioritization_fee::*},
crossbeam_channel::{unbounded, Receiver, Sender}, crossbeam_channel::{unbounded, Receiver, Sender},
log::*, log::*,
lru::LruCache,
solana_measure::measure, solana_measure::measure,
solana_sdk::{ solana_sdk::{
clock::{BankId, Slot}, clock::{BankId, Slot},
@ -122,6 +121,7 @@ impl PrioritizationFeeCacheMetrics {
} }
} }
#[derive(Debug)]
enum CacheServiceUpdate { enum CacheServiceUpdate {
TransactionUpdate { TransactionUpdate {
slot: Slot, slot: Slot,
@ -141,7 +141,7 @@ enum CacheServiceUpdate {
/// and collecting stats and reporting metrics. /// and collecting stats and reporting metrics.
#[derive(Debug)] #[derive(Debug)]
pub struct PrioritizationFeeCache { pub struct PrioritizationFeeCache {
cache: Arc<RwLock<LruCache<Slot, PrioritizationFee>>>, cache: Arc<RwLock<BTreeMap<Slot, PrioritizationFee>>>,
service_thread: Option<JoinHandle<()>>, service_thread: Option<JoinHandle<()>>,
sender: Sender<CacheServiceUpdate>, sender: Sender<CacheServiceUpdate>,
metrics: Arc<PrioritizationFeeCacheMetrics>, metrics: Arc<PrioritizationFeeCacheMetrics>,
@ -166,17 +166,17 @@ impl Drop for PrioritizationFeeCache {
impl PrioritizationFeeCache { impl PrioritizationFeeCache {
pub fn new(capacity: u64) -> Self { pub fn new(capacity: u64) -> Self {
let metrics = Arc::new(PrioritizationFeeCacheMetrics::default()); let cache = Arc::new(RwLock::new(BTreeMap::new()));
let (sender, receiver) = unbounded(); let (sender, receiver) = unbounded();
let cache = Arc::new(RwLock::new(LruCache::new(capacity as usize))); let metrics = Arc::new(PrioritizationFeeCacheMetrics::default());
let cache_clone = cache.clone();
let metrics_clone = metrics.clone();
let service_thread = Some( let service_thread = Some(
Builder::new() Builder::new()
.name("solPrFeeCachSvc".to_string()) .name("solPrFeeCachSvc".to_string())
.spawn(move || { .spawn({
Self::service_loop(cache_clone, receiver, metrics_clone); let cache = cache.clone();
let metrics = metrics.clone();
move || Self::service_loop(cache, capacity as usize, receiver, metrics)
}) })
.unwrap(), .unwrap(),
); );
@ -261,8 +261,7 @@ impl PrioritizationFeeCache {
}); });
} }
/// Internal function is invoked by worker thread to update slot's minimum prioritization fee, /// Internal function is invoked by worker thread to update slot's minimum prioritization fee.
/// Cache lock contends here.
fn update_cache( fn update_cache(
unfinalized: &mut UnfinalizedPrioritizationFees, unfinalized: &mut UnfinalizedPrioritizationFees,
slot: Slot, slot: Slot,
@ -273,7 +272,7 @@ impl PrioritizationFeeCache {
) { ) {
let (_, entry_update_time) = measure!( let (_, entry_update_time) = measure!(
{ {
let _ = unfinalized unfinalized
.entry(slot) .entry(slot)
.or_default() .or_default()
.entry(bank_id) .entry(bank_id)
@ -288,7 +287,8 @@ impl PrioritizationFeeCache {
fn finalize_slot( fn finalize_slot(
unfinalized: &mut UnfinalizedPrioritizationFees, unfinalized: &mut UnfinalizedPrioritizationFees,
cache: &RwLock<LruCache<Slot, PrioritizationFee>>, cache: &RwLock<BTreeMap<Slot, PrioritizationFee>>,
cache_max_size: usize,
slot: Slot, slot: Slot,
bank_id: BankId, bank_id: BankId,
metrics: &PrioritizationFeeCacheMetrics, metrics: &PrioritizationFeeCacheMetrics,
@ -340,7 +340,10 @@ impl PrioritizationFeeCache {
let (_, cache_lock_time) = measure!( let (_, cache_lock_time) = measure!(
{ {
let mut cache = cache.write().unwrap(); let mut cache = cache.write().unwrap();
cache.put(slot, slot_prioritization_fee); while cache.len() >= cache_max_size {
cache.pop_first();
}
cache.insert(slot, slot_prioritization_fee);
}, },
"cache_lock_time" "cache_lock_time"
); );
@ -349,7 +352,8 @@ impl PrioritizationFeeCache {
} }
fn service_loop( fn service_loop(
cache: Arc<RwLock<LruCache<Slot, PrioritizationFee>>>, cache: Arc<RwLock<BTreeMap<Slot, PrioritizationFee>>>,
cache_max_size: usize,
receiver: Receiver<CacheServiceUpdate>, receiver: Receiver<CacheServiceUpdate>,
metrics: Arc<PrioritizationFeeCacheMetrics>, metrics: Arc<PrioritizationFeeCacheMetrics>,
) { ) {
@ -373,7 +377,14 @@ impl PrioritizationFeeCache {
&metrics, &metrics,
), ),
CacheServiceUpdate::BankFinalized { slot, bank_id } => { CacheServiceUpdate::BankFinalized { slot, bank_id } => {
Self::finalize_slot(&mut unfinalized, &cache, slot, bank_id, &metrics); Self::finalize_slot(
&mut unfinalized,
&cache,
cache_max_size,
slot,
bank_id,
&metrics,
);
metrics.report(slot); metrics.report(slot);
} }
CacheServiceUpdate::Exit => { CacheServiceUpdate::Exit => {
@ -385,12 +396,7 @@ impl PrioritizationFeeCache {
/// Returns number of blocks that have finalized minimum fees collection /// Returns number of blocks that have finalized minimum fees collection
pub fn available_block_count(&self) -> usize { pub fn available_block_count(&self) -> usize {
self.cache self.cache.read().unwrap().len()
.read()
.unwrap()
.iter()
.filter(|(_slot, slot_prioritization_fee)| slot_prioritization_fee.is_finalized())
.count()
} }
pub fn get_prioritization_fees(&self, account_keys: &[Pubkey]) -> Vec<(Slot, u64)> { pub fn get_prioritization_fees(&self, account_keys: &[Pubkey]) -> Vec<(Slot, u64)> {
@ -398,7 +404,6 @@ impl PrioritizationFeeCache {
.read() .read()
.unwrap() .unwrap()
.iter() .iter()
.filter(|(_slot, slot_prioritization_fee)| slot_prioritization_fee.is_finalized())
.map(|(slot, slot_prioritization_fee)| { .map(|(slot, slot_prioritization_fee)| {
let mut fee = slot_prioritization_fee let mut fee = slot_prioritization_fee
.get_min_transaction_fee() .get_min_transaction_fee()
@ -471,7 +476,7 @@ mod tests {
.load(Ordering::Relaxed) .load(Ordering::Relaxed)
!= expected_update_count != expected_update_count
{ {
std::thread::sleep(std::time::Duration::from_millis(100)); std::thread::sleep(std::time::Duration::from_millis(10));
} }
} }
@ -486,7 +491,7 @@ mod tests {
// wait till finalization is done // wait till finalization is done
loop { loop {
let mut cache = prioritization_fee_cache.cache.write().unwrap(); let cache = prioritization_fee_cache.cache.read().unwrap();
if let Some(slot_cache) = cache.get(&slot) { if let Some(slot_cache) = cache.get(&slot) {
if slot_cache.is_finalized() { if slot_cache.is_finalized() {
return; return;
@ -528,14 +533,14 @@ mod tests {
// assert empty cache // assert empty cache
{ {
let mut lock = prioritization_fee_cache.cache.write().unwrap(); let lock = prioritization_fee_cache.cache.read().unwrap();
assert!(lock.get(&slot).is_none()); assert!(lock.get(&slot).is_none());
} }
// assert after prune, account a and c should be removed from cache to save space // assert after prune, account a and c should be removed from cache to save space
{ {
sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank.bank_id()); sync_finalize_priority_fee_for_test(&prioritization_fee_cache, slot, bank.bank_id());
let mut lock = prioritization_fee_cache.cache.write().unwrap(); let lock = prioritization_fee_cache.cache.read().unwrap();
let fee = lock.get(&slot).unwrap(); let fee = lock.get(&slot).unwrap();
assert_eq!(2, fee.get_min_transaction_fee().unwrap()); assert_eq!(2, fee.get_min_transaction_fee().unwrap());
assert!(fee.get_writable_account_fee(&write_account_a).is_none()); assert!(fee.get_writable_account_fee(&write_account_a).is_none());
@ -568,7 +573,7 @@ mod tests {
sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 1, bank1.bank_id()); sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 1, bank1.bank_id());
// add slot 2 entry to cache, but not finalize it // add slot 2 entry to cache, but not finalize it
let bank2 = Arc::new(Bank::new_from_parent(bank.clone(), &collector, 3)); let bank2 = Arc::new(Bank::new_from_parent(bank.clone(), &collector, 2));
let txs = vec![build_sanitized_transaction_for_test( let txs = vec![build_sanitized_transaction_for_test(
1, 1,
&Pubkey::new_unique(), &Pubkey::new_unique(),
@ -576,7 +581,7 @@ mod tests {
)]; )];
sync_update(&prioritization_fee_cache, bank2.clone(), txs.iter()); sync_update(&prioritization_fee_cache, bank2.clone(), txs.iter());
let bank3 = Arc::new(Bank::new_from_parent(bank.clone(), &collector, 2)); let bank3 = Arc::new(Bank::new_from_parent(bank.clone(), &collector, 3));
sync_update( sync_update(
&prioritization_fee_cache, &prioritization_fee_cache,
bank3.clone(), bank3.clone(),
@ -587,7 +592,7 @@ mod tests {
)] )]
.iter(), .iter(),
); );
sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 2, bank3.bank_id()); sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 3, bank3.bank_id());
// assert available block count should be 2 finalized blocks // assert available block count should be 2 finalized blocks
assert_eq!(2, prioritization_fee_cache.available_block_count()); assert_eq!(2, prioritization_fee_cache.available_block_count());
@ -738,28 +743,28 @@ mod tests {
// after block is completed // after block is completed
sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 2, bank2.bank_id()); sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 2, bank2.bank_id());
assert_eq!( assert_eq!(
vec![(2, 3), (1, 1)], vec![(1, 1), (2, 3)],
prioritization_fee_cache.get_prioritization_fees(&[]), prioritization_fee_cache.get_prioritization_fees(&[]),
); );
assert_eq!( assert_eq!(
vec![(2, 3), (1, 2)], vec![(1, 2), (2, 3)],
prioritization_fee_cache.get_prioritization_fees(&[write_account_a]), prioritization_fee_cache.get_prioritization_fees(&[write_account_a]),
); );
assert_eq!( assert_eq!(
vec![(2, 4), (1, 2)], vec![(1, 2), (2, 4)],
prioritization_fee_cache.get_prioritization_fees(&[write_account_b]), prioritization_fee_cache.get_prioritization_fees(&[write_account_b]),
); );
assert_eq!( assert_eq!(
vec![(2, 4), (1, 1)], vec![(1, 1), (2, 4)],
prioritization_fee_cache.get_prioritization_fees(&[write_account_c]), prioritization_fee_cache.get_prioritization_fees(&[write_account_c]),
); );
assert_eq!( assert_eq!(
vec![(2, 4), (1, 2)], vec![(1, 2), (2, 4)],
prioritization_fee_cache prioritization_fee_cache
.get_prioritization_fees(&[write_account_a, write_account_b]), .get_prioritization_fees(&[write_account_a, write_account_b]),
); );
assert_eq!( assert_eq!(
vec![(2, 4), (1, 2)], vec![(1, 2), (2, 4)],
prioritization_fee_cache.get_prioritization_fees(&[ prioritization_fee_cache.get_prioritization_fees(&[
write_account_a, write_account_a,
write_account_b, write_account_b,
@ -781,28 +786,28 @@ mod tests {
sync_update(&prioritization_fee_cache, bank3.clone(), txs.iter()); sync_update(&prioritization_fee_cache, bank3.clone(), txs.iter());
// before block is marked as completed // before block is marked as completed
assert_eq!( assert_eq!(
vec![(2, 3), (1, 1)], vec![(1, 1), (2, 3)],
prioritization_fee_cache.get_prioritization_fees(&[]), prioritization_fee_cache.get_prioritization_fees(&[]),
); );
assert_eq!( assert_eq!(
vec![(2, 3), (1, 2)], vec![(1, 2), (2, 3)],
prioritization_fee_cache.get_prioritization_fees(&[write_account_a]), prioritization_fee_cache.get_prioritization_fees(&[write_account_a]),
); );
assert_eq!( assert_eq!(
vec![(2, 4), (1, 2)], vec![(1, 2), (2, 4)],
prioritization_fee_cache.get_prioritization_fees(&[write_account_b]), prioritization_fee_cache.get_prioritization_fees(&[write_account_b]),
); );
assert_eq!( assert_eq!(
vec![(2, 4), (1, 1)], vec![(1, 1), (2, 4)],
prioritization_fee_cache.get_prioritization_fees(&[write_account_c]), prioritization_fee_cache.get_prioritization_fees(&[write_account_c]),
); );
assert_eq!( assert_eq!(
vec![(2, 4), (1, 2)], vec![(1, 2), (2, 4)],
prioritization_fee_cache prioritization_fee_cache
.get_prioritization_fees(&[write_account_a, write_account_b]), .get_prioritization_fees(&[write_account_a, write_account_b]),
); );
assert_eq!( assert_eq!(
vec![(2, 4), (1, 2)], vec![(1, 2), (2, 4)],
prioritization_fee_cache.get_prioritization_fees(&[ prioritization_fee_cache.get_prioritization_fees(&[
write_account_a, write_account_a,
write_account_b, write_account_b,
@ -812,28 +817,28 @@ mod tests {
// after block is completed // after block is completed
sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 3, bank3.bank_id()); sync_finalize_priority_fee_for_test(&prioritization_fee_cache, 3, bank3.bank_id());
assert_eq!( assert_eq!(
vec![(3, 5), (2, 3), (1, 1)], vec![(1, 1), (2, 3), (3, 5)],
prioritization_fee_cache.get_prioritization_fees(&[]), prioritization_fee_cache.get_prioritization_fees(&[]),
); );
assert_eq!( assert_eq!(
vec![(3, 6), (2, 3), (1, 2)], vec![(1, 2), (2, 3), (3, 6)],
prioritization_fee_cache.get_prioritization_fees(&[write_account_a]), prioritization_fee_cache.get_prioritization_fees(&[write_account_a]),
); );
assert_eq!( assert_eq!(
vec![(3, 5), (2, 4), (1, 2)], vec![(1, 2), (2, 4), (3, 5)],
prioritization_fee_cache.get_prioritization_fees(&[write_account_b]), prioritization_fee_cache.get_prioritization_fees(&[write_account_b]),
); );
assert_eq!( assert_eq!(
vec![(3, 6), (2, 4), (1, 1)], vec![(1, 1), (2, 4), (3, 6)],
prioritization_fee_cache.get_prioritization_fees(&[write_account_c]), prioritization_fee_cache.get_prioritization_fees(&[write_account_c]),
); );
assert_eq!( assert_eq!(
vec![(3, 6), (2, 4), (1, 2)], vec![(1, 2), (2, 4), (3, 6)],
prioritization_fee_cache prioritization_fee_cache
.get_prioritization_fees(&[write_account_a, write_account_b]), .get_prioritization_fees(&[write_account_a, write_account_b]),
); );
assert_eq!( assert_eq!(
vec![(3, 6), (2, 4), (1, 2)], vec![(1, 2), (2, 4), (3, 6)],
prioritization_fee_cache.get_prioritization_fees(&[ prioritization_fee_cache.get_prioritization_fees(&[
write_account_a, write_account_a,
write_account_b, write_account_b,