From dea663d50967e0efbb9397f76ec0206e52890055 Mon Sep 17 00:00:00 2001 From: sakridge Date: Mon, 3 Jun 2019 15:34:32 -0700 Subject: [PATCH] Storage arranged by fork (#4518) --- runtime/src/accounts_db.rs | 214 +++++++++++++++++++++---------------- 1 file changed, 124 insertions(+), 90 deletions(-) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 7484b1dc70..94ded2984d 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -77,7 +77,7 @@ pub type InstructionAccounts = Vec; pub type InstructionLoaders = Vec>; #[derive(Default, Debug)] -pub struct AccountStorage(HashMap>); +pub struct AccountStorage(HashMap>>); struct AccountStorageVisitor; @@ -95,8 +95,12 @@ impl<'de> Visitor<'de> for AccountStorageVisitor { { let mut map = HashMap::new(); - while let Some((key, value)) = access.next_entry()? { - map.insert(key, Arc::new(value)); + while let Some((storage_id, storage_entry)) = access.next_entry()? { + let storage_entry: AccountStorageEntry = storage_entry; + let storage_fork_map = map + .entry(storage_entry.fork_id) + .or_insert_with(HashMap::new); + storage_fork_map.insert(storage_id, Arc::new(storage_entry)); } Ok(AccountStorage(map)) @@ -109,8 +113,10 @@ impl Serialize for AccountStorage { S: Serializer, { let mut map = serializer.serialize_map(Some(self.0.len()))?; - for (k, v) in &self.0 { - map.serialize_entry(k, &**v)?; + for fork_storage in self.0.values() { + for (storage_id, account_storage_entry) in fork_storage { + map.serialize_entry(storage_id, &**account_storage_entry)?; + } } map.end() } @@ -199,7 +205,7 @@ impl AccountStorageEntry { *count_and_status = (count_and_status.0 + 1, count_and_status.1); } - fn remove_account(&self) { + fn remove_account(&self) -> usize { let mut count_and_status = self.count_and_status.write().unwrap(); let (count, mut status) = *count_and_status; @@ -220,6 +226,7 @@ impl AccountStorageEntry { } *count_and_status = (count - 1, status); + count_and_status.0 } } @@ -300,9 +307,11 @@ impl AccountsDB { } pub fn has_accounts(&self, fork: Fork) -> bool { - for x in self.storage.read().unwrap().0.values() { - if x.fork_id == fork && x.count() > 0 { - return true; + if let Some(storage_forks) = self.storage.read().unwrap().0.get(&fork) { + for x in storage_forks.values() { + if x.count() > 0 { + return true; + } } } false @@ -321,8 +330,9 @@ impl AccountsDB { .read() .unwrap() .0 + .get(&fork_id) + .unwrap_or(&HashMap::new()) .values() - .filter(|store| store.fork_id == fork_id) .cloned() .collect(); self.thread_pool.install(|| { @@ -348,11 +358,14 @@ impl AccountsDB { ) -> Option<(Account, Fork)> { let (info, fork) = accounts_index.get(pubkey, ancestors)?; //TODO: thread this as a ref - storage - .0 - .get(&info.id) - .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) - .map(|account| (account, fork)) + if let Some(fork_storage) = storage.0.get(&fork) { + fork_storage + .get(&info.id) + .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) + .map(|account| (account, fork)) + } else { + None + } } pub fn load_slow( @@ -368,24 +381,28 @@ impl AccountsDB { fn fork_storage(&self, fork_id: Fork) -> Arc { let mut candidates: Vec> = { let stores = self.storage.read().unwrap(); - stores - .0 - .values() - .filter_map(|x| { - if x.status() == AccountStorageStatus::StorageAvailable && x.fork_id == fork_id - { - Some(x.clone()) - } else { - None - } - }) - .collect() + let fork_stores = stores.0.get(&fork_id); + if let Some(fork_stores) = fork_stores { + fork_stores + .values() + .filter_map(|x| { + if x.status() == AccountStorageStatus::StorageAvailable { + Some(x.clone()) + } else { + None + } + }) + .collect() + } else { + vec![] + } }; if candidates.is_empty() { let mut stores = self.storage.write().unwrap(); let path_index = thread_rng().gen_range(0, self.paths.len()); let storage = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index])); - stores.0.insert(storage.id, storage.clone()); + let fork_storage = stores.0.entry(fork_id).or_insert_with(HashMap::new); + fork_storage.insert(storage.id, storage.clone()); candidates.push(storage); } let rv = thread_rng().gen_range(0, candidates.len()); @@ -396,10 +413,7 @@ impl AccountsDB { //add_root should be called first let is_root = self.accounts_index.read().unwrap().is_root(fork); if !is_root { - self.storage.write().unwrap().0.retain(|_, v| { - trace!("PURGING {} {}", v.fork_id, fork); - v.fork_id != fork - }); + self.storage.write().unwrap().0.remove(&fork); } } @@ -457,34 +471,36 @@ impl AccountsDB { fn remove_dead_accounts(&self, reclaims: Vec<(Fork, AccountInfo)>) -> HashSet { let storage = self.storage.read().unwrap(); + let mut dead_forks = HashSet::new(); for (fork_id, account_info) in reclaims { - if let Some(store) = storage.0.get(&account_info.id) { - assert_eq!( - fork_id, store.fork_id, - "AccountDB::accounts_index corrupted. Storage should only point to one fork" - ); - store.remove_account(); + if let Some(fork_storage) = storage.0.get(&fork_id) { + if let Some(store) = fork_storage.get(&account_info.id) { + assert_eq!( + fork_id, store.fork_id, + "AccountDB::accounts_index corrupted. Storage should only point to one fork" + ); + let count = store.remove_account(); + if count == 0 { + dead_forks.insert(fork_id); + } + } } } - //TODO: performance here could be improved if AccountsDB::storage was organized by fork - let dead_forks: HashSet = storage - .0 - .values() - .filter_map(|x| { - if x.count() == 0 { - Some(x.fork_id) - } else { - None + + dead_forks.retain(|fork| { + if let Some(fork_storage) = storage.0.get(&fork) { + for x in fork_storage.values() { + if x.count() != 0 { + return false; + } } - }) - .collect(); - let live_forks: HashSet = storage - .0 - .values() - .filter_map(|x| if x.count() > 0 { Some(x.fork_id) } else { None }) - .collect(); - dead_forks.difference(&live_forks).cloned().collect() + } + true + }); + + dead_forks } + fn cleanup_dead_forks(&self, dead_forks: &mut HashSet) { let mut index = self.accounts_index.write().unwrap(); // a fork is not totally dead until it is older than the root @@ -527,14 +543,7 @@ impl AccountsDB { } fn generate_index(&mut self) { - let mut forks: Vec = self - .storage - .read() - .unwrap() - .0 - .values() - .map(|x| x.fork_id) - .collect(); + let mut forks: Vec = self.storage.read().unwrap().0.keys().cloned().collect(); forks.sort(); for fork_id in forks.iter() { @@ -617,7 +626,14 @@ impl<'a> serde::de::Visitor<'a> for AccountsDBVisitor { let write_version: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; - let mut ids: Vec = storage.read().unwrap().0.keys().cloned().collect(); + let mut ids: Vec = storage + .read() + .unwrap() + .0 + .values() + .flat_map(HashMap::keys) + .cloned() + .collect(); ids.sort(); let mut accounts_db = AccountsDB { @@ -829,6 +845,7 @@ mod tests { #[test] fn test_accountsdb_count_stores() { + solana_logger::setup(); let paths = get_tmp_accounts_path!(); let db = AccountsDB::new(&paths.paths); @@ -849,16 +866,22 @@ mod tests { db.store(1, &[(&pubkeys[0], &account)]); { let stores = db.storage.read().unwrap(); - assert_eq!(stores.0.len(), 2); - assert_eq!(stores.0[&0].count(), 2); - assert_eq!(stores.0[&1].count(), 2); + let fork_0_stores = &stores.0.get(&0).unwrap(); + let fork_1_stores = &stores.0.get(&1).unwrap(); + assert_eq!(fork_0_stores.len(), 1); + assert_eq!(fork_1_stores.len(), 1); + assert_eq!(fork_0_stores[&0].count(), 2); + assert_eq!(fork_1_stores[&1].count(), 2); } db.add_root(1); { let stores = db.storage.read().unwrap(); - assert_eq!(stores.0.len(), 2); - assert_eq!(stores.0[&0].count(), 2); - assert_eq!(stores.0[&1].count(), 2); + let fork_0_stores = &stores.0.get(&0).unwrap(); + let fork_1_stores = &stores.0.get(&1).unwrap(); + assert_eq!(fork_0_stores.len(), 1); + assert_eq!(fork_1_stores.len(), 1); + assert_eq!(fork_0_stores[&0].count(), 2); + assert_eq!(fork_1_stores[&1].count(), 2); } } @@ -931,12 +954,12 @@ mod tests { fn check_storage(accounts: &AccountsDB, count: usize) -> bool { let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.0.len(), 1); + assert_eq!(stores.0[&0].len(), 1); assert_eq!( - stores.0[&0].status(), + stores.0[&0][&0].status(), AccountStorageStatus::StorageAvailable ); - stores.0[&0].count() == count + stores.0[&0][&0].count() == count } fn check_accounts( @@ -1021,7 +1044,14 @@ mod tests { } let mut append_vec_histogram = HashMap::new(); - for storage in accounts.storage.read().unwrap().0.values() { + for storage in accounts + .storage + .read() + .unwrap() + .0 + .values() + .flat_map(|x| x.values()) + { *append_vec_histogram.entry(storage.fork_id).or_insert(0) += 1; } for count in append_vec_histogram.values() { @@ -1044,9 +1074,9 @@ mod tests { { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.0.len(), 1); - assert_eq!(stores.0[&0].count(), 1); + assert_eq!(stores.0[&0][&0].count(), 1); assert_eq!( - stores.0[&0].status(), + stores.0[&0][&0].status(), AccountStorageStatus::StorageAvailable ); } @@ -1056,12 +1086,13 @@ mod tests { accounts.store(0, &[(&pubkey2, &account2)]); { let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.0.len(), 2); - assert_eq!(stores.0[&0].count(), 1); - assert_eq!(stores.0[&0].status(), AccountStorageStatus::StorageFull); - assert_eq!(stores.0[&1].count(), 1); + assert_eq!(stores.0.len(), 1); + assert_eq!(stores.0[&0].len(), 2); + assert_eq!(stores.0[&0][&0].count(), 1); + assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::StorageFull); + assert_eq!(stores.0[&0][&1].count(), 1); assert_eq!( - stores.0[&1].status(), + stores.0[&0][&1].status(), AccountStorageStatus::StorageAvailable ); } @@ -1081,13 +1112,14 @@ mod tests { accounts.store(0, &[(&pubkey1, &account1)]); { let stores = accounts.storage.read().unwrap(); - assert_eq!(stores.0.len(), 3); - assert_eq!(stores.0[&0].count(), count[index]); - assert_eq!(stores.0[&0].status(), status[0]); - assert_eq!(stores.0[&1].count(), 1); - assert_eq!(stores.0[&1].status(), status[1]); - assert_eq!(stores.0[&2].count(), count[index ^ 1]); - assert_eq!(stores.0[&2].status(), status[0]); + assert_eq!(stores.0.len(), 1); + assert_eq!(stores.0[&0].len(), 3); + assert_eq!(stores.0[&0][&0].count(), count[index]); + assert_eq!(stores.0[&0][&0].status(), status[0]); + assert_eq!(stores.0[&0][&1].count(), 1); + assert_eq!(stores.0[&0][&1].status(), status[1]); + assert_eq!(stores.0[&0][&2].count(), count[index ^ 1]); + assert_eq!(stores.0[&0][&2].status(), status[0]); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!( @@ -1150,13 +1182,15 @@ mod tests { assert!(accounts.accounts_index.read().unwrap().is_purged(0)); //fork is still there, since gc is lazy - assert!(accounts.storage.read().unwrap().0.get(&info.id).is_some()); + assert!(accounts.storage.read().unwrap().0[&0] + .get(&info.id) + .is_some()); //store causes cleanup accounts.store(1, &[(&pubkey, &account)]); //fork is gone - assert!(accounts.storage.read().unwrap().0.get(&info.id).is_none()); + assert!(accounts.storage.read().unwrap().0.get(&0).is_none()); //new value is there let ancestors = vec![(1, 1)].into_iter().collect();