diff --git a/validator/src/main.rs b/validator/src/main.rs index 01e051158..1da96609b 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -80,39 +80,25 @@ fn new_spinner_progress_bar() -> ProgressBar { progress_bar } -fn download_tar_bz2( - rpc_addr: &SocketAddr, - archive_name: &str, - download_path: &Path, - is_snapshot: bool, -) -> Result<(), String> { - let archive_path = download_path.join(archive_name); - if archive_path.is_file() { - return Ok(()); +fn download_file(url: &str, destination_file: &Path, not_found_ok: bool) -> Result<(), String> { + if destination_file.is_file() { + return Err(format!("{:?} already exists", destination_file)); } - fs::create_dir_all(download_path).map_err(|err| err.to_string())?; - - let temp_archive_path = { - let mut p = archive_path.clone(); - p.set_extension(".tmp"); - p - }; - - let url = format!("http://{}/{}", rpc_addr, archive_name); let download_start = Instant::now(); + fs::create_dir_all(destination_file.parent().unwrap()).map_err(|err| err.to_string())?; + + let temp_destination_file = destination_file.with_extension(".tmp"); + let progress_bar = new_spinner_progress_bar(); progress_bar.set_message(&format!("{}Downloading {}...", TRUCK, url)); let client = reqwest::blocking::Client::new(); - let response = client - .get(url.as_str()) - .send() - .map_err(|err| err.to_string())?; + let response = client.get(url).send().map_err(|err| err.to_string())?; - if is_snapshot && response.status() == reqwest::StatusCode::NOT_FOUND { + if response.status() == reqwest::StatusCode::NOT_FOUND && not_found_ok { progress_bar.finish_and_clear(); - warn!("Snapshot not found at {}", url); + info!("Archive not found at {}", url); return Ok(()); } @@ -159,10 +145,10 @@ fn download_tar_bz2( response, }; - let mut file = File::create(&temp_archive_path) - .map_err(|err| format!("Unable to create {:?}: {:?}", temp_archive_path, err))?; + let mut file = File::create(&temp_destination_file) + .map_err(|err| format!("Unable to create {:?}: {:?}", temp_destination_file, err))?; std::io::copy(&mut source, &mut file) - .map_err(|err| format!("Unable to write {:?}: {:?}", temp_archive_path, err))?; + .map_err(|err| format!("Unable to write {:?}: {:?}", temp_destination_file, err))?; source.progress_bar.finish_and_clear(); info!( @@ -176,28 +162,32 @@ fn download_tar_bz2( ) ); - if !is_snapshot { - info!("Extracting {:?}...", archive_path); - let extract_start = Instant::now(); - let tar_bz2 = File::open(&temp_archive_path) - .map_err(|err| format!("Unable to open {}: {:?}", archive_name, err))?; - let tar = BzDecoder::new(std::io::BufReader::new(tar_bz2)); - let mut archive = tar::Archive::new(tar); - archive - .unpack(download_path) - .map_err(|err| format!("Unable to unpack {}: {:?}", archive_name, err))?; - info!( - "Extracted {} in {:?}", - archive_name, - Instant::now().duration_since(extract_start) - ); - } - std::fs::rename(temp_archive_path, archive_path) + std::fs::rename(temp_destination_file, destination_file) .map_err(|err| format!("Unable to rename: {:?}", err))?; Ok(()) } +fn extract_archive(archive_filename: &Path, destination_dir: &Path) -> Result<(), String> { + info!("Extracting {:?}...", archive_filename); + let extract_start = Instant::now(); + + fs::create_dir_all(destination_dir).map_err(|err| err.to_string())?; + let tar_bz2 = File::open(&archive_filename) + .map_err(|err| format!("Unable to open {:?}: {:?}", archive_filename, err))?; + let tar = BzDecoder::new(std::io::BufReader::new(tar_bz2)); + let mut archive = tar::Archive::new(tar); + archive + .unpack(destination_dir) + .map_err(|err| format!("Unable to unpack {:?}: {:?}", archive_filename, err))?; + info!( + "Extracted {:?} in {:?}", + archive_filename, + Instant::now().duration_since(extract_start) + ); + Ok(()) +} + fn get_shred_rpc_peers( cluster_info: &Arc>, expected_shred_version: Option, @@ -296,23 +286,22 @@ fn get_rpc_node( .collect(); let rpc_peers_blacklisted = rpc_peers_total - rpc_peers.len(); - if rpc_peers_blacklisted == rpc_peers_total { - // If all nodes are blacklisted and no additional nodes are discovered after 60 seconds, - // remove the blacklist and try them all again - if blacklist_timeout.elapsed().as_secs() > 60 { - info!("Node blacklist timeout expired"); - blacklisted_rpc_nodes.clear(); - continue; - } - } else { - blacklist_timeout = Instant::now(); - } - info!( "Total {} RPC nodes found. {} blacklisted ", rpc_peers_total, rpc_peers_blacklisted ); + if rpc_peers_blacklisted == rpc_peers_total { + // If all nodes are blacklisted and no additional nodes are discovered after 60 seconds, + // remove the blacklist and try them all again + if blacklist_timeout.elapsed().as_secs() > 60 { + info!("Blacklist timeout expired"); + blacklisted_rpc_nodes.clear(); + } + continue; + } + blacklist_timeout = Instant::now(); + let mut highest_snapshot_hash: Option<(Slot, Hash)> = None; let eligible_rpc_peers = if snapshot_not_required { rpc_peers @@ -358,8 +347,14 @@ fn get_rpc_node( } Some(highest_snapshot_hash) => { info!( - "Highest available snapshot slot is {}, available from {:?}", + "Highest available snapshot slot is {}, available from {} node{}: {:?}", highest_snapshot_hash.0, + eligible_rpc_peers.len(), + if eligible_rpc_peers.len() > 1 { + "s" + } else { + "" + }, eligible_rpc_peers .iter() .map(|contact_info| contact_info.id) @@ -452,38 +447,49 @@ fn check_vote_account( fn download_genesis( rpc_addr: &SocketAddr, - rpc_client: &RpcClient, ledger_path: &Path, validator_config: &mut ValidatorConfig, ) -> Result<(), String> { - let genesis_hash = rpc_client - .get_genesis_hash() - .map_err(|err| format!("Failed to get genesis hash: {}", err))?; + let genesis_package = ledger_path.join("genesis.tar.bz2"); - if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash { - if expected_genesis_hash != genesis_hash { - return Err(format!( - "Genesis hash mismatch: expected {} but local genesis hash is {}", - expected_genesis_hash, genesis_hash, - )); + let genesis_config = if !genesis_package.exists() { + let tmp_genesis_path = ledger_path.join("tmp-genesis"); + let tmp_genesis_package = tmp_genesis_path.join("genesis.tar.bz2"); + + let _ignored = fs::remove_dir_all(&tmp_genesis_path); + download_file( + &format!("http://{}/{}", rpc_addr, "genesis.tar.bz2"), + &tmp_genesis_package, + false, + )?; + extract_archive(&tmp_genesis_package, &ledger_path)?; + + let tmp_genesis_config = GenesisConfig::load(&ledger_path) + .map_err(|err| format!("Failed to load downloaded genesis config: {}", err))?; + + if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash { + if expected_genesis_hash != tmp_genesis_config.hash() { + return Err(format!( + "Genesis hash mismatch: expected {} but downloaded genesis hash is {}", + expected_genesis_hash, + tmp_genesis_config.hash(), + )); + } } + + std::fs::rename(tmp_genesis_package, genesis_package) + .map_err(|err| format!("Unable to rename: {:?}", err))?; + tmp_genesis_config + } else { + GenesisConfig::load(&ledger_path) + .map_err(|err| format!("Failed to load genesis config: {}", err))? + }; + + if validator_config.expected_genesis_hash.is_none() { + info!("Expected genesis hash set to {}", genesis_config.hash()); + // If no particular genesis hash is expected use the one that's here + validator_config.expected_genesis_hash = Some(genesis_config.hash()); } - - download_tar_bz2(&rpc_addr, "genesis.tar.bz2", &ledger_path, false) - .map_err(|err| format!("Failed to download genesis config: {}", err))?; - - let genesis_config = GenesisConfig::load(&ledger_path) - .map_err(|err| format!("Failed to load genesis config: {}", err))?; - - if genesis_config.hash() != genesis_hash { - return Err(format!( - "Genesis hash mismatch: expected {} but downloaded genesis hash is {}", - genesis_hash, - genesis_config.hash(), - )); - } - - validator_config.expected_genesis_hash = Some(genesis_hash); Ok(()) } @@ -501,13 +507,15 @@ fn download_snapshot( fs::remove_file(&snapshot_package) .map_err(|err| format!("error removing {:?}: {}", snapshot_package, err))?; } - download_tar_bz2( - rpc_addr, - snapshot_package.file_name().unwrap().to_str().unwrap(), - snapshot_package.parent().unwrap(), + download_file( + &format!( + "http://{}/{}", + rpc_addr, + snapshot_package.file_name().unwrap().to_str().unwrap() + ), + &snapshot_package, true, - ) - .map_err(|err| format!("Failed to fetch snapshot: {:?}", err))?; + )?; Ok(()) } @@ -1117,11 +1125,25 @@ pub fn main() { .and_then(|_| { download_genesis( &rpc_contact_info.rpc, - &rpc_client, &ledger_path, &mut validator_config, ) }) + .and_then(|_| { + if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash { + // Sanity check that the RPC node is using the expected genesis hash before + // downloading a snapshot from it + let rpc_genesis_hash = rpc_client + .get_genesis_hash() + .map_err(|err| format!("Failed to get genesis hash: {}", err))?; + + if expected_genesis_hash != rpc_genesis_hash { + return Err(format!("Genesis hash mismatch: expected {} but RPC node genesis hash is {}", + expected_genesis_hash, rpc_genesis_hash)); + } + } + Ok(()) + }) .and_then(|_| download_snapshot(&rpc_contact_info.rpc, &ledger_path, snapshot_hash)) .and_then(|_| { if !validator_config.voting_disabled && !no_check_vote_account {