add ancient metrics (#25656)

* add ancient metrics

* review feedback
This commit is contained in:
Jeff Washington (jwash) 2022-06-01 10:36:23 -05:00 committed by GitHub
parent 07958fd8ed
commit 8bb76fcd87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 276 additions and 44 deletions

View File

@ -1081,6 +1081,8 @@ pub struct AccountsDb {
shrink_stats: ShrinkStats,
shrink_ancient_stats: ShrinkAncientStats,
pub cluster_type: Option<ClusterType>,
pub account_indexes: AccountSecondaryIndexes,
@ -1387,6 +1389,11 @@ impl CleanAccountsStats {
}
}
#[derive(Debug, Default)]
struct ShrinkAncientStats {
shrink_stats: ShrinkStats,
}
#[derive(Debug, Default)]
struct ShrinkStats {
last_report: AtomicInterval,
@ -1517,6 +1524,135 @@ impl ShrinkStats {
}
}
impl ShrinkAncientStats {
fn report(&self) {
if self.shrink_stats.last_report.should_update(1000) {
datapoint_info!(
"shrink_ancient_stats",
(
"num_slots_shrunk",
self.shrink_stats
.num_slots_shrunk
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"storage_read_elapsed",
self.shrink_stats
.storage_read_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"index_read_elapsed",
self.shrink_stats
.index_read_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"find_alive_elapsed",
self.shrink_stats
.find_alive_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"create_and_insert_store_elapsed",
self.shrink_stats
.create_and_insert_store_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"store_accounts_elapsed",
self.shrink_stats
.store_accounts_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"update_index_elapsed",
self.shrink_stats
.update_index_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"handle_reclaims_elapsed",
self.shrink_stats
.handle_reclaims_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"write_storage_elapsed",
self.shrink_stats
.write_storage_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"rewrite_elapsed",
self.shrink_stats.rewrite_elapsed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"drop_storage_entries_elapsed",
self.shrink_stats
.drop_storage_entries_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"recycle_stores_write_time",
self.shrink_stats
.recycle_stores_write_elapsed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"accounts_removed",
self.shrink_stats
.accounts_removed
.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"bytes_removed",
self.shrink_stats.bytes_removed.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"bytes_written",
self.shrink_stats.bytes_written.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"skipped_shrink",
self.shrink_stats.skipped_shrink.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"alive_accounts",
self.shrink_stats.alive_accounts.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"dead_accounts",
self.shrink_stats.dead_accounts.swap(0, Ordering::Relaxed) as i64,
i64
),
(
"accounts_loaded",
self.shrink_stats.accounts_loaded.swap(0, Ordering::Relaxed) as i64,
i64
),
);
}
}
}
fn quarter_thread_count() -> usize {
std::cmp::max(2, num_cpus::get() / 4)
}
@ -1771,6 +1907,7 @@ impl AccountsDb {
external_purge_slots_stats: PurgeStats::default(),
clean_accounts_stats: CleanAccountsStats::default(),
shrink_stats: ShrinkStats::default(),
shrink_ancient_stats: ShrinkAncientStats::default(),
stats: AccountsStats::default(),
cluster_type: None,
account_indexes: AccountSecondaryIndexes::default(),
@ -3291,8 +3428,13 @@ impl AccountsDb {
self.combine_ancient_slots(old_slots);
}
fn create_ancient_append_vec(&self, slot: Slot) -> Option<(Slot, Arc<AccountStorageEntry>)> {
let (new_ancient_storage, _time) =
/// create new ancient append vec
/// return it and the elapsed time for metrics
fn create_ancient_append_vec(
&self,
slot: Slot,
) -> (Option<(Slot, Arc<AccountStorageEntry>)>, Duration) {
let (new_ancient_storage, time) =
self.get_store_for_shrink(slot, get_ancient_append_vec_capacity());
info!(
"ancient_append_vec: creating initial ancient append vec: {}, size: {}, id: {}",
@ -3300,22 +3442,27 @@ impl AccountsDb {
get_ancient_append_vec_capacity(),
new_ancient_storage.append_vec_id(),
);
Some((slot, new_ancient_storage))
(
Some((slot, new_ancient_storage)),
Duration::from_micros(time),
)
}
/// return true if created
/// also return elapsed time for metrics
fn maybe_create_ancient_append_vec(
&self,
current_ancient: &mut Option<(Slot, Arc<AccountStorageEntry>)>,
slot: Slot,
) -> bool {
) -> (bool, Duration) {
if current_ancient.is_none() {
// our oldest slot is not an append vec of max size, or we filled the previous one.
// So, create a new ancient append vec at 'slot'
*current_ancient = self.create_ancient_append_vec(slot);
true
let result = self.create_ancient_append_vec(slot);
*current_ancient = result.0;
(true, result.1)
} else {
false
(false, Duration::default())
}
}
@ -3339,14 +3486,14 @@ impl AccountsDb {
ancient_store: &Arc<AccountStorageEntry>,
accounts: &AccountsToStore,
storage_selector: StorageSelector,
) {
) -> StoreAccountsTiming {
let (accounts, hashes) = accounts.get(storage_selector);
let _store_accounts_timing = self.store_accounts_frozen(
self.store_accounts_frozen(
(ancient_slot, accounts),
Some(hashes),
Some(ancient_store),
None,
);
)
}
/// get the storages from 'slot' to squash
@ -3433,13 +3580,14 @@ impl AccountsDb {
};
// this code is copied from shrink. I would like to combine it into a helper function, but the borrow checker has defeated my efforts so far.
let (stored_accounts, _num_stores, _original_bytes) =
let (stored_accounts, _num_stores, original_bytes) =
self.get_unique_accounts_from_storages(old_storages.iter());
// sort by pubkey to keep account index lookups close
let mut stored_accounts = stored_accounts.into_iter().collect::<Vec<_>>();
stored_accounts.sort_unstable_by(|a, b| a.0.cmp(&b.0));
let mut index_read_elapsed = Measure::start("index_read_elapsed");
let alive_total_collect = AtomicUsize::new(0);
let len = stored_accounts.len();
@ -3471,18 +3619,50 @@ impl AccountsDb {
});
});
let mut create_and_insert_store_elapsed = 0;
let alive_accounts = alive_accounts_collect.into_inner().unwrap();
let alive_total = alive_total_collect.load(Ordering::Relaxed);
index_read_elapsed.stop();
let aligned_total: u64 = Self::page_align(alive_total as u64);
// we could sort these
// could follow what shrink does more closely
if stored_accounts.is_empty() {
continue; // skipping slot with no useful accounts to write
}
let mut drop_root;
let mut ids = vec![];
let total_starting_accounts = stored_accounts.len();
let total_accounts_after_shrink = alive_accounts.len();
if alive_accounts.is_empty() {
// slot has no alive accounts
// but, it was a root and it probably had unref'd accounts
// so, the slot is no longer a root
drop_root = true;
} else {
self.maybe_create_ancient_append_vec(&mut current_ancient, slot);
let (_, time) = self.maybe_create_ancient_append_vec(&mut current_ancient, slot);
create_and_insert_store_elapsed += time.as_micros() as u64;
let (ancient_slot, ancient_store) =
current_ancient.as_ref().map(|(a, b)| (*a, b)).unwrap();
let available_bytes = ancient_store.accounts.remaining_bytes();
let mut start = Measure::start("find_alive_elapsed");
let to_store = AccountsToStore::new(available_bytes, &alive_accounts, slot);
start.stop();
let find_alive_elapsed = start.as_us();
let mut ids = vec![ancient_store.append_vec_id()];
// if this slot is not the ancient slot we're writing to, then this root will be dropped
let mut drop_root = slot != ancient_slot;
let mut rewrite_elapsed = Measure::start("rewrite_elapsed");
// write what we can to the current ancient storage
let mut store_accounts_timing = self.store_ancient_accounts(
ancient_slot,
ancient_store,
&to_store,
StorageSelector::Primary,
);
// handle accounts from 'slot' which did not fit into the current ancient append vec
if to_store.has_overflow() {
// we need a new ancient append vec
let result = self.create_ancient_append_vec(slot);
create_and_insert_store_elapsed += result.1.as_micros() as u64;
current_ancient = result.0;
let (ancient_slot, ancient_store) =
current_ancient.as_ref().map(|(a, b)| (*a, b)).unwrap();
let available_bytes = ancient_store.accounts.remaining_bytes();
@ -3494,47 +3674,97 @@ impl AccountsDb {
// if this slot is not the ancient slot we're writing to, then this root will be dropped
drop_root = slot != ancient_slot;
// write what we can to the current ancient storage
self.store_ancient_accounts(
// write the rest to the next ancient storage
let timing = self.store_ancient_accounts(
ancient_slot,
ancient_store,
&to_store,
StorageSelector::Primary,
StorageSelector::Overflow,
);
// handle accounts from 'slot' which did not fit into the current ancient append vec
if to_store.has_overflow() {
// we need a new ancient append vec
current_ancient = self.create_ancient_append_vec(slot);
let (ancient_slot, ancient_store) =
current_ancient.as_ref().map(|(a, b)| (*a, b)).unwrap();
// now that this slot will be used to create a new ancient append vec, there will still be a root present at this slot, so don't drop this root
drop_root = false;
ids.push(ancient_store.append_vec_id());
// write the rest to the next ancient storage
self.store_ancient_accounts(
ancient_slot,
ancient_store,
&to_store,
StorageSelector::Overflow,
);
}
store_accounts_timing.store_accounts_elapsed = timing.store_accounts_elapsed;
store_accounts_timing.update_index_elapsed = timing.update_index_elapsed;
store_accounts_timing.handle_reclaims_elapsed = timing.handle_reclaims_elapsed;
}
rewrite_elapsed.stop();
let mut start = Measure::start("write_storage_elapsed");
// Purge old, overwritten storage entries
let mut dead_storages = vec![];
self.mark_dirty_dead_stores(slot, &mut dead_storages, |store| {
ids.contains(&store.append_vec_id())
});
start.stop();
let write_storage_elapsed = start.as_us();
self.drop_or_recycle_stores(dead_storages);
if drop_root {
dropped_roots.push(slot);
}
self.shrink_ancient_stats
.shrink_stats
.index_read_elapsed
.fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);
self.shrink_ancient_stats
.shrink_stats
.create_and_insert_store_elapsed
.fetch_add(create_and_insert_store_elapsed, Ordering::Relaxed);
self.shrink_ancient_stats
.shrink_stats
.store_accounts_elapsed
.fetch_add(
store_accounts_timing.store_accounts_elapsed,
Ordering::Relaxed,
);
self.shrink_ancient_stats
.shrink_stats
.update_index_elapsed
.fetch_add(
store_accounts_timing.update_index_elapsed,
Ordering::Relaxed,
);
self.shrink_ancient_stats
.shrink_stats
.handle_reclaims_elapsed
.fetch_add(
store_accounts_timing.handle_reclaims_elapsed,
Ordering::Relaxed,
);
self.shrink_ancient_stats
.shrink_stats
.write_storage_elapsed
.fetch_add(write_storage_elapsed, Ordering::Relaxed);
self.shrink_ancient_stats
.shrink_stats
.rewrite_elapsed
.fetch_add(rewrite_elapsed.as_us(), Ordering::Relaxed);
self.shrink_ancient_stats
.shrink_stats
.accounts_removed
.fetch_add(
total_starting_accounts - total_accounts_after_shrink,
Ordering::Relaxed,
);
self.shrink_ancient_stats
.shrink_stats
.bytes_removed
.fetch_add(
original_bytes.saturating_sub(aligned_total),
Ordering::Relaxed,
);
self.shrink_ancient_stats
.shrink_stats
.bytes_written
.fetch_add(aligned_total, Ordering::Relaxed);
self.shrink_ancient_stats
.shrink_stats
.find_alive_elapsed
.fetch_add(find_alive_elapsed, Ordering::Relaxed);
self.shrink_ancient_stats
.shrink_stats
.num_slots_shrunk
.fetch_add(1, Ordering::Relaxed);
}
if !dropped_roots.is_empty() {
@ -3543,6 +3773,8 @@ impl AccountsDb {
.clean_dead_slot(*slot, &mut AccountsIndexRootsStats::default());
});
}
self.shrink_ancient_stats.report();
}
pub fn shrink_candidate_slots(&self) -> usize {