Add test_incremental_snapshot_download() to local-cluster (#19746)

This commit is contained in:
Brooks Prumo 2021-09-13 21:44:48 -05:00 committed by GitHub
parent b57e86abf2
commit 79ade5ec68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 385 additions and 119 deletions

View File

@ -2,7 +2,10 @@
use console::Emoji;
use indicatif::{ProgressBar, ProgressStyle};
use log::*;
use solana_runtime::{snapshot_utils, snapshot_utils::ArchiveFormat};
use solana_runtime::{
snapshot_package::SnapshotType,
snapshot_utils::{self, ArchiveFormat},
};
use solana_sdk::{clock::Slot, genesis_config::DEFAULT_GENESIS_ARCHIVE, hash::Hash};
use std::fs::{self, File};
use std::io;
@ -244,13 +247,16 @@ pub fn download_genesis_if_missing(
}
}
pub fn download_snapshot<'a, 'b>(
/// Download a snapshot archive from `rpc_addr`. Use `snapshot_type` to specify downloading either
/// a full snapshot or an incremental snapshot.
pub fn download_snapshot_archive<'a, 'b>(
rpc_addr: &SocketAddr,
snapshot_archives_dir: &Path,
desired_snapshot_hash: (Slot, Hash),
use_progress_bar: bool,
snapshot_type: SnapshotType,
maximum_full_snapshot_archives_to_retain: usize,
maximum_incremental_snapshot_archives_to_retain: usize,
use_progress_bar: bool,
progress_notify_callback: &'a mut DownloadProgressCallbackOption<'b>,
) -> Result<(), String> {
snapshot_utils::purge_old_snapshot_archives(
@ -259,20 +265,31 @@ pub fn download_snapshot<'a, 'b>(
maximum_incremental_snapshot_archives_to_retain,
);
for compression in &[
for archive_format in [
ArchiveFormat::TarZstd,
ArchiveFormat::TarGzip,
ArchiveFormat::TarBzip2,
ArchiveFormat::Tar, // `solana-test-validator` creates uncompressed snapshots
] {
let desired_snapshot_package = snapshot_utils::build_full_snapshot_archive_path(
snapshot_archives_dir.to_path_buf(),
desired_snapshot_hash.0,
&desired_snapshot_hash.1,
*compression,
);
let destination_path = match snapshot_type {
SnapshotType::FullSnapshot => snapshot_utils::build_full_snapshot_archive_path(
snapshot_archives_dir.to_path_buf(),
desired_snapshot_hash.0,
&desired_snapshot_hash.1,
archive_format,
),
SnapshotType::IncrementalSnapshot(base_slot) => {
snapshot_utils::build_incremental_snapshot_archive_path(
snapshot_archives_dir.to_path_buf(),
base_slot,
desired_snapshot_hash.0,
&desired_snapshot_hash.1,
archive_format,
)
}
};
if desired_snapshot_package.is_file() {
if destination_path.is_file() {
return Ok(());
}
@ -280,13 +297,9 @@ pub fn download_snapshot<'a, 'b>(
&format!(
"http://{}/{}",
rpc_addr,
desired_snapshot_package
.file_name()
.unwrap()
.to_str()
.unwrap()
destination_path.file_name().unwrap().to_str().unwrap()
),
&desired_snapshot_package,
&destination_path,
use_progress_bar,
progress_notify_callback,
) {
@ -295,7 +308,7 @@ pub fn download_snapshot<'a, 'b>(
}
}
Err(format!(
"Failed to download a snapshot for slot {} from {}",
"Failed to download a snapshot archive for slot {} from {}",
desired_snapshot_hash.0, rpc_addr
))
}

View File

@ -2,4 +2,5 @@
pub mod cluster;
pub mod cluster_tests;
pub mod local_cluster;
mod local_cluster_snapshot_utils;
pub mod validator_configs;

View File

@ -0,0 +1,110 @@
use crate::{cluster::Cluster, local_cluster::LocalCluster};
use log::*;
use solana_runtime::{
snapshot_archive_info::{
FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfoGetter,
},
snapshot_utils,
};
use solana_sdk::{client::SyncClient, commitment_config::CommitmentConfig};
use std::{path::Path, thread::sleep, time::Duration};
impl LocalCluster {
/// Return the next full snapshot archive info after the cluster's last processed slot
pub fn wait_for_next_full_snapshot(
&self,
snapshot_archives_dir: impl AsRef<Path>,
) -> FullSnapshotArchiveInfo {
match self.wait_for_next_snapshot(snapshot_archives_dir, NextSnapshotType::FullSnapshot) {
NextSnapshotResult::FullSnapshot(full_snapshot_archive_info) => {
full_snapshot_archive_info
}
_ => unreachable!(),
}
}
/// Return the next incremental snapshot archive info (and associated full snapshot archive info)
/// after the cluster's last processed slot
pub fn wait_for_next_incremental_snapshot(
&self,
snapshot_archives_dir: impl AsRef<Path>,
) -> (IncrementalSnapshotArchiveInfo, FullSnapshotArchiveInfo) {
match self.wait_for_next_snapshot(
snapshot_archives_dir,
NextSnapshotType::IncrementalAndFullSnapshot,
) {
NextSnapshotResult::IncrementalAndFullSnapshot(
incremental_snapshot_archive_info,
full_snapshot_archive_info,
) => (
incremental_snapshot_archive_info,
full_snapshot_archive_info,
),
_ => unreachable!(),
}
}
/// Return the next snapshot archive infos after the cluster's last processed slot
pub fn wait_for_next_snapshot(
&self,
snapshot_archives_dir: impl AsRef<Path>,
next_snapshot_type: NextSnapshotType,
) -> NextSnapshotResult {
// Get slot after which this was generated
let client = self
.get_validator_client(&self.entry_point_info.id)
.unwrap();
let last_slot = client
.get_slot_with_commitment(CommitmentConfig::processed())
.expect("Couldn't get slot");
// Wait for a snapshot for a bank >= last_slot to be made so we know that the snapshot
// must include the transactions just pushed
trace!(
"Waiting for {:?} snapshot archive to be generated with slot >= {}",
next_snapshot_type,
last_slot
);
loop {
if let Some(full_snapshot_archive_info) =
snapshot_utils::get_highest_full_snapshot_archive_info(&snapshot_archives_dir)
{
match next_snapshot_type {
NextSnapshotType::FullSnapshot => {
if full_snapshot_archive_info.slot() >= last_slot {
return NextSnapshotResult::FullSnapshot(full_snapshot_archive_info);
}
}
NextSnapshotType::IncrementalAndFullSnapshot => {
if let Some(incremental_snapshot_archive_info) =
snapshot_utils::get_highest_incremental_snapshot_archive_info(
&snapshot_archives_dir,
full_snapshot_archive_info.slot(),
)
{
if incremental_snapshot_archive_info.slot() >= last_slot {
return NextSnapshotResult::IncrementalAndFullSnapshot(
incremental_snapshot_archive_info,
full_snapshot_archive_info,
);
}
}
}
}
}
sleep(Duration::from_secs(5));
}
}
}
#[derive(Debug)]
pub enum NextSnapshotType {
FullSnapshot,
IncrementalAndFullSnapshot,
}
#[derive(Debug)]
pub enum NextSnapshotResult {
FullSnapshot(FullSnapshotArchiveInfo),
IncrementalAndFullSnapshot(IncrementalSnapshotArchiveInfo, FullSnapshotArchiveInfo),
}

View File

@ -22,7 +22,7 @@ use {
tower_storage::FileTowerStorage,
validator::ValidatorConfig,
},
solana_download_utils::download_snapshot,
solana_download_utils::download_snapshot_archive,
solana_gossip::{
cluster_info::VALIDATOR_PORT_RANGE,
crds::Cursor,
@ -44,6 +44,7 @@ use {
solana_runtime::{
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
snapshot_package::SnapshotType,
snapshot_utils::{self, ArchiveFormat},
},
solana_sdk::{
@ -1543,10 +1544,12 @@ fn test_frozen_account_from_snapshot() {
.snapshot_archives_dir;
trace!("Waiting for snapshot at {:?}", snapshot_archives_dir);
let (archive_filename, _archive_snapshot_hash) =
wait_for_next_snapshot(&cluster, snapshot_archives_dir);
let full_snapshot_archive_info = cluster.wait_for_next_full_snapshot(snapshot_archives_dir);
trace!("Found snapshot: {:?}", archive_filename);
trace!(
"Found snapshot: {}",
full_snapshot_archive_info.path().display()
);
// Restart the validator from a snapshot
let validator_info = cluster.exit_node(&validator_identity.pubkey());
@ -1688,7 +1691,6 @@ fn test_snapshot_download() {
let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
// Get slot after which this was generated
let snapshot_archives_dir = &leader_snapshot_test_config
.validator_config
.snapshot_config
@ -1697,18 +1699,170 @@ fn test_snapshot_download() {
.snapshot_archives_dir;
trace!("Waiting for snapshot");
let (archive_filename, archive_snapshot_hash) =
wait_for_next_snapshot(&cluster, snapshot_archives_dir);
trace!("found: {:?}", archive_filename);
let full_snapshot_archive_info = cluster.wait_for_next_full_snapshot(snapshot_archives_dir);
trace!("found: {}", full_snapshot_archive_info.path().display());
// Download the snapshot, then boot a validator from it.
download_snapshot(
download_snapshot_archive(
&cluster.entry_point_info.rpc,
validator_snapshot_test_config.snapshot_archives_dir.path(),
archive_snapshot_hash,
snapshot_archives_dir,
(
full_snapshot_archive_info.slot(),
*full_snapshot_archive_info.hash(),
),
SnapshotType::FullSnapshot,
validator_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.maximum_full_snapshot_archives_to_retain,
validator_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.maximum_incremental_snapshot_archives_to_retain,
false,
&mut None,
)
.unwrap();
cluster.add_validator(
&validator_snapshot_test_config.validator_config,
stake,
Arc::new(Keypair::new()),
None,
SocketAddrSpace::Unspecified,
);
}
#[test]
#[serial]
fn test_incremental_snapshot_download() {
solana_logger::setup_with_default(RUST_LOG_FILTER);
// First set up the cluster with 1 node
let accounts_hash_interval = 3;
let incremental_snapshot_interval = accounts_hash_interval * 3;
let full_snapshot_interval = incremental_snapshot_interval * 3;
let num_account_paths = 3;
let leader_snapshot_test_config = SnapshotValidatorConfig::new(
full_snapshot_interval,
incremental_snapshot_interval,
accounts_hash_interval,
num_account_paths,
);
let validator_snapshot_test_config = SnapshotValidatorConfig::new(
full_snapshot_interval,
incremental_snapshot_interval,
accounts_hash_interval,
num_account_paths,
);
let stake = 10_000;
let mut config = ClusterConfig {
node_stakes: vec![stake],
cluster_lamports: 1_000_000,
validator_configs: make_identical_validator_configs(
&leader_snapshot_test_config.validator_config,
1,
),
..ClusterConfig::default()
};
let mut cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);
let snapshot_archives_dir = &leader_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.snapshot_archives_dir;
debug!("snapshot config:\n\tfull snapshot interval: {}\n\tincremental snapshot interval: {}\n\taccounts hash interval: {}",
full_snapshot_interval,
incremental_snapshot_interval,
accounts_hash_interval);
debug!(
"leader config:\n\tbank snapshots dir: {}\n\tsnapshot archives dir: {}",
leader_snapshot_test_config
.bank_snapshots_dir
.path()
.display(),
leader_snapshot_test_config
.snapshot_archives_dir
.path()
.display(),
);
debug!(
"validator config:\n\tbank snapshots dir: {}\n\tsnapshot archives dir: {}",
validator_snapshot_test_config
.bank_snapshots_dir
.path()
.display(),
validator_snapshot_test_config
.snapshot_archives_dir
.path()
.display(),
);
trace!("Waiting for snapshots");
let (incremental_snapshot_archive_info, full_snapshot_archive_info) =
cluster.wait_for_next_incremental_snapshot(snapshot_archives_dir);
trace!(
"found: {} and {}",
full_snapshot_archive_info.path().display(),
incremental_snapshot_archive_info.path().display()
);
// Download the snapshots, then boot a validator from them.
download_snapshot_archive(
&cluster.entry_point_info.rpc,
snapshot_archives_dir,
(
full_snapshot_archive_info.slot(),
*full_snapshot_archive_info.hash(),
),
SnapshotType::FullSnapshot,
validator_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.maximum_full_snapshot_archives_to_retain,
validator_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.maximum_incremental_snapshot_archives_to_retain,
false,
&mut None,
)
.unwrap();
download_snapshot_archive(
&cluster.entry_point_info.rpc,
snapshot_archives_dir,
(
incremental_snapshot_archive_info.slot(),
*incremental_snapshot_archive_info.hash(),
),
SnapshotType::IncrementalSnapshot(incremental_snapshot_archive_info.base_slot()),
validator_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.maximum_full_snapshot_archives_to_retain,
validator_snapshot_test_config
.validator_config
.snapshot_config
.as_ref()
.unwrap()
.maximum_incremental_snapshot_archives_to_retain,
false,
snapshot_utils::DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN,
snapshot_utils::DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN,
&mut None,
)
.unwrap();
@ -1765,8 +1919,7 @@ fn test_snapshot_restart_tower() {
.unwrap()
.snapshot_archives_dir;
let (archive_filename, archive_snapshot_hash) =
wait_for_next_snapshot(&cluster, snapshot_archives_dir);
let full_snapshot_archive_info = cluster.wait_for_next_full_snapshot(snapshot_archives_dir);
// Copy archive to validator's snapshot output directory
let validator_archive_path = snapshot_utils::build_full_snapshot_archive_path(
@ -1774,11 +1927,11 @@ fn test_snapshot_restart_tower() {
.snapshot_archives_dir
.path()
.to_path_buf(),
archive_snapshot_hash.0,
&archive_snapshot_hash.1,
ArchiveFormat::TarBzip2,
full_snapshot_archive_info.slot(),
full_snapshot_archive_info.hash(),
full_snapshot_archive_info.archive_format(),
);
fs::hard_link(archive_filename, &validator_archive_path).unwrap();
fs::hard_link(full_snapshot_archive_info.path(), &validator_archive_path).unwrap();
// Restart validator from snapshot, the validator's tower state in this snapshot
// will contain slots < the root bank of the snapshot. Validator should not panic.
@ -1958,7 +2111,7 @@ fn test_snapshots_restart_validity() {
expected_balances.extend(new_balances);
wait_for_next_snapshot(&cluster, snapshot_archives_dir);
cluster.wait_for_next_full_snapshot(snapshot_archives_dir);
// Create new account paths since validator exit is not guaranteed to cleanup RPC threads,
// which may delete the old accounts on exit at any point
@ -3452,51 +3605,6 @@ fn run_test_load_program_accounts(scan_commitment: CommitmentConfig) {
t_scan.join().unwrap();
}
fn wait_for_next_snapshot(
cluster: &LocalCluster,
snapshot_archives_dir: &Path,
) -> (PathBuf, (Slot, Hash)) {
// Get slot after which this was generated
let client = cluster
.get_validator_client(&cluster.entry_point_info.id)
.unwrap();
let last_slot = client
.get_slot_with_commitment(CommitmentConfig::processed())
.expect("Couldn't get slot");
// Wait for a snapshot for a bank >= last_slot to be made so we know that the snapshot
// must include the transactions just pushed
trace!(
"Waiting for snapshot archive to be generated with slot > {}",
last_slot
);
loop {
if let Some(full_snapshot_archive_info) =
snapshot_utils::get_highest_full_snapshot_archive_info(snapshot_archives_dir)
{
trace!(
"full snapshot for slot {} exists",
full_snapshot_archive_info.slot()
);
if full_snapshot_archive_info.slot() >= last_slot {
return (
full_snapshot_archive_info.path().clone(),
(
full_snapshot_archive_info.slot(),
*full_snapshot_archive_info.hash(),
),
);
}
trace!(
"full snapshot slot {} < last_slot {}",
full_snapshot_archive_info.slot(),
last_slot
);
}
sleep(Duration::from_millis(5000));
}
}
fn farf_dir() -> PathBuf {
std::env::var("FARF_DIR")
.unwrap_or_else(|_| "farf".to_string())
@ -3515,42 +3623,73 @@ fn generate_account_paths(num_account_paths: usize) -> (Vec<TempDir>, Vec<PathBu
}
struct SnapshotValidatorConfig {
_snapshot_dir: TempDir,
bank_snapshots_dir: TempDir,
snapshot_archives_dir: TempDir,
account_storage_dirs: Vec<TempDir>,
validator_config: ValidatorConfig,
}
fn setup_snapshot_validator_config(
snapshot_interval_slots: u64,
num_account_paths: usize,
) -> SnapshotValidatorConfig {
// Create the snapshot config
let bank_snapshots_dir = tempfile::tempdir_in(farf_dir()).unwrap();
let snapshot_archives_dir = tempfile::tempdir_in(farf_dir()).unwrap();
let snapshot_config = SnapshotConfig {
full_snapshot_archive_interval_slots: snapshot_interval_slots,
incremental_snapshot_archive_interval_slots: Slot::MAX,
snapshot_archives_dir: snapshot_archives_dir.path().to_path_buf(),
bank_snapshots_dir: bank_snapshots_dir.path().to_path_buf(),
..SnapshotConfig::default()
};
impl SnapshotValidatorConfig {
pub fn new(
full_snapshot_archive_interval_slots: Slot,
incremental_snapshot_archive_interval_slots: Slot,
accounts_hash_interval_slots: Slot,
num_account_paths: usize,
) -> SnapshotValidatorConfig {
assert!(accounts_hash_interval_slots > 0);
assert!(full_snapshot_archive_interval_slots > 0);
assert!(full_snapshot_archive_interval_slots % accounts_hash_interval_slots == 0);
if incremental_snapshot_archive_interval_slots != Slot::MAX {
assert!(incremental_snapshot_archive_interval_slots > 0);
assert!(
incremental_snapshot_archive_interval_slots % accounts_hash_interval_slots == 0
);
assert!(
full_snapshot_archive_interval_slots % incremental_snapshot_archive_interval_slots
== 0
);
}
// Create the account paths
let (account_storage_dirs, account_storage_paths) = generate_account_paths(num_account_paths);
// Create the snapshot config
let bank_snapshots_dir = tempfile::tempdir_in(farf_dir()).unwrap();
let snapshot_archives_dir = tempfile::tempdir_in(farf_dir()).unwrap();
let snapshot_config = SnapshotConfig {
full_snapshot_archive_interval_slots,
incremental_snapshot_archive_interval_slots,
snapshot_archives_dir: snapshot_archives_dir.path().to_path_buf(),
bank_snapshots_dir: bank_snapshots_dir.path().to_path_buf(),
..SnapshotConfig::default()
};
// Create the validator config
let validator_config = ValidatorConfig {
snapshot_config: Some(snapshot_config),
account_paths: account_storage_paths,
accounts_hash_interval_slots: snapshot_interval_slots,
..ValidatorConfig::default()
};
// Create the account paths
let (account_storage_dirs, account_storage_paths) =
generate_account_paths(num_account_paths);
SnapshotValidatorConfig {
_snapshot_dir: bank_snapshots_dir,
snapshot_archives_dir,
account_storage_dirs,
validator_config,
// Create the validator config
let validator_config = ValidatorConfig {
snapshot_config: Some(snapshot_config),
account_paths: account_storage_paths,
accounts_hash_interval_slots,
..ValidatorConfig::default()
};
SnapshotValidatorConfig {
bank_snapshots_dir,
snapshot_archives_dir,
account_storage_dirs,
validator_config,
}
}
}
fn setup_snapshot_validator_config(
snapshot_interval_slots: Slot,
num_account_paths: usize,
) -> SnapshotValidatorConfig {
SnapshotValidatorConfig::new(
snapshot_interval_slots,
Slot::MAX,
snapshot_interval_slots,
num_account_paths,
)
}

View File

@ -2,7 +2,7 @@ use {
crate::accountsdb_repl_service::AccountsDbReplService,
crossbeam_channel::unbounded,
log::*,
solana_download_utils::download_snapshot,
solana_download_utils::download_snapshot_archive,
solana_genesis_utils::download_then_check_genesis_hash,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_ledger::{
@ -23,7 +23,7 @@ use {
solana_runtime::{
accounts_index::AccountSecondaryIndexes, bank_forks::BankForks,
commitment::BlockCommitmentCache, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
snapshot_config::SnapshotConfig, snapshot_utils,
snapshot_config::SnapshotConfig, snapshot_package::SnapshotType, snapshot_utils,
},
solana_sdk::{clock::Slot, exit::Exit, genesis_config::GenesisConfig, hash::Hash},
solana_streamer::socket::SocketAddrSpace,
@ -86,13 +86,14 @@ fn initialize_from_snapshot(
replica_config.snapshot_archives_dir
);
download_snapshot(
download_snapshot_archive(
&replica_config.rpc_peer_addr,
&replica_config.snapshot_archives_dir,
replica_config.snapshot_info,
false,
SnapshotType::FullSnapshot,
snapshot_config.maximum_full_snapshot_archives_to_retain,
snapshot_config.maximum_incremental_snapshot_archives_to_retain,
false,
&mut None,
)
.unwrap();

View File

@ -25,7 +25,7 @@ use {
tpu::DEFAULT_TPU_COALESCE_MS,
validator::{is_snapshot_config_valid, Validator, ValidatorConfig, ValidatorStartProgress},
},
solana_download_utils::{download_snapshot, DownloadProgressRecord},
solana_download_utils::{download_snapshot_archive, DownloadProgressRecord},
solana_genesis_utils::download_then_check_genesis_hash,
solana_gossip::{
cluster_info::{ClusterInfo, Node, VALIDATOR_PORT_RANGE},
@ -49,6 +49,7 @@ use {
hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
snapshot_package::SnapshotType,
snapshot_utils::{
self, ArchiveFormat, SnapshotVersion, DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS,
@ -932,13 +933,14 @@ fn rpc_bootstrap(
} else {
(DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN)
};
let ret = download_snapshot(
let ret = download_snapshot_archive(
&rpc_contact_info.rpc,
snapshot_archives_dir,
snapshot_hash,
use_progress_bar,
SnapshotType::FullSnapshot,
maximum_full_snapshot_archives_to_retain,
maximum_incremental_snapshot_archives_to_retain,
use_progress_bar,
&mut Some(Box::new(|download_progress: &DownloadProgressRecord| {
debug!("Download progress: {:?}", download_progress);