Introduce background stale AppendVec shrink mechanism (#9219)

* Introduce background AppendVec shrink mechanism

* Support ledger tool

* Clean up

* save

* save

* Fix CI

* More clean up

* Add tests

* Clean up yet more

* Use account.hash...

* Fix typo....

* Add comment

* Rename accounts_cleanup_service
Ryo Onodera 2020-04-06 17:30:23 +09:00 committed by GitHub
parent 7b68628e6c
commit b28ec430e4
6 changed files with 412 additions and 49 deletions
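In brief: AccountsCleanupService becomes AccountsBackgroundService. Besides processing dead slots, the background thread now calls Bank::process_stale_slot() every INTERVAL_MS (100 ms); each tick picks one rooted slot and, when fewer than 80% of that slot's stored accounts are still alive, rewrites the alive accounts into a fresh, page-aligned minimal AppendVec. A minimal sketch of wiring the service up, mirroring the Tvu changes below (bank_forks and exit are assumed to come from the surrounding validator setup):

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

let exit = Arc::new(AtomicBool::new(false));
// Spawns the "solana-accounts-background" thread shown in the first hunk.
let background_service = AccountsBackgroundService::new(bank_forks.clone(), &exit);
// ... run the validator ...
exit.store(true, Ordering::Relaxed); // the loop observes the flag and breaks
background_service.join().unwrap();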

core/src/accounts_cleanup_service.rs → core/src/accounts_background_service.rs

@@ -10,29 +10,36 @@ use std::sync::{
use std::thread::{self, sleep, Builder, JoinHandle};
use std::time::Duration;
pub struct AccountsCleanupService {
t_cleanup: JoinHandle<()>,
pub struct AccountsBackgroundService {
t_background: JoinHandle<()>,
}
impl AccountsCleanupService {
const INTERVAL_MS: u64 = 100;
impl AccountsBackgroundService {
pub fn new(bank_forks: Arc<RwLock<BankForks>>, exit: &Arc<AtomicBool>) -> Self {
info!("AccountsCleanupService active");
info!("AccountsBackgroundService active");
let exit = exit.clone();
let t_cleanup = Builder::new()
.name("solana-accounts-cleanup".to_string())
let t_background = Builder::new()
.name("solana-accounts-background".to_string())
.spawn(move || loop {
if exit.load(Ordering::Relaxed) {
break;
}
let bank = bank_forks.read().unwrap().working_bank();
bank.clean_dead_slots();
sleep(Duration::from_millis(100));
bank.process_dead_slots();
// Currently, given INTERVAL_MS = 100, we process one stale slot per 100 ms
bank.process_stale_slot();
sleep(Duration::from_millis(INTERVAL_MS));
})
.unwrap();
Self { t_cleanup }
Self { t_background }
}
pub fn join(self) -> thread::Result<()> {
self.t_cleanup.join()
self.t_background.join()
}
}

core/src/lib.rs

@@ -5,7 +5,7 @@
//! command-line tools to spin up validators and a Rust library
//!
pub mod accounts_cleanup_service;
pub mod accounts_background_service;
pub mod accounts_hash_verifier;
pub mod banking_stage;
pub mod broadcast_stage;

core/src/tvu.rs

@@ -2,7 +2,7 @@
//! validation pipeline in software.
use crate::{
accounts_cleanup_service::AccountsCleanupService,
accounts_background_service::AccountsBackgroundService,
accounts_hash_verifier::AccountsHashVerifier,
broadcast_stage::RetransmitSlotsSender,
cluster_info::ClusterInfo,
@@ -49,7 +49,7 @@ pub struct Tvu {
retransmit_stage: RetransmitStage,
replay_stage: ReplayStage,
ledger_cleanup_service: Option<LedgerCleanupService>,
accounts_cleanup_service: AccountsCleanupService,
accounts_background_service: AccountsBackgroundService,
storage_stage: StorageStage,
accounts_hash_verifier: AccountsHashVerifier,
}
@@ -211,7 +211,7 @@ impl Tvu {
)
});
let accounts_cleanup_service = AccountsCleanupService::new(bank_forks.clone(), &exit);
let accounts_background_service = AccountsBackgroundService::new(bank_forks.clone(), &exit);
let storage_stage = StorageStage::new(
storage_state,
@@ -231,7 +231,7 @@
retransmit_stage,
replay_stage,
ledger_cleanup_service,
accounts_cleanup_service,
accounts_background_service,
storage_stage,
accounts_hash_verifier,
}
@@ -245,7 +245,7 @@ impl Tvu {
if self.ledger_cleanup_service.is_some() {
self.ledger_cleanup_service.unwrap().join()?;
}
self.accounts_cleanup_service.join()?;
self.accounts_background_service.join()?;
self.replay_stage.join()?;
self.accounts_hash_verifier.join()?;
Ok(())

runtime/src/accounts_db.rs

@@ -48,12 +48,13 @@ use std::{
fmt,
io::{BufReader, Cursor, Error as IOError, ErrorKind, Read, Result as IOResult},
path::{Path, PathBuf},
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
sync::{Arc, RwLock},
sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering},
sync::{Arc, Mutex, RwLock},
};
use tempfile::TempDir;
pub const DEFAULT_FILE_SIZE: u64 = 4 * 1024 * 1024;
const PAGE_SIZE: u64 = 4 * 1024;
pub const DEFAULT_FILE_SIZE: u64 = PAGE_SIZE * 1024;
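// Same value as before (4 * 1024 * 1024 bytes); rewritten in terms of the new
// PAGE_SIZE constant, which the shrink code below also uses for alignment.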
pub const DEFAULT_NUM_THREADS: u32 = 8;
pub const DEFAULT_NUM_DIRS: u32 = 4;
@@ -371,7 +372,7 @@ impl<'a, 'b> Serialize for AccountsDBSerialize<'a, 'b> {
{
use serde::ser::Error;
let mut wr = Cursor::new(vec![]);
let version: u64 = self.accounts_db.write_version.load(Ordering::Relaxed) as u64;
let version = self.accounts_db.write_version.load(Ordering::Relaxed);
let account_storage_serialize = AccountStorageSerialize {
account_storage_entries: self.account_storage_entries,
};
@@ -450,8 +451,9 @@ pub struct AccountsDB {
/// distribute the accounts across storage lists
pub next_id: AtomicUsize,
pub shrink_candidate_slots: Mutex<Vec<Slot>>,
write_version: AtomicUsize,
write_version: AtomicU64,
/// Set of storage paths to pick from
paths: Vec<PathBuf>,
@@ -499,7 +501,8 @@ impl Default for AccountsDB {
accounts_index: RwLock::new(AccountsIndex::default()),
storage: RwLock::new(AccountStorage(HashMap::new())),
next_id: AtomicUsize::new(0),
write_version: AtomicUsize::new(0),
shrink_candidate_slots: Mutex::new(Vec::new()),
write_version: AtomicU64::new(0),
paths: vec![],
temp_paths: None,
file_size: DEFAULT_FILE_SIZE,
@@ -649,8 +652,7 @@ impl AccountsDB {
}
self.next_id.store(max_id + 1, Ordering::Relaxed);
self.write_version
.fetch_add(version as usize, Ordering::Relaxed);
self.write_version.fetch_add(version, Ordering::Relaxed);
self.generate_index();
Ok(())
}
@@ -879,6 +881,150 @@ impl AccountsDB {
);
}
// Reads all accounts in the given slot's AppendVecs, filters them down to
// the alive ones, then creates a minimal AppendVec filled with only those.
fn shrink_stale_slot(&self, slot: Slot) {
trace!("shrink_stale_slot: slot: {}", slot);
let mut stored_accounts = vec![];
{
let storage = self.storage.read().unwrap();
if let Some(stores) = storage.0.get(&slot) {
let mut alive_count = 0;
for store in stores.values() {
alive_count += store.count();
let mut start = 0;
while let Some((account, next)) = store.accounts.get_account(start) {
stored_accounts.push((
account.meta.pubkey,
account.clone_account(),
next - start,
(store.id, account.offset),
account.meta.write_version,
));
start = next;
}
}
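// Shrinking only pays off when enough of the stored accounts are dead;
// skip this slot if 80% or more of them are still alive.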
if (alive_count as f32 / stored_accounts.len() as f32) >= 0.80 {
trace!(
"shrink_stale_slot: not enough space to shrink: {} / {}",
alive_count,
stored_accounts.len()
);
return;
}
}
}
let alive_accounts: Vec<_> = {
let no_ancestors = HashMap::new();
let accounts_index = self.accounts_index.read().unwrap();
stored_accounts
.iter()
.filter(
|(pubkey, _account, _storage_size, (store_id, offset), _write_version)| {
if let Some((list, _)) = accounts_index.get(pubkey, &no_ancestors) {
list.iter()
.any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset)
} else {
false
}
},
)
.collect()
};
let alive_total: u64 = alive_accounts
.iter()
.map(|(_pubkey, _account, account_size, _location, _write_version)| *account_size as u64)
.sum();
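// Round the alive-bytes total up to the next PAGE_SIZE (4 KiB) boundary;
// for example, alive_total = 5000 yields aligned_total = 8192 (two pages).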
let aligned_total: u64 = (alive_total + (PAGE_SIZE - 1)) & !(PAGE_SIZE - 1);
debug!(
"shrinking: slot: {}, stored_accounts: {} => alive_accounts: {} ({} bytes; aligned to: {})",
slot,
stored_accounts.len(),
alive_accounts.len(),
alive_total,
aligned_total
);
if aligned_total > 0 {
let mut accounts = Vec::with_capacity(alive_accounts.len());
let mut hashes = Vec::with_capacity(alive_accounts.len());
let mut write_versions = Vec::with_capacity(alive_accounts.len());
for (pubkey, account, _size, _location, write_version) in alive_accounts {
accounts.push((pubkey, account));
hashes.push(account.hash);
write_versions.push(*write_version);
}
let shrunken_store = self.create_and_insert_store(slot, aligned_total);
// Here we write back alive_accounts. This can be done without holding wide
// locks across this whole function because we're only mutating rooted
// slots; there should be no writers to them.
let infos = self.store_accounts_to(
slot,
&accounts,
&hashes,
|_| shrunken_store.clone(),
write_versions.into_iter(),
);
let reclaims = self.update_index(slot, infos, &accounts);
self.handle_reclaims(&reclaims);
let mut storage = self.storage.write().unwrap();
if let Some(slot_storage) = storage.0.get_mut(&slot) {
slot_storage.retain(|_key, store| store.count() > 0);
}
}
}
// Infinitely returns rooted slots in cyclic order
fn next_shrink_slot(&self) -> Option<Slot> {
let next = {
let mut candidates = self.shrink_candidate_slots.lock().unwrap();
candidates.pop()
};
if next.is_some() {
next
} else {
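// Candidate list is exhausted: refill it with all rooted slots from the
// index and serve one slot from the fresh batch.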
let mut new_all_slots = self.all_root_slots_in_index();
let next = new_all_slots.pop();
let mut candidates = self.shrink_candidate_slots.lock().unwrap();
*candidates = new_all_slots;
next
}
}
fn all_root_slots_in_index(&self) -> Vec<Slot> {
let index = self.accounts_index.read().unwrap();
index.roots.iter().cloned().collect()
}
fn all_slots_in_storage(&self) -> Vec<Slot> {
let storage = self.storage.read().unwrap();
storage.0.keys().cloned().collect()
}
pub fn process_stale_slot(&self) {
if let Some(slot) = self.next_shrink_slot() {
self.shrink_stale_slot(slot);
}
}
pub fn shrink_all_stale_slots(&self) {
for slot in self.all_slots_in_storage() {
self.shrink_stale_slot(slot);
}
}
pub fn scan_accounts<F, A>(&self, ancestors: &HashMap<Slot, usize>, scan_func: F) -> A
where
F: Fn(&mut A, Option<(&Pubkey, Account, Slot)>) -> (),
@@ -1140,18 +1286,46 @@ impl AccountsDB {
hasher.result()
}
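// Atomically reserve a contiguous block of `count` write versions;
// the returned value is the first version in the reserved range.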
fn bulk_assign_write_version(&self, count: usize) -> u64 {
self.write_version
.fetch_add(count as u64, Ordering::Relaxed)
}
fn store_accounts(
&self,
slot: Slot,
accounts: &[(&Pubkey, &Account)],
hashes: &[Hash],
) -> Vec<AccountInfo> {
let default_account = Account::default();
let mut current_version = self.bulk_assign_write_version(accounts.len());
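// Hand out the reserved versions one at a time as accounts are stored.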
let write_version_producer = std::iter::from_fn(move || {
let ret = current_version;
current_version += 1;
Some(ret)
});
let storage_finder = |slot| self.find_storage_candidate(slot);
self.store_accounts_to(
slot,
accounts,
hashes,
storage_finder,
write_version_producer,
)
}
fn store_accounts_to<F: FnMut(Slot) -> Arc<AccountStorageEntry>, P: Iterator<Item = u64>>(
&self,
slot: Slot,
accounts: &[(&Pubkey, &Account)],
hashes: &[Hash],
mut storage_finder: F,
mut write_version_producer: P,
) -> Vec<AccountInfo> {
let default_account = Account::default();
let with_meta: Vec<(StoredMeta, &Account)> = accounts
.iter()
.map(|(pubkey, account)| {
let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64;
let account = if account.lamports == 0 {
&default_account
} else {
@@ -1160,7 +1334,7 @@ impl AccountsDB {
let data_len = account.data.len() as u64;
let meta = StoredMeta {
write_version,
write_version: write_version_producer.next().unwrap(),
pubkey: **pubkey,
data_len,
};
@@ -1169,7 +1343,7 @@ impl AccountsDB {
.collect();
let mut infos: Vec<AccountInfo> = Vec::with_capacity(with_meta.len());
while infos.len() < with_meta.len() {
let storage = self.find_storage_candidate(slot);
let storage = storage_finder(slot);
let rvs = storage
.accounts
.append_accounts(&with_meta[infos.len()..], &hashes[infos.len()..]);
@@ -2217,12 +2391,26 @@ pub mod tests {
}
impl AccountsDB {
fn store_count_for_slot(&self, slot: Slot) -> usize {
fn alive_account_count_in_store(&self, slot: Slot) -> usize {
let storage = self.storage.read().unwrap();
let slot_storage = storage.0.get(&slot);
if let Some(slot_storage) = slot_storage {
slot_storage.values().nth(0).unwrap().count()
slot_storage.values().map(|store| store.count()).sum()
} else {
0
}
}
fn all_account_count_in_append_vec(&self, slot: Slot) -> usize {
let storage = self.storage.read().unwrap();
let slot_storage = storage.0.get(&slot);
if let Some(slot_storage) = slot_storage {
slot_storage
.values()
.map(|store| store.accounts.accounts(0).len())
.sum()
} else {
0
}
@@ -2256,14 +2444,14 @@ pub mod tests {
accounts.add_root(1);
//even if rooted, old state isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 1);
assert_eq!(accounts.store_count_for_slot(1), 1);
assert_eq!(accounts.alive_account_count_in_store(0), 1);
assert_eq!(accounts.alive_account_count_in_store(1), 1);
accounts.clean_accounts();
//now old state is cleaned up
assert_eq!(accounts.store_count_for_slot(0), 0);
assert_eq!(accounts.store_count_for_slot(1), 1);
assert_eq!(accounts.alive_account_count_in_store(0), 0);
assert_eq!(accounts.alive_account_count_in_store(1), 1);
}
#[test]
@@ -2286,14 +2474,14 @@ pub mod tests {
accounts.add_root(1);
//even if rooted, old state isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 2);
assert_eq!(accounts.store_count_for_slot(1), 2);
assert_eq!(accounts.alive_account_count_in_store(0), 2);
assert_eq!(accounts.alive_account_count_in_store(1), 2);
accounts.clean_accounts();
//still old state behind zero-lamport account isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 1);
assert_eq!(accounts.store_count_for_slot(1), 2);
assert_eq!(accounts.alive_account_count_in_store(0), 1);
assert_eq!(accounts.alive_account_count_in_store(1), 2);
}
#[test]
@@ -2317,16 +2505,16 @@ pub mod tests {
accounts.add_root(2);
//even if rooted, old state isn't cleaned up
assert_eq!(accounts.store_count_for_slot(0), 2);
assert_eq!(accounts.store_count_for_slot(1), 1);
assert_eq!(accounts.store_count_for_slot(2), 1);
assert_eq!(accounts.alive_account_count_in_store(0), 2);
assert_eq!(accounts.alive_account_count_in_store(1), 1);
assert_eq!(accounts.alive_account_count_in_store(2), 1);
accounts.clean_accounts();
//both zero lamport and normal accounts are cleaned up
assert_eq!(accounts.store_count_for_slot(0), 0);
assert_eq!(accounts.store_count_for_slot(1), 0);
assert_eq!(accounts.store_count_for_slot(2), 1);
assert_eq!(accounts.alive_account_count_in_store(0), 0);
assert_eq!(accounts.alive_account_count_in_store(1), 0);
assert_eq!(accounts.alive_account_count_in_store(2), 1);
}
#[test]
@@ -3429,11 +3617,11 @@ pub mod tests {
// B: Test multiple updates to pubkey1 in a single slot/storage
current_slot += 1;
assert_eq!(0, accounts.store_count_for_slot(current_slot));
assert_eq!(0, accounts.alive_account_count_in_store(current_slot));
assert_eq!(1, accounts.ref_count_for_pubkey(&pubkey1));
accounts.store(current_slot, &[(&pubkey1, &account2)]);
accounts.store(current_slot, &[(&pubkey1, &account2)]);
assert_eq!(1, accounts.store_count_for_slot(current_slot));
assert_eq!(1, accounts.alive_account_count_in_store(current_slot));
assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1));
accounts.add_root(current_slot);
@@ -3491,10 +3679,167 @@ pub mod tests {
}
#[test]
fn clean_dead_slots_empty() {
fn test_clean_dead_slots_empty() {
let accounts = AccountsDB::new_single();
let mut dead_slots = HashSet::new();
dead_slots.insert(10);
accounts.clean_dead_slots(&dead_slots);
}
#[test]
fn test_shrink_stale_slots_none() {
let accounts = AccountsDB::new_single();
for _ in 0..10 {
accounts.process_stale_slot();
}
accounts.shrink_all_stale_slots();
}
#[test]
fn test_shrink_next_slots() {
let accounts = AccountsDB::new_single();
let mut current_slot = 7;
assert_eq!(
vec![None, None, None],
(0..3)
.map(|_| accounts.next_shrink_slot())
.collect::<Vec<_>>()
);
accounts.add_root(current_slot);
assert_eq!(
vec![Some(7), Some(7), Some(7)],
(0..3)
.map(|_| accounts.next_shrink_slot())
.collect::<Vec<_>>()
);
current_slot += 1;
accounts.add_root(current_slot);
let slots = (0..6)
.map(|_| accounts.next_shrink_slot())
.collect::<Vec<_>>();
// Because this data originates from a HashMap (not a BTreeMap), key order within each cycle is arbitrary.
assert!(
vec![Some(7), Some(8), Some(7), Some(8), Some(7), Some(8)] == slots
|| vec![Some(8), Some(7), Some(8), Some(7), Some(8), Some(7)] == slots
);
}
#[test]
fn test_shrink_stale_slots_processed() {
solana_logger::setup();
let accounts = AccountsDB::new_single();
let pubkey_count = 100;
let pubkeys: Vec<_> = (0..pubkey_count).map(|_| Pubkey::new_rand()).collect();
let some_lamport = 223;
let no_data = 0;
let owner = Account::default().owner;
let account = Account::new(some_lamport, no_data, &owner);
let mut current_slot = 0;
current_slot += 1;
for pubkey in &pubkeys {
accounts.store(current_slot, &[(&pubkey, &account)]);
}
let shrink_slot = current_slot;
accounts.add_root(current_slot);
current_slot += 1;
let pubkey_count_after_shrink = 10;
let updated_pubkeys = &pubkeys[0..pubkey_count - pubkey_count_after_shrink];
for pubkey in updated_pubkeys {
accounts.store(current_slot, &[(&pubkey, &account)]);
}
accounts.add_root(current_slot);
accounts.clean_accounts();
assert_eq!(
pubkey_count,
accounts.all_account_count_in_append_vec(shrink_slot)
);
accounts.shrink_all_stale_slots();
assert_eq!(
pubkey_count_after_shrink,
accounts.all_account_count_in_append_vec(shrink_slot)
);
let no_ancestors = HashMap::default();
accounts.update_accounts_hash(current_slot, &no_ancestors);
accounts
.verify_bank_hash(current_slot, &no_ancestors)
.unwrap();
let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot);
accounts
.verify_bank_hash(current_slot, &no_ancestors)
.unwrap();
// repeating should be no-op
accounts.shrink_all_stale_slots();
assert_eq!(
pubkey_count_after_shrink,
accounts.all_account_count_in_append_vec(shrink_slot)
);
}
#[test]
fn test_shrink_stale_slots_skipped() {
solana_logger::setup();
let accounts = AccountsDB::new_single();
let pubkey_count = 100;
let pubkeys: Vec<_> = (0..pubkey_count).map(|_| Pubkey::new_rand()).collect();
let some_lamport = 223;
let no_data = 0;
let owner = Account::default().owner;
let account = Account::new(some_lamport, no_data, &owner);
let mut current_slot = 0;
current_slot += 1;
for pubkey in &pubkeys {
accounts.store(current_slot, &[(&pubkey, &account)]);
}
let shrink_slot = current_slot;
accounts.add_root(current_slot);
current_slot += 1;
let pubkey_count_after_shrink = 90;
let updated_pubkeys = &pubkeys[0..pubkey_count - pubkey_count_after_shrink];
for pubkey in updated_pubkeys {
accounts.store(current_slot, &[(&pubkey, &account)]);
}
accounts.add_root(current_slot);
accounts.clean_accounts();
assert_eq!(
pubkey_count,
accounts.all_account_count_in_append_vec(shrink_slot)
);
accounts.shrink_all_stale_slots();
assert_eq!(
pubkey_count,
accounts.all_account_count_in_append_vec(shrink_slot)
);
}
}

runtime/src/append_vec.rs

@@ -160,7 +160,9 @@ impl AppendVec {
data.seek(SeekFrom::Start(0)).unwrap();
data.flush().unwrap();
//UNSAFE: Required to create a Mmap
let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") };
let map = unsafe { MmapMut::map_mut(&data) };
let map =
map.unwrap_or_else(|e| panic!("failed to map the data file (size: {}): {}", size, e));
AppendVec {
path: file.to_path_buf(),

runtime/src/bank.rs

@@ -1964,6 +1964,7 @@ impl Bank {
/// calculation and could shield other real accounts.
pub fn verify_snapshot_bank(&self) -> bool {
self.clean_accounts();
self.shrink_all_stale_slots();
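// Shrinking must not alter account state, so running it before the hash
// checks lets verification double as a sanity check of the shrink.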
// Order and short-circuiting is significant; verify_hash requires a valid bank hash
self.verify_bank_hash() && self.verify_hash()
}
@@ -2205,9 +2206,17 @@
self.rc.accounts.accounts_db.clean_accounts();
}
pub fn clean_dead_slots(&self) {
pub fn process_dead_slots(&self) {
self.rc.accounts.accounts_db.process_dead_slots();
}
pub fn process_stale_slot(&self) {
self.rc.accounts.accounts_db.process_stale_slot();
}
pub fn shrink_all_stale_slots(&self) {
self.rc.accounts.accounts_db.shrink_all_stale_slots();
}
}
impl Drop for Bank {