diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 94ded2984d..cac98e4b1e 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -133,8 +133,9 @@ impl<'de> Deserialize<'de> for AccountStorage { #[derive(Debug, PartialEq, Copy, Clone, Deserialize, Serialize)] pub enum AccountStorageStatus { - StorageAvailable = 0, - StorageFull = 1, + Available = 0, + Full = 1, + Candidate = 2, } /// Persistent storage structure holding the accounts @@ -167,7 +168,7 @@ impl AccountStorageEntry { id, fork_id, accounts, - count_and_status: RwLock::new((0, AccountStorageStatus::StorageAvailable)), + count_and_status: RwLock::new((0, AccountStorageStatus::Available)), } } @@ -176,7 +177,7 @@ impl AccountStorageEntry { let count = count_and_status.0; - if status == AccountStorageStatus::StorageFull && count == 0 { + if status == AccountStorageStatus::Full && count == 0 { // this case arises when the append_vec is full (store_ptrs fails), // but all accounts have already been removed from the storage // @@ -186,7 +187,7 @@ impl AccountStorageEntry { // the append_vec has previously been completely full // self.accounts.reset(); - status = AccountStorageStatus::StorageAvailable; + status = AccountStorageStatus::Available; } *count_and_status = (count, status); @@ -205,11 +206,23 @@ impl AccountStorageEntry { *count_and_status = (count_and_status.0 + 1, count_and_status.1); } + fn try_available(&self) -> bool { + let mut count_and_status = self.count_and_status.write().unwrap(); + let (count, status) = *count_and_status; + + if status == AccountStorageStatus::Available { + *count_and_status = (count, AccountStorageStatus::Candidate); + true + } else { + false + } + } + fn remove_account(&self) -> usize { let mut count_and_status = self.count_and_status.write().unwrap(); let (count, mut status) = *count_and_status; - if count == 1 && status == AccountStorageStatus::StorageFull { + if count == 1 && status == AccountStorageStatus::Full { // this case arises when we remove the last account from the // storage, but we've learned from previous write attempts that // the storage is full @@ -222,7 +235,7 @@ impl AccountStorageEntry { // otherwise, the storage may be in flight with a store() // call self.accounts.reset(); - status = AccountStorageStatus::StorageAvailable; + status = AccountStorageStatus::Available; } *count_and_status = (count - 1, status); @@ -378,35 +391,34 @@ impl AccountsDB { Self::load(&storage, ancestors, &accounts_index, pubkey) } - fn fork_storage(&self, fork_id: Fork) -> Arc { - let mut candidates: Vec> = { - let stores = self.storage.read().unwrap(); - let fork_stores = stores.0.get(&fork_id); - if let Some(fork_stores) = fork_stores { - fork_stores - .values() - .filter_map(|x| { - if x.status() == AccountStorageStatus::StorageAvailable { - Some(x.clone()) - } else { - None - } - }) - .collect() - } else { - vec![] + fn find_storage_candidate(&self, fork_id: Fork) -> Arc { + let stores = self.storage.read().unwrap(); + + if let Some(fork_stores) = stores.0.get(&fork_id) { + if !fork_stores.is_empty() { + // pick an available store at random by iterating from a random point + let to_skip = thread_rng().gen_range(0, stores.0.len()); + + for (i, store) in fork_stores.values().cycle().skip(to_skip).enumerate() { + if store.try_available() { + return store.clone(); + } + // looked at every store, bail... + if i == fork_stores.len() { + break; + } + } } - }; - if candidates.is_empty() { - let mut stores = self.storage.write().unwrap(); - let path_index = thread_rng().gen_range(0, self.paths.len()); - let storage = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index])); - let fork_storage = stores.0.entry(fork_id).or_insert_with(HashMap::new); - fork_storage.insert(storage.id, storage.clone()); - candidates.push(storage); } - let rv = thread_rng().gen_range(0, candidates.len()); - candidates[rv].clone() + drop(stores); + + let mut stores = self.storage.write().unwrap(); + let path_index = thread_rng().gen_range(0, self.paths.len()); + let fork_storage = stores.0.entry(fork_id).or_insert_with(HashMap::new); + let store = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index])); + store.try_available(); + fork_storage.insert(store.id, store.clone()); + store } pub fn purge_fork(&self, fork: Fork) { @@ -437,10 +449,11 @@ impl AccountsDB { .collect(); let mut infos: Vec = vec![]; while infos.len() < with_meta.len() { - let storage = self.fork_storage(fork_id); + let storage = self.find_storage_candidate(fork_id); let rvs = storage.accounts.append_accounts(&with_meta[infos.len()..]); if rvs.is_empty() { - storage.set_status(AccountStorageStatus::StorageFull); + storage.set_status(AccountStorageStatus::Full); + continue; } for (offset, (_, account)) in rvs.iter().zip(&with_meta[infos.len()..]) { storage.add_account(); @@ -450,6 +463,8 @@ impl AccountsDB { lamports: account.lamports, }); } + // restore the state to available + storage.set_status(AccountStorageStatus::Available); } infos } @@ -955,10 +970,7 @@ mod tests { fn check_storage(accounts: &AccountsDB, count: usize) -> bool { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.0[&0].len(), 1); - assert_eq!( - stores.0[&0][&0].status(), - AccountStorageStatus::StorageAvailable - ); + assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Available); stores.0[&0][&0].count() == count } @@ -1064,10 +1076,7 @@ mod tests { let paths = get_tmp_accounts_path!(); let accounts = AccountsDB::new(&paths.paths); let count = [0, 1]; - let status = [ - AccountStorageStatus::StorageAvailable, - AccountStorageStatus::StorageFull, - ]; + let status = [AccountStorageStatus::Available, AccountStorageStatus::Full]; let pubkey1 = Pubkey::new_rand(); let account1 = Account::new(1, ACCOUNT_DATA_FILE_SIZE as usize / 2, &pubkey1); accounts.store(0, &[(&pubkey1, &account1)]); @@ -1075,10 +1084,7 @@ mod tests { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.0.len(), 1); assert_eq!(stores.0[&0][&0].count(), 1); - assert_eq!( - stores.0[&0][&0].status(), - AccountStorageStatus::StorageAvailable - ); + assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Available); } let pubkey2 = Pubkey::new_rand(); @@ -1089,12 +1095,9 @@ mod tests { assert_eq!(stores.0.len(), 1); assert_eq!(stores.0[&0].len(), 2); assert_eq!(stores.0[&0][&0].count(), 1); - assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::StorageFull); + assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Full); assert_eq!(stores.0[&0][&1].count(), 1); - assert_eq!( - stores.0[&0][&1].status(), - AccountStorageStatus::StorageAvailable - ); + assert_eq!(stores.0[&0][&1].status(), AccountStorageStatus::Available); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!( @@ -1222,4 +1225,51 @@ mod tests { check_accounts(&daccounts, &pubkeys, 0, 100, 2); check_accounts(&daccounts, &pubkeys1, 1, 10, 1); } + + #[test] + #[ignore] + fn test_store_account_stress() { + let fork_id = 42; + let num_threads = 2; + let paths = get_tmp_accounts_path!(); + + let min_file_bytes = std::mem::size_of::() + + std::mem::size_of::(); + + let db = Arc::new(AccountsDB::new_with_file_size( + &paths.paths, + min_file_bytes as u64, + )); + + db.add_root(fork_id); + let thread_hdls: Vec<_> = (0..num_threads) + .into_iter() + .map(|_| { + let db = db.clone(); + std::thread::Builder::new() + .name("account-writers".to_string()) + .spawn(move || { + let pubkey = Pubkey::new_rand(); + let mut account = Account::new(1, 0, &pubkey); + let mut i = 0; + loop { + let account_bal = thread_rng().gen_range(1, 99); + account.lamports = account_bal; + db.store(fork_id, &[(&pubkey, &account)]); + let (account, fork) = db.load_slow(&HashMap::new(), &pubkey).expect( + &format!("Could not fetch stored account {}, iter {}", pubkey, i), + ); + assert_eq!(fork, fork_id); + assert_eq!(account.lamports, account_bal); + i += 1; + } + }) + .unwrap() + }) + .collect(); + + for t in thread_hdls { + t.join().unwrap(); + } + } }