From d1debcd971faf10b0c57625133d7214a18e99dab Mon Sep 17 00:00:00 2001 From: Brooks Prumo Date: Thu, 22 Jul 2021 14:40:37 -0500 Subject: [PATCH] Add incremental snapshot utils (#18504) This commit adds high-level functions for creating and loading-from incremental snapshots, plus all low-level functions required to perform those tasks. This commit **does not** add taking incremental snapshots as part of a running validator, nor starting up a node with an incremental snapshot; just laying groundwork. Additionally, `snapshot_utils` and `serde_snapshot` have been refactored to use common code paths for the different snapshots. Also of note, some renaming has happened: 1. Snapshots are now either `full_` or `incremental_` throughout the codebase. If not specified, the code applies to both. 2. Bank snapshots now are called "bank snapshots" (before they were called "slot snapshots", "bank snapshots", or just "snapshots"). The one exception is within `Bank`, where they are still just "snapshots", because they are already "bank snapshots". 3. Snapshot archives now have `_archive` in the code. This should clear up an ambiguity between bank snapshots and snapshot archives. 
--- core/src/accounts_hash_verifier.rs | 1 + core/src/snapshot_packager_service.rs | 4 +- core/src/test_validator.rs | 6 +- core/src/validator.rs | 2 +- core/tests/snapshots.rs | 46 +- download-utils/src/lib.rs | 8 +- ledger-tool/src/main.rs | 10 +- ledger/src/bank_forks_utils.rs | 35 +- ledger/src/blockstore_processor.rs | 11 +- local-cluster/tests/local_cluster.rs | 66 +- measure/src/measure.rs | 1 + rpc/src/rpc.rs | 2 +- rpc/src/rpc_service.rs | 14 +- runtime/src/accounts_background_service.rs | 2 +- runtime/src/bank.rs | 17 +- runtime/src/serde_snapshot.rs | 119 +- runtime/src/serde_snapshot/tests.rs | 15 +- runtime/src/snapshot_utils.rs | 1905 ++++++++++++++++---- validator/src/main.rs | 16 +- 19 files changed, 1773 insertions(+), 507 deletions(-) diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index fa4dc71312..6ac305a324 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -99,6 +99,7 @@ impl AccountsHashVerifier { let accounts_package = solana_runtime::snapshot_utils::process_accounts_package_pre( accounts_package, thread_pool, + None, ); Self::process_accounts_package( accounts_package, diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index af522d88dd..d85fd44c9e 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -156,7 +156,7 @@ mod tests { } // Create a packageable snapshot - let output_tar_path = snapshot_utils::build_snapshot_archive_path( + let output_tar_path = snapshot_utils::build_full_snapshot_archive_path( snapshot_package_output_path, 42, &Hash::default(), @@ -177,7 +177,7 @@ mod tests { // Make tarball from packageable snapshot snapshot_utils::archive_snapshot_package( &snapshot_package, - snapshot_utils::DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + snapshot_utils::DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, ) .unwrap(); diff --git a/core/src/test_validator.rs b/core/src/test_validator.rs 
index 14a97cc0dc..9886d07ead 100644 --- a/core/src/test_validator.rs +++ b/core/src/test_validator.rs @@ -13,7 +13,9 @@ use { genesis_utils::create_genesis_config_with_leader_ex, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, snapshot_config::SnapshotConfig, - snapshot_utils::{ArchiveFormat, SnapshotVersion, DEFAULT_MAX_SNAPSHOTS_TO_RETAIN}, + snapshot_utils::{ + ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, + }, }, solana_sdk::{ account::{Account, AccountSharedData}, @@ -492,7 +494,7 @@ impl TestValidator { snapshot_package_output_path: ledger_path.to_path_buf(), archive_format: ArchiveFormat::Tar, snapshot_version: SnapshotVersion::default(), - maximum_snapshots_to_retain: DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + maximum_snapshots_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, }), enforce_ulimit_nofile: false, warp_slot: config.warp_slot, diff --git a/core/src/validator.rs b/core/src/validator.rs index 0a9145d48b..2628c3b1d5 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1187,7 +1187,7 @@ fn new_banks_from_ledger( ); leader_schedule_cache.set_root(&bank_forks.root_bank()); - let archive_file = solana_runtime::snapshot_utils::bank_to_snapshot_archive( + let archive_file = solana_runtime::snapshot_utils::bank_to_full_snapshot_archive( ledger_path, &bank_forks.root_bank(), None, diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 9526f6d2a6..2e04ec80a7 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -49,7 +49,9 @@ mod tests { bank_forks::BankForks, genesis_utils::{create_genesis_config, GenesisConfigInfo}, snapshot_config::SnapshotConfig, - snapshot_utils::{self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_SNAPSHOTS_TO_RETAIN}, + snapshot_utils::{ + self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, + }, status_cache::MAX_CACHE_ENTRIES, }; use solana_sdk::{ @@ -119,7 +121,7 @@ mod tests { snapshot_path: 
PathBuf::from(snapshot_dir.path()), archive_format: ArchiveFormat::TarBzip2, snapshot_version, - maximum_snapshots_to_retain: DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + maximum_snapshots_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, }; bank_forks.set_snapshot_config(Some(snapshot_config.clone())); SnapshotTestConfig { @@ -148,7 +150,7 @@ mod tests { let old_last_bank = old_bank_forks.get(old_last_slot).unwrap(); let check_hash_calculation = false; - let (deserialized_bank, _timing) = snapshot_utils::bank_from_snapshot_archive( + let (deserialized_bank, _timing) = snapshot_utils::bank_from_snapshot_archives( account_paths, &[], &old_bank_forks @@ -156,12 +158,13 @@ mod tests { .as_ref() .unwrap() .snapshot_path, - snapshot_utils::build_snapshot_archive_path( + snapshot_utils::build_full_snapshot_archive_path( snapshot_package_output_path.to_path_buf(), old_last_bank.slot(), &old_last_bank.get_accounts_hash(), ArchiveFormat::TarBzip2, ), + None, ArchiveFormat::TarBzip2, old_genesis_config, None, @@ -181,10 +184,10 @@ mod tests { .clone(); assert_eq!(*bank, deserialized_bank); - let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshot_path); + let bank_snapshot_infos = snapshot_utils::get_bank_snapshots(&snapshot_path); - for p in slot_snapshot_paths { - snapshot_utils::remove_snapshot(p.slot, &snapshot_path).unwrap(); + for p in bank_snapshot_infos { + snapshot_utils::remove_bank_snapshot(p.slot, &snapshot_path).unwrap(); } } @@ -235,12 +238,11 @@ mod tests { let last_bank = bank_forks.get(last_slot).unwrap(); let snapshot_config = &snapshot_test_config.snapshot_config; let snapshot_path = &snapshot_config.snapshot_path; - let last_slot_snapshot_path = snapshot_utils::get_snapshot_paths(snapshot_path) - .pop() + let last_bank_snapshot_info = snapshot_utils::get_highest_bank_snapshot_info(snapshot_path) .expect("no snapshots found in path"); - let snapshot_package = snapshot_utils::package_snapshot( + let snapshot_package = 
snapshot_utils::package_full_snapshot( last_bank, - &last_slot_snapshot_path, + &last_bank_snapshot_info, snapshot_path, last_bank.src.slot_deltas(&last_bank.src.roots()), &snapshot_config.snapshot_package_output_path, @@ -253,10 +255,11 @@ mod tests { let snapshot_package = snapshot_utils::process_accounts_package_pre( snapshot_package, Some(last_bank.get_thread_pool()), + None, ); snapshot_utils::archive_snapshot_package( &snapshot_package, - DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, ) .unwrap(); @@ -325,7 +328,8 @@ mod tests { // Take snapshot of zeroth bank let bank0 = bank_forks.get(0).unwrap(); let storages = bank0.get_snapshot_storages(); - snapshot_utils::add_snapshot(snapshot_path, bank0, &storages, snapshot_version).unwrap(); + snapshot_utils::add_bank_snapshot(snapshot_path, bank0, &storages, snapshot_version) + .unwrap(); // Set up snapshotting channels let (sender, receiver) = channel(); @@ -343,7 +347,7 @@ mod tests { let saved_slot = 4; let mut saved_archive_path = None; - for forks in 0..snapshot_utils::MAX_SNAPSHOTS + 2 { + for forks in 0..snapshot_utils::MAX_BANK_SNAPSHOTS + 2 { let bank = Bank::new_from_parent( &bank_forks[forks as u64], &Pubkey::default(), @@ -420,7 +424,7 @@ mod tests { let options = CopyOptions::new(); fs_extra::dir::copy(&last_snapshot_path, &saved_snapshots_dir, &options).unwrap(); - saved_archive_path = Some(snapshot_utils::build_snapshot_archive_path( + saved_archive_path = Some(snapshot_utils::build_full_snapshot_archive_path( snapshot_package_output_path.to_path_buf(), slot, &accounts_hash, @@ -431,11 +435,14 @@ mod tests { // Purge all the outdated snapshots, including the ones needed to generate the package // currently sitting in the channel - snapshot_utils::purge_old_snapshots(snapshot_path); - assert!(snapshot_utils::get_snapshot_paths(&snapshots_dir) + snapshot_utils::purge_old_bank_snapshots(snapshot_path); + + let mut bank_snapshot_infos = 
snapshot_utils::get_bank_snapshots(&snapshots_dir); + bank_snapshot_infos.sort_unstable(); + assert!(bank_snapshot_infos .into_iter() .map(|path| path.slot) - .eq(3..=snapshot_utils::MAX_SNAPSHOTS as u64 + 2)); + .eq(3..=snapshot_utils::MAX_BANK_SNAPSHOTS as u64 + 2)); // Create a SnapshotPackagerService to create tarballs from all the pending // SnapshotPackage's on the channel. By the time this service starts, we have already @@ -453,7 +460,7 @@ mod tests { None, &exit, &cluster_info, - DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, ); let thread_pool = accounts_db::make_min_priority_thread_pool(); @@ -471,6 +478,7 @@ mod tests { solana_runtime::snapshot_utils::process_accounts_package_pre( snapshot_package, Some(&thread_pool), + None, ); *pending_snapshot_package.lock().unwrap() = Some(snapshot_package); } diff --git a/download-utils/src/lib.rs b/download-utils/src/lib.rs index 565f7d7333..ee489c88a0 100644 --- a/download-utils/src/lib.rs +++ b/download-utils/src/lib.rs @@ -246,13 +246,13 @@ pub fn download_genesis_if_missing( pub fn download_snapshot<'a, 'b>( rpc_addr: &SocketAddr, - snapshot_output_dir: &Path, + snapshot_archives_dir: &Path, desired_snapshot_hash: (Slot, Hash), use_progress_bar: bool, maximum_snapshots_to_retain: usize, progress_notify_callback: &'a mut DownloadProgressCallbackOption<'b>, ) -> Result<(), String> { - snapshot_utils::purge_old_snapshot_archives(snapshot_output_dir, maximum_snapshots_to_retain); + snapshot_utils::purge_old_snapshot_archives(snapshot_archives_dir, maximum_snapshots_to_retain); for compression in &[ ArchiveFormat::TarZstd, @@ -260,8 +260,8 @@ pub fn download_snapshot<'a, 'b>( ArchiveFormat::TarBzip2, ArchiveFormat::Tar, // `solana-test-validator` creates uncompressed snapshots ] { - let desired_snapshot_package = snapshot_utils::build_snapshot_archive_path( - snapshot_output_dir.to_path_buf(), + let desired_snapshot_package = snapshot_utils::build_full_snapshot_archive_path( + 
snapshot_archives_dir.to_path_buf(), desired_snapshot_hash.0, &desired_snapshot_hash.1, *compression, diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index f762e0c4ce..d628a65c03 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -30,7 +30,9 @@ use solana_runtime::{ bank_forks::BankForks, hardened_unpack::{open_genesis_config, MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, snapshot_config::SnapshotConfig, - snapshot_utils::{self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_SNAPSHOTS_TO_RETAIN}, + snapshot_utils::{ + self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, + }, }; use solana_sdk::{ account::{AccountSharedData, ReadableAccount, WritableAccount}, @@ -697,7 +699,7 @@ fn load_bank_forks( snapshot_path, archive_format: ArchiveFormat::TarBzip2, snapshot_version: SnapshotVersion::default(), - maximum_snapshots_to_retain: DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + maximum_snapshots_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, }) }; let account_paths = if let Some(account_paths) = arg_matches.value_of("account_paths") { @@ -904,7 +906,7 @@ fn main() { .default_value(SnapshotVersion::default().into()) .help("Output snapshot version"); - let default_max_snapshot_to_retain = &DEFAULT_MAX_SNAPSHOTS_TO_RETAIN.to_string(); + let default_max_snapshot_to_retain = &DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN.to_string(); let maximum_snapshots_to_retain_arg = Arg::with_name("maximum_snapshots_to_retain") .long("maximum-snapshots-to-retain") .value_name("NUMBER") @@ -2196,7 +2198,7 @@ fn main() { bank.slot(), ); - let archive_file = snapshot_utils::bank_to_snapshot_archive( + let archive_file = snapshot_utils::bank_to_full_snapshot_archive( ledger_path, &bank, Some(snapshot_version), diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 3620c4ef47..7be4056d8e 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -11,7 +11,7 @@ use 
solana_entry::entry::VerifyRecyclers; use solana_runtime::{ bank_forks::BankForks, snapshot_config::SnapshotConfig, - snapshot_utils::{self, SnapshotArchiveInfo}, + snapshot_utils::{self, FullSnapshotArchiveInfo}, }; use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash}; use std::{fs, path::PathBuf, process, result}; @@ -53,9 +53,11 @@ pub fn load( fs::create_dir_all(&snapshot_config.snapshot_path) .expect("Couldn't create snapshot directory"); - if let Some(snapshot_archive_info) = snapshot_utils::get_highest_snapshot_archive_info( - &snapshot_config.snapshot_package_output_path, - ) { + if let Some(full_snapshot_archive_info) = + snapshot_utils::get_highest_full_snapshot_archive_info( + &snapshot_config.snapshot_package_output_path, + ) + { return load_from_snapshot( genesis_config, blockstore, @@ -65,7 +67,7 @@ pub fn load( process_options, transaction_status_sender, cache_block_meta_sender, - &snapshot_archive_info, + &full_snapshot_archive_info, ); } else { info!("No snapshot package available; will load from genesis"); @@ -113,11 +115,11 @@ fn load_from_snapshot( process_options: ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, - snapshot_archive_info: &SnapshotArchiveInfo, + full_snapshot_archive_info: &FullSnapshotArchiveInfo, ) -> LoadResult { info!( "Loading snapshot package: {:?}", - &snapshot_archive_info.path + full_snapshot_archive_info.path() ); // Fail hard here if snapshot fails to load, don't silently continue @@ -126,12 +128,13 @@ fn load_from_snapshot( process::exit(1); } - let (deserialized_bank, timings) = snapshot_utils::bank_from_snapshot_archive( + let (deserialized_bank, timings) = snapshot_utils::bank_from_snapshot_archives( &account_paths, &process_options.frozen_accounts, &snapshot_config.snapshot_path, - &snapshot_archive_info.path, - snapshot_archive_info.archive_format, + full_snapshot_archive_info.path(), + None, + 
*full_snapshot_archive_info.archive_format(), genesis_config, process_options.debug_keys.clone(), Some(&crate::builtins::get(process_options.bpf_jit)), @@ -152,10 +155,18 @@ fn load_from_snapshot( deserialized_bank.get_accounts_hash(), ); - if deserialized_bank_slot_and_hash != (snapshot_archive_info.slot, snapshot_archive_info.hash) { + if deserialized_bank_slot_and_hash + != ( + *full_snapshot_archive_info.slot(), + *full_snapshot_archive_info.hash(), + ) + { error!( "Snapshot has mismatch:\narchive: {:?}\ndeserialized: {:?}", - (snapshot_archive_info.slot, snapshot_archive_info.hash), + ( + full_snapshot_archive_info.slot(), + full_snapshot_archive_info.hash() + ), deserialized_bank_slot_and_hash ); process::exit(1); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 6b18c39070..3838751794 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -567,7 +567,16 @@ fn do_process_blockstore_from_root( ("slot", bank_forks.root(), i64), ("forks", initial_forks.len(), i64), ("calculate_capitalization_us", time_cap.as_us(), i64), - ("untar_us", timings.untar_us, i64), + ( + "full_snapshot_untar_us", + timings.full_snapshot_untar_us, + i64 + ), + ( + "incremental_snapshot_untar_us", + timings.incremental_snapshot_untar_us, + i64 + ), ( "rebuild_bank_from_snapshots_us", timings.rebuild_bank_from_snapshots_us, diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 45703741fc..fd934a0380 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -1642,25 +1642,15 @@ fn test_snapshot_download() { trace!("Waiting for snapshot"); let (archive_filename, archive_snapshot_hash) = wait_for_next_snapshot(&cluster, snapshot_package_output_path); - trace!("found: {:?}", archive_filename); - let validator_archive_path = snapshot_utils::build_snapshot_archive_path( - validator_snapshot_test_config - .snapshot_output_path - .path() - 
.to_path_buf(), - archive_snapshot_hash.0, - &archive_snapshot_hash.1, - ArchiveFormat::TarBzip2, - ); // Download the snapshot, then boot a validator from it. download_snapshot( &cluster.entry_point_info.rpc, - &validator_archive_path, + validator_snapshot_test_config.snapshot_archives_dir.path(), archive_snapshot_hash, false, - snapshot_utils::DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + snapshot_utils::DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, &mut None, ) .unwrap(); @@ -1720,9 +1710,9 @@ fn test_snapshot_restart_tower() { wait_for_next_snapshot(&cluster, snapshot_package_output_path); // Copy archive to validator's snapshot output directory - let validator_archive_path = snapshot_utils::build_snapshot_archive_path( + let validator_archive_path = snapshot_utils::build_full_snapshot_archive_path( validator_snapshot_test_config - .snapshot_output_path + .snapshot_archives_dir .path() .to_path_buf(), archive_snapshot_hash.0, @@ -1783,7 +1773,7 @@ fn test_snapshots_blockstore_floor() { let archive_info = loop { let archive = - snapshot_utils::get_highest_snapshot_archive_info(&snapshot_package_output_path); + snapshot_utils::get_highest_full_snapshot_archive_info(&snapshot_package_output_path); if archive.is_some() { trace!("snapshot exists"); break archive.unwrap(); @@ -1792,17 +1782,17 @@ fn test_snapshots_blockstore_floor() { }; // Copy archive to validator's snapshot output directory - let validator_archive_path = snapshot_utils::build_snapshot_archive_path( + let validator_archive_path = snapshot_utils::build_full_snapshot_archive_path( validator_snapshot_test_config - .snapshot_output_path + .snapshot_archives_dir .path() .to_path_buf(), - archive_info.slot, - &archive_info.hash, + *archive_info.slot(), + archive_info.hash(), ArchiveFormat::TarBzip2, ); - fs::hard_link(archive_info.path, &validator_archive_path).unwrap(); - let slot_floor = archive_info.slot; + fs::hard_link(archive_info.path(), &validator_archive_path).unwrap(); + let slot_floor = 
*archive_info.slot(); // Start up a new node from a snapshot let validator_stake = 5; @@ -3284,19 +3274,25 @@ fn wait_for_next_snapshot( last_slot ); loop { - if let Some(snapshot_archive_info) = - snapshot_utils::get_highest_snapshot_archive_info(snapshot_package_output_path) + if let Some(full_snapshot_archive_info) = + snapshot_utils::get_highest_full_snapshot_archive_info(snapshot_package_output_path) { - trace!("snapshot for slot {} exists", snapshot_archive_info.slot); - if snapshot_archive_info.slot >= last_slot { + trace!( + "full snapshot for slot {} exists", + full_snapshot_archive_info.slot() + ); + if *full_snapshot_archive_info.slot() >= last_slot { return ( - snapshot_archive_info.path, - (snapshot_archive_info.slot, snapshot_archive_info.hash), + full_snapshot_archive_info.path().clone(), + ( + *full_snapshot_archive_info.slot(), + *full_snapshot_archive_info.hash(), + ), ); } trace!( - "snapshot slot {} < last_slot {}", - snapshot_archive_info.slot, + "full snapshot slot {} < last_slot {}", + full_snapshot_archive_info.slot(), last_slot ); } @@ -3323,7 +3319,7 @@ fn generate_account_paths(num_account_paths: usize) -> (Vec, Vec, validator_config: ValidatorConfig, } @@ -3334,14 +3330,14 @@ fn setup_snapshot_validator_config( ) -> SnapshotValidatorConfig { // Create the snapshot config let snapshot_dir = tempfile::tempdir_in(farf_dir()).unwrap(); - let snapshot_output_path = tempfile::tempdir_in(farf_dir()).unwrap(); + let snapshot_archives_dir = tempfile::tempdir_in(farf_dir()).unwrap(); let snapshot_config = SnapshotConfig { snapshot_interval_slots, - snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()), - snapshot_path: PathBuf::from(snapshot_dir.path()), + snapshot_package_output_path: snapshot_archives_dir.path().to_path_buf(), + snapshot_path: snapshot_dir.path().to_path_buf(), archive_format: ArchiveFormat::TarBzip2, snapshot_version: snapshot_utils::SnapshotVersion::default(), - maximum_snapshots_to_retain: 
snapshot_utils::DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + maximum_snapshots_to_retain: snapshot_utils::DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, }; // Create the account paths @@ -3357,7 +3353,7 @@ fn setup_snapshot_validator_config( SnapshotValidatorConfig { _snapshot_dir: snapshot_dir, - snapshot_output_path, + snapshot_archives_dir, account_storage_dirs, validator_config, } diff --git a/measure/src/measure.rs b/measure/src/measure.rs index 26f32b097c..3ab35229aa 100644 --- a/measure/src/measure.rs +++ b/measure/src/measure.rs @@ -1,6 +1,7 @@ use solana_sdk::timing::duration_as_ns; use std::{fmt, time::Instant}; +#[derive(Debug)] pub struct Measure { name: &'static str, start: Instant, diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 9e20e752c9..6a9b3d6359 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -2271,7 +2271,7 @@ pub mod rpc_minimal { meta.snapshot_config .and_then(|snapshot_config| { - snapshot_utils::get_highest_snapshot_archive_slot( + snapshot_utils::get_highest_full_snapshot_archive_slot( &snapshot_config.snapshot_package_output_path, ) }) diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 4a357c4360..8b4adc974c 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -205,14 +205,14 @@ impl RequestMiddleware for RpcRequestMiddleware { if let Some(ref snapshot_config) = self.snapshot_config { if request.uri().path() == "/snapshot.tar.bz2" { // Convenience redirect to the latest snapshot - return if let Some(snapshot_archive_info) = - snapshot_utils::get_highest_snapshot_archive_info( + return if let Some(full_snapshot_archive_info) = + snapshot_utils::get_highest_full_snapshot_archive_info( &snapshot_config.snapshot_package_output_path, ) { RpcRequestMiddleware::redirect(&format!( "/{}", - snapshot_archive_info - .path + full_snapshot_archive_info + .path() .file_name() .unwrap_or_else(|| std::ffi::OsStr::new("")) .to_str() @@ -500,7 +500,9 @@ mod tests { }, solana_runtime::{ bank::Bank, - 
snapshot_utils::{ArchiveFormat, SnapshotVersion, DEFAULT_MAX_SNAPSHOTS_TO_RETAIN}, + snapshot_utils::{ + ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, + }, }, solana_sdk::{ genesis_config::{ClusterType, DEFAULT_GENESIS_ARCHIVE}, @@ -606,7 +608,7 @@ mod tests { snapshot_path: PathBuf::from("/"), archive_format: ArchiveFormat::TarBzip2, snapshot_version: SnapshotVersion::default(), - maximum_snapshots_to_retain: DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + maximum_snapshots_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, }), bank_forks, RpcHealth::stub(), diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index 05b2ace711..cd06ea9d17 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -192,7 +192,7 @@ impl SnapshotRequestHandler { // Cleanup outdated snapshots let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time"); - snapshot_utils::purge_old_snapshots(&self.snapshot_config.snapshot_path); + snapshot_utils::purge_old_bank_snapshots(&self.snapshot_config.snapshot_path); purge_old_snapshots_time.stop(); total_time.stop(); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 90fb226165..140bcb3281 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -38,7 +38,7 @@ use crate::{ AccountAddressFilter, Accounts, TransactionAccounts, TransactionLoadResult, TransactionLoaders, }, - accounts_db::{AccountShrinkThreshold, ErrorCounters, SnapshotStorages}, + accounts_db::{AccountShrinkThreshold, ErrorCounters, SnapshotStorage, SnapshotStorages}, accounts_index::{AccountSecondaryIndexes, IndexKey, ScanResult}, ancestors::{Ancestors, AncestorsForSerialization}, blockhash_queue::BlockhashQueue, @@ -4737,6 +4737,21 @@ impl Bank { self.rc.get_snapshot_storages(self.slot()) } + /// Get the snapshot storages _higher than_ the `full_snapshot_slot`. This is used when making an + /// incremental snapshot. 
+ pub fn get_incremental_snapshot_storages(&self, full_snapshot_slot: Slot) -> SnapshotStorages { + self.get_snapshot_storages() + .into_iter() + .map(|storage| { + storage + .into_iter() + .filter(|entry| entry.slot() > full_snapshot_slot) + .collect::() + }) + .filter(|storage| !storage.is_empty()) + .collect() + } + #[must_use] fn verify_hash(&self) -> bool { assert!(self.is_frozen()); diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 9f1e65a809..644f136886 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -74,6 +74,62 @@ struct AccountsDbFields( BankHashInfo, ); +/// Helper type to wrap BufReader streams when deserializing and reconstructing from either just a +/// full snapshot, or both a full and incremental snapshot +pub struct SnapshotStreams<'a, R> { + pub full_snapshot_stream: &'a mut BufReader, + pub incremental_snapshot_stream: Option<&'a mut BufReader>, +} + +/// Helper type to wrap AccountsDbFields when reconstructing AccountsDb from either just a full +/// snapshot, or both a full and incremental snapshot +#[derive(Debug)] +struct SnapshotAccountsDbFields { + full_snapshot_accounts_db_fields: AccountsDbFields, + incremental_snapshot_accounts_db_fields: Option>, +} + +impl SnapshotAccountsDbFields { + /// Collapse the SnapshotAccountsDbFields into a single AccountsDbFields. If there is no + /// incremental snapshot, this returns the AccountsDbFields from the full snapshot. Otherwise + /// this uses the version, slot, and bank hash info from the incremental snapshot, then the + /// combination of the storages from both the full and incremental snapshots. 
+ fn collapse_into(self) -> Result, Error> { + match self.incremental_snapshot_accounts_db_fields { + None => Ok(self.full_snapshot_accounts_db_fields), + Some(AccountsDbFields( + mut incremental_snapshot_storages, + incremental_snapshot_version, + incremental_snapshot_slot, + incremental_snapshot_bank_hash_info, + )) => { + let full_snapshot_storages = self.full_snapshot_accounts_db_fields.0; + let full_snapshot_slot = self.full_snapshot_accounts_db_fields.2; + + // filter out incremental snapshot storages with slot <= full snapshot slot + incremental_snapshot_storages.retain(|slot, _| *slot > full_snapshot_slot); + + // There must not be any overlap in the slots of storages between the full snapshot and the incremental snapshot + incremental_snapshot_storages + .iter() + .all(|storage_entry| !full_snapshot_storages.contains_key(storage_entry.0)).then(|| ()).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidData, "Snapshots are incompatible: There are storages for the same slot in both the full snapshot and the incremental snapshot!") + })?; + + let mut combined_storages = full_snapshot_storages; + combined_storages.extend(incremental_snapshot_storages.into_iter()); + + Ok(AccountsDbFields( + combined_storages, + incremental_snapshot_version, + incremental_snapshot_slot, + incremental_snapshot_bank_hash_info, + )) + } + } + } +} + trait TypeContext<'a> { type SerializableAccountStorageEntry: Serialize + DeserializeOwned @@ -127,16 +183,16 @@ where } #[allow(clippy::too_many_arguments)] -pub(crate) fn bank_from_stream( +pub(crate) fn bank_from_streams( serde_style: SerdeStyle, - stream: &mut BufReader, + snapshot_streams: &mut SnapshotStreams, account_paths: &[PathBuf], unpacked_append_vec_map: UnpackedAppendVecMap, genesis_config: &GenesisConfig, frozen_account_pubkeys: &[Pubkey], debug_keys: Option>>, additional_builtins: Option<&Builtins>, - account_indexes: AccountSecondaryIndexes, + account_secondary_indexes: AccountSecondaryIndexes, caching_enabled: 
bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, @@ -147,18 +203,33 @@ where { macro_rules! INTO { ($x:ident) => {{ - let (bank_fields, accounts_db_fields) = $x::deserialize_bank_fields(stream)?; + let (full_snapshot_bank_fields, full_snapshot_accounts_db_fields) = + $x::deserialize_bank_fields(snapshot_streams.full_snapshot_stream)?; + let (incremental_snapshot_bank_fields, incremental_snapshot_accounts_db_fields) = + if let Some(ref mut incremental_snapshot_stream) = + snapshot_streams.incremental_snapshot_stream + { + let (bank_fields, accounts_db_fields) = + $x::deserialize_bank_fields(incremental_snapshot_stream)?; + (Some(bank_fields), Some(accounts_db_fields)) + } else { + (None, None) + }; + let snapshot_accounts_db_fields = SnapshotAccountsDbFields { + full_snapshot_accounts_db_fields, + incremental_snapshot_accounts_db_fields, + }; let bank = reconstruct_bank_from_fields( - bank_fields, - accounts_db_fields, + incremental_snapshot_bank_fields.unwrap_or(full_snapshot_bank_fields), + snapshot_accounts_db_fields, genesis_config, frozen_account_pubkeys, account_paths, unpacked_append_vec_map, debug_keys, additional_builtins, - account_indexes, + account_secondary_indexes, caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, @@ -243,14 +314,14 @@ impl<'a, C> IgnoreAsHelper for SerializableAccountsDb<'a, C> {} #[allow(clippy::too_many_arguments)] fn reconstruct_bank_from_fields( bank_fields: BankFieldsToDeserialize, - accounts_db_fields: AccountsDbFields, + snapshot_accounts_db_fields: SnapshotAccountsDbFields, genesis_config: &GenesisConfig, frozen_account_pubkeys: &[Pubkey], account_paths: &[PathBuf], unpacked_append_vec_map: UnpackedAppendVecMap, debug_keys: Option>>, additional_builtins: Option<&Builtins>, - account_indexes: AccountSecondaryIndexes, + account_secondary_indexes: AccountSecondaryIndexes, caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: 
AccountShrinkThreshold, @@ -260,11 +331,11 @@ where E: SerializableStorage + std::marker::Sync, { let mut accounts_db = reconstruct_accountsdb_from_fields( - accounts_db_fields, + snapshot_accounts_db_fields, account_paths, unpacked_append_vec_map, &genesis_config.cluster_type, - account_indexes, + account_secondary_indexes, caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, @@ -310,11 +381,11 @@ where } fn reconstruct_accountsdb_from_fields( - accounts_db_fields: AccountsDbFields, + snapshot_accounts_db_fields: SnapshotAccountsDbFields, account_paths: &[PathBuf], unpacked_append_vec_map: UnpackedAppendVecMap, cluster_type: &ClusterType, - account_indexes: AccountSecondaryIndexes, + account_secondary_indexes: AccountSecondaryIndexes, caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, @@ -326,11 +397,19 @@ where let mut accounts_db = AccountsDb::new_with_config( account_paths.to_vec(), cluster_type, - account_indexes, + account_secondary_indexes, caching_enabled, shrink_ratio, ); - let AccountsDbFields(storage, version, slot, bank_hash_info) = accounts_db_fields; + + let AccountsDbFields( + snapshot_storages, + snapshot_version, + snapshot_slot, + snapshot_bank_hash_info, + ) = snapshot_accounts_db_fields.collapse_into()?; + + let snapshot_storages = snapshot_storages.into_iter().collect::>(); // Ensure all account paths exist for path in &accounts_db.paths { @@ -338,13 +417,11 @@ where .unwrap_or_else(|err| panic!("Failed to create directory {}: {}", path.display(), err)); } - let storage = storage.into_iter().collect::>(); - // Remap the deserialized AppendVec paths to point to correct local paths - let mut storage = (0..storage.len()) + let mut storage = (0..snapshot_storages.len()) .into_par_iter() .map(|i| { - let (slot, slot_storage) = &storage[i]; + let (slot, slot_storage) = &snapshot_storages[i]; let mut new_slot_storage = HashMap::new(); for storage_entry in slot_storage { let 
file_name = AppendVec::file_name(*slot, storage_entry.id()); @@ -376,7 +453,7 @@ where .bank_hashes .write() .unwrap() - .insert(slot, bank_hash_info); + .insert(snapshot_slot, snapshot_bank_hash_info); // Process deserialized data, set necessary fields in self let max_id: usize = *storage @@ -400,7 +477,7 @@ where accounts_db.next_id.store(max_id + 1, Ordering::Relaxed); accounts_db .write_version - .fetch_add(version, Ordering::Relaxed); + .fetch_add(snapshot_version, Ordering::Relaxed); accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index); Ok(accounts_db) } diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs index 2d40b5448c..2e6f6da9dc 100644 --- a/runtime/src/serde_snapshot/tests.rs +++ b/runtime/src/serde_snapshot/tests.rs @@ -66,8 +66,13 @@ where R: Read, { // read and deserialise the accounts database directly from the stream + let accounts_db_fields = C::deserialize_accounts_db_fields(stream)?; + let snapshot_accounts_db_fields = SnapshotAccountsDbFields { + full_snapshot_accounts_db_fields: accounts_db_fields, + incremental_snapshot_accounts_db_fields: None, + }; reconstruct_accountsdb_from_fields( - C::deserialize_accounts_db_fields(stream)?, + snapshot_accounts_db_fields, account_paths, unpacked_append_vec_map, &ClusterType::Development, @@ -219,9 +224,13 @@ fn test_bank_serialize_style(serde_style: SerdeStyle) { let copied_accounts = TempDir::new().unwrap(); let unpacked_append_vec_map = copy_append_vecs(&bank2.rc.accounts.accounts_db, copied_accounts.path()).unwrap(); - let mut dbank = crate::serde_snapshot::bank_from_stream( + let mut snapshot_streams = SnapshotStreams { + full_snapshot_stream: &mut reader, + incremental_snapshot_stream: None, + }; + let mut dbank = crate::serde_snapshot::bank_from_streams( serde_style, - &mut reader, + &mut snapshot_streams, &dbank_paths, unpacked_append_vec_map, &genesis_config, diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs 
index dbc12e328c..84f668fb9f 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -5,7 +5,8 @@ use { bank::{Bank, BankSlotDelta, Builtins}, hardened_unpack::{unpack_snapshot, ParallelSelector, UnpackError, UnpackedAppendVecMap}, serde_snapshot::{ - bank_from_stream, bank_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages, + bank_from_streams, bank_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages, + SnapshotStreams, }, shared_buffer_reader::{SharedBuffer, SharedBufferReader}, snapshot_package::{ @@ -16,54 +17,170 @@ use { bincode::{config::Options, serialize_into}, bzip2::bufread::BzDecoder, flate2::read::GzDecoder, + lazy_static::lazy_static, log::*, rayon::{prelude::*, ThreadPool}, regex::Regex, solana_measure::measure::Measure, solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey}, std::{ - cmp::max, - cmp::Ordering, + cmp::{max, Ordering}, collections::HashSet, fmt, fs::{self, File}, - io::{ - self, BufReader, BufWriter, Error as IoError, ErrorKind, Read, Seek, SeekFrom, Write, - }, + io::{self, BufReader, BufWriter, Error as IoError, ErrorKind, Read, Seek, Write}, path::{Path, PathBuf}, process::{self, ExitStatus}, str::FromStr, sync::Arc, }, tar::Archive, + tempfile::TempDir, thiserror::Error, }; -/// Information about a snapshot archive: its path, slot, hash, and archive format -pub struct SnapshotArchiveInfo { +/// Common information about a snapshot archive +#[derive(PartialEq, Eq, Debug)] +struct SnapshotArchiveInfo { /// Path to the snapshot archive file - pub path: PathBuf, + path: PathBuf, /// Slot that the snapshot was made - pub slot: Slot, + slot: Slot, /// Hash of the accounts at this slot - pub hash: Hash, + hash: Hash, /// Archive format for the snapshot file - pub archive_format: ArchiveFormat, + archive_format: ArchiveFormat, } + +/// Information about a full snapshot archive: its path, slot, hash, and archive format +#[derive(PartialEq, Eq, Debug)] +pub struct 
FullSnapshotArchiveInfo(SnapshotArchiveInfo); + +impl FullSnapshotArchiveInfo { + /// Parse the path to a full snapshot archive and return a new `FullSnapshotArchiveInfo` + fn new_from_path(path: PathBuf) -> Result { + let filename = path_to_file_name_str(path.as_path())?; + let (slot, hash, archive_format) = parse_full_snapshot_archive_filename(filename)?; + + Ok(Self(SnapshotArchiveInfo { + path, + slot, + hash, + archive_format, + })) + } + + pub fn path(&self) -> &PathBuf { + &self.0.path + } + + pub fn slot(&self) -> &Slot { + &self.0.slot + } + + pub fn hash(&self) -> &Hash { + &self.0.hash + } + + pub fn archive_format(&self) -> &ArchiveFormat { + &self.0.archive_format + } +} + +impl PartialOrd for FullSnapshotArchiveInfo { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +// Order `FullSnapshotArchiveInfo` by slot (ascending), which practially is sorting chronologically +impl Ord for FullSnapshotArchiveInfo { + fn cmp(&self, other: &Self) -> Ordering { + self.slot().cmp(other.slot()) + } +} + +/// Information about an incremental snapshot archive: its path, slot, base slot, hash, and archive format +#[derive(PartialEq, Eq, Debug)] +pub struct IncrementalSnapshotArchiveInfo { + /// The slot that the incremental snapshot was based from. This is the same as the full + /// snapshot slot used when making the incremental snapshot. + base_slot: Slot, + + /// Use the `SnapshotArchiveInfo` struct for the common fields: path, slot, hash, and + /// archive_format, but as they pertain to the incremental snapshot. 
+ inner: SnapshotArchiveInfo, +} + +impl IncrementalSnapshotArchiveInfo { + /// Parse the path to an incremental snapshot archive and return a new `IncrementalSnapshotArchiveInfo` + fn new_from_path(path: PathBuf) -> Result { + let filename = path_to_file_name_str(path.as_path())?; + let (base_slot, slot, hash, archive_format) = + parse_incremental_snapshot_archive_filename(filename)?; + + Ok(Self { + base_slot, + inner: SnapshotArchiveInfo { + path, + slot, + hash, + archive_format, + }, + }) + } + + fn path(&self) -> &PathBuf { + &self.inner.path + } + + fn base_slot(&self) -> &Slot { + &self.base_slot + } + + fn slot(&self) -> &Slot { + &self.inner.slot + } + + fn _hash(&self) -> &Hash { + &self.inner.hash + } + + fn _archive_format(&self) -> &ArchiveFormat { + &self.inner.archive_format + } +} + +impl PartialOrd for IncrementalSnapshotArchiveInfo { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +// Order `IncrementalSnapshotArchiveInfo` by base slot (ascending), then slot (ascending), which +// practially is sorting chronologically +impl Ord for IncrementalSnapshotArchiveInfo { + fn cmp(&self, other: &Self) -> Ordering { + self.base_slot() + .cmp(other.base_slot()) + .then(self.slot().cmp(other.slot())) + } +} + pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache"; -pub const MAX_SNAPSHOTS: usize = 8; // Save some snapshots but not too many +pub const MAX_BANK_SNAPSHOTS: usize = 8; // Save some snapshots but not too many const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB const VERSION_STRING_V1_2_0: &str = "1.2.0"; const DEFAULT_SNAPSHOT_VERSION: SnapshotVersion = SnapshotVersion::V1_2_0; -const TMP_SNAPSHOT_PREFIX: &str = "tmp-snapshot-"; -pub const DEFAULT_MAX_SNAPSHOTS_TO_RETAIN: usize = 2; - -pub const SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = - r"^snapshot-(\d+)-([[:alnum:]]+)\.(tar|tar\.bz2|tar\.zst|tar\.gz)$"; +const TMP_FULL_SNAPSHOT_PREFIX: &str = "tmp-snapshot-"; +const 
TMP_INCREMENTAL_SNAPSHOT_PREFIX: &str = "tmp-incremental-snapshot-"; +pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: usize = 2; +pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P[[:digit:]]+)-(?P[[:alnum:]]+)\.(?Ptar|tar\.bz2|tar\.zst|tar\.gz)$"; +pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P[[:digit:]]+)-(?P[[:digit:]]+)-(?P[[:alnum:]]+)\.(?Ptar|tar\.bz2|tar\.zst|tar\.gz)$"; #[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum SnapshotVersion { @@ -129,10 +246,48 @@ pub enum ArchiveFormat { Tar, } +/// A slot and the path to its bank snapshot #[derive(PartialEq, Eq, Debug)] -pub struct SlotSnapshotPaths { +pub struct BankSnapshotInfo { pub slot: Slot, - pub snapshot_file_path: PathBuf, + pub snapshot_path: PathBuf, +} + +impl PartialOrd for BankSnapshotInfo { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +// Order BankSnapshotInfo by slot (ascending), which practially is sorting chronologically +impl Ord for BankSnapshotInfo { + fn cmp(&self, other: &Self) -> Ordering { + self.slot.cmp(&other.slot) + } +} + +/// Helper type when rebuilding from snapshots. Designed to handle when rebuilding from just a +/// full snapshot, or from both a full snapshot and an incremental snapshot. 
+#[derive(Debug)] +struct SnapshotRootPaths { + full_snapshot_root_file_path: PathBuf, + incremental_snapshot_root_file_path: Option, +} + +/// Helper type to bundle up the results from `unarchive_snapshot()` +#[derive(Debug)] +struct UnarchivedSnapshot { + unpack_dir: TempDir, + unpacked_append_vec_map: UnpackedAppendVecMap, + unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion, + measure_untar: Measure, +} + +/// Helper type for passing around the unpacked snapshots dir and the snapshot version together +#[derive(Debug)] +struct UnpackedSnapshotsDirAndVersion { + unpacked_snapshots_dir: PathBuf, + snapshot_version: String, } #[derive(Error, Debug)] @@ -157,53 +312,139 @@ pub enum SnapshotError { #[error("source({1}) - I/O error: {0}")] IoWithSource(std::io::Error, &'static str), + + #[error("could not get file name from path: {}", .0.display())] + PathToFileNameError(PathBuf), + + #[error("could not get str from file name: {}", .0.display())] + FileNameToStrError(PathBuf), + + #[error("could not parse snapshot archive's file name: {0}")] + ParseSnapshotArchiveFileNameError(String), + + #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")] + MismatchedBaseSlot(Slot, Slot), } pub type Result = std::result::Result; -impl PartialOrd for SlotSnapshotPaths { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.slot.cmp(&other.slot)) - } -} - -impl Ord for SlotSnapshotPaths { - fn cmp(&self, other: &Self) -> Ordering { - self.slot.cmp(&other.slot) - } -} - -pub fn package_snapshot, Q: AsRef>( +/// Package up bank snapshot files, snapshot storages, and slot deltas for a full snapshot. 
+pub fn package_full_snapshot( bank: &Bank, - snapshot_files: &SlotSnapshotPaths, - snapshot_path: Q, + bank_snapshot_info: &BankSnapshotInfo, + snapshots_dir: P, + status_cache_slot_deltas: Vec, + snapshot_package_output_path: Q, + snapshot_storages: SnapshotStorages, + archive_format: ArchiveFormat, + snapshot_version: SnapshotVersion, + hash_for_testing: Option, +) -> Result +where + P: AsRef, + Q: AsRef, +{ + info!( + "Package full snapshot for bank: {} has {} account storage entries", + bank.slot(), + snapshot_storages.len() + ); + + let snapshot_tmpdir = tempfile::Builder::new() + .prefix(&format!("{}{}-", TMP_FULL_SNAPSHOT_PREFIX, bank.slot())) + .tempdir_in(snapshots_dir)?; + + do_package_snapshot( + bank, + bank_snapshot_info, + status_cache_slot_deltas, + snapshot_package_output_path, + snapshot_storages, + archive_format, + snapshot_version, + hash_for_testing, + snapshot_tmpdir, + ) +} + +/// Package up bank snapshot files, snapshot storages, and slot deltas for an incremental snapshot. +#[allow(clippy::too_many_arguments)] +fn package_incremental_snapshot( + bank: &Bank, + incremental_snapshot_base_slot: Slot, + bank_snapshot_info: &BankSnapshotInfo, + snapshots_dir: P, + status_cache_slot_deltas: Vec, + snapshot_package_output_path: Q, + snapshot_storages: SnapshotStorages, + archive_format: ArchiveFormat, + snapshot_version: SnapshotVersion, + hash_for_testing: Option, +) -> Result +where + P: AsRef, + Q: AsRef, +{ + info!( + "Package incremental snapshot for bank {} (from base slot {}) has {} account storage entries", + bank.slot(), + incremental_snapshot_base_slot, + snapshot_storages.len() + ); + + assert!( + snapshot_storages.iter().all(|storage| storage + .iter() + .all(|entry| entry.slot() > incremental_snapshot_base_slot)), + "Incremental snapshot package must only contain storage entries where slot > incremental snapshot base slot (i.e. full snapshot slot)!" 
+ ); + + let snapshot_tmpdir = tempfile::Builder::new() + .prefix(&format!( + "{}{}-{}-", + TMP_INCREMENTAL_SNAPSHOT_PREFIX, + incremental_snapshot_base_slot, + bank.slot() + )) + .tempdir_in(snapshots_dir)?; + + do_package_snapshot( + bank, + bank_snapshot_info, + status_cache_slot_deltas, + snapshot_package_output_path, + snapshot_storages, + archive_format, + snapshot_version, + hash_for_testing, + snapshot_tmpdir, + ) +} + +fn do_package_snapshot

( + bank: &Bank, + bank_snapshot_info: &BankSnapshotInfo, status_cache_slot_deltas: Vec, snapshot_package_output_path: P, snapshot_storages: SnapshotStorages, archive_format: ArchiveFormat, snapshot_version: SnapshotVersion, hash_for_testing: Option, -) -> Result { - // Hard link all the snapshots we need for this package - let snapshot_tmpdir = tempfile::Builder::new() - .prefix(&format!("{}{}-", TMP_SNAPSHOT_PREFIX, bank.slot())) - .tempdir_in(snapshot_path)?; - + snapshot_tmpdir: TempDir, +) -> Result +where + P: AsRef, +{ // Create a snapshot package - info!( - "Snapshot for bank: {} has {} account storage entries", - bank.slot(), - snapshot_storages.len() - ); // Hard link the snapshot into a tmpdir, to ensure its not removed prior to packaging. { let snapshot_hardlink_dir = snapshot_tmpdir .as_ref() - .join(snapshot_files.slot.to_string()); + .join(bank_snapshot_info.slot.to_string()); fs::create_dir_all(&snapshot_hardlink_dir)?; fs::hard_link( - &snapshot_files.snapshot_file_path, - &snapshot_hardlink_dir.join(snapshot_files.slot.to_string()), + &bank_snapshot_info.snapshot_path, + &snapshot_hardlink_dir.join(bank_snapshot_info.slot.to_string()), )?; } @@ -234,16 +475,17 @@ fn get_archive_ext(archive_format: ArchiveFormat) -> &'static str { } } -// If the validator is halted in the middle of `archive_snapshot_package` the temporary staging directory -// won't be cleaned up. Call this function to clean them up -pub fn remove_tmp_snapshot_archives(snapshot_path: &Path) { - if let Ok(entries) = fs::read_dir(&snapshot_path) { +/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging +/// directory won't be cleaned up. Call this function to clean them up. 
+pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: &Path) { + if let Ok(entries) = fs::read_dir(snapshot_archives_dir) { for entry in entries.filter_map(|entry| entry.ok()) { - if entry + let file_name = entry .file_name() .into_string() - .unwrap_or_else(|_| String::new()) - .starts_with(TMP_SNAPSHOT_PREFIX) + .unwrap_or_else(|_| String::new()); + if file_name.starts_with(TMP_FULL_SNAPSHOT_PREFIX) + || file_name.starts_with(TMP_INCREMENTAL_SNAPSHOT_PREFIX) { if entry.path().is_file() { fs::remove_file(entry.path()) @@ -258,7 +500,7 @@ pub fn remove_tmp_snapshot_archives(snapshot_path: &Path) { } } -/// Make a snapshot archive out of the AccountsPackage +/// Make a full snapshot archive out of the AccountsPackage pub fn archive_snapshot_package( snapshot_package: &AccountsPackage, maximum_snapshots_to_retain: usize, @@ -290,7 +532,7 @@ pub fn archive_snapshot_package( let staging_dir = tempfile::Builder::new() .prefix(&format!( "{}{}-", - TMP_SNAPSHOT_PREFIX, snapshot_package.slot + TMP_FULL_SNAPSHOT_PREFIX, snapshot_package.slot )) .tempdir_in(tar_dir) .map_err(|e| SnapshotError::IoWithSource(e, "create archive tempdir"))?; @@ -343,7 +585,7 @@ pub fn archive_snapshot_package( // system `tar` program is used for -S (sparse file support) let archive_path = tar_dir.join(format!( "{}{}.{}", - TMP_SNAPSHOT_PREFIX, snapshot_package.slot, file_ext + TMP_FULL_SNAPSHOT_PREFIX, snapshot_package.slot, file_ext )); let mut tar = process::Command::new("tar") @@ -410,10 +652,7 @@ pub fn archive_snapshot_package( fs::rename(&archive_path, &snapshot_package.tar_output_file) .map_err(|e| SnapshotError::IoWithSource(e, "archive path rename"))?; - purge_old_snapshot_archives( - snapshot_package.tar_output_file.parent().unwrap(), - maximum_snapshots_to_retain, - ); + purge_old_snapshot_archives(tar_dir, maximum_snapshots_to_retain); timer.stop(); info!( @@ -432,43 +671,56 @@ pub fn archive_snapshot_package( Ok(()) } -pub fn get_snapshot_paths>(snapshot_path: P) -> Vec +/// 
Get a list of bank snapshots in a directory +pub fn get_bank_snapshots

(snapshots_dir: P) -> Vec where - P: fmt::Debug, + P: AsRef, { - match fs::read_dir(&snapshot_path) { - Ok(paths) => { - let mut names = paths - .filter_map(|entry| { - entry.ok().and_then(|e| { - e.path() - .file_name() - .and_then(|n| n.to_str().map(|s| s.parse::().ok())) - .unwrap_or(None) - }) + match fs::read_dir(&snapshots_dir) { + Ok(paths) => paths + .filter_map(|entry| { + entry.ok().and_then(|e| { + e.path() + .file_name() + .and_then(|n| n.to_str().map(|s| s.parse::().ok())) + .unwrap_or(None) }) - .map(|slot| { - let snapshot_path = snapshot_path.as_ref().join(slot.to_string()); - SlotSnapshotPaths { - slot, - snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)), - } - }) - .collect::>(); - - names.sort(); - names - } + }) + .map(|slot| { + let snapshot_file_name = get_snapshot_file_name(slot); + // So nice I join-ed it twice! The redundant `snapshot_file_name` is unintentional + // and should be simplified. Kept for compatibility. + let snapshot_path = snapshots_dir + .as_ref() + .join(&snapshot_file_name) + .join(snapshot_file_name); + BankSnapshotInfo { + slot, + snapshot_path, + } + }) + .collect::>(), Err(err) => { info!( - "Unable to read snapshot directory {:?}: {}", - snapshot_path, err + "Unable to read snapshots directory {}: {}", + snapshots_dir.as_ref().display(), + err ); vec![] } } } +/// Get the bank snapshot with the highest slot in a directory +pub fn get_highest_bank_snapshot_info

(snapshots_dir: P) -> Option +where + P: AsRef, +{ + let mut bank_snapshot_infos = get_bank_snapshots(snapshots_dir); + bank_snapshot_infos.sort_unstable(); + bank_snapshot_infos.into_iter().rev().next() +} + pub fn serialize_snapshot_data_file(data_file_path: &Path, serializer: F) -> Result where F: FnOnce(&mut BufWriter) -> Result<()>, @@ -480,12 +732,32 @@ where ) } -pub fn deserialize_snapshot_data_file(data_file_path: &Path, deserializer: F) -> Result -where - F: FnOnce(&mut BufReader) -> Result, -{ - deserialize_snapshot_data_file_capped::( - data_file_path, +pub fn deserialize_snapshot_data_file( + data_file_path: &Path, + deserializer: impl FnOnce(&mut BufReader) -> Result, +) -> Result { + let wrapped_deserializer = move |streams: &mut SnapshotStreams| -> Result { + deserializer(&mut streams.full_snapshot_stream) + }; + + let wrapped_data_file_path = SnapshotRootPaths { + full_snapshot_root_file_path: data_file_path.to_path_buf(), + incremental_snapshot_root_file_path: None, + }; + + deserialize_snapshot_data_files_capped( + &wrapped_data_file_path, + MAX_SNAPSHOT_DATA_FILE_SIZE, + wrapped_deserializer, + ) +} + +fn deserialize_snapshot_data_files( + snapshot_root_paths: &SnapshotRootPaths, + deserializer: impl FnOnce(&mut SnapshotStreams) -> Result, +) -> Result { + deserialize_snapshot_data_files_capped( + snapshot_root_paths, MAX_SNAPSHOT_DATA_FILE_SIZE, deserializer, ) @@ -504,7 +776,7 @@ where serializer(&mut data_file_stream)?; data_file_stream.flush()?; - let consumed_size = data_file_stream.seek(SeekFrom::Current(0))?; + let consumed_size = data_file_stream.stream_position()?; if consumed_size > maximum_file_size { let error_message = format!( "too large snapshot data file to serialize: {:?} has {} bytes", @@ -515,55 +787,123 @@ where Ok(consumed_size) } -fn deserialize_snapshot_data_file_capped( - data_file_path: &Path, +fn deserialize_snapshot_data_files_capped( + snapshot_root_paths: &SnapshotRootPaths, maximum_file_size: u64, - deserializer: F, 
-) -> Result -where - F: FnOnce(&mut BufReader) -> Result, -{ - let file_size = fs::metadata(&data_file_path)?.len(); + deserializer: impl FnOnce(&mut SnapshotStreams) -> Result, +) -> Result { + let (full_snapshot_file_size, mut full_snapshot_data_file_stream) = + create_snapshot_data_file_stream( + &snapshot_root_paths.full_snapshot_root_file_path, + maximum_file_size, + )?; - if file_size > maximum_file_size { - let error_message = format!( - "too large snapshot data file to deserialize: {:?} has {} bytes", - data_file_path, file_size - ); - return Err(get_io_error(&error_message)); - } + let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) = + if let Some(ref incremental_snapshot_root_file_path) = + snapshot_root_paths.incremental_snapshot_root_file_path + { + let (incremental_snapshot_file_size, incremental_snapshot_data_file_stream) = + create_snapshot_data_file_stream( + incremental_snapshot_root_file_path, + maximum_file_size, + )?; + ( + Some(incremental_snapshot_file_size), + Some(incremental_snapshot_data_file_stream), + ) + } else { + (None, None) + }; - let data_file = File::open(data_file_path)?; - let mut data_file_stream = BufReader::new(data_file); + let mut snapshot_streams = SnapshotStreams { + full_snapshot_stream: &mut full_snapshot_data_file_stream, + incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(), + }; + let ret = deserializer(&mut snapshot_streams)?; - let ret = deserializer(&mut data_file_stream)?; + check_deserialize_file_consumed( + full_snapshot_file_size, + &snapshot_root_paths.full_snapshot_root_file_path, + &mut full_snapshot_data_file_stream, + )?; - let consumed_size = data_file_stream.seek(SeekFrom::Current(0))?; - - if file_size != consumed_size { - let error_message = format!( - "invalid snapshot data file: {:?} has {} bytes, however consumed {} bytes to deserialize", - data_file_path, file_size, consumed_size - ); - return Err(get_io_error(&error_message)); + if let 
Some(ref incremental_snapshot_root_file_path) = + snapshot_root_paths.incremental_snapshot_root_file_path + { + check_deserialize_file_consumed( + incremental_snapshot_file_size.unwrap(), + incremental_snapshot_root_file_path, + incremental_snapshot_data_file_stream.as_mut().unwrap(), + )?; } Ok(ret) } -pub fn add_snapshot>( - snapshot_path: P, +/// Before running the deserializer function, perform common operations on the snapshot archive +/// files, such as checking the file size and opening the file into a stream. +fn create_snapshot_data_file_stream

( + snapshot_root_file_path: P, + maximum_file_size: u64, +) -> Result<(u64, BufReader)> +where + P: AsRef, +{ + let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len(); + + if snapshot_file_size > maximum_file_size { + let error_message = + format!( + "too large snapshot data file to deserialize: {} has {} bytes (max size is {} bytes)", + snapshot_root_file_path.as_ref().display(), snapshot_file_size, maximum_file_size + ); + return Err(get_io_error(&error_message)); + } + + let snapshot_data_file = File::open(&snapshot_root_file_path)?; + let snapshot_data_file_stream = BufReader::new(snapshot_data_file); + + Ok((snapshot_file_size, snapshot_data_file_stream)) +} + +/// After running the deserializer function, perform common checks to ensure the snapshot archive +/// files were consumed correctly. +fn check_deserialize_file_consumed

( + file_size: u64, + file_path: P, + file_stream: &mut BufReader, +) -> Result<()> +where + P: AsRef, +{ + let consumed_size = file_stream.stream_position()?; + + if consumed_size != file_size { + let error_message = + format!( + "invalid snapshot data file: {} has {} bytes, however consumed {} bytes to deserialize", + file_path.as_ref().display(), file_size, consumed_size + ); + return Err(get_io_error(&error_message)); + } + + Ok(()) +} + +/// Serialize a bank to a snapshot +pub fn add_bank_snapshot>( + snapshots_dir: P, bank: &Bank, snapshot_storages: &[SnapshotStorage], snapshot_version: SnapshotVersion, -) -> Result { +) -> Result { let slot = bank.slot(); - // snapshot_path/slot - let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot); - fs::create_dir_all(slot_snapshot_dir.clone())?; + // snapshots_dir/slot + let bank_snapshots_dir = get_bank_snapshots_dir(snapshots_dir, slot); + fs::create_dir_all(&bank_snapshots_dir)?; - // the bank snapshot is stored as snapshot_path/slot/slot - let snapshot_bank_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot)); + // the bank snapshot is stored as snapshots_dir/slot/slot + let snapshot_bank_file_path = bank_snapshots_dir.join(get_snapshot_file_name(slot)); info!( "Creating snapshot for slot {}, path: {:?}", slot, snapshot_bank_file_path, @@ -595,9 +935,9 @@ pub fn add_snapshot>( bank_serialize, slot, snapshot_bank_file_path, ); - Ok(SlotSnapshotPaths { + Ok(BankSnapshotInfo { slot, - snapshot_file_path: snapshot_bank_file_path, + snapshot_path: snapshot_bank_file_path, }) } @@ -628,34 +968,40 @@ fn serialize_status_cache( } /// Remove the snapshot directory for this slot -pub fn remove_snapshot>(slot: Slot, snapshot_path: P) -> Result<()> { - let slot_snapshot_dir = get_bank_snapshot_dir(&snapshot_path, slot); - fs::remove_dir_all(slot_snapshot_dir)?; +pub fn remove_bank_snapshot

(slot: Slot, snapshots_dir: P) -> Result<()> +where + P: AsRef, +{ + let bank_snapshot_dir = get_bank_snapshots_dir(&snapshots_dir, slot); + fs::remove_dir_all(bank_snapshot_dir)?; Ok(()) } #[derive(Debug, Default)] pub struct BankFromArchiveTimings { pub rebuild_bank_from_snapshots_us: u64, - pub untar_us: u64, + pub full_snapshot_untar_us: u64, + pub incremental_snapshot_untar_us: u64, pub verify_snapshot_bank_us: u64, } // From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later. const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4; -/// Rebuild a bank from a snapshot archive +/// Rebuild bank from snapshot archives. Handles either just a full snapshot, or both a full +/// snapshot and an incremental snapshot. #[allow(clippy::too_many_arguments)] -pub fn bank_from_snapshot_archive

( +pub fn bank_from_snapshot_archives

( account_paths: &[PathBuf], frozen_account_pubkeys: &[Pubkey], - snapshot_path: &Path, - snapshot_tar: P, + snapshots_dir: &Path, + full_snapshot_archive_path: P, + incremental_snapshot_archive_path: Option

, archive_format: ArchiveFormat, genesis_config: &GenesisConfig, debug_keys: Option>>, additional_builtins: Option<&Builtins>, - account_indexes: AccountSecondaryIndexes, + account_secondary_indexes: AccountSecondaryIndexes, accounts_db_caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, @@ -665,74 +1011,187 @@ pub fn bank_from_snapshot_archive

( where P: AsRef + std::marker::Sync, { - let unpack_dir = tempfile::Builder::new() - .prefix(TMP_SNAPSHOT_PREFIX) - .tempdir_in(snapshot_path)?; - - let mut untar = Measure::start("snapshot untar"); - let divisions = std::cmp::min( + let parallel_divisions = std::cmp::min( PARALLEL_UNTAR_READERS_DEFAULT, std::cmp::max(1, num_cpus::get() / 4), ); - let unpacked_append_vec_map = untar_snapshot_in( - &snapshot_tar, - unpack_dir.as_ref(), + + let unarchived_full_snapshot = unarchive_snapshot( + snapshots_dir, + TMP_FULL_SNAPSHOT_PREFIX, + &full_snapshot_archive_path, + "snapshot untar", account_paths, archive_format, - divisions, + parallel_divisions, )?; - untar.stop(); - info!("{}", untar); - let mut measure = Measure::start("bank rebuild from snapshot"); - let unpacked_snapshots_dir = unpack_dir.as_ref().join("snapshots"); - let unpacked_version_file = unpack_dir.as_ref().join("version"); + let mut unarchived_incremental_snapshot = + if let Some(incremental_snapshot_archive_path) = incremental_snapshot_archive_path { + check_are_snapshots_compatible( + &full_snapshot_archive_path, + &incremental_snapshot_archive_path, + )?; - let mut snapshot_version = String::new(); - File::open(unpacked_version_file).and_then(|mut f| f.read_to_string(&mut snapshot_version))?; + let unarchived_incremental_snapshot = unarchive_snapshot( + snapshots_dir, + TMP_INCREMENTAL_SNAPSHOT_PREFIX, + &incremental_snapshot_archive_path, + "incremental snapshot untar", + account_paths, + archive_format, + parallel_divisions, + )?; + Some(unarchived_incremental_snapshot) + } else { + None + }; + let mut unpacked_append_vec_map = unarchived_full_snapshot.unpacked_append_vec_map; + if let Some(ref mut unarchive_preparation_result) = unarchived_incremental_snapshot { + let incremental_snapshot_unpacked_append_vec_map = + std::mem::take(&mut unarchive_preparation_result.unpacked_append_vec_map); + unpacked_append_vec_map.extend(incremental_snapshot_unpacked_append_vec_map.into_iter()); + } + + let 
mut measure_rebuild = Measure::start("rebuild bank from snapshots"); let bank = rebuild_bank_from_snapshots( - snapshot_version.trim(), + &unarchived_full_snapshot.unpacked_snapshots_dir_and_version, + unarchived_incremental_snapshot + .as_ref() + .map(|unarchive_preparation_result| { + &unarchive_preparation_result.unpacked_snapshots_dir_and_version + }), frozen_account_pubkeys, - &unpacked_snapshots_dir, account_paths, unpacked_append_vec_map, genesis_config, debug_keys, additional_builtins, - account_indexes, + account_secondary_indexes, accounts_db_caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, verify_index, )?; - measure.stop(); + measure_rebuild.stop(); + info!("{}", measure_rebuild); - let mut verify = Measure::start("verify"); + let mut measure_verify = Measure::start("verify"); if !bank.verify_snapshot_bank(test_hash_calculation) && limit_load_slot_count_from_snapshot.is_none() { panic!("Snapshot bank for slot {} failed to verify", bank.slot()); } - verify.stop(); - let timings = BankFromArchiveTimings { - rebuild_bank_from_snapshots_us: measure.as_us(), - untar_us: untar.as_us(), - verify_snapshot_bank_us: verify.as_us(), - }; + measure_verify.stop(); + let timings = BankFromArchiveTimings { + rebuild_bank_from_snapshots_us: measure_rebuild.as_us(), + full_snapshot_untar_us: unarchived_full_snapshot.measure_untar.as_us(), + incremental_snapshot_untar_us: unarchived_incremental_snapshot + .map_or(0, |unarchive_preparation_result| { + unarchive_preparation_result.measure_untar.as_us() + }), + verify_snapshot_bank_us: measure_verify.as_us(), + }; Ok((bank, timings)) } -/// Build the snapshot archive path from its components: the snapshot archive output directory, the +/// Perform the common tasks when unarchiving a snapshot. Handles creating the temporary +/// directories, untaring, reading the version file, and then returning those fields plus the +/// unpacked append vec map. 
+fn unarchive_snapshot( + snapshots_dir: P, + unpacked_snapshots_dir_prefix: &'static str, + snapshot_archive_path: Q, + measure_name: &'static str, + account_paths: &[PathBuf], + archive_format: ArchiveFormat, + parallel_divisions: usize, +) -> Result +where + P: AsRef, + Q: AsRef, +{ + let unpack_dir = tempfile::Builder::new() + .prefix(unpacked_snapshots_dir_prefix) + .tempdir_in(snapshots_dir)?; + let unpacked_snapshots_dir = unpack_dir.path().join("snapshots"); + + let mut measure_untar = Measure::start(measure_name); + let unpacked_append_vec_map = untar_snapshot_in( + snapshot_archive_path, + unpack_dir.path(), + account_paths, + archive_format, + parallel_divisions, + )?; + measure_untar.stop(); + info!("{}", measure_untar); + + let unpacked_version_file = unpack_dir.path().join("version"); + let snapshot_version = { + let mut snapshot_version = String::new(); + File::open(unpacked_version_file) + .and_then(|mut f| f.read_to_string(&mut snapshot_version))?; + snapshot_version.trim().to_string() + }; + + Ok(UnarchivedSnapshot { + unpack_dir, + unpacked_append_vec_map, + unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion { + unpacked_snapshots_dir, + snapshot_version, + }, + measure_untar, + }) +} + +/// Check if an incremental snapshot is compatible with a full snapshot. This function parses the +/// paths to see if the incremental snapshot's base slot is the same as the full snapshot's slot. +/// Return an error if they are incompatible (or if the paths cannot be parsed), otherwise return a +/// tuple of the full snapshot slot and the incremental snapshot slot. +fn check_are_snapshots_compatible

( + full_snapshot_archive_path: P, + incremental_snapshot_archive_path: P, +) -> Result<()> +where + P: AsRef, +{ + let full_snapshot_filename = path_to_file_name_str(full_snapshot_archive_path.as_ref())?; + let (full_snapshot_slot, _, _) = parse_full_snapshot_archive_filename(full_snapshot_filename)?; + + let incremental_snapshot_filename = + path_to_file_name_str(incremental_snapshot_archive_path.as_ref())?; + let (incremental_snapshot_base_slot, _, _, _) = + parse_incremental_snapshot_archive_filename(incremental_snapshot_filename)?; + + (full_snapshot_slot == incremental_snapshot_base_slot) + .then(|| ()) + .ok_or(SnapshotError::MismatchedBaseSlot( + full_snapshot_slot, + incremental_snapshot_base_slot, + )) +} + +/// Get the `&str` from a `&Path` +fn path_to_file_name_str(path: &Path) -> Result<&str> { + path.file_name() + .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))? + .to_str() + .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf())) +} + +/// Build the full snapshot archive path from its components: the snapshot archives directory, the /// snapshot slot, the accounts hash, and the archive format. -pub fn build_snapshot_archive_path( - snapshot_output_dir: PathBuf, +pub fn build_full_snapshot_archive_path( + snapshot_archives_dir: PathBuf, slot: Slot, hash: &Hash, archive_format: ArchiveFormat, ) -> PathBuf { - snapshot_output_dir.join(format!( + snapshot_archives_dir.join(format!( "snapshot-{}-{}.{}", slot, hash, @@ -740,6 +1199,25 @@ pub fn build_snapshot_archive_path( )) } +/// Build the incremental snapshot archive path from its components: the snapshot archives +/// directory, the snapshot base slot, the snapshot slot, the accounts hash, and the archive +/// format. 
+pub fn build_incremental_snapshot_archive_path( + snapshot_archives_dir: PathBuf, + base_slot: Slot, + slot: Slot, + hash: &Hash, + archive_format: ArchiveFormat, +) -> PathBuf { + snapshot_archives_dir.join(format!( + "incremental-snapshot-{}-{}-{}.{}", + base_slot, + slot, + hash, + get_archive_ext(archive_format), + )) +} + fn archive_format_from_str(archive_format: &str) -> Option { match archive_format { "tar.bz2" => Some(ArchiveFormat::TarBzip2), @@ -750,105 +1228,220 @@ fn archive_format_from_str(archive_format: &str) -> Option { } } -/// Parse a snapshot archive filename into its Slot, Hash, and Archive Format -fn parse_snapshot_archive_filename(archive_filename: &str) -> Option<(Slot, Hash, ArchiveFormat)> { - let regex = Regex::new(SNAPSHOT_ARCHIVE_FILENAME_REGEX); +/// Parse a full snapshot archive filename into its Slot, Hash, and Archive Format +fn parse_full_snapshot_archive_filename( + archive_filename: &str, +) -> Result<(Slot, Hash, ArchiveFormat)> { + lazy_static! { + static ref RE: Regex = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap(); + } - regex.ok()?.captures(archive_filename).and_then(|captures| { - let slot = captures.get(1).map(|x| x.as_str().parse::())?.ok()?; - let hash = captures.get(2).map(|x| x.as_str().parse::())?.ok()?; - let archive_format = captures - .get(3) - .map(|x| archive_format_from_str(x.as_str()))??; + let do_parse = || { + RE.captures(archive_filename).and_then(|captures| { + let slot = captures + .name("slot") + .map(|x| x.as_str().parse::())? + .ok()?; + let hash = captures + .name("hash") + .map(|x| x.as_str().parse::())? 
+ .ok()?; + let archive_format = captures + .name("ext") + .map(|x| archive_format_from_str(x.as_str()))??; - Some((slot, hash, archive_format)) + Some((slot, hash, archive_format)) + }) + }; + + do_parse().ok_or_else(|| { + SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string()) }) } -/// Get a list of the snapshot archives in a directory -pub fn get_snapshot_archives

(snapshot_output_dir: P) -> Vec +/// Parse an incremental snapshot archive filename into its base Slot, actual Slot, Hash, and Archive Format +fn parse_incremental_snapshot_archive_filename( + archive_filename: &str, +) -> Result<(Slot, Slot, Hash, ArchiveFormat)> { + lazy_static! { + static ref RE: Regex = Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap(); + } + + let do_parse = || { + RE.captures(archive_filename).and_then(|captures| { + let base_slot = captures + .name("base") + .map(|x| x.as_str().parse::())? + .ok()?; + let slot = captures + .name("slot") + .map(|x| x.as_str().parse::())? + .ok()?; + let hash = captures + .name("hash") + .map(|x| x.as_str().parse::())? + .ok()?; + let archive_format = captures + .name("ext") + .map(|x| archive_format_from_str(x.as_str()))??; + + Some((base_slot, slot, hash, archive_format)) + }) + }; + + do_parse().ok_or_else(|| { + SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string()) + }) +} + +/// Get a list of the full snapshot archives in a directory +pub fn get_full_snapshot_archives

(snapshot_archives_dir: P) -> Vec where P: AsRef, { - match fs::read_dir(&snapshot_output_dir) { + match fs::read_dir(&snapshot_archives_dir) { Err(err) => { - info!("Unable to read snapshot directory: {}", err); + info!( + "Unable to read snapshot archives directory: err: {}, path: {}", + err, + snapshot_archives_dir.as_ref().display() + ); vec![] } Ok(files) => files .filter_map(|entry| { - if let Ok(entry) = entry { - let path = entry.path(); - if path.is_file() { - if let Some((slot, hash, archive_format)) = parse_snapshot_archive_filename( - path.file_name().unwrap().to_str().unwrap(), - ) { - return Some(SnapshotArchiveInfo { - path, - slot, - hash, - archive_format, - }); - } - } - } - None + entry.map_or(None, |entry| { + FullSnapshotArchiveInfo::new_from_path(entry.path()).ok() + }) }) .collect(), } } -/// Get a sorted list of the snapshot archives in a directory -fn get_sorted_snapshot_archives

(snapshot_output_dir: P) -> Vec +/// Get a list of the incremental snapshot archives in a directory +fn get_incremental_snapshot_archives

( + snapshot_archives_dir: P, +) -> Vec where P: AsRef, { - let mut snapshot_archives = get_snapshot_archives(snapshot_output_dir); - sort_snapshot_archives(&mut snapshot_archives); - snapshot_archives + match fs::read_dir(&snapshot_archives_dir) { + Err(err) => { + info!( + "Unable to read snapshot archives directory: err: {}, path: {}", + err, + snapshot_archives_dir.as_ref().display() + ); + vec![] + } + Ok(files) => files + .filter_map(|entry| { + entry.map_or(None, |entry| { + IncrementalSnapshotArchiveInfo::new_from_path(entry.path()).ok() + }) + }) + .collect(), + } } -/// Sort the list of snapshot archives by slot, in descending order -fn sort_snapshot_archives(snapshot_archives: &mut Vec) { - snapshot_archives.sort_unstable_by(|a, b| b.slot.cmp(&a.slot)); -} - -/// Get the highest slot of the snapshots in a directory -pub fn get_highest_snapshot_archive_slot

(snapshot_output_dir: P) -> Option +/// Get the highest slot of the full snapshot archives in a directory +pub fn get_highest_full_snapshot_archive_slot

(snapshot_archives_dir: P) -> Option where P: AsRef, { - get_highest_snapshot_archive_info(snapshot_output_dir) - .map(|snapshot_archive_info| snapshot_archive_info.slot) + get_highest_full_snapshot_archive_info(snapshot_archives_dir) + .map(|full_snapshot_archive_info| *full_snapshot_archive_info.slot()) } -/// Get the path (and metadata) for the snapshot archive with the highest slot in a directory -pub fn get_highest_snapshot_archive_info

(snapshot_output_dir: P) -> Option +/// Get the highest slot of the incremental snapshot archives in a directory, for a given full +/// snapshot slot +pub fn get_highest_incremental_snapshot_archive_slot>( + snapshot_archives_dir: P, + full_snapshot_slot: Slot, +) -> Option { + get_highest_incremental_snapshot_archive_info(snapshot_archives_dir, full_snapshot_slot) + .map(|incremental_snapshot_archive_info| *incremental_snapshot_archive_info.slot()) +} + +/// Get the path (and metadata) for the full snapshot archive with the highest slot in a directory +pub fn get_highest_full_snapshot_archive_info

( + snapshot_archives_dir: P, +) -> Option where P: AsRef, { - get_sorted_snapshot_archives(snapshot_output_dir) - .into_iter() - .next() + let mut full_snapshot_archives = get_full_snapshot_archives(snapshot_archives_dir); + full_snapshot_archives.sort_unstable(); + full_snapshot_archives.into_iter().rev().next() } -pub fn purge_old_snapshot_archives>( - snapshot_output_dir: P, - maximum_snapshots_to_retain: usize, -) { +/// Get the path for the incremental snapshot archive with the highest slot, for a given full +/// snapshot slot, in a directory +pub fn get_highest_incremental_snapshot_archive_info

( + snapshot_archives_dir: P, + full_snapshot_slot: Slot, +) -> Option +where + P: AsRef, +{ + // Since we want to filter down to only the incremental snapshot archives that have the same + // full snapshot slot as the value passed in, perform the filtering before sorting to avoid + // doing unnecessary work. + let mut incremental_snapshot_archives = + get_incremental_snapshot_archives(snapshot_archives_dir) + .into_iter() + .filter(|incremental_snapshot_archive_info| { + *incremental_snapshot_archive_info.base_slot() == full_snapshot_slot + }) + .collect::>(); + incremental_snapshot_archives.sort_unstable(); + incremental_snapshot_archives.into_iter().rev().next() +} + +pub fn purge_old_snapshot_archives

(snapshot_archives_dir: P, maximum_snapshots_to_retain: usize) +where + P: AsRef, +{ info!( - "Purging old snapshots in {:?}, retaining {}", - snapshot_output_dir.as_ref(), + "Purging old snapshot archives in {}, retaining {} full snapshots", + snapshot_archives_dir.as_ref().display(), maximum_snapshots_to_retain ); - let mut archives = get_sorted_snapshot_archives(snapshot_output_dir); + let mut snapshot_archives = get_full_snapshot_archives(&snapshot_archives_dir); + snapshot_archives.sort_unstable(); // Keep the oldest snapshot so we can always play the ledger from it. - archives.pop(); + snapshot_archives.pop(); let max_snaps = max(1, maximum_snapshots_to_retain); - for old_archive in archives.into_iter().skip(max_snaps) { - fs::remove_file(old_archive.path) - .unwrap_or_else(|err| info!("Failed to remove old snapshot: {:}", err)); + for old_archive in snapshot_archives.into_iter().skip(max_snaps) { + trace!( + "Purging old full snapshot archive: {}", + old_archive.path().display() + ); + fs::remove_file(old_archive.path()) + .unwrap_or_else(|err| info!("Failed to remove old full snapshot archive: {}", err)); } + + // Only keep incremental snapshots for the latest full snapshot + // bprumo TODO issue #18639: As an option to further reduce the number of incremental + // snapshots, only a subset of the incremental snapshots for the lastest full snapshot could be + // kept. This could reuse maximum_snapshots_to_retain, or use a new field just for incremental + // snapshots. + // In case there are incremental snapshots but no full snapshots, make sure all the incremental + // snapshots are purged. 
+ let last_full_snapshot_slot = + get_highest_full_snapshot_archive_slot(&snapshot_archives_dir).unwrap_or(Slot::MAX); + get_incremental_snapshot_archives(&snapshot_archives_dir) + .iter() + .filter(|archive_info| *archive_info.base_slot() < last_full_snapshot_slot) + .for_each(|old_archive| { + trace!( + "Purging old incremental snapshot archive: {}", + old_archive.path().display() + ); + fs::remove_file(old_archive.path()).unwrap_or_else(|err| { + info!("Failed to remove old incremental snapshot archive: {}", err) + }) + }); } fn unpack_snapshot_local T>( @@ -925,72 +1518,119 @@ fn untar_snapshot_in>( Ok(account_paths_map) } -fn verify_snapshot_version_and_folder( - snapshot_version: &str, - unpacked_snapshots_dir: &Path, -) -> Result<(SnapshotVersion, SlotSnapshotPaths)> { - info!("snapshot version: {}", snapshot_version); +fn verify_unpacked_snapshots_dir_and_version( + unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion, +) -> Result<(SnapshotVersion, BankSnapshotInfo)> { + info!( + "snapshot version: {}", + &unpacked_snapshots_dir_and_version.snapshot_version + ); - let snapshot_version_enum = - SnapshotVersion::maybe_from_string(snapshot_version).ok_or_else(|| { - get_io_error(&format!( - "unsupported snapshot version: {}", - snapshot_version - )) - })?; - let mut snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir); - if snapshot_paths.len() > 1 { + let snapshot_version = + SnapshotVersion::maybe_from_string(&unpacked_snapshots_dir_and_version.snapshot_version) + .ok_or_else(|| { + get_io_error(&format!( + "unsupported snapshot version: {}", + &unpacked_snapshots_dir_and_version.snapshot_version, + )) + })?; + let mut bank_snapshot_infos = + get_bank_snapshots(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir); + if bank_snapshot_infos.len() > 1 { return Err(get_io_error("invalid snapshot format")); } - let root_paths = snapshot_paths + bank_snapshot_infos.sort_unstable(); + let root_paths = bank_snapshot_infos .pop() 
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?; - Ok((snapshot_version_enum, root_paths)) + Ok((snapshot_version, root_paths)) } #[allow(clippy::too_many_arguments)] fn rebuild_bank_from_snapshots( - snapshot_version: &str, + full_snapshot_unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion, + incremental_snapshot_unpacked_snapshots_dir_and_version: Option< + &UnpackedSnapshotsDirAndVersion, + >, frozen_account_pubkeys: &[Pubkey], - unpacked_snapshots_dir: &Path, account_paths: &[PathBuf], unpacked_append_vec_map: UnpackedAppendVecMap, genesis_config: &GenesisConfig, debug_keys: Option>>, additional_builtins: Option<&Builtins>, - account_indexes: AccountSecondaryIndexes, + account_secondary_indexes: AccountSecondaryIndexes, accounts_db_caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, verify_index: bool, ) -> Result { - let (snapshot_version_enum, root_paths) = - verify_snapshot_version_and_folder(snapshot_version, unpacked_snapshots_dir)?; + let (full_snapshot_version, full_snapshot_root_paths) = + verify_unpacked_snapshots_dir_and_version( + full_snapshot_unpacked_snapshots_dir_and_version, + )?; + let (incremental_snapshot_version, incremental_snapshot_root_paths) = + if let Some(snapshot_unpacked_snapshots_dir_and_version) = + incremental_snapshot_unpacked_snapshots_dir_and_version + { + let (snapshot_version, bank_snapshot_info) = verify_unpacked_snapshots_dir_and_version( + snapshot_unpacked_snapshots_dir_and_version, + )?; + (Some(snapshot_version), Some(bank_snapshot_info)) + } else { + (None, None) + }; info!( - "Loading bank from {}", - &root_paths.snapshot_file_path.display() + "Loading bank from full snapshot {} and incremental snapshot {:?}", + full_snapshot_root_paths.snapshot_path.display(), + incremental_snapshot_root_paths + .as_ref() + .map(|paths| paths.snapshot_path.display()), ); - let bank = 
deserialize_snapshot_data_file(&root_paths.snapshot_file_path, |mut stream| { - Ok(match snapshot_version_enum { - SnapshotVersion::V1_2_0 => bank_from_stream( - SerdeStyle::Newer, - &mut stream, - account_paths, - unpacked_append_vec_map, - genesis_config, - frozen_account_pubkeys, - debug_keys, - additional_builtins, - account_indexes, - accounts_db_caching_enabled, - limit_load_slot_count_from_snapshot, - shrink_ratio, - verify_index, - ), - }?) + + let snapshot_root_paths = SnapshotRootPaths { + full_snapshot_root_file_path: full_snapshot_root_paths.snapshot_path, + incremental_snapshot_root_file_path: incremental_snapshot_root_paths + .map(|root_paths| root_paths.snapshot_path), + }; + + let bank = deserialize_snapshot_data_files(&snapshot_root_paths, |mut snapshot_streams| { + Ok( + match incremental_snapshot_version.unwrap_or(full_snapshot_version) { + SnapshotVersion::V1_2_0 => bank_from_streams( + SerdeStyle::Newer, + &mut snapshot_streams, + account_paths, + unpacked_append_vec_map, + genesis_config, + frozen_account_pubkeys, + debug_keys, + additional_builtins, + account_secondary_indexes, + accounts_db_caching_enabled, + limit_load_slot_count_from_snapshot, + shrink_ratio, + verify_index, + ), + }?, + ) })?; - let status_cache_path = unpacked_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME); + // The status cache is rebuilt from the latest snapshot. So, if there's an incremental + // snapshot, use that. Otherwise use the full snapshot. 
+ let status_cache_path = incremental_snapshot_unpacked_snapshots_dir_and_version + .map_or_else( + || { + full_snapshot_unpacked_snapshots_dir_and_version + .unpacked_snapshots_dir + .as_path() + }, + |unpacked_snapshots_dir_and_version| { + unpacked_snapshots_dir_and_version + .unpacked_snapshots_dir + .as_path() + }, + ) + .join(SNAPSHOT_STATUS_CACHE_FILE_NAME); let slot_deltas = deserialize_snapshot_data_file(&status_cache_path, |stream| { info!( "Rebuilding status cache from {}", @@ -1014,7 +1654,7 @@ fn get_snapshot_file_name(slot: Slot) -> String { slot.to_string() } -fn get_bank_snapshot_dir>(path: P, slot: Slot) -> PathBuf { +fn get_bank_snapshots_dir>(path: P, slot: Slot) -> PathBuf { path.as_ref().join(slot.to_string()) } @@ -1053,16 +1693,26 @@ pub fn verify_snapshot_archive( assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap()); } -pub fn purge_old_snapshots(snapshot_path: &Path) { - // Remove outdated snapshots - let slot_snapshot_paths = get_snapshot_paths(snapshot_path); - let num_to_remove = slot_snapshot_paths.len().saturating_sub(MAX_SNAPSHOTS); - for slot_files in &slot_snapshot_paths[..num_to_remove] { - let r = remove_snapshot(slot_files.slot, snapshot_path); - if r.is_err() { - warn!("Couldn't remove snapshot at: {:?}", snapshot_path); - } - } +/// Remove outdated bank snapshots +pub fn purge_old_bank_snapshots

(snapshots_dir: P) +where + P: AsRef, +{ + let mut bank_snapshot_infos = get_bank_snapshots(&snapshots_dir); + bank_snapshot_infos.sort_unstable(); + bank_snapshot_infos + .into_iter() + .rev() + .skip(MAX_BANK_SNAPSHOTS) + .for_each(|bank_snapshot_info| { + let r = remove_bank_snapshot(bank_snapshot_info.slot, &snapshots_dir); + if r.is_err() { + warn!( + "Couldn't remove snapshot at: {}", + bank_snapshot_info.snapshot_path.display() + ); + } + }) } /// Gather the necessary elements for a snapshot of the given `root_bank` @@ -1070,7 +1720,7 @@ pub fn snapshot_bank( root_bank: &Bank, status_cache_slot_deltas: Vec, accounts_package_sender: &AccountsPackageSender, - snapshot_path: &Path, + snapshots_dir: &Path, snapshot_package_output_path: &Path, snapshot_version: SnapshotVersion, archive_format: &ArchiveFormat, @@ -1078,20 +1728,18 @@ pub fn snapshot_bank( ) -> Result<()> { let storages: Vec<_> = root_bank.get_snapshot_storages(); let mut add_snapshot_time = Measure::start("add-snapshot-ms"); - add_snapshot(snapshot_path, root_bank, &storages, snapshot_version)?; + add_bank_snapshot(snapshots_dir, root_bank, &storages, snapshot_version)?; add_snapshot_time.stop(); inc_new_counter_info!("add-snapshot-ms", add_snapshot_time.as_ms() as usize); // Package the relevant snapshots - let slot_snapshot_paths = get_snapshot_paths(snapshot_path); - let latest_slot_snapshot_paths = slot_snapshot_paths - .last() - .expect("no snapshots found in config snapshot_path"); + let highest_bank_snapshot_info = get_highest_bank_snapshot_info(snapshots_dir) + .expect("no snapshots found in config snapshots_dir"); - let package = package_snapshot( + let package = package_full_snapshot( root_bank, - latest_slot_snapshot_paths, - snapshot_path, + &highest_bank_snapshot_info, + snapshots_dir, status_cache_slot_deltas, snapshot_package_output_path, storages, @@ -1105,13 +1753,13 @@ pub fn snapshot_bank( Ok(()) } -/// Convenience function to create a snapshot archive out of any Bank, 
regardless of state. The -/// Bank will be frozen during the process. +/// Convenience function to create a full snapshot archive out of any Bank, regardless of state. +/// The Bank will be frozen during the process. /// /// Requires: /// - `bank` is complete -pub fn bank_to_snapshot_archive, Q: AsRef>( - snapshot_path: P, +pub fn bank_to_full_snapshot_archive, Q: AsRef>( + snapshots_dir: P, bank: &Bank, snapshot_version: Option, snapshot_package_output_path: Q, @@ -1128,13 +1776,13 @@ pub fn bank_to_snapshot_archive, Q: AsRef>( bank.update_accounts_hash(); bank.rehash(); // Bank accounts may have been manually modified by the caller - let temp_dir = tempfile::tempdir_in(snapshot_path)?; + let temp_dir = tempfile::tempdir_in(snapshots_dir)?; let storages = bank.get_snapshot_storages(); - let slot_snapshot_paths = add_snapshot(&temp_dir, bank, &storages, snapshot_version)?; - let package = package_snapshot( + let bank_snapshot_info = add_bank_snapshot(&temp_dir, bank, &storages, snapshot_version)?; + let package = package_full_snapshot( bank, - &slot_snapshot_paths, + &bank_snapshot_info, &temp_dir, bank.src.slot_deltas(&bank.src.roots()), snapshot_package_output_path, @@ -1144,7 +1792,56 @@ pub fn bank_to_snapshot_archive, Q: AsRef>( None, )?; - let package = process_accounts_package_pre(package, thread_pool); + let package = process_accounts_package_pre(package, thread_pool, None); + + archive_snapshot_package(&package, maximum_snapshots_to_retain)?; + Ok(package.tar_output_file) +} + +/// Convenience function to create an incremental snapshot archive out of any Bank, regardless of +/// state. The Bank will be frozen during the process. 
+/// +/// Requires: +/// - `bank` is complete +/// - `bank`'s slot is greater than `full_snapshot_slot` +pub fn bank_to_incremental_snapshot_archive, Q: AsRef>( + snapshots_dir: P, + bank: &Bank, + full_snapshot_slot: Slot, + snapshot_version: Option, + snapshot_package_output_path: Q, + archive_format: ArchiveFormat, + thread_pool: Option<&ThreadPool>, + maximum_snapshots_to_retain: usize, +) -> Result { + let snapshot_version = snapshot_version.unwrap_or_default(); + + assert!(bank.is_complete()); + assert!(bank.slot() > full_snapshot_slot); + bank.squash(); // Bank may not be a root + bank.force_flush_accounts_cache(); + bank.clean_accounts(true, false); + bank.update_accounts_hash(); + bank.rehash(); // Bank accounts may have been manually modified by the caller + + let temp_dir = tempfile::tempdir_in(snapshots_dir)?; + + let storages = bank.get_incremental_snapshot_storages(full_snapshot_slot); + let bank_snapshot_info = add_bank_snapshot(&temp_dir, bank, &storages, snapshot_version)?; + let package = package_incremental_snapshot( + bank, + full_snapshot_slot, + &bank_snapshot_info, + &temp_dir, + bank.src.slot_deltas(&bank.src.roots()), + snapshot_package_output_path, + storages, + archive_format, + snapshot_version, + None, + )?; + + let package = process_accounts_package_pre(package, thread_pool, Some(full_snapshot_slot)); archive_snapshot_package(&package, maximum_snapshots_to_retain)?; Ok(package.tar_output_file) @@ -1153,6 +1850,7 @@ pub fn bank_to_snapshot_archive, Q: AsRef>( pub fn process_accounts_package_pre( accounts_package: AccountsPackagePre, thread_pool: Option<&ThreadPool>, + incremental_snapshot_base_slot: Option, ) -> AccountsPackage { let mut time = Measure::start("hash"); @@ -1179,12 +1877,21 @@ pub fn process_accounts_package_pre( ("calculate_hash", time.as_us(), i64), ); - let tar_output_file = build_snapshot_archive_path( - accounts_package.snapshot_output_dir, - accounts_package.slot, - &hash, - accounts_package.archive_format, - ); + 
let tar_output_file = match incremental_snapshot_base_slot { + None => build_full_snapshot_archive_path( + accounts_package.snapshot_output_dir, + accounts_package.slot, + &hash, + accounts_package.archive_format, + ), + Some(incremental_snapshot_base_slot) => build_incremental_snapshot_archive_path( + accounts_package.snapshot_output_dir, + incremental_snapshot_base_slot, + accounts_package.slot, + &hash, + accounts_package.archive_format, + ), + }; AccountsPackage::new( accounts_package.slot, @@ -1257,10 +1964,19 @@ mod tests { ) .unwrap(); - let actual_data = deserialize_snapshot_data_file_capped( - &temp_dir.path().join("data-file"), + let snapshot_root_paths = SnapshotRootPaths { + full_snapshot_root_file_path: temp_dir.path().join("data-file"), + incremental_snapshot_root_file_path: None, + }; + + let actual_data = deserialize_snapshot_data_files_capped( + &snapshot_root_paths, expected_consumed_size, - |stream| Ok(deserialize_from::<_, u32>(stream)?), + |stream| { + Ok(deserialize_from::<_, u32>( + &mut stream.full_snapshot_stream, + )?) + }, ) .unwrap(); assert_eq!(actual_data, expected_data); @@ -1282,10 +1998,19 @@ mod tests { ) .unwrap(); - let result = deserialize_snapshot_data_file_capped( - &temp_dir.path().join("data-file"), + let snapshot_root_paths = SnapshotRootPaths { + full_snapshot_root_file_path: temp_dir.path().join("data-file"), + incremental_snapshot_root_file_path: None, + }; + + let result = deserialize_snapshot_data_files_capped( + &snapshot_root_paths, expected_consumed_size - 1, - |stream| Ok(deserialize_from::<_, u32>(stream)?), + |stream| { + Ok(deserialize_from::<_, u32>( + &mut stream.full_snapshot_stream, + )?) 
+ }, ); assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize")); } @@ -1307,125 +2032,363 @@ mod tests { ) .unwrap(); - let result = deserialize_snapshot_data_file_capped( - &temp_dir.path().join("data-file"), + let snapshot_root_paths = SnapshotRootPaths { + full_snapshot_root_file_path: temp_dir.path().join("data-file"), + incremental_snapshot_root_file_path: None, + }; + + let result = deserialize_snapshot_data_files_capped( + &snapshot_root_paths, expected_consumed_size * 2, - |stream| Ok(deserialize_from::<_, u32>(stream)?), + |stream| { + Ok(deserialize_from::<_, u32>( + &mut stream.full_snapshot_stream, + )?) + }, ); assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file")); } #[test] - fn test_parse_snapshot_archive_filename() { + fn test_parse_full_snapshot_archive_filename() { assert_eq!( - parse_snapshot_archive_filename(&format!("snapshot-42-{}.tar.bz2", Hash::default())), - Some((42, Hash::default(), ArchiveFormat::TarBzip2)) + parse_full_snapshot_archive_filename(&format!( + "snapshot-42-{}.tar.bz2", + Hash::default() + )) + .unwrap(), + (42, Hash::default(), ArchiveFormat::TarBzip2) ); assert_eq!( - parse_snapshot_archive_filename(&format!("snapshot-43-{}.tar.zst", Hash::default())), - Some((43, Hash::default(), ArchiveFormat::TarZstd)) + parse_full_snapshot_archive_filename(&format!( + "snapshot-43-{}.tar.zst", + Hash::default() + )) + .unwrap(), + (43, Hash::default(), ArchiveFormat::TarZstd) ); assert_eq!( - parse_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default())), - Some((44, Hash::default(), ArchiveFormat::Tar)) + parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default())) + .unwrap(), + (44, Hash::default(), ArchiveFormat::Tar) ); - assert!(parse_snapshot_archive_filename("invalid").is_none()); - 
assert!(parse_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_none()); + assert!(parse_full_snapshot_archive_filename("invalid").is_err()); + assert!( + parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err() + ); - assert!(parse_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_none()); - assert!(parse_snapshot_archive_filename(&format!( + assert!( + parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err() + ); + assert!(parse_full_snapshot_archive_filename(&format!( "snapshot-12345678-{}.bad!ext", Hash::new_unique() )) - .is_none()); - assert!(parse_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_none()); + .is_err()); + assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err()); - assert!(parse_snapshot_archive_filename(&format!( + assert!(parse_full_snapshot_archive_filename(&format!( "snapshot-bad!slot-{}.bad!ext", Hash::new_unique() )) - .is_none()); - assert!(parse_snapshot_archive_filename(&format!( + .is_err()); + assert!(parse_full_snapshot_archive_filename(&format!( "snapshot-12345678-{}.bad!ext", Hash::new_unique() )) - .is_none()); - assert!(parse_snapshot_archive_filename(&format!( + .is_err()); + assert!(parse_full_snapshot_archive_filename(&format!( "snapshot-bad!slot-{}.tar", Hash::new_unique() )) - .is_none()); + .is_err()); - assert!(parse_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_none()); - assert!(parse_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_none()); - assert!(parse_snapshot_archive_filename(&format!( + assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err()); + assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err()); + assert!(parse_full_snapshot_archive_filename(&format!( "snapshot-bad!slot-{}.tar", Hash::new_unique() )) - .is_none()); + .is_err()); } - /// A test helper function that creates snapshot 
archive files. Creates snapshot files in the - /// range (`min_snapshot_slot`, `max_snapshot_slot`]. Additionally, "bad" files are created - /// for snapshots to ensure the tests properly filter them out. + #[test] + fn test_parse_incremental_snapshot_archive_filename() { + solana_logger::setup(); + assert_eq!( + parse_incremental_snapshot_archive_filename(&format!( + "incremental-snapshot-42-123-{}.tar.bz2", + Hash::default() + )) + .unwrap(), + (42, 123, Hash::default(), ArchiveFormat::TarBzip2) + ); + assert_eq!( + parse_incremental_snapshot_archive_filename(&format!( + "incremental-snapshot-43-234-{}.tar.zst", + Hash::default() + )) + .unwrap(), + (43, 234, Hash::default(), ArchiveFormat::TarZstd) + ); + assert_eq!( + parse_incremental_snapshot_archive_filename(&format!( + "incremental-snapshot-44-345-{}.tar", + Hash::default() + )) + .unwrap(), + (44, 345, Hash::default(), ArchiveFormat::Tar) + ); + + assert!(parse_incremental_snapshot_archive_filename("invalid").is_err()); + assert!(parse_incremental_snapshot_archive_filename(&format!( + "snapshot-42-{}.tar", + Hash::new_unique() + )) + .is_err()); + assert!(parse_incremental_snapshot_archive_filename( + "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext" + ) + .is_err()); + + assert!(parse_incremental_snapshot_archive_filename(&format!( + "incremental-snapshot-bad!slot-56785678-{}.tar", + Hash::new_unique() + )) + .is_err()); + + assert!(parse_incremental_snapshot_archive_filename(&format!( + "incremental-snapshot-12345678-bad!slot-{}.tar", + Hash::new_unique() + )) + .is_err()); + + assert!(parse_incremental_snapshot_archive_filename( + "incremental-snapshot-12341234-56785678-bad!HASH.tar" + ) + .is_err()); + + assert!(parse_incremental_snapshot_archive_filename(&format!( + "incremental-snapshot-12341234-56785678-{}.bad!ext", + Hash::new_unique() + )) + .is_err()); + } + + #[test] + fn test_check_are_snapshots_compatible() { + solana_logger::setup(); + let slot1: Slot = 1234; + let slot2: Slot = 5678; 
+ let slot3: Slot = 999_999; + + assert!(check_are_snapshots_compatible( + &format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()), + &format!( + "/dir/incremental-snapshot-{}-{}-{}.tar", + slot1, + slot2, + Hash::new_unique() + ), + ) + .is_ok()); + + assert!(check_are_snapshots_compatible( + &format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()), + &format!( + "/dir/incremental-snapshot-{}-{}-{}.tar", + slot2, + slot3, + Hash::new_unique() + ), + ) + .is_err()); + } + + /// A test helper function that creates bank snapshot files + fn common_create_bank_snapshot_files(snapshots_dir: &Path, min_slot: Slot, max_slot: Slot) { + for slot in min_slot..max_slot { + let snapshot_dir = get_bank_snapshots_dir(snapshots_dir, slot); + fs::create_dir_all(&snapshot_dir).unwrap(); + + let snapshot_filename = get_snapshot_file_name(slot); + let snapshot_path = snapshot_dir.join(snapshot_filename); + File::create(snapshot_path).unwrap(); + } + } + + #[test] + fn test_get_bank_snapshot_infos() { + solana_logger::setup(); + let temp_snapshots_dir = tempfile::TempDir::new().unwrap(); + let min_slot = 10; + let max_slot = 20; + common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot); + + let bank_snapshot_infos = get_bank_snapshots(temp_snapshots_dir.path()); + assert_eq!(bank_snapshot_infos.len() as Slot, max_slot - min_slot); + } + + #[test] + fn test_get_highest_bank_snapshot_info() { + solana_logger::setup(); + let temp_snapshots_dir = tempfile::TempDir::new().unwrap(); + let min_slot = 99; + let max_slot = 123; + common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot); + + let highest_bank_snapshot_info = get_highest_bank_snapshot_info(temp_snapshots_dir.path()); + assert!(highest_bank_snapshot_info.is_some()); + assert_eq!(highest_bank_snapshot_info.unwrap().slot, max_slot - 1); + } + + /// A test helper function that creates full and incremental snapshot archive files.
Creates + /// full snapshot files in the range (`min_full_snapshot_slot`, `max_full_snapshot_slot`], and + /// incremental snapshot files in the range (`min_incremental_snapshot_slot`, + /// `max_incremental_snapshot_slot`]. Additionally, "bad" files are created for both full and + /// incremental snapshots to ensure the tests properly filter them out. fn common_create_snapshot_archive_files( - snapshot_dir: &Path, - min_snapshot_slot: Slot, - max_snapshot_slot: Slot, + snapshot_archives_dir: &Path, + min_full_snapshot_slot: Slot, + max_full_snapshot_slot: Slot, + min_incremental_snapshot_slot: Slot, + max_incremental_snapshot_slot: Slot, ) { - for snapshot_slot in min_snapshot_slot..max_snapshot_slot { - let snapshot_filename = format!("snapshot-{}-{}.tar", snapshot_slot, Hash::default()); - let snapshot_filepath = snapshot_dir.join(snapshot_filename); + for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot { + for incremental_snapshot_slot in + min_incremental_snapshot_slot..max_incremental_snapshot_slot + { + let snapshot_filename = format!( + "incremental-snapshot-{}-{}-{}.tar", + full_snapshot_slot, + incremental_snapshot_slot, + Hash::default() + ); + let snapshot_filepath = snapshot_archives_dir.join(snapshot_filename); + File::create(snapshot_filepath).unwrap(); + } + + let snapshot_filename = + format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default()); + let snapshot_filepath = snapshot_archives_dir.join(snapshot_filename); File::create(snapshot_filepath).unwrap(); + + // Add in an incremental snapshot with a bad filename and high slot to ensure filename are filtered and sorted correctly + let bad_filename = format!( + "incremental-snapshot-{}-{}-bad!hash.tar", + full_snapshot_slot, + max_incremental_snapshot_slot + 1, + ); + let bad_filepath = snapshot_archives_dir.join(bad_filename); + File::create(bad_filepath).unwrap(); } // Add in a snapshot with a bad filename and high slot to ensure filename are filtered and // sorted 
correctly - let bad_filename = format!("snapshot-{}-bad!hash.tar", max_snapshot_slot + 1); - let bad_filepath = snapshot_dir.join(bad_filename); + let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1); + let bad_filepath = snapshot_archives_dir.join(bad_filename); File::create(bad_filepath).unwrap(); } #[test] - fn test_get_snapshot_archives() { + fn test_get_full_snapshot_archives() { solana_logger::setup(); let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap(); let min_slot = 123; let max_slot = 456; - common_create_snapshot_archive_files(temp_snapshot_archives_dir.path(), min_slot, max_slot); + common_create_snapshot_archive_files( + temp_snapshot_archives_dir.path(), + min_slot, + max_slot, + 0, + 0, + ); - let snapshot_archives = get_snapshot_archives(temp_snapshot_archives_dir); + let snapshot_archives = get_full_snapshot_archives(temp_snapshot_archives_dir); assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot); } #[test] - fn test_get_sorted_snapshot_archives() { + fn test_get_incremental_snapshot_archives() { solana_logger::setup(); let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap(); - let min_slot = 12; - let max_slot = 45; - common_create_snapshot_archive_files(temp_snapshot_archives_dir.path(), min_slot, max_slot); + let min_full_snapshot_slot = 12; + let max_full_snapshot_slot = 23; + let min_incremental_snapshot_slot = 34; + let max_incremental_snapshot_slot = 45; + common_create_snapshot_archive_files( + temp_snapshot_archives_dir.path(), + min_full_snapshot_slot, + max_full_snapshot_slot, + min_incremental_snapshot_slot, + max_incremental_snapshot_slot, + ); - let sorted_snapshot_archives = get_sorted_snapshot_archives(temp_snapshot_archives_dir); - assert_eq!(sorted_snapshot_archives.len() as Slot, max_slot - min_slot); - assert_eq!(sorted_snapshot_archives[0].slot, max_slot - 1); + let incremental_snapshot_archives = + 
get_incremental_snapshot_archives(temp_snapshot_archives_dir); + assert_eq!( + incremental_snapshot_archives.len() as Slot, + (max_full_snapshot_slot - min_full_snapshot_slot) + * (max_incremental_snapshot_slot - min_incremental_snapshot_slot) + ); } #[test] - fn test_get_highest_snapshot_archive_slot() { + fn test_get_highest_full_snapshot_archive_slot() { solana_logger::setup(); let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap(); let min_slot = 123; let max_slot = 456; - common_create_snapshot_archive_files(temp_snapshot_archives_dir.path(), min_slot, max_slot); + common_create_snapshot_archive_files( + temp_snapshot_archives_dir.path(), + min_slot, + max_slot, + 0, + 0, + ); assert_eq!( - get_highest_snapshot_archive_slot(temp_snapshot_archives_dir.path()), + get_highest_full_snapshot_archive_slot(temp_snapshot_archives_dir.path()), Some(max_slot - 1) ); } + #[test] + fn test_get_highest_incremental_snapshot_slot() { + solana_logger::setup(); + let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap(); + let min_full_snapshot_slot = 12; + let max_full_snapshot_slot = 23; + let min_incremental_snapshot_slot = 34; + let max_incremental_snapshot_slot = 45; + common_create_snapshot_archive_files( + temp_snapshot_archives_dir.path(), + min_full_snapshot_slot, + max_full_snapshot_slot, + min_incremental_snapshot_slot, + max_incremental_snapshot_slot, + ); + + for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot { + assert_eq!( + get_highest_incremental_snapshot_archive_slot( + temp_snapshot_archives_dir.path(), + full_snapshot_slot + ), + Some(max_incremental_snapshot_slot - 1) + ); + } + + assert_eq!( + get_highest_incremental_snapshot_archive_slot( + temp_snapshot_archives_dir.path(), + max_full_snapshot_slot + ), + None + ); + } + fn common_test_purge_old_snapshot_archives( snapshot_names: &[&String], maximum_snapshots_to_retain: usize, @@ -1459,7 +2422,7 @@ mod tests { } #[test] - fn test_purge_old_snapshot_archives() { 
+ fn test_purge_old_full_snapshot_archives() { // Create 3 snapshots, retaining 1, // expecting the oldest 1 and the newest 1 are retained let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default()); @@ -1477,10 +2440,65 @@ mod tests { common_test_purge_old_snapshot_archives(&snapshot_names, 2, &expected_snapshots); } - /// Test roundtrip of bank to snapshot, then back again. This test creates the simplest bank - /// possible, so the contents of the snapshot archive will be quite minimal. #[test] - fn test_roundtrip_bank_to_snapshot_to_bank_simple() { + fn test_purge_old_incremental_snapshot_archives() { + let snapshot_archives_dir = tempfile::TempDir::new().unwrap(); + + for snapshot_filename in [ + format!("snapshot-100-{}.tar", Hash::default()), + format!("snapshot-200-{}.tar", Hash::default()), + format!("incremental-snapshot-100-120-{}.tar", Hash::default()), + format!("incremental-snapshot-100-140-{}.tar", Hash::default()), + format!("incremental-snapshot-100-160-{}.tar", Hash::default()), + format!("incremental-snapshot-100-180-{}.tar", Hash::default()), + format!("incremental-snapshot-200-220-{}.tar", Hash::default()), + format!("incremental-snapshot-200-240-{}.tar", Hash::default()), + format!("incremental-snapshot-200-260-{}.tar", Hash::default()), + format!("incremental-snapshot-200-280-{}.tar", Hash::default()), + ] { + let snapshot_path = snapshot_archives_dir.path().join(&snapshot_filename); + File::create(snapshot_path).unwrap(); + } + + purge_old_snapshot_archives(snapshot_archives_dir.path(), std::usize::MAX); + + let remaining_incremental_snapshot_archives = + get_incremental_snapshot_archives(snapshot_archives_dir.path()); + assert_eq!(remaining_incremental_snapshot_archives.len(), 4); + for archive in &remaining_incremental_snapshot_archives { + assert_eq!(*archive.base_slot(), 200); + } + } + + #[test] + fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() { + let snapshot_archives_dir = 
tempfile::TempDir::new().unwrap(); + + for snapshot_filename in [ + format!("incremental-snapshot-100-120-{}.tar", Hash::default()), + format!("incremental-snapshot-100-140-{}.tar", Hash::default()), + format!("incremental-snapshot-100-160-{}.tar", Hash::default()), + format!("incremental-snapshot-100-180-{}.tar", Hash::default()), + format!("incremental-snapshot-200-220-{}.tar", Hash::default()), + format!("incremental-snapshot-200-240-{}.tar", Hash::default()), + format!("incremental-snapshot-200-260-{}.tar", Hash::default()), + format!("incremental-snapshot-200-280-{}.tar", Hash::default()), + ] { + let snapshot_path = snapshot_archives_dir.path().join(&snapshot_filename); + File::create(snapshot_path).unwrap(); + } + + purge_old_snapshot_archives(snapshot_archives_dir.path(), std::usize::MAX); + + let remaining_incremental_snapshot_archives = + get_incremental_snapshot_archives(snapshot_archives_dir.path()); + assert!(remaining_incremental_snapshot_archives.is_empty()); + } + + /// Test roundtrip of bank to a full snapshot, then back again. This test creates the simplest + /// bank possible, so the contents of the snapshot archive will be quite minimal. 
+ #[test] + fn test_roundtrip_bank_to_and_from_full_snapshot_simple() { solana_logger::setup(); let genesis_config = GenesisConfig::default(); let original_bank = Bank::new(&genesis_config); @@ -1490,26 +2508,27 @@ mod tests { } let accounts_dir = tempfile::TempDir::new().unwrap(); - let snapshot_dir = tempfile::TempDir::new().unwrap(); - let snapshot_package_output_dir = tempfile::TempDir::new().unwrap(); + let snapshots_dir = tempfile::TempDir::new().unwrap(); + let snapshot_archives_dir = tempfile::TempDir::new().unwrap(); let snapshot_archive_format = ArchiveFormat::Tar; - let snapshot_archive_path = bank_to_snapshot_archive( - snapshot_dir.path(), + let snapshot_archive_path = bank_to_full_snapshot_archive( + snapshots_dir.path(), &original_bank, None, - snapshot_package_output_dir.path(), + snapshot_archives_dir.path(), snapshot_archive_format, None, 1, ) .unwrap(); - let (roundtrip_bank, _) = bank_from_snapshot_archive( + let (roundtrip_bank, _) = bank_from_snapshot_archives( &[PathBuf::from(accounts_dir.path())], &[], - snapshot_dir.path(), + snapshots_dir.path(), &snapshot_archive_path, + None, snapshot_archive_format, &genesis_config, None, @@ -1526,11 +2545,11 @@ mod tests { assert_eq!(original_bank, roundtrip_bank); } - /// Test roundtrip of bank to snapshot, then back again. This test is more involved than the - /// simple version above; creating multiple banks over multiple slots and doing multiple - /// transfers. So this snapshot should contain more data. + /// Test roundtrip of bank to a full snapshot, then back again. This test is more involved + /// than the simple version above; creating multiple banks over multiple slots and doing + /// multiple transfers. So this full snapshot should contain more data. 
#[test] - fn test_roundtrip_bank_to_snapshot_to_bank_complex() { + fn test_roundtrip_bank_to_and_from_snapshot_complex() { solana_logger::setup(); let collector = Pubkey::new_unique(); let key1 = Keypair::new(); @@ -1579,26 +2598,136 @@ mod tests { } let accounts_dir = tempfile::TempDir::new().unwrap(); - let snapshot_dir = tempfile::TempDir::new().unwrap(); - let snapshot_package_output_dir = tempfile::TempDir::new().unwrap(); + let snapshots_dir = tempfile::TempDir::new().unwrap(); + let snapshot_archives_dir = tempfile::TempDir::new().unwrap(); let snapshot_archive_format = ArchiveFormat::Tar; - let full_snapshot_archive_path = bank_to_snapshot_archive( - snapshot_dir.path(), + let full_snapshot_archive_path = bank_to_full_snapshot_archive( + snapshots_dir.path(), &bank4, None, - snapshot_package_output_dir.path(), + snapshot_archives_dir.path(), snapshot_archive_format, None, std::usize::MAX, ) .unwrap(); - let (roundtrip_bank, _) = bank_from_snapshot_archive( + let (roundtrip_bank, _) = bank_from_snapshot_archives( &[PathBuf::from(accounts_dir.path())], &[], - snapshot_dir.path(), + snapshots_dir.path(), &full_snapshot_archive_path, + None, + snapshot_archive_format, + &genesis_config, + None, + None, + AccountSecondaryIndexes::default(), + false, + None, + AccountShrinkThreshold::default(), + false, + false, + ) + .unwrap(); + + assert_eq!(*bank4, roundtrip_bank); + } + + /// Test roundtrip of bank to snapshots, then back again, with incremental snapshots. In this + /// version, build up a few slots and take a full snapshot. Continue on a few more slots and + /// take an incremental snapshot. Rebuild the bank from both the incremental snapshot and full + /// snapshot. + /// + /// For the full snapshot, touch all the accounts, but only one for the incremental snapshot. + /// This is intended to mimic the real behavior of transactions, where only a small number of + /// accounts are modified often, which are captured by the incremental snapshot. 
The majority + /// of the accounts are not modified often, and are captured by the full snapshot. + #[test] + fn test_roundtrip_bank_to_and_from_incremental_snapshot() { + solana_logger::setup(); + let collector = Pubkey::new_unique(); + let key1 = Keypair::new(); + let key2 = Keypair::new(); + let key3 = Keypair::new(); + let key4 = Keypair::new(); + let key5 = Keypair::new(); + + let (genesis_config, mint_keypair) = create_genesis_config(1_000_000); + let bank0 = Arc::new(Bank::new(&genesis_config)); + bank0.transfer(1, &mint_keypair, &key1.pubkey()).unwrap(); + bank0.transfer(2, &mint_keypair, &key2.pubkey()).unwrap(); + bank0.transfer(3, &mint_keypair, &key3.pubkey()).unwrap(); + while !bank0.is_complete() { + bank0.register_tick(&Hash::new_unique()); + } + + let slot = 1; + let bank1 = Arc::new(Bank::new_from_parent(&bank0, &collector, slot)); + bank1.transfer(3, &mint_keypair, &key3.pubkey()).unwrap(); + bank1.transfer(4, &mint_keypair, &key4.pubkey()).unwrap(); + bank1.transfer(5, &mint_keypair, &key5.pubkey()).unwrap(); + while !bank1.is_complete() { + bank1.register_tick(&Hash::new_unique()); + } + + let accounts_dir = tempfile::TempDir::new().unwrap(); + let snapshots_dir = tempfile::TempDir::new().unwrap(); + let snapshot_archives_dir = tempfile::TempDir::new().unwrap(); + let snapshot_archive_format = ArchiveFormat::Tar; + + let full_snapshot_slot = slot; + let full_snapshot_archive_path = bank_to_full_snapshot_archive( + snapshots_dir.path(), + &bank1, + None, + snapshot_archives_dir.path(), + snapshot_archive_format, + None, + std::usize::MAX, + ) + .unwrap(); + + let slot = slot + 1; + let bank2 = Arc::new(Bank::new_from_parent(&bank1, &collector, slot)); + bank2.transfer(1, &mint_keypair, &key1.pubkey()).unwrap(); + while !bank2.is_complete() { + bank2.register_tick(&Hash::new_unique()); + } + + let slot = slot + 1; + let bank3 = Arc::new(Bank::new_from_parent(&bank2, &collector, slot)); + bank3.transfer(1, &mint_keypair, &key1.pubkey()).unwrap(); + 
while !bank3.is_complete() { + bank3.register_tick(&Hash::new_unique()); + } + + let slot = slot + 1; + let bank4 = Arc::new(Bank::new_from_parent(&bank3, &collector, slot)); + bank4.transfer(1, &mint_keypair, &key1.pubkey()).unwrap(); + while !bank4.is_complete() { + bank4.register_tick(&Hash::new_unique()); + } + + let incremental_snapshot_archive_path = bank_to_incremental_snapshot_archive( + snapshots_dir.path(), + &bank4, + full_snapshot_slot, + None, + snapshot_archives_dir.path(), + snapshot_archive_format, + None, + std::usize::MAX, + ) + .unwrap(); + + let (roundtrip_bank, _) = bank_from_snapshot_archives( + &[PathBuf::from(accounts_dir.path())], + &[], + snapshots_dir.path(), + &full_snapshot_archive_path, + Some(&incremental_snapshot_archive_path), snapshot_archive_format, &genesis_config, None, diff --git a/validator/src/main.rs b/validator/src/main.rs index 9e30e7bf91..ca439fe352 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -47,7 +47,9 @@ use { }, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, snapshot_config::SnapshotConfig, - snapshot_utils::{self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_SNAPSHOTS_TO_RETAIN}, + snapshot_utils::{ + self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, + }, }, solana_sdk::{ clock::{Slot, DEFAULT_S_PER_SLOT}, @@ -475,8 +477,10 @@ fn get_rpc_node( blacklist_timeout = Instant::now(); let mut highest_snapshot_hash: Option<(Slot, Hash)> = - snapshot_utils::get_highest_snapshot_archive_info(snapshot_output_dir).map( - |snapshot_archive_info| (snapshot_archive_info.slot, snapshot_archive_info.hash), + snapshot_utils::get_highest_full_snapshot_archive_info(snapshot_output_dir).map( + |snapshot_archive_info| { + (*snapshot_archive_info.slot(), *snapshot_archive_info.hash()) + }, ); let eligible_rpc_peers = if snapshot_not_required { rpc_peers @@ -858,7 +862,7 @@ fn rpc_bootstrap( let mut use_local_snapshot = false; if let Some(highest_local_snapshot_slot) = - 
snapshot_utils::get_highest_snapshot_archive_slot(snapshot_output_dir) + snapshot_utils::get_highest_full_snapshot_archive_slot(snapshot_output_dir) { if highest_local_snapshot_slot > snapshot_hash.0.saturating_sub(maximum_local_snapshot_age) @@ -900,7 +904,7 @@ fn rpc_bootstrap( { snapshot_config.maximum_snapshots_to_retain } else { - DEFAULT_MAX_SNAPSHOTS_TO_RETAIN + DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN }; let ret = download_snapshot( &rpc_contact_info.rpc, @@ -1041,7 +1045,7 @@ pub fn main() { .send_transaction_leader_forward_count .to_string(); let default_rpc_threads = num_cpus::get().to_string(); - let default_max_snapshot_to_retain = &DEFAULT_MAX_SNAPSHOTS_TO_RETAIN.to_string(); + let default_max_snapshot_to_retain = &DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN.to_string(); let default_min_snapshot_download_speed = &DEFAULT_MIN_SNAPSHOT_DOWNLOAD_SPEED.to_string(); let default_max_snapshot_download_abort = &MAX_SNAPSHOT_DOWNLOAD_ABORT.to_string(); let default_accounts_shrink_optimize_total_space =