From 087c43b9eff1e9cfad56721993b74d50fee7eb16 Mon Sep 17 00:00:00 2001 From: carllin Date: Wed, 21 Aug 2019 23:59:11 -0700 Subject: [PATCH] Add snapshotting integration test (#5519) * Add snapshotting integration test * Update ContactInfo on restart in local cluster nodes --- Cargo.lock | 2 + core/src/bank_forks.rs | 1 + core/src/cluster.rs | 3 +- core/src/snapshot_utils.rs | 19 +++- local_cluster/Cargo.toml | 2 + local_cluster/src/cluster_tests.rs | 42 +++++++- local_cluster/src/lib.rs | 2 + local_cluster/src/local_cluster.rs | 19 +++- local_cluster/tests/local_cluster.rs | 137 +++++++++++++++++++++++++-- runtime/src/accounts_db.rs | 15 +-- 10 files changed, 214 insertions(+), 28 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c572b6a4d..c68f416ff 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3535,6 +3535,7 @@ name = "solana-local-cluster" version = "0.18.0-pre2" dependencies = [ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", "serial_test 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "serial_test_derive 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", "solana-client 0.18.0-pre2", @@ -3547,6 +3548,7 @@ dependencies = [ "solana-storage-program 0.18.0-pre2", "solana-vote-api 0.18.0-pre2", "symlink 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)", + "tempfile 3.1.0 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index ed9d6f6cd..d4747112a 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -238,6 +238,7 @@ impl BankForks { .snapshot_config .as_ref() .expect("Called package_snapshot without a snapshot configuration"); + info!("setting snapshot root: {}", root); if root - self.slots_since_snapshot[0] >= config.snapshot_interval_slots as u64 { let mut snapshot_time = Measure::start("total-snapshot-ms"); let r = self.generate_snapshot( diff --git a/core/src/cluster.rs b/core/src/cluster.rs index 7d0ff3411..08227689d 100644 --- a/core/src/cluster.rs +++ b/core/src/cluster.rs @@ -1,8 +1,9 @@ +use crate::validator::ValidatorConfig; use solana_client::thin_client::ThinClient; use solana_sdk::pubkey::Pubkey; pub trait Cluster { fn get_node_pubkeys(&self) -> Vec; fn get_validator_client(&self, pubkey: &Pubkey) -> Option; - fn restart_node(&mut self, pubkey: Pubkey); + fn restart_node(&mut self, pubkey: Pubkey, config: &ValidatorConfig); } diff --git a/core/src/snapshot_utils.rs b/core/src/snapshot_utils.rs index 52f71ff37..2c883abce 100644 --- a/core/src/snapshot_utils.rs +++ b/core/src/snapshot_utils.rs @@ -14,6 +14,7 @@ use std::fs::File; use std::io::{BufReader, BufWriter, Error as IOError, ErrorKind}; use std::path::{Path, PathBuf}; use tar::Archive; +use tempfile::TempDir; const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache"; @@ -57,8 +58,6 @@ pub fn package_snapshot, Q: AsRef>( snapshot_package_output_file: P, snapshot_path: Q, ) -> Result { - let slot = bank.slot(); - // Hard link all the snapshots we need for this package let snapshot_hard_links_dir = tempfile::tempdir_in(snapshot_path)?; @@ -73,7 +72,7 @@ pub fn package_snapshot, Q: AsRef>( // Create a snapshot package info!( "Snapshot for bank: {} has {} account storage entries", - slot, + bank.slot(), account_storage_entries.len() ); @@ -172,6 +171,20 @@ pub fn remove_snapshot>(slot: u64, snapshot_path: P) -> Result<() Ok(()) } +pub fn bank_slot_from_archive>(snapshot_tar: P) -> Result { + let tempdir = TempDir::new()?; + untar_snapshot_in(&snapshot_tar, &tempdir)?; + let unpacked_snapshots_dir = tempdir.path().join(TAR_SNAPSHOTS_DIR); + let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir); + let last_root_paths = snapshot_paths + .last() + .ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?; + let file = File::open(&last_root_paths.snapshot_file_path)?; + let mut stream = BufReader::new(file); + let bank: Bank = deserialize_from(&mut stream).map_err(|e| get_io_error(&e.to_string()))?; + Ok(bank.slot()) +} + pub fn bank_from_archive>( account_paths: String, snapshot_config: &SnapshotConfig, diff --git a/local_cluster/Cargo.toml b/local_cluster/Cargo.toml index 59eceef6f..d78865452 100644 --- a/local_cluster/Cargo.toml +++ b/local_cluster/Cargo.toml @@ -10,6 +10,7 @@ homepage = "https://solana.com/" [dependencies] log = "0.4.8" +rand = "0.6.5" solana-core = { path = "../core", version = "0.18.0-pre2" } solana-client = { path = "../client", version = "0.18.0-pre2" } solana-logger = { path = "../logger", version = "0.18.0-pre2" } @@ -20,6 +21,7 @@ solana-storage-api = { path = "../programs/storage_api", version = "0.18.0-pre2" solana-storage-program = { path = "../programs/storage_program", version = "0.18.0-pre2" } solana-vote-api = { path = "../programs/vote_api", version = "0.18.0-pre2" } symlink = "0.1.0" +tempfile = "3.1.0" [dev-dependencies] serial_test = "0.2.0" diff --git a/local_cluster/src/cluster_tests.rs b/local_cluster/src/cluster_tests.rs index e10a5ee85..ba9bdbd0d 100644 --- a/local_cluster/src/cluster_tests.rs +++ b/local_cluster/src/cluster_tests.rs @@ -1,3 +1,4 @@ +use rand::{thread_rng, Rng}; use solana_client::thin_client::create_client; /// Cluster independant integration tests /// @@ -25,7 +26,12 @@ use solana_sdk::{ }, transport::TransportError, }; -use std::{collections::HashSet, path::Path, thread::sleep, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + path::Path, + thread::sleep, + time::Duration, +}; const DEFAULT_SLOT_MILLIS: u64 = (DEFAULT_TICKS_PER_SLOT * 1000) / DEFAULT_TICKS_PER_SECOND; @@ -65,8 +71,25 @@ pub fn spend_and_verify_all_nodes( } } -pub fn send_many_transactions(node: &ContactInfo, funding_keypair: &Keypair, num_txs: u64) { +pub fn verify_balances( + expected_balances: HashMap, + node: &ContactInfo, +) { let client = create_client(node.client_facing_addr(), FULLNODE_PORT_RANGE); + for (pk, b) in expected_balances { + let bal = client.poll_get_balance(&pk).expect("balance in source"); + assert_eq!(bal, b); + } +} + +pub fn send_many_transactions( + node: &ContactInfo, + funding_keypair: &Keypair, + max_tokens_per_transfer: u64, + num_txs: u64, +) -> HashMap { + let client = create_client(node.client_facing_addr(), FULLNODE_PORT_RANGE); + let mut expected_balances = HashMap::new(); for _ in 0..num_txs { let random_keypair = Keypair::new(); let bal = client @@ -74,12 +97,23 @@ pub fn send_many_transactions(node: &ContactInfo, funding_keypair: &Keypair, num .expect("balance in source"); assert!(bal > 0); let (blockhash, _fee_calculator) = client.get_recent_blockhash().unwrap(); - let mut transaction = - system_transaction::transfer(&funding_keypair, &random_keypair.pubkey(), 1, blockhash); + let transfer_amount = thread_rng().gen_range(1, max_tokens_per_transfer); + + let mut transaction = system_transaction::transfer( + &funding_keypair, + &random_keypair.pubkey(), + transfer_amount, + blockhash, + ); + client .retry_transfer(&funding_keypair, &mut transaction, 5) .unwrap(); + + expected_balances.insert(random_keypair.pubkey(), transfer_amount); } + + expected_balances } pub fn fullnode_exit(entry_point_info: &ContactInfo, nodes: usize) { diff --git a/local_cluster/src/lib.rs b/local_cluster/src/lib.rs index 79a94a695..d2c114e47 100644 --- a/local_cluster/src/lib.rs +++ b/local_cluster/src/lib.rs @@ -9,3 +9,5 @@ extern crate solana_core; #[macro_use] extern crate solana_storage_program; + +extern crate tempfile; diff --git a/local_cluster/src/local_cluster.rs b/local_cluster/src/local_cluster.rs index 70a0adedb..7d370579e 100644 --- a/local_cluster/src/local_cluster.rs +++ b/local_cluster/src/local_cluster.rs @@ -585,19 +585,28 @@ impl Cluster for LocalCluster { }) } - fn restart_node(&mut self, pubkey: Pubkey) { + fn restart_node(&mut self, pubkey: Pubkey, config: &ValidatorConfig) { // Shut down the fullnode let mut node = self.fullnodes.remove(&pubkey).unwrap(); node.exit(); node.join().unwrap(); - // Restart the node - let fullnode_info = &self.fullnode_infos[&pubkey].info; - let config = &self.fullnode_infos[&pubkey].config; - let node = Node::new_localhost_with_pubkey(&fullnode_info.keypair.pubkey()); + // Update the stored ContactInfo for this node + let node_pubkey = &self.fullnode_infos[&pubkey].info.keypair.pubkey(); + let node = Node::new_localhost_with_pubkey(&node_pubkey); + self.fullnode_infos + .get_mut(&pubkey) + .unwrap() + .info + .contact_info = node.info.clone(); if pubkey == self.entry_point_info.id { self.entry_point_info = node.info.clone(); } + + // Restart the node + self.fullnode_infos.get_mut(&pubkey).unwrap().config = config.clone(); + let fullnode_info = &self.fullnode_infos[&pubkey].info; + let restarted_node = Validator::new( node, &fullnode_info.keypair, diff --git a/local_cluster/tests/local_cluster.rs b/local_cluster/tests/local_cluster.rs index 1fefd92c7..b038d081f 100644 --- a/local_cluster/tests/local_cluster.rs +++ b/local_cluster/tests/local_cluster.rs @@ -3,16 +3,25 @@ extern crate solana_core; use log::*; use serial_test_derive::serial; use solana_core::{ - blocktree::Blocktree, broadcast_stage::BroadcastStageType, cluster::Cluster, - gossip_service::discover_cluster, validator::ValidatorConfig, + bank_forks::SnapshotConfig, blocktree::Blocktree, broadcast_stage::BroadcastStageType, + cluster::Cluster, gossip_service::discover_cluster, snapshot_utils, validator::ValidatorConfig, }; use solana_local_cluster::{ cluster_tests, local_cluster::{ClusterConfig, LocalCluster}, }; -use solana_runtime::epoch_schedule::{EpochSchedule, MINIMUM_SLOTS_PER_EPOCH}; +use solana_runtime::{ + accounts_db::AccountsDB, + epoch_schedule::{EpochSchedule, MINIMUM_SLOTS_PER_EPOCH}, +}; use solana_sdk::{client::SyncClient, poh_config::PohConfig, timing}; -use std::{collections::HashSet, thread::sleep, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + fs, + thread::sleep, + time::Duration, +}; +use tempfile::TempDir; #[test] #[serial] @@ -232,7 +241,7 @@ fn test_forwarding() { .unwrap(); // Confirm that transactions were forwarded to and processed by the leader. - cluster_tests::send_many_transactions(&validator_info, &cluster.funding_keypair, 20); + cluster_tests::send_many_transactions(&validator_info, &cluster.funding_keypair, 10, 20); } #[test] @@ -242,10 +251,11 @@ fn test_restart_node() { error!("test_restart_node"); let slots_per_epoch = MINIMUM_SLOTS_PER_EPOCH as u64; let ticks_per_slot = 16; + let validator_config = ValidatorConfig::default(); let mut cluster = LocalCluster::new(&ClusterConfig { node_stakes: vec![3], cluster_lamports: 100, - validator_configs: vec![ValidatorConfig::default()], + validator_configs: vec![validator_config.clone()], ticks_per_slot, slots_per_epoch, ..ClusterConfig::default() @@ -257,14 +267,19 @@ fn test_restart_node() { timing::DEFAULT_TICKS_PER_SLOT, slots_per_epoch, ); - cluster.restart_node(nodes[0]); + cluster.restart_node(nodes[0], &validator_config); cluster_tests::sleep_n_epochs( 0.5, &cluster.genesis_block.poh_config, timing::DEFAULT_TICKS_PER_SLOT, slots_per_epoch, ); - cluster_tests::send_many_transactions(&cluster.entry_point_info, &cluster.funding_keypair, 1); + cluster_tests::send_many_transactions( + &cluster.entry_point_info, + &cluster.funding_keypair, + 10, + 1, + ); } #[test] @@ -282,6 +297,100 @@ fn test_listener_startup() { assert_eq!(cluster_nodes.len(), 4); } +#[test] +#[serial] +fn test_snapshots_restart_validity() { + let temp_dir = TempDir::new().unwrap(); + let snapshot_path = temp_dir.path().join("bank_states"); + let snapshot_package_output_path = temp_dir.path().join("tar"); + let snapshot_interval_slots = 25; + + // Create the snapshot directories + fs::create_dir_all(&snapshot_path).expect("Failed to create snapshots bank state directory"); + fs::create_dir_all(&snapshot_package_output_path) + .expect("Failed to create snapshots tar directory"); + + // Set up the cluster with 1 snapshotting validator + let mut snapshot_validator_config = ValidatorConfig::default(); + snapshot_validator_config.rpc_config.enable_fullnode_exit = true; + snapshot_validator_config.snapshot_config = Some(SnapshotConfig::new( + snapshot_path, + snapshot_package_output_path.clone(), + snapshot_interval_slots, + )); + let num_account_paths = 4; + let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths); + let mut all_account_storage_dirs = vec![account_storage_dirs]; + snapshot_validator_config.account_paths = Some(account_storage_paths); + + let config = ClusterConfig { + node_stakes: vec![10000], + cluster_lamports: 100000, + validator_configs: vec![snapshot_validator_config.clone()], + ..ClusterConfig::default() + }; + + // Create and reboot the node from snapshot `num_runs` times + let num_runs = 3; + let mut expected_balances = HashMap::new(); + let mut cluster = LocalCluster::new(&config); + for _ in 0..num_runs { + // Push transactions to one of the nodes and confirm that transactions were + // forwarded to and processed. + trace!("Sending transactions"); + let new_balances = cluster_tests::send_many_transactions( + &cluster.entry_point_info, + &cluster.funding_keypair, + 10, + 10, + ); + + expected_balances.extend(new_balances); + + // Get slot after which this was generated + let client = cluster + .get_validator_client(&cluster.entry_point_info.id) + .unwrap(); + let last_slot = client.get_slot().expect("Couldn't get slot"); + + // Wait for a snapshot for a bank >= last_slot to be made so we know that the snapshot + // must include the transactions just pushed + let tar = snapshot_utils::get_snapshot_tar_path(&snapshot_package_output_path); + trace!("Waiting for tar to be generated"); + loop { + if tar.exists() && snapshot_utils::bank_slot_from_archive(&tar).unwrap() >= last_slot { + break; + } + sleep(Duration::from_millis(100)); + } + + // Create new account paths since fullnode exit is not guaranteed to cleanup RPC threads, + // which may delete the old accounts on exit at any point + let (new_account_storage_dirs, new_account_storage_paths) = + generate_account_paths(num_account_paths); + all_account_storage_dirs.push(new_account_storage_dirs); + snapshot_validator_config.account_paths = Some(new_account_storage_paths); + + // Restart a node + trace!("Restarting cluster from snapshot"); + let nodes = cluster.get_node_pubkeys(); + cluster.restart_node(nodes[0], &snapshot_validator_config); + + // Verify account balances on validator + trace!("Verifying balances"); + cluster_tests::verify_balances(expected_balances.clone(), &cluster.entry_point_info); + + // Check that we can still push transactions + trace!("Spending and verifying"); + cluster_tests::spend_and_verify_all_nodes( + &cluster.entry_point_info, + &cluster.funding_keypair, + 1, + HashSet::new(), + ); + } +} + #[allow(unused_attributes)] #[test] #[serial] @@ -453,3 +562,15 @@ fn run_repairman_catchup(num_repairmen: u64) { sleep(Duration::from_secs(1)); } } + +fn generate_account_paths(num_account_paths: usize) -> (Vec, String) { + let account_storage_dirs: Vec = (0..num_account_paths) + .map(|_| TempDir::new().unwrap()) + .collect(); + let account_storage_paths: Vec<_> = account_storage_dirs + .iter() + .map(|a| a.path().to_str().unwrap().to_string()) + .collect(); + let account_storage_paths = AccountsDB::format_paths(account_storage_paths); + (account_storage_dirs, account_storage_paths) +} diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index c771010b2..0f70f50a9 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -427,13 +427,10 @@ impl AccountsDB { } } - pub fn paths(&self) -> String { - let paths: Vec = self - .paths - .read() - .unwrap() + pub fn format_paths>(paths: Vec

) -> String { + let paths: Vec = paths .iter() - .map(|p| p.to_str().unwrap().to_owned()) + .map(|p| p.as_ref().to_str().unwrap().to_owned()) .collect(); paths.join(",") } @@ -1417,7 +1414,11 @@ pub mod tests { let buf = writer.into_inner(); let mut reader = BufReader::new(&buf[..]); let daccounts = AccountsDB::new(None); - let local_paths = daccounts.paths(); + + let local_paths = { + let paths = daccounts.paths.read().unwrap(); + AccountsDB::format_paths(paths.to_vec()) + }; let copied_accounts = TempDir::new().unwrap(); // Simulate obtaining a copy of the AppendVecs from a tarball copy_append_vecs(&accounts, copied_accounts.path()).unwrap();