AcctIdx: resize in-mem after startup for disk index (#21676)
This commit is contained in:
parent
824994db69
commit
181c0092d6
|
@ -905,6 +905,11 @@ impl<T: IndexValue> AccountsIndex<T> {
|
||||||
AccountsIndexIterator::new(self, range, collect_all_unsorted)
|
AccountsIndexIterator::new(self, range, collect_all_unsorted)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// is the accounts index using disk as a backing store
|
||||||
|
pub fn is_disk_index_enabled(&self) -> bool {
|
||||||
|
self.storage.storage.is_disk_index_enabled()
|
||||||
|
}
|
||||||
|
|
||||||
fn do_checked_scan_accounts<F, R>(
|
fn do_checked_scan_accounts<F, R>(
|
||||||
&self,
|
&self,
|
||||||
metric_name: &'static str,
|
metric_name: &'static str,
|
||||||
|
|
|
@ -102,11 +102,18 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
|
||||||
}
|
}
|
||||||
self.storage.set_startup(value);
|
self.storage.set_startup(value);
|
||||||
if !value {
|
if !value {
|
||||||
|
// transitioning from startup to !startup (ie. steady state)
|
||||||
// shutdown the bg threads
|
// shutdown the bg threads
|
||||||
*self.startup_worker_threads.lock().unwrap() = None;
|
*self.startup_worker_threads.lock().unwrap() = None;
|
||||||
|
// maybe shrink hashmaps
|
||||||
|
self.shrink_to_fit();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn shrink_to_fit(&self) {
|
||||||
|
self.in_mem.iter().for_each(|mem| mem.shrink_to_fit())
|
||||||
|
}
|
||||||
|
|
||||||
fn num_threads() -> usize {
|
fn num_threads() -> usize {
|
||||||
std::cmp::max(2, num_cpus::get() / 4)
|
std::cmp::max(2, num_cpus::get() / 4)
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,6 +57,11 @@ impl<T: IndexValue> Debug for BucketMapHolder<T> {
|
||||||
|
|
||||||
#[allow(clippy::mutex_atomic)]
|
#[allow(clippy::mutex_atomic)]
|
||||||
impl<T: IndexValue> BucketMapHolder<T> {
|
impl<T: IndexValue> BucketMapHolder<T> {
|
||||||
|
/// is the accounts index using disk as a backing store
|
||||||
|
pub fn is_disk_index_enabled(&self) -> bool {
|
||||||
|
self.disk.is_some()
|
||||||
|
}
|
||||||
|
|
||||||
pub fn increment_age(&self) {
|
pub fn increment_age(&self) {
|
||||||
// since we are about to change age, there are now 0 buckets that have been flushed at this age
|
// since we are about to change age, there are now 0 buckets that have been flushed at this age
|
||||||
// this should happen before the age.fetch_add
|
// this should happen before the age.fetch_add
|
||||||
|
@ -352,6 +357,7 @@ pub mod tests {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
let bins = 100;
|
let bins = 100;
|
||||||
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
|
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
|
||||||
|
assert!(!test.is_disk_index_enabled());
|
||||||
let bins = test.bins as u64;
|
let bins = test.bins as u64;
|
||||||
let interval_ms = test.age_interval_ms();
|
let interval_ms = test.age_interval_ms();
|
||||||
// 90% of time elapsed, all but 1 bins flushed, should not wait since we'll end up right on time
|
// 90% of time elapsed, all but 1 bins flushed, should not wait since we'll end up right on time
|
||||||
|
@ -376,6 +382,17 @@ pub mod tests {
|
||||||
assert_eq!(result, None);
|
assert_eq!(result, None);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_disk_index_enabled() {
|
||||||
|
let bins = 1;
|
||||||
|
let config = AccountsIndexConfig {
|
||||||
|
index_limit_mb: Some(0),
|
||||||
|
..AccountsIndexConfig::default()
|
||||||
|
};
|
||||||
|
let test = BucketMapHolder::<u64>::new(bins, &Some(config), 1);
|
||||||
|
assert!(test.is_disk_index_enabled());
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_age_time() {
|
fn test_age_time() {
|
||||||
solana_logger::setup();
|
solana_logger::setup();
|
||||||
|
|
|
@ -28,13 +28,15 @@ type K = Pubkey;
|
||||||
type CacheRangesHeld = RwLock<Vec<Option<RangeInclusive<Pubkey>>>>;
|
type CacheRangesHeld = RwLock<Vec<Option<RangeInclusive<Pubkey>>>>;
|
||||||
pub type SlotT<T> = (Slot, T);
|
pub type SlotT<T> = (Slot, T);
|
||||||
|
|
||||||
|
type InMemMap<T> = HashMap<Pubkey, AccountMapEntry<T>>;
|
||||||
|
|
||||||
#[allow(dead_code)] // temporary during staging
|
#[allow(dead_code)] // temporary during staging
|
||||||
// one instance of this represents one bin of the accounts index.
|
// one instance of this represents one bin of the accounts index.
|
||||||
pub struct InMemAccountsIndex<T: IndexValue> {
|
pub struct InMemAccountsIndex<T: IndexValue> {
|
||||||
last_age_flushed: AtomicU8,
|
last_age_flushed: AtomicU8,
|
||||||
|
|
||||||
// backing store
|
// backing store
|
||||||
map_internal: RwLock<HashMap<Pubkey, AccountMapEntry<T>>>,
|
map_internal: RwLock<InMemMap<T>>,
|
||||||
storage: Arc<BucketMapHolder<T>>,
|
storage: Arc<BucketMapHolder<T>>,
|
||||||
bin: usize,
|
bin: usize,
|
||||||
|
|
||||||
|
@ -104,6 +106,16 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
&self.map_internal
|
&self.map_internal
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Release entire in-mem hashmap to free all memory associated with it.
|
||||||
|
/// Idea is that during startup we needed a larger map than we need during runtime.
|
||||||
|
/// When using disk-buckets, in-mem index grows over time with dynamic use and then shrinks, in theory back to 0.
|
||||||
|
pub fn shrink_to_fit(&self) {
|
||||||
|
// shrink_to_fit could be quite expensive on large map sizes, which 'no disk buckets' could produce, so avoid shrinking in case we end up here
|
||||||
|
if self.storage.is_disk_index_enabled() {
|
||||||
|
self.map_internal.write().unwrap().shrink_to_fit();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub fn items<R>(&self, range: &Option<&R>) -> Vec<(K, AccountMapEntry<T>)>
|
pub fn items<R>(&self, range: &Option<&R>) -> Vec<(K, AccountMapEntry<T>)>
|
||||||
where
|
where
|
||||||
R: RangeBounds<Pubkey> + std::fmt::Debug,
|
R: RangeBounds<Pubkey> + std::fmt::Debug,
|
||||||
|
@ -898,6 +910,10 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
||||||
occupied.remove();
|
occupied.remove();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if map.is_empty() {
|
||||||
|
map.shrink_to_fit();
|
||||||
|
}
|
||||||
|
drop(map);
|
||||||
self.stats()
|
self.stats()
|
||||||
.insert_or_delete_mem_count(false, self.bin, removed);
|
.insert_or_delete_mem_count(false, self.bin, removed);
|
||||||
Self::update_stat(&self.stats().flush_entries_removed_from_mem, removed as u64);
|
Self::update_stat(&self.stats().flush_entries_removed_from_mem, removed as u64);
|
||||||
|
|
Loading…
Reference in New Issue