diff --git a/runtime/src/accounts.rs b/runtime/src/accounts.rs index e3d69f1c5d..a6af53560b 100644 --- a/runtime/src/accounts.rs +++ b/runtime/src/accounts.rs @@ -1,6 +1,6 @@ use crate::accounts_db::{ - get_paths_vec, AccountInfo, AccountStorageSlice, AccountsDB, ErrorCounters, - InstructionAccounts, InstructionLoaders, + get_paths_vec, AccountInfo, AccountStorage, AccountsDB, ErrorCounters, InstructionAccounts, + InstructionLoaders, }; use crate::accounts_index::{AccountsIndex, Fork}; use crate::append_vec::StoredAccount; @@ -140,7 +140,7 @@ impl Accounts { } fn load_tx_accounts( - storage: &AccountStorageSlice, + storage: &AccountStorage, ancestors: &HashMap, accounts_index: &AccountsIndex, tx: &Transaction, @@ -180,7 +180,7 @@ impl Accounts { } fn load_executable_accounts( - storage: &AccountStorageSlice, + storage: &AccountStorage, ancestors: &HashMap, accounts_index: &AccountsIndex, program_id: &Pubkey, @@ -222,7 +222,7 @@ impl Accounts { /// For each program_id in the transaction, load its loaders. 
fn load_loaders( - storage: &AccountStorageSlice, + storage: &AccountStorage, ancestors: &HashMap, accounts_index: &AccountsIndex, tx: &Transaction, diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 5f4955d7c5..a3f120310b 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -63,8 +63,7 @@ pub struct AccountInfo { } /// An offset into the AccountsDB::storage vector type AppendVecId = usize; -type AccountStorage = Vec>; -pub type AccountStorageSlice = [Arc]; +pub type AccountStorage = HashMap>; pub type InstructionAccounts = Vec; pub type InstructionLoaders = Vec>; @@ -87,6 +86,8 @@ impl From for AccountStorageStatus { /// Persistent storage structure holding the accounts pub struct AccountStorageEntry { + id: AppendVecId, + fork_id: Fork, /// storage holding the accounts @@ -110,6 +111,7 @@ impl AccountStorageEntry { let accounts = AppendVec::new(&path.join(ACCOUNT_DATA_FILE), true, file_size as usize); AccountStorageEntry { + id, fork_id, accounts, count: AtomicUsize::new(0), @@ -129,13 +131,10 @@ impl AccountStorageEntry { self.count.fetch_add(1, Ordering::Relaxed); } - fn remove_account(&self) -> bool { + fn remove_account(&self) { if self.count.fetch_sub(1, Ordering::Relaxed) == 1 { self.accounts.reset(); self.set_status(AccountStorageStatus::StorageAvailable); - true - } else { - false } } } @@ -171,7 +170,7 @@ impl AccountsDB { let paths = get_paths_vec(&paths); AccountsDB { accounts_index: RwLock::new(AccountsIndex::default()), - storage: RwLock::new(vec![]), + storage: RwLock::new(HashMap::new()), next_id: AtomicUsize::new(0), write_version: AtomicUsize::new(0), paths, @@ -193,7 +192,7 @@ impl AccountsDB { } pub fn has_accounts(&self, fork: Fork) -> bool { - for x in self.storage.read().unwrap().iter() { + for x in self.storage.read().unwrap().values() { if x.fork_id == fork && x.count.load(Ordering::Relaxed) > 0 { return true; } @@ -213,7 +212,7 @@ impl AccountsDB { .storage .read() .unwrap() - .iter() + 
.values() .filter(|store| store.fork_id == fork_id) .cloned() .collect(); @@ -231,7 +230,7 @@ impl AccountsDB { } pub fn load( - storage: &AccountStorageSlice, + storage: &AccountStorage, ancestors: &HashMap, accounts_index: &AccountsIndex, pubkey: &Pubkey, @@ -239,7 +238,7 @@ impl AccountsDB { let info = accounts_index.get(pubkey, ancestors)?; //TODO: thread this as a ref storage - .get(info.id) + .get(&info.id) .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) } @@ -249,82 +248,51 @@ impl AccountsDB { Self::load(&storage, ancestors, &accounts_index, pubkey) } - fn get_storage_id(&self, fork_id: Fork, start: usize, current: usize) -> usize { - let mut id = current; - let len: usize; - { - let stores = self.storage.read().unwrap(); - len = stores.len(); - if len > 0 { - if id == std::usize::MAX { - id = start % len; - if stores[id].get_status() == AccountStorageStatus::StorageAvailable { - return id; - } + fn get_exclusive_storage(&self, fork_id: Fork) -> Arc { + let mut stores = self.storage.write().unwrap(); + let mut candidates: Vec> = stores + .values() + .filter_map(|x| { + if x.get_status() == AccountStorageStatus::StorageAvailable && x.fork_id == fork_id + { + Some(x.clone()) } else { - stores[id].set_status(AccountStorageStatus::StorageFull); + None } + }) + .collect(); + if candidates.is_empty() { + let path_idx = thread_rng().gen_range(0, self.paths.len()); + let storage = self.new_storage_entry(fork_id, &self.paths[path_idx]); + candidates.push(Arc::new(storage)); + } - loop { - id = (id + 1) % len; - if fork_id == stores[id].fork_id - && stores[id].get_status() == AccountStorageStatus::StorageAvailable - { - break; - } - if id == start % len { - break; - } - } - } - } - if len == 0 || id == start % len { - let mut stores = self.storage.write().unwrap(); - // check if new store was already created - if stores.len() == len { - let path_idx = thread_rng().gen_range(0, self.paths.len()); - let storage = 
self.new_storage_entry(fork_id, &self.paths[path_idx]); - stores.push(Arc::new(storage)); - } - id = stores.len() - 1; - } - id + let rv = thread_rng().gen_range(0, candidates.len()); + stores.remove(&candidates[rv].id); + candidates[rv].clone() } - fn append_account(&self, fork_id: Fork, pubkey: &Pubkey, account: &Account) -> (usize, usize) { - let offset: usize; - let start = self.next_id.fetch_add(1, Ordering::Relaxed); - let mut id = self.get_storage_id(fork_id, start, std::usize::MAX); - - // Even if no lamports, need to preserve the account owner so - // we can update the vote_accounts correctly if this account is purged - // when squashing. - let acc = &mut account.clone(); + fn append_account( + &self, + storage: &Arc, + pubkey: &Pubkey, + account: &Account, + ) -> Option { + let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64; + let meta = StorageMeta { + write_version, + pubkey: *pubkey, + data_len: account.data.len() as u64, + }; if account.lamports == 0 { - acc.data.resize(0, 0); + // Even if no lamports, need to preserve the account owner so + // we can update the vote_accounts correctly as roots move forward + let account = &mut account.clone(); + account.data.resize(0, 0); + storage.accounts.append_account(meta, account) + } else { + storage.accounts.append_account(meta, account) } - - loop { - let result: Option; - { - let accounts = &self.storage.read().unwrap()[id]; - let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64; - let meta = StorageMeta { - write_version, - pubkey: *pubkey, - data_len: account.data.len() as u64, - }; - result = accounts.accounts.append_account(meta, account); - accounts.add_account(); - } - if let Some(val) = result { - offset = val; - break; - } else { - id = self.get_storage_id(fork_id, start, id); - } - } - (id, offset) } pub fn purge_fork(&self, fork: Fork) { @@ -332,55 +300,107 @@ impl AccountsDB { let is_root = self.accounts_index.read().unwrap().is_root(fork); 
trace!("PURGING {} {}", fork, is_root); if !is_root { - self.storage.write().unwrap().retain(|x| { - trace!("PURGING {} {}", x.fork_id, fork); - x.fork_id != fork + self.storage.write().unwrap().retain(|_, v| { + trace!("PURGING {} {}", v.fork_id, fork); + v.fork_id != fork }); } } + fn store_accounts(&self, fork_id: Fork, accounts: &[(&Pubkey, &Account)]) -> Vec { + let mut storage = self.get_exclusive_storage(fork_id); + let mut infos = vec![]; + for (pubkey, account) in accounts { + loop { + let rv = self.append_account(&storage, pubkey, account); + if let Some(offset) = rv { + storage.add_account(); + infos.push(AccountInfo { + id: storage.id, + offset, + lamports: account.lamports, + }); + break; + } else { + storage.set_status(AccountStorageStatus::StorageFull); + self.storage.write().unwrap().insert(storage.id, storage); + storage = self.get_exclusive_storage(fork_id); + } + } + } + self.storage.write().unwrap().insert(storage.id, storage); + infos + } + + fn update_index( + &self, + fork_id: Fork, + infos: Vec, + accounts: &[(&Pubkey, &Account)], + ) -> Vec<(Fork, AccountInfo)> { + let mut index = self.accounts_index.write().unwrap(); + let mut reclaims = vec![]; + for (i, info) in infos.into_iter().enumerate() { + let key = &accounts[i].0; + reclaims.extend(index.insert(fork_id, key, info).into_iter()) + } + reclaims + } + + fn remove_dead_accounts(&self, reclaims: Vec<(Fork, AccountInfo)>) -> HashSet { + let storage = self.storage.read().unwrap(); + for (fork_id, account_info) in reclaims { + if let Some(store) = storage.get(&account_info.id) { + assert_eq!( + fork_id, store.fork_id, + "AccountDB::accounts_index corrupted. 
Storage should only point to one fork" + ); + store.remove_account(); + } + } + //TODO: performance here could be improved if AccountsDB::storage was organized by fork + let dead_forks: HashSet = storage + .values() + .filter_map(|x| { + if x.count.load(Ordering::Relaxed) == 0 { + Some(x.fork_id) + } else { + None + } + }) + .collect(); + let live_forks: HashSet = storage + .values() + .filter_map(|x| { + if x.count.load(Ordering::Relaxed) > 0 { + Some(x.fork_id) + } else { + None + } + }) + .collect(); + dead_forks.difference(&live_forks).cloned().collect() + } + fn cleanup_dead_forks(&self, dead_forks: &mut HashSet) { + let mut index = self.accounts_index.write().unwrap(); + // a fork is not totally dead until it is older than the root + dead_forks.retain(|fork| *fork < index.last_root); + for fork in dead_forks.iter() { + index.cleanup_dead_fork(*fork); + } + } + /// Store the account update. pub fn store(&self, fork_id: Fork, accounts: &[(&Pubkey, &Account)]) { - //TODO; these blocks should be separate functions and unit tested - let infos: Vec<_> = accounts - .iter() - .map(|(pubkey, account)| { - let (id, offset) = self.append_account(fork_id, pubkey, account); - AccountInfo { - id, - offset, - lamports: account.lamports, - } - }) - .collect(); - - let reclaims: Vec<(Fork, AccountInfo)> = { - let mut index = self.accounts_index.write().unwrap(); - let mut reclaims = vec![]; - for (i, info) in infos.into_iter().enumerate() { - let key = &accounts[i].0; - reclaims.extend(index.insert(fork_id, key, info).into_iter()) - } - reclaims - }; - - let dead_forks: HashSet = { - let stores = self.storage.read().unwrap(); - let mut cleared_forks: HashSet = HashSet::new(); - for (fork_id, account_info) in reclaims { - let cleared = stores[account_info.id].remove_account(); - if cleared { - cleared_forks.insert(fork_id); - } - } - let live_forks: HashSet = stores.iter().map(|x| x.fork_id).collect(); - cleared_forks.difference(&live_forks).cloned().collect() - }; - { - let 
mut index = self.accounts_index.write().unwrap(); - for fork in dead_forks { - index.cleanup_dead_fork(fork); - } + let infos = self.store_accounts(fork_id, accounts); + let reclaims = self.update_index(fork_id, infos, accounts); + trace!("reclaim: {}", reclaims.len()); + let mut dead_forks = self.remove_dead_accounts(reclaims); + trace!("dead_forks: {}", dead_forks.len()); + self.cleanup_dead_forks(&mut dead_forks); + trace!("purge_forks: {}", dead_forks.len()); + for fork in dead_forks { + self.purge_fork(fork); } } @@ -569,7 +589,6 @@ mod tests { } #[test] - #[ignore] fn test_accountsdb_count_stores() { let paths = get_tmp_accounts_path!(); let db = AccountsDB::new(&paths.paths); @@ -592,15 +611,15 @@ mod tests { { let stores = db.storage.read().unwrap(); assert_eq!(stores.len(), 2); - assert_eq!(stores[0].count.load(Ordering::Relaxed), 2); - assert_eq!(stores[1].count.load(Ordering::Relaxed), 2); + assert_eq!(stores[&0].count.load(Ordering::Relaxed), 2); + assert_eq!(stores[&1].count.load(Ordering::Relaxed), 2); } db.add_root(1); { let stores = db.storage.read().unwrap(); assert_eq!(stores.len(), 2); - assert_eq!(stores[0].count.load(Ordering::Relaxed), 2); - assert_eq!(stores[1].count.load(Ordering::Relaxed), 2); + assert_eq!(stores[&0].count.load(Ordering::Relaxed), 2); + assert_eq!(stores[&1].count.load(Ordering::Relaxed), 2); } } @@ -675,10 +694,10 @@ mod tests { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 1); assert_eq!( - stores[0].get_status(), + stores[&0].get_status(), AccountStorageStatus::StorageAvailable ); - stores[0].count.load(Ordering::Relaxed) == count + stores[&0].count.load(Ordering::Relaxed) == count } fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec, fork: Fork) { @@ -745,7 +764,7 @@ mod tests { } let mut append_vec_histogram = HashMap::new(); - for storage in accounts.storage.read().unwrap().iter() { + for storage in accounts.storage.read().unwrap().values() { 
*append_vec_histogram.entry(storage.fork_id).or_insert(0) += 1; } for count in append_vec_histogram.values() { @@ -754,7 +773,6 @@ mod tests { } #[test] - #[ignore] fn test_account_grow() { let paths = get_tmp_accounts_path!(); let accounts = AccountsDB::new(&paths.paths); @@ -769,8 +787,8 @@ mod tests { { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 1); - assert_eq!(stores[0].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[0].get_status(), status[0]); + assert_eq!(stores[&0].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[&0].get_status(), status[0]); } let pubkey2 = Pubkey::new_rand(); @@ -779,10 +797,10 @@ mod tests { { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 2); - assert_eq!(stores[0].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[0].get_status(), status[1]); - assert_eq!(stores[1].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[1].get_status(), status[0]); + assert_eq!(stores[&0].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[&0].get_status(), status[1]); + assert_eq!(stores[&1].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[&1].get_status(), status[0]); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); @@ -794,12 +812,12 @@ mod tests { { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 3); - assert_eq!(stores[0].count.load(Ordering::Relaxed), count[index]); - assert_eq!(stores[0].get_status(), status[0]); - assert_eq!(stores[1].count.load(Ordering::Relaxed), 1); - assert_eq!(stores[1].get_status(), status[1]); - assert_eq!(stores[2].count.load(Ordering::Relaxed), count[index ^ 1]); - assert_eq!(stores[2].get_status(), status[0]); + assert_eq!(stores[&0].count.load(Ordering::Relaxed), count[index]); + assert_eq!(stores[&0].get_status(), status[0]); + assert_eq!(stores[&1].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[&1].get_status(), status[1]); + 
assert_eq!(stores[&2].count.load(Ordering::Relaxed), count[index ^ 1]); + assert_eq!(stores[&2].get_status(), status[0]); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); @@ -831,4 +849,41 @@ mod tests { assert!(accounts.load_slow(&ancestors, &pubkeys[0]).is_some()); } + #[test] + fn test_lazy_gc_fork() { + //This test is pedantic + //A fork is purged when a non root bank is cleaned up. If a fork is behind root but it is + //not root, it means we are retaining dead banks. + let paths = get_tmp_accounts_path!(); + let accounts = AccountsDB::new(&paths.paths); + let pubkey = Pubkey::new_rand(); + let account = Account::new(1, 0, &Account::default().owner); + //store an account + accounts.store(0, &[(&pubkey, &account)]); + let ancestors = vec![(0, 0)].into_iter().collect(); + let info = accounts + .accounts_index + .read() + .unwrap() + .get(&pubkey, &ancestors) + .unwrap() + .clone(); + //fork 0 is behind root, but it is not root, therefore it is purged + accounts.add_root(1); + assert!(accounts.accounts_index.read().unwrap().is_purged(0)); + + //fork is still there, since gc is lazy + assert!(accounts.storage.read().unwrap().get(&info.id).is_some()); + + //store causes cleanup + accounts.store(1, &[(&pubkey, &account)]); + + //fork is gone + assert!(accounts.storage.read().unwrap().get(&info.id).is_none()); + + //new value is there + let ancestors = vec![(1, 1)].into_iter().collect(); + assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some(account)); + } + } diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 0ae3e16e7f..fae7fc1055 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -9,7 +9,7 @@ pub struct AccountsIndex { account_maps: HashMap>, roots: HashSet, //This value that needs to be stored to recover the index from AppendVec - last_root: Fork, + pub last_root: Fork, } impl AccountsIndex { @@ -59,7 +59,7 @@ impl 
AccountsIndex { }; rv } - fn is_purged(&self, fork: Fork) -> bool { + pub fn is_purged(&self, fork: Fork) -> bool { !self.is_root(fork) && fork < self.last_root } pub fn is_root(&self, fork: Fork) -> bool {