Storage arranged by fork (#4518)

This commit is contained in:
sakridge 2019-06-03 15:34:32 -07:00 committed by GitHub
parent 9754e551cb
commit dea663d509
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 124 additions and 90 deletions

View File

@ -77,7 +77,7 @@ pub type InstructionAccounts = Vec<Account>;
pub type InstructionLoaders = Vec<Vec<(Pubkey, Account)>>; pub type InstructionLoaders = Vec<Vec<(Pubkey, Account)>>;
#[derive(Default, Debug)] #[derive(Default, Debug)]
pub struct AccountStorage(HashMap<usize, Arc<AccountStorageEntry>>); pub struct AccountStorage(HashMap<Fork, HashMap<usize, Arc<AccountStorageEntry>>>);
struct AccountStorageVisitor; struct AccountStorageVisitor;
@ -95,8 +95,12 @@ impl<'de> Visitor<'de> for AccountStorageVisitor {
{ {
let mut map = HashMap::new(); let mut map = HashMap::new();
while let Some((key, value)) = access.next_entry()? { while let Some((storage_id, storage_entry)) = access.next_entry()? {
map.insert(key, Arc::new(value)); let storage_entry: AccountStorageEntry = storage_entry;
let storage_fork_map = map
.entry(storage_entry.fork_id)
.or_insert_with(HashMap::new);
storage_fork_map.insert(storage_id, Arc::new(storage_entry));
} }
Ok(AccountStorage(map)) Ok(AccountStorage(map))
@ -109,8 +113,10 @@ impl Serialize for AccountStorage {
S: Serializer, S: Serializer,
{ {
let mut map = serializer.serialize_map(Some(self.0.len()))?; let mut map = serializer.serialize_map(Some(self.0.len()))?;
for (k, v) in &self.0 { for fork_storage in self.0.values() {
map.serialize_entry(k, &**v)?; for (storage_id, account_storage_entry) in fork_storage {
map.serialize_entry(storage_id, &**account_storage_entry)?;
}
} }
map.end() map.end()
} }
@ -199,7 +205,7 @@ impl AccountStorageEntry {
*count_and_status = (count_and_status.0 + 1, count_and_status.1); *count_and_status = (count_and_status.0 + 1, count_and_status.1);
} }
fn remove_account(&self) { fn remove_account(&self) -> usize {
let mut count_and_status = self.count_and_status.write().unwrap(); let mut count_and_status = self.count_and_status.write().unwrap();
let (count, mut status) = *count_and_status; let (count, mut status) = *count_and_status;
@ -220,6 +226,7 @@ impl AccountStorageEntry {
} }
*count_and_status = (count - 1, status); *count_and_status = (count - 1, status);
count_and_status.0
} }
} }
@ -300,9 +307,11 @@ impl AccountsDB {
} }
pub fn has_accounts(&self, fork: Fork) -> bool { pub fn has_accounts(&self, fork: Fork) -> bool {
for x in self.storage.read().unwrap().0.values() { if let Some(storage_forks) = self.storage.read().unwrap().0.get(&fork) {
if x.fork_id == fork && x.count() > 0 { for x in storage_forks.values() {
return true; if x.count() > 0 {
return true;
}
} }
} }
false false
@ -321,8 +330,9 @@ impl AccountsDB {
.read() .read()
.unwrap() .unwrap()
.0 .0
.get(&fork_id)
.unwrap_or(&HashMap::new())
.values() .values()
.filter(|store| store.fork_id == fork_id)
.cloned() .cloned()
.collect(); .collect();
self.thread_pool.install(|| { self.thread_pool.install(|| {
@ -348,11 +358,14 @@ impl AccountsDB {
) -> Option<(Account, Fork)> { ) -> Option<(Account, Fork)> {
let (info, fork) = accounts_index.get(pubkey, ancestors)?; let (info, fork) = accounts_index.get(pubkey, ancestors)?;
//TODO: thread this as a ref //TODO: thread this as a ref
storage if let Some(fork_storage) = storage.0.get(&fork) {
.0 fork_storage
.get(&info.id) .get(&info.id)
.and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account()))
.map(|account| (account, fork)) .map(|account| (account, fork))
} else {
None
}
} }
pub fn load_slow( pub fn load_slow(
@ -368,24 +381,28 @@ impl AccountsDB {
fn fork_storage(&self, fork_id: Fork) -> Arc<AccountStorageEntry> { fn fork_storage(&self, fork_id: Fork) -> Arc<AccountStorageEntry> {
let mut candidates: Vec<Arc<AccountStorageEntry>> = { let mut candidates: Vec<Arc<AccountStorageEntry>> = {
let stores = self.storage.read().unwrap(); let stores = self.storage.read().unwrap();
stores let fork_stores = stores.0.get(&fork_id);
.0 if let Some(fork_stores) = fork_stores {
.values() fork_stores
.filter_map(|x| { .values()
if x.status() == AccountStorageStatus::StorageAvailable && x.fork_id == fork_id .filter_map(|x| {
{ if x.status() == AccountStorageStatus::StorageAvailable {
Some(x.clone()) Some(x.clone())
} else { } else {
None None
} }
}) })
.collect() .collect()
} else {
vec![]
}
}; };
if candidates.is_empty() { if candidates.is_empty() {
let mut stores = self.storage.write().unwrap(); let mut stores = self.storage.write().unwrap();
let path_index = thread_rng().gen_range(0, self.paths.len()); let path_index = thread_rng().gen_range(0, self.paths.len());
let storage = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index])); let storage = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_index]));
stores.0.insert(storage.id, storage.clone()); let fork_storage = stores.0.entry(fork_id).or_insert_with(HashMap::new);
fork_storage.insert(storage.id, storage.clone());
candidates.push(storage); candidates.push(storage);
} }
let rv = thread_rng().gen_range(0, candidates.len()); let rv = thread_rng().gen_range(0, candidates.len());
@ -396,10 +413,7 @@ impl AccountsDB {
//add_root should be called first //add_root should be called first
let is_root = self.accounts_index.read().unwrap().is_root(fork); let is_root = self.accounts_index.read().unwrap().is_root(fork);
if !is_root { if !is_root {
self.storage.write().unwrap().0.retain(|_, v| { self.storage.write().unwrap().0.remove(&fork);
trace!("PURGING {} {}", v.fork_id, fork);
v.fork_id != fork
});
} }
} }
@ -457,34 +471,36 @@ impl AccountsDB {
fn remove_dead_accounts(&self, reclaims: Vec<(Fork, AccountInfo)>) -> HashSet<Fork> { fn remove_dead_accounts(&self, reclaims: Vec<(Fork, AccountInfo)>) -> HashSet<Fork> {
let storage = self.storage.read().unwrap(); let storage = self.storage.read().unwrap();
let mut dead_forks = HashSet::new();
for (fork_id, account_info) in reclaims { for (fork_id, account_info) in reclaims {
if let Some(store) = storage.0.get(&account_info.id) { if let Some(fork_storage) = storage.0.get(&fork_id) {
assert_eq!( if let Some(store) = fork_storage.get(&account_info.id) {
fork_id, store.fork_id, assert_eq!(
"AccountDB::accounts_index corrupted. Storage should only point to one fork" fork_id, store.fork_id,
); "AccountDB::accounts_index corrupted. Storage should only point to one fork"
store.remove_account(); );
let count = store.remove_account();
if count == 0 {
dead_forks.insert(fork_id);
}
}
} }
} }
//TODO: performance here could be improved if AccountsDB::storage was organized by fork
let dead_forks: HashSet<Fork> = storage dead_forks.retain(|fork| {
.0 if let Some(fork_storage) = storage.0.get(&fork) {
.values() for x in fork_storage.values() {
.filter_map(|x| { if x.count() != 0 {
if x.count() == 0 { return false;
Some(x.fork_id) }
} else {
None
} }
}) }
.collect(); true
let live_forks: HashSet<Fork> = storage });
.0
.values() dead_forks
.filter_map(|x| if x.count() > 0 { Some(x.fork_id) } else { None })
.collect();
dead_forks.difference(&live_forks).cloned().collect()
} }
fn cleanup_dead_forks(&self, dead_forks: &mut HashSet<Fork>) { fn cleanup_dead_forks(&self, dead_forks: &mut HashSet<Fork>) {
let mut index = self.accounts_index.write().unwrap(); let mut index = self.accounts_index.write().unwrap();
// a fork is not totally dead until it is older than the root // a fork is not totally dead until it is older than the root
@ -527,14 +543,7 @@ impl AccountsDB {
} }
fn generate_index(&mut self) { fn generate_index(&mut self) {
let mut forks: Vec<Fork> = self let mut forks: Vec<Fork> = self.storage.read().unwrap().0.keys().cloned().collect();
.storage
.read()
.unwrap()
.0
.values()
.map(|x| x.fork_id)
.collect();
forks.sort(); forks.sort();
for fork_id in forks.iter() { for fork_id in forks.iter() {
@ -617,7 +626,14 @@ impl<'a> serde::de::Visitor<'a> for AccountsDBVisitor {
let write_version: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; let write_version: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;
let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?; let file_size: u64 = deserialize_from(&mut rd).map_err(Error::custom)?;
let mut ids: Vec<usize> = storage.read().unwrap().0.keys().cloned().collect(); let mut ids: Vec<usize> = storage
.read()
.unwrap()
.0
.values()
.flat_map(HashMap::keys)
.cloned()
.collect();
ids.sort(); ids.sort();
let mut accounts_db = AccountsDB { let mut accounts_db = AccountsDB {
@ -829,6 +845,7 @@ mod tests {
#[test] #[test]
fn test_accountsdb_count_stores() { fn test_accountsdb_count_stores() {
solana_logger::setup();
let paths = get_tmp_accounts_path!(); let paths = get_tmp_accounts_path!();
let db = AccountsDB::new(&paths.paths); let db = AccountsDB::new(&paths.paths);
@ -849,16 +866,22 @@ mod tests {
db.store(1, &[(&pubkeys[0], &account)]); db.store(1, &[(&pubkeys[0], &account)]);
{ {
let stores = db.storage.read().unwrap(); let stores = db.storage.read().unwrap();
assert_eq!(stores.0.len(), 2); let fork_0_stores = &stores.0.get(&0).unwrap();
assert_eq!(stores.0[&0].count(), 2); let fork_1_stores = &stores.0.get(&1).unwrap();
assert_eq!(stores.0[&1].count(), 2); assert_eq!(fork_0_stores.len(), 1);
assert_eq!(fork_1_stores.len(), 1);
assert_eq!(fork_0_stores[&0].count(), 2);
assert_eq!(fork_1_stores[&1].count(), 2);
} }
db.add_root(1); db.add_root(1);
{ {
let stores = db.storage.read().unwrap(); let stores = db.storage.read().unwrap();
assert_eq!(stores.0.len(), 2); let fork_0_stores = &stores.0.get(&0).unwrap();
assert_eq!(stores.0[&0].count(), 2); let fork_1_stores = &stores.0.get(&1).unwrap();
assert_eq!(stores.0[&1].count(), 2); assert_eq!(fork_0_stores.len(), 1);
assert_eq!(fork_1_stores.len(), 1);
assert_eq!(fork_0_stores[&0].count(), 2);
assert_eq!(fork_1_stores[&1].count(), 2);
} }
} }
@ -931,12 +954,12 @@ mod tests {
fn check_storage(accounts: &AccountsDB, count: usize) -> bool { fn check_storage(accounts: &AccountsDB, count: usize) -> bool {
let stores = accounts.storage.read().unwrap(); let stores = accounts.storage.read().unwrap();
assert_eq!(stores.0.len(), 1); assert_eq!(stores.0[&0].len(), 1);
assert_eq!( assert_eq!(
stores.0[&0].status(), stores.0[&0][&0].status(),
AccountStorageStatus::StorageAvailable AccountStorageStatus::StorageAvailable
); );
stores.0[&0].count() == count stores.0[&0][&0].count() == count
} }
fn check_accounts( fn check_accounts(
@ -1021,7 +1044,14 @@ mod tests {
} }
let mut append_vec_histogram = HashMap::new(); let mut append_vec_histogram = HashMap::new();
for storage in accounts.storage.read().unwrap().0.values() { for storage in accounts
.storage
.read()
.unwrap()
.0
.values()
.flat_map(|x| x.values())
{
*append_vec_histogram.entry(storage.fork_id).or_insert(0) += 1; *append_vec_histogram.entry(storage.fork_id).or_insert(0) += 1;
} }
for count in append_vec_histogram.values() { for count in append_vec_histogram.values() {
@ -1044,9 +1074,9 @@ mod tests {
{ {
let stores = accounts.storage.read().unwrap(); let stores = accounts.storage.read().unwrap();
assert_eq!(stores.0.len(), 1); assert_eq!(stores.0.len(), 1);
assert_eq!(stores.0[&0].count(), 1); assert_eq!(stores.0[&0][&0].count(), 1);
assert_eq!( assert_eq!(
stores.0[&0].status(), stores.0[&0][&0].status(),
AccountStorageStatus::StorageAvailable AccountStorageStatus::StorageAvailable
); );
} }
@ -1056,12 +1086,13 @@ mod tests {
accounts.store(0, &[(&pubkey2, &account2)]); accounts.store(0, &[(&pubkey2, &account2)]);
{ {
let stores = accounts.storage.read().unwrap(); let stores = accounts.storage.read().unwrap();
assert_eq!(stores.0.len(), 2); assert_eq!(stores.0.len(), 1);
assert_eq!(stores.0[&0].count(), 1); assert_eq!(stores.0[&0].len(), 2);
assert_eq!(stores.0[&0].status(), AccountStorageStatus::StorageFull); assert_eq!(stores.0[&0][&0].count(), 1);
assert_eq!(stores.0[&1].count(), 1); assert_eq!(stores.0[&0][&0].status(), AccountStorageStatus::StorageFull);
assert_eq!(stores.0[&0][&1].count(), 1);
assert_eq!( assert_eq!(
stores.0[&1].status(), stores.0[&0][&1].status(),
AccountStorageStatus::StorageAvailable AccountStorageStatus::StorageAvailable
); );
} }
@ -1081,13 +1112,14 @@ mod tests {
accounts.store(0, &[(&pubkey1, &account1)]); accounts.store(0, &[(&pubkey1, &account1)]);
{ {
let stores = accounts.storage.read().unwrap(); let stores = accounts.storage.read().unwrap();
assert_eq!(stores.0.len(), 3); assert_eq!(stores.0.len(), 1);
assert_eq!(stores.0[&0].count(), count[index]); assert_eq!(stores.0[&0].len(), 3);
assert_eq!(stores.0[&0].status(), status[0]); assert_eq!(stores.0[&0][&0].count(), count[index]);
assert_eq!(stores.0[&1].count(), 1); assert_eq!(stores.0[&0][&0].status(), status[0]);
assert_eq!(stores.0[&1].status(), status[1]); assert_eq!(stores.0[&0][&1].count(), 1);
assert_eq!(stores.0[&2].count(), count[index ^ 1]); assert_eq!(stores.0[&0][&1].status(), status[1]);
assert_eq!(stores.0[&2].status(), status[0]); assert_eq!(stores.0[&0][&2].count(), count[index ^ 1]);
assert_eq!(stores.0[&0][&2].status(), status[0]);
} }
let ancestors = vec![(0, 0)].into_iter().collect(); let ancestors = vec![(0, 0)].into_iter().collect();
assert_eq!( assert_eq!(
@ -1150,13 +1182,15 @@ mod tests {
assert!(accounts.accounts_index.read().unwrap().is_purged(0)); assert!(accounts.accounts_index.read().unwrap().is_purged(0));
//fork is still there, since gc is lazy //fork is still there, since gc is lazy
assert!(accounts.storage.read().unwrap().0.get(&info.id).is_some()); assert!(accounts.storage.read().unwrap().0[&0]
.get(&info.id)
.is_some());
//store causes cleanup //store causes cleanup
accounts.store(1, &[(&pubkey, &account)]); accounts.store(1, &[(&pubkey, &account)]);
//fork is gone //fork is gone
assert!(accounts.storage.read().unwrap().0.get(&info.id).is_none()); assert!(accounts.storage.read().unwrap().0.get(&0).is_none());
//new value is there //new value is there
let ancestors = vec![(1, 1)].into_iter().collect(); let ancestors = vec![(1, 1)].into_iter().collect();