From b28ec430e44ae365dc654814c8d64c49783ec0d6 Mon Sep 17 00:00:00 2001 From: Ryo Onodera Date: Mon, 6 Apr 2020 17:30:23 +0900 Subject: [PATCH] Introduce background stale AppendVec shrink mechanism (#9219) * Introduce background AppendVec shrink mechanism * Support ledger tool * Clean up * save * save * Fix CI * More clean up * Add tests * Clean up yet more * Use account.hash... * Fix typo.... * Add comment * Rename accounts_cleanup_service --- ...vice.rs => accounts_background_service.rs} | 27 +- core/src/lib.rs | 2 +- core/src/tvu.rs | 10 +- runtime/src/accounts_db.rs | 407 ++++++++++++++++-- runtime/src/append_vec.rs | 4 +- runtime/src/bank.rs | 11 +- 6 files changed, 412 insertions(+), 49 deletions(-) rename core/src/{accounts_cleanup_service.rs => accounts_background_service.rs} (56%) diff --git a/core/src/accounts_cleanup_service.rs b/core/src/accounts_background_service.rs similarity index 56% rename from core/src/accounts_cleanup_service.rs rename to core/src/accounts_background_service.rs index d05595972b..af8024df13 100644 --- a/core/src/accounts_cleanup_service.rs +++ b/core/src/accounts_background_service.rs @@ -10,29 +10,36 @@ use std::sync::{ use std::thread::{self, sleep, Builder, JoinHandle}; use std::time::Duration; -pub struct AccountsCleanupService { - t_cleanup: JoinHandle<()>, +pub struct AccountsBackgroundService { + t_background: JoinHandle<()>, } -impl AccountsCleanupService { +const INTERVAL_MS: u64 = 100; + +impl AccountsBackgroundService { pub fn new(bank_forks: Arc>, exit: &Arc) -> Self { - info!("AccountsCleanupService active"); + info!("AccountsBackgroundService active"); let exit = exit.clone(); - let t_cleanup = Builder::new() - .name("solana-accounts-cleanup".to_string()) + let t_background = Builder::new() + .name("solana-accounts-background".to_string()) .spawn(move || loop { if exit.load(Ordering::Relaxed) { break; } let bank = bank_forks.read().unwrap().working_bank(); - bank.clean_dead_slots(); - sleep(Duration::from_millis(100)); + + bank.process_dead_slots(); + + // Currently, given INTERVAL_MS, we process 1 slot/100 ms + bank.process_stale_slot(); + + sleep(Duration::from_millis(INTERVAL_MS)); }) .unwrap(); - Self { t_cleanup } + Self { t_background } } pub fn join(self) -> thread::Result<()> { - self.t_cleanup.join() + self.t_background.join() } } diff --git a/core/src/lib.rs b/core/src/lib.rs index c594787dbd..a0b7a6c6d3 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -5,7 +5,7 @@ //! command-line tools to spin up validators and a Rust library //! -pub mod accounts_cleanup_service; +pub mod accounts_background_service; pub mod accounts_hash_verifier; pub mod banking_stage; pub mod broadcast_stage; diff --git a/core/src/tvu.rs b/core/src/tvu.rs index a73e4580cc..d3b89483a3 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -2,7 +2,7 @@ //! validation pipeline in software. use crate::{ - accounts_cleanup_service::AccountsCleanupService, + accounts_background_service::AccountsBackgroundService, accounts_hash_verifier::AccountsHashVerifier, broadcast_stage::RetransmitSlotsSender, cluster_info::ClusterInfo, @@ -49,7 +49,7 @@ pub struct Tvu { retransmit_stage: RetransmitStage, replay_stage: ReplayStage, ledger_cleanup_service: Option, - accounts_cleanup_service: AccountsCleanupService, + accounts_background_service: AccountsBackgroundService, storage_stage: StorageStage, accounts_hash_verifier: AccountsHashVerifier, } @@ -211,7 +211,7 @@ impl Tvu { ) }); - let accounts_cleanup_service = AccountsCleanupService::new(bank_forks.clone(), &exit); + let accounts_background_service = AccountsBackgroundService::new(bank_forks.clone(), &exit); let storage_stage = StorageStage::new( storage_state, @@ -231,7 +231,7 @@ impl Tvu { retransmit_stage, replay_stage, ledger_cleanup_service, - accounts_cleanup_service, + accounts_background_service, storage_stage, accounts_hash_verifier, } @@ -245,7 +245,7 @@ impl Tvu { if self.ledger_cleanup_service.is_some() { self.ledger_cleanup_service.unwrap().join()?; } - self.accounts_cleanup_service.join()?; + self.accounts_background_service.join()?; self.replay_stage.join()?; self.accounts_hash_verifier.join()?; Ok(()) diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 245793f584..e83a9dfcc1 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -48,12 +48,13 @@ use std::{ fmt, io::{BufReader, Cursor, Error as IOError, ErrorKind, Read, Result as IOResult}, path::{Path, PathBuf}, - sync::atomic::{AtomicBool, AtomicUsize, Ordering}, - sync::{Arc, RwLock}, + sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}, + sync::{Arc, Mutex, RwLock}, }; use tempfile::TempDir; -pub const DEFAULT_FILE_SIZE: u64 = 4 * 1024 * 1024; +const PAGE_SIZE: u64 = 4 * 1024; +pub const DEFAULT_FILE_SIZE: u64 = PAGE_SIZE * 1024; pub const DEFAULT_NUM_THREADS: u32 = 8; pub const DEFAULT_NUM_DIRS: u32 = 4; @@ -371,7 +372,7 @@ impl<'a, 'b> Serialize for AccountsDBSerialize<'a, 'b> { { use serde::ser::Error; let mut wr = Cursor::new(vec![]); - let version: u64 = self.accounts_db.write_version.load(Ordering::Relaxed) as u64; + let version = self.accounts_db.write_version.load(Ordering::Relaxed); let account_storage_serialize = AccountStorageSerialize { account_storage_entries: self.account_storage_entries, }; @@ -450,8 +451,9 @@ pub struct AccountsDB { /// distribute the accounts across storage lists pub next_id: AtomicUsize, + pub shrink_candidate_slots: Mutex>, - write_version: AtomicUsize, + write_version: AtomicU64, /// Set of storage paths to pick from paths: Vec, @@ -499,7 +501,8 @@ impl Default for AccountsDB { accounts_index: RwLock::new(AccountsIndex::default()), storage: RwLock::new(AccountStorage(HashMap::new())), next_id: AtomicUsize::new(0), - write_version: AtomicUsize::new(0), + shrink_candidate_slots: Mutex::new(Vec::new()), + write_version: AtomicU64::new(0), paths: vec![], temp_paths: None, file_size: DEFAULT_FILE_SIZE, @@ -649,8 +652,7 @@ impl AccountsDB { } self.next_id.store(max_id + 1, Ordering::Relaxed); - self.write_version - .fetch_add(version as usize, Ordering::Relaxed); + self.write_version.fetch_add(version, Ordering::Relaxed); self.generate_index(); Ok(()) } @@ -879,6 +881,150 @@ impl AccountsDB { ); } + // Reads all accounts in given slot's AppendVecs and filter only to alive, + // then create a minimum AppendVed filled with the alive. + fn shrink_stale_slot(&self, slot: Slot) { + trace!("shrink_stale_slot: slot: {}", slot); + + let mut stored_accounts = vec![]; + { + let storage = self.storage.read().unwrap(); + if let Some(stores) = storage.0.get(&slot) { + let mut alive_count = 0; + for store in stores.values() { + alive_count += store.count(); + let mut start = 0; + while let Some((account, next)) = store.accounts.get_account(start) { + stored_accounts.push(( + account.meta.pubkey, + account.clone_account(), + next - start, + (store.id, account.offset), + account.meta.write_version, + )); + start = next; + } + } + if (alive_count as f32 / stored_accounts.len() as f32) >= 0.80 { + trace!( + "shrink_stale_slot: not enough space to shrink: {} / {}", + alive_count, + stored_accounts.len() + ); + return; + } + } + } + + let alive_accounts: Vec<_> = { + let no_ancestors = HashMap::new(); + let accounts_index = self.accounts_index.read().unwrap(); + stored_accounts + .iter() + .filter( + |(pubkey, _account, _storage_size, (store_id, offset), _write_version)| { + if let Some((list, _)) = accounts_index.get(pubkey, &no_ancestors) { + list.iter() + .any(|(_slot, i)| i.store_id == *store_id && i.offset == *offset) + } else { + false + } + }, + ) + .collect() + }; + + let alive_total: u64 = alive_accounts + .iter() + .map(|(_pubkey, _account, account_size, _location, _write_verion)| *account_size as u64) + .sum(); + let aligned_total: u64 = (alive_total + (PAGE_SIZE - 1)) & !(PAGE_SIZE - 1); + + debug!( + "shrinking: slot: {}, stored_accounts: {} => alive_accounts: {} ({} bytes; aligned to: {})", + slot, + stored_accounts.len(), + alive_accounts.len(), + alive_total, + aligned_total + ); + + if aligned_total > 0 { + let mut accounts = Vec::with_capacity(alive_accounts.len()); + let mut hashes = Vec::with_capacity(alive_accounts.len()); + let mut write_versions = Vec::with_capacity(alive_accounts.len()); + + for (pubkey, account, _size, _location, write_version) in alive_accounts { + accounts.push((pubkey, account)); + hashes.push(account.hash); + write_versions.push(*write_version); + } + + let shrunken_store = self.create_and_insert_store(slot, aligned_total); + + // here, we're writing back alive_accounts. That should be an atomic operation + // without use of rather wide locks in this whole function, because we're + // mutating rooted slots; There should be no writers to them. + let infos = self.store_accounts_to( + slot, + &accounts, + &hashes, + |_| shrunken_store.clone(), + write_versions.into_iter(), + ); + let reclaims = self.update_index(slot, infos, &accounts); + + self.handle_reclaims(&reclaims); + + let mut storage = self.storage.write().unwrap(); + if let Some(slot_storage) = storage.0.get_mut(&slot) { + slot_storage.retain(|_key, store| store.count() > 0); + } + } + } + + // Infinitely returns rooted roots in cyclic order + fn next_shrink_slot(&self) -> Option { + let next = { + let mut candidates = self.shrink_candidate_slots.lock().unwrap(); + candidates.pop() + }; + + if next.is_some() { + next + } else { + let mut new_all_slots = self.all_root_slots_in_index(); + let next = new_all_slots.pop(); + + let mut candidates = self.shrink_candidate_slots.lock().unwrap(); + *candidates = new_all_slots; + + next + } + } + + fn all_root_slots_in_index(&self) -> Vec { + let index = self.accounts_index.read().unwrap(); + index.roots.iter().cloned().collect() + } + + fn all_slots_in_storage(&self) -> Vec { + let storage = self.storage.read().unwrap(); + storage.0.keys().cloned().collect() + } + + pub fn process_stale_slot(&self) { + if let Some(slot) = self.next_shrink_slot() { + self.shrink_stale_slot(slot); + } + } + + pub fn shrink_all_stale_slots(&self) { + for slot in self.all_slots_in_storage() { + self.shrink_stale_slot(slot); + } + } + pub fn scan_accounts(&self, ancestors: &HashMap, scan_func: F) -> A where F: Fn(&mut A, Option<(&Pubkey, Account, Slot)>) -> (), @@ -1140,18 +1286,46 @@ impl AccountsDB { hasher.result() } + fn bulk_assign_write_version(&self, count: usize) -> u64 { + self.write_version + .fetch_add(count as u64, Ordering::Relaxed) + } + fn store_accounts( &self, slot: Slot, accounts: &[(&Pubkey, &Account)], hashes: &[Hash], ) -> Vec { - let default_account = Account::default(); + let mut current_version = self.bulk_assign_write_version(accounts.len()); + let write_version_producer = std::iter::from_fn(move || { + let ret = current_version; + current_version += 1; + Some(ret) + }); + let storage_finder = |slot| self.find_storage_candidate(slot); + self.store_accounts_to( + slot, + accounts, + hashes, + storage_finder, + write_version_producer, + ) + } + + fn store_accounts_to Arc, P: Iterator>( + &self, + slot: Slot, + accounts: &[(&Pubkey, &Account)], + hashes: &[Hash], + mut storage_finder: F, + mut write_version_producer: P, + ) -> Vec { + let default_account = Account::default(); let with_meta: Vec<(StoredMeta, &Account)> = accounts .iter() .map(|(pubkey, account)| { - let write_version = self.write_version.fetch_add(1, Ordering::Relaxed) as u64; let account = if account.lamports == 0 { &default_account } else { @@ -1160,7 +1334,7 @@ impl AccountsDB { let data_len = account.data.len() as u64; let meta = StoredMeta { - write_version, + write_version: write_version_producer.next().unwrap(), pubkey: **pubkey, data_len, }; @@ -1169,7 +1343,7 @@ impl AccountsDB { .collect(); let mut infos: Vec = Vec::with_capacity(with_meta.len()); while infos.len() < with_meta.len() { - let storage = self.find_storage_candidate(slot); + let storage = storage_finder(slot); let rvs = storage .accounts .append_accounts(&with_meta[infos.len()..], &hashes[infos.len()..]); @@ -2217,12 +2391,26 @@ pub mod tests { } impl AccountsDB { - fn store_count_for_slot(&self, slot: Slot) -> usize { + fn alive_account_count_in_store(&self, slot: Slot) -> usize { let storage = self.storage.read().unwrap(); let slot_storage = storage.0.get(&slot); if let Some(slot_storage) = slot_storage { - slot_storage.values().nth(0).unwrap().count() + slot_storage.values().map(|store| store.count()).sum() + } else { + 0 + } + } + + fn all_account_count_in_append_vec(&self, slot: Slot) -> usize { + let storage = self.storage.read().unwrap(); + + let slot_storage = storage.0.get(&slot); + if let Some(slot_storage) = slot_storage { + slot_storage + .values() + .map(|store| store.accounts.accounts(0).len()) + .sum() } else { 0 } @@ -2256,14 +2444,14 @@ pub mod tests { accounts.add_root(1); //even if rooted, old state isn't cleaned up - assert_eq!(accounts.store_count_for_slot(0), 1); - assert_eq!(accounts.store_count_for_slot(1), 1); + assert_eq!(accounts.alive_account_count_in_store(0), 1); + assert_eq!(accounts.alive_account_count_in_store(1), 1); accounts.clean_accounts(); //now old state is cleaned up - assert_eq!(accounts.store_count_for_slot(0), 0); - assert_eq!(accounts.store_count_for_slot(1), 1); + assert_eq!(accounts.alive_account_count_in_store(0), 0); + assert_eq!(accounts.alive_account_count_in_store(1), 1); } #[test] @@ -2286,14 +2474,14 @@ pub mod tests { accounts.add_root(1); //even if rooted, old state isn't cleaned up - assert_eq!(accounts.store_count_for_slot(0), 2); - assert_eq!(accounts.store_count_for_slot(1), 2); + assert_eq!(accounts.alive_account_count_in_store(0), 2); + assert_eq!(accounts.alive_account_count_in_store(1), 2); accounts.clean_accounts(); //still old state behind zero-lamport account isn't cleaned up - assert_eq!(accounts.store_count_for_slot(0), 1); - assert_eq!(accounts.store_count_for_slot(1), 2); + assert_eq!(accounts.alive_account_count_in_store(0), 1); + assert_eq!(accounts.alive_account_count_in_store(1), 2); } #[test] @@ -2317,16 +2505,16 @@ pub mod tests { accounts.add_root(2); //even if rooted, old state isn't cleaned up - assert_eq!(accounts.store_count_for_slot(0), 2); - assert_eq!(accounts.store_count_for_slot(1), 1); - assert_eq!(accounts.store_count_for_slot(2), 1); + assert_eq!(accounts.alive_account_count_in_store(0), 2); + assert_eq!(accounts.alive_account_count_in_store(1), 1); + assert_eq!(accounts.alive_account_count_in_store(2), 1); accounts.clean_accounts(); //both zero lamport and normal accounts are cleaned up - assert_eq!(accounts.store_count_for_slot(0), 0); - assert_eq!(accounts.store_count_for_slot(1), 0); - assert_eq!(accounts.store_count_for_slot(2), 1); + assert_eq!(accounts.alive_account_count_in_store(0), 0); + assert_eq!(accounts.alive_account_count_in_store(1), 0); + assert_eq!(accounts.alive_account_count_in_store(2), 1); } #[test] @@ -3429,11 +3617,11 @@ pub mod tests { // B: Test multiple updates to pubkey1 in a single slot/storage current_slot += 1; - assert_eq!(0, accounts.store_count_for_slot(current_slot)); + assert_eq!(0, accounts.alive_account_count_in_store(current_slot)); assert_eq!(1, accounts.ref_count_for_pubkey(&pubkey1)); accounts.store(current_slot, &[(&pubkey1, &account2)]); accounts.store(current_slot, &[(&pubkey1, &account2)]); - assert_eq!(1, accounts.store_count_for_slot(current_slot)); + assert_eq!(1, accounts.alive_account_count_in_store(current_slot)); assert_eq!(3, accounts.ref_count_for_pubkey(&pubkey1)); accounts.add_root(current_slot); @@ -3491,10 +3679,167 @@ pub mod tests { } #[test] - fn clean_dead_slots_empty() { + fn test_clean_dead_slots_empty() { let accounts = AccountsDB::new_single(); let mut dead_slots = HashSet::new(); dead_slots.insert(10); accounts.clean_dead_slots(&dead_slots); } + + #[test] + fn test_shrink_stale_slots_none() { + let accounts = AccountsDB::new_single(); + + for _ in 0..10 { + accounts.process_stale_slot(); + } + + accounts.shrink_all_stale_slots(); + } + + #[test] + fn test_shrink_next_slots() { + let accounts = AccountsDB::new_single(); + + let mut current_slot = 7; + + assert_eq!( + vec![None, None, None], + (0..3) + .map({ |_| accounts.next_shrink_slot() }) + .collect::>() + ); + + accounts.add_root(current_slot); + + assert_eq!( + vec![Some(7), Some(7), Some(7)], + (0..3) + .map({ |_| accounts.next_shrink_slot() }) + .collect::>() + ); + + current_slot += 1; + accounts.add_root(current_slot); + + let slots = (0..6) + .map({ |_| accounts.next_shrink_slot() }) + .collect::>(); + + // Because the origin of this data is HashMap (not BTreeMap), key order is arbitrary per cycle. + assert!( + vec![Some(7), Some(8), Some(7), Some(8), Some(7), Some(8)] == slots + || vec![Some(8), Some(7), Some(8), Some(7), Some(8), Some(7)] == slots + ); + } + + #[test] + fn test_shrink_stale_slots_processed() { + solana_logger::setup(); + + let accounts = AccountsDB::new_single(); + + let pubkey_count = 100; + let pubkeys: Vec<_> = (0..pubkey_count).map(|_| Pubkey::new_rand()).collect(); + + let some_lamport = 223; + let no_data = 0; + let owner = Account::default().owner; + + let account = Account::new(some_lamport, no_data, &owner); + + let mut current_slot = 0; + + current_slot += 1; + for pubkey in &pubkeys { + accounts.store(current_slot, &[(&pubkey, &account)]); + } + let shrink_slot = current_slot; + accounts.add_root(current_slot); + + current_slot += 1; + let pubkey_count_after_shrink = 10; + let updated_pubkeys = &pubkeys[0..pubkey_count - pubkey_count_after_shrink]; + + for pubkey in updated_pubkeys { + accounts.store(current_slot, &[(&pubkey, &account)]); + } + accounts.add_root(current_slot); + + accounts.clean_accounts(); + + assert_eq!( + pubkey_count, + accounts.all_account_count_in_append_vec(shrink_slot) + ); + accounts.shrink_all_stale_slots(); + assert_eq!( + pubkey_count_after_shrink, + accounts.all_account_count_in_append_vec(shrink_slot) + ); + + let no_ancestors = HashMap::default(); + accounts.update_accounts_hash(current_slot, &no_ancestors); + accounts + .verify_bank_hash(current_slot, &no_ancestors) + .unwrap(); + + let accounts = reconstruct_accounts_db_via_serialization(&accounts, current_slot); + accounts + .verify_bank_hash(current_slot, &no_ancestors) + .unwrap(); + + // repeating should be no-op + accounts.shrink_all_stale_slots(); + assert_eq!( + pubkey_count_after_shrink, + accounts.all_account_count_in_append_vec(shrink_slot) + ); + } + + #[test] + fn test_shrink_stale_slots_skipped() { + solana_logger::setup(); + + let accounts = AccountsDB::new_single(); + + let pubkey_count = 100; + let pubkeys: Vec<_> = (0..pubkey_count).map(|_| Pubkey::new_rand()).collect(); + + let some_lamport = 223; + let no_data = 0; + let owner = Account::default().owner; + + let account = Account::new(some_lamport, no_data, &owner); + + let mut current_slot = 0; + + current_slot += 1; + for pubkey in &pubkeys { + accounts.store(current_slot, &[(&pubkey, &account)]); + } + let shrink_slot = current_slot; + accounts.add_root(current_slot); + + current_slot += 1; + let pubkey_count_after_shrink = 90; + let updated_pubkeys = &pubkeys[0..pubkey_count - pubkey_count_after_shrink]; + + for pubkey in updated_pubkeys { + accounts.store(current_slot, &[(&pubkey, &account)]); + } + accounts.add_root(current_slot); + + accounts.clean_accounts(); + + assert_eq!( + pubkey_count, + accounts.all_account_count_in_append_vec(shrink_slot) + ); + accounts.shrink_all_stale_slots(); + assert_eq!( + pubkey_count, + accounts.all_account_count_in_append_vec(shrink_slot) + ); + } } diff --git a/runtime/src/append_vec.rs b/runtime/src/append_vec.rs index f319cae758..832cea6ffe 100644 --- a/runtime/src/append_vec.rs +++ b/runtime/src/append_vec.rs @@ -160,7 +160,9 @@ impl AppendVec { data.seek(SeekFrom::Start(0)).unwrap(); data.flush().unwrap(); //UNSAFE: Required to create a Mmap - let map = unsafe { MmapMut::map_mut(&data).expect("failed to map the data file") }; + let map = unsafe { MmapMut::map_mut(&data) }; + let map = + map.unwrap_or_else(|e| panic!("failed to map the data file (size: {}): {}", size, e)); AppendVec { path: file.to_path_buf(), diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 604a006c0c..9faea2b264 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -1964,6 +1964,7 @@ impl Bank { /// calculation and could shield other real accounts. pub fn verify_snapshot_bank(&self) -> bool { self.clean_accounts(); + self.shrink_all_stale_slots(); // Order and short-circuiting is significant; verify_hash requires a valid bank hash self.verify_bank_hash() && self.verify_hash() } @@ -2205,9 +2206,17 @@ impl Bank { self.rc.accounts.accounts_db.clean_accounts(); } - pub fn clean_dead_slots(&self) { + pub fn process_dead_slots(&self) { self.rc.accounts.accounts_db.process_dead_slots(); } + + pub fn process_stale_slot(&self) { + self.rc.accounts.accounts_db.process_stale_slot(); + } + + pub fn shrink_all_stale_slots(&self) { + self.rc.accounts.accounts_db.shrink_all_stale_slots(); + } } impl Drop for Bank {