fix accounts_db storage.reset() (#4094)

* fix accounts_db storage.reset()

* fix compilation errors, remove unused, fix test_accounts_grow() failure
Rob Walker 2019-05-01 09:27:13 -07:00 committed by GitHub
parent ad27c30623
commit cb528af4e2
3 changed files with 151 additions and 62 deletions


@@ -263,7 +263,7 @@ mod tests {
use super::*;
use crate::blocktree::create_new_tmp_ledger;
use crate::blocktree::tests::entries_to_blobs;
use crate::entry::{create_ticks, next_entry, Entry};
use crate::entry::{create_ticks, next_entry, next_entry_mut, Entry};
use solana_sdk::genesis_block::GenesisBlock;
use solana_sdk::hash::Hash;
use solana_sdk::instruction::InstructionError;
@@ -1036,4 +1036,77 @@ mod tests {
// Should not see duplicate signature error
assert_eq!(bank.process_transaction(&fail_tx), Ok(()));
}
#[test]
#[ignore]
fn test_process_entries_stress() {
// this test throws lots of rayon threads at process_entries()
// finds bugs in very low-layer stuff
solana_logger::setup();
let (genesis_block, mint_keypair) = GenesisBlock::new(1_000_000_000);
let mut bank = Bank::new(&genesis_block);
const NUM_TRANSFERS: usize = 100;
let keypairs: Vec<_> = (0..NUM_TRANSFERS * 2).map(|_| Keypair::new()).collect();
// give everybody one lamport
for keypair in &keypairs {
bank.transfer(1, &mint_keypair, &keypair.pubkey())
.expect("funding failed");
}
let mut i = 0;
let mut hash = bank.last_blockhash();
loop {
let entries: Vec<_> = (0..NUM_TRANSFERS)
.map(|i| {
next_entry_mut(
&mut hash,
0,
vec![system_transaction::transfer(
&keypairs[i],
&keypairs[i + NUM_TRANSFERS].pubkey(),
1,
bank.last_blockhash(),
0,
)],
)
})
.collect();
info!("paying iteration {}", i);
process_entries(&bank, &entries).expect("paying failed");
let entries: Vec<_> = (0..NUM_TRANSFERS)
.map(|i| {
next_entry_mut(
&mut hash,
0,
vec![system_transaction::transfer(
&keypairs[i + NUM_TRANSFERS],
&keypairs[i].pubkey(),
1,
bank.last_blockhash(),
0,
)],
)
})
.collect();
info!("refunding iteration {}", i);
process_entries(&bank, &entries).expect("refunding failed");
// advance to next block
process_entries(
&bank,
&(0..bank.ticks_per_slot())
.map(|_| next_entry_mut(&mut hash, 1, vec![]))
.collect::<Vec<_>>(),
)
.expect("process ticks failed");
i += 1;
bank = Bank::new_from_parent(&Arc::new(bank), &Pubkey::default(), i as u64);
bank.squash();
}
}
}
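
Since the new stress test is marked #[ignore], a normal test run skips it; assuming the standard cargo test harness, it can be run on demand with something like cargo test test_process_entries_stress -- --ignored.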


@@ -67,23 +67,12 @@ pub type AccountStorage = HashMap<usize, Arc<AccountStorageEntry>>;
pub type InstructionAccounts = Vec<Account>;
pub type InstructionLoaders = Vec<Vec<(Pubkey, Account)>>;
#[derive(Debug, PartialEq)]
#[derive(Debug, PartialEq, Clone, Copy)]
pub enum AccountStorageStatus {
StorageAvailable = 0,
StorageFull = 1,
}
impl From<usize> for AccountStorageStatus {
fn from(status: usize) -> Self {
use self::AccountStorageStatus::*;
match status {
0 => StorageAvailable,
1 => StorageFull,
_ => unreachable!(),
}
}
}
/// Persistent storage structure holding the accounts
pub struct AccountStorageEntry {
id: AppendVecId,
@@ -94,12 +83,11 @@ pub struct AccountStorageEntry {
accounts: AppendVec,
/// Keeps track of the number of accounts stored in a specific AppendVec.
/// This is periodically checked to reuse the stores that do not have
/// any accounts in it.
count: AtomicUsize,
/// status corresponding to the storage
status: AtomicUsize,
/// This is periodically checked to reuse the stores that do not have
/// any accounts in it
/// status corresponding to the storage, lets us know that
/// the append_vec, once maxed out, then emptied, can be reclaimed
count_and_status: RwLock<(usize, AccountStorageStatus)>,
}
impl AccountStorageEntry {
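
The hunk above is the heart of the fix: the separate count: AtomicUsize and status: AtomicUsize fields are folded into a single count_and_status: RwLock<(usize, AccountStorageStatus)>, which is why the enum above now derives Clone and Copy and its From<usize> conversion could be removed. With two independent atomics, one thread could observe the count change while it was still acting on a stale status (or vice versa); one lock over the pair means every read and update sees a consistent snapshot. A minimal sketch of the pattern, using hypothetical Status/Store types rather than the real accounts_db structs:

    use std::sync::RwLock;

    // Hypothetical stand-ins for AccountStorageStatus / AccountStorageEntry,
    // not the real accounts_db types.
    #[derive(Debug, PartialEq, Clone, Copy)]
    enum Status {
        Available,
        Full,
    }

    struct Store {
        // One lock over the pair: every update and every check runs under
        // the same guard, so no thread can observe the count change while
        // another is deciding based on the status, or vice versa. Two
        // independent AtomicUsize fields cannot give that guarantee.
        count_and_status: RwLock<(usize, Status)>,
    }

    impl Store {
        fn add_account(&self) {
            let mut guard = self.count_and_status.write().unwrap();
            *guard = (guard.0 + 1, guard.1);
        }

        fn set_status(&self, status: Status) {
            let mut guard = self.count_and_status.write().unwrap();
            *guard = (guard.0, status);
        }

        fn count_and_status(&self) -> (usize, Status) {
            // A single consistent snapshot of both values.
            *self.count_and_status.read().unwrap()
        }
    }

    fn main() {
        let store = Store {
            count_and_status: RwLock::new((0, Status::Available)),
        };
        store.add_account();
        store.set_status(Status::Full);
        assert_eq!(store.count_and_status(), (1, Status::Full));
    }
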
@@ -114,28 +102,65 @@ impl AccountStorageEntry {
id,
fork_id,
accounts,
count: AtomicUsize::new(0),
status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize),
count_and_status: RwLock::new((0, AccountStorageStatus::StorageAvailable)),
}
}
pub fn set_status(&self, status: AccountStorageStatus) {
self.status.store(status as usize, Ordering::Relaxed);
pub fn set_status(&self, mut status: AccountStorageStatus) {
let mut count_and_status = self.count_and_status.write().unwrap();
let count = count_and_status.0;
if status == AccountStorageStatus::StorageFull && count == 0 {
// this case arises when the append_vec is full (store_ptrs fails),
// but all accounts have already been removed from the storage
//
// the only time it's safe to call reset() on an append_vec is when
// every account has been removed
// **and**
// the append_vec has previously been completely full
//
self.accounts.reset();
status = AccountStorageStatus::StorageAvailable;
}
*count_and_status = (count, status);
}
pub fn get_status(&self) -> AccountStorageStatus {
self.status.load(Ordering::Relaxed).into()
pub fn status(&self) -> AccountStorageStatus {
self.count_and_status.read().unwrap().1
}
pub fn count(&self) -> usize {
self.count_and_status.read().unwrap().0
}
fn add_account(&self) {
self.count.fetch_add(1, Ordering::Relaxed);
let mut count_and_status = self.count_and_status.write().unwrap();
*count_and_status = (count_and_status.0 + 1, count_and_status.1);
}
fn remove_account(&self) {
if self.count.fetch_sub(1, Ordering::Relaxed) == 1 {
let mut count_and_status = self.count_and_status.write().unwrap();
let (count, mut status) = *count_and_status;
if count == 1 && status == AccountStorageStatus::StorageFull {
// this case arises when we remove the last account from the
// storage, but we've learned from previous write attempts that
// the storage is full
//
// the only time it's safe to call reset() on an append_vec is when
// every account has been removed
// **and**
// the append_vec has previously been completely full
//
// otherwise, the storage may be in flight with a store()
// call
self.accounts.reset();
self.set_status(AccountStorageStatus::StorageAvailable);
status = AccountStorageStatus::StorageAvailable;
}
*count_and_status = (count - 1, status);
}
}
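
Both new branches above funnel into the rule spelled out in the comments: reset() on the append_vec is only safe once the store is empty and it has previously been marked full. set_status() covers the ordering in which the failed store_ptrs arrives after the last account is already gone; remove_account() covers the ordering in which the last account leaves after the store filled up. As a standalone, purely illustrative predicate (hypothetical names, not the real API):

    #[derive(Debug, PartialEq, Clone, Copy)]
    enum Status {
        Available,
        Full,
    }

    fn can_reset(count: usize, status: Status) -> bool {
        // empty AND previously full: no store() can still be writing into it
        count == 0 && status == Status::Full
    }

    fn main() {
        assert!(!can_reset(3, Status::Full)); // accounts still stored
        assert!(!can_reset(0, Status::Available)); // a store() may still be in flight
        assert!(can_reset(0, Status::Full)); // filled up, then emptied: safe to reset
    }
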
@@ -193,7 +218,7 @@ impl AccountsDB {
pub fn has_accounts(&self, fork: Fork) -> bool {
for x in self.storage.read().unwrap().values() {
if x.fork_id == fork && x.count.load(Ordering::Relaxed) > 0 {
if x.fork_id == fork && x.count() > 0 {
return true;
}
}
@@ -254,8 +279,7 @@ impl AccountsDB {
stores
.values()
.filter_map(|x| {
if x.get_status() == AccountStorageStatus::StorageAvailable
&& x.fork_id == fork_id
if x.status() == AccountStorageStatus::StorageAvailable && x.fork_id == fork_id
{
Some(x.clone())
} else {
@@ -354,7 +378,7 @@ impl AccountsDB {
let dead_forks: HashSet<Fork> = storage
.values()
.filter_map(|x| {
if x.count.load(Ordering::Relaxed) == 0 {
if x.count() == 0 {
Some(x.fork_id)
} else {
None
@@ -363,13 +387,7 @@ impl AccountsDB {
.collect();
let live_forks: HashSet<Fork> = storage
.values()
.filter_map(|x| {
if x.count.load(Ordering::Relaxed) > 0 {
Some(x.fork_id)
} else {
None
}
})
.filter_map(|x| if x.count() > 0 { Some(x.fork_id) } else { None })
.collect();
dead_forks.difference(&live_forks).cloned().collect()
}
@@ -603,15 +621,15 @@ mod tests {
{
let stores = db.storage.read().unwrap();
assert_eq!(stores.len(), 2);
assert_eq!(stores[&0].count.load(Ordering::Relaxed), 2);
assert_eq!(stores[&1].count.load(Ordering::Relaxed), 2);
assert_eq!(stores[&0].count(), 2);
assert_eq!(stores[&1].count(), 2);
}
db.add_root(1);
{
let stores = db.storage.read().unwrap();
assert_eq!(stores.len(), 2);
assert_eq!(stores[&0].count.load(Ordering::Relaxed), 2);
assert_eq!(stores[&1].count.load(Ordering::Relaxed), 2);
assert_eq!(stores[&0].count(), 2);
assert_eq!(stores[&1].count(), 2);
}
}
@@ -685,11 +703,8 @@ mod tests {
fn check_storage(accounts: &AccountsDB, count: usize) -> bool {
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.len(), 1);
assert_eq!(
stores[&0].get_status(),
AccountStorageStatus::StorageAvailable
);
stores[&0].count.load(Ordering::Relaxed) == count
assert_eq!(stores[&0].status(), AccountStorageStatus::StorageAvailable);
stores[&0].count() == count
}
fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec<Pubkey>, fork: Fork) {
@@ -779,8 +794,8 @@ mod tests {
{
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.len(), 1);
assert_eq!(stores[&0].count.load(Ordering::Relaxed), 1);
assert_eq!(stores[&0].get_status(), status[0]);
assert_eq!(stores[&0].count(), 1);
assert_eq!(stores[&0].status(), AccountStorageStatus::StorageAvailable);
}
let pubkey2 = Pubkey::new_rand();
@@ -789,27 +804,28 @@ mod tests {
{
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.len(), 2);
assert_eq!(stores[&0].count.load(Ordering::Relaxed), 1);
assert_eq!(stores[&0].get_status(), status[1]);
assert_eq!(stores[&1].count.load(Ordering::Relaxed), 1);
assert_eq!(stores[&1].get_status(), status[0]);
assert_eq!(stores[&0].count(), 1);
assert_eq!(stores[&0].status(), AccountStorageStatus::StorageFull);
assert_eq!(stores[&1].count(), 1);
assert_eq!(stores[&1].status(), AccountStorageStatus::StorageAvailable);
}
let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1);
assert_eq!(accounts.load_slow(&ancestors, &pubkey2).unwrap(), account2);
// lots of stores, but 3 storages should be enough for everything
for i in 0..25 {
let index = i % 2;
accounts.store(0, &[(&pubkey1, &account1)]);
{
let stores = accounts.storage.read().unwrap();
assert_eq!(stores.len(), 3);
assert_eq!(stores[&0].count.load(Ordering::Relaxed), count[index]);
assert_eq!(stores[&0].get_status(), status[0]);
assert_eq!(stores[&1].count.load(Ordering::Relaxed), 1);
assert_eq!(stores[&1].get_status(), status[1]);
assert_eq!(stores[&2].count.load(Ordering::Relaxed), count[index ^ 1]);
assert_eq!(stores[&2].get_status(), status[0]);
assert_eq!(stores[&0].count(), count[index]);
assert_eq!(stores[&0].status(), status[0]);
assert_eq!(stores[&1].count(), 1);
assert_eq!(stores[&1].status(), status[1]);
assert_eq!(stores[&2].count(), count[index ^ 1]);
assert_eq!(stores[&2].status(), status[0]);
}
let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1);


@@ -74,7 +74,7 @@ impl AppendVec {
.open(file)
.expect("Unable to open data file");
data.seek(SeekFrom::Start(size as u64)).unwrap();
data.seek(SeekFrom::Start((size - 1) as u64)).unwrap();
data.write_all(&[0]).unwrap();
data.seek(SeekFrom::Start(0)).unwrap();
data.flush().unwrap();
@@ -153,7 +153,7 @@ impl AppendVec {
end += val.1;
}
if (self.file_size as usize) <= end {
if (self.file_size as usize) < end {
return None;
}
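
The two append_vec.rs changes are a matched pair of off-by-one fixes: writing the final zero byte at offset size - 1 makes the backing file exactly size bytes long (the old seek to size produced a file of size + 1 bytes), and the capacity check now rejects an append only when it would run past file_size, so a write that ends exactly at the end of the file is accepted. A small, self-contained sketch of the file-sizing half (the path below is invented for the example and is not part of the change):

    use std::fs::OpenOptions;
    use std::io::{Seek, SeekFrom, Write};

    fn main() -> std::io::Result<()> {
        let size: u64 = 4096;
        let path = std::env::temp_dir().join("append_vec_size_demo");

        let mut data = OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(&path)?;

        // Writing one byte at offset size - 1 extends the file to exactly
        // `size` bytes; writing it at offset `size` would have produced a
        // file of size + 1 bytes.
        data.seek(SeekFrom::Start(size - 1))?;
        data.write_all(&[0])?;
        data.flush()?;
        assert_eq!(data.metadata()?.len(), size);

        std::fs::remove_file(&path)?;
        Ok(())
    }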