diff --git a/Cargo.lock b/Cargo.lock index da9d34fee..4c83e97ed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3861,6 +3861,8 @@ version = "0.18.0-pre2" dependencies = [ "bzip2 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", + "console 0.7.7 (registry+https://github.com/rust-lang/crates.io-index)", + "indicatif 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest 0.9.20 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.40 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/src/bank_forks.rs b/core/src/bank_forks.rs index d4747112a..353868dbc 100644 --- a/core/src/bank_forks.rs +++ b/core/src/bank_forks.rs @@ -16,35 +16,14 @@ use std::time::Instant; #[derive(Clone, Debug, Eq, PartialEq)] pub struct SnapshotConfig { - snapshot_path: PathBuf, - snapshot_package_output_path: PathBuf, - snapshot_interval_slots: usize, -} + // Generate a new snapshot every this many slots + pub snapshot_interval_slots: usize, -impl SnapshotConfig { - pub fn new( - snapshot_path: PathBuf, - snapshot_package_output_path: PathBuf, - snapshot_interval_slots: usize, - ) -> Self { - Self { - snapshot_path, - snapshot_package_output_path, - snapshot_interval_slots, - } - } + // Where to store the latest packaged snapshot + pub snapshot_package_output_path: PathBuf, - pub fn snapshot_path(&self) -> &Path { - self.snapshot_path.as_path() - } - - pub fn snapshot_package_output_path(&self) -> &Path { - &self.snapshot_package_output_path.as_path() - } - - pub fn snapshot_interval_slots(&self) -> usize { - self.snapshot_interval_slots - } + // Where to place the snapshots for recent slots + pub snapshot_path: PathBuf, } pub struct BankForks { @@ -234,10 +213,7 @@ impl BankForks { // Generate a snapshot if snapshots are configured and it's been an appropriate number // of banks since the last snapshot if self.snapshot_config.is_some() && snapshot_package_sender.is_some() { - let config = self - .snapshot_config - .as_ref() - .expect("Called package_snapshot without a snapshot configuration"); + let config = self.snapshot_config.as_ref().unwrap(); info!("setting snapshot root: {}", root); if root - self.slots_since_snapshot[0] >= config.snapshot_interval_slots as u64 { let mut snapshot_time = Measure::start("total-snapshot-ms"); @@ -308,6 +284,7 @@ impl BankForks { .cloned() .expect("root must exist in BankForks"); snapshot_utils::add_snapshot(&config.snapshot_path, &bank, slots_since_snapshot)?; + // Package the relevant snapshots let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&config.snapshot_path); @@ -835,7 +812,7 @@ mod tests { genesis_block_info: GenesisBlockInfo, } - fn setup_snapshot_test(snapshot_interval: usize) -> SnapshotTestConfig { + fn setup_snapshot_test(snapshot_interval_slots: usize) -> SnapshotTestConfig { let accounts_dir = TempDir::new().unwrap(); let snapshot_dir = TempDir::new().unwrap(); let snapshot_output_path = TempDir::new().unwrap(); @@ -847,11 +824,11 @@ mod tests { bank0.freeze(); let mut bank_forks = BankForks::new(0, bank0); - let snapshot_config = SnapshotConfig::new( - PathBuf::from(snapshot_dir.path()), - PathBuf::from(snapshot_output_path.path()), - snapshot_interval, - ); + let snapshot_config = SnapshotConfig { + snapshot_interval_slots, + snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()), + snapshot_path: PathBuf::from(snapshot_dir.path()), + }; bank_forks.set_snapshot_config(snapshot_config.clone()); SnapshotTestConfig { accounts_dir, diff --git a/core/src/snapshot_utils.rs b/core/src/snapshot_utils.rs index 2c883abce..662ded2ad 100644 --- a/core/src/snapshot_utils.rs +++ b/core/src/snapshot_utils.rs @@ -14,7 +14,6 @@ use std::fs::File; use std::io::{BufReader, BufWriter, Error as IOError, ErrorKind}; use std::path::{Path, PathBuf}; use tar::Archive; -use tempfile::TempDir; const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache"; @@ -92,29 +91,43 @@ pub fn package_snapshot, Q: AsRef>( Ok(package) } -pub fn get_snapshot_paths>(snapshot_path: P) -> Vec { - let paths = fs::read_dir(&snapshot_path).expect("Invalid snapshot path"); - let mut names = paths - .filter_map(|entry| { - entry.ok().and_then(|e| { - e.path() - .file_name() - .and_then(|n| n.to_str().map(|s| s.parse::().ok())) - .unwrap_or(None) - }) - }) - .map(|slot| { - let snapshot_path = snapshot_path.as_ref().join(slot.to_string()); - SlotSnapshotPaths { - slot, - snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)), - snapshot_status_cache_path: snapshot_path.join(SNAPSHOT_STATUS_CACHE_FILE_NAME), - } - }) - .collect::>(); +pub fn get_snapshot_paths>(snapshot_path: P) -> Vec +where + P: std::fmt::Debug, +{ + match fs::read_dir(&snapshot_path) { + Ok(paths) => { + let mut names = paths + .filter_map(|entry| { + entry.ok().and_then(|e| { + e.path() + .file_name() + .and_then(|n| n.to_str().map(|s| s.parse::().ok())) + .unwrap_or(None) + }) + }) + .map(|slot| { + let snapshot_path = snapshot_path.as_ref().join(slot.to_string()); + SlotSnapshotPaths { + slot, + snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)), + snapshot_status_cache_path: snapshot_path + .join(SNAPSHOT_STATUS_CACHE_FILE_NAME), + } + }) + .collect::>(); - names.sort(); - names + names.sort(); + names + } + Err(err) => { + info!( + "Unable to read snapshot directory {:?}: {}", + snapshot_path, err + ); + vec![] + } + } } pub fn add_snapshot>( @@ -172,7 +185,7 @@ pub fn remove_snapshot>(slot: u64, snapshot_path: P) -> Result<() } pub fn bank_slot_from_archive>(snapshot_tar: P) -> Result { - let tempdir = TempDir::new()?; + let tempdir = tempfile::TempDir::new()?; untar_snapshot_in(&snapshot_tar, &tempdir)?; let unpacked_snapshots_dir = tempdir.path().join(TAR_SNAPSHOTS_DIR); let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir); @@ -191,7 +204,7 @@ pub fn bank_from_archive>( snapshot_tar: P, ) -> Result { // Untar the snapshot into a temp directory under `snapshot_config.snapshot_path()` - let unpack_dir = tempfile::tempdir_in(snapshot_config.snapshot_path())?; + let unpack_dir = tempfile::tempdir_in(&snapshot_config.snapshot_path)?; untar_snapshot_in(&snapshot_tar, &unpack_dir)?; let unpacked_accounts_dir = unpack_dir.as_ref().join(TAR_ACCOUNTS_DIR); @@ -199,14 +212,19 @@ pub fn bank_from_archive>( let snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir); let bank = rebuild_bank_from_snapshots(account_paths, &snapshot_paths, unpacked_accounts_dir)?; - // Move the unpacked snapshots into `snapshot_config.snapshot_path()` - let dir_files = fs::read_dir(unpacked_snapshots_dir).expect("Invalid snapshot path"); + // Move the unpacked snapshots into `snapshot_config.snapshot_path` + let dir_files = fs::read_dir(&unpacked_snapshots_dir).unwrap_or_else(|err| { + panic!( + "Invalid snapshot path {:?}: {}", + unpacked_snapshots_dir, err + ) + }); let paths: Vec = dir_files .filter_map(|entry| entry.ok().map(|e| e.path())) .collect(); let mut copy_options = CopyOptions::new(); copy_options.overwrite = true; - fs_extra::move_items(&paths, snapshot_config.snapshot_path(), ©_options)?; + fs_extra::move_items(&paths, &snapshot_config.snapshot_path, ©_options)?; Ok(bank) } @@ -238,7 +256,7 @@ where let last_root_paths = snapshot_paths .last() .ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?; - info!("Load from {:?}", &last_root_paths.snapshot_file_path); + info!("Loading from {:?}", &last_root_paths.snapshot_file_path); let file = File::open(&last_root_paths.snapshot_file_path)?; let mut stream = BufReader::new(file); let bank: Bank = deserialize_from(&mut stream).map_err(|e| get_io_error(&e.to_string()))?; diff --git a/core/src/validator.rs b/core/src/validator.rs index 43d5e4f73..e0ba6f324 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -375,22 +375,20 @@ fn get_bank_forks( dev_halt_at_slot: Option, ) -> (BankForks, Vec, LeaderScheduleCache) { let (mut bank_forks, bank_forks_info, leader_schedule_cache) = { - let mut result = None; - if snapshot_config.is_some() { - let snapshot_config = snapshot_config.as_ref().unwrap(); - - // Blow away any remnants in the snapshots directory - let _ = fs::remove_dir_all(snapshot_config.snapshot_path()); - fs::create_dir_all(&snapshot_config.snapshot_path()) + if let Some(snapshot_config) = snapshot_config.as_ref() { + info!( + "Initializing snapshot path: {:?}", + snapshot_config.snapshot_path + ); + let _ = fs::remove_dir_all(&snapshot_config.snapshot_path); + fs::create_dir_all(&snapshot_config.snapshot_path) .expect("Couldn't create snapshot directory"); - // Get the path to the tar let tar = snapshot_utils::get_snapshot_tar_path( - &snapshot_config.snapshot_package_output_path(), + &snapshot_config.snapshot_package_output_path, ); - - // Check that the snapshot tar exists, try to load the snapshot if it does if tar.exists() { + info!("Loading snapshot package: {:?}", tar); // Fail hard here if snapshot fails to load, don't silently continue let deserialized_bank = snapshot_utils::bank_from_archive( account_paths @@ -401,33 +399,29 @@ fn get_bank_forks( ) .expect("Load from snapshot failed"); - result = Some( - blocktree_processor::process_blocktree_from_root( - blocktree, - Arc::new(deserialized_bank), - verify_ledger, - dev_halt_at_slot, - ) - .expect("processing blocktree after loading snapshot failed"), - ); - } - } - - // If a snapshot doesn't exist - if result.is_none() { - result = Some( - blocktree_processor::process_blocktree( - &genesis_block, - &blocktree, - account_paths, + return blocktree_processor::process_blocktree_from_root( + blocktree, + Arc::new(deserialized_bank), verify_ledger, dev_halt_at_slot, ) - .expect("process_blocktree failed"), - ); + .expect("processing blocktree after loading snapshot failed"); + } else { + info!("Snapshot package does not exist: {:?}", tar); + } + } else { + info!("Snapshots disabled"); } - result.unwrap() + info!("Processing ledger from genesis"); + blocktree_processor::process_blocktree( + &genesis_block, + &blocktree, + account_paths, + verify_ledger, + dev_halt_at_slot, + ) + .expect("process_blocktree failed") }; if snapshot_config.is_some() { diff --git a/local_cluster/tests/local_cluster.rs b/local_cluster/tests/local_cluster.rs index b038d081f..18be28e55 100644 --- a/local_cluster/tests/local_cluster.rs +++ b/local_cluster/tests/local_cluster.rs @@ -313,11 +313,11 @@ fn test_snapshots_restart_validity() { // Set up the cluster with 1 snapshotting validator let mut snapshot_validator_config = ValidatorConfig::default(); snapshot_validator_config.rpc_config.enable_fullnode_exit = true; - snapshot_validator_config.snapshot_config = Some(SnapshotConfig::new( - snapshot_path, - snapshot_package_output_path.clone(), + snapshot_validator_config.snapshot_config = Some(SnapshotConfig { snapshot_interval_slots, - )); + snapshot_package_output_path: snapshot_package_output_path.clone(), + snapshot_path, + }); let num_account_paths = 4; let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths); let mut all_account_storage_dirs = vec![account_storage_dirs]; diff --git a/multinode-demo/validator.sh b/multinode-demo/validator.sh index 7edd98502..25de11c3f 100755 --- a/multinode-demo/validator.sh +++ b/multinode-demo/validator.sh @@ -184,7 +184,6 @@ default_arg --identity "$identity_keypair_path" default_arg --voting-keypair "$voting_keypair_path" default_arg --storage-keypair "$storage_keypair_path" default_arg --ledger "$ledger_dir" -#default_arg --snapshot-interval-slots 100 if [[ -n $SOLANA_CUDA ]]; then program=$solana_validator_cuda diff --git a/run.sh b/run.sh index e8f8bdbdc..e4212818c 100755 --- a/run.sh +++ b/run.sh @@ -93,7 +93,6 @@ args=( --rpc-port 8899 --rpc-drone-address 127.0.0.1:9900 --accounts "$dataDir"/accounts - --snapshot-interval-slots 100 ) if [[ -n $blockstreamSocket ]]; then args+=(--blockstream "$blockstreamSocket") diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 0f70f50a9..208d73bdf 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -457,6 +457,8 @@ impl AccountsDB { let path_index = thread_rng().gen_range(0, local_account_paths.len()); let local_dir = &local_account_paths[path_index]; + std::fs::create_dir_all(local_dir).expect("Create directory failed"); + // Move the corresponding AppendVec from the snapshot into the directory pointed // at by `local_dir` let append_vec_relative_path = @@ -465,8 +467,13 @@ impl AccountsDB { append_vecs_path.as_ref().join(&append_vec_relative_path); let mut copy_options = CopyOptions::new(); copy_options.overwrite = true; - fs_extra::move_items(&vec![append_vec_abs_path], &local_dir, ©_options) - .map_err(|e| AccountsDB::get_io_error(&e.to_string()))?; + fs_extra::move_items(&vec![&append_vec_abs_path], &local_dir, ©_options) + .map_err(|e| { + AccountsDB::get_io_error(&format!( + "Unable to move {:?} to {:?}: {}", + append_vec_abs_path, local_dir, e + )) + })?; // Notify the AppendVec of the new file location let local_path = local_dir.join(append_vec_relative_path); diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 0e9ba3001..b58a67463 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -11,7 +11,9 @@ homepage = "https://solana.com/" [dependencies] bzip2 = "0.3.3" clap = "2.33.0" +console = "0.7.7" log = "0.4.8" +indicatif = "0.11.0" reqwest = "0.9.20" serde_json = "1.0.40" solana-client = { path = "../client", version = "0.18.0-pre2" } diff --git a/validator/src/main.rs b/validator/src/main.rs index 20bf97b4a..ccd3ab6e1 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1,5 +1,7 @@ use bzip2::bufread::BzDecoder; -use clap::{crate_description, crate_name, crate_version, value_t, App, Arg}; +use clap::{crate_description, crate_name, crate_version, value_t, value_t_or_exit, App, Arg}; +use console::Emoji; +use indicatif::{ProgressBar, ProgressStyle}; use log::*; use solana_client::rpc_client::RpcClient; use solana_core::bank_forks::SnapshotConfig; @@ -16,6 +18,7 @@ use solana_sdk::hash::Hash; use solana_sdk::signature::{read_keypair, Keypair, KeypairUtil}; use solana_sdk::timing::Slot; use std::fs::{self, File}; +use std::io::{self, Read}; use std::net::SocketAddr; use std::path::{Path, PathBuf}; use std::process::exit; @@ -30,7 +33,19 @@ fn port_range_validator(port_range: String) -> Result<(), String> { } } -fn download_archive( +static TRUCK: Emoji = Emoji("🚚 ", ""); +static SPARKLE: Emoji = Emoji("✨ ", ""); + +/// Creates a new process bar for processing that will take an unknown amount of time +fn new_spinner_progress_bar() -> ProgressBar { + let progress_bar = ProgressBar::new(42); + progress_bar + .set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}")); + progress_bar.enable_steady_tick(100); + progress_bar +} + +fn download_tar_bz2( rpc_addr: &SocketAddr, archive_name: &str, download_path: &Path, @@ -47,19 +62,72 @@ fn download_archive( }; let url = format!("http://{}/{}", rpc_addr, archive_name); - println!("Downloading {}...", url); let download_start = Instant::now(); - let mut response = reqwest::get(&url).map_err(|err| format!("Unable to get: {:?}", err))?; + let progress_bar = new_spinner_progress_bar(); + progress_bar.set_message(&format!("{}Downloading {}...", TRUCK, url)); + + let client = reqwest::Client::new(); + let response = client + .get(url.as_str()) + .send() + .and_then(|response| response.error_for_status()) + .map_err(|err| format!("Unable to get: {:?}", err))?; + let download_size = { + response + .headers() + .get(reqwest::header::CONTENT_LENGTH) + .and_then(|content_length| content_length.to_str().ok()) + .and_then(|content_length| content_length.parse().ok()) + .unwrap_or(0) + }; + progress_bar.set_length(download_size); + progress_bar.set_style( + ProgressStyle::default_bar() + .template(&format!( + "{}{}Downloading {} {}", + "{spinner:.green} ", + TRUCK, + url, + "[{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})" + )) + .progress_chars("=> "), + ); + + struct DownloadProgress { + progress_bar: ProgressBar, + response: R, + } + + impl Read for DownloadProgress { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.response.read(buf).map(|n| { + self.progress_bar.inc(n as u64); + n + }) + } + } + + let mut source = DownloadProgress { + progress_bar, + response, + }; + let mut file = File::create(&temp_archive_path) .map_err(|err| format!("Unable to create {:?}: {:?}", temp_archive_path, err))?; - std::io::copy(&mut response, &mut file) + std::io::copy(&mut source, &mut file) .map_err(|err| format!("Unable to write {:?}: {:?}", temp_archive_path, err))?; + source.progress_bar.finish_and_clear(); println!( - "Downloaded {} in {:?}", - archive_name, - Instant::now().duration_since(download_start) + " {}{}", + SPARKLE, + format!( + "Downloaded {} ({} bytes) in {:?}", + url, + download_size, + Instant::now().duration_since(download_start), + ) ); if extract { @@ -112,17 +180,33 @@ fn initialize_ledger_path( exit(1); }); - let genesis_blockhash = RpcClient::new_socket(rpc_addr) + let client = RpcClient::new_socket(rpc_addr); + let genesis_blockhash = client .get_genesis_blockhash() .map_err(|err| err.to_string())?; fs::create_dir_all(ledger_path).map_err(|err| err.to_string())?; - download_archive(&rpc_addr, "genesis.tar.bz2", ledger_path, true)?; + download_tar_bz2(&rpc_addr, "genesis.tar.bz2", ledger_path, true)?; + if !no_snapshot_fetch { - let _ = fs::remove_file(ledger_path.join("snapshot.tar.bz2")); - download_archive(&rpc_addr, "snapshot.tar.bz2", ledger_path, false) - .unwrap_or_else(|err| eprintln!("Warning: Unable to fetch snapshot: {:?}", err)); + let snapshot_package = solana_core::snapshot_utils::get_snapshot_tar_path(ledger_path); + if snapshot_package.exists() { + fs::remove_file(&snapshot_package) + .unwrap_or_else(|err| warn!("error removing {:?}: {}", snapshot_package, err)); + } + download_tar_bz2( + &rpc_addr, + snapshot_package.file_name().unwrap().to_str().unwrap(), + snapshot_package.parent().unwrap(), + false, + ) + .unwrap_or_else(|err| eprintln!("Warning: Unable to fetch snapshot: {:?}", err)); + } + + match client.get_slot() { + Ok(slot) => info!("Entrypoint currently at slot {}", slot), + Err(err) => warn!("Failed to get_slot from entrypoint: {}", err), } Ok(genesis_blockhash) @@ -202,7 +286,7 @@ fn main() { .long("no-snapshot-fetch") .takes_value(false) .requires("entrypoint") - .help("Do not attempt to fetch a snapshot from the cluster entrypoint"), + .help("Do not attempt to fetch a new snapshot from the cluster entrypoint, start from a local snapshot if present"), ) .arg( Arg::with_name("no_voting") @@ -278,7 +362,8 @@ fn main() { .long("snapshot-interval-slots") .value_name("SNAPSHOT_INTERVAL_SLOTS") .takes_value(true) - .help("Number of slots between generating snapshots"), + .default_value("100") + .help("Number of slots between generating snapshots, 0 to disable snapshots"), ) .arg( clap::Arg::with_name("limit_ledger_size") @@ -359,14 +444,24 @@ fn main() { Some(ledger_path.join("accounts").to_str().unwrap().to_string()); } - validator_config.snapshot_config = matches.value_of("snapshot_interval_slots").map(|s| { - let snapshots_dir = ledger_path.clone().join("snapshot"); - fs::create_dir_all(&snapshots_dir).expect("Failed to create snapshots directory"); - SnapshotConfig::new( - snapshots_dir, - ledger_path.clone(), - s.parse::().unwrap(), - ) + let snapshot_interval_slots = value_t_or_exit!(matches, "snapshot_interval_slots", usize); + let snapshot_path = ledger_path.clone().join("snapshot"); + fs::create_dir_all(&snapshot_path).unwrap_or_else(|err| { + eprintln!( + "Failed to create snapshots directory {:?}: {}", + snapshot_path, err + ); + exit(1); + }); + + validator_config.snapshot_config = Some(SnapshotConfig { + snapshot_interval_slots: if snapshot_interval_slots > 0 { + snapshot_interval_slots + } else { + std::usize::MAX + }, + snapshot_path, + snapshot_package_output_path: ledger_path.clone(), }); if matches.is_present("limit_ledger_size") {