lockfree storage (#3963)

anatoly yakovenko 2019-04-24 11:51:57 -05:00 committed by GitHub
parent 3eed6a6090
commit c969975fde
2 changed files with 151 additions and 168 deletions

View File

@@ -63,16 +63,27 @@ pub struct AccountInfo {
 }
 /// An offset into the AccountsDB::storage vector
 type AppendVecId = usize;
-pub type AccountStorage = HashMap<usize, Arc<RwLock<AccountStorageEntry>>>;
+pub type AccountStorage = HashMap<usize, Arc<AccountStorageEntry>>;
 pub type InstructionAccounts = Vec<Account>;
 pub type InstructionLoaders = Vec<Vec<(Pubkey, Account)>>;
-#[derive(Copy, Clone, Debug, PartialEq)]
+#[derive(Debug, PartialEq)]
 pub enum AccountStorageStatus {
     StorageAvailable = 0,
     StorageFull = 1,
 }
+impl From<usize> for AccountStorageStatus {
+    fn from(status: usize) -> Self {
+        use self::AccountStorageStatus::*;
+        match status {
+            0 => StorageAvailable,
+            1 => StorageFull,
+            _ => unreachable!(),
+        }
+    }
+}
+
 /// Persistent storage structure holding the accounts
 pub struct AccountStorageEntry {
     id: AppendVecId,
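The new From<usize> impl is the read-side counterpart of storing the status as a raw usize inside an AtomicUsize (see the set_status/get_status changes below). A minimal, self-contained sketch of that round trip, not part of this commit (the enum and impl are repeated here only so the snippet compiles on its own):

use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Debug, PartialEq)]
enum AccountStorageStatus {
    StorageAvailable = 0,
    StorageFull = 1,
}

impl From<usize> for AccountStorageStatus {
    fn from(status: usize) -> Self {
        match status {
            0 => AccountStorageStatus::StorageAvailable,
            1 => AccountStorageStatus::StorageFull,
            _ => unreachable!(),
        }
    }
}

fn main() {
    // Store the enum as a plain usize, then recover it on the way out.
    let status = AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize);
    status.store(AccountStorageStatus::StorageFull as usize, Ordering::Relaxed);
    let read_back: AccountStorageStatus = status.load(Ordering::Relaxed).into();
    assert_eq!(read_back, AccountStorageStatus::StorageFull);
}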
@@ -85,10 +96,10 @@ pub struct AccountStorageEntry {
     /// Keeps track of the number of accounts stored in a specific AppendVec.
     /// This is periodically checked to reuse the stores that do not have
     /// any accounts in it.
-    count: usize,
+    count: AtomicUsize,
     /// status corresponding to the storage
-    status: AccountStorageStatus,
+    status: AtomicUsize,
 }

 impl AccountStorageEntry {
@@ -103,26 +114,25 @@ impl AccountStorageEntry {
             id,
             fork_id,
             accounts,
-            count: 0,
-            status: AccountStorageStatus::StorageAvailable,
+            count: AtomicUsize::new(0),
+            status: AtomicUsize::new(AccountStorageStatus::StorageAvailable as usize),
         }
     }

-    pub fn set_status(&mut self, status: AccountStorageStatus) {
-        self.status = status;
+    pub fn set_status(&self, status: AccountStorageStatus) {
+        self.status.store(status as usize, Ordering::Relaxed);
     }

-    pub fn status(&self) -> AccountStorageStatus {
-        self.status
+    pub fn get_status(&self) -> AccountStorageStatus {
+        self.status.load(Ordering::Relaxed).into()
     }

-    fn add_account(&mut self) {
-        self.count += 1;
+    fn add_account(&self) {
+        self.count.fetch_add(1, Ordering::Relaxed);
     }

-    fn remove_account(&mut self) {
-        self.count -= 1;
-        if self.count == 0 {
+    fn remove_account(&self) {
+        if self.count.fetch_sub(1, Ordering::Relaxed) == 1 {
             self.accounts.reset();
             self.set_status(AccountStorageStatus::StorageAvailable);
         }
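remove_account now decides "last account removed" from the value returned by fetch_sub rather than re-reading the counter, so only one caller can observe the 1 -> 0 transition even under contention. A standalone sketch of that pattern, not from the diff (names are made up):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    // Ten "accounts" tracked by a shared counter; exactly one thread sees the
    // 1 -> 0 transition because fetch_sub returns the value it replaced.
    let count = Arc::new(AtomicUsize::new(10));
    let handles: Vec<_> = (0..10)
        .map(|_| {
            let count = Arc::clone(&count);
            thread::spawn(move || count.fetch_sub(1, Ordering::Relaxed) == 1)
        })
        .collect();
    let last_removers = handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|&was_last| was_last)
        .count();
    assert_eq!(last_removers, 1);
    assert_eq!(count.load(Ordering::Relaxed), 0);
}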
@@ -182,9 +192,8 @@ impl AccountsDB {
     }

     pub fn has_accounts(&self, fork: Fork) -> bool {
-        for storage_entry in self.storage.read().unwrap().values() {
-            let storage_entry = storage_entry.read().unwrap();
-            if storage_entry.fork_id == fork && storage_entry.count > 0 {
+        for x in self.storage.read().unwrap().values() {
+            if x.fork_id == fork && x.count.load(Ordering::Relaxed) > 0 {
                 return true;
             }
         }
@@ -199,18 +208,17 @@ impl AccountsDB {
         F: Send + Sync,
         B: Send + Default,
     {
-        let storage_maps: Vec<Arc<RwLock<AccountStorageEntry>>> = self
+        let storage_maps: Vec<Arc<AccountStorageEntry>> = self
             .storage
             .read()
             .unwrap()
             .values()
-            .filter(|store| store.read().unwrap().fork_id == fork_id)
+            .filter(|store| store.fork_id == fork_id)
             .cloned()
             .collect();
         storage_maps
             .into_par_iter()
             .map(|storage| {
-                let storage = storage.read().unwrap();
                 let accounts = storage.accounts.accounts(0);
                 let mut retval = B::default();
                 accounts
@@ -229,17 +237,9 @@ impl AccountsDB {
     ) -> Option<Account> {
         let info = accounts_index.get(pubkey, ancestors)?;
         //TODO: thread this as a ref
-        storage.get(&info.id).and_then(|store| {
-            Some(
-                store
-                    .read()
-                    .unwrap()
-                    .accounts
-                    .get_account(info.offset)?
-                    .0
-                    .clone_account(),
-            )
-        })
+        storage
+            .get(&info.id)
+            .and_then(|store| Some(store.accounts.get_account(info.offset)?.0.clone_account()))
     }

     pub fn load_slow(&self, ancestors: &HashMap<Fork, usize>, pubkey: &Pubkey) -> Option<Account> {
@@ -248,69 +248,31 @@ impl AccountsDB {
         Self::load(&storage, ancestors, &accounts_index, pubkey)
     }

-    fn with_exclusive_storage<F>(&self, fork_id: Fork, mut access: F) -> bool
-    where
-        F: FnMut(&mut AccountStorageEntry) -> bool,
-    {
-        let entries: Vec<_> = self
-            .storage
-            .read()
-            .unwrap()
-            .values()
-            .filter_map(|entry| {
-                let res = {
-                    let entry = entry.read().unwrap();
-                    entry.status() == AccountStorageStatus::StorageAvailable
-                        && entry.fork_id == fork_id
-                };
-                if res {
-                    Some(entry.clone())
-                } else {
-                    None
-                }
-            })
-            .collect();
-        for entry in entries {
-            if access(&mut entry.write().unwrap()) {
-                return true;
-            }
-        }
-        let path_idx = thread_rng().gen_range(0, self.paths.len());
-        let mut new_entry = self.new_storage_entry(fork_id, &self.paths[path_idx]);
-        let rv = access(&mut new_entry);
-        self.storage
-            .write()
-            .unwrap()
-            .insert(new_entry.id, Arc::new(RwLock::new(new_entry)));
-        rv
-    }
-
-    fn append_account(
-        &self,
-        storage: &AccountStorageEntry,
-        pubkey: &Pubkey,
-        account: &Account,
-    ) -> Option<usize> {
-        let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64;
-        let meta = StorageMeta {
-            write_version,
-            pubkey: *pubkey,
-            data_len: account.data.len() as u64,
-        };
-        if account.lamports == 0 {
-            // Even if no lamports, need to preserve the account owner so
-            // we can update the vote_accounts correctly as roots move forward
-            let account = &mut account.clone();
-            account.data.resize(0, 0);
-            storage.accounts.append_account(meta, account)
-        } else {
-            storage.accounts.append_account(meta, account)
-        }
+    fn fork_storage(&self, fork_id: Fork) -> Arc<AccountStorageEntry> {
+        let mut candidates: Vec<Arc<AccountStorageEntry>> = {
+            let stores = self.storage.read().unwrap();
+            stores
+                .values()
+                .filter_map(|x| {
+                    if x.get_status() == AccountStorageStatus::StorageAvailable
+                        && x.fork_id == fork_id
+                    {
+                        Some(x.clone())
+                    } else {
+                        None
+                    }
+                })
+                .collect()
+        };
+        if candidates.is_empty() {
+            let mut stores = self.storage.write().unwrap();
+            let path_idx = thread_rng().gen_range(0, self.paths.len());
+            let storage = Arc::new(self.new_storage_entry(fork_id, &self.paths[path_idx]));
+            stores.insert(storage.id, storage.clone());
+            candidates.push(storage);
+        }
+        let rv = thread_rng().gen_range(0, candidates.len());
+        candidates[rv].clone()
     }

     pub fn purge_fork(&self, fork: Fork) {
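fork_storage replaces the callback-based with_exclusive_storage: it snapshots the available stores for the fork under the read lock, takes the write lock only if a new store has to be created, and hands back one candidate at random so concurrent writers spread across stores. A stripped-down sketch of that pattern over a plain Vec, with hypothetical stand-in types rather than the real AccountsDB API:

use std::sync::{Arc, RwLock};

struct Store {
    id: usize,
    fork_id: u64,
    available: bool,
}

fn fork_store(storage: &RwLock<Vec<Arc<Store>>>, fork_id: u64) -> Arc<Store> {
    // 1. Snapshot candidates while holding only the read lock.
    let mut candidates: Vec<Arc<Store>> = storage
        .read()
        .unwrap()
        .iter()
        .filter(|s| s.available && s.fork_id == fork_id)
        .cloned()
        .collect();
    // 2. Take the write lock only when a new store must be created.
    if candidates.is_empty() {
        let mut stores = storage.write().unwrap();
        let store = Arc::new(Store {
            id: stores.len(),
            fork_id,
            available: true,
        });
        stores.push(store.clone());
        candidates.push(store);
    }
    // 3. The real code picks a random candidate to spread writers;
    //    the first one is enough for this sketch.
    candidates[0].clone()
}

fn main() {
    let storage = RwLock::new(Vec::new());
    let a = fork_store(&storage, 7);
    let b = fork_store(&storage, 7);
    assert_eq!(a.id, b.id); // the second call reuses the store made by the first
}

Because the read lock is released before the write lock is taken, two callers can still race to create a store for the same fork; the extra store is simply tolerated rather than serializing writers.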
@@ -319,32 +281,46 @@ impl AccountsDB {
         trace!("PURGING {} {}", fork, is_root);
         if !is_root {
             self.storage.write().unwrap().retain(|_, v| {
-                trace!("PURGING {} {}", v.read().unwrap().fork_id, fork);
-                v.read().unwrap().fork_id != fork
+                trace!("PURGING {} {}", v.fork_id, fork);
+                v.fork_id != fork
             });
         }
     }

     fn store_accounts(&self, fork_id: Fork, accounts: &[(&Pubkey, &Account)]) -> Vec<AccountInfo> {
-        let mut infos = vec![];
-        for (pubkey, account) in accounts {
-            self.with_exclusive_storage(fork_id, |storage| {
-                if let Some(offset) = self.append_account(&storage, pubkey, account) {
-                    storage.add_account();
-                    infos.push(AccountInfo {
-                        id: storage.id,
-                        offset,
-                        lamports: account.lamports,
-                    });
-                    true
-                } else {
-                    storage.set_status(AccountStorageStatus::StorageFull);
-                    false
-                }
-            });
-        }
+        let with_meta: Vec<(StorageMeta, &Account)> = accounts
+            .iter()
+            .map(|(pubkey, account)| {
+                let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64;
+                let data_len = if account.lamports == 0 {
+                    0
+                } else {
+                    account.data.len() as u64
+                };
+                let meta = StorageMeta {
+                    write_version,
+                    pubkey: **pubkey,
+                    data_len,
+                };
+                (meta, *account)
+            })
+            .collect();
+        let mut infos: Vec<AccountInfo> = vec![];
+        while infos.len() < with_meta.len() {
+            let storage = self.fork_storage(fork_id);
+            let rvs = storage.accounts.append_accounts(&with_meta[infos.len()..]);
+            if rvs.is_empty() {
+                storage.set_status(AccountStorageStatus::StorageFull);
+            }
+            for (offset, (_, account)) in rvs.iter().zip(&with_meta[infos.len()..]) {
+                storage.add_account();
+                infos.push(AccountInfo {
+                    id: storage.id,
+                    offset: *offset,
+                    lamports: account.lamports,
+                });
+            }
+        }
         infos
     }
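store_accounts now builds all the StorageMeta up front and then loops: ask fork_storage for a writable store, append as many of the remaining accounts as fit, mark the store full when nothing was accepted, and retry with the rest. A self-contained sketch of that fill-until-done loop against a toy fixed-capacity store (hypothetical types, not the real AppendVec):

struct Store {
    capacity: usize,
    items: Vec<u64>,
}

impl Store {
    // Like append_accounts: returns the offsets it managed to append, which
    // may be fewer than requested (or none at all once the store is full).
    fn append(&mut self, vals: &[u64]) -> Vec<usize> {
        let room = self.capacity.saturating_sub(self.items.len());
        vals.iter()
            .take(room)
            .map(|v| {
                self.items.push(*v);
                self.items.len() - 1
            })
            .collect()
    }
}

fn store_all(vals: &[u64], new_store: impl Fn() -> Store) -> Vec<(usize, usize)> {
    let mut stores = vec![new_store()];
    let mut infos = vec![]; // (store id, offset) for every stored value
    while infos.len() < vals.len() {
        let id = stores.len() - 1;
        let offsets = stores[id].append(&vals[infos.len()..]);
        if offsets.is_empty() {
            stores.push(new_store()); // "storage full": move on to a fresh store
        }
        for offset in offsets {
            infos.push((id, offset));
        }
    }
    infos
}

fn main() {
    let infos = store_all(&[1, 2, 3, 4, 5], || Store {
        capacity: 2,
        items: vec![],
    });
    assert_eq!(infos.len(), 5);
    assert_eq!(infos[4], (2, 0)); // the fifth value landed in a third store
}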
@@ -366,22 +342,20 @@ impl AccountsDB {
     fn remove_dead_accounts(&self, reclaims: Vec<(Fork, AccountInfo)>) -> HashSet<Fork> {
         let storage = self.storage.read().unwrap();
         for (fork_id, account_info) in reclaims {
-            if let Some(entry) = storage.get(&account_info.id) {
+            if let Some(store) = storage.get(&account_info.id) {
                 assert_eq!(
-                    fork_id,
-                    entry.read().unwrap().fork_id,
+                    fork_id, store.fork_id,
                     "AccountDB::accounts_index corrupted. Storage should only point to one fork"
                 );
-                entry.write().unwrap().remove_account();
+                store.remove_account();
             }
         }
         //TODO: performance here could be improved if AccountsDB::storage was organized by fork
         let dead_forks: HashSet<Fork> = storage
             .values()
-            .filter_map(|entry| {
-                let entry = entry.read().unwrap();
-                if entry.count == 0 {
-                    Some(entry.fork_id)
+            .filter_map(|x| {
+                if x.count.load(Ordering::Relaxed) == 0 {
+                    Some(x.fork_id)
                 } else {
                     None
                 }
@@ -389,10 +363,9 @@ impl AccountsDB {
             .collect();
         let live_forks: HashSet<Fork> = storage
             .values()
-            .filter_map(|entry| {
-                let entry = entry.read().unwrap();
-                if entry.count > 0 {
-                    Some(entry.fork_id)
+            .filter_map(|x| {
+                if x.count.load(Ordering::Relaxed) > 0 {
+                    Some(x.fork_id)
                 } else {
                     None
                 }
@@ -630,15 +603,15 @@ mod tests {
         {
             let stores = db.storage.read().unwrap();
             assert_eq!(stores.len(), 2);
-            assert_eq!(stores[&0].read().unwrap().count, 2);
-            assert_eq!(stores[&1].read().unwrap().count, 2);
+            assert_eq!(stores[&0].count.load(Ordering::Relaxed), 2);
+            assert_eq!(stores[&1].count.load(Ordering::Relaxed), 2);
         }
         db.add_root(1);
         {
             let stores = db.storage.read().unwrap();
             assert_eq!(stores.len(), 2);
-            assert_eq!(stores[&0].read().unwrap().count, 2);
-            assert_eq!(stores[&1].read().unwrap().count, 2);
+            assert_eq!(stores[&0].count.load(Ordering::Relaxed), 2);
+            assert_eq!(stores[&1].count.load(Ordering::Relaxed), 2);
         }
     }
@@ -713,11 +686,10 @@ mod tests {
         let stores = accounts.storage.read().unwrap();
         assert_eq!(stores.len(), 1);
         assert_eq!(
-            stores[&0].read().unwrap().status(),
+            stores[&0].get_status(),
             AccountStorageStatus::StorageAvailable
         );
-        let rv = stores[&0].read().unwrap().count == count;
-        rv
+        stores[&0].count.load(Ordering::Relaxed) == count
     }

     fn check_accounts(accounts: &AccountsDB, pubkeys: &Vec<Pubkey>, fork: Fork) {
@@ -785,9 +757,7 @@ mod tests {
         let mut append_vec_histogram = HashMap::new();
         for storage in accounts.storage.read().unwrap().values() {
-            *append_vec_histogram
-                .entry(storage.read().unwrap().fork_id)
-                .or_insert(0) += 1;
+            *append_vec_histogram.entry(storage.fork_id).or_insert(0) += 1;
         }
         for count in append_vec_histogram.values() {
             assert!(*count >= 2);
@@ -809,8 +779,8 @@ mod tests {
         {
             let stores = accounts.storage.read().unwrap();
             assert_eq!(stores.len(), 1);
-            assert_eq!(stores[&0].read().unwrap().count, 1);
-            assert_eq!(stores[&0].read().unwrap().status(), status[0]);
+            assert_eq!(stores[&0].count.load(Ordering::Relaxed), 1);
+            assert_eq!(stores[&0].get_status(), status[0]);
         }

         let pubkey2 = Pubkey::new_rand();
@@ -819,10 +789,10 @@ mod tests {
         {
             let stores = accounts.storage.read().unwrap();
             assert_eq!(stores.len(), 2);
-            assert_eq!(stores[&0].read().unwrap().count, 1);
-            assert_eq!(stores[&0].read().unwrap().status(), status[1]);
-            assert_eq!(stores[&1].read().unwrap().count, 1);
-            assert_eq!(stores[&1].read().unwrap().status(), status[0]);
+            assert_eq!(stores[&0].count.load(Ordering::Relaxed), 1);
+            assert_eq!(stores[&0].get_status(), status[1]);
+            assert_eq!(stores[&1].count.load(Ordering::Relaxed), 1);
+            assert_eq!(stores[&1].get_status(), status[0]);
         }
         let ancestors = vec![(0, 0)].into_iter().collect();
         assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1);
@@ -834,12 +804,12 @@ mod tests {
         {
             let stores = accounts.storage.read().unwrap();
             assert_eq!(stores.len(), 3);
-            assert_eq!(stores[&0].read().unwrap().count, count[index]);
-            assert_eq!(stores[&0].read().unwrap().status(), status[0]);
-            assert_eq!(stores[&1].read().unwrap().count, 1);
-            assert_eq!(stores[&1].read().unwrap().status(), status[1]);
-            assert_eq!(stores[&2].read().unwrap().count, count[index ^ 1]);
-            assert_eq!(stores[&2].read().unwrap().status(), status[0]);
+            assert_eq!(stores[&0].count.load(Ordering::Relaxed), count[index]);
+            assert_eq!(stores[&0].get_status(), status[0]);
+            assert_eq!(stores[&1].count.load(Ordering::Relaxed), 1);
+            assert_eq!(stores[&1].get_status(), status[1]);
+            assert_eq!(stores[&2].count.load(Ordering::Relaxed), count[index ^ 1]);
+            assert_eq!(stores[&2].get_status(), status[0]);
         }
         let ancestors = vec![(0, 0)].into_iter().collect();
         assert_eq!(accounts.load_slow(&ancestors, &pubkey1).unwrap(), account1);

View File

@@ -144,11 +144,7 @@ impl AppendVec {
         *offset = pos + len;
     }

-    #[allow(clippy::mutex_atomic)]
-    fn append_ptrs(&self, vals: &[(*const u8, usize)]) -> Option<usize> {
-        // This mutex forces append to be single threaded, but concurrent with reads
-        // See UNSAFE usage in `append_ptr`
-        let mut offset = self.append_offset.lock().unwrap();
+    fn append_ptrs_locked(&self, offset: &mut usize, vals: &[(*const u8, usize)]) -> Option<usize> {
         let mut end = *offset;
         for val in vals {
             //Data is aligned at the next 64 byte offset. Without alignment loading the memory may
@@ -165,7 +161,7 @@ impl AppendVec {
         //crash on some architectures.
         let pos = align_up!(*offset, mem::size_of::<u64>());
         for val in vals {
-            self.append_ptr(&mut offset, val.0, val.1)
+            self.append_ptr(offset, val.0, val.1)
         }
         self.current_len.store(*offset, Ordering::Relaxed);
         Some(pos)
@@ -207,23 +203,40 @@ impl AppendVec {
         accounts
     }

-    pub fn append_account(&self, storage_meta: StorageMeta, account: &Account) -> Option<usize> {
-        let meta_ptr = &storage_meta as *const StorageMeta;
-        let balance = AccountBalance {
-            lamports: account.lamports,
-            owner: account.owner,
-            executable: account.executable,
-        };
-        let balance_ptr = &balance as *const AccountBalance;
-        let data_len = account.data.len();
-        let data_ptr = account.data.as_ptr();
-        let ptrs = [
-            (meta_ptr as *const u8, mem::size_of::<StorageMeta>()),
-            (balance_ptr as *const u8, mem::size_of::<AccountBalance>()),
-            (data_ptr, data_len),
-        ];
-        self.append_ptrs(&ptrs)
+    #[allow(clippy::mutex_atomic)]
+    pub fn append_accounts(&self, accounts: &[(StorageMeta, &Account)]) -> Vec<usize> {
+        let mut offset = self.append_offset.lock().unwrap();
+        let mut rv = vec![];
+        for (storage_meta, account) in accounts {
+            let meta_ptr = storage_meta as *const StorageMeta;
+            let balance = AccountBalance {
+                lamports: account.lamports,
+                owner: account.owner,
+                executable: account.executable,
+            };
+            let balance_ptr = &balance as *const AccountBalance;
+            let data_len = storage_meta.data_len as usize;
+            let data_ptr = account.data.as_ptr();
+            let ptrs = [
+                (meta_ptr as *const u8, mem::size_of::<StorageMeta>()),
+                (balance_ptr as *const u8, mem::size_of::<AccountBalance>()),
+                (data_ptr, data_len),
+            ];
+            if let Some(res) = self.append_ptrs_locked(&mut offset, &ptrs) {
+                rv.push(res)
+            } else {
+                break;
+            }
+        }
+        rv
     }

+    pub fn append_account(&self, storage_meta: StorageMeta, account: &Account) -> Option<usize> {
+        self.append_accounts(&[(storage_meta, account)])
+            .first()
+            .cloned()
+    }
+
     pub fn append_account_test(&self, data: &(StorageMeta, Account)) -> Option<usize> {
         self.append_account(data.0.clone(), &data.1)
     }
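append_accounts takes the append_offset mutex once for the whole batch and stops at the first account that does not fit, so the caller learns how far it got from the number of offsets returned (append_account is now just the batch of one). A toy sketch of that single-lock, best-effort batch shape, with a hypothetical buffer type rather than the real AppendVec:

use std::sync::Mutex;

// Append-only buffer with a fixed capacity, guarded the same way AppendVec
// guards its append offset: a Mutex serializes writers, readers never take it.
struct Buf {
    capacity: usize,
    append_offset: Mutex<usize>,
}

impl Buf {
    fn append_all(&self, lens: &[usize]) -> Vec<usize> {
        // One lock acquisition for the whole batch.
        let mut offset = self.append_offset.lock().unwrap();
        let mut rv = vec![];
        for len in lens {
            if *offset + len > self.capacity {
                break; // the first item that does not fit ends the batch
            }
            rv.push(*offset);
            *offset += len;
        }
        rv
    }
}

fn main() {
    let buf = Buf {
        capacity: 100,
        append_offset: Mutex::new(0),
    };
    assert_eq!(buf.append_all(&[40, 40, 40]), vec![0, 40]); // third item did not fit
}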