diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 67de70e135..0fd2ac6fa4 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -63,16 +63,27 @@ pub struct AccountInfo { } /// An offset into the AccountsDB::storage vector type AppendVecId = usize; -pub type AccountStorage = HashMap>>; +pub type AccountStorage = HashMap>; pub type InstructionAccounts = Vec; pub type InstructionLoaders = Vec>; -#[derive(Copy, Clone, Debug, PartialEq)] +#[derive(Debug, PartialEq)] pub enum AccountStorageStatus { StorageAvailable = 0, StorageFull = 1, } +impl From for AccountStorageStatus { + fn from(status: usize) -> Self { + use self::AccountStorageStatus::*; + match status { + 0 => StorageAvailable, + 1 => StorageFull, + _ => unreachable!(), + } + } +} + /// Persistent storage structure holding the accounts pub struct AccountStorageEntry { id: AppendVecId, @@ -85,10 +96,10 @@ pub struct AccountStorageEntry { /// Keeps track of the number of accounts stored in a specific AppendVec. /// This is periodically checked to reuse the stores that do not have /// any accounts in it. - count: usize, + count: AtomicUsize, /// status corresponding to the storage - status: AccountStorageStatus, + status: AtomicUsize, } impl AccountStorageEntry { @@ -103,26 +114,25 @@ impl AccountStorageEntry { id, fork_id, accounts, - count: 0, - status: AccountStorageStatus::StorageAvailable, + count: AtomicUsize::new(0), + status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize), } } - pub fn set_status(&mut self, status: AccountStorageStatus) { - self.status = status; + pub fn set_status(&self, status: AccountStorageStatus) { + self.status.store(status as usize, Ordering::Relaxed); } - pub fn status(&self) -> AccountStorageStatus { - self.status + pub fn get_status(&self) -> AccountStorageStatus { + self.status.load(Ordering::Relaxed).into() } - fn add_account(&mut self) { - self.count += 1; + fn add_account(&self) { + self.count.fetch_add(1, Ordering::Relaxed); } - fn remove_account(&mut self) { - self.count -= 1; - if self.count == 0 { + fn remove_account(&self) { + if self.count.fetch_sub(1, Ordering::Relaxed) == 1 { self.accounts.reset(); self.set_status(AccountStorageStatus::StorageAvailable); } @@ -182,9 +192,8 @@ impl AccountsDB { } pub fn has_accounts(&self, fork: Fork) -> bool { - for storage_entry in self.storage.read().unwrap().values() { - let storage_entry = storage_entry.read().unwrap(); - if storage_entry.fork_id == fork && storage_entry.count > 0 { + for x in self.storage.read().unwrap().values() { + if x.fork_id == fork && x.count.load(Ordering::Relaxed) > 0 { return true; } } @@ -199,18 +208,17 @@ impl AccountsDB { F: Send + Sync, B: Send + Default, { - let storage_maps: Vec>> = self + let storage_maps: Vec> = self .storage .read() .unwrap() .values() - .filter(|store| store.read().unwrap().fork_id == fork_id) + .filter(|store| store.fork_id == fork_id) .cloned() .collect(); storage_maps .into_par_iter() .map(|storage| { - let storage = storage.read().unwrap(); let accounts = storage.accounts.accounts(0); let mut retval = B::default(); accounts @@ -229,17 +237,9 @@ impl AccountsDB { ) -> Option { let info = accounts_index.get(pubkey, ancestors)?; //TODO: thread this as a ref - storage.get(&info.id).and_then(|store| { - Some( - store - .read() - .unwrap() - .accounts - .get_account(info.offset)? - .0 - .clone_account(), - ) - }) + storage + .get(&info.id) + .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account())) } pub fn load_slow(&self, ancestors: &HashMap, pubkey: &Pubkey) -> Option { @@ -248,69 +248,31 @@ impl AccountsDB { Self::load(&storage, ancestors, &accounts_index, pubkey) } - fn with_exclusive_storage(&self, fork_id: Fork, mut access: F) -> bool - where - F: FnMut(&mut AccountStorageEntry) -> bool, - { - let entries: Vec<_> = self - .storage - .read() - .unwrap() - .values() - .filter_map(|entry| { - let res = { - let entry = entry.read().unwrap(); - entry.status() == AccountStorageStatus::StorageAvailable - && entry.fork_id == fork_id - }; - if res { - Some(entry.clone()) - } else { - None - } - }) - .collect(); - - for entry in entries { - if access(&mut entry.write().unwrap()) { - return true; - } - } - - let path_idx = thread_rng().gen_range(0, self.paths.len()); - let mut new_entry = self.new_storage_entry(fork_id, &self.paths[path_idx]); - - let rv = access(&mut new_entry); - - self.storage - .write() - .unwrap() - .insert(new_entry.id, Arc::new(RwLock::new(new_entry))); - - rv - } - - fn append_account( - &self, - storage: &AccountStorageEntry, - pubkey: &Pubkey, - account: &Account, - ) -> Option { - let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64; - let meta = StorageMeta { - write_version, - pubkey: *pubkey, - data_len: account.data.len() as u64, + fn fork_storage(&self, fork_id: Fork) -> Arc { + let mut candidates: Vec> = { + let stores = self.storage.read().unwrap(); + stores + .values() + .filter_map(|x| { + if x.get_status() == AccountStorageStatus::StorageAvailable + && x.fork_id == fork_id + { + Some(x.clone()) + } else { + None + } + }) + .collect() }; - if account.lamports == 0 { - // Even if no lamports, need to preserve the account owner so - // we can update the vote_accounts correctly as roots move forward - let account = &mut account.clone(); - account.data.resize(0, 0); - storage.accounts.append_account(meta, account) - } else { - storage.accounts.append_account(meta, account) + if candidates.is_empty() { + let mut stores = self.storage.write().unwrap(); + let path_idx = thread_rng().gen_range(0, self.paths.len()); + let storage = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_idx])); + stores.insert(storage.id, storage.clone()); + candidates.push(storage); } + let rv = thread_rng().gen_range(0, candidates.len()); + candidates[rv].clone() } pub fn purge_fork(&self, fork: Fork) { @@ -319,32 +281,46 @@ impl AccountsDB { trace!("PURGING {} {}", fork, is_root); if !is_root { self.storage.write().unwrap().retain(|_, v| { - trace!("PURGING {} {}", v.read().unwrap().fork_id, fork); - v.read().unwrap().fork_id != fork + trace!("PURGING {} {}", v.fork_id, fork); + v.fork_id != fork }); } } fn store_accounts(&self, fork_id: Fork, accounts: &[(&Pubkey, &Account)]) -> Vec { - let mut infos = vec![]; - - for (pubkey, account) in accounts { - self.with_exclusive_storage(fork_id, |storage| { - if let Some(offset) = self.append_account(&storage, pubkey, account) { - storage.add_account(); - infos.push(AccountInfo { - id: storage.id, - offset, - lamports: account.lamports, - }); - true + let with_meta: Vec<(StorageMeta, &Account)> = accounts + .iter() + .map(|(pubkey, account)| { + let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64; + let data_len = if account.lamports == 0 { + 0 } else { - storage.set_status(AccountStorageStatus::StorageFull); - false - } - }); + account.data.len() as u64 + }; + let meta = StorageMeta { + write_version, + pubkey: **pubkey, + data_len, + }; + (meta, *account) + }) + .collect(); + let mut infos: Vec = vec![]; + while infos.len() < with_meta.len() { + let storage = self.fork_storage(fork_id); + let rvs = storage.accounts.append_accounts(&with_meta[infos.len()..]); + if rvs.is_empty() { + storage.set_status(AccountStorageStatus::StorageFull); + } + for (offset, (_, account)) in rvs.iter().zip(&with_meta[infos.len()..]) { + storage.add_account(); + infos.push(AccountInfo { + id: storage.id, + offset: *offset, + lamports: account.lamports, + }); + } } - infos } @@ -366,22 +342,20 @@ impl AccountsDB { fn remove_dead_accounts(&self, reclaims: Vec<(Fork, AccountInfo)>) -> HashSet { let storage = self.storage.read().unwrap(); for (fork_id, account_info) in reclaims { - if let Some(entry) = storage.get(&account_info.id) { + if let Some(store) = storage.get(&account_info.id) { assert_eq!( - fork_id, - entry.read().unwrap().fork_id, + fork_id, store.fork_id, "AccountDB::accounts_index corrupted. Storage should only point to one fork" ); - entry.write().unwrap().remove_account(); + store.remove_account(); } } //TODO: performance here could be improved if AccountsDB::storage was organized by fork let dead_forks: HashSet = storage .values() - .filter_map(|entry| { - let entry = entry.read().unwrap(); - if entry.count == 0 { - Some(entry.fork_id) + .filter_map(|x| { + if x.count.load(Ordering::Relaxed) == 0 { + Some(x.fork_id) } else { None } @@ -389,10 +363,9 @@ impl AccountsDB { .collect(); let live_forks: HashSet = storage .values() - .filter_map(|entry| { - let entry = entry.read().unwrap(); - if entry.count > 0 { - Some(entry.fork_id) + .filter_map(|x| { + if x.count.load(Ordering::Relaxed) > 0 { + Some(x.fork_id) } else { None } @@ -630,15 +603,15 @@ mod tests { { let stores = db.storage.read().unwrap(); assert_eq!(stores.len(), 2); - assert_eq!(stores[&0].read().unwrap().count, 2); - assert_eq!(stores[&1].read().unwrap().count, 2); + assert_eq!(stores[&0].count.load(Ordering::Relaxed), 2); + assert_eq!(stores[&1].count.load(Ordering::Relaxed), 2); } db.add_root(1); { let stores = db.storage.read().unwrap(); assert_eq!(stores.len(), 2); - assert_eq!(stores[&0].read().unwrap().count, 2); - assert_eq!(stores[&1].read().unwrap().count, 2); + assert_eq!(stores[&0].count.load(Ordering::Relaxed), 2); + assert_eq!(stores[&1].count.load(Ordering::Relaxed), 2); } } @@ -713,11 +686,10 @@ mod tests { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 1); assert_eq!( - stores[&0].read().unwrap().status(), + stores[&0].get_status(), AccountStorageStatus::StorageAvailable ); - let rv = stores[&0].read().unwrap().count == count; - rv + stores[&0].count.load(Ordering::Relaxed) == count } fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec, fork: Fork) { @@ -785,9 +757,7 @@ mod tests { let mut append_vec_histogram = HashMap::new(); for storage in accounts.storage.read().unwrap().values() { - *append_vec_histogram - .entry(storage.read().unwrap().fork_id) - .or_insert(0) += 1; + *append_vec_histogram.entry(storage.fork_id).or_insert(0) += 1; } for count in append_vec_histogram.values() { assert!(*count >= 2); @@ -809,8 +779,8 @@ mod tests { { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 1); - assert_eq!(stores[&0].read().unwrap().count, 1); - assert_eq!(stores[&0].read().unwrap().status(), status[0]); + assert_eq!(stores[&0].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[&0].get_status(), status[0]); } let pubkey2 = Pubkey::new_rand(); @@ -819,10 +789,10 @@ mod tests { { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 2); - assert_eq!(stores[&0].read().unwrap().count, 1); - assert_eq!(stores[&0].read().unwrap().status(), status[1]); - assert_eq!(stores[&1].read().unwrap().count, 1); - assert_eq!(stores[&1].read().unwrap().status(), status[0]); + assert_eq!(stores[&0].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[&0].get_status(), status[1]); + assert_eq!(stores[&1].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[&1].get_status(), status[0]); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); @@ -834,12 +804,12 @@ mod tests { { let stores = accounts.storage.read().unwrap(); assert_eq!(stores.len(), 3); - assert_eq!(stores[&0].read().unwrap().count, count[index]); - assert_eq!(stores[&0].read().unwrap().status(), status[0]); - assert_eq!(stores[&1].read().unwrap().count, 1); - assert_eq!(stores[&1].read().unwrap().status(), status[1]); - assert_eq!(stores[&2].read().unwrap().count, count[index ^ 1]); - assert_eq!(stores[&2].read().unwrap().status(), status[0]); + assert_eq!(stores[&0].count.load(Ordering::Relaxed), count[index]); + assert_eq!(stores[&0].get_status(), status[0]); + assert_eq!(stores[&1].count.load(Ordering::Relaxed), 1); + assert_eq!(stores[&1].get_status(), status[1]); + assert_eq!(stores[&2].count.load(Ordering::Relaxed), count[index ^ 1]); + assert_eq!(stores[&2].get_status(), status[0]); } let ancestors = vec![(0, 0)].into_iter().collect(); assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1); diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index 18ed94491f..b82aaeaaeb 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -144,11 +144,7 @@ impl AppendVec { *offset = pos + len; } - #[allow(clippy::mutex_atomic)] - fn append_ptrs(&self, vals: &[(*const u8, usize)]) -> Option { - // This mutex forces append to be single threaded, but concurrent with reads - // See UNSAFE usage in `append_ptr` - let mut offset = self.append_offset.lock().unwrap(); + fn append_ptrs_locked(&self, offset: &mut usize, vals: &[(*const u8, usize)]) -> Option { let mut end = *offset; for val in vals { //Data is aligned at the next 64 byte offset. Without alignment loading the memory may @@ -165,7 +161,7 @@ impl AppendVec { //crash on some architectures. let pos = align_up!(*offset, mem::size_of::()); for val in vals { - self.append_ptr(&mut offset, val.0, val.1) + self.append_ptr(offset, val.0, val.1) } self.current_len.store(*offset, Ordering::Relaxed); Some(pos) @@ -207,23 +203,40 @@ impl AppendVec { accounts } - pub fn append_account(&self, storage_meta: StorageMeta, account: &Account) -> Option { - let meta_ptr = &storage_meta as *const StorageMeta; - let balance = AccountBalance { - lamports: account.lamports, - owner: account.owner, - executable: account.executable, - }; - let balance_ptr = &balance as *const AccountBalance; - let data_len = account.data.len(); - let data_ptr = account.data.as_ptr(); - let ptrs = [ - (meta_ptr as *const u8, mem::size_of::()), - (balance_ptr as *const u8, mem::size_of::()), - (data_ptr, data_len), - ]; - self.append_ptrs(&ptrs) + #[allow(clippy::mutex_atomic)] + pub fn append_accounts(&self, accounts: &[(StorageMeta, &Account)]) -> Vec { + let mut offset = self.append_offset.lock().unwrap(); + let mut rv = vec![]; + for (storage_meta, account) in accounts { + let meta_ptr = storage_meta as *const StorageMeta; + let balance = AccountBalance { + lamports: account.lamports, + owner: account.owner, + executable: account.executable, + }; + let balance_ptr = &balance as *const AccountBalance; + let data_len = storage_meta.data_len as usize; + let data_ptr = account.data.as_ptr(); + let ptrs = [ + (meta_ptr as *const u8, mem::size_of::()), + (balance_ptr as *const u8, mem::size_of::()), + (data_ptr, data_len), + ]; + if let Some(res) = self.append_ptrs_locked(&mut offset, &ptrs) { + rv.push(res) + } else { + break; + } + } + rv } + + pub fn append_account(&self, storage_meta: StorageMeta, account: &Account) -> Option { + self.append_accounts(&[(storage_meta, account)]) + .first() + .cloned() + } + pub fn append_account_test(&self, data: &(StorageMeta, Account)) -> Option { self.append_account(data.0.clone(), &data.1) }