diff --git a/download-utils/src/lib.rs b/download-utils/src/lib.rs index 90db4ec1f..8decda47b 100644 --- a/download-utils/src/lib.rs +++ b/download-utils/src/lib.rs @@ -9,7 +9,7 @@ use std::io; use std::io::Read; use std::net::SocketAddr; use std::path::{Path, PathBuf}; -use std::time::Instant; +use std::time::{Duration, Instant}; static TRUCK: Emoji = Emoji("🚚 ", ""); static SPARKLE: Emoji = Emoji("✨ ", ""); @@ -23,11 +23,41 @@ fn new_spinner_progress_bar() -> ProgressBar { progress_bar } -pub fn download_file( +/// Structure modeling information about download progress +#[derive(Debug)] +pub struct DownloadProgressRecord { + // Duration since the beginning of the download + pub elapsed_time: Duration, + // Duration since the the last notification + pub last_elapsed_time: Duration, + // the bytes/sec speed measured for the last notification period + pub last_throughput: f32, + // the bytes/sec speed measured from the beginning + pub total_throughput: f32, + // total bytes of the download + pub total_bytes: usize, + // bytes downloaded so far + pub current_bytes: usize, + // percentage downloaded + pub percentage_done: f32, + // Estimated remaining time (in seconds) to finish the download if it keeps at the the last download speed + pub estimated_remaining_time: f32, + // The times of the progress is being notified, it starts from 1 and increments by 1 each time + pub notification_count: u64, +} + +/// This callback allows the caller to get notified of the download progress modelled by DownloadProgressRecord +/// Return "true" to continue the download +/// Return "false" to abort the download +pub fn download_file( url: &str, destination_file: &Path, use_progress_bar: bool, -) -> Result<(), String> { + progress_notify_callback: &Option, +) -> Result<(), String> +where + F: Fn(&DownloadProgressRecord) -> bool, +{ if destination_file.is_file() { return Err(format!("{:?} already exists", destination_file)); } @@ -83,7 +113,10 @@ pub fn download_file( info!("Downloading {} bytes from {}", download_size, url); } - struct DownloadProgress { + struct DownloadProgress + where + F: Fn(&DownloadProgressRecord) -> bool, + { progress_bar: ProgressBar, response: R, last_print: Instant, @@ -91,30 +124,71 @@ pub fn download_file( last_print_bytes: usize, download_size: f32, use_progress_bar: bool, + start_time: Instant, + callback: Option, + notification_count: u64, } - impl Read for DownloadProgress { + impl Read for DownloadProgress + where + F: Fn(&DownloadProgressRecord) -> bool, + { fn read(&mut self, buf: &mut [u8]) -> io::Result { - self.response.read(buf).map(|n| { - if self.use_progress_bar { - self.progress_bar.inc(n as u64); - } else { - self.current_bytes += n; - if self.last_print.elapsed().as_secs() > 5 { - let total_bytes_f32 = self.current_bytes as f32; - let diff_bytes_f32 = (self.current_bytes - self.last_print_bytes) as f32; - info!( - "downloaded {} bytes {:.1}% {:.1} bytes/s", - self.current_bytes, - 100f32 * (total_bytes_f32 / self.download_size), - diff_bytes_f32 / self.last_print.elapsed().as_secs_f32(), - ); - self.last_print = Instant::now(); - self.last_print_bytes = self.current_bytes; - } + let n = self.response.read(buf)?; + + self.current_bytes += n; + let total_bytes_f32 = self.current_bytes as f32; + let diff_bytes_f32 = (self.current_bytes - self.last_print_bytes) as f32; + let last_throughput = diff_bytes_f32 / self.last_print.elapsed().as_secs_f32(); + let estimated_remaining_time = if last_throughput > 0_f32 { + (self.download_size - self.current_bytes as f32) / last_throughput + } else { + f32::MAX + }; + + let mut progress_record = DownloadProgressRecord { + elapsed_time: self.start_time.elapsed(), + last_elapsed_time: self.last_print.elapsed(), + last_throughput, + total_throughput: self.current_bytes as f32 + / self.start_time.elapsed().as_secs_f32(), + total_bytes: self.download_size as usize, + current_bytes: self.current_bytes, + percentage_done: 100f32 * (total_bytes_f32 / self.download_size), + estimated_remaining_time, + notification_count: self.notification_count, + }; + let mut to_update_progress = false; + if progress_record.last_elapsed_time.as_secs() > 5 { + self.last_print = Instant::now(); + self.last_print_bytes = self.current_bytes; + to_update_progress = true; + self.notification_count += 1; + progress_record.notification_count = self.notification_count + } + + if self.use_progress_bar { + self.progress_bar.inc(n as u64); + } else if to_update_progress { + info!( + "downloaded {} bytes {:.1}% {:.1} bytes/s", + self.current_bytes, + progress_record.percentage_done, + progress_record.last_throughput, + ); + } + + if let Some(callback) = &self.callback { + if to_update_progress && !callback(&progress_record) { + info!("Download is aborted by the caller"); + return Err(io::Error::new( + io::ErrorKind::Other, + "Download is aborted by the caller", + )); } - n - }) + } + + Ok(n) } } @@ -126,6 +200,9 @@ pub fn download_file( last_print_bytes: 0, download_size: (download_size as f32).max(1f32), use_progress_bar, + start_time: Instant::now(), + callback: progress_notify_callback.as_ref(), + notification_count: 0, }; File::create(&temp_destination_file) @@ -164,6 +241,7 @@ pub fn download_genesis_if_missing( &format!("http://{}/{}", rpc_addr, DEFAULT_GENESIS_ARCHIVE), &tmp_genesis_package, use_progress_bar, + &None:: bool>, )?; Ok(tmp_genesis_package) @@ -172,13 +250,17 @@ pub fn download_genesis_if_missing( } } -pub fn download_snapshot( +pub fn download_snapshot( rpc_addr: &SocketAddr, snapshot_output_dir: &Path, desired_snapshot_hash: (Slot, Hash), use_progress_bar: bool, maximum_snapshots_to_retain: usize, -) -> Result<(), String> { + progress_notify_callback: &Option, +) -> Result<(), String> +where + F: Fn(&DownloadProgressRecord) -> bool, +{ snapshot_utils::purge_old_snapshot_archives(snapshot_output_dir, maximum_snapshots_to_retain); for compression in &[ @@ -208,6 +290,7 @@ pub fn download_snapshot( ), &desired_snapshot_package, use_progress_bar, + &progress_notify_callback, ) .is_ok() { diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 7f3847675..4f0f3bcb6 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -20,7 +20,7 @@ use solana_core::{ optimistic_confirmation_verifier::OptimisticConfirmationVerifier, validator::ValidatorConfig, }; -use solana_download_utils::download_snapshot; +use solana_download_utils::{download_snapshot, DownloadProgressRecord}; use solana_ledger::{ ancestor_iterator::AncestorIterator, blockstore::{Blockstore, PurgeType}, @@ -1685,6 +1685,7 @@ fn test_snapshot_download() { archive_snapshot_hash, false, snapshot_utils::DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + &None:: bool>, ) .unwrap(); diff --git a/sdk/cargo-build-bpf/src/main.rs b/sdk/cargo-build-bpf/src/main.rs index c60d1b7d7..cb0790b0d 100644 --- a/sdk/cargo-build-bpf/src/main.rs +++ b/sdk/cargo-build-bpf/src/main.rs @@ -4,7 +4,7 @@ use { crate_description, crate_name, crate_version, value_t, value_t_or_exit, values_t, App, Arg, }, regex::Regex, - solana_download_utils::download_file, + solana_download_utils::{download_file, DownloadProgressRecord}, solana_sdk::signature::{write_keypair_file, Keypair}, std::{ collections::HashMap, @@ -112,7 +112,12 @@ fn install_if_missing( url.push_str(version); url.push('/'); url.push_str(file.to_str().unwrap()); - download_file(&url.as_str(), &file, true)?; + download_file( + &url.as_str(), + &file, + true, + &None:: bool>, + )?; fs::create_dir_all(&target_path).map_err(|err| err.to_string())?; let zip = File::open(&file).map_err(|err| err.to_string())?; let tar = BzDecoder::new(BufReader::new(zip)); diff --git a/validator/src/main.rs b/validator/src/main.rs index 65cdf5edd..7f8231e85 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -34,7 +34,7 @@ use { is_snapshot_config_invalid, Validator, ValidatorConfig, ValidatorStartProgress, }, }, - solana_download_utils::download_snapshot, + solana_download_utils::{download_snapshot, DownloadProgressRecord}, solana_genesis_utils::download_then_check_genesis_hash, solana_ledger::blockstore_db::BlockstoreRecoveryMode, solana_perf::recycler::enable_recycler_warming, @@ -83,6 +83,10 @@ enum Operation { const EXCLUDE_KEY: &str = "account-index-exclude-key"; const INCLUDE_KEY: &str = "account-index-include-key"; +// The default minimal snapshot download speed (bytes/second) +const DEFAULT_MIN_SNAPSHOT_DOWNLOAD_SPEED: u64 = 10485760; +// The maximum times of snapshot download abort and retry +const MAX_SNAPSHOT_DOWNLOAD_ABORT: u32 = 5; fn monitor_validator(ledger_path: &Path) { let dashboard = Dashboard::new(ledger_path, None, None).unwrap_or_else(|err| { @@ -742,6 +746,8 @@ fn rpc_bootstrap( maximum_local_snapshot_age: Slot, should_check_duplicate_instance: bool, start_progress: &Arc>, + minimal_snapshot_download_speed: f32, + maximum_snapshot_download_abort: u64, ) { if !no_port_check { let mut order: Vec<_> = (0..cluster_entrypoints.len()).collect(); @@ -760,6 +766,7 @@ fn rpc_bootstrap( let mut blacklisted_rpc_nodes = HashSet::new(); let mut gossip = None; + let mut download_abort_count = 0; loop { if gossip.is_none() { *start_progress.write().unwrap() = ValidatorStartProgress::SearchingForRpcService; @@ -891,7 +898,40 @@ fn rpc_bootstrap( snapshot_hash, use_progress_bar, maximum_snapshots_to_retain, + &Some(|download_progress: &DownloadProgressRecord| { + debug!("Download progress: {:?}", download_progress); + + if download_progress.last_throughput < minimal_snapshot_download_speed + && download_progress.notification_count <= 1 + && download_progress.percentage_done <= 2_f32 + && download_progress.estimated_remaining_time > 60_f32 + && download_abort_count < maximum_snapshot_download_abort { + if let Some(ref trusted_validators) = validator_config.trusted_validators { + if trusted_validators.contains(&rpc_contact_info.id) + && trusted_validators.len() == 1 + && bootstrap_config.no_untrusted_rpc { + warn!("The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, but will NOT abort \ + and try a different node as it is the only trusted validator and the no-untrusted-rpc is set. \ + Abort count: {}, Progress detail: {:?}", + download_progress.last_throughput, minimal_snapshot_download_speed, + download_abort_count, download_progress); + return true; // Do not abort download from the one-and-only trusted validator + } + } + warn!("The snapshot download is too slow, throughput: {} < min speed {} bytes/sec, will abort \ + and try a different node. Abort count: {}, Progress detail: {:?}", + download_progress.last_throughput, minimal_snapshot_download_speed, + download_abort_count, download_progress); + false + } else { + true + } + }), ); + + if ret.is_err() { + download_abort_count += 1; + } gossip_service.join().unwrap(); ret }) @@ -970,6 +1010,8 @@ pub fn main() { .to_string(); let default_rpc_threads = num_cpus::get().to_string(); let default_max_snapshot_to_retain = &DEFAULT_MAX_SNAPSHOTS_TO_RETAIN.to_string(); + let default_min_snapshot_download_speed = &DEFAULT_MIN_SNAPSHOT_DOWNLOAD_SPEED.to_string(); + let default_max_snapshot_download_abort = &MAX_SNAPSHOT_DOWNLOAD_ABORT.to_string(); let matches = App::new(crate_name!()).about(crate_description!()) .version(solana_version::version!()) @@ -1273,6 +1315,25 @@ pub fn main() { .default_value(default_max_snapshot_to_retain) .help("The maximum number of snapshots to hold on to when purging older snapshots.") ) + .arg( + Arg::with_name("minimal_snapshot_download_speed") + .long("minimal-snapshot-download-speed") + .value_name("MINIMAL_SNAPSHOT_DOWNLOAD_SPEED") + .takes_value(true) + .default_value(default_min_snapshot_download_speed) + .help("The minimal speed of snapshot downloads measured in bytes/second. \ + If the initial download speed falls below this threshold, the system will \ + retry the download against a different rpc node."), + ) + .arg( + Arg::with_name("maximum_snapshot_download_abort") + .long("maximum-snapshot-download-abort") + .value_name("MAXIMUM_SNAPSHOT_DOWNLOAD_ABORT") + .takes_value(true) + .default_value(default_max_snapshot_download_abort) + .help("The maximum number of times to abort and retry when encountering a \ + slow snapshot download."), + ) .arg( Arg::with_name("contact_debug_interval") .long("contact-debug-interval") @@ -2185,6 +2246,11 @@ pub fn main() { let maximum_local_snapshot_age = value_t_or_exit!(matches, "maximum_local_snapshot_age", u64); let maximum_snapshots_to_retain = value_t_or_exit!(matches, "maximum_snapshots_to_retain", usize); + let minimal_snapshot_download_speed = + value_t_or_exit!(matches, "minimal_snapshot_download_speed", f32); + let maximum_snapshot_download_abort = + value_t_or_exit!(matches, "maximum_snapshot_download_abort", u64); + let snapshot_output_dir = if matches.is_present("snapshots") { PathBuf::from(matches.value_of("snapshots").unwrap()) } else { @@ -2444,6 +2510,8 @@ pub fn main() { maximum_local_snapshot_age, should_check_duplicate_instance, &start_progress, + minimal_snapshot_download_speed, + maximum_snapshot_download_abort, ); *start_progress.write().unwrap() = ValidatorStartProgress::Initializing; }