diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index fa4dc7131..6ac305a32 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -99,6 +99,7 @@ impl AccountsHashVerifier { let accounts_package = solana_runtime::snapshot_utils::process_accounts_package_pre( accounts_package, thread_pool, + None, ); Self::process_accounts_package( accounts_package, diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index af522d88d..d85fd44c9 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -156,7 +156,7 @@ mod tests { } // Create a packageable snapshot - let output_tar_path = snapshot_utils::build_snapshot_archive_path( + let output_tar_path = snapshot_utils::build_full_snapshot_archive_path( snapshot_package_output_path, 42, &Hash::default(), @@ -177,7 +177,7 @@ mod tests { // Make tarball from packageable snapshot snapshot_utils::archive_snapshot_package( &snapshot_package, - snapshot_utils::DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + snapshot_utils::DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, ) .unwrap(); diff --git a/core/src/test_validator.rs b/core/src/test_validator.rs index 14a97cc0d..9886d07ea 100644 --- a/core/src/test_validator.rs +++ b/core/src/test_validator.rs @@ -13,7 +13,9 @@ use { genesis_utils::create_genesis_config_with_leader_ex, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, snapshot_config::SnapshotConfig, - snapshot_utils::{ArchiveFormat, SnapshotVersion, DEFAULT_MAX_SNAPSHOTS_TO_RETAIN}, + snapshot_utils::{ + ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, + }, }, solana_sdk::{ account::{Account, AccountSharedData}, @@ -492,7 +494,7 @@ impl TestValidator { snapshot_package_output_path: ledger_path.to_path_buf(), archive_format: ArchiveFormat::Tar, snapshot_version: SnapshotVersion::default(), - maximum_snapshots_to_retain: DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + 
maximum_snapshots_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, }), enforce_ulimit_nofile: false, warp_slot: config.warp_slot, diff --git a/core/src/validator.rs b/core/src/validator.rs index 0a9145d48..2628c3b1d 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1187,7 +1187,7 @@ fn new_banks_from_ledger( ); leader_schedule_cache.set_root(&bank_forks.root_bank()); - let archive_file = solana_runtime::snapshot_utils::bank_to_snapshot_archive( + let archive_file = solana_runtime::snapshot_utils::bank_to_full_snapshot_archive( ledger_path, &bank_forks.root_bank(), None, diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 9526f6d2a..2e04ec80a 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -49,7 +49,9 @@ mod tests { bank_forks::BankForks, genesis_utils::{create_genesis_config, GenesisConfigInfo}, snapshot_config::SnapshotConfig, - snapshot_utils::{self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_SNAPSHOTS_TO_RETAIN}, + snapshot_utils::{ + self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, + }, status_cache::MAX_CACHE_ENTRIES, }; use solana_sdk::{ @@ -119,7 +121,7 @@ mod tests { snapshot_path: PathBuf::from(snapshot_dir.path()), archive_format: ArchiveFormat::TarBzip2, snapshot_version, - maximum_snapshots_to_retain: DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + maximum_snapshots_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, }; bank_forks.set_snapshot_config(Some(snapshot_config.clone())); SnapshotTestConfig { @@ -148,7 +150,7 @@ mod tests { let old_last_bank = old_bank_forks.get(old_last_slot).unwrap(); let check_hash_calculation = false; - let (deserialized_bank, _timing) = snapshot_utils::bank_from_snapshot_archive( + let (deserialized_bank, _timing) = snapshot_utils::bank_from_snapshot_archives( account_paths, &[], &old_bank_forks @@ -156,12 +158,13 @@ mod tests { .as_ref() .unwrap() .snapshot_path, - snapshot_utils::build_snapshot_archive_path( + 
snapshot_utils::build_full_snapshot_archive_path( snapshot_package_output_path.to_path_buf(), old_last_bank.slot(), &old_last_bank.get_accounts_hash(), ArchiveFormat::TarBzip2, ), + None, ArchiveFormat::TarBzip2, old_genesis_config, None, @@ -181,10 +184,10 @@ mod tests { .clone(); assert_eq!(*bank, deserialized_bank); - let slot_snapshot_paths = snapshot_utils::get_snapshot_paths(&snapshot_path); + let bank_snapshot_infos = snapshot_utils::get_bank_snapshots(&snapshot_path); - for p in slot_snapshot_paths { - snapshot_utils::remove_snapshot(p.slot, &snapshot_path).unwrap(); + for p in bank_snapshot_infos { + snapshot_utils::remove_bank_snapshot(p.slot, &snapshot_path).unwrap(); } } @@ -235,12 +238,11 @@ mod tests { let last_bank = bank_forks.get(last_slot).unwrap(); let snapshot_config = &snapshot_test_config.snapshot_config; let snapshot_path = &snapshot_config.snapshot_path; - let last_slot_snapshot_path = snapshot_utils::get_snapshot_paths(snapshot_path) - .pop() + let last_bank_snapshot_info = snapshot_utils::get_highest_bank_snapshot_info(snapshot_path) .expect("no snapshots found in path"); - let snapshot_package = snapshot_utils::package_snapshot( + let snapshot_package = snapshot_utils::package_full_snapshot( last_bank, - &last_slot_snapshot_path, + &last_bank_snapshot_info, snapshot_path, last_bank.src.slot_deltas(&last_bank.src.roots()), &snapshot_config.snapshot_package_output_path, @@ -253,10 +255,11 @@ mod tests { let snapshot_package = snapshot_utils::process_accounts_package_pre( snapshot_package, Some(last_bank.get_thread_pool()), + None, ); snapshot_utils::archive_snapshot_package( &snapshot_package, - DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, ) .unwrap(); @@ -325,7 +328,8 @@ mod tests { // Take snapshot of zeroth bank let bank0 = bank_forks.get(0).unwrap(); let storages = bank0.get_snapshot_storages(); - snapshot_utils::add_snapshot(snapshot_path, bank0, &storages, snapshot_version).unwrap(); + 
snapshot_utils::add_bank_snapshot(snapshot_path, bank0, &storages, snapshot_version) + .unwrap(); // Set up snapshotting channels let (sender, receiver) = channel(); @@ -343,7 +347,7 @@ mod tests { let saved_slot = 4; let mut saved_archive_path = None; - for forks in 0..snapshot_utils::MAX_SNAPSHOTS + 2 { + for forks in 0..snapshot_utils::MAX_BANK_SNAPSHOTS + 2 { let bank = Bank::new_from_parent( &bank_forks[forks as u64], &Pubkey::default(), @@ -420,7 +424,7 @@ mod tests { let options = CopyOptions::new(); fs_extra::dir::copy(&last_snapshot_path, &saved_snapshots_dir, &options).unwrap(); - saved_archive_path = Some(snapshot_utils::build_snapshot_archive_path( + saved_archive_path = Some(snapshot_utils::build_full_snapshot_archive_path( snapshot_package_output_path.to_path_buf(), slot, &accounts_hash, @@ -431,11 +435,14 @@ mod tests { // Purge all the outdated snapshots, including the ones needed to generate the package // currently sitting in the channel - snapshot_utils::purge_old_snapshots(snapshot_path); - assert!(snapshot_utils::get_snapshot_paths(&snapshots_dir) + snapshot_utils::purge_old_bank_snapshots(snapshot_path); + + let mut bank_snapshot_infos = snapshot_utils::get_bank_snapshots(&snapshots_dir); + bank_snapshot_infos.sort_unstable(); + assert!(bank_snapshot_infos .into_iter() .map(|path| path.slot) - .eq(3..=snapshot_utils::MAX_SNAPSHOTS as u64 + 2)); + .eq(3..=snapshot_utils::MAX_BANK_SNAPSHOTS as u64 + 2)); // Create a SnapshotPackagerService to create tarballs from all the pending // SnapshotPackage's on the channel. 
By the time this service starts, we have already @@ -453,7 +460,7 @@ mod tests { None, &exit, &cluster_info, - DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, ); let thread_pool = accounts_db::make_min_priority_thread_pool(); @@ -471,6 +478,7 @@ mod tests { solana_runtime::snapshot_utils::process_accounts_package_pre( snapshot_package, Some(&thread_pool), + None, ); *pending_snapshot_package.lock().unwrap() = Some(snapshot_package); } diff --git a/download-utils/src/lib.rs b/download-utils/src/lib.rs index 565f7d733..ee489c88a 100644 --- a/download-utils/src/lib.rs +++ b/download-utils/src/lib.rs @@ -246,13 +246,13 @@ pub fn download_genesis_if_missing( pub fn download_snapshot<'a, 'b>( rpc_addr: &SocketAddr, - snapshot_output_dir: &Path, + snapshot_archives_dir: &Path, desired_snapshot_hash: (Slot, Hash), use_progress_bar: bool, maximum_snapshots_to_retain: usize, progress_notify_callback: &'a mut DownloadProgressCallbackOption<'b>, ) -> Result<(), String> { - snapshot_utils::purge_old_snapshot_archives(snapshot_output_dir, maximum_snapshots_to_retain); + snapshot_utils::purge_old_snapshot_archives(snapshot_archives_dir, maximum_snapshots_to_retain); for compression in &[ ArchiveFormat::TarZstd, @@ -260,8 +260,8 @@ pub fn download_snapshot<'a, 'b>( ArchiveFormat::TarBzip2, ArchiveFormat::Tar, // `solana-test-validator` creates uncompressed snapshots ] { - let desired_snapshot_package = snapshot_utils::build_snapshot_archive_path( - snapshot_output_dir.to_path_buf(), + let desired_snapshot_package = snapshot_utils::build_full_snapshot_archive_path( + snapshot_archives_dir.to_path_buf(), desired_snapshot_hash.0, &desired_snapshot_hash.1, *compression, diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index f762e0c4c..d628a65c0 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -30,7 +30,9 @@ use solana_runtime::{ bank_forks::BankForks, hardened_unpack::{open_genesis_config, 
MAX_GENESIS_ARCHIVE_UNPACKED_SIZE}, snapshot_config::SnapshotConfig, - snapshot_utils::{self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_SNAPSHOTS_TO_RETAIN}, + snapshot_utils::{ + self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, + }, }; use solana_sdk::{ account::{AccountSharedData, ReadableAccount, WritableAccount}, @@ -697,7 +699,7 @@ fn load_bank_forks( snapshot_path, archive_format: ArchiveFormat::TarBzip2, snapshot_version: SnapshotVersion::default(), - maximum_snapshots_to_retain: DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + maximum_snapshots_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, }) }; let account_paths = if let Some(account_paths) = arg_matches.value_of("account_paths") { @@ -904,7 +906,7 @@ fn main() { .default_value(SnapshotVersion::default().into()) .help("Output snapshot version"); - let default_max_snapshot_to_retain = &DEFAULT_MAX_SNAPSHOTS_TO_RETAIN.to_string(); + let default_max_snapshot_to_retain = &DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN.to_string(); let maximum_snapshots_to_retain_arg = Arg::with_name("maximum_snapshots_to_retain") .long("maximum-snapshots-to-retain") .value_name("NUMBER") @@ -2196,7 +2198,7 @@ fn main() { bank.slot(), ); - let archive_file = snapshot_utils::bank_to_snapshot_archive( + let archive_file = snapshot_utils::bank_to_full_snapshot_archive( ledger_path, &bank, Some(snapshot_version), diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index 3620c4ef4..7be4056d8 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -11,7 +11,7 @@ use solana_entry::entry::VerifyRecyclers; use solana_runtime::{ bank_forks::BankForks, snapshot_config::SnapshotConfig, - snapshot_utils::{self, SnapshotArchiveInfo}, + snapshot_utils::{self, FullSnapshotArchiveInfo}, }; use solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash}; use std::{fs, path::PathBuf, process, result}; @@ -53,9 +53,11 @@ pub fn load( 
fs::create_dir_all(&snapshot_config.snapshot_path) .expect("Couldn't create snapshot directory"); - if let Some(snapshot_archive_info) = snapshot_utils::get_highest_snapshot_archive_info( - &snapshot_config.snapshot_package_output_path, - ) { + if let Some(full_snapshot_archive_info) = + snapshot_utils::get_highest_full_snapshot_archive_info( + &snapshot_config.snapshot_package_output_path, + ) + { return load_from_snapshot( genesis_config, blockstore, @@ -65,7 +67,7 @@ pub fn load( process_options, transaction_status_sender, cache_block_meta_sender, - &snapshot_archive_info, + &full_snapshot_archive_info, ); } else { info!("No snapshot package available; will load from genesis"); @@ -113,11 +115,11 @@ fn load_from_snapshot( process_options: ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, - snapshot_archive_info: &SnapshotArchiveInfo, + full_snapshot_archive_info: &FullSnapshotArchiveInfo, ) -> LoadResult { info!( "Loading snapshot package: {:?}", - &snapshot_archive_info.path + full_snapshot_archive_info.path() ); // Fail hard here if snapshot fails to load, don't silently continue @@ -126,12 +128,13 @@ fn load_from_snapshot( process::exit(1); } - let (deserialized_bank, timings) = snapshot_utils::bank_from_snapshot_archive( + let (deserialized_bank, timings) = snapshot_utils::bank_from_snapshot_archives( &account_paths, &process_options.frozen_accounts, &snapshot_config.snapshot_path, - &snapshot_archive_info.path, - snapshot_archive_info.archive_format, + full_snapshot_archive_info.path(), + None, + *full_snapshot_archive_info.archive_format(), genesis_config, process_options.debug_keys.clone(), Some(&crate::builtins::get(process_options.bpf_jit)), @@ -152,10 +155,18 @@ fn load_from_snapshot( deserialized_bank.get_accounts_hash(), ); - if deserialized_bank_slot_and_hash != (snapshot_archive_info.slot, snapshot_archive_info.hash) { + if deserialized_bank_slot_and_hash + != ( + 
*full_snapshot_archive_info.slot(), + *full_snapshot_archive_info.hash(), + ) + { error!( "Snapshot has mismatch:\narchive: {:?}\ndeserialized: {:?}", - (snapshot_archive_info.slot, snapshot_archive_info.hash), + ( + full_snapshot_archive_info.slot(), + full_snapshot_archive_info.hash() + ), deserialized_bank_slot_and_hash ); process::exit(1); diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index 6b18c3907..383875179 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -567,7 +567,16 @@ fn do_process_blockstore_from_root( ("slot", bank_forks.root(), i64), ("forks", initial_forks.len(), i64), ("calculate_capitalization_us", time_cap.as_us(), i64), - ("untar_us", timings.untar_us, i64), + ( + "full_snapshot_untar_us", + timings.full_snapshot_untar_us, + i64 + ), + ( + "incremental_snapshot_untar_us", + timings.incremental_snapshot_untar_us, + i64 + ), ( "rebuild_bank_from_snapshots_us", timings.rebuild_bank_from_snapshots_us, diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index 45703741f..fd934a038 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -1642,25 +1642,15 @@ fn test_snapshot_download() { trace!("Waiting for snapshot"); let (archive_filename, archive_snapshot_hash) = wait_for_next_snapshot(&cluster, snapshot_package_output_path); - trace!("found: {:?}", archive_filename); - let validator_archive_path = snapshot_utils::build_snapshot_archive_path( - validator_snapshot_test_config - .snapshot_output_path - .path() - .to_path_buf(), - archive_snapshot_hash.0, - &archive_snapshot_hash.1, - ArchiveFormat::TarBzip2, - ); // Download the snapshot, then boot a validator from it. 
download_snapshot( &cluster.entry_point_info.rpc, - &validator_archive_path, + validator_snapshot_test_config.snapshot_archives_dir.path(), archive_snapshot_hash, false, - snapshot_utils::DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + snapshot_utils::DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, &mut None, ) .unwrap(); @@ -1720,9 +1710,9 @@ fn test_snapshot_restart_tower() { wait_for_next_snapshot(&cluster, snapshot_package_output_path); // Copy archive to validator's snapshot output directory - let validator_archive_path = snapshot_utils::build_snapshot_archive_path( + let validator_archive_path = snapshot_utils::build_full_snapshot_archive_path( validator_snapshot_test_config - .snapshot_output_path + .snapshot_archives_dir .path() .to_path_buf(), archive_snapshot_hash.0, @@ -1783,7 +1773,7 @@ fn test_snapshots_blockstore_floor() { let archive_info = loop { let archive = - snapshot_utils::get_highest_snapshot_archive_info(&snapshot_package_output_path); + snapshot_utils::get_highest_full_snapshot_archive_info(&snapshot_package_output_path); if archive.is_some() { trace!("snapshot exists"); break archive.unwrap(); @@ -1792,17 +1782,17 @@ fn test_snapshots_blockstore_floor() { }; // Copy archive to validator's snapshot output directory - let validator_archive_path = snapshot_utils::build_snapshot_archive_path( + let validator_archive_path = snapshot_utils::build_full_snapshot_archive_path( validator_snapshot_test_config - .snapshot_output_path + .snapshot_archives_dir .path() .to_path_buf(), - archive_info.slot, - &archive_info.hash, + *archive_info.slot(), + archive_info.hash(), ArchiveFormat::TarBzip2, ); - fs::hard_link(archive_info.path, &validator_archive_path).unwrap(); - let slot_floor = archive_info.slot; + fs::hard_link(archive_info.path(), &validator_archive_path).unwrap(); + let slot_floor = *archive_info.slot(); // Start up a new node from a snapshot let validator_stake = 5; @@ -3284,19 +3274,25 @@ fn wait_for_next_snapshot( last_slot ); loop { - if let 
Some(snapshot_archive_info) = - snapshot_utils::get_highest_snapshot_archive_info(snapshot_package_output_path) + if let Some(full_snapshot_archive_info) = + snapshot_utils::get_highest_full_snapshot_archive_info(snapshot_package_output_path) { - trace!("snapshot for slot {} exists", snapshot_archive_info.slot); - if snapshot_archive_info.slot >= last_slot { + trace!( + "full snapshot for slot {} exists", + full_snapshot_archive_info.slot() + ); + if *full_snapshot_archive_info.slot() >= last_slot { return ( - snapshot_archive_info.path, - (snapshot_archive_info.slot, snapshot_archive_info.hash), + full_snapshot_archive_info.path().clone(), + ( + *full_snapshot_archive_info.slot(), + *full_snapshot_archive_info.hash(), + ), ); } trace!( - "snapshot slot {} < last_slot {}", - snapshot_archive_info.slot, + "full snapshot slot {} < last_slot {}", + full_snapshot_archive_info.slot(), last_slot ); } @@ -3323,7 +3319,7 @@ fn generate_account_paths(num_account_paths: usize) -> (Vec, Vec, validator_config: ValidatorConfig, } @@ -3334,14 +3330,14 @@ fn setup_snapshot_validator_config( ) -> SnapshotValidatorConfig { // Create the snapshot config let snapshot_dir = tempfile::tempdir_in(farf_dir()).unwrap(); - let snapshot_output_path = tempfile::tempdir_in(farf_dir()).unwrap(); + let snapshot_archives_dir = tempfile::tempdir_in(farf_dir()).unwrap(); let snapshot_config = SnapshotConfig { snapshot_interval_slots, - snapshot_package_output_path: PathBuf::from(snapshot_output_path.path()), - snapshot_path: PathBuf::from(snapshot_dir.path()), + snapshot_package_output_path: snapshot_archives_dir.path().to_path_buf(), + snapshot_path: snapshot_dir.path().to_path_buf(), archive_format: ArchiveFormat::TarBzip2, snapshot_version: snapshot_utils::SnapshotVersion::default(), - maximum_snapshots_to_retain: snapshot_utils::DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + maximum_snapshots_to_retain: snapshot_utils::DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, }; // Create the account paths @@ -3357,7 
+3353,7 @@ fn setup_snapshot_validator_config( SnapshotValidatorConfig { _snapshot_dir: snapshot_dir, - snapshot_output_path, + snapshot_archives_dir, account_storage_dirs, validator_config, } diff --git a/measure/src/measure.rs b/measure/src/measure.rs index 26f32b097..3ab35229a 100644 --- a/measure/src/measure.rs +++ b/measure/src/measure.rs @@ -1,6 +1,7 @@ use solana_sdk::timing::duration_as_ns; use std::{fmt, time::Instant}; +#[derive(Debug)] pub struct Measure { name: &'static str, start: Instant, diff --git a/rpc/src/rpc.rs b/rpc/src/rpc.rs index 9e20e752c..6a9b3d635 100644 --- a/rpc/src/rpc.rs +++ b/rpc/src/rpc.rs @@ -2271,7 +2271,7 @@ pub mod rpc_minimal { meta.snapshot_config .and_then(|snapshot_config| { - snapshot_utils::get_highest_snapshot_archive_slot( + snapshot_utils::get_highest_full_snapshot_archive_slot( &snapshot_config.snapshot_package_output_path, ) }) diff --git a/rpc/src/rpc_service.rs b/rpc/src/rpc_service.rs index 4a357c436..8b4adc974 100644 --- a/rpc/src/rpc_service.rs +++ b/rpc/src/rpc_service.rs @@ -205,14 +205,14 @@ impl RequestMiddleware for RpcRequestMiddleware { if let Some(ref snapshot_config) = self.snapshot_config { if request.uri().path() == "/snapshot.tar.bz2" { // Convenience redirect to the latest snapshot - return if let Some(snapshot_archive_info) = - snapshot_utils::get_highest_snapshot_archive_info( + return if let Some(full_snapshot_archive_info) = + snapshot_utils::get_highest_full_snapshot_archive_info( &snapshot_config.snapshot_package_output_path, ) { RpcRequestMiddleware::redirect(&format!( "/{}", - snapshot_archive_info - .path + full_snapshot_archive_info + .path() .file_name() .unwrap_or_else(|| std::ffi::OsStr::new("")) .to_str() @@ -500,7 +500,9 @@ mod tests { }, solana_runtime::{ bank::Bank, - snapshot_utils::{ArchiveFormat, SnapshotVersion, DEFAULT_MAX_SNAPSHOTS_TO_RETAIN}, + snapshot_utils::{ + ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, + }, }, solana_sdk::{ 
genesis_config::{ClusterType, DEFAULT_GENESIS_ARCHIVE}, @@ -606,7 +608,7 @@ mod tests { snapshot_path: PathBuf::from("/"), archive_format: ArchiveFormat::TarBzip2, snapshot_version: SnapshotVersion::default(), - maximum_snapshots_to_retain: DEFAULT_MAX_SNAPSHOTS_TO_RETAIN, + maximum_snapshots_to_retain: DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, }), bank_forks, RpcHealth::stub(), diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index 05b2ace71..cd06ea9d1 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -192,7 +192,7 @@ impl SnapshotRequestHandler { // Cleanup outdated snapshots let mut purge_old_snapshots_time = Measure::start("purge_old_snapshots_time"); - snapshot_utils::purge_old_snapshots(&self.snapshot_config.snapshot_path); + snapshot_utils::purge_old_bank_snapshots(&self.snapshot_config.snapshot_path); purge_old_snapshots_time.stop(); total_time.stop(); diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs index 90fb22616..140bcb328 100644 --- a/runtime/src/bank.rs +++ b/runtime/src/bank.rs @@ -38,7 +38,7 @@ use crate::{ AccountAddressFilter, Accounts, TransactionAccounts, TransactionLoadResult, TransactionLoaders, }, - accounts_db::{AccountShrinkThreshold, ErrorCounters, SnapshotStorages}, + accounts_db::{AccountShrinkThreshold, ErrorCounters, SnapshotStorage, SnapshotStorages}, accounts_index::{AccountSecondaryIndexes, IndexKey, ScanResult}, ancestors::{Ancestors, AncestorsForSerialization}, blockhash_queue::BlockhashQueue, @@ -4737,6 +4737,21 @@ impl Bank { self.rc.get_snapshot_storages(self.slot()) } + /// Get the snapshot storages _higher than_ the `full_snapshot_slot`. This is used when making an + /// incremental snapshot. 
+ pub fn get_incremental_snapshot_storages(&self, full_snapshot_slot: Slot) -> SnapshotStorages { + self.get_snapshot_storages() + .into_iter() + .map(|storage| { + storage + .into_iter() + .filter(|entry| entry.slot() > full_snapshot_slot) + .collect::() + }) + .filter(|storage| !storage.is_empty()) + .collect() + } + #[must_use] fn verify_hash(&self) -> bool { assert!(self.is_frozen()); diff --git a/runtime/src/serde_snapshot.rs b/runtime/src/serde_snapshot.rs index 9f1e65a80..644f13688 100644 --- a/runtime/src/serde_snapshot.rs +++ b/runtime/src/serde_snapshot.rs @@ -74,6 +74,62 @@ struct AccountsDbFields( BankHashInfo, ); +/// Helper type to wrap BufReader streams when deserializing and reconstructing from either just a +/// full snapshot, or both a full and incremental snapshot +pub struct SnapshotStreams<'a, R> { + pub full_snapshot_stream: &'a mut BufReader, + pub incremental_snapshot_stream: Option<&'a mut BufReader>, +} + +/// Helper type to wrap AccountsDbFields when reconstructing AccountsDb from either just a full +/// snapshot, or both a full and incremental snapshot +#[derive(Debug)] +struct SnapshotAccountsDbFields { + full_snapshot_accounts_db_fields: AccountsDbFields, + incremental_snapshot_accounts_db_fields: Option>, +} + +impl SnapshotAccountsDbFields { + /// Collapse the SnapshotAccountsDbFields into a single AccountsDbFields. If there is no + /// incremental snapshot, this returns the AccountsDbFields from the full snapshot. Otherwise + /// this uses the version, slot, and bank hash info from the incremental snapshot, then the + /// combination of the storages from both the full and incremental snapshots. 
+ fn collapse_into(self) -> Result, Error> { + match self.incremental_snapshot_accounts_db_fields { + None => Ok(self.full_snapshot_accounts_db_fields), + Some(AccountsDbFields( + mut incremental_snapshot_storages, + incremental_snapshot_version, + incremental_snapshot_slot, + incremental_snapshot_bank_hash_info, + )) => { + let full_snapshot_storages = self.full_snapshot_accounts_db_fields.0; + let full_snapshot_slot = self.full_snapshot_accounts_db_fields.2; + + // filter out incremental snapshot storages with slot <= full snapshot slot + incremental_snapshot_storages.retain(|slot, _| *slot > full_snapshot_slot); + + // There must not be any overlap in the slots of storages between the full snapshot and the incremental snapshot + incremental_snapshot_storages + .iter() + .all(|storage_entry| !full_snapshot_storages.contains_key(storage_entry.0)).then(|| ()).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidData, "Snapshots are incompatible: There are storages for the same slot in both the full snapshot and the incremental snapshot!") + })?; + + let mut combined_storages = full_snapshot_storages; + combined_storages.extend(incremental_snapshot_storages.into_iter()); + + Ok(AccountsDbFields( + combined_storages, + incremental_snapshot_version, + incremental_snapshot_slot, + incremental_snapshot_bank_hash_info, + )) + } + } + } +} + trait TypeContext<'a> { type SerializableAccountStorageEntry: Serialize + DeserializeOwned @@ -127,16 +183,16 @@ where } #[allow(clippy::too_many_arguments)] -pub(crate) fn bank_from_stream( +pub(crate) fn bank_from_streams( serde_style: SerdeStyle, - stream: &mut BufReader, + snapshot_streams: &mut SnapshotStreams, account_paths: &[PathBuf], unpacked_append_vec_map: UnpackedAppendVecMap, genesis_config: &GenesisConfig, frozen_account_pubkeys: &[Pubkey], debug_keys: Option>>, additional_builtins: Option<&Builtins>, - account_indexes: AccountSecondaryIndexes, + account_secondary_indexes: AccountSecondaryIndexes, caching_enabled: 
bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, @@ -147,18 +203,33 @@ where { macro_rules! INTO { ($x:ident) => {{ - let (bank_fields, accounts_db_fields) = $x::deserialize_bank_fields(stream)?; + let (full_snapshot_bank_fields, full_snapshot_accounts_db_fields) = + $x::deserialize_bank_fields(snapshot_streams.full_snapshot_stream)?; + let (incremental_snapshot_bank_fields, incremental_snapshot_accounts_db_fields) = + if let Some(ref mut incremental_snapshot_stream) = + snapshot_streams.incremental_snapshot_stream + { + let (bank_fields, accounts_db_fields) = + $x::deserialize_bank_fields(incremental_snapshot_stream)?; + (Some(bank_fields), Some(accounts_db_fields)) + } else { + (None, None) + }; + let snapshot_accounts_db_fields = SnapshotAccountsDbFields { + full_snapshot_accounts_db_fields, + incremental_snapshot_accounts_db_fields, + }; let bank = reconstruct_bank_from_fields( - bank_fields, - accounts_db_fields, + incremental_snapshot_bank_fields.unwrap_or(full_snapshot_bank_fields), + snapshot_accounts_db_fields, genesis_config, frozen_account_pubkeys, account_paths, unpacked_append_vec_map, debug_keys, additional_builtins, - account_indexes, + account_secondary_indexes, caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, @@ -243,14 +314,14 @@ impl<'a, C> IgnoreAsHelper for SerializableAccountsDb<'a, C> {} #[allow(clippy::too_many_arguments)] fn reconstruct_bank_from_fields( bank_fields: BankFieldsToDeserialize, - accounts_db_fields: AccountsDbFields, + snapshot_accounts_db_fields: SnapshotAccountsDbFields, genesis_config: &GenesisConfig, frozen_account_pubkeys: &[Pubkey], account_paths: &[PathBuf], unpacked_append_vec_map: UnpackedAppendVecMap, debug_keys: Option>>, additional_builtins: Option<&Builtins>, - account_indexes: AccountSecondaryIndexes, + account_secondary_indexes: AccountSecondaryIndexes, caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: 
AccountShrinkThreshold, @@ -260,11 +331,11 @@ where E: SerializableStorage + std::marker::Sync, { let mut accounts_db = reconstruct_accountsdb_from_fields( - accounts_db_fields, + snapshot_accounts_db_fields, account_paths, unpacked_append_vec_map, &genesis_config.cluster_type, - account_indexes, + account_secondary_indexes, caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, @@ -310,11 +381,11 @@ where } fn reconstruct_accountsdb_from_fields( - accounts_db_fields: AccountsDbFields, + snapshot_accounts_db_fields: SnapshotAccountsDbFields, account_paths: &[PathBuf], unpacked_append_vec_map: UnpackedAppendVecMap, cluster_type: &ClusterType, - account_indexes: AccountSecondaryIndexes, + account_secondary_indexes: AccountSecondaryIndexes, caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, @@ -326,11 +397,19 @@ where let mut accounts_db = AccountsDb::new_with_config( account_paths.to_vec(), cluster_type, - account_indexes, + account_secondary_indexes, caching_enabled, shrink_ratio, ); - let AccountsDbFields(storage, version, slot, bank_hash_info) = accounts_db_fields; + + let AccountsDbFields( + snapshot_storages, + snapshot_version, + snapshot_slot, + snapshot_bank_hash_info, + ) = snapshot_accounts_db_fields.collapse_into()?; + + let snapshot_storages = snapshot_storages.into_iter().collect::>(); // Ensure all account paths exist for path in &accounts_db.paths { @@ -338,13 +417,11 @@ where .unwrap_or_else(|err| panic!("Failed to create directory {}: {}", path.display(), err)); } - let storage = storage.into_iter().collect::>(); - // Remap the deserialized AppendVec paths to point to correct local paths - let mut storage = (0..storage.len()) + let mut storage = (0..snapshot_storages.len()) .into_par_iter() .map(|i| { - let (slot, slot_storage) = &storage[i]; + let (slot, slot_storage) = &snapshot_storages[i]; let mut new_slot_storage = HashMap::new(); for storage_entry in slot_storage { let 
file_name = AppendVec::file_name(*slot, storage_entry.id()); @@ -376,7 +453,7 @@ where .bank_hashes .write() .unwrap() - .insert(slot, bank_hash_info); + .insert(snapshot_slot, snapshot_bank_hash_info); // Process deserialized data, set necessary fields in self let max_id: usize = *storage @@ -400,7 +477,7 @@ where accounts_db.next_id.store(max_id + 1, Ordering::Relaxed); accounts_db .write_version - .fetch_add(version, Ordering::Relaxed); + .fetch_add(snapshot_version, Ordering::Relaxed); accounts_db.generate_index(limit_load_slot_count_from_snapshot, verify_index); Ok(accounts_db) } diff --git a/runtime/src/serde_snapshot/tests.rs b/runtime/src/serde_snapshot/tests.rs index 2d40b5448..2e6f6da9d 100644 --- a/runtime/src/serde_snapshot/tests.rs +++ b/runtime/src/serde_snapshot/tests.rs @@ -66,8 +66,13 @@ where R: Read, { // read and deserialise the accounts database directly from the stream + let accounts_db_fields = C::deserialize_accounts_db_fields(stream)?; + let snapshot_accounts_db_fields = SnapshotAccountsDbFields { + full_snapshot_accounts_db_fields: accounts_db_fields, + incremental_snapshot_accounts_db_fields: None, + }; reconstruct_accountsdb_from_fields( - C::deserialize_accounts_db_fields(stream)?, + snapshot_accounts_db_fields, account_paths, unpacked_append_vec_map, &ClusterType::Development, @@ -219,9 +224,13 @@ fn test_bank_serialize_style(serde_style: SerdeStyle) { let copied_accounts = TempDir::new().unwrap(); let unpacked_append_vec_map = copy_append_vecs(&bank2.rc.accounts.accounts_db, copied_accounts.path()).unwrap(); - let mut dbank = crate::serde_snapshot::bank_from_stream( + let mut snapshot_streams = SnapshotStreams { + full_snapshot_stream: &mut reader, + incremental_snapshot_stream: None, + }; + let mut dbank = crate::serde_snapshot::bank_from_streams( serde_style, - &mut reader, + &mut snapshot_streams, &dbank_paths, unpacked_append_vec_map, &genesis_config, diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 
dbc12e328..84f668fb9 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -5,7 +5,8 @@ use { bank::{Bank, BankSlotDelta, Builtins}, hardened_unpack::{unpack_snapshot, ParallelSelector, UnpackError, UnpackedAppendVecMap}, serde_snapshot::{ - bank_from_stream, bank_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages, + bank_from_streams, bank_to_stream, SerdeStyle, SnapshotStorage, SnapshotStorages, + SnapshotStreams, }, shared_buffer_reader::{SharedBuffer, SharedBufferReader}, snapshot_package::{ @@ -16,54 +17,170 @@ use { bincode::{config::Options, serialize_into}, bzip2::bufread::BzDecoder, flate2::read::GzDecoder, + lazy_static::lazy_static, log::*, rayon::{prelude::*, ThreadPool}, regex::Regex, solana_measure::measure::Measure, solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey}, std::{ - cmp::max, - cmp::Ordering, + cmp::{max, Ordering}, collections::HashSet, fmt, fs::{self, File}, - io::{ - self, BufReader, BufWriter, Error as IoError, ErrorKind, Read, Seek, SeekFrom, Write, - }, + io::{self, BufReader, BufWriter, Error as IoError, ErrorKind, Read, Seek, Write}, path::{Path, PathBuf}, process::{self, ExitStatus}, str::FromStr, sync::Arc, }, tar::Archive, + tempfile::TempDir, thiserror::Error, }; -/// Information about a snapshot archive: its path, slot, hash, and archive format -pub struct SnapshotArchiveInfo { +/// Common information about a snapshot archive +#[derive(PartialEq, Eq, Debug)] +struct SnapshotArchiveInfo { /// Path to the snapshot archive file - pub path: PathBuf, + path: PathBuf, /// Slot that the snapshot was made - pub slot: Slot, + slot: Slot, /// Hash of the accounts at this slot - pub hash: Hash, + hash: Hash, /// Archive format for the snapshot file - pub archive_format: ArchiveFormat, + archive_format: ArchiveFormat, } + +/// Information about a full snapshot archive: its path, slot, hash, and archive format +#[derive(PartialEq, Eq, Debug)] +pub struct 
FullSnapshotArchiveInfo(SnapshotArchiveInfo); + +impl FullSnapshotArchiveInfo { + /// Parse the path to a full snapshot archive and return a new `FullSnapshotArchiveInfo` + fn new_from_path(path: PathBuf) -> Result { + let filename = path_to_file_name_str(path.as_path())?; + let (slot, hash, archive_format) = parse_full_snapshot_archive_filename(filename)?; + + Ok(Self(SnapshotArchiveInfo { + path, + slot, + hash, + archive_format, + })) + } + + pub fn path(&self) -> &PathBuf { + &self.0.path + } + + pub fn slot(&self) -> &Slot { + &self.0.slot + } + + pub fn hash(&self) -> &Hash { + &self.0.hash + } + + pub fn archive_format(&self) -> &ArchiveFormat { + &self.0.archive_format + } +} + +impl PartialOrd for FullSnapshotArchiveInfo { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +// Order `FullSnapshotArchiveInfo` by slot (ascending), which practially is sorting chronologically +impl Ord for FullSnapshotArchiveInfo { + fn cmp(&self, other: &Self) -> Ordering { + self.slot().cmp(other.slot()) + } +} + +/// Information about an incremental snapshot archive: its path, slot, base slot, hash, and archive format +#[derive(PartialEq, Eq, Debug)] +pub struct IncrementalSnapshotArchiveInfo { + /// The slot that the incremental snapshot was based from. This is the same as the full + /// snapshot slot used when making the incremental snapshot. + base_slot: Slot, + + /// Use the `SnapshotArchiveInfo` struct for the common fields: path, slot, hash, and + /// archive_format, but as they pertain to the incremental snapshot. 
+ inner: SnapshotArchiveInfo, +} + +impl IncrementalSnapshotArchiveInfo { + /// Parse the path to an incremental snapshot archive and return a new `IncrementalSnapshotArchiveInfo` + fn new_from_path(path: PathBuf) -> Result { + let filename = path_to_file_name_str(path.as_path())?; + let (base_slot, slot, hash, archive_format) = + parse_incremental_snapshot_archive_filename(filename)?; + + Ok(Self { + base_slot, + inner: SnapshotArchiveInfo { + path, + slot, + hash, + archive_format, + }, + }) + } + + fn path(&self) -> &PathBuf { + &self.inner.path + } + + fn base_slot(&self) -> &Slot { + &self.base_slot + } + + fn slot(&self) -> &Slot { + &self.inner.slot + } + + fn _hash(&self) -> &Hash { + &self.inner.hash + } + + fn _archive_format(&self) -> &ArchiveFormat { + &self.inner.archive_format + } +} + +impl PartialOrd for IncrementalSnapshotArchiveInfo { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +// Order `IncrementalSnapshotArchiveInfo` by base slot (ascending), then slot (ascending), which +// practially is sorting chronologically +impl Ord for IncrementalSnapshotArchiveInfo { + fn cmp(&self, other: &Self) -> Ordering { + self.base_slot() + .cmp(other.base_slot()) + .then(self.slot().cmp(other.slot())) + } +} + pub const SNAPSHOT_STATUS_CACHE_FILE_NAME: &str = "status_cache"; -pub const MAX_SNAPSHOTS: usize = 8; // Save some snapshots but not too many +pub const MAX_BANK_SNAPSHOTS: usize = 8; // Save some snapshots but not too many const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB const VERSION_STRING_V1_2_0: &str = "1.2.0"; const DEFAULT_SNAPSHOT_VERSION: SnapshotVersion = SnapshotVersion::V1_2_0; -const TMP_SNAPSHOT_PREFIX: &str = "tmp-snapshot-"; -pub const DEFAULT_MAX_SNAPSHOTS_TO_RETAIN: usize = 2; - -pub const SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = - r"^snapshot-(\d+)-([[:alnum:]]+)\.(tar|tar\.bz2|tar\.zst|tar\.gz)$"; +const TMP_FULL_SNAPSHOT_PREFIX: &str = "tmp-snapshot-"; +const 
TMP_INCREMENTAL_SNAPSHOT_PREFIX: &str = "tmp-incremental-snapshot-"; +pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: usize = 2; +pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P[[:digit:]]+)-(?P[[:alnum:]]+)\.(?Ptar|tar\.bz2|tar\.zst|tar\.gz)$"; +pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P[[:digit:]]+)-(?P[[:digit:]]+)-(?P[[:alnum:]]+)\.(?Ptar|tar\.bz2|tar\.zst|tar\.gz)$"; #[derive(Copy, Clone, Eq, PartialEq, Debug)] pub enum SnapshotVersion { @@ -129,10 +246,48 @@ pub enum ArchiveFormat { Tar, } +/// A slot and the path to its bank snapshot #[derive(PartialEq, Eq, Debug)] -pub struct SlotSnapshotPaths { +pub struct BankSnapshotInfo { pub slot: Slot, - pub snapshot_file_path: PathBuf, + pub snapshot_path: PathBuf, +} + +impl PartialOrd for BankSnapshotInfo { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +// Order BankSnapshotInfo by slot (ascending), which practially is sorting chronologically +impl Ord for BankSnapshotInfo { + fn cmp(&self, other: &Self) -> Ordering { + self.slot.cmp(&other.slot) + } +} + +/// Helper type when rebuilding from snapshots. Designed to handle when rebuilding from just a +/// full snapshot, or from both a full snapshot and an incremental snapshot. 
+#[derive(Debug)] +struct SnapshotRootPaths { + full_snapshot_root_file_path: PathBuf, + incremental_snapshot_root_file_path: Option, +} + +/// Helper type to bundle up the results from `unarchive_snapshot()` +#[derive(Debug)] +struct UnarchivedSnapshot { + unpack_dir: TempDir, + unpacked_append_vec_map: UnpackedAppendVecMap, + unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion, + measure_untar: Measure, +} + +/// Helper type for passing around the unpacked snapshots dir and the snapshot version together +#[derive(Debug)] +struct UnpackedSnapshotsDirAndVersion { + unpacked_snapshots_dir: PathBuf, + snapshot_version: String, } #[derive(Error, Debug)] @@ -157,53 +312,139 @@ pub enum SnapshotError { #[error("source({1}) - I/O error: {0}")] IoWithSource(std::io::Error, &'static str), + + #[error("could not get file name from path: {}", .0.display())] + PathToFileNameError(PathBuf), + + #[error("could not get str from file name: {}", .0.display())] + FileNameToStrError(PathBuf), + + #[error("could not parse snapshot archive's file name: {0}")] + ParseSnapshotArchiveFileNameError(String), + + #[error("snapshots are incompatible: full snapshot slot ({0}) and incremental snapshot base slot ({1}) do not match")] + MismatchedBaseSlot(Slot, Slot), } pub type Result = std::result::Result; -impl PartialOrd for SlotSnapshotPaths { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.slot.cmp(&other.slot)) - } -} - -impl Ord for SlotSnapshotPaths { - fn cmp(&self, other: &Self) -> Ordering { - self.slot.cmp(&other.slot) - } -} - -pub fn package_snapshot, Q: AsRef>( +/// Package up bank snapshot files, snapshot storages, and slot deltas for a full snapshot. 
+pub fn package_full_snapshot( bank: &Bank, - snapshot_files: &SlotSnapshotPaths, - snapshot_path: Q, + bank_snapshot_info: &BankSnapshotInfo, + snapshots_dir: P, + status_cache_slot_deltas: Vec, + snapshot_package_output_path: Q, + snapshot_storages: SnapshotStorages, + archive_format: ArchiveFormat, + snapshot_version: SnapshotVersion, + hash_for_testing: Option, +) -> Result +where + P: AsRef, + Q: AsRef, +{ + info!( + "Package full snapshot for bank: {} has {} account storage entries", + bank.slot(), + snapshot_storages.len() + ); + + let snapshot_tmpdir = tempfile::Builder::new() + .prefix(&format!("{}{}-", TMP_FULL_SNAPSHOT_PREFIX, bank.slot())) + .tempdir_in(snapshots_dir)?; + + do_package_snapshot( + bank, + bank_snapshot_info, + status_cache_slot_deltas, + snapshot_package_output_path, + snapshot_storages, + archive_format, + snapshot_version, + hash_for_testing, + snapshot_tmpdir, + ) +} + +/// Package up bank snapshot files, snapshot storages, and slot deltas for an incremental snapshot. +#[allow(clippy::too_many_arguments)] +fn package_incremental_snapshot( + bank: &Bank, + incremental_snapshot_base_slot: Slot, + bank_snapshot_info: &BankSnapshotInfo, + snapshots_dir: P, + status_cache_slot_deltas: Vec, + snapshot_package_output_path: Q, + snapshot_storages: SnapshotStorages, + archive_format: ArchiveFormat, + snapshot_version: SnapshotVersion, + hash_for_testing: Option, +) -> Result +where + P: AsRef, + Q: AsRef, +{ + info!( + "Package incremental snapshot for bank {} (from base slot {}) has {} account storage entries", + bank.slot(), + incremental_snapshot_base_slot, + snapshot_storages.len() + ); + + assert!( + snapshot_storages.iter().all(|storage| storage + .iter() + .all(|entry| entry.slot() > incremental_snapshot_base_slot)), + "Incremental snapshot package must only contain storage entries where slot > incremental snapshot base slot (i.e. full snapshot slot)!" 
+ ); + + let snapshot_tmpdir = tempfile::Builder::new() + .prefix(&format!( + "{}{}-{}-", + TMP_INCREMENTAL_SNAPSHOT_PREFIX, + incremental_snapshot_base_slot, + bank.slot() + )) + .tempdir_in(snapshots_dir)?; + + do_package_snapshot( + bank, + bank_snapshot_info, + status_cache_slot_deltas, + snapshot_package_output_path, + snapshot_storages, + archive_format, + snapshot_version, + hash_for_testing, + snapshot_tmpdir, + ) +} + +fn do_package_snapshot

( + bank: &Bank, + bank_snapshot_info: &BankSnapshotInfo, status_cache_slot_deltas: Vec, snapshot_package_output_path: P, snapshot_storages: SnapshotStorages, archive_format: ArchiveFormat, snapshot_version: SnapshotVersion, hash_for_testing: Option, -) -> Result { - // Hard link all the snapshots we need for this package - let snapshot_tmpdir = tempfile::Builder::new() - .prefix(&format!("{}{}-", TMP_SNAPSHOT_PREFIX, bank.slot())) - .tempdir_in(snapshot_path)?; - + snapshot_tmpdir: TempDir, +) -> Result +where + P: AsRef, +{ // Create a snapshot package - info!( - "Snapshot for bank: {} has {} account storage entries", - bank.slot(), - snapshot_storages.len() - ); // Hard link the snapshot into a tmpdir, to ensure its not removed prior to packaging. { let snapshot_hardlink_dir = snapshot_tmpdir .as_ref() - .join(snapshot_files.slot.to_string()); + .join(bank_snapshot_info.slot.to_string()); fs::create_dir_all(&snapshot_hardlink_dir)?; fs::hard_link( - &snapshot_files.snapshot_file_path, - &snapshot_hardlink_dir.join(snapshot_files.slot.to_string()), + &bank_snapshot_info.snapshot_path, + &snapshot_hardlink_dir.join(bank_snapshot_info.slot.to_string()), )?; } @@ -234,16 +475,17 @@ fn get_archive_ext(archive_format: ArchiveFormat) -> &'static str { } } -// If the validator is halted in the middle of `archive_snapshot_package` the temporary staging directory -// won't be cleaned up. Call this function to clean them up -pub fn remove_tmp_snapshot_archives(snapshot_path: &Path) { - if let Ok(entries) = fs::read_dir(&snapshot_path) { +/// If the validator halts in the middle of `archive_snapshot_package()`, the temporary staging +/// directory won't be cleaned up. Call this function to clean them up. 
+pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: &Path) { + if let Ok(entries) = fs::read_dir(snapshot_archives_dir) { for entry in entries.filter_map(|entry| entry.ok()) { - if entry + let file_name = entry .file_name() .into_string() - .unwrap_or_else(|_| String::new()) - .starts_with(TMP_SNAPSHOT_PREFIX) + .unwrap_or_else(|_| String::new()); + if file_name.starts_with(TMP_FULL_SNAPSHOT_PREFIX) + || file_name.starts_with(TMP_INCREMENTAL_SNAPSHOT_PREFIX) { if entry.path().is_file() { fs::remove_file(entry.path()) @@ -258,7 +500,7 @@ pub fn remove_tmp_snapshot_archives(snapshot_path: &Path) { } } -/// Make a snapshot archive out of the AccountsPackage +/// Make a full snapshot archive out of the AccountsPackage pub fn archive_snapshot_package( snapshot_package: &AccountsPackage, maximum_snapshots_to_retain: usize, @@ -290,7 +532,7 @@ pub fn archive_snapshot_package( let staging_dir = tempfile::Builder::new() .prefix(&format!( "{}{}-", - TMP_SNAPSHOT_PREFIX, snapshot_package.slot + TMP_FULL_SNAPSHOT_PREFIX, snapshot_package.slot )) .tempdir_in(tar_dir) .map_err(|e| SnapshotError::IoWithSource(e, "create archive tempdir"))?; @@ -343,7 +585,7 @@ pub fn archive_snapshot_package( // system `tar` program is used for -S (sparse file support) let archive_path = tar_dir.join(format!( "{}{}.{}", - TMP_SNAPSHOT_PREFIX, snapshot_package.slot, file_ext + TMP_FULL_SNAPSHOT_PREFIX, snapshot_package.slot, file_ext )); let mut tar = process::Command::new("tar") @@ -410,10 +652,7 @@ pub fn archive_snapshot_package( fs::rename(&archive_path, &snapshot_package.tar_output_file) .map_err(|e| SnapshotError::IoWithSource(e, "archive path rename"))?; - purge_old_snapshot_archives( - snapshot_package.tar_output_file.parent().unwrap(), - maximum_snapshots_to_retain, - ); + purge_old_snapshot_archives(tar_dir, maximum_snapshots_to_retain); timer.stop(); info!( @@ -432,43 +671,56 @@ pub fn archive_snapshot_package( Ok(()) } -pub fn get_snapshot_paths>(snapshot_path: P) -> Vec +/// 
Get a list of bank snapshots in a directory +pub fn get_bank_snapshots

(snapshots_dir: P) -> Vec where - P: fmt::Debug, + P: AsRef, { - match fs::read_dir(&snapshot_path) { - Ok(paths) => { - let mut names = paths - .filter_map(|entry| { - entry.ok().and_then(|e| { - e.path() - .file_name() - .and_then(|n| n.to_str().map(|s| s.parse::().ok())) - .unwrap_or(None) - }) + match fs::read_dir(&snapshots_dir) { + Ok(paths) => paths + .filter_map(|entry| { + entry.ok().and_then(|e| { + e.path() + .file_name() + .and_then(|n| n.to_str().map(|s| s.parse::().ok())) + .unwrap_or(None) }) - .map(|slot| { - let snapshot_path = snapshot_path.as_ref().join(slot.to_string()); - SlotSnapshotPaths { - slot, - snapshot_file_path: snapshot_path.join(get_snapshot_file_name(slot)), - } - }) - .collect::>(); - - names.sort(); - names - } + }) + .map(|slot| { + let snapshot_file_name = get_snapshot_file_name(slot); + // So nice I join-ed it twice! The redundant `snapshot_file_name` is unintentional + // and should be simplified. Kept for compatibility. + let snapshot_path = snapshots_dir + .as_ref() + .join(&snapshot_file_name) + .join(snapshot_file_name); + BankSnapshotInfo { + slot, + snapshot_path, + } + }) + .collect::>(), Err(err) => { info!( - "Unable to read snapshot directory {:?}: {}", - snapshot_path, err + "Unable to read snapshots directory {}: {}", + snapshots_dir.as_ref().display(), + err ); vec![] } } } +/// Get the bank snapshot with the highest slot in a directory +pub fn get_highest_bank_snapshot_info

(snapshots_dir: P) -> Option +where + P: AsRef, +{ + let mut bank_snapshot_infos = get_bank_snapshots(snapshots_dir); + bank_snapshot_infos.sort_unstable(); + bank_snapshot_infos.into_iter().rev().next() +} + pub fn serialize_snapshot_data_file(data_file_path: &Path, serializer: F) -> Result where F: FnOnce(&mut BufWriter) -> Result<()>, @@ -480,12 +732,32 @@ where ) } -pub fn deserialize_snapshot_data_file(data_file_path: &Path, deserializer: F) -> Result -where - F: FnOnce(&mut BufReader) -> Result, -{ - deserialize_snapshot_data_file_capped::( - data_file_path, +pub fn deserialize_snapshot_data_file( + data_file_path: &Path, + deserializer: impl FnOnce(&mut BufReader) -> Result, +) -> Result { + let wrapped_deserializer = move |streams: &mut SnapshotStreams| -> Result { + deserializer(&mut streams.full_snapshot_stream) + }; + + let wrapped_data_file_path = SnapshotRootPaths { + full_snapshot_root_file_path: data_file_path.to_path_buf(), + incremental_snapshot_root_file_path: None, + }; + + deserialize_snapshot_data_files_capped( + &wrapped_data_file_path, + MAX_SNAPSHOT_DATA_FILE_SIZE, + wrapped_deserializer, + ) +} + +fn deserialize_snapshot_data_files( + snapshot_root_paths: &SnapshotRootPaths, + deserializer: impl FnOnce(&mut SnapshotStreams) -> Result, +) -> Result { + deserialize_snapshot_data_files_capped( + snapshot_root_paths, MAX_SNAPSHOT_DATA_FILE_SIZE, deserializer, ) @@ -504,7 +776,7 @@ where serializer(&mut data_file_stream)?; data_file_stream.flush()?; - let consumed_size = data_file_stream.seek(SeekFrom::Current(0))?; + let consumed_size = data_file_stream.stream_position()?; if consumed_size > maximum_file_size { let error_message = format!( "too large snapshot data file to serialize: {:?} has {} bytes", @@ -515,55 +787,123 @@ where Ok(consumed_size) } -fn deserialize_snapshot_data_file_capped( - data_file_path: &Path, +fn deserialize_snapshot_data_files_capped( + snapshot_root_paths: &SnapshotRootPaths, maximum_file_size: u64, - deserializer: F, 
-) -> Result -where - F: FnOnce(&mut BufReader) -> Result, -{ - let file_size = fs::metadata(&data_file_path)?.len(); + deserializer: impl FnOnce(&mut SnapshotStreams) -> Result, +) -> Result { + let (full_snapshot_file_size, mut full_snapshot_data_file_stream) = + create_snapshot_data_file_stream( + &snapshot_root_paths.full_snapshot_root_file_path, + maximum_file_size, + )?; - if file_size > maximum_file_size { - let error_message = format!( - "too large snapshot data file to deserialize: {:?} has {} bytes", - data_file_path, file_size - ); - return Err(get_io_error(&error_message)); - } + let (incremental_snapshot_file_size, mut incremental_snapshot_data_file_stream) = + if let Some(ref incremental_snapshot_root_file_path) = + snapshot_root_paths.incremental_snapshot_root_file_path + { + let (incremental_snapshot_file_size, incremental_snapshot_data_file_stream) = + create_snapshot_data_file_stream( + incremental_snapshot_root_file_path, + maximum_file_size, + )?; + ( + Some(incremental_snapshot_file_size), + Some(incremental_snapshot_data_file_stream), + ) + } else { + (None, None) + }; - let data_file = File::open(data_file_path)?; - let mut data_file_stream = BufReader::new(data_file); + let mut snapshot_streams = SnapshotStreams { + full_snapshot_stream: &mut full_snapshot_data_file_stream, + incremental_snapshot_stream: incremental_snapshot_data_file_stream.as_mut(), + }; + let ret = deserializer(&mut snapshot_streams)?; - let ret = deserializer(&mut data_file_stream)?; + check_deserialize_file_consumed( + full_snapshot_file_size, + &snapshot_root_paths.full_snapshot_root_file_path, + &mut full_snapshot_data_file_stream, + )?; - let consumed_size = data_file_stream.seek(SeekFrom::Current(0))?; - - if file_size != consumed_size { - let error_message = format!( - "invalid snapshot data file: {:?} has {} bytes, however consumed {} bytes to deserialize", - data_file_path, file_size, consumed_size - ); - return Err(get_io_error(&error_message)); + if let 
Some(ref incremental_snapshot_root_file_path) = + snapshot_root_paths.incremental_snapshot_root_file_path + { + check_deserialize_file_consumed( + incremental_snapshot_file_size.unwrap(), + incremental_snapshot_root_file_path, + incremental_snapshot_data_file_stream.as_mut().unwrap(), + )?; } Ok(ret) } -pub fn add_snapshot>( - snapshot_path: P, +/// Before running the deserializer function, perform common operations on the snapshot archive +/// files, such as checking the file size and opening the file into a stream. +fn create_snapshot_data_file_stream

( + snapshot_root_file_path: P, + maximum_file_size: u64, +) -> Result<(u64, BufReader)> +where + P: AsRef, +{ + let snapshot_file_size = fs::metadata(&snapshot_root_file_path)?.len(); + + if snapshot_file_size > maximum_file_size { + let error_message = + format!( + "too large snapshot data file to deserialize: {} has {} bytes (max size is {} bytes)", + snapshot_root_file_path.as_ref().display(), snapshot_file_size, maximum_file_size + ); + return Err(get_io_error(&error_message)); + } + + let snapshot_data_file = File::open(&snapshot_root_file_path)?; + let snapshot_data_file_stream = BufReader::new(snapshot_data_file); + + Ok((snapshot_file_size, snapshot_data_file_stream)) +} + +/// After running the deserializer function, perform common checks to ensure the snapshot archive +/// files were consumed correctly. +fn check_deserialize_file_consumed

( + file_size: u64, + file_path: P, + file_stream: &mut BufReader, +) -> Result<()> +where + P: AsRef, +{ + let consumed_size = file_stream.stream_position()?; + + if consumed_size != file_size { + let error_message = + format!( + "invalid snapshot data file: {} has {} bytes, however consumed {} bytes to deserialize", + file_path.as_ref().display(), file_size, consumed_size + ); + return Err(get_io_error(&error_message)); + } + + Ok(()) +} + +/// Serialize a bank to a snapshot +pub fn add_bank_snapshot>( + snapshots_dir: P, bank: &Bank, snapshot_storages: &[SnapshotStorage], snapshot_version: SnapshotVersion, -) -> Result { +) -> Result { let slot = bank.slot(); - // snapshot_path/slot - let slot_snapshot_dir = get_bank_snapshot_dir(snapshot_path, slot); - fs::create_dir_all(slot_snapshot_dir.clone())?; + // snapshots_dir/slot + let bank_snapshots_dir = get_bank_snapshots_dir(snapshots_dir, slot); + fs::create_dir_all(&bank_snapshots_dir)?; - // the bank snapshot is stored as snapshot_path/slot/slot - let snapshot_bank_file_path = slot_snapshot_dir.join(get_snapshot_file_name(slot)); + // the bank snapshot is stored as snapshots_dir/slot/slot + let snapshot_bank_file_path = bank_snapshots_dir.join(get_snapshot_file_name(slot)); info!( "Creating snapshot for slot {}, path: {:?}", slot, snapshot_bank_file_path, @@ -595,9 +935,9 @@ pub fn add_snapshot>( bank_serialize, slot, snapshot_bank_file_path, ); - Ok(SlotSnapshotPaths { + Ok(BankSnapshotInfo { slot, - snapshot_file_path: snapshot_bank_file_path, + snapshot_path: snapshot_bank_file_path, }) } @@ -628,34 +968,40 @@ fn serialize_status_cache( } /// Remove the snapshot directory for this slot -pub fn remove_snapshot>(slot: Slot, snapshot_path: P) -> Result<()> { - let slot_snapshot_dir = get_bank_snapshot_dir(&snapshot_path, slot); - fs::remove_dir_all(slot_snapshot_dir)?; +pub fn remove_bank_snapshot

(slot: Slot, snapshots_dir: P) -> Result<()> +where + P: AsRef, +{ + let bank_snapshot_dir = get_bank_snapshots_dir(&snapshots_dir, slot); + fs::remove_dir_all(bank_snapshot_dir)?; Ok(()) } #[derive(Debug, Default)] pub struct BankFromArchiveTimings { pub rebuild_bank_from_snapshots_us: u64, - pub untar_us: u64, + pub full_snapshot_untar_us: u64, + pub incremental_snapshot_untar_us: u64, pub verify_snapshot_bank_us: u64, } // From testing, 4 seems to be a sweet spot for ranges of 60M-360M accounts and 16-64 cores. This may need to be tuned later. const PARALLEL_UNTAR_READERS_DEFAULT: usize = 4; -/// Rebuild a bank from a snapshot archive +/// Rebuild bank from snapshot archives. Handles either just a full snapshot, or both a full +/// snapshot and an incremental snapshot. #[allow(clippy::too_many_arguments)] -pub fn bank_from_snapshot_archive

( +pub fn bank_from_snapshot_archives

( account_paths: &[PathBuf], frozen_account_pubkeys: &[Pubkey], - snapshot_path: &Path, - snapshot_tar: P, + snapshots_dir: &Path, + full_snapshot_archive_path: P, + incremental_snapshot_archive_path: Option

, archive_format: ArchiveFormat, genesis_config: &GenesisConfig, debug_keys: Option>>, additional_builtins: Option<&Builtins>, - account_indexes: AccountSecondaryIndexes, + account_secondary_indexes: AccountSecondaryIndexes, accounts_db_caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, @@ -665,74 +1011,187 @@ pub fn bank_from_snapshot_archive

( where P: AsRef + std::marker::Sync, { - let unpack_dir = tempfile::Builder::new() - .prefix(TMP_SNAPSHOT_PREFIX) - .tempdir_in(snapshot_path)?; - - let mut untar = Measure::start("snapshot untar"); - let divisions = std::cmp::min( + let parallel_divisions = std::cmp::min( PARALLEL_UNTAR_READERS_DEFAULT, std::cmp::max(1, num_cpus::get() / 4), ); - let unpacked_append_vec_map = untar_snapshot_in( - &snapshot_tar, - unpack_dir.as_ref(), + + let unarchived_full_snapshot = unarchive_snapshot( + snapshots_dir, + TMP_FULL_SNAPSHOT_PREFIX, + &full_snapshot_archive_path, + "snapshot untar", account_paths, archive_format, - divisions, + parallel_divisions, )?; - untar.stop(); - info!("{}", untar); - let mut measure = Measure::start("bank rebuild from snapshot"); - let unpacked_snapshots_dir = unpack_dir.as_ref().join("snapshots"); - let unpacked_version_file = unpack_dir.as_ref().join("version"); + let mut unarchived_incremental_snapshot = + if let Some(incremental_snapshot_archive_path) = incremental_snapshot_archive_path { + check_are_snapshots_compatible( + &full_snapshot_archive_path, + &incremental_snapshot_archive_path, + )?; - let mut snapshot_version = String::new(); - File::open(unpacked_version_file).and_then(|mut f| f.read_to_string(&mut snapshot_version))?; + let unarchived_incremental_snapshot = unarchive_snapshot( + snapshots_dir, + TMP_INCREMENTAL_SNAPSHOT_PREFIX, + &incremental_snapshot_archive_path, + "incremental snapshot untar", + account_paths, + archive_format, + parallel_divisions, + )?; + Some(unarchived_incremental_snapshot) + } else { + None + }; + let mut unpacked_append_vec_map = unarchived_full_snapshot.unpacked_append_vec_map; + if let Some(ref mut unarchive_preparation_result) = unarchived_incremental_snapshot { + let incremental_snapshot_unpacked_append_vec_map = + std::mem::take(&mut unarchive_preparation_result.unpacked_append_vec_map); + unpacked_append_vec_map.extend(incremental_snapshot_unpacked_append_vec_map.into_iter()); + } + + let 
mut measure_rebuild = Measure::start("rebuild bank from snapshots"); let bank = rebuild_bank_from_snapshots( - snapshot_version.trim(), + &unarchived_full_snapshot.unpacked_snapshots_dir_and_version, + unarchived_incremental_snapshot + .as_ref() + .map(|unarchive_preparation_result| { + &unarchive_preparation_result.unpacked_snapshots_dir_and_version + }), frozen_account_pubkeys, - &unpacked_snapshots_dir, account_paths, unpacked_append_vec_map, genesis_config, debug_keys, additional_builtins, - account_indexes, + account_secondary_indexes, accounts_db_caching_enabled, limit_load_slot_count_from_snapshot, shrink_ratio, verify_index, )?; - measure.stop(); + measure_rebuild.stop(); + info!("{}", measure_rebuild); - let mut verify = Measure::start("verify"); + let mut measure_verify = Measure::start("verify"); if !bank.verify_snapshot_bank(test_hash_calculation) && limit_load_slot_count_from_snapshot.is_none() { panic!("Snapshot bank for slot {} failed to verify", bank.slot()); } - verify.stop(); - let timings = BankFromArchiveTimings { - rebuild_bank_from_snapshots_us: measure.as_us(), - untar_us: untar.as_us(), - verify_snapshot_bank_us: verify.as_us(), - }; + measure_verify.stop(); + let timings = BankFromArchiveTimings { + rebuild_bank_from_snapshots_us: measure_rebuild.as_us(), + full_snapshot_untar_us: unarchived_full_snapshot.measure_untar.as_us(), + incremental_snapshot_untar_us: unarchived_incremental_snapshot + .map_or(0, |unarchive_preparation_result| { + unarchive_preparation_result.measure_untar.as_us() + }), + verify_snapshot_bank_us: measure_verify.as_us(), + }; Ok((bank, timings)) } -/// Build the snapshot archive path from its components: the snapshot archive output directory, the +/// Perform the common tasks when unarchiving a snapshot. Handles creating the temporary +/// directories, untaring, reading the version file, and then returning those fields plus the +/// unpacked append vec map. 
+fn unarchive_snapshot( + snapshots_dir: P, + unpacked_snapshots_dir_prefix: &'static str, + snapshot_archive_path: Q, + measure_name: &'static str, + account_paths: &[PathBuf], + archive_format: ArchiveFormat, + parallel_divisions: usize, +) -> Result +where + P: AsRef, + Q: AsRef, +{ + let unpack_dir = tempfile::Builder::new() + .prefix(unpacked_snapshots_dir_prefix) + .tempdir_in(snapshots_dir)?; + let unpacked_snapshots_dir = unpack_dir.path().join("snapshots"); + + let mut measure_untar = Measure::start(measure_name); + let unpacked_append_vec_map = untar_snapshot_in( + snapshot_archive_path, + unpack_dir.path(), + account_paths, + archive_format, + parallel_divisions, + )?; + measure_untar.stop(); + info!("{}", measure_untar); + + let unpacked_version_file = unpack_dir.path().join("version"); + let snapshot_version = { + let mut snapshot_version = String::new(); + File::open(unpacked_version_file) + .and_then(|mut f| f.read_to_string(&mut snapshot_version))?; + snapshot_version.trim().to_string() + }; + + Ok(UnarchivedSnapshot { + unpack_dir, + unpacked_append_vec_map, + unpacked_snapshots_dir_and_version: UnpackedSnapshotsDirAndVersion { + unpacked_snapshots_dir, + snapshot_version, + }, + measure_untar, + }) +} + +/// Check if an incremental snapshot is compatible with a full snapshot. This function parses the +/// paths to see if the incremental snapshot's base slot is the same as the full snapshot's slot. +/// Return an error if they are incompatible (or if the paths cannot be parsed), otherwise return a +/// tuple of the full snapshot slot and the incremental snapshot slot. +fn check_are_snapshots_compatible

( + full_snapshot_archive_path: P, + incremental_snapshot_archive_path: P, +) -> Result<()> +where + P: AsRef, +{ + let full_snapshot_filename = path_to_file_name_str(full_snapshot_archive_path.as_ref())?; + let (full_snapshot_slot, _, _) = parse_full_snapshot_archive_filename(full_snapshot_filename)?; + + let incremental_snapshot_filename = + path_to_file_name_str(incremental_snapshot_archive_path.as_ref())?; + let (incremental_snapshot_base_slot, _, _, _) = + parse_incremental_snapshot_archive_filename(incremental_snapshot_filename)?; + + (full_snapshot_slot == incremental_snapshot_base_slot) + .then(|| ()) + .ok_or(SnapshotError::MismatchedBaseSlot( + full_snapshot_slot, + incremental_snapshot_base_slot, + )) +} + +/// Get the `&str` from a `&Path` +fn path_to_file_name_str(path: &Path) -> Result<&str> { + path.file_name() + .ok_or_else(|| SnapshotError::PathToFileNameError(path.to_path_buf()))? + .to_str() + .ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf())) +} + +/// Build the full snapshot archive path from its components: the snapshot archives directory, the /// snapshot slot, the accounts hash, and the archive format. -pub fn build_snapshot_archive_path( - snapshot_output_dir: PathBuf, +pub fn build_full_snapshot_archive_path( + snapshot_archives_dir: PathBuf, slot: Slot, hash: &Hash, archive_format: ArchiveFormat, ) -> PathBuf { - snapshot_output_dir.join(format!( + snapshot_archives_dir.join(format!( "snapshot-{}-{}.{}", slot, hash, @@ -740,6 +1199,25 @@ pub fn build_snapshot_archive_path( )) } +/// Build the incremental snapshot archive path from its components: the snapshot archives +/// directory, the snapshot base slot, the snapshot slot, the accounts hash, and the archive +/// format. 
+pub fn build_incremental_snapshot_archive_path( + snapshot_archives_dir: PathBuf, + base_slot: Slot, + slot: Slot, + hash: &Hash, + archive_format: ArchiveFormat, +) -> PathBuf { + snapshot_archives_dir.join(format!( + "incremental-snapshot-{}-{}-{}.{}", + base_slot, + slot, + hash, + get_archive_ext(archive_format), + )) +} + fn archive_format_from_str(archive_format: &str) -> Option { match archive_format { "tar.bz2" => Some(ArchiveFormat::TarBzip2), @@ -750,105 +1228,220 @@ fn archive_format_from_str(archive_format: &str) -> Option { } } -/// Parse a snapshot archive filename into its Slot, Hash, and Archive Format -fn parse_snapshot_archive_filename(archive_filename: &str) -> Option<(Slot, Hash, ArchiveFormat)> { - let regex = Regex::new(SNAPSHOT_ARCHIVE_FILENAME_REGEX); +/// Parse a full snapshot archive filename into its Slot, Hash, and Archive Format +fn parse_full_snapshot_archive_filename( + archive_filename: &str, +) -> Result<(Slot, Hash, ArchiveFormat)> { + lazy_static! { + static ref RE: Regex = Regex::new(FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap(); + } - regex.ok()?.captures(archive_filename).and_then(|captures| { - let slot = captures.get(1).map(|x| x.as_str().parse::())?.ok()?; - let hash = captures.get(2).map(|x| x.as_str().parse::())?.ok()?; - let archive_format = captures - .get(3) - .map(|x| archive_format_from_str(x.as_str()))??; + let do_parse = || { + RE.captures(archive_filename).and_then(|captures| { + let slot = captures + .name("slot") + .map(|x| x.as_str().parse::())? + .ok()?; + let hash = captures + .name("hash") + .map(|x| x.as_str().parse::())? 
+ .ok()?; + let archive_format = captures + .name("ext") + .map(|x| archive_format_from_str(x.as_str()))??; - Some((slot, hash, archive_format)) + Some((slot, hash, archive_format)) + }) + }; + + do_parse().ok_or_else(|| { + SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string()) }) } -/// Get a list of the snapshot archives in a directory -pub fn get_snapshot_archives

(snapshot_output_dir: P) -> Vec +/// Parse an incremental snapshot archive filename into its base Slot, actual Slot, Hash, and Archive Format +fn parse_incremental_snapshot_archive_filename( + archive_filename: &str, +) -> Result<(Slot, Slot, Hash, ArchiveFormat)> { + lazy_static! { + static ref RE: Regex = Regex::new(INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX).unwrap(); + } + + let do_parse = || { + RE.captures(archive_filename).and_then(|captures| { + let base_slot = captures + .name("base") + .map(|x| x.as_str().parse::())? + .ok()?; + let slot = captures + .name("slot") + .map(|x| x.as_str().parse::())? + .ok()?; + let hash = captures + .name("hash") + .map(|x| x.as_str().parse::())? + .ok()?; + let archive_format = captures + .name("ext") + .map(|x| archive_format_from_str(x.as_str()))??; + + Some((base_slot, slot, hash, archive_format)) + }) + }; + + do_parse().ok_or_else(|| { + SnapshotError::ParseSnapshotArchiveFileNameError(archive_filename.to_string()) + }) +} + +/// Get a list of the full snapshot archives in a directory +pub fn get_full_snapshot_archives

(snapshot_archives_dir: P) -> Vec where P: AsRef, { - match fs::read_dir(&snapshot_output_dir) { + match fs::read_dir(&snapshot_archives_dir) { Err(err) => { - info!("Unable to read snapshot directory: {}", err); + info!( + "Unable to read snapshot archives directory: err: {}, path: {}", + err, + snapshot_archives_dir.as_ref().display() + ); vec![] } Ok(files) => files .filter_map(|entry| { - if let Ok(entry) = entry { - let path = entry.path(); - if path.is_file() { - if let Some((slot, hash, archive_format)) = parse_snapshot_archive_filename( - path.file_name().unwrap().to_str().unwrap(), - ) { - return Some(SnapshotArchiveInfo { - path, - slot, - hash, - archive_format, - }); - } - } - } - None + entry.map_or(None, |entry| { + FullSnapshotArchiveInfo::new_from_path(entry.path()).ok() + }) }) .collect(), } } -/// Get a sorted list of the snapshot archives in a directory -fn get_sorted_snapshot_archives

(snapshot_output_dir: P) -> Vec +/// Get a list of the incremental snapshot archives in a directory +fn get_incremental_snapshot_archives

( + snapshot_archives_dir: P, +) -> Vec where P: AsRef, { - let mut snapshot_archives = get_snapshot_archives(snapshot_output_dir); - sort_snapshot_archives(&mut snapshot_archives); - snapshot_archives + match fs::read_dir(&snapshot_archives_dir) { + Err(err) => { + info!( + "Unable to read snapshot archives directory: err: {}, path: {}", + err, + snapshot_archives_dir.as_ref().display() + ); + vec![] + } + Ok(files) => files + .filter_map(|entry| { + entry.map_or(None, |entry| { + IncrementalSnapshotArchiveInfo::new_from_path(entry.path()).ok() + }) + }) + .collect(), + } } -/// Sort the list of snapshot archives by slot, in descending order -fn sort_snapshot_archives(snapshot_archives: &mut Vec) { - snapshot_archives.sort_unstable_by(|a, b| b.slot.cmp(&a.slot)); -} - -/// Get the highest slot of the snapshots in a directory -pub fn get_highest_snapshot_archive_slot

(snapshot_output_dir: P) -> Option +/// Get the highest slot of the full snapshot archives in a directory +pub fn get_highest_full_snapshot_archive_slot

(snapshot_archives_dir: P) -> Option where P: AsRef, { - get_highest_snapshot_archive_info(snapshot_output_dir) - .map(|snapshot_archive_info| snapshot_archive_info.slot) + get_highest_full_snapshot_archive_info(snapshot_archives_dir) + .map(|full_snapshot_archive_info| *full_snapshot_archive_info.slot()) } -/// Get the path (and metadata) for the snapshot archive with the highest slot in a directory -pub fn get_highest_snapshot_archive_info

(snapshot_output_dir: P) -> Option +/// Get the highest slot of the incremental snapshot archives in a directory, for a given full +/// snapshot slot +pub fn get_highest_incremental_snapshot_archive_slot>( + snapshot_archives_dir: P, + full_snapshot_slot: Slot, +) -> Option { + get_highest_incremental_snapshot_archive_info(snapshot_archives_dir, full_snapshot_slot) + .map(|incremental_snapshot_archive_info| *incremental_snapshot_archive_info.slot()) +} + +/// Get the path (and metadata) for the full snapshot archive with the highest slot in a directory +pub fn get_highest_full_snapshot_archive_info

( + snapshot_archives_dir: P, +) -> Option where P: AsRef, { - get_sorted_snapshot_archives(snapshot_output_dir) - .into_iter() - .next() + let mut full_snapshot_archives = get_full_snapshot_archives(snapshot_archives_dir); + full_snapshot_archives.sort_unstable(); + full_snapshot_archives.into_iter().rev().next() } -pub fn purge_old_snapshot_archives>( - snapshot_output_dir: P, - maximum_snapshots_to_retain: usize, -) { +/// Get the path for the incremental snapshot archive with the highest slot, for a given full +/// snapshot slot, in a directory +pub fn get_highest_incremental_snapshot_archive_info

( + snapshot_archives_dir: P, + full_snapshot_slot: Slot, +) -> Option +where + P: AsRef, +{ + // Since we want to filter down to only the incremental snapshot archives that have the same + // full snapshot slot as the value passed in, perform the filtering before sorting to avoid + // doing unnecessary work. + let mut incremental_snapshot_archives = + get_incremental_snapshot_archives(snapshot_archives_dir) + .into_iter() + .filter(|incremental_snapshot_archive_info| { + *incremental_snapshot_archive_info.base_slot() == full_snapshot_slot + }) + .collect::>(); + incremental_snapshot_archives.sort_unstable(); + incremental_snapshot_archives.into_iter().rev().next() +} + +pub fn purge_old_snapshot_archives

(snapshot_archives_dir: P, maximum_snapshots_to_retain: usize) +where + P: AsRef, +{ info!( - "Purging old snapshots in {:?}, retaining {}", - snapshot_output_dir.as_ref(), + "Purging old snapshot archives in {}, retaining {} full snapshots", + snapshot_archives_dir.as_ref().display(), maximum_snapshots_to_retain ); - let mut archives = get_sorted_snapshot_archives(snapshot_output_dir); + let mut snapshot_archives = get_full_snapshot_archives(&snapshot_archives_dir); + snapshot_archives.sort_unstable(); // Keep the oldest snapshot so we can always play the ledger from it. - archives.pop(); + snapshot_archives.pop(); let max_snaps = max(1, maximum_snapshots_to_retain); - for old_archive in archives.into_iter().skip(max_snaps) { - fs::remove_file(old_archive.path) - .unwrap_or_else(|err| info!("Failed to remove old snapshot: {:}", err)); + for old_archive in snapshot_archives.into_iter().skip(max_snaps) { + trace!( + "Purging old full snapshot archive: {}", + old_archive.path().display() + ); + fs::remove_file(old_archive.path()) + .unwrap_or_else(|err| info!("Failed to remove old full snapshot archive: {}", err)); } + + // Only keep incremental snapshots for the latest full snapshot + // bprumo TODO issue #18639: As an option to further reduce the number of incremental + // snapshots, only a subset of the incremental snapshots for the latest full snapshot could be + // kept. This could reuse maximum_snapshots_to_retain, or use a new field just for incremental + // snapshots. + // In case there are incremental snapshots but no full snapshots, make sure all the incremental + // snapshots are purged. 
+ let last_full_snapshot_slot = + get_highest_full_snapshot_archive_slot(&snapshot_archives_dir).unwrap_or(Slot::MAX); + get_incremental_snapshot_archives(&snapshot_archives_dir) + .iter() + .filter(|archive_info| *archive_info.base_slot() < last_full_snapshot_slot) + .for_each(|old_archive| { + trace!( + "Purging old incremental snapshot archive: {}", + old_archive.path().display() + ); + fs::remove_file(old_archive.path()).unwrap_or_else(|err| { + info!("Failed to remove old incremental snapshot archive: {}", err) + }) + }); } fn unpack_snapshot_local T>( @@ -925,72 +1518,119 @@ fn untar_snapshot_in>( Ok(account_paths_map) } -fn verify_snapshot_version_and_folder( - snapshot_version: &str, - unpacked_snapshots_dir: &Path, -) -> Result<(SnapshotVersion, SlotSnapshotPaths)> { - info!("snapshot version: {}", snapshot_version); +fn verify_unpacked_snapshots_dir_and_version( + unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion, +) -> Result<(SnapshotVersion, BankSnapshotInfo)> { + info!( + "snapshot version: {}", + &unpacked_snapshots_dir_and_version.snapshot_version + ); - let snapshot_version_enum = - SnapshotVersion::maybe_from_string(snapshot_version).ok_or_else(|| { - get_io_error(&format!( - "unsupported snapshot version: {}", - snapshot_version - )) - })?; - let mut snapshot_paths = get_snapshot_paths(&unpacked_snapshots_dir); - if snapshot_paths.len() > 1 { + let snapshot_version = + SnapshotVersion::maybe_from_string(&unpacked_snapshots_dir_and_version.snapshot_version) + .ok_or_else(|| { + get_io_error(&format!( + "unsupported snapshot version: {}", + &unpacked_snapshots_dir_and_version.snapshot_version, + )) + })?; + let mut bank_snapshot_infos = + get_bank_snapshots(&unpacked_snapshots_dir_and_version.unpacked_snapshots_dir); + if bank_snapshot_infos.len() > 1 { return Err(get_io_error("invalid snapshot format")); } - let root_paths = snapshot_paths + bank_snapshot_infos.sort_unstable(); + let root_paths = bank_snapshot_infos .pop() 
.ok_or_else(|| get_io_error("No snapshots found in snapshots directory"))?; - Ok((snapshot_version_enum, root_paths)) + Ok((snapshot_version, root_paths)) } #[allow(clippy::too_many_arguments)] fn rebuild_bank_from_snapshots( - snapshot_version: &str, + full_snapshot_unpacked_snapshots_dir_and_version: &UnpackedSnapshotsDirAndVersion, + incremental_snapshot_unpacked_snapshots_dir_and_version: Option< + &UnpackedSnapshotsDirAndVersion, + >, frozen_account_pubkeys: &[Pubkey], - unpacked_snapshots_dir: &Path, account_paths: &[PathBuf], unpacked_append_vec_map: UnpackedAppendVecMap, genesis_config: &GenesisConfig, debug_keys: Option>>, additional_builtins: Option<&Builtins>, - account_indexes: AccountSecondaryIndexes, + account_secondary_indexes: AccountSecondaryIndexes, accounts_db_caching_enabled: bool, limit_load_slot_count_from_snapshot: Option, shrink_ratio: AccountShrinkThreshold, verify_index: bool, ) -> Result { - let (snapshot_version_enum, root_paths) = - verify_snapshot_version_and_folder(snapshot_version, unpacked_snapshots_dir)?; + let (full_snapshot_version, full_snapshot_root_paths) = + verify_unpacked_snapshots_dir_and_version( + full_snapshot_unpacked_snapshots_dir_and_version, + )?; + let (incremental_snapshot_version, incremental_snapshot_root_paths) = + if let Some(snapshot_unpacked_snapshots_dir_and_version) = + incremental_snapshot_unpacked_snapshots_dir_and_version + { + let (snapshot_version, bank_snapshot_info) = verify_unpacked_snapshots_dir_and_version( + snapshot_unpacked_snapshots_dir_and_version, + )?; + (Some(snapshot_version), Some(bank_snapshot_info)) + } else { + (None, None) + }; info!( - "Loading bank from {}", - &root_paths.snapshot_file_path.display() + "Loading bank from full snapshot {} and incremental snapshot {:?}", + full_snapshot_root_paths.snapshot_path.display(), + incremental_snapshot_root_paths + .as_ref() + .map(|paths| paths.snapshot_path.display()), ); - let bank = 
deserialize_snapshot_data_file(&root_paths.snapshot_file_path, |mut stream| { - Ok(match snapshot_version_enum { - SnapshotVersion::V1_2_0 => bank_from_stream( - SerdeStyle::Newer, - &mut stream, - account_paths, - unpacked_append_vec_map, - genesis_config, - frozen_account_pubkeys, - debug_keys, - additional_builtins, - account_indexes, - accounts_db_caching_enabled, - limit_load_slot_count_from_snapshot, - shrink_ratio, - verify_index, - ), - }?) + + let snapshot_root_paths = SnapshotRootPaths { + full_snapshot_root_file_path: full_snapshot_root_paths.snapshot_path, + incremental_snapshot_root_file_path: incremental_snapshot_root_paths + .map(|root_paths| root_paths.snapshot_path), + }; + + let bank = deserialize_snapshot_data_files(&snapshot_root_paths, |mut snapshot_streams| { + Ok( + match incremental_snapshot_version.unwrap_or(full_snapshot_version) { + SnapshotVersion::V1_2_0 => bank_from_streams( + SerdeStyle::Newer, + &mut snapshot_streams, + account_paths, + unpacked_append_vec_map, + genesis_config, + frozen_account_pubkeys, + debug_keys, + additional_builtins, + account_secondary_indexes, + accounts_db_caching_enabled, + limit_load_slot_count_from_snapshot, + shrink_ratio, + verify_index, + ), + }?, + ) })?; - let status_cache_path = unpacked_snapshots_dir.join(SNAPSHOT_STATUS_CACHE_FILE_NAME); + // The status cache is rebuilt from the latest snapshot. So, if there's an incremental + // snapshot, use that. Otherwise use the full snapshot. 
+ let status_cache_path = incremental_snapshot_unpacked_snapshots_dir_and_version + .map_or_else( + || { + full_snapshot_unpacked_snapshots_dir_and_version + .unpacked_snapshots_dir + .as_path() + }, + |unpacked_snapshots_dir_and_version| { + unpacked_snapshots_dir_and_version + .unpacked_snapshots_dir + .as_path() + }, + ) + .join(SNAPSHOT_STATUS_CACHE_FILE_NAME); let slot_deltas = deserialize_snapshot_data_file(&status_cache_path, |stream| { info!( "Rebuilding status cache from {}", @@ -1014,7 +1654,7 @@ fn get_snapshot_file_name(slot: Slot) -> String { slot.to_string() } -fn get_bank_snapshot_dir>(path: P, slot: Slot) -> PathBuf { +fn get_bank_snapshots_dir>(path: P, slot: Slot) -> PathBuf { path.as_ref().join(slot.to_string()) } @@ -1053,16 +1693,26 @@ pub fn verify_snapshot_archive( assert!(!dir_diff::is_different(&storages_to_verify, unpacked_accounts).unwrap()); } -pub fn purge_old_snapshots(snapshot_path: &Path) { - // Remove outdated snapshots - let slot_snapshot_paths = get_snapshot_paths(snapshot_path); - let num_to_remove = slot_snapshot_paths.len().saturating_sub(MAX_SNAPSHOTS); - for slot_files in &slot_snapshot_paths[..num_to_remove] { - let r = remove_snapshot(slot_files.slot, snapshot_path); - if r.is_err() { - warn!("Couldn't remove snapshot at: {:?}", snapshot_path); - } - } +/// Remove outdated bank snapshots +pub fn purge_old_bank_snapshots

(snapshots_dir: P) +where + P: AsRef, +{ + let mut bank_snapshot_infos = get_bank_snapshots(&snapshots_dir); + bank_snapshot_infos.sort_unstable(); + bank_snapshot_infos + .into_iter() + .rev() + .skip(MAX_BANK_SNAPSHOTS) + .for_each(|bank_snapshot_info| { + let r = remove_bank_snapshot(bank_snapshot_info.slot, &snapshots_dir); + if r.is_err() { + warn!( + "Couldn't remove snapshot at: {}", + bank_snapshot_info.snapshot_path.display() + ); + } + }) } /// Gather the necessary elements for a snapshot of the given `root_bank` @@ -1070,7 +1720,7 @@ pub fn snapshot_bank( root_bank: &Bank, status_cache_slot_deltas: Vec, accounts_package_sender: &AccountsPackageSender, - snapshot_path: &Path, + snapshots_dir: &Path, snapshot_package_output_path: &Path, snapshot_version: SnapshotVersion, archive_format: &ArchiveFormat, @@ -1078,20 +1728,18 @@ pub fn snapshot_bank( ) -> Result<()> { let storages: Vec<_> = root_bank.get_snapshot_storages(); let mut add_snapshot_time = Measure::start("add-snapshot-ms"); - add_snapshot(snapshot_path, root_bank, &storages, snapshot_version)?; + add_bank_snapshot(snapshots_dir, root_bank, &storages, snapshot_version)?; add_snapshot_time.stop(); inc_new_counter_info!("add-snapshot-ms", add_snapshot_time.as_ms() as usize); // Package the relevant snapshots - let slot_snapshot_paths = get_snapshot_paths(snapshot_path); - let latest_slot_snapshot_paths = slot_snapshot_paths - .last() - .expect("no snapshots found in config snapshot_path"); + let highest_bank_snapshot_info = get_highest_bank_snapshot_info(snapshots_dir) + .expect("no snapshots found in config snapshots_dir"); - let package = package_snapshot( + let package = package_full_snapshot( root_bank, - latest_slot_snapshot_paths, - snapshot_path, + &highest_bank_snapshot_info, + snapshots_dir, status_cache_slot_deltas, snapshot_package_output_path, storages, @@ -1105,13 +1753,13 @@ pub fn snapshot_bank( Ok(()) } -/// Convenience function to create a snapshot archive out of any Bank, 
regardless of state. The -/// Bank will be frozen during the process. +/// Convenience function to create a full snapshot archive out of any Bank, regardless of state. +/// The Bank will be frozen during the process. /// /// Requires: /// - `bank` is complete -pub fn bank_to_snapshot_archive, Q: AsRef>( - snapshot_path: P, +pub fn bank_to_full_snapshot_archive, Q: AsRef>( + snapshots_dir: P, bank: &Bank, snapshot_version: Option, snapshot_package_output_path: Q, @@ -1128,13 +1776,13 @@ pub fn bank_to_snapshot_archive, Q: AsRef>( bank.update_accounts_hash(); bank.rehash(); // Bank accounts may have been manually modified by the caller - let temp_dir = tempfile::tempdir_in(snapshot_path)?; + let temp_dir = tempfile::tempdir_in(snapshots_dir)?; let storages = bank.get_snapshot_storages(); - let slot_snapshot_paths = add_snapshot(&temp_dir, bank, &storages, snapshot_version)?; - let package = package_snapshot( + let bank_snapshot_info = add_bank_snapshot(&temp_dir, bank, &storages, snapshot_version)?; + let package = package_full_snapshot( bank, - &slot_snapshot_paths, + &bank_snapshot_info, &temp_dir, bank.src.slot_deltas(&bank.src.roots()), snapshot_package_output_path, @@ -1144,7 +1792,56 @@ pub fn bank_to_snapshot_archive, Q: AsRef>( None, )?; - let package = process_accounts_package_pre(package, thread_pool); + let package = process_accounts_package_pre(package, thread_pool, None); + + archive_snapshot_package(&package, maximum_snapshots_to_retain)?; + Ok(package.tar_output_file) +} + +/// Convenience function to create an incremental snapshot archive out of any Bank, regardless of +/// state. The Bank will be frozen during the process. 
+/// +/// Requires: +/// - `bank` is complete +/// - `bank`'s slot is greater than `full_snapshot_slot` +pub fn bank_to_incremental_snapshot_archive, Q: AsRef>( + snapshots_dir: P, + bank: &Bank, + full_snapshot_slot: Slot, + snapshot_version: Option, + snapshot_package_output_path: Q, + archive_format: ArchiveFormat, + thread_pool: Option<&ThreadPool>, + maximum_snapshots_to_retain: usize, +) -> Result { + let snapshot_version = snapshot_version.unwrap_or_default(); + + assert!(bank.is_complete()); + assert!(bank.slot() > full_snapshot_slot); + bank.squash(); // Bank may not be a root + bank.force_flush_accounts_cache(); + bank.clean_accounts(true, false); + bank.update_accounts_hash(); + bank.rehash(); // Bank accounts may have been manually modified by the caller + + let temp_dir = tempfile::tempdir_in(snapshots_dir)?; + + let storages = bank.get_incremental_snapshot_storages(full_snapshot_slot); + let bank_snapshot_info = add_bank_snapshot(&temp_dir, bank, &storages, snapshot_version)?; + let package = package_incremental_snapshot( + bank, + full_snapshot_slot, + &bank_snapshot_info, + &temp_dir, + bank.src.slot_deltas(&bank.src.roots()), + snapshot_package_output_path, + storages, + archive_format, + snapshot_version, + None, + )?; + + let package = process_accounts_package_pre(package, thread_pool, Some(full_snapshot_slot)); archive_snapshot_package(&package, maximum_snapshots_to_retain)?; Ok(package.tar_output_file) @@ -1153,6 +1850,7 @@ pub fn bank_to_snapshot_archive, Q: AsRef>( pub fn process_accounts_package_pre( accounts_package: AccountsPackagePre, thread_pool: Option<&ThreadPool>, + incremental_snapshot_base_slot: Option, ) -> AccountsPackage { let mut time = Measure::start("hash"); @@ -1179,12 +1877,21 @@ pub fn process_accounts_package_pre( ("calculate_hash", time.as_us(), i64), ); - let tar_output_file = build_snapshot_archive_path( - accounts_package.snapshot_output_dir, - accounts_package.slot, - &hash, - accounts_package.archive_format, - ); + 
let tar_output_file = match incremental_snapshot_base_slot { + None => build_full_snapshot_archive_path( + accounts_package.snapshot_output_dir, + accounts_package.slot, + &hash, + accounts_package.archive_format, + ), + Some(incremental_snapshot_base_slot) => build_incremental_snapshot_archive_path( + accounts_package.snapshot_output_dir, + incremental_snapshot_base_slot, + accounts_package.slot, + &hash, + accounts_package.archive_format, + ), + }; AccountsPackage::new( accounts_package.slot, @@ -1257,10 +1964,19 @@ mod tests { ) .unwrap(); - let actual_data = deserialize_snapshot_data_file_capped( - &temp_dir.path().join("data-file"), + let snapshot_root_paths = SnapshotRootPaths { + full_snapshot_root_file_path: temp_dir.path().join("data-file"), + incremental_snapshot_root_file_path: None, + }; + + let actual_data = deserialize_snapshot_data_files_capped( + &snapshot_root_paths, expected_consumed_size, - |stream| Ok(deserialize_from::<_, u32>(stream)?), + |stream| { + Ok(deserialize_from::<_, u32>( + &mut stream.full_snapshot_stream, + )?) + }, ) .unwrap(); assert_eq!(actual_data, expected_data); @@ -1282,10 +1998,19 @@ mod tests { ) .unwrap(); - let result = deserialize_snapshot_data_file_capped( - &temp_dir.path().join("data-file"), + let snapshot_root_paths = SnapshotRootPaths { + full_snapshot_root_file_path: temp_dir.path().join("data-file"), + incremental_snapshot_root_file_path: None, + }; + + let result = deserialize_snapshot_data_files_capped( + &snapshot_root_paths, expected_consumed_size - 1, - |stream| Ok(deserialize_from::<_, u32>(stream)?), + |stream| { + Ok(deserialize_from::<_, u32>( + &mut stream.full_snapshot_stream, + )?) 
+ }, ); assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("too large snapshot data file to deserialize")); } @@ -1307,125 +2032,363 @@ mod tests { ) .unwrap(); - let result = deserialize_snapshot_data_file_capped( - &temp_dir.path().join("data-file"), + let snapshot_root_paths = SnapshotRootPaths { + full_snapshot_root_file_path: temp_dir.path().join("data-file"), + incremental_snapshot_root_file_path: None, + }; + + let result = deserialize_snapshot_data_files_capped( + &snapshot_root_paths, expected_consumed_size * 2, - |stream| Ok(deserialize_from::<_, u32>(stream)?), + |stream| { + Ok(deserialize_from::<_, u32>( + &mut stream.full_snapshot_stream, + )?) + }, ); assert_matches!(result, Err(SnapshotError::Io(ref message)) if message.to_string().starts_with("invalid snapshot data file")); } #[test] - fn test_parse_snapshot_archive_filename() { + fn test_parse_full_snapshot_archive_filename() { assert_eq!( - parse_snapshot_archive_filename(&format!("snapshot-42-{}.tar.bz2", Hash::default())), - Some((42, Hash::default(), ArchiveFormat::TarBzip2)) + parse_full_snapshot_archive_filename(&format!( + "snapshot-42-{}.tar.bz2", + Hash::default() + )) + .unwrap(), + (42, Hash::default(), ArchiveFormat::TarBzip2) ); assert_eq!( - parse_snapshot_archive_filename(&format!("snapshot-43-{}.tar.zst", Hash::default())), - Some((43, Hash::default(), ArchiveFormat::TarZstd)) + parse_full_snapshot_archive_filename(&format!( + "snapshot-43-{}.tar.zst", + Hash::default() + )) + .unwrap(), + (43, Hash::default(), ArchiveFormat::TarZstd) ); assert_eq!( - parse_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default())), - Some((44, Hash::default(), ArchiveFormat::Tar)) + parse_full_snapshot_archive_filename(&format!("snapshot-44-{}.tar", Hash::default())) + .unwrap(), + (44, Hash::default(), ArchiveFormat::Tar) ); - assert!(parse_snapshot_archive_filename("invalid").is_none()); - 
assert!(parse_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_none()); + assert!(parse_full_snapshot_archive_filename("invalid").is_err()); + assert!( + parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.bad!ext").is_err() + ); - assert!(parse_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_none()); - assert!(parse_snapshot_archive_filename(&format!( + assert!( + parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.bad!ext").is_err() + ); + assert!(parse_full_snapshot_archive_filename(&format!( "snapshot-12345678-{}.bad!ext", Hash::new_unique() )) - .is_none()); - assert!(parse_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_none()); + .is_err()); + assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err()); - assert!(parse_snapshot_archive_filename(&format!( + assert!(parse_full_snapshot_archive_filename(&format!( "snapshot-bad!slot-{}.bad!ext", Hash::new_unique() )) - .is_none()); - assert!(parse_snapshot_archive_filename(&format!( + .is_err()); + assert!(parse_full_snapshot_archive_filename(&format!( "snapshot-12345678-{}.bad!ext", Hash::new_unique() )) - .is_none()); - assert!(parse_snapshot_archive_filename(&format!( + .is_err()); + assert!(parse_full_snapshot_archive_filename(&format!( "snapshot-bad!slot-{}.tar", Hash::new_unique() )) - .is_none()); + .is_err()); - assert!(parse_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_none()); - assert!(parse_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_none()); - assert!(parse_snapshot_archive_filename(&format!( + assert!(parse_full_snapshot_archive_filename("snapshot-bad!slot-bad!hash.tar").is_err()); + assert!(parse_full_snapshot_archive_filename("snapshot-12345678-bad!hash.tar").is_err()); + assert!(parse_full_snapshot_archive_filename(&format!( "snapshot-bad!slot-{}.tar", Hash::new_unique() )) - .is_none()); + .is_err()); } - /// A test helper function that creates snapshot 
archive files. Creates snapshot files in the - /// range (`min_snapshot_slot`, `max_snapshot_slot`]. Additionally, "bad" files are created - /// for snapshots to ensure the tests properly filter them out. + #[test] + fn test_parse_incremental_snapshot_archive_filename() { + solana_logger::setup(); + assert_eq!( + parse_incremental_snapshot_archive_filename(&format!( + "incremental-snapshot-42-123-{}.tar.bz2", + Hash::default() + )) + .unwrap(), + (42, 123, Hash::default(), ArchiveFormat::TarBzip2) + ); + assert_eq!( + parse_incremental_snapshot_archive_filename(&format!( + "incremental-snapshot-43-234-{}.tar.zst", + Hash::default() + )) + .unwrap(), + (43, 234, Hash::default(), ArchiveFormat::TarZstd) + ); + assert_eq!( + parse_incremental_snapshot_archive_filename(&format!( + "incremental-snapshot-44-345-{}.tar", + Hash::default() + )) + .unwrap(), + (44, 345, Hash::default(), ArchiveFormat::Tar) + ); + + assert!(parse_incremental_snapshot_archive_filename("invalid").is_err()); + assert!(parse_incremental_snapshot_archive_filename(&format!( + "snapshot-42-{}.tar", + Hash::new_unique() + )) + .is_err()); + assert!(parse_incremental_snapshot_archive_filename( + "incremental-snapshot-bad!slot-bad!slot-bad!hash.bad!ext" + ) + .is_err()); + + assert!(parse_incremental_snapshot_archive_filename(&format!( + "incremental-snapshot-bad!slot-56785678-{}.tar", + Hash::new_unique() + )) + .is_err()); + + assert!(parse_incremental_snapshot_archive_filename(&format!( + "incremental-snapshot-12345678-bad!slot-{}.tar", + Hash::new_unique() + )) + .is_err()); + + assert!(parse_incremental_snapshot_archive_filename( + "incremental-snapshot-12341234-56785678-bad!HASH.tar" + ) + .is_err()); + + assert!(parse_incremental_snapshot_archive_filename(&format!( + "incremental-snapshot-12341234-56785678-{}.bad!ext", + Hash::new_unique() + )) + .is_err()); + } + + #[test] + fn test_check_are_snapshots_compatible() { + solana_logger::setup(); + let slot1: Slot = 1234; + let slot2: Slot = 5678; 
+ let slot3: Slot = 999_999; + + assert!(check_are_snapshots_compatible( + &format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()), + &format!( + "/dir/incremental-snapshot-{}-{}-{}.tar", + slot1, + slot2, + Hash::new_unique() + ), + ) + .is_ok()); + + assert!(check_are_snapshots_compatible( + &format!("/dir/snapshot-{}-{}.tar", slot1, Hash::new_unique()), + &format!( + "/dir/incremental-snapshot-{}-{}-{}.tar", + slot2, + slot3, + Hash::new_unique() + ), + ) + .is_err()); + + /// A test helper function that creates bank snapshot files + fn common_create_bank_snapshot_files(snapshots_dir: &Path, min_slot: Slot, max_slot: Slot) { + for slot in min_slot..max_slot { + let snapshot_dir = get_bank_snapshots_dir(snapshots_dir, slot); + fs::create_dir_all(&snapshot_dir).unwrap(); + + let snapshot_filename = get_snapshot_file_name(slot); + let snapshot_path = snapshot_dir.join(snapshot_filename); + File::create(snapshot_path).unwrap(); + } + } + + #[test] + fn test_get_bank_snapshot_infos() { + solana_logger::setup(); + let temp_snapshots_dir = tempfile::TempDir::new().unwrap(); + let min_slot = 10; + let max_slot = 20; + common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot); + + let bank_snapshot_infos = get_bank_snapshots(temp_snapshots_dir.path()); + assert_eq!(bank_snapshot_infos.len() as Slot, max_slot - min_slot); + } + + #[test] + fn test_get_highest_bank_snapshot_info() { + solana_logger::setup(); + let temp_snapshots_dir = tempfile::TempDir::new().unwrap(); + let min_slot = 99; + let max_slot = 123; + common_create_bank_snapshot_files(temp_snapshots_dir.path(), min_slot, max_slot); + + let highest_bank_snapshot_info = get_highest_bank_snapshot_info(temp_snapshots_dir.path()); + assert!(highest_bank_snapshot_info.is_some()); + assert_eq!(highest_bank_snapshot_info.unwrap().slot, max_slot - 1); + } + + /// A test helper function that creates full and incremental snapshot archive files. 
Creates + /// full snapshot files in the range (`min_full_snapshot_slot`, `max_full_snapshot_slot`], and + /// incremental snapshot files in the range (`min_incremental_snapshot_slot`, + /// `max_incremental_snapshot_slot`]. Additionally, "bad" files are created for both full and + /// incremental snapshots to ensure the tests properly filter them out. fn common_create_snapshot_archive_files( - snapshot_dir: &Path, - min_snapshot_slot: Slot, - max_snapshot_slot: Slot, + snapshot_archives_dir: &Path, + min_full_snapshot_slot: Slot, + max_full_snapshot_slot: Slot, + min_incremental_snapshot_slot: Slot, + max_incremental_snapshot_slot: Slot, ) { - for snapshot_slot in min_snapshot_slot..max_snapshot_slot { - let snapshot_filename = format!("snapshot-{}-{}.tar", snapshot_slot, Hash::default()); - let snapshot_filepath = snapshot_dir.join(snapshot_filename); + for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot { + for incremental_snapshot_slot in + min_incremental_snapshot_slot..max_incremental_snapshot_slot + { + let snapshot_filename = format!( + "incremental-snapshot-{}-{}-{}.tar", + full_snapshot_slot, + incremental_snapshot_slot, + Hash::default() + ); + let snapshot_filepath = snapshot_archives_dir.join(snapshot_filename); + File::create(snapshot_filepath).unwrap(); + } + + let snapshot_filename = + format!("snapshot-{}-{}.tar", full_snapshot_slot, Hash::default()); + let snapshot_filepath = snapshot_archives_dir.join(snapshot_filename); File::create(snapshot_filepath).unwrap(); + + // Add in an incremental snapshot with a bad filename and high slot to ensure filename are filtered and sorted correctly + let bad_filename = format!( + "incremental-snapshot-{}-{}-bad!hash.tar", + full_snapshot_slot, + max_incremental_snapshot_slot + 1, + ); + let bad_filepath = snapshot_archives_dir.join(bad_filename); + File::create(bad_filepath).unwrap(); } // Add in a snapshot with a bad filename and high slot to ensure filename are filtered and // sorted 
correctly - let bad_filename = format!("snapshot-{}-bad!hash.tar", max_snapshot_slot + 1); - let bad_filepath = snapshot_dir.join(bad_filename); + let bad_filename = format!("snapshot-{}-bad!hash.tar", max_full_snapshot_slot + 1); + let bad_filepath = snapshot_archives_dir.join(bad_filename); File::create(bad_filepath).unwrap(); } #[test] - fn test_get_snapshot_archives() { + fn test_get_full_snapshot_archives() { solana_logger::setup(); let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap(); let min_slot = 123; let max_slot = 456; - common_create_snapshot_archive_files(temp_snapshot_archives_dir.path(), min_slot, max_slot); + common_create_snapshot_archive_files( + temp_snapshot_archives_dir.path(), + min_slot, + max_slot, + 0, + 0, + ); - let snapshot_archives = get_snapshot_archives(temp_snapshot_archives_dir); + let snapshot_archives = get_full_snapshot_archives(temp_snapshot_archives_dir); assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot); } #[test] - fn test_get_sorted_snapshot_archives() { + fn test_get_incremental_snapshot_archives() { solana_logger::setup(); let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap(); - let min_slot = 12; - let max_slot = 45; - common_create_snapshot_archive_files(temp_snapshot_archives_dir.path(), min_slot, max_slot); + let min_full_snapshot_slot = 12; + let max_full_snapshot_slot = 23; + let min_incremental_snapshot_slot = 34; + let max_incremental_snapshot_slot = 45; + common_create_snapshot_archive_files( + temp_snapshot_archives_dir.path(), + min_full_snapshot_slot, + max_full_snapshot_slot, + min_incremental_snapshot_slot, + max_incremental_snapshot_slot, + ); - let sorted_snapshot_archives = get_sorted_snapshot_archives(temp_snapshot_archives_dir); - assert_eq!(sorted_snapshot_archives.len() as Slot, max_slot - min_slot); - assert_eq!(sorted_snapshot_archives[0].slot, max_slot - 1); + let incremental_snapshot_archives = + 
get_incremental_snapshot_archives(temp_snapshot_archives_dir); + assert_eq!( + incremental_snapshot_archives.len() as Slot, + (max_full_snapshot_slot - min_full_snapshot_slot) + * (max_incremental_snapshot_slot - min_incremental_snapshot_slot) + ); } #[test] - fn test_get_highest_snapshot_archive_slot() { + fn test_get_highest_full_snapshot_archive_slot() { solana_logger::setup(); let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap(); let min_slot = 123; let max_slot = 456; - common_create_snapshot_archive_files(temp_snapshot_archives_dir.path(), min_slot, max_slot); + common_create_snapshot_archive_files( + temp_snapshot_archives_dir.path(), + min_slot, + max_slot, + 0, + 0, + ); assert_eq!( - get_highest_snapshot_archive_slot(temp_snapshot_archives_dir.path()), + get_highest_full_snapshot_archive_slot(temp_snapshot_archives_dir.path()), Some(max_slot - 1) ); } + #[test] + fn test_get_highest_incremental_snapshot_slot() { + solana_logger::setup(); + let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap(); + let min_full_snapshot_slot = 12; + let max_full_snapshot_slot = 23; + let min_incremental_snapshot_slot = 34; + let max_incremental_snapshot_slot = 45; + common_create_snapshot_archive_files( + temp_snapshot_archives_dir.path(), + min_full_snapshot_slot, + max_full_snapshot_slot, + min_incremental_snapshot_slot, + max_incremental_snapshot_slot, + ); + + for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot { + assert_eq!( + get_highest_incremental_snapshot_archive_slot( + temp_snapshot_archives_dir.path(), + full_snapshot_slot + ), + Some(max_incremental_snapshot_slot - 1) + ); + } + + assert_eq!( + get_highest_incremental_snapshot_archive_slot( + temp_snapshot_archives_dir.path(), + max_full_snapshot_slot + ), + None + ); + } + fn common_test_purge_old_snapshot_archives( snapshot_names: &[&String], maximum_snapshots_to_retain: usize, @@ -1459,7 +2422,7 @@ mod tests { } #[test] - fn test_purge_old_snapshot_archives() { 
+ fn test_purge_old_full_snapshot_archives() { // Create 3 snapshots, retaining 1, // expecting the oldest 1 and the newest 1 are retained let snap1_name = format!("snapshot-1-{}.tar.zst", Hash::default()); @@ -1477,10 +2440,65 @@ mod tests { common_test_purge_old_snapshot_archives(&snapshot_names, 2, &expected_snapshots); } - /// Test roundtrip of bank to snapshot, then back again. This test creates the simplest bank - /// possible, so the contents of the snapshot archive will be quite minimal. #[test] - fn test_roundtrip_bank_to_snapshot_to_bank_simple() { + fn test_purge_old_incremental_snapshot_archives() { + let snapshot_archives_dir = tempfile::TempDir::new().unwrap(); + + for snapshot_filename in [ + format!("snapshot-100-{}.tar", Hash::default()), + format!("snapshot-200-{}.tar", Hash::default()), + format!("incremental-snapshot-100-120-{}.tar", Hash::default()), + format!("incremental-snapshot-100-140-{}.tar", Hash::default()), + format!("incremental-snapshot-100-160-{}.tar", Hash::default()), + format!("incremental-snapshot-100-180-{}.tar", Hash::default()), + format!("incremental-snapshot-200-220-{}.tar", Hash::default()), + format!("incremental-snapshot-200-240-{}.tar", Hash::default()), + format!("incremental-snapshot-200-260-{}.tar", Hash::default()), + format!("incremental-snapshot-200-280-{}.tar", Hash::default()), + ] { + let snapshot_path = snapshot_archives_dir.path().join(&snapshot_filename); + File::create(snapshot_path).unwrap(); + } + + purge_old_snapshot_archives(snapshot_archives_dir.path(), std::usize::MAX); + + let remaining_incremental_snapshot_archives = + get_incremental_snapshot_archives(snapshot_archives_dir.path()); + assert_eq!(remaining_incremental_snapshot_archives.len(), 4); + for archive in &remaining_incremental_snapshot_archives { + assert_eq!(*archive.base_slot(), 200); + } + } + + #[test] + fn test_purge_all_incremental_snapshot_archives_when_no_full_snapshot_archives() { + let snapshot_archives_dir = 
tempfile::TempDir::new().unwrap(); + + for snapshot_filename in [ + format!("incremental-snapshot-100-120-{}.tar", Hash::default()), + format!("incremental-snapshot-100-140-{}.tar", Hash::default()), + format!("incremental-snapshot-100-160-{}.tar", Hash::default()), + format!("incremental-snapshot-100-180-{}.tar", Hash::default()), + format!("incremental-snapshot-200-220-{}.tar", Hash::default()), + format!("incremental-snapshot-200-240-{}.tar", Hash::default()), + format!("incremental-snapshot-200-260-{}.tar", Hash::default()), + format!("incremental-snapshot-200-280-{}.tar", Hash::default()), + ] { + let snapshot_path = snapshot_archives_dir.path().join(&snapshot_filename); + File::create(snapshot_path).unwrap(); + } + + purge_old_snapshot_archives(snapshot_archives_dir.path(), std::usize::MAX); + + let remaining_incremental_snapshot_archives = + get_incremental_snapshot_archives(snapshot_archives_dir.path()); + assert!(remaining_incremental_snapshot_archives.is_empty()); + } + + /// Test roundtrip of bank to a full snapshot, then back again. This test creates the simplest + /// bank possible, so the contents of the snapshot archive will be quite minimal. 
+ #[test] + fn test_roundtrip_bank_to_and_from_full_snapshot_simple() { solana_logger::setup(); let genesis_config = GenesisConfig::default(); let original_bank = Bank::new(&genesis_config); @@ -1490,26 +2508,27 @@ mod tests { } let accounts_dir = tempfile::TempDir::new().unwrap(); - let snapshot_dir = tempfile::TempDir::new().unwrap(); - let snapshot_package_output_dir = tempfile::TempDir::new().unwrap(); + let snapshots_dir = tempfile::TempDir::new().unwrap(); + let snapshot_archives_dir = tempfile::TempDir::new().unwrap(); let snapshot_archive_format = ArchiveFormat::Tar; - let snapshot_archive_path = bank_to_snapshot_archive( - snapshot_dir.path(), + let snapshot_archive_path = bank_to_full_snapshot_archive( + snapshots_dir.path(), &original_bank, None, - snapshot_package_output_dir.path(), + snapshot_archives_dir.path(), snapshot_archive_format, None, 1, ) .unwrap(); - let (roundtrip_bank, _) = bank_from_snapshot_archive( + let (roundtrip_bank, _) = bank_from_snapshot_archives( &[PathBuf::from(accounts_dir.path())], &[], - snapshot_dir.path(), + snapshots_dir.path(), &snapshot_archive_path, + None, snapshot_archive_format, &genesis_config, None, @@ -1526,11 +2545,11 @@ mod tests { assert_eq!(original_bank, roundtrip_bank); } - /// Test roundtrip of bank to snapshot, then back again. This test is more involved than the - /// simple version above; creating multiple banks over multiple slots and doing multiple - /// transfers. So this snapshot should contain more data. + /// Test roundtrip of bank to a full snapshot, then back again. This test is more involved + /// than the simple version above; creating multiple banks over multiple slots and doing + /// multiple transfers. So this full snapshot should contain more data. 
#[test] - fn test_roundtrip_bank_to_snapshot_to_bank_complex() { + fn test_roundtrip_bank_to_and_from_snapshot_complex() { solana_logger::setup(); let collector = Pubkey::new_unique(); let key1 = Keypair::new(); @@ -1579,26 +2598,136 @@ mod tests { } let accounts_dir = tempfile::TempDir::new().unwrap(); - let snapshot_dir = tempfile::TempDir::new().unwrap(); - let snapshot_package_output_dir = tempfile::TempDir::new().unwrap(); + let snapshots_dir = tempfile::TempDir::new().unwrap(); + let snapshot_archives_dir = tempfile::TempDir::new().unwrap(); let snapshot_archive_format = ArchiveFormat::Tar; - let full_snapshot_archive_path = bank_to_snapshot_archive( - snapshot_dir.path(), + let full_snapshot_archive_path = bank_to_full_snapshot_archive( + snapshots_dir.path(), &bank4, None, - snapshot_package_output_dir.path(), + snapshot_archives_dir.path(), snapshot_archive_format, None, std::usize::MAX, ) .unwrap(); - let (roundtrip_bank, _) = bank_from_snapshot_archive( + let (roundtrip_bank, _) = bank_from_snapshot_archives( &[PathBuf::from(accounts_dir.path())], &[], - snapshot_dir.path(), + snapshots_dir.path(), &full_snapshot_archive_path, + None, + snapshot_archive_format, + &genesis_config, + None, + None, + AccountSecondaryIndexes::default(), + false, + None, + AccountShrinkThreshold::default(), + false, + false, + ) + .unwrap(); + + assert_eq!(*bank4, roundtrip_bank); + } + + /// Test roundtrip of bank to snapshots, then back again, with incremental snapshots. In this + /// version, build up a few slots and take a full snapshot. Continue on a few more slots and + /// take an incremental snapshot. Rebuild the bank from both the incremental snapshot and full + /// snapshot. + /// + /// For the full snapshot, touch all the accounts, but only one for the incremental snapshot. + /// This is intended to mimic the real behavior of transactions, where only a small number of + /// accounts are modified often, which are captured by the incremental snapshot. 
The majority + /// of the accounts are not modified often, and are captured by the full snapshot. + #[test] + fn test_roundtrip_bank_to_and_from_incremental_snapshot() { + solana_logger::setup(); + let collector = Pubkey::new_unique(); + let key1 = Keypair::new(); + let key2 = Keypair::new(); + let key3 = Keypair::new(); + let key4 = Keypair::new(); + let key5 = Keypair::new(); + + let (genesis_config, mint_keypair) = create_genesis_config(1_000_000); + let bank0 = Arc::new(Bank::new(&genesis_config)); + bank0.transfer(1, &mint_keypair, &key1.pubkey()).unwrap(); + bank0.transfer(2, &mint_keypair, &key2.pubkey()).unwrap(); + bank0.transfer(3, &mint_keypair, &key3.pubkey()).unwrap(); + while !bank0.is_complete() { + bank0.register_tick(&Hash::new_unique()); + } + + let slot = 1; + let bank1 = Arc::new(Bank::new_from_parent(&bank0, &collector, slot)); + bank1.transfer(3, &mint_keypair, &key3.pubkey()).unwrap(); + bank1.transfer(4, &mint_keypair, &key4.pubkey()).unwrap(); + bank1.transfer(5, &mint_keypair, &key5.pubkey()).unwrap(); + while !bank1.is_complete() { + bank1.register_tick(&Hash::new_unique()); + } + + let accounts_dir = tempfile::TempDir::new().unwrap(); + let snapshots_dir = tempfile::TempDir::new().unwrap(); + let snapshot_archives_dir = tempfile::TempDir::new().unwrap(); + let snapshot_archive_format = ArchiveFormat::Tar; + + let full_snapshot_slot = slot; + let full_snapshot_archive_path = bank_to_full_snapshot_archive( + snapshots_dir.path(), + &bank1, + None, + snapshot_archives_dir.path(), + snapshot_archive_format, + None, + std::usize::MAX, + ) + .unwrap(); + + let slot = slot + 1; + let bank2 = Arc::new(Bank::new_from_parent(&bank1, &collector, slot)); + bank2.transfer(1, &mint_keypair, &key1.pubkey()).unwrap(); + while !bank2.is_complete() { + bank2.register_tick(&Hash::new_unique()); + } + + let slot = slot + 1; + let bank3 = Arc::new(Bank::new_from_parent(&bank2, &collector, slot)); + bank3.transfer(1, &mint_keypair, &key1.pubkey()).unwrap(); + 
while !bank3.is_complete() { + bank3.register_tick(&Hash::new_unique()); + } + + let slot = slot + 1; + let bank4 = Arc::new(Bank::new_from_parent(&bank3, &collector, slot)); + bank4.transfer(1, &mint_keypair, &key1.pubkey()).unwrap(); + while !bank4.is_complete() { + bank4.register_tick(&Hash::new_unique()); + } + + let incremental_snapshot_archive_path = bank_to_incremental_snapshot_archive( + snapshots_dir.path(), + &bank4, + full_snapshot_slot, + None, + snapshot_archives_dir.path(), + snapshot_archive_format, + None, + std::usize::MAX, + ) + .unwrap(); + + let (roundtrip_bank, _) = bank_from_snapshot_archives( + &[PathBuf::from(accounts_dir.path())], + &[], + snapshots_dir.path(), + &full_snapshot_archive_path, + Some(&incremental_snapshot_archive_path), snapshot_archive_format, &genesis_config, None, diff --git a/validator/src/main.rs b/validator/src/main.rs index 9e30e7bf9..ca439fe35 100644 --- a/validator/src/main.rs +++ b/validator/src/main.rs @@ -47,7 +47,9 @@ use { }, hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE, snapshot_config::SnapshotConfig, - snapshot_utils::{self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_SNAPSHOTS_TO_RETAIN}, + snapshot_utils::{ + self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, + }, }, solana_sdk::{ clock::{Slot, DEFAULT_S_PER_SLOT}, @@ -475,8 +477,10 @@ fn get_rpc_node( blacklist_timeout = Instant::now(); let mut highest_snapshot_hash: Option<(Slot, Hash)> = - snapshot_utils::get_highest_snapshot_archive_info(snapshot_output_dir).map( - |snapshot_archive_info| (snapshot_archive_info.slot, snapshot_archive_info.hash), + snapshot_utils::get_highest_full_snapshot_archive_info(snapshot_output_dir).map( + |snapshot_archive_info| { + (*snapshot_archive_info.slot(), *snapshot_archive_info.hash()) + }, ); let eligible_rpc_peers = if snapshot_not_required { rpc_peers @@ -858,7 +862,7 @@ fn rpc_bootstrap( let mut use_local_snapshot = false; if let Some(highest_local_snapshot_slot) = - 
snapshot_utils::get_highest_snapshot_archive_slot(snapshot_output_dir) + snapshot_utils::get_highest_full_snapshot_archive_slot(snapshot_output_dir) { if highest_local_snapshot_slot > snapshot_hash.0.saturating_sub(maximum_local_snapshot_age) @@ -900,7 +904,7 @@ fn rpc_bootstrap( { snapshot_config.maximum_snapshots_to_retain } else { - DEFAULT_MAX_SNAPSHOTS_TO_RETAIN + DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN }; let ret = download_snapshot( &rpc_contact_info.rpc, @@ -1041,7 +1045,7 @@ pub fn main() { .send_transaction_leader_forward_count .to_string(); let default_rpc_threads = num_cpus::get().to_string(); - let default_max_snapshot_to_retain = &DEFAULT_MAX_SNAPSHOTS_TO_RETAIN.to_string(); + let default_max_snapshot_to_retain = &DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN.to_string(); let default_min_snapshot_download_speed = &DEFAULT_MIN_SNAPSHOT_DOWNLOAD_SPEED.to_string(); let default_max_snapshot_download_abort = &MAX_SNAPSHOT_DOWNLOAD_ABORT.to_string(); let default_accounts_shrink_optimize_total_space =