Introduce ttl eviction for RecycleStore (#15513)
This commit is contained in:
parent
d8ba56ec09
commit
21b43009f6
|
@ -21,7 +21,7 @@ use std::{
|
|||
Arc, RwLock,
|
||||
},
|
||||
thread::{self, sleep, Builder, JoinHandle},
|
||||
time::Duration,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
||||
const INTERVAL_MS: u64 = 100;
|
||||
|
@ -30,6 +30,13 @@ const SHRUNKEN_ACCOUNT_PER_INTERVAL: usize =
|
|||
SHRUNKEN_ACCOUNT_PER_SEC / (1000 / INTERVAL_MS as usize);
|
||||
const CLEAN_INTERVAL_BLOCKS: u64 = 100;
|
||||
|
||||
// This value is chosen to spread the dropping cost over 3 expiration checks
|
||||
// RecycleStores are fully populated almost all of its lifetime. So, otherwise
|
||||
// this would drop MAX_RECYCLE_STORES mmaps at once in the worst case...
|
||||
// (Anyway, the dropping part is outside the AccountsDb::recycle_stores lock
|
||||
// and dropped in this AccountsBackgroundServe, so this shouldn't matter much)
|
||||
const RECYCLE_STORE_EXPIRATION_INTERVAL_SECS: u64 = crate::accounts_db::EXPIRATION_TTL_SECONDS / 3;
|
||||
|
||||
pub type SnapshotRequestSender = Sender<SnapshotRequest>;
|
||||
pub type SnapshotRequestReceiver = Receiver<SnapshotRequest>;
|
||||
pub type DroppedSlotsSender = Sender<Slot>;
|
||||
|
@ -286,6 +293,7 @@ impl AccountsBackgroundService {
|
|||
let mut last_cleaned_block_height = 0;
|
||||
let mut removed_slots_count = 0;
|
||||
let mut total_remove_slots_time = 0;
|
||||
let mut last_expiration_check_time = Instant::now();
|
||||
let t_background = Builder::new()
|
||||
.name("solana-accounts-background".to_string())
|
||||
.spawn(move || loop {
|
||||
|
@ -304,6 +312,8 @@ impl AccountsBackgroundService {
|
|||
&mut total_remove_slots_time,
|
||||
);
|
||||
|
||||
Self::expire_old_recycle_stores(&bank, &mut last_expiration_check_time);
|
||||
|
||||
// Check to see if there were any requests for snapshotting banks
|
||||
// < the current root bank `bank` above.
|
||||
|
||||
|
@ -397,6 +407,16 @@ impl AccountsBackgroundService {
|
|||
*removed_slots_count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
fn expire_old_recycle_stores(bank: &Bank, last_expiration_check_time: &mut Instant) {
|
||||
let now = Instant::now();
|
||||
if now.duration_since(*last_expiration_check_time).as_secs()
|
||||
> RECYCLE_STORE_EXPIRATION_INTERVAL_SECS
|
||||
{
|
||||
bank.expire_old_recycle_stores();
|
||||
*last_expiration_check_time = now;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
|
|
|
@ -615,27 +615,61 @@ pub struct StoreAccountsTiming {
|
|||
|
||||
#[derive(Debug, Default)]
|
||||
struct RecycleStores {
|
||||
entries: Vec<Arc<AccountStorageEntry>>,
|
||||
entries: Vec<(Instant, Arc<AccountStorageEntry>)>,
|
||||
total_bytes: u64,
|
||||
}
|
||||
|
||||
// 30 min should be enough to be certain there won't be any prospective recycle uses for given
|
||||
// store entry
|
||||
// That's because it already processed ~2500 slots and ~25 passes of AccountsBackgroundService
|
||||
pub const EXPIRATION_TTL_SECONDS: u64 = 1800;
|
||||
|
||||
impl RecycleStores {
|
||||
fn add_entry(&mut self, new_entry: Arc<AccountStorageEntry>) {
|
||||
self.total_bytes += new_entry.total_bytes();
|
||||
self.entries.push(new_entry)
|
||||
self.entries.push((Instant::now(), new_entry))
|
||||
}
|
||||
|
||||
fn iter(&self) -> std::slice::Iter<Arc<AccountStorageEntry>> {
|
||||
fn iter(&self) -> std::slice::Iter<(Instant, Arc<AccountStorageEntry>)> {
|
||||
self.entries.iter()
|
||||
}
|
||||
|
||||
fn add_entries(&mut self, new_entries: Vec<Arc<AccountStorageEntry>>) {
|
||||
self.total_bytes += new_entries.iter().map(|e| e.total_bytes()).sum::<u64>();
|
||||
self.entries.extend(new_entries);
|
||||
let now = Instant::now();
|
||||
for new_entry in new_entries {
|
||||
self.entries.push((now, new_entry));
|
||||
}
|
||||
}
|
||||
|
||||
fn expire_old_entries(&mut self) -> Vec<Arc<AccountStorageEntry>> {
|
||||
let mut expired = vec![];
|
||||
let now = Instant::now();
|
||||
let mut expired_bytes = 0;
|
||||
self.entries.retain(|(recycled_time, entry)| {
|
||||
if now.duration_since(*recycled_time).as_secs() > EXPIRATION_TTL_SECONDS {
|
||||
if Arc::strong_count(entry) >= 2 {
|
||||
warn!(
|
||||
"Expiring still in-use recycled StorageEntry anyway...: id: {} slot: {}",
|
||||
entry.append_vec_id(),
|
||||
entry.slot(),
|
||||
);
|
||||
}
|
||||
expired_bytes += entry.total_bytes();
|
||||
expired.push(entry.clone());
|
||||
false
|
||||
} else {
|
||||
true
|
||||
}
|
||||
});
|
||||
|
||||
self.total_bytes -= expired_bytes;
|
||||
|
||||
expired
|
||||
}
|
||||
|
||||
fn remove_entry(&mut self, index: usize) -> Arc<AccountStorageEntry> {
|
||||
let removed_entry = self.entries.swap_remove(index);
|
||||
let (_added_time, removed_entry) = self.entries.swap_remove(index);
|
||||
self.total_bytes -= removed_entry.total_bytes();
|
||||
removed_entry
|
||||
}
|
||||
|
@ -2260,7 +2294,7 @@ impl AccountsDb {
|
|||
let mut min = std::u64::MAX;
|
||||
let mut avail = 0;
|
||||
let mut recycle_stores = self.recycle_stores.write().unwrap();
|
||||
for (i, store) in recycle_stores.iter().enumerate() {
|
||||
for (i, (_recycled_time, store)) in recycle_stores.iter().enumerate() {
|
||||
if Arc::strong_count(store) == 1 {
|
||||
max = std::cmp::max(store.accounts.capacity(), max);
|
||||
min = std::cmp::min(store.accounts.capacity(), min);
|
||||
|
@ -2996,6 +3030,25 @@ impl AccountsDb {
|
|||
self.accounts_cache.report_size();
|
||||
}
|
||||
|
||||
pub fn expire_old_recycle_stores(&self) {
|
||||
let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_time");
|
||||
let recycle_stores = self.recycle_stores.write().unwrap().expire_old_entries();
|
||||
recycle_stores_write_elapsed.stop();
|
||||
|
||||
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
|
||||
drop(recycle_stores);
|
||||
drop_storage_entries_elapsed.stop();
|
||||
|
||||
self.clean_accounts_stats
|
||||
.purge_stats
|
||||
.drop_storage_entries_elapsed
|
||||
.fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
|
||||
self.clean_accounts_stats
|
||||
.purge_stats
|
||||
.recycle_stores_write_elapsed
|
||||
.fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed);
|
||||
}
|
||||
|
||||
// `force_flush` flushes all the cached roots `<= requested_flush_root`. It also then
|
||||
// flushes:
|
||||
// 1) Any remaining roots if there are > MAX_CACHE_SLOTS remaining slots in the cache,
|
||||
|
@ -4495,15 +4548,16 @@ impl AccountsDb {
|
|||
self.print_count_and_status(label);
|
||||
info!("recycle_stores:");
|
||||
let recycle_stores = self.recycle_stores.read().unwrap();
|
||||
for entry in recycle_stores.iter() {
|
||||
for (recycled_time, entry) in recycle_stores.iter() {
|
||||
info!(
|
||||
" slot: {} id: {} count_and_status: {:?} approx_store_count: {} len: {} capacity: {}",
|
||||
" slot: {} id: {} count_and_status: {:?} approx_store_count: {} len: {} capacity: {} (recycled: {:?})",
|
||||
entry.slot(),
|
||||
entry.append_vec_id(),
|
||||
*entry.count_and_status.read().unwrap(),
|
||||
entry.approx_store_count.load(Ordering::Relaxed),
|
||||
entry.accounts.len(),
|
||||
entry.accounts.capacity(),
|
||||
recycled_time,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -8936,4 +8990,74 @@ pub mod tests {
|
|||
assert!(slot_stores(&db, 0).is_empty());
|
||||
assert!(!slot_stores(&db, 1).is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_recycle_stores_expiration() {
|
||||
solana_logger::setup();
|
||||
|
||||
let dummy_path = Path::new("");
|
||||
let dummy_slot = 12;
|
||||
let dummy_size = 1000;
|
||||
|
||||
let dummy_id1 = 22;
|
||||
let entry1 = Arc::new(AccountStorageEntry::new(
|
||||
&dummy_path,
|
||||
dummy_slot,
|
||||
dummy_id1,
|
||||
dummy_size,
|
||||
));
|
||||
|
||||
let dummy_id2 = 44;
|
||||
let entry2 = Arc::new(AccountStorageEntry::new(
|
||||
&dummy_path,
|
||||
dummy_slot,
|
||||
dummy_id2,
|
||||
dummy_size,
|
||||
));
|
||||
|
||||
let mut recycle_stores = RecycleStores::default();
|
||||
recycle_stores.add_entry(entry1);
|
||||
recycle_stores.add_entry(entry2);
|
||||
assert_eq!(recycle_stores.entry_count(), 2);
|
||||
|
||||
// no expiration for newly added entries
|
||||
let expired = recycle_stores.expire_old_entries();
|
||||
assert_eq!(
|
||||
expired
|
||||
.iter()
|
||||
.map(|e| e.append_vec_id())
|
||||
.collect::<Vec<_>>(),
|
||||
Vec::<AppendVecId>::new()
|
||||
);
|
||||
assert_eq!(
|
||||
recycle_stores
|
||||
.iter()
|
||||
.map(|(_, e)| e.append_vec_id())
|
||||
.collect::<Vec<_>>(),
|
||||
vec![dummy_id1, dummy_id2]
|
||||
);
|
||||
assert_eq!(recycle_stores.entry_count(), 2);
|
||||
assert_eq!(recycle_stores.total_bytes(), dummy_size * 2);
|
||||
|
||||
// expiration for only too old entries
|
||||
recycle_stores.entries[0].0 =
|
||||
Instant::now() - Duration::from_secs(EXPIRATION_TTL_SECONDS + 1);
|
||||
let expired = recycle_stores.expire_old_entries();
|
||||
assert_eq!(
|
||||
expired
|
||||
.iter()
|
||||
.map(|e| e.append_vec_id())
|
||||
.collect::<Vec<_>>(),
|
||||
vec![dummy_id1]
|
||||
);
|
||||
assert_eq!(
|
||||
recycle_stores
|
||||
.iter()
|
||||
.map(|(_, e)| e.append_vec_id())
|
||||
.collect::<Vec<_>>(),
|
||||
vec![dummy_id2]
|
||||
);
|
||||
assert_eq!(recycle_stores.entry_count(), 1);
|
||||
assert_eq!(recycle_stores.total_bytes(), dummy_size);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3888,6 +3888,10 @@ impl Bank {
|
|||
.flush_accounts_cache(false, Some(self.slot()))
|
||||
}
|
||||
|
||||
pub fn expire_old_recycle_stores(&self) {
|
||||
self.rc.accounts.accounts_db.expire_old_recycle_stores()
|
||||
}
|
||||
|
||||
fn store_account_and_update_capitalization(&self, pubkey: &Pubkey, new_account: &Account) {
|
||||
if let Some(old_account) = self.get_account(&pubkey) {
|
||||
match new_account.lamports.cmp(&old_account.lamports) {
|
||||
|
|
Loading…
Reference in New Issue