store_accounts to use try_available (#4523)

* store_accounts to use try_available

* tighter

* clippy
This commit is contained in:
Rob Walker 2019-06-04 11:21:12 -07:00 committed by GitHub
parent 3635a68129
commit e7129757c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 103 additions and 53 deletions

View File

@ -133,8 +133,9 @@ impl<'de> Deserialize<'de> for AccountStorage {
#[derive(Debug, PartialEq, Copy, Clone, Deserialize, Serialize)]
pub enum AccountStorageStatus {
StorageAvailable = 0,
StorageFull = 1,
Available = 0,
Full = 1,
Candidate = 2,
}
/// Persistent storage structure holding the accounts
@ -167,7 +168,7 @@ impl AccountStorageEntry {
id,
fork_id,
accounts,
count_and_status: RwLock::new((0, AccountStorageStatus::StorageAvailable)),
count_and_status: RwLock::new((0, AccountStorageStatus::Available)),
}
}
@ -176,7 +177,7 @@ impl AccountStorageEntry {
let count = count_and_status.0;
if status == AccountStorageStatus::StorageFull && count == 0 {
if status == AccountStorageStatus::Full && count == 0 {
// this case arises when the append_vec is full (store_ptrs fails),
// but all accounts have already been removed from the storage
//
@ -186,7 +187,7 @@ impl AccountStorageEntry {
// the append_vec has previously been completely full
//
self.accounts.reset();
status = AccountStorageStatus::StorageAvailable;
status = AccountStorageStatus::Available;
}
*count_and_status = (count, status);
@ -205,11 +206,23 @@ impl AccountStorageEntry {
*count_and_status = (count_and_status.0 + 1, count_and_status.1);
}
fn try_available(&self) -> bool {
let mut count_and_status = self.count_and_status.write().unwrap();
let (count, status) = *count_and_status;
if status == AccountStorageStatus::Available {
*count_and_status = (count, AccountStorageStatus::Candidate);
true
} else {
false
}
}
fn remove_account(&self) -> usize {
let mut count_and_status = self.count_and_status.write().unwrap();
let (count, mut status) = *count_and_status;
if count == 1 && status == AccountStorageStatus::StorageFull {
if count == 1 && status == AccountStorageStatus::Full {
// this case arises when we remove the last account from the
// storage, but we've learned from previous write attempts that
// the storage is full
@ -222,7 +235,7 @@ impl AccountStorageEntry {
// otherwise, the storage may be in flight with a store()
// call
self.accounts.reset();
status = AccountStorageStatus::StorageAvailable;
status = AccountStorageStatus::Available;
}
*count_and_status = (count - 1, status);
@ -378,35 +391,34 @@ impl AccountsDB {
Self::load(&storage, ancestors, &accounts_index, pubkey)
}
fn fork_storage(&self, fork_id: Fork) -> Arc<AccountStorageEntry> {
let mut candidates: Vec<Arc<AccountStorageEntry>> = {
let stores = self.storage.read().unwrap();
let fork_stores = stores.0.get(&fork_id);
if let Some(fork_stores) = fork_stores {
fork_stores
.values()
.filter_map(|x| {
if x.status() == AccountStorageStatus::StorageAvailable {
Some(x.clone())
} else {
None
}
})
.collect()
} else {
vec![]
fn find_storage_candidate(&self, fork_id: Fork) -> Arc<AccountStorageEntry> {
let stores = self.storage.read().unwrap();
if let Some(fork_stores) = stores.0.get(&fork_id) {
if !fork_stores.is_empty() {
// pick an available store at random by iterating from a random point
let to_skip = thread_rng().gen_range(0, stores.0.len());
for (i, store) in fork_stores.values().cycle().skip(to_skip).enumerate() {
if store.try_available() {
return store.clone();
}
// looked at every store, bail...
if i == fork_stores.len() {
break;
}
}
}
};
if candidates.is_empty() {
let mut stores = self.storage.write().unwrap();
let path_index = thread_rng().gen_range(0, self.paths.len());
let storage = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index]));
let fork_storage = stores.0.entry(fork_id).or_insert_with(HashMap::new);
fork_storage.insert(storage.id, storage.clone());
candidates.push(storage);
}
let rv = thread_rng().gen_range(0, candidates.len());
candidates[rv].clone()
drop(stores);
let mut stores = self.storage.write().unwrap();
let path_index = thread_rng().gen_range(0, self.paths.len());
let fork_storage = stores.0.entry(fork_id).or_insert_with(HashMap::new);
let store = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index]));
store.try_available();
fork_storage.insert(store.id, store.clone());
store
}
pub fn purge_fork(&self, fork: Fork) {
@ -437,10 +449,11 @@ impl AccountsDB {
.collect();
let mut infos: Vec<AccountInfo> = vec![];
while infos.len() < with_meta.len() {
let storage = self.fork_storage(fork_id);
let storage = self.find_storage_candidate(fork_id);
let rvs = storage.accounts.append_accounts(&with_meta[infos.len()..]);
if rvs.is_empty() {
storage.set_status(AccountStorageStatus::StorageFull);
storage.set_status(AccountStorageStatus::Full);
continue;
}
for (offset, (_, account)) in rvs.iter().zip(&with_meta[infos.len()..]) {
storage.add_account();
@ -450,6 +463,8 @@ impl AccountsDB {
lamports: account.lamports,
});
}
// restore the state to available
storage.set_status(AccountStorageStatus::Available);
}
infos
}
@ -955,10 +970,7 @@ mod tests {
fn check_storage(accounts: &AccountsDB, count: usize) -> bool {
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.0[&0].len(), 1);
assert_eq!(
stores.0[&0][&0].status(),
AccountStorageStatus::StorageAvailable
);
assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Available);
stores.0[&0][&0].count() == count
}
@ -1064,10 +1076,7 @@ mod tests {
let paths = get_tmp_accounts_path!();
let accounts = AccountsDB::new(&paths.paths);
let count = [0, 1];
let status = [
AccountStorageStatus::StorageAvailable,
AccountStorageStatus::StorageFull,
];
let status = [AccountStorageStatus::Available, AccountStorageStatus::Full];
let pubkey1 = Pubkey::new_rand();
let account1 = Account::new(1, ACCOUNT_DATA_FILE_SIZE as usize / 2, &pubkey1);
accounts.store(0, &[(&pubkey1, &account1)]);
@ -1075,10 +1084,7 @@ mod tests {
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.0.len(), 1);
assert_eq!(stores.0[&0][&0].count(), 1);
assert_eq!(
stores.0[&0][&0].status(),
AccountStorageStatus::StorageAvailable
);
assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Available);
}
let pubkey2 = Pubkey::new_rand();
@ -1089,12 +1095,9 @@ mod tests {
assert_eq!(stores.0.len(), 1);
assert_eq!(stores.0[&0].len(), 2);
assert_eq!(stores.0[&0][&0].count(), 1);
assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::StorageFull);
assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::Full);
assert_eq!(stores.0[&0][&1].count(), 1);
assert_eq!(
stores.0[&0][&1].status(),
AccountStorageStatus::StorageAvailable
);
assert_eq!(stores.0[&0][&1].status(), AccountStorageStatus::Available);
}
let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(
@ -1222,4 +1225,51 @@ mod tests {
check_accounts(&daccounts, &pubkeys, 0, 100, 2);
check_accounts(&daccounts, &pubkeys1, 1, 10, 1);
}
#[test]
#[ignore]
fn test_store_account_stress() {
let fork_id = 42;
let num_threads = 2;
let paths = get_tmp_accounts_path!();
let min_file_bytes = std::mem::size_of::<StorageMeta>()
+ std::mem::size_of::<crate::append_vec::AccountBalance>();
let db = Arc::new(AccountsDB::new_with_file_size(
&paths.paths,
min_file_bytes as u64,
));
db.add_root(fork_id);
let thread_hdls: Vec<_> = (0..num_threads)
.into_iter()
.map(|_| {
let db = db.clone();
std::thread::Builder::new()
.name("account-writers".to_string())
.spawn(move || {
let pubkey = Pubkey::new_rand();
let mut account = Account::new(1, 0, &pubkey);
let mut i = 0;
loop {
let account_bal = thread_rng().gen_range(1, 99);
account.lamports = account_bal;
db.store(fork_id, &[(&pubkey, &account)]);
let (account, fork) = db.load_slow(&HashMap::new(), &pubkey).expect(
&format!("Could not fetch stored account {}, iter {}", pubkey, i),
);
assert_eq!(fork, fork_id);
assert_eq!(account.lamports, account_bal);
i += 1;
}
})
.unwrap()
})
.collect();
for t in thread_hdls {
t.join().unwrap();
}
}
}