AcctIdx: only advance age on thread 0 (#25943)
This commit is contained in:
parent
8a3d48b0ee
commit
ae37359b6b
|
@ -56,13 +56,16 @@ impl BgThreads {
|
|||
storage: &Arc<BucketMapHolder<T>>,
|
||||
in_mem: &[Arc<InMemAccountsIndex<T>>],
|
||||
threads: usize,
|
||||
can_advance_age: bool,
|
||||
) -> Self {
|
||||
// stop signal used for THIS batch of bg threads
|
||||
let exit = Arc::new(AtomicBool::default());
|
||||
let handles = Some(
|
||||
(0..threads)
|
||||
.into_iter()
|
||||
.map(|_| {
|
||||
.map(|idx| {
|
||||
// the first thread we start is special
|
||||
let can_advance_age = can_advance_age && idx == 0;
|
||||
let storage_ = Arc::clone(storage);
|
||||
let exit_ = Arc::clone(&exit);
|
||||
let in_mem_ = in_mem.to_vec();
|
||||
|
@ -71,7 +74,7 @@ impl BgThreads {
|
|||
Builder::new()
|
||||
.name("solana-idx-flusher".to_string())
|
||||
.spawn(move || {
|
||||
storage_.background(exit_, in_mem_);
|
||||
storage_.background(exit_, in_mem_, can_advance_age);
|
||||
})
|
||||
.unwrap()
|
||||
})
|
||||
|
@ -113,6 +116,7 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
|
|||
&self.storage,
|
||||
&self.in_mem,
|
||||
Self::num_threads(),
|
||||
false, // cannot advance age from any of these threads
|
||||
));
|
||||
}
|
||||
self.storage.set_startup(value);
|
||||
|
@ -157,7 +161,7 @@ impl<T: IndexValue> AccountsIndexStorage<T> {
|
|||
.collect::<Vec<_>>();
|
||||
|
||||
Self {
|
||||
_bg_threads: BgThreads::new(&storage, &in_mem, threads),
|
||||
_bg_threads: BgThreads::new(&storage, &in_mem, threads, true),
|
||||
storage,
|
||||
in_mem,
|
||||
startup_worker_threads: Mutex::default(),
|
||||
|
|
|
@ -132,11 +132,13 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||
self.age.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub fn bucket_flushed_at_current_age(&self) {
|
||||
pub fn bucket_flushed_at_current_age(&self, can_advance_age: bool) {
|
||||
let count_buckets_flushed = 1 + self.count_buckets_flushed.fetch_add(1, Ordering::AcqRel);
|
||||
self.maybe_advance_age_internal(
|
||||
self.all_buckets_flushed_at_current_age_internal(count_buckets_flushed),
|
||||
);
|
||||
if can_advance_age {
|
||||
self.maybe_advance_age_internal(
|
||||
self.all_buckets_flushed_at_current_age_internal(count_buckets_flushed),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/// have all buckets been flushed at the current age?
|
||||
|
@ -297,7 +299,12 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||
}
|
||||
|
||||
// intended to execute in a bg thread
|
||||
pub fn background(&self, exit: Arc<AtomicBool>, in_mem: Vec<Arc<InMemAccountsIndex<T>>>) {
|
||||
pub fn background(
|
||||
&self,
|
||||
exit: Arc<AtomicBool>,
|
||||
in_mem: Vec<Arc<InMemAccountsIndex<T>>>,
|
||||
can_advance_age: bool,
|
||||
) {
|
||||
let bins = in_mem.len();
|
||||
let flush = self.disk.is_some();
|
||||
let mut throttling_wait_ms = None;
|
||||
|
@ -312,6 +319,10 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||
.remaining_until_next_interval(self.age_interval_ms()),
|
||||
self.stats.remaining_until_next_interval(),
|
||||
);
|
||||
if !can_advance_age {
|
||||
// if this thread cannot advance age, then make sure we don't sleep 0
|
||||
wait = wait.max(1);
|
||||
}
|
||||
if let Some(throttling_wait_ms) = throttling_wait_ms {
|
||||
self.stats
|
||||
.bg_throttling_wait_us
|
||||
|
@ -327,7 +338,9 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||
.bg_waiting_us
|
||||
.fetch_add(m.as_us(), Ordering::Relaxed);
|
||||
// likely some time has elapsed. May have been waiting for age time interval to elapse.
|
||||
self.maybe_advance_age();
|
||||
if can_advance_age {
|
||||
self.maybe_advance_age();
|
||||
}
|
||||
}
|
||||
throttling_wait_ms = None;
|
||||
|
||||
|
@ -339,7 +352,7 @@ impl<T: IndexValue> BucketMapHolder<T> {
|
|||
for _ in 0..bins {
|
||||
if flush {
|
||||
let index = self.next_bucket_to_flush();
|
||||
in_mem[index].flush();
|
||||
in_mem[index].flush(can_advance_age);
|
||||
}
|
||||
self.stats.report_stats(self);
|
||||
if self.all_buckets_flushed_at_current_age() {
|
||||
|
@ -453,11 +466,11 @@ pub mod tests {
|
|||
let time = AGE_MS * 8 / 3;
|
||||
let expected = (time / AGE_MS) as Age;
|
||||
let now = Instant::now();
|
||||
test.bucket_flushed_at_current_age(); // done with age 0
|
||||
test.bucket_flushed_at_current_age(true); // done with age 0
|
||||
(0..threads).into_par_iter().for_each(|_| {
|
||||
while now.elapsed().as_millis() < (time as u128) {
|
||||
if test.maybe_advance_age() {
|
||||
test.bucket_flushed_at_current_age();
|
||||
test.bucket_flushed_at_current_age(true);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -472,7 +485,7 @@ pub mod tests {
|
|||
assert_eq!(test.current_age(), 0);
|
||||
for _ in 0..bins {
|
||||
assert!(!test.all_buckets_flushed_at_current_age());
|
||||
test.bucket_flushed_at_current_age();
|
||||
test.bucket_flushed_at_current_age(true);
|
||||
}
|
||||
std::thread::sleep(std::time::Duration::from_millis(AGE_MS * 2));
|
||||
test.maybe_advance_age();
|
||||
|
|
|
@ -99,9 +99,9 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
}
|
||||
|
||||
/// called after flush scans this bucket at the current age
|
||||
fn set_has_aged(&self, age: Age) {
|
||||
fn set_has_aged(&self, age: Age, can_advance_age: bool) {
|
||||
self.last_age_flushed.store(age, Ordering::Release);
|
||||
self.storage.bucket_flushed_at_current_age();
|
||||
self.storage.bucket_flushed_at_current_age(can_advance_age);
|
||||
}
|
||||
|
||||
fn last_age_flushed(&self) -> Age {
|
||||
|
@ -876,9 +876,9 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
self.stop_evictions_changes.load(Ordering::Acquire)
|
||||
}
|
||||
|
||||
pub(crate) fn flush(&self) {
|
||||
pub(crate) fn flush(&self, can_advance_age: bool) {
|
||||
if let Some(flush_guard) = FlushGuard::lock(&self.flushing_active) {
|
||||
self.flush_internal(&flush_guard)
|
||||
self.flush_internal(&flush_guard, can_advance_age)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -983,7 +983,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
}
|
||||
|
||||
/// synchronize the in-mem index with the disk index
|
||||
fn flush_internal(&self, flush_guard: &FlushGuard) {
|
||||
fn flush_internal(&self, flush_guard: &FlushGuard, can_advance_age: bool) {
|
||||
let current_age = self.storage.current_age();
|
||||
let iterate_for_age = self.get_should_age(current_age);
|
||||
let startup = self.storage.get_startup();
|
||||
|
@ -1093,7 +1093,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
|
|||
if iterate_for_age {
|
||||
// completed iteration of the buckets at the current age
|
||||
assert_eq!(current_age, self.storage.current_age());
|
||||
self.set_has_aged(current_age);
|
||||
self.set_has_aged(current_age, can_advance_age);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1555,13 +1555,13 @@ mod tests {
|
|||
let test = new_for_test::<u64>();
|
||||
assert!(test.get_should_age(test.storage.current_age()));
|
||||
assert_eq!(test.storage.count_buckets_flushed(), 0);
|
||||
test.set_has_aged(0);
|
||||
test.set_has_aged(0, true);
|
||||
assert!(!test.get_should_age(test.storage.current_age()));
|
||||
assert_eq!(test.storage.count_buckets_flushed(), 1);
|
||||
// simulate rest of buckets aging
|
||||
for _ in 1..BINS_FOR_TESTING {
|
||||
assert!(!test.storage.all_buckets_flushed_at_current_age());
|
||||
test.storage.bucket_flushed_at_current_age();
|
||||
test.storage.bucket_flushed_at_current_age(true);
|
||||
}
|
||||
assert!(test.storage.all_buckets_flushed_at_current_age());
|
||||
// advance age
|
||||
|
|
Loading…
Reference in New Issue