Move download code to download-utils crate (#8704)

This commit is contained in:
sakridge 2020-03-07 07:08:01 -08:00 committed by GitHub
parent a7d1346d51
commit 97986a5241
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 319 additions and 206 deletions

20
Cargo.lock generated
View File

@ -3892,6 +3892,20 @@ dependencies = [
"winapi 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "solana-download-utils"
version = "1.1.0"
dependencies = [
"bzip2 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"console 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
"indicatif 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"reqwest 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-ledger 1.1.0",
"solana-sdk 1.1.0",
"tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "solana-exchange-program"
version = "1.1.0"
@ -4126,6 +4140,7 @@ dependencies = [
"solana-client 1.1.0",
"solana-config-program 1.1.0",
"solana-core 1.1.0",
"solana-download-utils 1.1.0",
"solana-exchange-program 1.1.0",
"solana-faucet 1.1.0",
"solana-genesis-programs 1.1.0",
@ -4494,19 +4509,17 @@ dependencies = [
name = "solana-validator"
version = "1.1.0"
dependencies = [
"bzip2 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.10 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
"console 0.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
"gag 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
"indicatif 0.14.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.6.5 (registry+https://github.com/rust-lang/crates.io-index)",
"reqwest 0.10.1 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.46 (registry+https://github.com/rust-lang/crates.io-index)",
"solana-clap-utils 1.1.0",
"solana-client 1.1.0",
"solana-core 1.1.0",
"solana-download-utils 1.1.0",
"solana-faucet 1.1.0",
"solana-ledger 1.1.0",
"solana-logger 1.1.0",
@ -4517,7 +4530,6 @@ dependencies = [
"solana-sdk 1.1.0",
"solana-vote-program 1.1.0",
"solana-vote-signer 1.1.0",
"tar 0.4.26 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]

View File

@ -10,6 +10,7 @@ members = [
"cli-config",
"client",
"core",
"download-utils",
"faucet",
"perf",
"validator",

View File

@ -85,9 +85,19 @@ impl RpcRequestMiddleware {
}
fn get(&self, path: &str) -> RequestMiddlewareAction {
let filename = self.ledger_path.join(
path.split_at(1).1, // Drop leading '/' from path
);
let stem = path.split_at(1).1; // Drop leading '/' from path
let filename = {
match path {
"/genesis.tar.bz2" => self.ledger_path.join(stem),
_ => self
.snapshot_config
.as_ref()
.unwrap()
.snapshot_package_output_path
.join(stem),
}
};
info!("get {} -> {:?}", path, filename);
RequestMiddlewareAction::Respond {

23
download-utils/Cargo.toml Normal file
View File

@ -0,0 +1,23 @@
[package]
name = "solana-download-utils"
version = "1.1.0"
description = "Solana Download Utils"
authors = ["Solana Maintainers <maintainers@solana.com>"]
repository = "https://github.com/solana-labs/solana"
license = "Apache-2.0"
homepage = "https://solana.com/"
edition = "2018"
[dependencies]
bzip2 = "0.3.3"
console = "0.9.2"
indicatif = "0.14.0"
log = "0.4.8"
reqwest = { version = "0.10.1", default-features = false, features = ["blocking", "rustls-tls", "json"] }
solana-sdk = { path = "../sdk", version = "1.1.0" }
solana-ledger = { path = "../ledger", version = "1.1.0" }
tar = "0.4.26"
[lib]
crate-type = ["lib"]
name = "solana_download_utils"

192
download-utils/src/lib.rs Normal file
View File

@ -0,0 +1,192 @@
use bzip2::bufread::BzDecoder;
use console::Emoji;
use indicatif::{ProgressBar, ProgressStyle};
use log::*;
use solana_sdk::clock::Slot;
use solana_sdk::genesis_config::GenesisConfig;
use solana_sdk::hash::Hash;
use std::fs::{self, File};
use std::io;
use std::io::Read;
use std::net::SocketAddr;
use std::path::Path;
use std::time::Instant;
static TRUCK: Emoji = Emoji("🚚 ", "");
static SPARKLE: Emoji = Emoji("", "");
/// Creates a new process bar for processing that will take an unknown amount of time
fn new_spinner_progress_bar() -> ProgressBar {
let progress_bar = ProgressBar::new(42);
progress_bar
.set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}"));
progress_bar.enable_steady_tick(100);
progress_bar
}
pub fn download_file(url: &str, destination_file: &Path) -> Result<(), String> {
if destination_file.is_file() {
return Err(format!("{:?} already exists", destination_file));
}
let download_start = Instant::now();
fs::create_dir_all(destination_file.parent().unwrap()).map_err(|err| err.to_string())?;
let temp_destination_file = destination_file.with_extension(".tmp");
let progress_bar = new_spinner_progress_bar();
progress_bar.set_message(&format!("{}Downloading {}...", TRUCK, url));
let response = reqwest::blocking::Client::new()
.get(url)
.send()
.and_then(|response| response.error_for_status())
.map_err(|err| {
progress_bar.finish_and_clear();
err.to_string()
})?;
let download_size = {
response
.headers()
.get(reqwest::header::CONTENT_LENGTH)
.and_then(|content_length| content_length.to_str().ok())
.and_then(|content_length| content_length.parse().ok())
.unwrap_or(0)
};
progress_bar.set_length(download_size);
progress_bar.set_style(
ProgressStyle::default_bar()
.template(&format!(
"{}{}Downloading {} {}",
"{spinner:.green} ",
TRUCK,
url,
"[{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})"
))
.progress_chars("=> "),
);
struct DownloadProgress<R> {
progress_bar: ProgressBar,
response: R,
}
impl<R: Read> Read for DownloadProgress<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.response.read(buf).map(|n| {
self.progress_bar.inc(n as u64);
n
})
}
}
let mut source = DownloadProgress {
progress_bar,
response,
};
File::create(&temp_destination_file)
.and_then(|mut file| std::io::copy(&mut source, &mut file))
.map_err(|err| format!("Unable to write {:?}: {:?}", temp_destination_file, err))?;
source.progress_bar.finish_and_clear();
info!(
" {}{}",
SPARKLE,
format!(
"Downloaded {} ({} bytes) in {:?}",
url,
download_size,
Instant::now().duration_since(download_start),
)
);
std::fs::rename(temp_destination_file, destination_file)
.map_err(|err| format!("Unable to rename: {:?}", err))?;
Ok(())
}
fn extract_archive(archive_filename: &Path, destination_dir: &Path) -> Result<(), String> {
info!("Extracting {:?}...", archive_filename);
let extract_start = Instant::now();
fs::create_dir_all(destination_dir).map_err(|err| err.to_string())?;
let tar_bz2 = File::open(&archive_filename)
.map_err(|err| format!("Unable to open {:?}: {:?}", archive_filename, err))?;
let tar = BzDecoder::new(std::io::BufReader::new(tar_bz2));
let mut archive = tar::Archive::new(tar);
archive
.unpack(destination_dir)
.map_err(|err| format!("Unable to unpack {:?}: {:?}", archive_filename, err))?;
info!(
"Extracted {:?} in {:?}",
archive_filename,
Instant::now().duration_since(extract_start)
);
Ok(())
}
pub fn download_genesis(
rpc_addr: &SocketAddr,
ledger_path: &Path,
expected_genesis_hash: Option<Hash>,
) -> Result<Hash, String> {
let genesis_package = ledger_path.join("genesis.tar.bz2");
let genesis_config = if !genesis_package.exists() {
let tmp_genesis_path = ledger_path.join("tmp-genesis");
let tmp_genesis_package = tmp_genesis_path.join("genesis.tar.bz2");
let _ignored = fs::remove_dir_all(&tmp_genesis_path);
download_file(
&format!("http://{}/{}", rpc_addr, "genesis.tar.bz2"),
&tmp_genesis_package,
)?;
extract_archive(&tmp_genesis_package, &ledger_path)?;
let tmp_genesis_config = GenesisConfig::load(&ledger_path)
.map_err(|err| format!("Failed to load downloaded genesis config: {}", err))?;
if let Some(expected_genesis_hash) = expected_genesis_hash {
if expected_genesis_hash != tmp_genesis_config.hash() {
return Err(format!(
"Genesis hash mismatch: expected {} but downloaded genesis hash is {}",
expected_genesis_hash,
tmp_genesis_config.hash(),
));
}
}
std::fs::rename(tmp_genesis_package, genesis_package)
.map_err(|err| format!("Unable to rename: {:?}", err))?;
tmp_genesis_config
} else {
GenesisConfig::load(&ledger_path)
.map_err(|err| format!("Failed to load genesis config: {}", err))?
};
Ok(genesis_config.hash())
}
pub fn download_snapshot(
rpc_addr: &SocketAddr,
ledger_path: &Path,
snapshot_hash: (Slot, Hash),
) -> Result<(), String> {
let snapshot_package =
solana_ledger::snapshot_utils::get_snapshot_archive_path(ledger_path, &snapshot_hash);
if snapshot_package.exists() {
Ok(())
} else {
download_file(
&format!(
"http://{}/{}",
rpc_addr,
snapshot_package.file_name().unwrap().to_str().unwrap()
),
&snapshot_package,
)
}
}

View File

@ -16,6 +16,7 @@ solana-archiver-lib = { path = "../archiver-lib", version = "1.1.0" }
solana-config-program = { path = "../programs/config", version = "1.1.0" }
solana-core = { path = "../core", version = "1.1.0" }
solana-client = { path = "../client", version = "1.1.0" }
solana-download-utils = { path = "../download-utils", version = "1.1.0" }
solana-faucet = { path = "../faucet", version = "1.1.0" }
solana-exchange-program = { path = "../programs/exchange", version = "1.1.0" }
solana-genesis-programs = { path = "../genesis-programs", version = "1.1.0" }

View File

@ -7,6 +7,7 @@ use solana_core::{
broadcast_stage::BroadcastStageType, consensus::VOTE_THRESHOLD_DEPTH,
gossip_service::discover_cluster, validator::ValidatorConfig,
};
use solana_download_utils::download_snapshot;
use solana_ledger::{
bank_forks::SnapshotConfig, blockstore::Blockstore, leader_schedule::FixedSchedule,
leader_schedule::LeaderSchedule, snapshot_utils,
@ -606,13 +607,69 @@ fn test_softlaunch_operating_mode() {
}
}
#[test]
#[serial]
fn test_snapshot_download() {
solana_logger::setup();
// First set up the cluster with 1 node
let snapshot_interval_slots = 50;
let num_account_paths = 3;
let leader_snapshot_test_config =
setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
let validator_snapshot_test_config =
setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
let stake = 10_000;
let config = ClusterConfig {
node_stakes: vec![stake],
cluster_lamports: 1_000_000,
validator_configs: vec![leader_snapshot_test_config.validator_config.clone()],
..ClusterConfig::default()
};
let mut cluster = LocalCluster::new(&config);
// Get slot after which this was generated
let snapshot_package_output_path = &leader_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.snapshot_package_output_path;
trace!("Waiting for snapshot");
let (archive_filename, archive_snapshot_hash) =
wait_for_next_snapshot(&cluster, &snapshot_package_output_path);
trace!("found: {:?}", archive_filename);
let validator_archive_path = snapshot_utils::get_snapshot_archive_path(
&validator_snapshot_test_config.snapshot_output_path,
&archive_snapshot_hash,
);
// Download the snapshot, then boot a validator from it.
download_snapshot(
&cluster.entry_point_info.rpc,
&validator_archive_path,
archive_snapshot_hash,
)
.unwrap();
cluster.add_validator(
&validator_snapshot_test_config.validator_config,
stake,
Arc::new(Keypair::new()),
);
}
#[allow(unused_attributes)]
#[test]
#[serial]
fn test_snapshot_restart_tower() {
// First set up the cluster with 2 nodes
let snapshot_interval_slots = 10;
let num_account_paths = 4;
let num_account_paths = 2;
let leader_snapshot_test_config =
setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
@ -774,7 +831,7 @@ fn test_snapshots_blockstore_floor() {
fn test_snapshots_restart_validity() {
solana_logger::setup();
let snapshot_interval_slots = 10;
let num_account_paths = 4;
let num_account_paths = 1;
let mut snapshot_test_config =
setup_snapshot_validator_config(snapshot_interval_slots, num_account_paths);
let snapshot_package_output_path = &snapshot_test_config

View File

@ -9,18 +9,16 @@ license = "Apache-2.0"
homepage = "https://solana.com/"
[dependencies]
bzip2 = "0.3.3"
clap = "2.33.0"
chrono = { version = "0.4.10", features = ["serde"] }
console = "0.9.2"
log = "0.4.8"
indicatif = "0.14.0"
rand = "0.6.5"
reqwest = { version = "0.10.1", default-features = false, features = ["blocking", "rustls-tls", "json"] }
serde_json = "1.0.46"
solana-clap-utils = { path = "../clap-utils", version = "1.1.0" }
solana-client = { path = "../client", version = "1.1.0" }
solana-core = { path = "../core", version = "1.1.0" }
solana-download-utils = { path = "../download-utils", version = "1.1.0" }
solana-faucet = { path = "../faucet", version = "1.1.0" }
solana-ledger = { path = "../ledger", version = "1.1.0" }
solana-logger = { path = "../logger", version = "1.1.0" }
@ -31,7 +29,6 @@ solana-runtime = { path = "../runtime", version = "1.1.0" }
solana-sdk = { path = "../sdk", version = "1.1.0" }
solana-vote-program = { path = "../programs/vote", version = "1.1.0" }
solana-vote-signer = { path = "../vote-signer", version = "1.1.0" }
tar = "0.4.26"
[target."cfg(unix)".dependencies]
gag = "0.1.10"

View File

@ -1,9 +1,6 @@
use bzip2::bufread::BzDecoder;
use clap::{
crate_description, crate_name, value_t, value_t_or_exit, values_t_or_exit, App, Arg, ArgMatches,
};
use console::Emoji;
use indicatif::{ProgressBar, ProgressStyle};
use log::*;
use rand::{thread_rng, Rng};
use solana_clap_utils::{
@ -20,11 +17,11 @@ use solana_core::{
rpc::JsonRpcConfig,
validator::{Validator, ValidatorConfig},
};
use solana_download_utils::{download_genesis, download_snapshot};
use solana_ledger::bank_forks::SnapshotConfig;
use solana_perf::recycler::enable_recycler_warming;
use solana_sdk::{
clock::Slot,
genesis_config::GenesisConfig,
hash::Hash,
pubkey::Pubkey,
signature::{Keypair, Signer},
@ -32,9 +29,8 @@ use solana_sdk::{
use std::{
collections::HashSet,
fs::{self, File},
io::{self, Read},
net::{SocketAddr, TcpListener, UdpSocket},
path::{Path, PathBuf},
path::PathBuf,
process::exit,
str::FromStr,
sync::{
@ -65,122 +61,6 @@ fn hash_validator(hash: String) -> Result<(), String> {
.map_err(|e| format!("{:?}", e))
}
static TRUCK: Emoji = Emoji("🚚 ", "");
static SPARKLE: Emoji = Emoji("", "");
/// Creates a new process bar for processing that will take an unknown amount of time
fn new_spinner_progress_bar() -> ProgressBar {
let progress_bar = ProgressBar::new(42);
progress_bar
.set_style(ProgressStyle::default_spinner().template("{spinner:.green} {wide_msg}"));
progress_bar.enable_steady_tick(100);
progress_bar
}
fn download_file(url: &str, destination_file: &Path) -> Result<(), String> {
if destination_file.is_file() {
return Err(format!("{:?} already exists", destination_file));
}
let download_start = Instant::now();
fs::create_dir_all(destination_file.parent().unwrap()).map_err(|err| err.to_string())?;
let temp_destination_file = destination_file.with_extension(".tmp");
let progress_bar = new_spinner_progress_bar();
progress_bar.set_message(&format!("{}Downloading {}...", TRUCK, url));
let response = reqwest::blocking::Client::new()
.get(url)
.send()
.and_then(|response| response.error_for_status())
.map_err(|err| {
progress_bar.finish_and_clear();
err.to_string()
})?;
let download_size = {
response
.headers()
.get(reqwest::header::CONTENT_LENGTH)
.and_then(|content_length| content_length.to_str().ok())
.and_then(|content_length| content_length.parse().ok())
.unwrap_or(0)
};
progress_bar.set_length(download_size);
progress_bar.set_style(
ProgressStyle::default_bar()
.template(&format!(
"{}{}Downloading {} {}",
"{spinner:.green} ",
TRUCK,
url,
"[{bar:40.cyan/blue}] {bytes}/{total_bytes} ({eta})"
))
.progress_chars("=> "),
);
struct DownloadProgress<R> {
progress_bar: ProgressBar,
response: R,
}
impl<R: Read> Read for DownloadProgress<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.response.read(buf).map(|n| {
self.progress_bar.inc(n as u64);
n
})
}
}
let mut source = DownloadProgress {
progress_bar,
response,
};
File::create(&temp_destination_file)
.and_then(|mut file| std::io::copy(&mut source, &mut file))
.map_err(|err| format!("Unable to write {:?}: {:?}", temp_destination_file, err))?;
source.progress_bar.finish_and_clear();
info!(
" {}{}",
SPARKLE,
format!(
"Downloaded {} ({} bytes) in {:?}",
url,
download_size,
Instant::now().duration_since(download_start),
)
);
std::fs::rename(temp_destination_file, destination_file)
.map_err(|err| format!("Unable to rename: {:?}", err))?;
Ok(())
}
fn extract_archive(archive_filename: &Path, destination_dir: &Path) -> Result<(), String> {
info!("Extracting {:?}...", archive_filename);
let extract_start = Instant::now();
fs::create_dir_all(destination_dir).map_err(|err| err.to_string())?;
let tar_bz2 = File::open(&archive_filename)
.map_err(|err| format!("Unable to open {:?}: {:?}", archive_filename, err))?;
let tar = BzDecoder::new(std::io::BufReader::new(tar_bz2));
let mut archive = tar::Archive::new(tar);
archive
.unpack(destination_dir)
.map_err(|err| format!("Unable to unpack {:?}: {:?}", archive_filename, err))?;
info!(
"Extracted {:?} in {:?}",
archive_filename,
Instant::now().duration_since(extract_start)
);
Ok(())
}
fn get_shred_rpc_peers(
cluster_info: &Arc<RwLock<ClusterInfo>>,
expected_shred_version: Option<u16>,
@ -458,74 +338,6 @@ fn check_vote_account(
Ok(())
}
fn download_genesis(
rpc_addr: &SocketAddr,
ledger_path: &Path,
validator_config: &mut ValidatorConfig,
) -> Result<(), String> {
let genesis_package = ledger_path.join("genesis.tar.bz2");
let genesis_config = if !genesis_package.exists() {
let tmp_genesis_path = ledger_path.join("tmp-genesis");
let tmp_genesis_package = tmp_genesis_path.join("genesis.tar.bz2");
let _ignored = fs::remove_dir_all(&tmp_genesis_path);
download_file(
&format!("http://{}/{}", rpc_addr, "genesis.tar.bz2"),
&tmp_genesis_package,
)?;
extract_archive(&tmp_genesis_package, &ledger_path)?;
let tmp_genesis_config = GenesisConfig::load(&ledger_path)
.map_err(|err| format!("Failed to load downloaded genesis config: {}", err))?;
if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash {
if expected_genesis_hash != tmp_genesis_config.hash() {
return Err(format!(
"Genesis hash mismatch: expected {} but downloaded genesis hash is {}",
expected_genesis_hash,
tmp_genesis_config.hash(),
));
}
}
std::fs::rename(tmp_genesis_package, genesis_package)
.map_err(|err| format!("Unable to rename: {:?}", err))?;
tmp_genesis_config
} else {
GenesisConfig::load(&ledger_path)
.map_err(|err| format!("Failed to load genesis config: {}", err))?
};
if validator_config.expected_genesis_hash.is_none() {
info!("Expected genesis hash set to {}", genesis_config.hash());
// If no particular genesis hash is expected use the one that's here
validator_config.expected_genesis_hash = Some(genesis_config.hash());
}
Ok(())
}
fn download_snapshot(
rpc_addr: &SocketAddr,
ledger_path: &Path,
snapshot_hash: (Slot, Hash),
) -> Result<(), String> {
let snapshot_package =
solana_ledger::snapshot_utils::get_snapshot_archive_path(ledger_path, &snapshot_hash);
if snapshot_package.exists() {
Ok(())
} else {
download_file(
&format!(
"http://{}/{}",
rpc_addr,
snapshot_package.file_name().unwrap().to_str().unwrap()
),
&snapshot_package,
)
}
}
// This function is duplicated in ledger-tool/src/main.rs...
fn hardforks_of(matches: &ArgMatches<'_>, name: &str) -> Option<Vec<Slot>> {
if matches.is_present(name) {
@ -1157,11 +969,19 @@ pub fn main() {
Err(err) => Err(format!("Failed to get RPC node version: {}", err)),
}
.and_then(|_| {
download_genesis(
let genesis_hash = download_genesis(
&rpc_contact_info.rpc,
&ledger_path,
&mut validator_config,
)
validator_config.expected_genesis_hash,
);
if let Ok(genesis_hash) = genesis_hash {
if validator_config.expected_genesis_hash.is_none() {
info!("Expected genesis hash set to {}", genesis_hash);
validator_config.expected_genesis_hash = Some(genesis_hash);
}
}
genesis_hash
})
.and_then(|_| {
if let Some(expected_genesis_hash) = validator_config.expected_genesis_hash {