diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index ca2463f26..66e9c240a 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -27,10 +27,10 @@ pub struct SnapshotConfig { } pub struct BankForks { - banks: HashMap>, + pub banks: HashMap>, working_bank: Arc, root: u64, - snapshot_config: Option, + pub snapshot_config: Option, last_snapshot_slot: u64, } @@ -208,7 +208,7 @@ impl BankForks { self.root } - fn purge_old_snapshots(&self) { + pub fn purge_old_snapshots(&self) { // Remove outdated snapshots let config = self.snapshot_config.as_ref().unwrap(); let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path); @@ -221,7 +221,7 @@ impl BankForks { } } - fn generate_snapshot>( + pub fn generate_snapshot>( &self, root: u64, slots_to_snapshot: &[u64], @@ -281,23 +281,8 @@ impl BankForks { mod tests { use super::*; use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; - use crate::service::Service; - use crate::snapshot_packager_service::SnapshotPackagerService; - use bincode::serialize_into; - use fs_extra::dir::CopyOptions; - use itertools::Itertools; - use solana_runtime::status_cache::SlotDelta; - use solana_sdk::hash::{hashv, Hash}; + use solana_sdk::hash::Hash; use solana_sdk::pubkey::Pubkey; - use solana_sdk::signature::{Keypair, KeypairUtil}; - use solana_sdk::system_transaction; - use solana_sdk::transaction::Result as TransactionResult; - use std::fs; - use std::fs::File; - use std::io::{BufWriter, Write}; - use std::sync::atomic::AtomicBool; - use std::sync::mpsc::channel; - use tempfile::TempDir; #[test] fn test_bank_forks() { @@ -366,379 +351,4 @@ mod tests { bank_forks.insert(child_bank); assert_eq!(bank_forks.active_banks(), vec![1]); } - - fn restore_from_snapshot(old_bank_forks: &BankForks, account_paths: String) { - let (snapshot_path, snapshot_package_output_path) = old_bank_forks - .snapshot_config - .as_ref() - .map(|c| (&c.snapshot_path, &c.snapshot_package_output_path)) - .unwrap(); - - let deserialized_bank = snapshot_utils::bank_from_archive( - account_paths, - &old_bank_forks - .snapshot_config - .as_ref() - .unwrap() - .snapshot_path, - snapshot_utils::get_snapshot_tar_path(snapshot_package_output_path), - ) - .unwrap(); - - let bank = old_bank_forks - .banks - .get(&deserialized_bank.slot()) - .unwrap() - .clone(); - bank.compare_bank(&deserialized_bank); - - let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshot_path); - - for p in slot_snapshot_paths { - snapshot_utils::remove_snapshot(p.slot, &snapshot_path).unwrap(); - } - } - - // creates banks up to "last_slot" and runs the input function `f` on each bank created - // also marks each bank as root and generates snapshots - // finally tries to restore from the last bank's snapshot and compares the restored bank to the - // `last_slot` bank - fn run_bank_forks_snapshot_n(last_slot: u64, f: F, set_root_interval: u64) - where - F: Fn(&mut Bank, &Keypair), - { - solana_logger::setup(); - // Set up snapshotting config - let mut snapshot_test_config = setup_snapshot_test(1); - - let bank_forks = &mut snapshot_test_config.bank_forks; - let accounts_dir = &snapshot_test_config.accounts_dir; - let snapshot_config = &snapshot_test_config.snapshot_config; - let mint_keypair = &snapshot_test_config.genesis_block_info.mint_keypair; - - let (s, _r) = channel(); - let sender = Some(s); - for slot in 0..last_slot { - let mut bank = Bank::new_from_parent(&bank_forks[slot], &Pubkey::default(), slot + 1); - f(&mut bank, &mint_keypair); - let bank = bank_forks.insert(bank); - // Set root to make sure we don't end up with too many account storage entries - // and to allow snapshotting of bank and the purging logic on status_cache to - // kick in - if slot % set_root_interval == 0 || slot == last_slot - 1 { - bank_forks.set_root(bank.slot(), &sender); - } - } - // Generate a snapshot package for last bank - let last_bank = bank_forks.get(last_slot).unwrap(); - let slot_snapshot_paths = - snapshot_utils::get_snapshot_paths(&snapshot_config.snapshot_path); - let snapshot_package = snapshot_utils::package_snapshot( - last_bank, - slot_snapshot_paths - .last() - .expect("no snapshots found in path"), - snapshot_utils::get_snapshot_tar_path(&snapshot_config.snapshot_package_output_path), - &snapshot_config.snapshot_path, - &last_bank.src.roots(), - ) - .unwrap(); - SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap(); - - restore_from_snapshot( - bank_forks, - accounts_dir.path().to_str().unwrap().to_string(), - ); - } - - #[test] - fn test_bank_forks_snapshot_n() { - // create banks upto slot 4 and create 1 new account in each bank. test that bank 4 snapshots - // and restores correctly - run_bank_forks_snapshot_n( - 4, - |bank, mint_keypair| { - let key1 = Keypair::new().pubkey(); - let tx = system_transaction::create_user_account( - &mint_keypair, - &key1, - 1, - bank.last_blockhash(), - ); - assert_eq!(bank.process_transaction(&tx), Ok(())); - bank.freeze(); - }, - 1, - ); - } - - fn goto_end_of_slot(bank: &mut Bank) { - let mut tick_hash = bank.last_blockhash(); - loop { - tick_hash = hashv(&[&tick_hash.as_ref(), &[42]]); - bank.register_tick(&tick_hash); - if tick_hash == bank.last_blockhash() { - bank.freeze(); - return; - } - } - } - - #[test] - fn test_bank_forks_status_cache_snapshot_n() { - // create banks upto slot (MAX_CACHE_ENTRIES * 2) + 1 while transferring 1 lamport into 2 different accounts each time - // this is done to ensure the AccountStorageEntries keep getting cleaned up as the root moves - // ahead. Also tests the status_cache purge and status cache snapshotting. - // Makes sure that the last bank is restored correctly - let key1 = Keypair::new().pubkey(); - let key2 = Keypair::new().pubkey(); - for set_root_interval in &[1, 4] { - run_bank_forks_snapshot_n( - (MAX_CACHE_ENTRIES * 2 + 1) as u64, - |bank, mint_keypair| { - let tx = system_transaction::transfer( - &mint_keypair, - &key1, - 1, - bank.parent().unwrap().last_blockhash(), - ); - assert_eq!(bank.process_transaction(&tx), Ok(())); - let tx = system_transaction::transfer( - &mint_keypair, - &key2, - 1, - bank.parent().unwrap().last_blockhash(), - ); - assert_eq!(bank.process_transaction(&tx), Ok(())); - goto_end_of_slot(bank); - }, - *set_root_interval, - ); - } - } - - #[test] - fn test_concurrent_snapshot_packaging() { - solana_logger::setup(); - - // Set up snapshotting config - let mut snapshot_test_config = setup_snapshot_test(1); - - let bank_forks = &mut snapshot_test_config.bank_forks; - let accounts_dir = &snapshot_test_config.accounts_dir; - let snapshots_dir = &snapshot_test_config.snapshot_dir; - let snapshot_config = &snapshot_test_config.snapshot_config; - let mint_keypair = &snapshot_test_config.genesis_block_info.mint_keypair; - let genesis_block = &snapshot_test_config.genesis_block_info.genesis_block; - - // Take snapshot of zeroth bank - let bank0 = bank_forks.get(0).unwrap(); - snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0).unwrap(); - - // Set up snapshotting channels - let (sender, receiver) = channel(); - let (fake_sender, _fake_receiver) = channel(); - - // Create next MAX_CACHE_ENTRIES + 2 banks and snapshots. Every bank will get snapshotted - // and the snapshot purging logic will run on every snapshot taken. This means the three - // (including snapshot for bank0 created above) earliest snapshots will get purged by the - // time this loop is done. - - // Also, make a saved copy of the state of the snapshot for a bank with - // bank.slot == saved_slot, so we can use it for a correctness check later. - let saved_snapshots_dir = TempDir::new().unwrap(); - let saved_accounts_dir = TempDir::new().unwrap(); - let saved_slot = 4; - let saved_tar = snapshot_config - .snapshot_package_output_path - .join(saved_slot.to_string()); - for forks in 0..MAX_CACHE_ENTRIES + 2 { - let bank = Bank::new_from_parent( - &bank_forks[forks as u64], - &Pubkey::default(), - (forks + 1) as u64, - ); - let slot = bank.slot(); - let key1 = Keypair::new().pubkey(); - let tx = system_transaction::create_user_account( - &mint_keypair, - &key1, - 1, - genesis_block.hash(), - ); - assert_eq!(bank.process_transaction(&tx), Ok(())); - bank.freeze(); - bank_forks.insert(bank); - - let package_sender = { - if slot == saved_slot as u64 { - // Only send one package on the real sender so that the packaging service - // doesn't take forever to run the packaging logic on all MAX_CACHE_ENTRIES - // later - &sender - } else { - &fake_sender - } - }; - - bank_forks - .generate_snapshot( - slot, - &vec![], - &package_sender, - snapshot_config - .snapshot_package_output_path - .join(slot.to_string()), - ) - .unwrap(); - - if slot == saved_slot as u64 { - let options = CopyOptions::new(); - fs_extra::dir::copy(accounts_dir, &saved_accounts_dir, &options).unwrap(); - let snapshot_paths: Vec<_> = fs::read_dir(&snapshot_config.snapshot_path) - .unwrap() - .filter_map(|entry| { - let e = entry.unwrap(); - let file_path = e.path(); - let file_name = file_path.file_name().unwrap(); - file_name - .to_str() - .map(|s| s.parse::().ok().map(|_| file_path.clone())) - .unwrap_or(None) - }) - .sorted() - .collect(); - // only save off the snapshot of this slot, we don't need the others. - fs_extra::dir::copy( - &snapshot_paths.last().unwrap(), - &saved_snapshots_dir, - &options, - ) - .unwrap(); - } - } - - // Purge all the outdated snapshots, including the ones needed to generate the package - // currently sitting in the channel - bank_forks.purge_old_snapshots(); - let mut snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshots_dir); - snapshot_paths.sort(); - assert_eq!( - snapshot_paths.iter().map(|path| path.slot).collect_vec(), - (3..=MAX_CACHE_ENTRIES as u64 + 2).collect_vec() - ); - - // Create a SnapshotPackagerService to create tarballs from all the pending - // SnapshotPackage's on the channel. By the time this service starts, we have already - // purged the first two snapshots, which are needed by every snapshot other than - // the last two snapshots. However, the packaging service should still be able to - // correctly construct the earlier snapshots because the SnapshotPackage's on the - // channel hold hard links to these deleted snapshots. We verify this is the case below. - let exit = Arc::new(AtomicBool::new(false)); - let snapshot_packager_service = SnapshotPackagerService::new(receiver, &exit); - - // Close the channel so that the package service will exit after reading all the - // packages off the channel - drop(sender); - - // Wait for service to finish - snapshot_packager_service - .join() - .expect("SnapshotPackagerService exited with error"); - - // Check the tar we cached the state for earlier was generated correctly - - // before we compare, stick an empty status_cache in this dir so that the package comparision works - // This is needed since the status_cache is added by the packager and is not collected from - // the source dir for snapshots - let slot_deltas: Vec>> = vec![]; - let dummy_status_cache = - File::create(saved_snapshots_dir.path().join("status_cache")).unwrap(); - let mut status_cache_stream = BufWriter::new(dummy_status_cache); - serialize_into(&mut status_cache_stream, &slot_deltas).unwrap(); - status_cache_stream.flush().unwrap(); - - snapshot_utils::tests::verify_snapshot_tar( - saved_tar, - saved_snapshots_dir.path(), - saved_accounts_dir - .path() - .join(accounts_dir.path().file_name().unwrap()), - ); - } - - #[test] - fn test_slots_to_snapshot() { - solana_logger::setup(); - for add_root_interval in 1..10 { - let (snapshot_sender, _snapshot_receiver) = channel(); - let num_set_roots = MAX_CACHE_ENTRIES * 5; - // Make sure this test never clears bank.slots_since_snapshot - let mut snapshot_test_config = - setup_snapshot_test(add_root_interval * num_set_roots * 2); - let mut current_bank = snapshot_test_config.bank_forks[0].clone(); - let snapshot_sender = Some(snapshot_sender); - for _ in 0..num_set_roots { - for _ in 0..add_root_interval { - let new_slot = current_bank.slot() + 1; - let new_bank = - Bank::new_from_parent(¤t_bank, &Pubkey::default(), new_slot); - snapshot_test_config.bank_forks.insert(new_bank); - current_bank = snapshot_test_config.bank_forks[new_slot].clone(); - } - snapshot_test_config - .bank_forks - .set_root(current_bank.slot(), &snapshot_sender); - } - - let num_old_slots = num_set_roots * add_root_interval - MAX_CACHE_ENTRIES + 1; - let expected_slots_to_snapshot = (num_old_slots as u64 - ..=num_set_roots as u64 * add_root_interval as u64) - .collect_vec(); - - let rooted_bank = snapshot_test_config - .bank_forks - .get(snapshot_test_config.bank_forks.root()) - .unwrap(); - let slots_to_snapshot = rooted_bank.src.roots(); - assert_eq!(slots_to_snapshot, expected_slots_to_snapshot); - } - } - - struct SnapshotTestConfig { - accounts_dir: TempDir, - snapshot_dir: TempDir, - _snapshot_output_path: TempDir, - snapshot_config: SnapshotConfig, - bank_forks: BankForks, - genesis_block_info: GenesisBlockInfo, - } - - fn setup_snapshot_test(snapshot_interval_slots: usize) -> SnapshotTestConfig { - let accounts_dir = TempDir::new().unwrap(); - let snapshot_dir = TempDir::new().unwrap(); - let snapshot_output_path = TempDir::new().unwrap(); - let genesis_block_info = create_genesis_block(10_000); - let bank0 = Bank::new_with_paths( - &genesis_block_info.genesis_block, - Some(accounts_dir.path().to_str().unwrap().to_string()), - ); - bank0.freeze(); - let mut bank_forks = BankForks::new(0, bank0); - - let snapshot_config = SnapshotConfig { - snapshot_interval_slots, - snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()), - snapshot_path: PathBuf::from(snapshot_dir.path()), - }; - bank_forks.set_snapshot_config(snapshot_config.clone()); - SnapshotTestConfig { - accounts_dir, - snapshot_dir, - _snapshot_output_path: snapshot_output_path, - snapshot_config, - bank_forks, - genesis_block_info, - } - } } diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index 1374b36d5..c3eb26048 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -255,6 +255,6 @@ mod tests { status_cache_stream.flush().unwrap(); // Check tarball is correct - snapshot_utils::tests::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir); + snapshot_utils::verify_snapshot_tar(output_tar_path, snapshots_dir, accounts_dir); } } diff --git a/core/src/snapshot_utils.rs b/core/src/snapshot_utils.rs index d499f00c2..ca9d0e00b 100644 --- a/core/src/snapshot_utils.rs +++ b/core/src/snapshot_utils.rs @@ -306,30 +306,21 @@ fn get_io_error(error: &str) -> SnapshotError { SnapshotError::IO(IOError::new(ErrorKind::Other, error)) } -#[cfg(test)] -pub mod tests { - use super::*; - use tempfile::TempDir; +pub fn verify_snapshot_tar(snapshot_tar: P, snapshots_to_verify: Q, storages_to_verify: R) +where + P: AsRef, + Q: AsRef, + R: AsRef, +{ + let temp_dir = tempfile::TempDir::new().unwrap(); + let unpack_dir = temp_dir.path(); + untar_snapshot_in(snapshot_tar, &unpack_dir).unwrap(); - pub fn verify_snapshot_tar( - snapshot_tar: P, - snapshots_to_verify: Q, - storages_to_verify: R, - ) where - P: AsRef, - Q: AsRef, - R: AsRef, - { - let temp_dir = TempDir::new().unwrap(); - let unpack_dir = temp_dir.path(); - untar_snapshot_in(snapshot_tar, &unpack_dir).unwrap(); + // Check snapshots are the same + let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOTS_DIR); + assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap()); - // Check snapshots are the same - let unpacked_snapshots = unpack_dir.join(&TAR_SNAPSHOTS_DIR); - assert!(!dir_diff::is_different(&snapshots_to_verify, unpacked_snapshots).unwrap()); - - // Check the account entries are the same - let unpacked_accounts = unpack_dir.join(&TAR_ACCOUNTS_DIR); - assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap()); - } + // Check the account entries are the same + let unpacked_accounts = unpack_dir.join(&TAR_ACCOUNTS_DIR); + assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap()); } diff --git a/core/src/storage_stage.rs b/core/src/storage_stage.rs index b00093d52..26721777f 100644 --- a/core/src/storage_stage.rs +++ b/core/src/storage_stage.rs @@ -5,6 +5,7 @@ use crate::bank_forks::BankForks; use crate::chacha_cuda::chacha_cbc_encrypt_file_many_keys; use crate::cluster_info::ClusterInfo; +use crate::contact_info::ContactInfo; use crate::result::{Error, Result}; use crate::service::Service; use rand::{Rng, SeedableRng}; @@ -43,7 +44,7 @@ type ReplicatorMap = Vec>>; #[derive(Default)] pub struct StorageStateInner { storage_results: StorageResults, - storage_keys: StorageKeys, + pub storage_keys: StorageKeys, replicator_map: ReplicatorMap, storage_blockhash: Hash, slot: u64, @@ -61,7 +62,7 @@ struct StorageSlots { #[derive(Clone, Default)] pub struct StorageState { - state: Arc>, + pub state: Arc>, } pub struct StorageStage { @@ -626,30 +627,25 @@ impl Service for StorageStage { } } +pub fn test_cluster_info(id: &Pubkey) -> Arc> { + let contact_info = ContactInfo::new_localhost(id, 0); + let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); + Arc::new(RwLock::new(cluster_info)) +} + #[cfg(test)] mod tests { use super::*; - use crate::blocktree_processor; - use crate::cluster_info::ClusterInfo; - use crate::contact_info::ContactInfo; use crate::genesis_utils::{create_genesis_block, GenesisBlockInfo}; use crate::service::Service; use rayon::prelude::*; - use solana_ledger::blocktree::{create_new_tmp_ledger, Blocktree}; - use solana_ledger::entry; use solana_runtime::bank::Bank; - use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT; - use solana_sdk::hash::{Hash, Hasher}; - use solana_sdk::pubkey::Pubkey; + use solana_sdk::hash::Hasher; use solana_sdk::signature::{Keypair, KeypairUtil}; - use solana_storage_api::storage_instruction::StorageAccountType; use std::cmp::{max, min}; - use std::fs::remove_dir_all; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::mpsc::channel; use std::sync::{Arc, RwLock}; - use std::thread::sleep; - use std::time::Duration; #[test] fn test_storage_stage_none_ledger() { @@ -684,224 +680,6 @@ mod tests { storage_stage.join().unwrap(); } - fn test_cluster_info(id: &Pubkey) -> Arc> { - let contact_info = ContactInfo::new_localhost(id, 0); - let cluster_info = ClusterInfo::new_with_invalid_keypair(contact_info); - Arc::new(RwLock::new(cluster_info)) - } - - #[test] - fn test_storage_stage_process_banks() { - solana_logger::setup(); - let keypair = Arc::new(Keypair::new()); - let storage_keypair = Arc::new(Keypair::new()); - let exit = Arc::new(AtomicBool::new(false)); - - let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000); - let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block); - - let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); - let slot = 1; - let bank = Arc::new(Bank::new(&genesis_block)); - let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks( - &[bank.clone()], - vec![0], - ))); - - let cluster_info = test_cluster_info(&keypair.pubkey()); - let (bank_sender, bank_receiver) = channel(); - let storage_state = StorageState::new( - &bank.last_blockhash(), - SLOTS_PER_TURN_TEST, - bank.slots_per_segment(), - ); - let storage_stage = StorageStage::new( - &storage_state, - bank_receiver, - Some(blocktree.clone()), - &keypair, - &storage_keypair, - &exit.clone(), - &bank_forks, - &cluster_info, - ); - bank_sender.send(vec![bank.clone()]).unwrap(); - - let keypair = Keypair::new(); - let hash = Hash::default(); - let signature = keypair.sign_message(&hash.as_ref()); - - let mut result = storage_state.get_mining_result(&signature); - - assert_eq!(result, Hash::default()); - - let mut last_bank = bank; - let rooted_banks = (slot..slot + last_bank.slots_per_segment() + 1) - .map(|i| { - let bank = Arc::new(Bank::new_from_parent(&last_bank, &keypair.pubkey(), i)); - blocktree_processor::process_entries( - &bank, - &entry::create_ticks(64, bank.last_blockhash()), - true, - ) - .expect("failed process entries"); - last_bank = bank; - last_bank.clone() - }) - .collect::>(); - bank_sender.send(rooted_banks).unwrap(); - - if solana_ledger::perf_libs::api().is_some() { - for _ in 0..5 { - result = storage_state.get_mining_result(&signature); - if result != Hash::default() { - info!("found result = {:?} sleeping..", result); - break; - } - info!("result = {:?} sleeping..", result); - sleep(Duration::new(1, 0)); - } - } - - info!("joining..?"); - exit.store(true, Ordering::Relaxed); - storage_stage.join().unwrap(); - - if solana_ledger::perf_libs::api().is_some() { - assert_ne!(result, Hash::default()); - } else { - assert_eq!(result, Hash::default()); - } - - remove_dir_all(ledger_path).unwrap(); - } - - #[test] - fn test_storage_stage_process_account_proofs() { - solana_logger::setup(); - let keypair = Arc::new(Keypair::new()); - let storage_keypair = Arc::new(Keypair::new()); - let replicator_keypair = Arc::new(Keypair::new()); - let exit = Arc::new(AtomicBool::new(false)); - - let GenesisBlockInfo { - mut genesis_block, - mint_keypair, - .. - } = create_genesis_block(1000); - genesis_block - .native_instruction_processors - .push(solana_storage_program::solana_storage_program!()); - let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block); - - let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); - - let bank = Bank::new(&genesis_block); - let bank = Arc::new(bank); - let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks( - &[bank.clone()], - vec![0], - ))); - let cluster_info = test_cluster_info(&keypair.pubkey()); - - let (bank_sender, bank_receiver) = channel(); - let storage_state = StorageState::new( - &bank.last_blockhash(), - SLOTS_PER_TURN_TEST, - bank.slots_per_segment(), - ); - let storage_stage = StorageStage::new( - &storage_state, - bank_receiver, - Some(blocktree.clone()), - &keypair, - &storage_keypair, - &exit.clone(), - &bank_forks, - &cluster_info, - ); - bank_sender.send(vec![bank.clone()]).unwrap(); - - // create accounts - let bank = Arc::new(Bank::new_from_parent(&bank, &keypair.pubkey(), 1)); - let account_ix = storage_instruction::create_storage_account( - &mint_keypair.pubkey(), - &Pubkey::new_rand(), - &replicator_keypair.pubkey(), - 1, - StorageAccountType::Replicator, - ); - let account_tx = Transaction::new_signed_instructions( - &[&mint_keypair], - account_ix, - bank.last_blockhash(), - ); - bank.process_transaction(&account_tx).expect("create"); - - bank_sender.send(vec![bank.clone()]).unwrap(); - - let mut reference_keys; - { - let keys = &storage_state.state.read().unwrap().storage_keys; - reference_keys = vec![0; keys.len()]; - reference_keys.copy_from_slice(keys); - } - - let keypair = Keypair::new(); - - let mining_proof_ix = storage_instruction::mining_proof( - &replicator_keypair.pubkey(), - Hash::default(), - 0, - keypair.sign_message(b"test"), - bank.last_blockhash(), - ); - - let next_bank = Arc::new(Bank::new_from_parent(&bank, &keypair.pubkey(), 2)); - //register ticks so the program reports a different segment - blocktree_processor::process_entries( - &next_bank, - &entry::create_ticks( - DEFAULT_TICKS_PER_SLOT * next_bank.slots_per_segment() + 1, - bank.last_blockhash(), - ), - true, - ) - .unwrap(); - let message = Message::new_with_payer(vec![mining_proof_ix], Some(&mint_keypair.pubkey())); - let mining_proof_tx = Transaction::new( - &[&mint_keypair, replicator_keypair.as_ref()], - message, - next_bank.last_blockhash(), - ); - next_bank - .process_transaction(&mining_proof_tx) - .expect("process txs"); - bank_sender.send(vec![next_bank]).unwrap(); - - for _ in 0..5 { - { - let keys = &storage_state.state.read().unwrap().storage_keys; - if keys[..] != *reference_keys.as_slice() { - break; - } - } - - sleep(Duration::new(1, 0)); - } - - debug!("joining..?"); - exit.store(true, Ordering::Relaxed); - storage_stage.join().unwrap(); - - { - let keys = &storage_state.state.read().unwrap().storage_keys; - assert_ne!(keys[..], *reference_keys); - } - - remove_dir_all(ledger_path).unwrap(); - } - #[test] fn test_signature_distribution() { // See that signatures have an even-ish distribution.. diff --git a/core/tests/bank_forks.rs b/core/tests/bank_forks.rs new file mode 100644 index 000000000..0f0495dde --- /dev/null +++ b/core/tests/bank_forks.rs @@ -0,0 +1,404 @@ +// Long-running bank_forks tests + +#[cfg(test)] +mod tests { + use bincode::serialize_into; + use fs_extra::dir::CopyOptions; + use itertools::Itertools; + use solana_core::bank_forks::{BankForks, SnapshotConfig}; + use solana_core::genesis_utils::{create_genesis_block, GenesisBlockInfo}; + use solana_core::service::Service; + use solana_core::snapshot_packager_service::SnapshotPackagerService; + use solana_core::snapshot_utils; + use solana_runtime::bank::Bank; + use solana_runtime::status_cache::SlotDelta; + use solana_runtime::status_cache::MAX_CACHE_ENTRIES; + use solana_sdk::hash::hashv; + use solana_sdk::pubkey::Pubkey; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::system_transaction; + use solana_sdk::transaction::Result as TransactionResult; + use std::fs; + use std::fs::File; + use std::io::{BufWriter, Write}; + use std::path::PathBuf; + use std::sync::atomic::AtomicBool; + use std::sync::mpsc::channel; + use std::sync::Arc; + use tempfile::TempDir; + + struct SnapshotTestConfig { + accounts_dir: TempDir, + snapshot_dir: TempDir, + _snapshot_output_path: TempDir, + snapshot_config: SnapshotConfig, + bank_forks: BankForks, + genesis_block_info: GenesisBlockInfo, + } + + fn setup_snapshot_test(snapshot_interval_slots: usize) -> SnapshotTestConfig { + let accounts_dir = TempDir::new().unwrap(); + let snapshot_dir = TempDir::new().unwrap(); + let snapshot_output_path = TempDir::new().unwrap(); + let genesis_block_info = create_genesis_block(10_000); + let bank0 = Bank::new_with_paths( + &genesis_block_info.genesis_block, + Some(accounts_dir.path().to_str().unwrap().to_string()), + ); + bank0.freeze(); + let mut bank_forks = BankForks::new(0, bank0); + + let snapshot_config = SnapshotConfig { + snapshot_interval_slots, + snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()), + snapshot_path: PathBuf::from(snapshot_dir.path()), + }; + bank_forks.set_snapshot_config(snapshot_config.clone()); + SnapshotTestConfig { + accounts_dir, + snapshot_dir, + _snapshot_output_path: snapshot_output_path, + snapshot_config, + bank_forks, + genesis_block_info, + } + } + + fn restore_from_snapshot(old_bank_forks: &BankForks, account_paths: String) { + let (snapshot_path, snapshot_package_output_path) = old_bank_forks + .snapshot_config + .as_ref() + .map(|c| (&c.snapshot_path, &c.snapshot_package_output_path)) + .unwrap(); + + let deserialized_bank = snapshot_utils::bank_from_archive( + account_paths, + &old_bank_forks + .snapshot_config + .as_ref() + .unwrap() + .snapshot_path, + snapshot_utils::get_snapshot_tar_path(snapshot_package_output_path), + ) + .unwrap(); + + let bank = old_bank_forks + .banks + .get(&deserialized_bank.slot()) + .unwrap() + .clone(); + bank.compare_bank(&deserialized_bank); + + let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshot_path); + + for p in slot_snapshot_paths { + snapshot_utils::remove_snapshot(p.slot, &snapshot_path).unwrap(); + } + } + + // creates banks up to "last_slot" and runs the input function `f` on each bank created + // also marks each bank as root and generates snapshots + // finally tries to restore from the last bank's snapshot and compares the restored bank to the + // `last_slot` bank + fn run_bank_forks_snapshot_n(last_slot: u64, f: F, set_root_interval: u64) + where + F: Fn(&mut Bank, &Keypair), + { + solana_logger::setup(); + // Set up snapshotting config + let mut snapshot_test_config = setup_snapshot_test(1); + + let bank_forks = &mut snapshot_test_config.bank_forks; + let accounts_dir = &snapshot_test_config.accounts_dir; + let snapshot_config = &snapshot_test_config.snapshot_config; + let mint_keypair = &snapshot_test_config.genesis_block_info.mint_keypair; + + let (s, _r) = channel(); + let sender = Some(s); + for slot in 0..last_slot { + let mut bank = Bank::new_from_parent(&bank_forks[slot], &Pubkey::default(), slot + 1); + f(&mut bank, &mint_keypair); + let bank = bank_forks.insert(bank); + // Set root to make sure we don't end up with too many account storage entries + // and to allow snapshotting of bank and the purging logic on status_cache to + // kick in + if slot % set_root_interval == 0 || slot == last_slot - 1 { + bank_forks.set_root(bank.slot(), &sender); + } + } + // Generate a snapshot package for last bank + let last_bank = bank_forks.get(last_slot).unwrap(); + let slot_snapshot_paths = + snapshot_utils::get_snapshot_paths(&snapshot_config.snapshot_path); + let snapshot_package = snapshot_utils::package_snapshot( + last_bank, + slot_snapshot_paths + .last() + .expect("no snapshots found in path"), + snapshot_utils::get_snapshot_tar_path(&snapshot_config.snapshot_package_output_path), + &snapshot_config.snapshot_path, + &last_bank.src.roots(), + ) + .unwrap(); + SnapshotPackagerService::package_snapshots(&snapshot_package).unwrap(); + + restore_from_snapshot( + bank_forks, + accounts_dir.path().to_str().unwrap().to_string(), + ); + } + + #[test] + fn test_bank_forks_snapshot_n() { + // create banks upto slot 4 and create 1 new account in each bank. test that bank 4 snapshots + // and restores correctly + run_bank_forks_snapshot_n( + 4, + |bank, mint_keypair| { + let key1 = Keypair::new().pubkey(); + let tx = system_transaction::create_user_account( + &mint_keypair, + &key1, + 1, + bank.last_blockhash(), + ); + assert_eq!(bank.process_transaction(&tx), Ok(())); + bank.freeze(); + }, + 1, + ); + } + + fn goto_end_of_slot(bank: &mut Bank) { + let mut tick_hash = bank.last_blockhash(); + loop { + tick_hash = hashv(&[&tick_hash.as_ref(), &[42]]); + bank.register_tick(&tick_hash); + if tick_hash == bank.last_blockhash() { + bank.freeze(); + return; + } + } + } + + #[test] + fn test_concurrent_snapshot_packaging() { + solana_logger::setup(); + + // Set up snapshotting config + let mut snapshot_test_config = setup_snapshot_test(1); + + let bank_forks = &mut snapshot_test_config.bank_forks; + let accounts_dir = &snapshot_test_config.accounts_dir; + let snapshots_dir = &snapshot_test_config.snapshot_dir; + let snapshot_config = &snapshot_test_config.snapshot_config; + let mint_keypair = &snapshot_test_config.genesis_block_info.mint_keypair; + let genesis_block = &snapshot_test_config.genesis_block_info.genesis_block; + + // Take snapshot of zeroth bank + let bank0 = bank_forks.get(0).unwrap(); + snapshot_utils::add_snapshot(&snapshot_config.snapshot_path, bank0).unwrap(); + + // Set up snapshotting channels + let (sender, receiver) = channel(); + let (fake_sender, _fake_receiver) = channel(); + + // Create next MAX_CACHE_ENTRIES + 2 banks and snapshots. Every bank will get snapshotted + // and the snapshot purging logic will run on every snapshot taken. This means the three + // (including snapshot for bank0 created above) earliest snapshots will get purged by the + // time this loop is done. + + // Also, make a saved copy of the state of the snapshot for a bank with + // bank.slot == saved_slot, so we can use it for a correctness check later. + let saved_snapshots_dir = TempDir::new().unwrap(); + let saved_accounts_dir = TempDir::new().unwrap(); + let saved_slot = 4; + let saved_tar = snapshot_config + .snapshot_package_output_path + .join(saved_slot.to_string()); + for forks in 0..MAX_CACHE_ENTRIES + 2 { + let bank = Bank::new_from_parent( + &bank_forks[forks as u64], + &Pubkey::default(), + (forks + 1) as u64, + ); + let slot = bank.slot(); + let key1 = Keypair::new().pubkey(); + let tx = system_transaction::create_user_account( + &mint_keypair, + &key1, + 1, + genesis_block.hash(), + ); + assert_eq!(bank.process_transaction(&tx), Ok(())); + bank.freeze(); + bank_forks.insert(bank); + + let package_sender = { + if slot == saved_slot as u64 { + // Only send one package on the real sender so that the packaging service + // doesn't take forever to run the packaging logic on all MAX_CACHE_ENTRIES + // later + &sender + } else { + &fake_sender + } + }; + + bank_forks + .generate_snapshot( + slot, + &vec![], + &package_sender, + snapshot_config + .snapshot_package_output_path + .join(slot.to_string()), + ) + .unwrap(); + + if slot == saved_slot as u64 { + let options = CopyOptions::new(); + fs_extra::dir::copy(accounts_dir, &saved_accounts_dir, &options).unwrap(); + let snapshot_paths: Vec<_> = fs::read_dir(&snapshot_config.snapshot_path) + .unwrap() + .filter_map(|entry| { + let e = entry.unwrap(); + let file_path = e.path(); + let file_name = file_path.file_name().unwrap(); + file_name + .to_str() + .map(|s| s.parse::().ok().map(|_| file_path.clone())) + .unwrap_or(None) + }) + .sorted() + .collect(); + // only save off the snapshot of this slot, we don't need the others. + fs_extra::dir::copy( + &snapshot_paths.last().unwrap(), + &saved_snapshots_dir, + &options, + ) + .unwrap(); + } + } + + // Purge all the outdated snapshots, including the ones needed to generate the package + // currently sitting in the channel + bank_forks.purge_old_snapshots(); + let mut snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshots_dir); + snapshot_paths.sort(); + assert_eq!( + snapshot_paths.iter().map(|path| path.slot).collect_vec(), + (3..=MAX_CACHE_ENTRIES as u64 + 2).collect_vec() + ); + + // Create a SnapshotPackagerService to create tarballs from all the pending + // SnapshotPackage's on the channel. By the time this service starts, we have already + // purged the first two snapshots, which are needed by every snapshot other than + // the last two snapshots. However, the packaging service should still be able to + // correctly construct the earlier snapshots because the SnapshotPackage's on the + // channel hold hard links to these deleted snapshots. We verify this is the case below. + let exit = Arc::new(AtomicBool::new(false)); + let snapshot_packager_service = SnapshotPackagerService::new(receiver, &exit); + + // Close the channel so that the package service will exit after reading all the + // packages off the channel + drop(sender); + + // Wait for service to finish + snapshot_packager_service + .join() + .expect("SnapshotPackagerService exited with error"); + + // Check the tar we cached the state for earlier was generated correctly + + // before we compare, stick an empty status_cache in this dir so that the package comparision works + // This is needed since the status_cache is added by the packager and is not collected from + // the source dir for snapshots + let slot_deltas: Vec>> = vec![]; + let dummy_status_cache = + File::create(saved_snapshots_dir.path().join("status_cache")).unwrap(); + let mut status_cache_stream = BufWriter::new(dummy_status_cache); + serialize_into(&mut status_cache_stream, &slot_deltas).unwrap(); + status_cache_stream.flush().unwrap(); + + snapshot_utils::verify_snapshot_tar( + saved_tar, + saved_snapshots_dir.path(), + saved_accounts_dir + .path() + .join(accounts_dir.path().file_name().unwrap()), + ); + } + + #[test] + fn test_slots_to_snapshot() { + solana_logger::setup(); + for add_root_interval in 1..10 { + let (snapshot_sender, _snapshot_receiver) = channel(); + let num_set_roots = MAX_CACHE_ENTRIES * 5; + // Make sure this test never clears bank.slots_since_snapshot + let mut snapshot_test_config = + setup_snapshot_test(add_root_interval * num_set_roots * 2); + let mut current_bank = snapshot_test_config.bank_forks[0].clone(); + let snapshot_sender = Some(snapshot_sender); + for _ in 0..num_set_roots { + for _ in 0..add_root_interval { + let new_slot = current_bank.slot() + 1; + let new_bank = + Bank::new_from_parent(¤t_bank, &Pubkey::default(), new_slot); + snapshot_test_config.bank_forks.insert(new_bank); + current_bank = snapshot_test_config.bank_forks[new_slot].clone(); + } + snapshot_test_config + .bank_forks + .set_root(current_bank.slot(), &snapshot_sender); + } + + let num_old_slots = num_set_roots * add_root_interval - MAX_CACHE_ENTRIES + 1; + let expected_slots_to_snapshot = (num_old_slots as u64 + ..=num_set_roots as u64 * add_root_interval as u64) + .collect_vec(); + + let rooted_bank = snapshot_test_config + .bank_forks + .get(snapshot_test_config.bank_forks.root()) + .unwrap(); + let slots_to_snapshot = rooted_bank.src.roots(); + assert_eq!(slots_to_snapshot, expected_slots_to_snapshot); + } + } + + #[test] + fn test_bank_forks_status_cache_snapshot_n() { + // create banks upto slot (MAX_CACHE_ENTRIES * 2) + 1 while transferring 1 lamport into 2 different accounts each time + // this is done to ensure the AccountStorageEntries keep getting cleaned up as the root moves + // ahead. Also tests the status_cache purge and status cache snapshotting. + // Makes sure that the last bank is restored correctly + let key1 = Keypair::new().pubkey(); + let key2 = Keypair::new().pubkey(); + for set_root_interval in &[1, 4] { + run_bank_forks_snapshot_n( + (MAX_CACHE_ENTRIES * 2 + 1) as u64, + |bank, mint_keypair| { + let tx = system_transaction::transfer( + &mint_keypair, + &key1, + 1, + bank.parent().unwrap().last_blockhash(), + ); + assert_eq!(bank.process_transaction(&tx), Ok(())); + let tx = system_transaction::transfer( + &mint_keypair, + &key2, + 1, + bank.parent().unwrap().last_blockhash(), + ); + assert_eq!(bank.process_transaction(&tx), Ok(())); + goto_end_of_slot(bank); + }, + *set_root_interval, + ); + } + } +} diff --git a/core/tests/storage_stage.rs b/core/tests/storage_stage.rs new file mode 100644 index 000000000..cb0b22b1e --- /dev/null +++ b/core/tests/storage_stage.rs @@ -0,0 +1,244 @@ +// Long-running storage_stage tests + +#[macro_use] +extern crate solana_ledger; + +#[cfg(test)] +mod tests { + use log::*; + use solana_core::bank_forks::BankForks; + use solana_core::blocktree_processor; + use solana_core::genesis_utils::{create_genesis_block, GenesisBlockInfo}; + use solana_core::service::Service; + use solana_core::storage_stage::{test_cluster_info, SLOTS_PER_TURN_TEST}; + use solana_core::storage_stage::{StorageStage, StorageState}; + use solana_ledger::blocktree::{create_new_tmp_ledger, Blocktree}; + use solana_ledger::entry; + use solana_runtime::bank::Bank; + use solana_sdk::clock::DEFAULT_TICKS_PER_SLOT; + use solana_sdk::hash::Hash; + use solana_sdk::message::Message; + use solana_sdk::pubkey::Pubkey; + use solana_sdk::signature::{Keypair, KeypairUtil}; + use solana_sdk::transaction::Transaction; + use solana_storage_api::storage_instruction; + use solana_storage_api::storage_instruction::StorageAccountType; + use std::fs::remove_dir_all; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::mpsc::channel; + use std::sync::{Arc, RwLock}; + use std::thread::sleep; + use std::time::Duration; + + #[test] + fn test_storage_stage_process_account_proofs() { + solana_logger::setup(); + let keypair = Arc::new(Keypair::new()); + let storage_keypair = Arc::new(Keypair::new()); + let replicator_keypair = Arc::new(Keypair::new()); + let exit = Arc::new(AtomicBool::new(false)); + + let GenesisBlockInfo { + mut genesis_block, + mint_keypair, + .. + } = create_genesis_block(1000); + genesis_block + .native_instruction_processors + .push(solana_storage_program::solana_storage_program!()); + let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block); + + let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); + + let bank = Bank::new(&genesis_block); + let bank = Arc::new(bank); + let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks( + &[bank.clone()], + vec![0], + ))); + let cluster_info = test_cluster_info(&keypair.pubkey()); + + let (bank_sender, bank_receiver) = channel(); + let storage_state = StorageState::new( + &bank.last_blockhash(), + SLOTS_PER_TURN_TEST, + bank.slots_per_segment(), + ); + let storage_stage = StorageStage::new( + &storage_state, + bank_receiver, + Some(blocktree.clone()), + &keypair, + &storage_keypair, + &exit.clone(), + &bank_forks, + &cluster_info, + ); + bank_sender.send(vec![bank.clone()]).unwrap(); + + // create accounts + let bank = Arc::new(Bank::new_from_parent(&bank, &keypair.pubkey(), 1)); + let account_ix = storage_instruction::create_storage_account( + &mint_keypair.pubkey(), + &Pubkey::new_rand(), + &replicator_keypair.pubkey(), + 1, + StorageAccountType::Replicator, + ); + let account_tx = Transaction::new_signed_instructions( + &[&mint_keypair], + account_ix, + bank.last_blockhash(), + ); + bank.process_transaction(&account_tx).expect("create"); + + bank_sender.send(vec![bank.clone()]).unwrap(); + + let mut reference_keys; + { + let keys = &storage_state.state.read().unwrap().storage_keys; + reference_keys = vec![0; keys.len()]; + reference_keys.copy_from_slice(keys); + } + + let keypair = Keypair::new(); + + let mining_proof_ix = storage_instruction::mining_proof( + &replicator_keypair.pubkey(), + Hash::default(), + 0, + keypair.sign_message(b"test"), + bank.last_blockhash(), + ); + + let next_bank = Arc::new(Bank::new_from_parent(&bank, &keypair.pubkey(), 2)); + //register ticks so the program reports a different segment + blocktree_processor::process_entries( + &next_bank, + &entry::create_ticks( + DEFAULT_TICKS_PER_SLOT * next_bank.slots_per_segment() + 1, + bank.last_blockhash(), + ), + true, + ) + .unwrap(); + let message = Message::new_with_payer(vec![mining_proof_ix], Some(&mint_keypair.pubkey())); + let mining_proof_tx = Transaction::new( + &[&mint_keypair, replicator_keypair.as_ref()], + message, + next_bank.last_blockhash(), + ); + next_bank + .process_transaction(&mining_proof_tx) + .expect("process txs"); + bank_sender.send(vec![next_bank]).unwrap(); + + for _ in 0..5 { + { + let keys = &storage_state.state.read().unwrap().storage_keys; + if keys[..] != *reference_keys.as_slice() { + break; + } + } + + sleep(Duration::new(1, 0)); + } + + debug!("joining..?"); + exit.store(true, Ordering::Relaxed); + storage_stage.join().unwrap(); + + { + let keys = &storage_state.state.read().unwrap().storage_keys; + assert_ne!(keys[..], *reference_keys); + } + + remove_dir_all(ledger_path).unwrap(); + } + + #[test] + fn test_storage_stage_process_banks() { + solana_logger::setup(); + let keypair = Arc::new(Keypair::new()); + let storage_keypair = Arc::new(Keypair::new()); + let exit = Arc::new(AtomicBool::new(false)); + + let GenesisBlockInfo { genesis_block, .. } = create_genesis_block(1000); + let (ledger_path, _blockhash) = create_new_tmp_ledger!(&genesis_block); + + let blocktree = Arc::new(Blocktree::open(&ledger_path).unwrap()); + let slot = 1; + let bank = Arc::new(Bank::new(&genesis_block)); + let bank_forks = Arc::new(RwLock::new(BankForks::new_from_banks( + &[bank.clone()], + vec![0], + ))); + + let cluster_info = test_cluster_info(&keypair.pubkey()); + let (bank_sender, bank_receiver) = channel(); + let storage_state = StorageState::new( + &bank.last_blockhash(), + SLOTS_PER_TURN_TEST, + bank.slots_per_segment(), + ); + let storage_stage = StorageStage::new( + &storage_state, + bank_receiver, + Some(blocktree.clone()), + &keypair, + &storage_keypair, + &exit.clone(), + &bank_forks, + &cluster_info, + ); + bank_sender.send(vec![bank.clone()]).unwrap(); + + let keypair = Keypair::new(); + let hash = Hash::default(); + let signature = keypair.sign_message(&hash.as_ref()); + + let mut result = storage_state.get_mining_result(&signature); + + assert_eq!(result, Hash::default()); + + let mut last_bank = bank; + let rooted_banks = (slot..slot + last_bank.slots_per_segment() + 1) + .map(|i| { + let bank = Arc::new(Bank::new_from_parent(&last_bank, &keypair.pubkey(), i)); + blocktree_processor::process_entries( + &bank, + &entry::create_ticks(64, bank.last_blockhash()), + true, + ) + .expect("failed process entries"); + last_bank = bank; + last_bank.clone() + }) + .collect::>(); + bank_sender.send(rooted_banks).unwrap(); + + if solana_ledger::perf_libs::api().is_some() { + for _ in 0..5 { + result = storage_state.get_mining_result(&signature); + if result != Hash::default() { + info!("found result = {:?} sleeping..", result); + break; + } + info!("result = {:?} sleeping..", result); + sleep(Duration::new(1, 0)); + } + } + + info!("joining..?"); + exit.store(true, Ordering::Relaxed); + storage_stage.join().unwrap(); + + if solana_ledger::perf_libs::api().is_some() { + assert_ne!(result, Hash::default()); + } else { + assert_eq!(result, Hash::default()); + } + + remove_dir_all(ledger_path).unwrap(); + } +}