diff --git a/Cargo.lock b/Cargo.lock index cd6605ef1..89c2a852c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3892,6 +3892,20 @@ dependencies = [ "winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", ] +[[package]] +name = "solana-download-utils" +version = "1.1.0" +dependencies = [ + "bzip2 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", + "console 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)", + "indicatif 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", + "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", + "reqwest 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", + "solana-ledger 1.1.0", + "solana-sdk 1.1.0", + "tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)", +] + [[package]] name = "solana-exchange-program" version = "1.1.0" @@ -4126,6 +4140,7 @@ dependencies = [ "solana-client 1.1.0", "solana-config-program 1.1.0", "solana-core 1.1.0", + "solana-download-utils 1.1.0", "solana-exchange-program 1.1.0", "solana-faucet 1.1.0", "solana-genesis-programs 1.1.0", @@ -4494,19 +4509,17 @@ dependencies = [ name = "solana-validator" version = "1.1.0" dependencies = [ - "bzip2 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)", "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)", "console 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)", "gag 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)", - "indicatif 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)", - "reqwest 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)", "serde_json 1.0.46 (registry+https://github.com/rust-lang/crates.io-index)", "solana-clap-utils 1.1.0", "solana-client 1.1.0", "solana-core 1.1.0", + "solana-download-utils 1.1.0", "solana-faucet 1.1.0", "solana-ledger 1.1.0", "solana-logger 1.1.0", @@ -4517,7 +4530,6 @@ dependencies = [ "solana-sdk 1.1.0", "solana-vote-program 1.1.0", "solana-vote-signer 1.1.0", - "tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 58011c8bc..14c61c1d1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ members = [ "cli-config", "client", "core", + "download-utils", "faucet", "perf", "validator", diff --git a/core/src/rpc_service.rs b/core/src/rpc_service.rs index 9252bac68..9e7fecaa2 100644 --- a/core/src/rpc_service.rs +++ b/core/src/rpc_service.rs @@ -85,9 +85,19 @@ impl RpcRequestMiddleware { } fn get(&self, path: &str) -> RequestMiddlewareAction { - let filename = self.ledger_path.join( - path.split_at(1).1, // Drop leading '/' from path - ); + let stem = path.split_at(1).1; // Drop leading '/' from path + let filename = { + match path { + "/genesis.tar.bz2" => self.ledger_path.join(stem), + _ => self + .snapshot_config + .as_ref() + .unwrap() + .snapshot_package_output_path + .join(stem), + } + }; + info!("get {} -> {:?}", path, filename); RequestMiddlewareAction::Respond { diff --git a/download-utils/Cargo.toml b/download-utils/Cargo.toml new file mode 100644 index 000000000..5ff80ee61 --- /dev/null +++ b/download-utils/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "solana-download-utils" +version = "1.1.0" +description = "Solana Download Utils" +authors = ["Solana Maintainers "] +repository = "https://github.com/solana-labs/solana" +license = "Apache-2.0" +homepage = "https://solana.com/" +edition = "2018" + +[dependencies] +bzip2 = "0.3.3" +console = "0.9.2" +indicatif = "0.14.0" +log = "0.4.8" +reqwest = { version = "0.10.1", default-features = false, features = ["blocking", "rustls-tls", "json"] } +solana-sdk = { path = "../sdk", version = "1.1.0" } +solana-ledger = { path = "../ledger", version = "1.1.0" } +tar = "0.4.26" + +[lib] +crate-type = ["lib"] +name = "solana_download_utils" diff --git a/download-utils/src/lib.rs b/download-utils/src/lib.rs new file mode 100644 index 000000000..bf291e9bd --- /dev/null +++ b/download-utils/src/lib.rs @@ -0,0 +1,192 @@ +use bzip2::bufread::BzDecoder; +use console::Emoji; +use indicatif::{ProgressBar, ProgressStyle}; +use log::*; +use solana_sdk::clock::Slot; +use solana_sdk::genesis_config::GenesisConfig; +use solana_sdk::hash::Hash; +use std::fs::{self, File}; +use std::io; +use std::io::Read; +use std::net::SocketAddr; +use std::path::Path; +use std::time::Instant; + +static TRUCK: Emoji = Emoji("🚚 ", ""); +static SPARKLE: Emoji = Emoji("✨ ", ""); + +/// Creates a new process bar for processing that will take an unknown amount of time +fn new_spinner_progress_bar() -> ProgressBar { + let progress_bar = ProgressBar::new(42); + progress_bar + .set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}")); + progress_bar.enable_steady_tick(100); + progress_bar +} + +pub fn download_file(url: &str, destination_file: &Path) -> Result<(), String> { + if destination_file.is_file() { + return Err(format!("{:?} already exists", destination_file)); + } + let download_start = Instant::now(); + + fs::create_dir_all(destination_file.parent().unwrap()).map_err(|err| err.to_string())?; + + let temp_destination_file = destination_file.with_extension(".tmp"); + + let progress_bar = new_spinner_progress_bar(); + progress_bar.set_message(&format!("{}Downloading {}...", TRUCK, url)); + + let response = reqwest::blocking::Client::new() + .get(url) + .send() + .and_then(|response| response.error_for_status()) + .map_err(|err| { + progress_bar.finish_and_clear(); + err.to_string() + })?; + + let download_size = { + response + .headers() + .get(reqwest::header::CONTENT_LENGTH) + .and_then(|content_length| content_length.to_str().ok()) + .and_then(|content_length| content_length.parse().ok()) + .unwrap_or(0) + }; + progress_bar.set_length(download_size); + progress_bar.set_style( + ProgressStyle::default_bar() + .template(&format!( + "{}{}Downloading {} {}", + "{spinner:.green} ", + TRUCK, + url, + "[{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})" + )) + .progress_chars("=> "), + ); + + struct DownloadProgress { + progress_bar: ProgressBar, + response: R, + } + + impl Read for DownloadProgress { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.response.read(buf).map(|n| { + self.progress_bar.inc(n as u64); + n + }) + } + } + + let mut source = DownloadProgress { + progress_bar, + response, + }; + + File::create(&temp_destination_file) + .and_then(|mut file| std::io::copy(&mut source, &mut file)) + .map_err(|err| format!("Unable to write {:?}: {:?}", temp_destination_file, err))?; + + source.progress_bar.finish_and_clear(); + info!( + " {}{}", + SPARKLE, + format!( + "Downloaded {} ({} bytes) in {:?}", + url, + download_size, + Instant::now().duration_since(download_start), + ) + ); + + std::fs::rename(temp_destination_file, destination_file) + .map_err(|err| format!("Unable to rename: {:?}", err))?; + + Ok(()) +} + +fn extract_archive(archive_filename: &Path, destination_dir: &Path) -> Result<(), String> { + info!("Extracting {:?}...", archive_filename); + let extract_start = Instant::now(); + + fs::create_dir_all(destination_dir).map_err(|err| err.to_string())?; + let tar_bz2 = File::open(&archive_filename) + .map_err(|err| format!("Unable to open {:?}: {:?}", archive_filename, err))?; + let tar = BzDecoder::new(std::io::BufReader::new(tar_bz2)); + let mut archive = tar::Archive::new(tar); + archive + .unpack(destination_dir) + .map_err(|err| format!("Unable to unpack {:?}: {:?}", archive_filename, err))?; + info!( + "Extracted {:?} in {:?}", + archive_filename, + Instant::now().duration_since(extract_start) + ); + Ok(()) +} + +pub fn download_genesis( + rpc_addr: &SocketAddr, + ledger_path: &Path, + expected_genesis_hash: Option, +) -> Result { + let genesis_package = ledger_path.join("genesis.tar.bz2"); + + let genesis_config = if !genesis_package.exists() { + let tmp_genesis_path = ledger_path.join("tmp-genesis"); + let tmp_genesis_package = tmp_genesis_path.join("genesis.tar.bz2"); + + let _ignored = fs::remove_dir_all(&tmp_genesis_path); + download_file( + &format!("http://{}/{}", rpc_addr, "genesis.tar.bz2"), + &tmp_genesis_package, + )?; + extract_archive(&tmp_genesis_package, &ledger_path)?; + + let tmp_genesis_config = GenesisConfig::load(&ledger_path) + .map_err(|err| format!("Failed to load downloaded genesis config: {}", err))?; + + if let Some(expected_genesis_hash) = expected_genesis_hash { + if expected_genesis_hash != tmp_genesis_config.hash() { + return Err(format!( + "Genesis hash mismatch: expected {} but downloaded genesis hash is {}", + expected_genesis_hash, + tmp_genesis_config.hash(), + )); + } + } + + std::fs::rename(tmp_genesis_package, genesis_package) + .map_err(|err| format!("Unable to rename: {:?}", err))?; + tmp_genesis_config + } else { + GenesisConfig::load(&ledger_path) + .map_err(|err| format!("Failed to load genesis config: {}", err))? + }; + + Ok(genesis_config.hash()) +} + +pub fn download_snapshot( + rpc_addr: &SocketAddr, + ledger_path: &Path, + snapshot_hash: (Slot, Hash), +) -> Result<(), String> { + let snapshot_package = + solana_ledger::snapshot_utils::get_snapshot_archive_path(ledger_path, &snapshot_hash); + if snapshot_package.exists() { + Ok(()) + } else { + download_file( + &format!( + "http://{}/{}", + rpc_addr, + snapshot_package.file_name().unwrap().to_str().unwrap() + ), + &snapshot_package, + ) + } +} diff --git a/local-cluster/Cargo.toml b/local-cluster/Cargo.toml index f4d4d0d4a..45f75364e 100644 --- a/local-cluster/Cargo.toml +++ b/local-cluster/Cargo.toml @@ -16,6 +16,7 @@ solana-archiver-lib = { path = "../archiver-lib", version = "1.1.0" } solana-config-program = { path = "../programs/config", version = "1.1.0" } solana-core = { path = "../core", version = "1.1.0" } solana-client = { path = "../client", version = "1.1.0" } +solana-download-utils = { path = "../download-utils", version = "1.1.0" } solana-faucet = { path = "../faucet", version = "1.1.0" } solana-exchange-program = { path = "../programs/exchange", version = "1.1.0" } solana-genesis-programs = { path = "../genesis-programs", version = "1.1.0" } diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 84d320340..2fb1239f8 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -7,6 +7,7 @@ use solana_core::{ broadcast_stage::BroadcastStageType, consensus::VOTE_THRESHOLD_DEPTH, gossip_service::discover_cluster, validator::ValidatorConfig, }; +use solana_download_utils::download_snapshot; use solana_ledger::{ bank_forks::SnapshotConfig, blockstore::Blockstore, leader_schedule::FixedSchedule, leader_schedule::LeaderSchedule, snapshot_utils, @@ -606,13 +607,69 @@ fn test_softlaunch_operating_mode() { } } +#[test] +#[serial] +fn test_snapshot_download() { + solana_logger::setup(); + // First set up the cluster with 1 node + let snapshot_interval_slots = 50; + let num_account_paths = 3; + + let leader_snapshot_test_config = + setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths); + let validator_snapshot_test_config = + setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths); + + let stake = 10_000; + let config = ClusterConfig { + node_stakes: vec![stake], + cluster_lamports: 1_000_000, + validator_configs: vec![leader_snapshot_test_config.validator_config.clone()], + ..ClusterConfig::default() + }; + + let mut cluster = LocalCluster::new(&config); + + // Get slot after which this was generated + let snapshot_package_output_path = &leader_snapshot_test_config + .validator_config + .snapshot_config + .as_ref() + .unwrap() + .snapshot_package_output_path; + + trace!("Waiting for snapshot"); + let (archive_filename, archive_snapshot_hash) = + wait_for_next_snapshot(&cluster, &snapshot_package_output_path); + + trace!("found: {:?}", archive_filename); + let validator_archive_path = snapshot_utils::get_snapshot_archive_path( + &validator_snapshot_test_config.snapshot_output_path, + &archive_snapshot_hash, + ); + + // Download the snapshot, then boot a validator from it. + download_snapshot( + &cluster.entry_point_info.rpc, + &validator_archive_path, + archive_snapshot_hash, + ) + .unwrap(); + + cluster.add_validator( + &validator_snapshot_test_config.validator_config, + stake, + Arc::new(Keypair::new()), + ); +} + #[allow(unused_attributes)] #[test] #[serial] fn test_snapshot_restart_tower() { // First set up the cluster with 2 nodes let snapshot_interval_slots = 10; - let num_account_paths = 4; + let num_account_paths = 2; let leader_snapshot_test_config = setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths); @@ -774,7 +831,7 @@ fn test_snapshots_blockstore_floor() { fn test_snapshots_restart_validity() { solana_logger::setup(); let snapshot_interval_slots = 10; - let num_account_paths = 4; + let num_account_paths = 1; let mut snapshot_test_config = setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths); let snapshot_package_output_path = &snapshot_test_config diff --git a/validator/Cargo.toml b/validator/Cargo.toml index 5813d6879..b9b56889e 100644 --- a/validator/Cargo.toml +++ b/validator/Cargo.toml @@ -9,18 +9,16 @@ license = "Apache-2.0" homepage = "https://solana.com/" [dependencies] -bzip2 = "0.3.3" clap = "2.33.0" chrono = { version = "0.4.10", features = ["serde"] } console = "0.9.2" log = "0.4.8" -indicatif = "0.14.0" rand = "0.6.5" -reqwest = { version = "0.10.1", default-features = false, features = ["blocking", "rustls-tls", "json"] } serde_json = "1.0.46" solana-clap-utils = { path = "../clap-utils", version = "1.1.0" } solana-client = { path = "../client", version = "1.1.0" } solana-core = { path = "../core", version = "1.1.0" } +solana-download-utils = { path = "../download-utils", version = "1.1.0" } solana-faucet = { path = "../faucet", version = "1.1.0" } solana-ledger = { path = "../ledger", version = "1.1.0" } solana-logger = { path = "../logger", version = "1.1.0" } @@ -31,7 +29,6 @@ solana-runtime = { path = "../runtime", version = "1.1.0" } solana-sdk = { path = "../sdk", version = "1.1.0" } solana-vote-program = { path = "../programs/vote", version = "1.1.0" } solana-vote-signer = { path = "../vote-signer", version = "1.1.0" } -tar = "0.4.26" [target."cfg(unix)".dependencies] gag = "0.1.10" diff --git a/validator/src/main.rs b/validator/src/main.rs index c817651e1..c3152e44b 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -1,9 +1,6 @@ -use bzip2::bufread::BzDecoder; use clap::{ crate_description, crate_name, value_t, value_t_or_exit, values_t_or_exit, App, Arg, ArgMatches, }; -use console::Emoji; -use indicatif::{ProgressBar, ProgressStyle}; use log::*; use rand::{thread_rng, Rng}; use solana_clap_utils::{ @@ -20,11 +17,11 @@ use solana_core::{ rpc::JsonRpcConfig, validator::{Validator, ValidatorConfig}, }; +use solana_download_utils::{download_genesis, download_snapshot}; use solana_ledger::bank_forks::SnapshotConfig; use solana_perf::recycler::enable_recycler_warming; use solana_sdk::{ clock::Slot, - genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey, signature::{Keypair, Signer}, @@ -32,9 +29,8 @@ use solana_sdk::{ use std::{ collections::HashSet, fs::{self, File}, - io::{self, Read}, net::{SocketAddr, TcpListener, UdpSocket}, - path::{Path, PathBuf}, + path::PathBuf, process::exit, str::FromStr, sync::{ @@ -65,122 +61,6 @@ fn hash_validator(hash: String) -> Result<(), String> { .map_err(|e| format!("{:?}", e)) } -static TRUCK: Emoji = Emoji("🚚 ", ""); -static SPARKLE: Emoji = Emoji("✨ ", ""); - -/// Creates a new process bar for processing that will take an unknown amount of time -fn new_spinner_progress_bar() -> ProgressBar { - let progress_bar = ProgressBar::new(42); - progress_bar - .set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}")); - progress_bar.enable_steady_tick(100); - progress_bar -} - -fn download_file(url: &str, destination_file: &Path) -> Result<(), String> { - if destination_file.is_file() { - return Err(format!("{:?} already exists", destination_file)); - } - let download_start = Instant::now(); - - fs::create_dir_all(destination_file.parent().unwrap()).map_err(|err| err.to_string())?; - - let temp_destination_file = destination_file.with_extension(".tmp"); - - let progress_bar = new_spinner_progress_bar(); - progress_bar.set_message(&format!("{}Downloading {}...", TRUCK, url)); - - let response = reqwest::blocking::Client::new() - .get(url) - .send() - .and_then(|response| response.error_for_status()) - .map_err(|err| { - progress_bar.finish_and_clear(); - err.to_string() - })?; - - let download_size = { - response - .headers() - .get(reqwest::header::CONTENT_LENGTH) - .and_then(|content_length| content_length.to_str().ok()) - .and_then(|content_length| content_length.parse().ok()) - .unwrap_or(0) - }; - progress_bar.set_length(download_size); - progress_bar.set_style( - ProgressStyle::default_bar() - .template(&format!( - "{}{}Downloading {} {}", - "{spinner:.green} ", - TRUCK, - url, - "[{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})" - )) - .progress_chars("=> "), - ); - - struct DownloadProgress { - progress_bar: ProgressBar, - response: R, - } - - impl Read for DownloadProgress { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.response.read(buf).map(|n| { - self.progress_bar.inc(n as u64); - n - }) - } - } - - let mut source = DownloadProgress { - progress_bar, - response, - }; - - File::create(&temp_destination_file) - .and_then(|mut file| std::io::copy(&mut source, &mut file)) - .map_err(|err| format!("Unable to write {:?}: {:?}", temp_destination_file, err))?; - - source.progress_bar.finish_and_clear(); - info!( - " {}{}", - SPARKLE, - format!( - "Downloaded {} ({} bytes) in {:?}", - url, - download_size, - Instant::now().duration_since(download_start), - ) - ); - - std::fs::rename(temp_destination_file, destination_file) - .map_err(|err| format!("Unable to rename: {:?}", err))?; - - Ok(()) -} - -fn extract_archive(archive_filename: &Path, destination_dir: &Path) -> Result<(), String> { - info!("Extracting {:?}...", archive_filename); - let extract_start = Instant::now(); - - fs::create_dir_all(destination_dir).map_err(|err| err.to_string())?; - let tar_bz2 = File::open(&archive_filename) - .map_err(|err| format!("Unable to open {:?}: {:?}", archive_filename, err))?; - let tar = BzDecoder::new(std::io::BufReader::new(tar_bz2)); - let mut archive = tar::Archive::new(tar); - archive - .unpack(destination_dir) - .map_err(|err| format!("Unable to unpack {:?}: {:?}", archive_filename, err))?; - info!( - "Extracted {:?} in {:?}", - archive_filename, - Instant::now().duration_since(extract_start) - ); - Ok(()) -} - fn get_shred_rpc_peers( cluster_info: &Arc>, expected_shred_version: Option, @@ -458,74 +338,6 @@ fn check_vote_account( Ok(()) } -fn download_genesis( - rpc_addr: &SocketAddr, - ledger_path: &Path, - validator_config: &mut ValidatorConfig, -) -> Result<(), String> { - let genesis_package = ledger_path.join("genesis.tar.bz2"); - - let genesis_config = if !genesis_package.exists() { - let tmp_genesis_path = ledger_path.join("tmp-genesis"); - let tmp_genesis_package = tmp_genesis_path.join("genesis.tar.bz2"); - - let _ignored = fs::remove_dir_all(&tmp_genesis_path); - download_file( - &format!("http://{}/{}", rpc_addr, "genesis.tar.bz2"), - &tmp_genesis_package, - )?; - extract_archive(&tmp_genesis_package, &ledger_path)?; - - let tmp_genesis_config = GenesisConfig::load(&ledger_path) - .map_err(|err| format!("Failed to load downloaded genesis config: {}", err))?; - - if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash { - if expected_genesis_hash != tmp_genesis_config.hash() { - return Err(format!( - "Genesis hash mismatch: expected {} but downloaded genesis hash is {}", - expected_genesis_hash, - tmp_genesis_config.hash(), - )); - } - } - - std::fs::rename(tmp_genesis_package, genesis_package) - .map_err(|err| format!("Unable to rename: {:?}", err))?; - tmp_genesis_config - } else { - GenesisConfig::load(&ledger_path) - .map_err(|err| format!("Failed to load genesis config: {}", err))? - }; - - if validator_config.expected_genesis_hash.is_none() { - info!("Expected genesis hash set to {}", genesis_config.hash()); - // If no particular genesis hash is expected use the one that's here - validator_config.expected_genesis_hash = Some(genesis_config.hash()); - } - Ok(()) -} - -fn download_snapshot( - rpc_addr: &SocketAddr, - ledger_path: &Path, - snapshot_hash: (Slot, Hash), -) -> Result<(), String> { - let snapshot_package = - solana_ledger::snapshot_utils::get_snapshot_archive_path(ledger_path, &snapshot_hash); - if snapshot_package.exists() { - Ok(()) - } else { - download_file( - &format!( - "http://{}/{}", - rpc_addr, - snapshot_package.file_name().unwrap().to_str().unwrap() - ), - &snapshot_package, - ) - } -} - // This function is duplicated in ledger-tool/src/main.rs... fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option> { if matches.is_present(name) { @@ -1157,11 +969,19 @@ pub fn main() { Err(err) => Err(format!("Failed to get RPC node version: {}", err)), } .and_then(|_| { - download_genesis( + let genesis_hash = download_genesis( &rpc_contact_info.rpc, &ledger_path, - &mut validator_config, - ) + validator_config.expected_genesis_hash, + ); + + if let Ok(genesis_hash) = genesis_hash { + if validator_config.expected_genesis_hash.is_none() { + info!("Expected genesis hash set to {}", genesis_hash); + validator_config.expected_genesis_hash = Some(genesis_hash); + } + } + genesis_hash }) .and_then(|_| { if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash {