Cleanup v1 shrink path (#15009)

move legacy functions to another impl block
sakridge 2021-02-02 13:29:46 -08:00 committed by GitHub
parent fddbfe1052
commit 37aac5a12d
1 changed file with 363 additions and 350 deletions
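The change follows a common Rust pattern: test-only and legacy helpers move out of the main impl block into separate impl blocks, one gated by #[cfg(test)] so it compiles only for tests. A minimal sketch of that pattern, with hypothetical type and method names (not from this diff):

struct Db;

impl Db {
    // production API stays in the primary impl block
    fn store(&self) {}
}

#[cfg(test)]
impl Db {
    // test-only constructor compiles only under `cargo test`
    fn new_for_test() -> Self {
        Db
    }
}

fn main() {
    Db.store();
}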


@@ -611,7 +611,11 @@ pub struct AccountsDB {
/// distribute the accounts across storage lists
pub next_id: AtomicUsize,
/// Set of shrinkable stores organized by map of slot to append_vec_id
pub shrink_candidate_slots: Mutex<ShrinkCandidates>,
/// Legacy shrink slots to support non-cached code-path.
pub shrink_candidate_slots_v1: Mutex<Vec<Slot>>,
pub(crate) write_version: AtomicU64,
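A hedged aside on the two fields above (shapes simplified; the real ShrinkCandidates maps slots to store entries, not bare ids): the cached path can target individual stores within a slot, while the v1 field is a plain slot list consumed as a work queue. A minimal sketch:

use std::collections::HashMap;
use std::sync::Mutex;

type Slot = u64;
type AppendVecId = usize;

// Simplified stand-in for the cached path's per-slot candidate map.
type ShrinkCandidates = HashMap<Slot, Vec<AppendVecId>>;

fn main() {
    let cached: Mutex<ShrinkCandidates> = Mutex::new(HashMap::new());
    let legacy_v1: Mutex<Vec<Slot>> = Mutex::new(Vec::new());

    // Cached path: record that store 3 in slot 7 is shrinkable.
    cached.lock().unwrap().entry(7).or_default().push(3);
    // v1 path: just queue the slot itself.
    legacy_v1.lock().unwrap().push(7);
    assert_eq!(legacy_v1.lock().unwrap().pop(), Some(7));
}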
@@ -1061,13 +1065,6 @@ impl AccountsDB {
..AccountsDB::new(Vec::new(), &ClusterType::Development)
}
}
#[cfg(test)]
pub fn new_sized(paths: Vec<PathBuf>, file_size: u64) -> Self {
AccountsDB {
file_size,
..AccountsDB::new(paths, &ClusterType::Development)
}
}
fn new_storage_entry(&self, slot: Slot, path: &Path, size: u64) -> AccountStorageEntry {
AccountStorageEntry::new(
@@ -1842,340 +1839,6 @@ impl AccountsDB {
}
}
// Reads all accounts in the given slot's AppendVecs, filters them down to
// the alive set, then creates a minimal AppendVec containing only those alive accounts.
fn do_shrink_slot_v1(&self, slot: Slot, forced: bool) -> usize {
trace!("shrink_stale_slot: slot: {}", slot);
let mut stored_accounts = vec![];
let mut storage_read_elapsed = Measure::start("storage_read_elapsed");
{
if let Some(stores_lock) = self.storage.get_slot_stores(slot) {
let stores = stores_lock.read().unwrap();
let mut alive_count = 0;
let mut stored_count = 0;
let mut written_bytes = 0;
let mut total_bytes = 0;
for store in stores.values() {
alive_count += store.count();
stored_count += store.approx_stored_count();
written_bytes += store.written_bytes();
total_bytes += store.total_bytes();
}
if alive_count == stored_count && stores.values().len() == 1 {
trace!(
"shrink_stale_slot ({}): not able to shrink at all: alive/stored: {} / {} {}",
slot,
alive_count,
stored_count,
if forced { " (forced)" } else { "" },
);
return 0;
} else if !forced {
let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8;
let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8;
let not_sparse = !sparse_by_count && !sparse_by_bytes;
let too_small_to_shrink = total_bytes <= PAGE_SIZE;
if not_sparse || too_small_to_shrink {
return 0;
}
info!(
"shrink_stale_slot ({}): not_sparse: {} count: {}/{} byte: {}/{}",
slot, not_sparse, alive_count, stored_count, written_bytes, total_bytes,
);
}
for store in stores.values() {
let mut start = 0;
while let Some((account, next)) = store.accounts.get_account(start) {
stored_accounts.push((
account.meta.pubkey,
account.clone_account(),
*account.hash,
next - start,
(store.append_vec_id(), account.offset),
account.meta.write_version,
));
start = next;
}
}
}
}
storage_read_elapsed.stop();
let mut index_read_elapsed = Measure::start("index_read_elapsed");
let alive_accounts: Vec<_> = {
stored_accounts
.iter()
.filter(
|(
pubkey,
_account,
_account_hash,
_storage_size,
(store_id, offset),
_write_version,
)| {
if let Some((locked_entry, _)) = self.accounts_index.get(pubkey, None, None)
{
locked_entry
.slot_list()
.iter()
.any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset)
} else {
false
}
},
)
.collect()
};
index_read_elapsed.stop();
let alive_total: u64 = alive_accounts
.iter()
.map(
|(_pubkey, _account, _account_hash, account_size, _location, _write_version)| {
*account_size as u64
},
)
.sum();
let aligned_total: u64 = self.page_align(alive_total);
debug!(
"shrinking: slot: {}, stored_accounts: {} => alive_accounts: {} ({} bytes; aligned to: {})",
slot,
stored_accounts.len(),
alive_accounts.len(),
alive_total,
aligned_total
);
let mut rewrite_elapsed = Measure::start("rewrite_elapsed");
let mut dead_storages = vec![];
let mut find_alive_elapsed = 0;
let mut create_and_insert_store_elapsed = 0;
let mut write_storage_elapsed = 0;
let mut store_accounts_timing = StoreAccountsTiming::default();
if aligned_total > 0 {
let mut start = Measure::start("find_alive_elapsed");
let mut accounts = Vec::with_capacity(alive_accounts.len());
let mut hashes = Vec::with_capacity(alive_accounts.len());
let mut write_versions = Vec::with_capacity(alive_accounts.len());
for (pubkey, account, account_hash, _size, _location, write_version) in &alive_accounts
{
accounts.push((pubkey, account));
hashes.push(*account_hash);
write_versions.push(*write_version);
}
start.stop();
find_alive_elapsed = start.as_us();
let mut start = Measure::start("create_and_insert_store_elapsed");
let shrunken_store = if let Some(new_store) =
self.try_recycle_and_insert_store(slot, aligned_total, aligned_total + 1024)
{
new_store
} else {
let maybe_shrink_paths = self.shrink_paths.read().unwrap();
if let Some(ref shrink_paths) = *maybe_shrink_paths {
self.create_and_insert_store_with_paths(
slot,
aligned_total,
"shrink-w-path",
shrink_paths,
)
} else {
self.create_and_insert_store(slot, aligned_total, "shrink")
}
};
start.stop();
create_and_insert_store_elapsed = start.as_us();
// Here, we're writing back alive_accounts. This is safe to do without holding
// wide locks across this whole function because we're mutating rooted slots;
// there should be no other writers to them.
store_accounts_timing = self.store_accounts_custom(
slot,
&accounts,
&hashes,
Some(Box::new(move |_, _| shrunken_store.clone())),
Some(Box::new(write_versions.into_iter())),
false,
);
let mut start = Measure::start("write_storage_elapsed");
if let Some(slot_stores) = self.storage.get_slot_stores(slot) {
slot_stores.write().unwrap().retain(|_key, store| {
if store.count() == 0 {
dead_storages.push(store.clone());
}
store.count() > 0
});
}
start.stop();
write_storage_elapsed = start.as_us();
}
rewrite_elapsed.stop();
let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_elapsed");
let mut recycle_stores = self.recycle_stores.write().unwrap();
recycle_stores_write_elapsed.stop();
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
if recycle_stores.len() < MAX_RECYCLE_STORES {
recycle_stores.extend(dead_storages);
drop(recycle_stores);
} else {
self.stats
.dropped_stores
.fetch_add(recycle_stores.len() as u64, Ordering::Relaxed);
drop(recycle_stores);
drop(dead_storages);
}
drop_storage_entries_elapsed.stop();
self.shrink_stats
.num_slots_shrunk
.fetch_add(1, Ordering::Relaxed);
self.shrink_stats
.storage_read_elapsed
.fetch_add(storage_read_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.index_read_elapsed
.fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.find_alive_elapsed
.fetch_add(find_alive_elapsed, Ordering::Relaxed);
self.shrink_stats
.create_and_insert_store_elapsed
.fetch_add(create_and_insert_store_elapsed, Ordering::Relaxed);
self.shrink_stats.store_accounts_elapsed.fetch_add(
store_accounts_timing.store_accounts_elapsed,
Ordering::Relaxed,
);
self.shrink_stats.update_index_elapsed.fetch_add(
store_accounts_timing.update_index_elapsed,
Ordering::Relaxed,
);
self.shrink_stats.handle_reclaims_elapsed.fetch_add(
store_accounts_timing.handle_reclaims_elapsed,
Ordering::Relaxed,
);
self.shrink_stats
.write_storage_elapsed
.fetch_add(write_storage_elapsed, Ordering::Relaxed);
self.shrink_stats
.rewrite_elapsed
.fetch_add(rewrite_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.drop_storage_entries_elapsed
.fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.recycle_stores_write_elapsed
.fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats.report();
alive_accounts.len()
}
fn do_reset_uncleaned_roots_v1(
&self,
candidates: &mut MutexGuard<Vec<Slot>>,
max_clean_root: Option<Slot>,
) {
let previous_roots = self.accounts_index.reset_uncleaned_roots(max_clean_root);
candidates.extend(previous_roots);
}
#[cfg(test)]
fn reset_uncleaned_roots_v1(&self) {
self.do_reset_uncleaned_roots_v1(&mut self.shrink_candidate_slots_v1.lock().unwrap(), None);
}
fn do_shrink_stale_slot_v1(&self, slot: Slot) -> usize {
self.do_shrink_slot_v1(slot, false)
}
fn do_shrink_slot_forced_v1(&self, slot: Slot) {
self.do_shrink_slot_v1(slot, true);
}
fn shrink_stale_slot_v1(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> usize {
let mut shrunken_account_total = 0;
let mut shrunk_slot_count = 0;
let start = Instant::now();
let num_roots = self.accounts_index.num_roots();
loop {
if let Some(slot) = self.do_next_shrink_slot_v1(candidates) {
shrunken_account_total += self.do_shrink_stale_slot_v1(slot);
} else {
return 0;
}
if start.elapsed().as_millis() > 100 || shrunk_slot_count > num_roots / 10 {
debug!(
"do_shrink_stale_slot_v1: {} {} {}us",
shrunk_slot_count,
candidates.len(),
start.elapsed().as_micros()
);
break;
}
shrunk_slot_count += 1;
}
shrunken_account_total
}
// Infinitely returns rooted slots in cyclic order
fn do_next_shrink_slot_v1(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> Option<Slot> {
// At this point, the candidates lock is guaranteed to be held, which keeps
// do_reset_uncleaned_roots() (in clean_accounts()) from updating candidates concurrently.
// Also, the list behind the lock may be swapped out here if it is empty.
let next = candidates.pop();
if next.is_some() {
next
} else {
let mut new_all_slots = self.all_root_slots_in_index();
let next = new_all_slots.pop();
// refresh candidates for later calls!
**candidates = new_all_slots;
next
}
}
#[cfg(test)]
fn next_shrink_slot_v1(&self) -> Option<Slot> {
let mut candidates = self.shrink_candidate_slots_v1.lock().unwrap();
self.do_next_shrink_slot_v1(&mut candidates)
}
pub fn process_stale_slot_v1(&self) -> usize {
let mut measure = Measure::start("stale_slot_shrink-ms");
let candidates = self.shrink_candidate_slots_v1.try_lock();
if candidates.is_err() {
// skip and return immediately if locked by clean_accounts()
// the calling background thread will just retry later.
return 0;
}
// hold this lock as long as this shrinking process is running to avoid conflicts
// with clean_accounts().
let mut candidates = candidates.unwrap();
let count = self.shrink_stale_slot_v1(&mut candidates);
measure.stop();
inc_new_counter_info!("stale_slot_shrink-ms", measure.as_ms() as usize);
count
}
#[cfg(test)]
fn shrink_all_stale_slots_v1(&self) {
for slot in self.all_slots_in_storage() {
self.do_shrink_stale_slot_v1(slot);
}
}
fn all_slots_in_storage(&self) -> Vec<Slot> {
self.storage.all_slots()
}
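The !forced branch in do_shrink_slot_v1 above encodes the shrink heuristic: a slot is worth shrinking only if it is sparse by live-account count or by written bytes, and large enough to matter. A minimal standalone sketch of that check (PAGE_SIZE assumed to be 4 KiB for this sketch; the 0.8 thresholds are taken from the code above):

// Mirrors the not_sparse / too_small_to_shrink check in do_shrink_slot_v1.
const PAGE_SIZE: u64 = 4 * 1024; // assumption for this sketch

fn is_shrink_candidate(alive: usize, stored: usize, written: u64, total: u64) -> bool {
    let sparse_by_count = (alive as f32 / stored as f32) <= 0.8;
    let sparse_by_bytes = (written as f32 / total as f32) <= 0.8;
    let big_enough = total > PAGE_SIZE;
    (sparse_by_count || sparse_by_bytes) && big_enough
}

fn main() {
    // 700 of 1000 stored accounts alive: sparse by count, so shrink.
    assert!(is_shrink_candidate(700, 1000, 9_000, 10_000));
    // Fully live and dense: nothing to reclaim.
    assert!(!is_shrink_candidate(1_000, 1_000, 10_000, 10_000));
}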
@@ -2416,14 +2079,6 @@ impl AccountsDB {
.map(|loaded_account| (loaded_account.account(), slot))
}
#[cfg(test)]
pub fn alive_account_count_in_slot(&self, slot: Slot) -> usize {
self.storage
.get_slot_stores(slot)
.map(|storages| storages.read().unwrap().values().map(|s| s.count()).sum())
.unwrap_or(0)
}
pub fn load_account_hash(&self, ancestors: &Ancestors, pubkey: &Pubkey) -> Hash {
let (slot, store_id, offset) = {
let (lock, index) = self
@@ -4644,13 +4299,371 @@ impl AccountsDB {
}
}
}
}
#[cfg(test)]
impl AccountsDB {
pub fn new_sized(paths: Vec<PathBuf>, file_size: u64) -> Self {
AccountsDB {
file_size,
..AccountsDB::new(paths, &ClusterType::Development)
}
}
pub fn get_append_vec_id(&self, pubkey: &Pubkey, slot: Slot) -> Option<AppendVecId> {
let ancestors = vec![(slot, 1)].into_iter().collect();
let result = self.accounts_index.get(&pubkey, Some(&ancestors), None);
result.map(|(list, index)| list.slot_list()[index].1.store_id)
}
pub fn alive_account_count_in_slot(&self, slot: Slot) -> usize {
self.storage
.get_slot_stores(slot)
.map(|storages| storages.read().unwrap().values().map(|s| s.count()).sum())
.unwrap_or(0)
}
}
/// Legacy shrink functions to support the non-cached code path.
/// These should be deletable once the cached path is the only path.
impl AccountsDB {
// Reads all accounts in the given slot's AppendVecs, filters them down to
// the alive set, then creates a minimal AppendVec containing only those alive accounts.
// The v1 path shrinks all stores in the slot.
//
// Requires all stores in the slot to be rewritten; otherwise the accounts_index
// store ref count could become incorrect.
fn do_shrink_slot_v1(&self, slot: Slot, forced: bool) -> usize {
trace!("shrink_stale_slot: slot: {}", slot);
let mut stored_accounts = vec![];
let mut storage_read_elapsed = Measure::start("storage_read_elapsed");
{
if let Some(stores_lock) = self.storage.get_slot_stores(slot) {
let stores = stores_lock.read().unwrap();
let mut alive_count = 0;
let mut stored_count = 0;
let mut written_bytes = 0;
let mut total_bytes = 0;
for store in stores.values() {
alive_count += store.count();
stored_count += store.approx_stored_count();
written_bytes += store.written_bytes();
total_bytes += store.total_bytes();
}
if alive_count == stored_count && stores.values().len() == 1 {
trace!(
"shrink_stale_slot ({}): not able to shrink at all: alive/stored: {} / {} {}",
slot,
alive_count,
stored_count,
if forced { " (forced)" } else { "" },
);
return 0;
} else if !forced {
let sparse_by_count = (alive_count as f32 / stored_count as f32) <= 0.8;
let sparse_by_bytes = (written_bytes as f32 / total_bytes as f32) <= 0.8;
let not_sparse = !sparse_by_count && !sparse_by_bytes;
let too_small_to_shrink = total_bytes <= PAGE_SIZE;
if not_sparse || too_small_to_shrink {
return 0;
}
info!(
"shrink_stale_slot ({}): not_sparse: {} count: {}/{} byte: {}/{}",
slot, not_sparse, alive_count, stored_count, written_bytes, total_bytes,
);
}
for store in stores.values() {
let mut start = 0;
while let Some((account, next)) = store.accounts.get_account(start) {
stored_accounts.push((
account.meta.pubkey,
account.clone_account(),
*account.hash,
next - start,
(store.append_vec_id(), account.offset),
account.meta.write_version,
));
start = next;
}
}
}
}
storage_read_elapsed.stop();
let mut index_read_elapsed = Measure::start("index_read_elapsed");
let alive_accounts: Vec<_> = {
stored_accounts
.iter()
.filter(
|(
pubkey,
_account,
_account_hash,
_storage_size,
(store_id, offset),
_write_version,
)| {
if let Some((locked_entry, _)) = self.accounts_index.get(pubkey, None, None)
{
locked_entry
.slot_list()
.iter()
.any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset)
} else {
false
}
},
)
.collect()
};
index_read_elapsed.stop();
let alive_total: u64 = alive_accounts
.iter()
.map(
|(_pubkey, _account, _account_hash, account_size, _location, _write_version)| {
*account_size as u64
},
)
.sum();
let aligned_total: u64 = self.page_align(alive_total);
debug!(
"shrinking: slot: {}, stored_accounts: {} => alive_accounts: {} ({} bytes; aligned to: {})",
slot,
stored_accounts.len(),
alive_accounts.len(),
alive_total,
aligned_total
);
let mut rewrite_elapsed = Measure::start("rewrite_elapsed");
let mut dead_storages = vec![];
let mut find_alive_elapsed = 0;
let mut create_and_insert_store_elapsed = 0;
let mut write_storage_elapsed = 0;
let mut store_accounts_timing = StoreAccountsTiming::default();
if aligned_total > 0 {
let mut start = Measure::start("find_alive_elapsed");
let mut accounts = Vec::with_capacity(alive_accounts.len());
let mut hashes = Vec::with_capacity(alive_accounts.len());
let mut write_versions = Vec::with_capacity(alive_accounts.len());
for (pubkey, account, account_hash, _size, _location, write_version) in &alive_accounts
{
accounts.push((pubkey, account));
hashes.push(*account_hash);
write_versions.push(*write_version);
}
start.stop();
find_alive_elapsed = start.as_us();
let mut start = Measure::start("create_and_insert_store_elapsed");
let shrunken_store = if let Some(new_store) =
self.try_recycle_and_insert_store(slot, aligned_total, aligned_total + 1024)
{
new_store
} else {
let maybe_shrink_paths = self.shrink_paths.read().unwrap();
if let Some(ref shrink_paths) = *maybe_shrink_paths {
self.create_and_insert_store_with_paths(
slot,
aligned_total,
"shrink-w-path",
shrink_paths,
)
} else {
self.create_and_insert_store(slot, aligned_total, "shrink")
}
};
start.stop();
create_and_insert_store_elapsed = start.as_us();
// Here, we're writing back alive_accounts. This is safe to do without holding
// wide locks across this whole function because we're mutating rooted slots;
// there should be no other writers to them.
store_accounts_timing = self.store_accounts_custom(
slot,
&accounts,
&hashes,
Some(Box::new(move |_, _| shrunken_store.clone())),
Some(Box::new(write_versions.into_iter())),
false,
);
let mut start = Measure::start("write_storage_elapsed");
if let Some(slot_stores) = self.storage.get_slot_stores(slot) {
slot_stores.write().unwrap().retain(|_key, store| {
if store.count() == 0 {
dead_storages.push(store.clone());
}
store.count() > 0
});
}
start.stop();
write_storage_elapsed = start.as_us();
}
rewrite_elapsed.stop();
let mut recycle_stores_write_elapsed = Measure::start("recycle_stores_write_elapsed");
let mut recycle_stores = self.recycle_stores.write().unwrap();
recycle_stores_write_elapsed.stop();
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
if recycle_stores.len() < MAX_RECYCLE_STORES {
recycle_stores.extend(dead_storages);
drop(recycle_stores);
} else {
self.stats
.dropped_stores
.fetch_add(recycle_stores.len() as u64, Ordering::Relaxed);
drop(recycle_stores);
drop(dead_storages);
}
drop_storage_entries_elapsed.stop();
self.shrink_stats
.num_slots_shrunk
.fetch_add(1, Ordering::Relaxed);
self.shrink_stats
.storage_read_elapsed
.fetch_add(storage_read_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.index_read_elapsed
.fetch_add(index_read_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.find_alive_elapsed
.fetch_add(find_alive_elapsed, Ordering::Relaxed);
self.shrink_stats
.create_and_insert_store_elapsed
.fetch_add(create_and_insert_store_elapsed, Ordering::Relaxed);
self.shrink_stats.store_accounts_elapsed.fetch_add(
store_accounts_timing.store_accounts_elapsed,
Ordering::Relaxed,
);
self.shrink_stats.update_index_elapsed.fetch_add(
store_accounts_timing.update_index_elapsed,
Ordering::Relaxed,
);
self.shrink_stats.handle_reclaims_elapsed.fetch_add(
store_accounts_timing.handle_reclaims_elapsed,
Ordering::Relaxed,
);
self.shrink_stats
.write_storage_elapsed
.fetch_add(write_storage_elapsed, Ordering::Relaxed);
self.shrink_stats
.rewrite_elapsed
.fetch_add(rewrite_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.drop_storage_entries_elapsed
.fetch_add(drop_storage_entries_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats
.recycle_stores_write_elapsed
.fetch_add(recycle_stores_write_elapsed.as_us(), Ordering::Relaxed);
self.shrink_stats.report();
alive_accounts.len()
}
fn do_reset_uncleaned_roots_v1(
&self,
candidates: &mut MutexGuard<Vec<Slot>>,
max_clean_root: Option<Slot>,
) {
let previous_roots = self.accounts_index.reset_uncleaned_roots(max_clean_root);
candidates.extend(previous_roots);
}
#[cfg(test)]
fn reset_uncleaned_roots_v1(&self) {
self.do_reset_uncleaned_roots_v1(&mut self.shrink_candidate_slots_v1.lock().unwrap(), None);
}
fn do_shrink_stale_slot_v1(&self, slot: Slot) -> usize {
self.do_shrink_slot_v1(slot, false)
}
fn do_shrink_slot_forced_v1(&self, slot: Slot) {
self.do_shrink_slot_v1(slot, true);
}
fn shrink_stale_slot_v1(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> usize {
let mut shrunken_account_total = 0;
let mut shrunk_slot_count = 0;
let start = Instant::now();
let num_roots = self.accounts_index.num_roots();
loop {
if let Some(slot) = self.do_next_shrink_slot_v1(candidates) {
shrunken_account_total += self.do_shrink_stale_slot_v1(slot);
} else {
return 0;
}
if start.elapsed().as_millis() > 100 || shrunk_slot_count > num_roots / 10 {
debug!(
"do_shrink_stale_slot_v1: {} {} {}us",
shrunk_slot_count,
candidates.len(),
start.elapsed().as_micros()
);
break;
}
shrunk_slot_count += 1;
}
shrunken_account_total
}
// Infinitely returns rooted slots in cyclic order
fn do_next_shrink_slot_v1(&self, candidates: &mut MutexGuard<Vec<Slot>>) -> Option<Slot> {
// At this point, the candidates lock is guaranteed to be held, which keeps
// do_reset_uncleaned_roots() (in clean_accounts()) from updating candidates concurrently.
// Also, the list behind the lock may be swapped out here if it is empty.
let next = candidates.pop();
if next.is_some() {
next
} else {
let mut new_all_slots = self.all_root_slots_in_index();
let next = new_all_slots.pop();
// refresh candidates for later calls!
**candidates = new_all_slots;
next
}
}
#[cfg(test)]
fn next_shrink_slot_v1(&self) -> Option<Slot> {
let mut candidates = self.shrink_candidate_slots_v1.lock().unwrap();
self.do_next_shrink_slot_v1(&mut candidates)
}
pub fn process_stale_slot_v1(&self) -> usize {
let mut measure = Measure::start("stale_slot_shrink-ms");
let candidates = self.shrink_candidate_slots_v1.try_lock();
if candidates.is_err() {
// skip and return immediately if locked by clean_accounts()
// the calling background thread will just retry later.
return 0;
}
// hold this lock as long as this shrinking process is running to avoid conflicts
// with clean_accounts().
let mut candidates = candidates.unwrap();
let count = self.shrink_stale_slot_v1(&mut candidates);
measure.stop();
inc_new_counter_info!("stale_slot_shrink-ms", measure.as_ms() as usize);
count
}
#[cfg(test)]
fn shrink_all_stale_slots_v1(&self) {
for slot in self.all_slots_in_storage() {
self.do_shrink_stale_slot_v1(slot);
}
}
}
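process_stale_slot_v1 above uses try_lock so the background shrinker never blocks behind clean_accounts(); it simply skips the pass and retries on its next tick. A minimal sketch of that non-blocking pattern (the shrink work itself is stubbed out):

use std::sync::Mutex;

fn process_once(candidates: &Mutex<Vec<u64>>) -> usize {
    let mut guard = match candidates.try_lock() {
        Ok(g) => g,
        Err(_) => return 0, // locked by the cleaner; the caller retries later
    };
    // Stand-in for shrink_stale_slot_v1: drain one candidate per pass.
    guard.pop().map_or(0, |_slot| 1)
}

fn main() {
    let candidates = Mutex::new(vec![1, 2, 3]);
    assert_eq!(process_once(&candidates), 1);
    assert_eq!(process_once(&candidates), 1);
}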
#[cfg(test)]