Re-use accounts stores (#12885)

* Re-use accounts_db stores

Creating files and dropping mmap areas can be expensive

* Add test for storage finder

Can encounter an infinite loop when a store is too small to fit the
account, yet still smaller than the normal store size, so no larger store
was ever created.

* Fix storage finding

* Check for strong_count == 1

* try_recycle helper
sakridge 2020-11-04 09:17:05 -08:00 committed by GitHub
parent b0d1ae1d8b
commit 43053dcc90
6 changed files with 459 additions and 77 deletions
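
The core pattern in this change, shown in isolation: purged or shrunk storages go into a bounded pool and are handed back out on demand, so the hot path skips file creation and mmap setup/teardown. Below is a minimal, self-contained Rust sketch of that shape; `Pool` and `Store` are illustrative stand-ins for the real `AccountsDB`/`AccountStorageEntry` types, and the real code layers slot/id reassignment, size classes, and metrics on top.

    use std::sync::{Arc, Mutex};

    const MAX_RECYCLE: usize = 4;

    // Illustrative stand-in for AccountStorageEntry; only capacity matters here.
    struct Store {
        capacity: u64,
    }

    struct Pool {
        recycled: Mutex<Vec<Arc<Store>>>,
    }

    impl Pool {
        // Reuse a dropped store whose capacity fits [min_size, max_size),
        // falling back to the expensive path: a new file plus mmap.
        fn get(&self, min_size: u64, max_size: u64) -> Arc<Store> {
            let mut recycled = self.recycled.lock().unwrap();
            if let Some(i) = recycled.iter().position(|s| {
                Arc::strong_count(s) == 1 // nobody else may still hold it
                    && s.capacity >= min_size
                    && s.capacity < max_size
            }) {
                return recycled.swap_remove(i);
            }
            drop(recycled);
            Arc::new(Store { capacity: min_size }) // stand-in for create_store()
        }

        // On purge/shrink, retain dropped stores for reuse, but keep the
        // pool bounded so it cannot grow without limit.
        fn put(&self, store: Arc<Store>) {
            let mut recycled = self.recycled.lock().unwrap();
            if recycled.len() < MAX_RECYCLE {
                recycled.push(store);
            } // else: dropped here, which is where the mmap would go away
        }
    }

    fn main() {
        let pool = Pool { recycled: Mutex::new(Vec::new()) };
        pool.put(Arc::new(Store { capacity: 8192 }));
        let store = pool.get(4096, u64::MAX);
        assert_eq!(store.capacity, 8192); // reused: no new file was created
    }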

View File

@ -56,6 +56,8 @@ const PAGE_SIZE: u64 = 4 * 1024;
pub const DEFAULT_FILE_SIZE: u64 = PAGE_SIZE * 1024;
pub const DEFAULT_NUM_THREADS: u32 = 8;
pub const DEFAULT_NUM_DIRS: u32 = 4;
const MAX_RECYCLE_STORES: usize = 5000;
const STORE_META_OVERHEAD: usize = 256;
lazy_static! {
// FROZEN_ACCOUNT_PANIC is used to signal local_cluster that an AccountsDB panic has occurred,
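
Two new tuning constants: MAX_RECYCLE_STORES caps how many dropped storages the recycle pool may retain before further ones are actually dropped (unmapped), and STORE_META_OVERHEAD is a rough per-account allowance for stored metadata that gets added to an account's data length whenever the storage finder is asked for space (the test_store_overhead test below checks the real per-account overhead stays under it).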
@ -172,9 +174,9 @@ pub enum BankHashVerificationError {
/// Persistent storage structure holding the accounts
#[derive(Debug)]
pub struct AccountStorageEntry {
-pub(crate) id: AppendVecId,
pub(crate) id: AtomicUsize,
-pub(crate) slot: Slot,
pub(crate) slot: AtomicU64,
/// storage holding the accounts
pub(crate) accounts: AppendVec,
@ -202,8 +204,8 @@ impl AccountStorageEntry {
let accounts = AppendVec::new(&path, true, file_size as usize);
Self {
-id,
-slot,
id: AtomicUsize::new(id),
slot: AtomicU64::new(slot),
accounts,
count_and_status: RwLock::new((0, AccountStorageStatus::Available)),
approx_store_count: AtomicUsize::new(0),
@ -212,8 +214,8 @@ impl AccountStorageEntry {
pub(crate) fn new_empty_map(id: AppendVecId, accounts_current_len: usize) -> Self {
Self {
-id,
-slot: 0,
id: AtomicUsize::new(id),
slot: AtomicU64::new(0),
accounts: AppendVec::new_empty_map(accounts_current_len),
count_and_status: RwLock::new((0, AccountStorageStatus::Available)),
approx_store_count: AtomicUsize::new(0),
@ -241,6 +243,15 @@ impl AccountStorageEntry {
*count_and_status = (count, status);
}
pub fn recycle(&self, slot: Slot, id: usize) {
let mut count_and_status = self.count_and_status.write().unwrap();
self.accounts.reset();
*count_and_status = (0, AccountStorageStatus::Available);
self.slot.store(slot, Ordering::Release);
self.id.store(id, Ordering::Relaxed);
self.approx_store_count.store(0, Ordering::Relaxed);
}
pub fn status(&self) -> AccountStorageStatus {
self.count_and_status.read().unwrap().1
}
@ -258,11 +269,11 @@ impl AccountStorageEntry {
}
pub fn slot(&self) -> Slot {
-self.slot
self.slot.load(Ordering::Acquire)
}
pub fn append_vec_id(&self) -> AppendVecId {
-self.id
self.id.load(Ordering::Relaxed)
}
pub fn flush(&self) -> Result<(), IOError> {
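
Making id and slot atomic is what allows recycle() to retarget a live entry through a shared &Arc<AccountStorageEntry> rather than requiring &mut: the Release store on slot pairs with the Acquire load in slot() above. A reduced sketch of the pattern, using a hypothetical Entry type:

    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};

    // Hypothetical reduction of AccountStorageEntry to its retargetable fields.
    struct Entry {
        id: AtomicUsize,
        slot: AtomicU64,
    }

    impl Entry {
        // The internal reset happens before this; the Release store on `slot`
        // pairs with the Acquire load in slot(), so a reader that observes
        // the new slot also observes everything written before it.
        fn recycle(&self, slot: u64, id: usize) {
            self.id.store(id, Ordering::Relaxed);
            self.slot.store(slot, Ordering::Release);
        }
        fn slot(&self) -> u64 {
            self.slot.load(Ordering::Acquire)
        }
        fn append_vec_id(&self) -> usize {
            self.id.load(Ordering::Relaxed)
        }
    }

    fn main() {
        let e = Entry { id: AtomicUsize::new(0), slot: AtomicU64::new(0) };
        e.recycle(7, 42);
        assert_eq!((e.slot(), e.append_vec_id()), (7, 42));
    }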
@ -321,8 +332,8 @@ impl AccountStorageEntry {
assert!(
count > 0,
"double remove of account in slot: {}/store: {}!!",
-self.slot,
-self.id
self.slot(),
self.append_vec_id(),
);
count -= 1;
@ -405,6 +416,8 @@ pub struct AccountsDB {
pub storage: AccountStorage,
recycle_stores: RwLock<Vec<Arc<AccountStorageEntry>>>,
/// distribute the accounts across storage lists
pub next_id: AtomicUsize,
pub shrink_candidate_slots: Mutex<Vec<Slot>>,
@ -455,6 +468,11 @@ struct AccountsStats {
store_find_store: AtomicU64,
store_num_accounts: AtomicU64,
store_total_data: AtomicU64,
recycle_store_count: AtomicU64,
create_store_count: AtomicU64,
store_get_slot_store: AtomicU64,
store_find_existing: AtomicU64,
dropped_stores: AtomicU64,
}
fn make_min_priority_thread_pool() -> ThreadPool {
@ -491,6 +509,7 @@ impl Default for AccountsDB {
AccountsDB {
accounts_index: AccountsIndex::default(),
storage: AccountStorage(DashMap::new()),
recycle_stores: RwLock::new(Vec::new()),
next_id: AtomicUsize::new(0),
shrink_candidate_slots: Mutex::new(Vec::new()),
write_version: AtomicU64::new(0),
@ -1009,7 +1028,7 @@ impl AccountsDB {
account.clone_account(),
*account.hash,
next - start,
-(store.id, account.offset),
(store.append_vec_id(), account.offset),
account.meta.write_version,
));
start = next;
@ -1055,7 +1074,7 @@ impl AccountsDB {
},
)
.sum();
-let aligned_total: u64 = (alive_total + (PAGE_SIZE - 1)) & !(PAGE_SIZE - 1);
let aligned_total: u64 = self.page_align(alive_total);
debug!(
"shrinking: slot: {}, stored_accounts: {} => alive_accounts: {} ({} bytes; aligned to: {})",
@ -1090,7 +1109,13 @@ impl AccountsDB {
find_alive_elapsed = start.as_us();
let mut start = Measure::start("create_and_insert_store_elapsed");
-let shrunken_store = self.create_and_insert_store(slot, aligned_total);
let shrunken_store = if let Some(new_store) =
self.try_recycle_and_insert_store(slot, aligned_total, aligned_total + 1024)
{
new_store
} else {
self.create_and_insert_store(slot, aligned_total, "shrink")
};
start.stop();
create_and_insert_store_elapsed = start.as_us();
@ -1102,7 +1127,7 @@ impl AccountsDB {
slot,
&accounts,
&hashes,
-|_| shrunken_store.clone(),
|_, _| shrunken_store.clone(),
write_versions.into_iter(),
);
start.stop();
@ -1132,8 +1157,21 @@ impl AccountsDB {
}
rewrite_elapsed.stop();
let mut recycle_stores_write_time = Measure::start("recycle_stores_write_time");
let mut recycle_stores = self.recycle_stores.write().unwrap();
recycle_stores_write_time.stop();
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
-drop(dead_storages);
if recycle_stores.len() < MAX_RECYCLE_STORES {
recycle_stores.extend(dead_storages);
drop(recycle_stores);
} else {
self.stats
.dropped_stores
.fetch_add(recycle_stores.len() as u64, Ordering::Relaxed);
drop(recycle_stores);
drop(dead_storages);
}
drop_storage_entries_elapsed.stop();
datapoint_info!(
@ -1156,6 +1194,11 @@ impl AccountsDB {
drop_storage_entries_elapsed.as_us(),
i64
),
(
"recycle_stores_write_time",
recycle_stores_write_time.as_us(),
i64
),
);
alive_accounts.len()
}
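
Note the tight fit window in the shrink path: try_recycle_and_insert_store asks for a pooled store with capacity in [aligned_total, aligned_total + 1024), a near-exact fit, so a shrunken slot does not pin a far-too-large pooled store. The dead storages then go back into the pool themselves, unless it already holds MAX_RECYCLE_STORES entries, in which case they are dropped and counted under dropped_stores.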
@ -1287,7 +1330,7 @@ impl AccountsDB {
let accounts = storage.accounts.accounts(0);
let mut retval = B::default();
accounts.iter().for_each(|stored_account| {
-scan_func(stored_account, storage.id, &mut retval)
scan_func(stored_account, storage.append_vec_id(), &mut retval)
});
retval
})
@ -1381,9 +1424,69 @@ impl AccountsDB {
.and_then(|account_storage_entry| account_storage_entry.get_account(account_info))
}
-fn find_storage_candidate(&self, slot: Slot) -> Arc<AccountStorageEntry> {
fn try_recycle_and_insert_store(
&self,
slot: Slot,
min_size: u64,
max_size: u64,
) -> Option<Arc<AccountStorageEntry>> {
let store = self.try_recycle_store(slot, min_size, max_size)?;
self.insert_store(slot, store.clone());
Some(store)
}
fn try_recycle_store(
&self,
slot: Slot,
min_size: u64,
max_size: u64,
) -> Option<Arc<AccountStorageEntry>> {
let mut max = 0;
let mut min = std::u64::MAX;
let mut avail = 0;
let mut recycle_stores = self.recycle_stores.write().unwrap();
for (i, store) in recycle_stores.iter().enumerate() {
if Arc::strong_count(store) == 1 {
max = std::cmp::max(store.accounts.capacity(), max);
min = std::cmp::min(store.accounts.capacity(), min);
avail += 1;
if store.accounts.capacity() >= min_size && store.accounts.capacity() < max_size {
let ret = recycle_stores.swap_remove(i);
drop(recycle_stores);
let old_id = ret.append_vec_id();
ret.recycle(slot, self.next_id.fetch_add(1, Ordering::Relaxed));
debug!(
"recycling store: {} {:?} old_id: {}",
ret.append_vec_id(),
ret.get_path(),
old_id
);
return Some(ret);
}
}
}
debug!(
"no recycle stores max: {} min: {} len: {} looking: {}, {} avail: {}",
max,
min,
recycle_stores.len(),
min_size,
max_size,
avail,
);
None
}
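
The Arc::strong_count(store) == 1 guard above is what makes in-place recycling safe: the pool's write lock is held, so if the pool owns the only handle, no scan or flush can still be reading the store, and no other thread can clone a new handle out of the pool between the check and the swap_remove. The guard in isolation, with can_recycle as a hypothetical helper name:

    use std::sync::Arc;

    // Call only while the pool's write lock is held, so no new handle can
    // be cloned out of the pool between this check and the removal.
    fn can_recycle<T>(pooled: &Arc<T>) -> bool {
        Arc::strong_count(pooled) == 1
    }

    fn main() {
        let store = Arc::new(());
        assert!(can_recycle(&store));
        let reader = Arc::clone(&store); // e.g. a scan still reading the store
        assert!(!can_recycle(&store));
        drop(reader);
        assert!(can_recycle(&store));
    }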
fn find_storage_candidate(&self, slot: Slot, size: usize) -> Arc<AccountStorageEntry> {
let mut create_extra = false;
let mut get_slot_stores = Measure::start("get_slot_stores");
let slot_stores_lock = self.storage.get_slot_stores(slot);
get_slot_stores.stop();
self.stats
.store_get_slot_store
.fetch_add(get_slot_stores.as_us(), Ordering::Relaxed);
let mut find_existing = Measure::start("find_existing");
if let Some(slot_stores_lock) = slot_stores_lock {
let slot_stores = slot_stores_lock.read().unwrap();
if !slot_stores.is_empty() {
@ -1407,8 +1510,24 @@ impl AccountsDB {
let ret = store.clone();
drop(slot_stores);
if create_extra {
-self.create_and_insert_store(slot, self.file_size);
if self
.try_recycle_and_insert_store(slot, size as u64, std::u64::MAX)
.is_none()
{
self.stats
.create_store_count
.fetch_add(1, Ordering::Relaxed);
self.create_and_insert_store(slot, self.file_size, "store extra");
} else {
self.stats
.recycle_store_count
.fetch_add(1, Ordering::Relaxed);
}
}
find_existing.stop();
self.stats
.store_find_existing
.fetch_add(find_existing.as_us(), Ordering::Relaxed);
return ret;
}
// looked at every store, bail...
@ -1418,18 +1537,84 @@ impl AccountsDB {
}
}
}
find_existing.stop();
self.stats
.store_find_existing
.fetch_add(find_existing.as_us(), Ordering::Relaxed);
-let store = self.create_and_insert_store(slot, self.file_size);
-store.try_available();
let store = if let Some(store) = self.try_recycle_store(slot, size as u64, std::u64::MAX) {
self.stats
.recycle_store_count
.fetch_add(1, Ordering::Relaxed);
store
} else {
self.stats
.create_store_count
.fetch_add(1, Ordering::Relaxed);
self.create_store(slot, self.file_size, "store")
};
// try_available is like taking a lock on the store,
// preventing other threads from using it.
// It must succeed here and happen before insert,
// otherwise another thread could also grab it from the index.
assert!(store.try_available());
self.insert_store(slot, store.clone());
store
}
-fn create_and_insert_store(&self, slot: Slot, size: u64) -> Arc<AccountStorageEntry> {
fn page_align(&self, size: u64) -> u64 {
(size + (PAGE_SIZE - 1)) & !(PAGE_SIZE - 1)
}
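
page_align is the usual power-of-two round-up: adding PAGE_SIZE - 1 carries past the boundary unless size is already aligned, and the mask clears the low bits. A free-standing copy of the method above with a few worked cases:

    const PAGE_SIZE: u64 = 4 * 1024;

    // Round up to the next multiple of PAGE_SIZE; requires PAGE_SIZE to be
    // a power of two. Adding PAGE_SIZE - 1 crosses the boundary unless
    // `size` is already aligned, and the mask clears the low 12 bits.
    fn page_align(size: u64) -> u64 {
        (size + (PAGE_SIZE - 1)) & !(PAGE_SIZE - 1)
    }

    fn main() {
        assert_eq!(page_align(0), 0);
        assert_eq!(page_align(1), 4096);
        assert_eq!(page_align(4096), 4096);
        assert_eq!(page_align(4097), 8192);
    }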
fn has_space_available(&self, slot: Slot, size: u64) -> bool {
let slot_storage = self.storage.get_slot_stores(slot).unwrap();
let slot_storage_r = slot_storage.read().unwrap();
for (_id, store) in slot_storage_r.iter() {
if store.status() == AccountStorageStatus::Available
&& (store.accounts.capacity() - store.accounts.len() as u64) > size
{
return true;
}
}
false
}
fn create_store(&self, slot: Slot, size: u64, from: &str) -> Arc<AccountStorageEntry> {
let path_index = thread_rng().gen_range(0, self.paths.len());
-let store =
-Arc::new(self.new_storage_entry(slot, &Path::new(&self.paths[path_index]), size));
let store = Arc::new(self.new_storage_entry(
slot,
&Path::new(&self.paths[path_index]),
self.page_align(size),
));
debug!(
"creating store: {} slot: {} len: {} size: {} from: {} path: {:?}",
store.append_vec_id(),
slot,
store.accounts.len(),
store.accounts.capacity(),
from,
store.accounts.get_path()
);
store
}
fn create_and_insert_store(
&self,
slot: Slot,
size: u64,
from: &str,
) -> Arc<AccountStorageEntry> {
let store = self.create_store(slot, size, from);
let store_for_index = store.clone();
self.insert_store(slot, store_for_index);
store
}
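
Splitting the old create_and_insert_store into create_store (allocate and page-align the backing file) plus insert_store (publish into the slot's storage map) lets the recycle path call insert_store directly for a pooled entry, skipping the filesystem entirely.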
fn insert_store(&self, slot: Slot, store: Arc<AccountStorageEntry>) {
let slot_storages: SlotStores = self.storage.get_slot_stores(slot).unwrap_or_else(||
// DashMap entry.or_insert() returns a RefMut, essentially a write lock,
// which is dropped after this block ends, minimizing time held by the lock.
@ -1444,8 +1629,7 @@ impl AccountsDB {
slot_storages
.write()
.unwrap()
-.insert(store.id, store_for_index);
-store
.insert(store.append_vec_id(), store);
}
pub fn purge_slot(&self, slot: Slot) {
@ -1454,6 +1638,34 @@ impl AccountsDB {
self.purge_slots(&slots);
}
fn recycle_slot_stores(
&self,
total_removed_storage_entries: usize,
slot_stores: &[SlotStores],
) -> u64 {
let mut recycled_count = 0;
let mut recycle_stores_write_time = Measure::start("recycle_stores_write_time");
let mut recycle_stores = self.recycle_stores.write().unwrap();
recycle_stores_write_time.stop();
for slot_entries in slot_stores {
let entry = slot_entries.read().unwrap();
for (_store_id, stores) in entry.iter() {
if recycle_stores.len() > MAX_RECYCLE_STORES {
let dropped_count = total_removed_storage_entries - recycled_count;
self.stats
.dropped_stores
.fetch_add(dropped_count as u64, Ordering::Relaxed);
return recycle_stores_write_time.as_us();
}
recycle_stores.push(stores.clone());
recycled_count += 1;
}
}
recycle_stores_write_time.as_us()
}
fn purge_slots(&self, slots: &HashSet<Slot>) {
//add_root should be called first
let non_roots: Vec<_> = slots
@ -1475,13 +1687,16 @@ impl AccountsDB {
.map(|i| i.accounts.capacity())
.sum::<u64>();
}
-all_removed_slot_storages.push(slot_removed_storages);
all_removed_slot_storages.push(slot_removed_storages.clone());
}
}
remove_storages_elapsed.stop();
let num_slots_removed = all_removed_slot_storages.len();
let recycle_stores_write_time =
self.recycle_slot_stores(total_removed_storage_entries, &all_removed_slot_storages);
let mut drop_storage_entries_elapsed = Measure::start("drop_storage_entries_elapsed");
// Backing mmaps for removed storages entries explicitly dropped here outside
// of any locks
@ -1507,6 +1722,11 @@ impl AccountsDB {
i64
),
("total_removed_bytes", total_removed_bytes, i64),
(
"recycle_stores_write_elapsed",
recycle_stores_write_time,
i64
),
);
}
@ -1737,7 +1957,7 @@ impl AccountsDB {
Some(ret)
});
-let storage_finder = |slot| self.find_storage_candidate(slot);
let storage_finder = |slot, size| self.find_storage_candidate(slot, size);
self.store_accounts_to(
slot,
accounts,
@ -1747,7 +1967,10 @@ impl AccountsDB {
)
}
-fn store_accounts_to<F: FnMut(Slot) -> Arc<AccountStorageEntry>, P: Iterator<Item = u64>>(
fn store_accounts_to<
F: FnMut(Slot, usize) -> Arc<AccountStorageEntry>,
P: Iterator<Item = u64>,
>(
&self,
slot: Slot,
accounts: &[(&Pubkey, &Account)],
@ -1779,7 +2002,10 @@ impl AccountsDB {
let mut total_storage_find_us = 0;
while infos.len() < with_meta.len() {
let mut storage_find = Measure::start("storage_finder");
-let storage = storage_finder(slot);
let storage = storage_finder(
slot,
with_meta[infos.len()].1.data.len() + STORE_META_OVERHEAD,
);
storage_find.stop();
total_storage_find_us += storage_find.as_us();
let mut append_accounts = Measure::start("append_accounts");
@ -1791,17 +2017,30 @@ impl AccountsDB {
if rvs.is_empty() {
storage.set_status(AccountStorageStatus::Full);
-// See if an account overflows the default append vec size.
-let data_len = (with_meta[infos.len()].1.data.len() + 4096) as u64;
-if data_len > self.file_size {
-self.create_and_insert_store(slot, data_len * 2);
// See if an account overflows the append vecs in the slot.
let data_len = (with_meta[infos.len()].1.data.len() + STORE_META_OVERHEAD) as u64;
if !self.has_space_available(slot, data_len) {
let special_store_size = std::cmp::max(data_len * 2, self.file_size);
if self
.try_recycle_and_insert_store(slot, special_store_size, std::u64::MAX)
.is_none()
{
self.stats
.create_store_count
.fetch_add(1, Ordering::Relaxed);
self.create_and_insert_store(slot, special_store_size, "large create");
} else {
self.stats
.recycle_store_count
.fetch_add(1, Ordering::Relaxed);
}
}
continue;
}
for (offset, (_, account)) in rvs.iter().zip(&with_meta[infos.len()..]) {
storage.add_account();
infos.push(AccountInfo {
-store_id: storage.id,
store_id: storage.append_vec_id(),
offset: *offset,
lamports: account.lamports,
});
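
The surrounding while loop terminates only if each failed append is followed by guaranteeing some Available store in the slot that can actually take the account, which is what the has_space_available check enforces; the pre-fix code created a larger store only when data_len exceeded self.file_size, so an account too big for an existing small store but still under file_size could spin forever. A toy model of that invariant, with a hypothetical store_all that tracks only free bytes per store:

    // Toy model of the append/retry loop: `stores` holds free bytes per
    // store. An append fails when nothing fits; the fix is guaranteeing a
    // big-enough store before retrying, otherwise there is no progress.
    fn store_all(accounts: &[u64], stores: &mut Vec<u64>, file_size: u64) {
        let mut i = 0;
        while i < accounts.len() {
            let needed = accounts[i];
            if let Some(free) = stores.iter_mut().find(|free| **free >= needed) {
                *free -= needed;
                i += 1; // progress
            } else {
                // Pre-fix: only done when needed > file_size, so a mid-sized
                // account facing a small store could spin forever.
                stores.push(needed.max(file_size));
            }
        }
    }

    fn main() {
        let mut stores = vec![8192]; // like test_storage_finder's small store
        store_all(&[8190 + 256], &mut stores, 16 * 1024);
        assert_eq!(stores.len(), 2); // a second, larger store had to be made
    }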
@ -1850,7 +2089,15 @@ impl AccountsDB {
}
info!("total_stores: {}, newest_slot: {}, oldest_slot: {}, max_slot: {} (num={}), min_slot: {} (num={})",
total_count, newest_slot, oldest_slot, max_slot, max, min_slot, min);
datapoint_info!("accounts_db-stores", ("total_count", total_count, i64));
datapoint_info!(
"accounts_db-stores",
("total_count", total_count, i64),
(
"recycle_count",
self.recycle_stores.read().unwrap().len() as u64,
i64
),
);
datapoint_info!(
"accounts_db-perf-stats",
(
@ -1911,26 +2158,41 @@ impl AccountsDB {
hasher.result()
}
-fn accumulate_account_hashes(hashes: Vec<(Pubkey, Hash, u64)>) -> Hash {
-let (hash, ..) = Self::do_accumulate_account_hashes_and_capitalization(hashes, false);
fn accumulate_account_hashes(
hashes: Vec<(Pubkey, Hash, u64)>,
slot: Slot,
debug: bool,
) -> Hash {
let (hash, ..) =
Self::do_accumulate_account_hashes_and_capitalization(hashes, false, slot, debug);
hash
}
fn accumulate_account_hashes_and_capitalization(
hashes: Vec<(Pubkey, Hash, u64)>,
slot: Slot,
debug: bool,
) -> (Hash, u64) {
-let (hash, cap) = Self::do_accumulate_account_hashes_and_capitalization(hashes, true);
let (hash, cap) =
Self::do_accumulate_account_hashes_and_capitalization(hashes, true, slot, debug);
(hash, cap.unwrap())
}
fn do_accumulate_account_hashes_and_capitalization(
mut hashes: Vec<(Pubkey, Hash, u64)>,
calculate_cap: bool,
slot: Slot,
debug: bool,
) -> (Hash, Option<u64>) {
let mut sort_time = Measure::start("sort");
hashes.par_sort_by(|a, b| a.0.cmp(&b.0));
sort_time.stop();
if debug {
for (key, hash, _lamports) in &hashes {
info!("slot: {} key {} hash {}", slot, key, hash);
}
}
let mut sum_time = Measure::start("cap");
let cap = if calculate_cap {
Some(Self::checked_sum_for_capitalization(
@ -2049,7 +2311,7 @@ impl AccountsDB {
let mut accumulate = Measure::start("accumulate");
let (accumulated_hash, total_lamports) =
-Self::accumulate_account_hashes_and_capitalization(hashes);
Self::accumulate_account_hashes_and_capitalization(hashes, slot, false);
accumulate.stop();
datapoint_info!(
"update_accounts_hash",
@ -2136,7 +2398,7 @@ impl AccountsDB {
.into_iter()
.map(|(pubkey, (_, hash))| (pubkey, hash, 0))
.collect();
-let ret = Self::accumulate_account_hashes(hashes);
let ret = Self::accumulate_account_hashes(hashes, slot, false);
accumulate.stop();
self.stats
.delta_hash_scan_time_total_us
@ -2188,9 +2450,9 @@ impl AccountsDB {
.get_account_storage_entry(*slot, account_info.store_id)
{
assert_eq!(
-*slot, store.slot,
*slot, store.slot(),
"AccountDB::accounts_index corrupted. Storage pointed to: {}, expected: {}, should only point to one slot",
-store.slot, *slot
store.slot(), *slot
);
let count = store.remove_account();
if count == 0 {
@ -2236,7 +2498,7 @@ impl AccountsDB {
let accounts = store.accounts.accounts(0);
accounts
.into_iter()
-.map(|account| (store.slot, account.meta.pubkey))
.map(|account| (store.slot(), account.meta.pubkey))
.collect::<HashSet<(Slot, Pubkey)>>()
})
.reduce(HashSet::new, |mut reduced, store_pubkeys| {
@ -2346,6 +2608,11 @@ impl AccountsDB {
/// Store the account update.
pub fn store(&self, slot: Slot, accounts: &[(&Pubkey, &Account)]) {
// If all transactions in a batch are errored,
// it's possible to get a store with no accounts.
if accounts.is_empty() {
return;
}
self.assert_frozen_accounts(accounts);
let mut hash_time = Measure::start("hash_accounts");
let hashes = self.hash_accounts(
@ -2416,6 +2683,35 @@ impl AccountsDB {
i64
),
);
datapoint_info!(
"accounts_db_store_timings2",
(
"recycle_store_count",
self.stats.recycle_store_count.swap(0, Ordering::Relaxed),
i64
),
(
"create_store_count",
self.stats.create_store_count.swap(0, Ordering::Relaxed),
i64
),
(
"store_get_slot_store",
self.stats.store_get_slot_store.swap(0, Ordering::Relaxed),
i64
),
(
"store_find_existing",
self.stats.store_find_existing.swap(0, Ordering::Relaxed),
i64
),
(
"dropped_stores",
self.stats.dropped_stores.swap(0, Ordering::Relaxed),
i64
),
);
}
}
@ -2581,12 +2877,25 @@ impl AccountsDB {
}
}
-pub(crate) fn print_accounts_stats(&self, label: &'static str) {
pub(crate) fn print_accounts_stats(&self, label: &str) {
self.print_index(label);
self.print_count_and_status(label);
info!("recycle_stores:");
let recycle_stores = self.recycle_stores.read().unwrap();
for entry in recycle_stores.iter() {
info!(
" slot: {} id: {} count_and_status: {:?} approx_store_count: {} len: {} capacity: {}",
entry.slot(),
entry.append_vec_id(),
*entry.count_and_status.read().unwrap(),
entry.approx_store_count.load(Ordering::Relaxed),
entry.accounts.len(),
entry.accounts.capacity(),
);
}
}
-fn print_index(&self, label: &'static str) {
fn print_index(&self, label: &str) {
let mut roots: Vec<_> = self.accounts_index.all_roots();
roots.sort();
info!("{}: accounts_index roots: {:?}", label, roots,);
@ -2599,7 +2908,7 @@ impl AccountsDB {
}
}
-fn print_count_and_status(&self, label: &'static str) {
fn print_count_and_status(&self, label: &str) {
let mut slots: Vec<_> = self.storage.all_slots();
slots.sort();
info!("{}: count_and status for {} slots:", label, slots.len());
@ -2903,6 +3212,7 @@ pub mod tests {
#[test]
fn test_remove_unrooted_slot_snapshot() {
solana_logger::setup();
let unrooted_slot = 9;
let db = AccountsDB::new(Vec::new(), &ClusterType::Development);
let key = solana_sdk::pubkey::new_rand();
@ -3097,7 +3407,7 @@ pub mod tests {
all_storages.extend(slot_storage.read().unwrap().values().cloned())
}
for storage in all_storages {
-*append_vec_histogram.entry(storage.slot).or_insert(0) += 1;
*append_vec_histogram.entry(storage.slot()).or_insert(0) += 1;
}
for count in append_vec_histogram.values() {
assert!(*count >= 2);
@ -3108,7 +3418,6 @@ pub mod tests {
fn test_account_grow() {
let accounts = AccountsDB::new_single();
-let count = [0, 1];
let status = [AccountStorageStatus::Available, AccountStorageStatus::Full];
let pubkey1 = solana_sdk::pubkey::new_rand();
let account1 = Account::new(1, DEFAULT_FILE_SIZE as usize / 2, &pubkey1);
@ -3145,20 +3454,14 @@ pub mod tests {
);
// lots of stores, but 3 storages should be enough for everything
-for i in 0..25 {
-let index = i % 2;
for _ in 0..25 {
accounts.store(0, &[(&pubkey1, &account1)]);
{
assert_eq!(accounts.storage.0.len(), 1);
let stores = &accounts.storage.get_slot_stores(0).unwrap();
let r_stores = stores.read().unwrap();
-assert_eq!(r_stores.len(), 3);
-assert_eq!(r_stores[&0].count(), count[index]);
assert!(r_stores.len() <= 5);
assert_eq!(r_stores[&0].status(), status[0]);
-assert_eq!(r_stores[&1].count(), 1);
-assert_eq!(r_stores[&1].status(), status[1]);
-assert_eq!(r_stores[&2].count(), count[index ^ 1]);
-assert_eq!(r_stores[&2].status(), status[0]);
}
let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(
@ -4083,11 +4386,10 @@ pub mod tests {
let storage_entries = accounts_db.get_snapshot_storages(Slot::max_value());
for storage in storage_entries.iter().flatten() {
let storage_path = storage.get_path();
-let output_path = output_dir.as_ref().join(
-storage_path
-.file_name()
-.expect("Invalid AppendVec file path"),
-);
let output_path = output_dir.as_ref().join(AppendVec::new_relative_path(
storage.slot(),
storage.append_vec_id(),
));
fs::copy(storage_path, output_path)?;
}
@ -4497,6 +4799,19 @@ pub mod tests {
}
}
#[test]
fn test_storage_finder() {
solana_logger::setup();
let db = AccountsDB::new_sized(Vec::new(), 16 * 1024);
let key = solana_sdk::pubkey::new_rand();
let lamports = 100;
let data_len = 8190;
let account = Account::new(lamports, data_len, &solana_sdk::pubkey::new_rand());
// pre-populate with a smaller empty store
db.create_and_insert_store(1, 8192, "test_storage_finder");
db.store(1, &[(&key, &account)]);
}
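
This test pins down the infinite-loop scenario from the commit message: the account needs 8190 + STORE_META_OVERHEAD = 8190 + 256 = 8446 bytes, which cannot fit in the pre-created 8192-byte store, yet is smaller than the db's 16 KiB file size, so the old overflow check (data_len > self.file_size) never triggered and no larger store was ever created.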
#[test]
fn test_get_snapshot_storages_empty() {
let db = AccountsDB::new(Vec::new(), &ClusterType::Development);
@ -5176,4 +5491,70 @@ pub mod tests {
3
);
}
#[test]
fn test_store_overhead() {
solana_logger::setup();
let accounts = AccountsDB::new_single();
let account = Account::default();
let pubkey = solana_sdk::pubkey::new_rand();
accounts.store(0, &[(&pubkey, &account)]);
let slot_stores = accounts.storage.get_slot_stores(0).unwrap();
let mut total_len = 0;
for (_id, store) in slot_stores.read().unwrap().iter() {
total_len += store.accounts.len();
}
info!("total: {}", total_len);
assert!(total_len < STORE_META_OVERHEAD);
}
#[test]
fn test_store_reuse() {
solana_logger::setup();
let accounts = AccountsDB::new_sized(vec![], 4096);
let size = 100;
let num_accounts: usize = 100;
let mut keys = Vec::new();
for i in 0..num_accounts {
let account = Account::new((i + 1) as u64, size, &Pubkey::default());
let pubkey = solana_sdk::pubkey::new_rand();
accounts.store(0, &[(&pubkey, &account)]);
keys.push(pubkey);
}
accounts.add_root(0);
for (i, key) in keys[1..].iter().enumerate() {
let account = Account::new((1 + i + num_accounts) as u64, size, &Pubkey::default());
accounts.store(1, &[(key, &account)]);
}
accounts.add_root(1);
accounts.clean_accounts(None);
accounts.shrink_all_slots();
accounts.print_accounts_stats("post-shrink");
let num_stores = accounts.recycle_stores.read().unwrap().len();
assert!(num_stores > 0);
let mut account_refs = Vec::new();
let num_to_store = 20;
for (i, key) in keys[..num_to_store].iter().enumerate() {
let account = Account::new(
(1 + i + 2 * num_accounts) as u64,
i + 20,
&Pubkey::default(),
);
accounts.store(2, &[(key, &account)]);
account_refs.push(account);
}
assert!(accounts.recycle_stores.read().unwrap().len() < num_stores);
accounts.print_accounts_stats("post-store");
let mut ancestors = HashMap::new();
ancestors.insert(1, 0);
ancestors.insert(2, 1);
for (key, account_ref) in keys[..num_to_store].iter().zip(account_refs) {
assert_eq!(accounts.load_slow(&ancestors, key).unwrap().0, account_ref);
}
}
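
This test drives the full recycle cycle: store 100 accounts in slot 0, rewrite 99 of them in slot 1, then clean plus shrink_all_slots pushes the freed slot-0 storages into recycle_stores; the subsequent 20 stores to slot 2 must drain entries from the pool (its length strictly decreases), and the final loads through the ancestors map confirm that recycling did not corrupt the rewritten accounts.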
}

View File

@ -7732,6 +7732,7 @@ mod tests {
#[test]
fn test_status_cache_ancestors() {
solana_logger::setup();
let (genesis_config, _mint_keypair) = create_genesis_config(500);
let parent = Arc::new(Bank::new(&genesis_config));
let bank1 = Arc::new(new_from_parent(&parent));

View File

@ -275,9 +275,9 @@ where
for (slot, entries) in storage.into_iter() {
let sub_map = map.entry(slot).or_insert_with(HashMap::new);
for entry in entries.into_iter() {
-let mut entry: AccountStorageEntry = entry.into();
-entry.slot = slot;
-sub_map.insert(entry.id, Arc::new(entry));
let entry: AccountStorageEntry = entry.into();
entry.slot.store(slot, Ordering::Relaxed);
sub_map.insert(entry.append_vec_id(), Arc::new(entry));
}
}
map
@ -306,7 +306,8 @@ where
// Move the corresponding AppendVec from the snapshot into the directory pointed
// at by `local_dir`
-let append_vec_relative_path = AppendVec::new_relative_path(slot, storage_entry.id);
let append_vec_relative_path =
AppendVec::new_relative_path(slot, storage_entry.append_vec_id());
let append_vec_abs_path = stream_append_vecs_path
.as_ref()
.join(&append_vec_relative_path);

View File

@ -18,7 +18,7 @@ impl solana_frozen_abi::abi_example::IgnoreAsHelper for SerializableAccountStora
impl From<&AccountStorageEntry> for SerializableAccountStorageEntry {
fn from(rhs: &AccountStorageEntry) -> Self {
Self {
-id: rhs.id,
id: rhs.append_vec_id(),
accounts_current_len: rhs.accounts.len(),
}
}
@ -235,7 +235,7 @@ impl<'a> TypeContext<'a> for Context {
serialize_iter_as_map(serializable_db.account_storage_entries.iter().map(|x| {
*entry_count.borrow_mut() += x.len();
(
-x.first().unwrap().slot,
x.first().unwrap().slot(),
serialize_iter_as_seq(
x.iter()
.map(|x| Self::SerializableAccountStorageEntry::from(x.as_ref())),

View File

@ -27,11 +27,10 @@ fn copy_append_vecs<P: AsRef<Path>>(
let storage_entries = accounts_db.get_snapshot_storages(Slot::max_value());
for storage in storage_entries.iter().flatten() {
let storage_path = storage.get_path();
-let output_path = output_dir.as_ref().join(
-storage_path
-.file_name()
-.expect("Invalid AppendVec file path"),
-);
let output_path = output_dir.as_ref().join(AppendVec::new_relative_path(
storage.slot(),
storage.append_vec_id(),
));
std::fs::copy(storage_path, output_path)?;
}

View File

@ -250,11 +250,11 @@ pub fn archive_snapshot_package(snapshot_package: &AccountsPackage) -> Result<()
for storage in snapshot_package.storages.iter().flatten() {
storage.flush()?;
let storage_path = storage.get_path();
-let output_path = staging_accounts_dir.join(
-storage_path
-.file_name()
-.expect("Invalid AppendVec file path"),
-);
let output_path =
staging_accounts_dir.join(crate::append_vec::AppendVec::new_relative_path(
storage.slot(),
storage.append_vec_id(),
));
// `storage_path` - The file path where the AppendVec itself is located
// `output_path` - The directory where the AppendVec will be placed in the staging directory.
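
All three snapshot-related call sites stop deriving the copy destination from the source file name: once stores can be recycled, the backing file keeps the name from the (slot, id) it was created under while the entry's logical slot() and append_vec_id() move on, so the archive path must be rebuilt with AppendVec::new_relative_path from the current values to match what deserialization will reconstruct.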