diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index 9adb19fc73..6087c2e55e 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -6,14 +6,16 @@ use rayon::ThreadPool; use solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}; +use solana_measure::measure::Measure; use solana_runtime::{ - accounts_db, - snapshot_archive_info::SnapshotArchiveInfoGetter, + accounts_db::{self, AccountsDb}, + accounts_hash::HashStats, snapshot_config::SnapshotConfig, snapshot_package::{ AccountsPackage, AccountsPackageReceiver, PendingSnapshotPackage, SnapshotPackage, + SnapshotType, }, - snapshot_utils, + sorted_storages::SortedStorages, }; use solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}; use std::collections::{HashMap, HashSet}; @@ -48,7 +50,7 @@ impl AccountsHashVerifier { .name("solana-hash-accounts".to_string()) .spawn(move || { let mut hashes = vec![]; - let mut thread_pool_storage = None; + let mut thread_pool = None; loop { if exit.load(Ordering::Relaxed) { break; @@ -56,11 +58,9 @@ impl AccountsHashVerifier { match accounts_package_receiver.recv_timeout(Duration::from_secs(1)) { Ok(accounts_package) => { - if accounts_package.hash_for_testing.is_some() - && thread_pool_storage.is_none() + if accounts_package.hash_for_testing.is_some() && thread_pool.is_none() { - thread_pool_storage = - Some(accounts_db::make_min_priority_thread_pool()); + thread_pool = Some(accounts_db::make_min_priority_thread_pool()); } Self::process_accounts_package( @@ -73,7 +73,7 @@ impl AccountsHashVerifier { &exit, fault_injection_rate_slots, snapshot_config.as_ref(), - thread_pool_storage.as_ref(), + thread_pool.as_ref(), ); } Err(RecvTimeoutError::Disconnected) => break, @@ -100,45 +100,69 @@ impl AccountsHashVerifier { snapshot_config: Option<&SnapshotConfig>, thread_pool: Option<&ThreadPool>, ) { - let snapshot_package = - snapshot_utils::process_accounts_package(accounts_package, thread_pool, None); - Self::process_snapshot_package( - snapshot_package, + Self::verify_accounts_package_hash(&accounts_package, thread_pool); + + Self::push_accounts_hashes_to_cluster( + &accounts_package, cluster_info, trusted_validators, halt_on_trusted_validator_accounts_hash_mismatch, - pending_snapshot_package, hashes, exit, fault_injection_rate_slots, - snapshot_config, + ); + + Self::submit_for_packaging(accounts_package, pending_snapshot_package, snapshot_config); + } + + fn verify_accounts_package_hash( + accounts_package: &AccountsPackage, + thread_pool: Option<&ThreadPool>, + ) { + let mut measure_hash = Measure::start("hash"); + if let Some(expected_hash) = accounts_package.hash_for_testing { + let sorted_storages = SortedStorages::new(&accounts_package.snapshot_storages); + let (hash, lamports) = AccountsDb::calculate_accounts_hash_without_index( + &sorted_storages, + thread_pool, + HashStats::default(), + false, + None, + ) + .unwrap(); + + assert_eq!(accounts_package.expected_capitalization, lamports); + assert_eq!(expected_hash, hash); + }; + measure_hash.stop(); + datapoint_info!( + "accounts_hash_verifier", + ("calculate_hash", measure_hash.as_us(), i64), ); } - fn process_snapshot_package( - snapshot_package: SnapshotPackage, + fn push_accounts_hashes_to_cluster( + accounts_package: &AccountsPackage, cluster_info: &ClusterInfo, trusted_validators: Option<&HashSet>, halt_on_trusted_validator_accounts_hash_mismatch: bool, - pending_snapshot_package: Option<&PendingSnapshotPackage>, hashes: &mut Vec<(Slot, Hash)>, exit: &Arc, fault_injection_rate_slots: u64, - snapshot_config: Option<&SnapshotConfig>, ) { - let hash = *snapshot_package.hash(); + let hash = accounts_package.hash; if fault_injection_rate_slots != 0 - && snapshot_package.slot() % fault_injection_rate_slots == 0 + && accounts_package.slot % fault_injection_rate_slots == 0 { // For testing, publish an invalid hash to gossip. use rand::{thread_rng, Rng}; use solana_sdk::hash::extend_and_hash; - warn!("inserting fault at slot: {}", snapshot_package.slot()); + warn!("inserting fault at slot: {}", accounts_package.slot); let rand = thread_rng().gen_range(0, 10); let hash = extend_and_hash(&hash, &[rand]); - hashes.push((snapshot_package.slot(), hash)); + hashes.push((accounts_package.slot, hash)); } else { - hashes.push((snapshot_package.slot(), hash)); + hashes.push((accounts_package.slot, hash)); } while hashes.len() > MAX_SNAPSHOT_HASHES { @@ -155,19 +179,43 @@ impl AccountsHashVerifier { } } - if let Some(snapshot_config) = snapshot_config { - if snapshot_package.block_height % snapshot_config.full_snapshot_archive_interval_slots - == 0 - { - if let Some(pending_snapshot_package) = pending_snapshot_package { - *pending_snapshot_package.lock().unwrap() = Some(snapshot_package); - } - } - } - cluster_info.push_accounts_hashes(hashes.clone()); } + fn submit_for_packaging( + accounts_package: AccountsPackage, + pending_snapshot_package: Option<&PendingSnapshotPackage>, + snapshot_config: Option<&SnapshotConfig>, + ) { + if accounts_package.snapshot_type.is_none() + || pending_snapshot_package.is_none() + || snapshot_config.is_none() + { + return; + }; + + let snapshot_package = SnapshotPackage::from(accounts_package); + let pending_snapshot_package = pending_snapshot_package.unwrap(); + let _snapshot_config = snapshot_config.unwrap(); + + // If the snapshot package is an Incremental Snapshot, do not submit it if there's already + // a pending Full Snapshot. + let can_submit = match snapshot_package.snapshot_type { + SnapshotType::FullSnapshot => true, + SnapshotType::IncrementalSnapshot(_) => pending_snapshot_package + .lock() + .unwrap() + .as_ref() + .map_or(true, |snapshot_package| { + snapshot_package.snapshot_type.is_incremental_snapshot() + }), + }; + + if can_submit { + *pending_snapshot_package.lock().unwrap() = Some(snapshot_package); + } + } + fn should_halt( cluster_info: &ClusterInfo, trusted_validators: Option<&HashSet>, @@ -225,10 +273,10 @@ mod tests { use solana_gossip::{cluster_info::make_accounts_hashes_message, contact_info::ContactInfo}; use solana_runtime::{ snapshot_config::LastFullSnapshotSlot, - snapshot_package::SnapshotType, snapshot_utils::{ArchiveFormat, SnapshotVersion}, }; use solana_sdk::{ + genesis_config::ClusterType, hash::hash, signature::{Keypair, Signer}, }; @@ -301,30 +349,24 @@ mod tests { last_full_snapshot_slot: LastFullSnapshotSlot::default(), }; for i in 0..MAX_SNAPSHOT_HASHES + 1 { - let slot = full_snapshot_archive_interval_slots + i as u64; - let block_height = full_snapshot_archive_interval_slots + i as u64; - let slot_deltas = vec![]; - let snapshot_links = TempDir::new().unwrap(); - let storages = vec![]; - let snapshot_archive_path = PathBuf::from("."); - let hash = hash(&[i as u8]); - let archive_format = ArchiveFormat::TarBzip2; - let snapshot_version = SnapshotVersion::default(); - let snapshot_package = SnapshotPackage::new( - slot, - block_height, - slot_deltas, - snapshot_links, - storages, - snapshot_archive_path, - hash, - archive_format, - snapshot_version, - SnapshotType::FullSnapshot, - ); + let accounts_package = AccountsPackage { + slot: full_snapshot_archive_interval_slots + i as u64, + block_height: full_snapshot_archive_interval_slots + i as u64, + slot_deltas: vec![], + snapshot_links: TempDir::new().unwrap(), + snapshot_storages: vec![], + hash: hash(&[i as u8]), + archive_format: ArchiveFormat::TarBzip2, + snapshot_version: SnapshotVersion::default(), + snapshot_archives_dir: PathBuf::default(), + expected_capitalization: 0, + hash_for_testing: None, + cluster_type: ClusterType::MainnetBeta, + snapshot_type: None, + }; - AccountsHashVerifier::process_snapshot_package( - snapshot_package, + AccountsHashVerifier::process_accounts_package( + accounts_package, &cluster_info, Some(&trusted_validators), false, @@ -333,7 +375,9 @@ mod tests { &exit, 0, Some(&snapshot_config), + None, ); + // sleep for 1ms to create a newer timestmap for gossip entry // otherwise the timestamp won't be newer. std::thread::sleep(Duration::from_millis(1)); diff --git a/core/src/snapshot_packager_service.rs b/core/src/snapshot_packager_service.rs index be65fd9a7c..8670ea962b 100644 --- a/core/src/snapshot_packager_service.rs +++ b/core/src/snapshot_packager_service.rs @@ -42,25 +42,26 @@ impl SnapshotPackagerService { } let snapshot_package = pending_snapshot_package.lock().unwrap().take(); - if let Some(snapshot_package) = snapshot_package { - match snapshot_utils::archive_snapshot_package( - &snapshot_package, - maximum_snapshots_to_retain, - ) { - Ok(_) => { - hashes.push((snapshot_package.slot(), *snapshot_package.hash())); - while hashes.len() > MAX_SNAPSHOT_HASHES { - hashes.remove(0); - } - cluster_info.push_snapshot_hashes(hashes.clone()); - } - Err(err) => { - warn!("Failed to create snapshot archive: {}", err); - } - }; - } else { + if snapshot_package.is_none() { std::thread::sleep(Duration::from_millis(100)); + continue; } + let snapshot_package = snapshot_package.unwrap(); + + // Archiving the snapshot package is not allowed to fail. + // AccountsBackgroundService calls `clean_accounts()` with a value for + // last_full_snapshot_slot that requires this archive call to succeed. + snapshot_utils::archive_snapshot_package( + &snapshot_package, + maximum_snapshots_to_retain, + ) + .expect("failed to archive snapshot package"); + + hashes.push((snapshot_package.slot(), *snapshot_package.hash())); + while hashes.len() > MAX_SNAPSHOT_HASHES { + hashes.remove(0); + } + cluster_info.push_snapshot_hashes(hashes.clone()); } }) .unwrap(); @@ -82,6 +83,7 @@ mod tests { use solana_runtime::{ accounts_db::AccountStorageEntry, bank::BankSlotDelta, + snapshot_archive_info::SnapshotArchiveInfo, snapshot_package::{SnapshotPackage, SnapshotType}, snapshot_utils::{self, ArchiveFormat, SnapshotVersion, SNAPSHOT_STATUS_CACHE_FILE_NAME}, }; @@ -160,24 +162,29 @@ mod tests { } // Create a packageable snapshot + let slot = 42; + let hash = Hash::default(); + let archive_format = ArchiveFormat::TarBzip2; let output_tar_path = snapshot_utils::build_full_snapshot_archive_path( snapshot_archives_dir, - 42, - &Hash::default(), - ArchiveFormat::TarBzip2, - ); - let snapshot_package = SnapshotPackage::new( - 5, - 5, - vec![], - link_snapshots_dir, - vec![storage_entries], - output_tar_path.clone(), - Hash::default(), - ArchiveFormat::TarBzip2, - SnapshotVersion::default(), - SnapshotType::FullSnapshot, + slot, + &hash, + archive_format, ); + let snapshot_package = SnapshotPackage { + snapshot_archive_info: SnapshotArchiveInfo { + path: output_tar_path.clone(), + slot, + hash, + archive_format, + }, + block_height: slot, + slot_deltas: vec![], + snapshot_links: link_snapshots_dir, + snapshot_storages: vec![storage_entries], + snapshot_version: SnapshotVersion::default(), + snapshot_type: SnapshotType::FullSnapshot, + }; // Make tarball from packageable snapshot snapshot_utils::archive_snapshot_package( @@ -204,7 +211,7 @@ mod tests { output_tar_path, snapshots_dir, accounts_dir, - ArchiveFormat::TarBzip2, + archive_format, ); } } diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 379063ac26..64fbe95cf6 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -340,6 +340,7 @@ impl Tvu { tvu_config.accounts_db_caching_enabled, tvu_config.test_hash_calculation, tvu_config.use_index_hash_calculation, + None, ); Tvu { diff --git a/core/src/validator.rs b/core/src/validator.rs index 4e2ed84656..6b282782b2 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -1212,7 +1212,6 @@ fn new_banks_from_ledger( None, &snapshot_config.snapshot_archives_dir, snapshot_config.archive_format, - Some(bank_forks.root_bank().get_thread_pool()), snapshot_config.maximum_snapshots_to_retain, ) .unwrap_or_else(|err| { diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 358b06d961..0108a91e62 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -66,7 +66,9 @@ mod tests { genesis_utils::{create_genesis_config, GenesisConfigInfo}, snapshot_archive_info::FullSnapshotArchiveInfo, snapshot_config::{LastFullSnapshotSlot, SnapshotConfig}, - snapshot_package::{AccountsPackage, PendingSnapshotPackage}, + snapshot_package::{ + AccountsPackage, PendingSnapshotPackage, SnapshotPackage, SnapshotType, + }, snapshot_utils::{ self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, }, @@ -255,18 +257,19 @@ mod tests { snapshot_request_receiver, accounts_package_sender, }; - for slot in 0..last_slot { - let mut bank = Bank::new_from_parent(&bank_forks[slot], &Pubkey::default(), slot + 1); + for slot in 1..=last_slot { + let mut bank = Bank::new_from_parent(&bank_forks[slot - 1], &Pubkey::default(), slot); f(&mut bank, mint_keypair); let bank = bank_forks.insert(bank); // Set root to make sure we don't end up with too many account storage entries // and to allow snapshotting of bank and the purging logic on status_cache to // kick in - if slot % set_root_interval == 0 || slot == last_slot - 1 { + if slot % set_root_interval == 0 || slot == last_slot { // set_root should send a snapshot request bank_forks.set_root(bank.slot(), &request_sender, None); bank.update_accounts_hash(); - snapshot_request_handler.handle_snapshot_requests(false, false, false, 0); + snapshot_request_handler + .handle_snapshot_requests(false, false, false, 0, &mut None); } } @@ -277,7 +280,7 @@ mod tests { let last_bank_snapshot_info = snapshot_utils::get_highest_bank_snapshot_info(bank_snapshots_dir) .expect("no bank snapshots found in path"); - let accounts_package = AccountsPackage::new_for_full_snapshot( + let accounts_package = AccountsPackage::new( last_bank, &last_bank_snapshot_info, bank_snapshots_dir, @@ -287,13 +290,10 @@ mod tests { ArchiveFormat::TarBzip2, snapshot_version, None, + Some(SnapshotType::FullSnapshot), ) .unwrap(); - let snapshot_package = snapshot_utils::process_accounts_package( - accounts_package, - Some(last_bank.get_thread_pool()), - None, - ); + let snapshot_package = SnapshotPackage::from(accounts_package); snapshot_utils::archive_snapshot_package( &snapshot_package, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, @@ -415,8 +415,9 @@ mod tests { bank_snapshots_dir, snapshot_archives_dir, snapshot_config.snapshot_version, - &snapshot_config.archive_format, + snapshot_config.archive_format, None, + Some(SnapshotType::FullSnapshot), ) .unwrap(); @@ -504,8 +505,6 @@ mod tests { DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, ); - let thread_pool = accounts_db::make_min_priority_thread_pool(); - let _package_receiver = std::thread::Builder::new() .name("package-receiver".to_string()) .spawn(move || { @@ -515,11 +514,7 @@ mod tests { accounts_package = new_accounts_package; } - let snapshot_package = solana_runtime::snapshot_utils::process_accounts_package( - accounts_package, - Some(&thread_pool), - None, - ); + let snapshot_package = SnapshotPackage::from(accounts_package); *pending_snapshot_package.lock().unwrap() = Some(snapshot_package); } @@ -625,7 +620,7 @@ mod tests { run_bank_forks_snapshot_n( snapshot_version, cluster_type, - (MAX_CACHE_ENTRIES * 2 + 1) as u64, + (MAX_CACHE_ENTRIES * 2) as u64, |bank, mint_keypair| { let tx = system_transaction::transfer( mint_keypair, @@ -712,14 +707,19 @@ mod tests { // set_root sends a snapshot request bank_forks.set_root(bank.slot(), &request_sender, None); bank.update_accounts_hash(); - snapshot_request_handler.handle_snapshot_requests(false, false, false, 0); + snapshot_request_handler.handle_snapshot_requests( + false, + false, + false, + 0, + &mut last_full_snapshot_slot, + ); } // Since AccountsBackgroundService isn't running, manually make a full snapshot archive // at the right interval if slot % FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS == 0 { make_full_snapshot_archive(&bank, &snapshot_test_config.snapshot_config).unwrap(); - last_full_snapshot_slot = Some(slot); } // Similarly, make an incremental snapshot archive at the right interval, but only if // there's been at least one full snapshot first, and a full snapshot wasn't already @@ -764,7 +764,7 @@ mod tests { "did not find bank snapshot with this path", ) })?; - snapshot_utils::package_process_and_archive_full_snapshot( + snapshot_utils::package_and_archive_full_snapshot( bank, &bank_snapshot_info, &snapshot_config.bank_snapshots_dir, @@ -772,7 +772,6 @@ mod tests { bank.get_snapshot_storages(None), snapshot_config.archive_format, snapshot_config.snapshot_version, - None, snapshot_config.maximum_snapshots_to_retain, )?; @@ -800,7 +799,7 @@ mod tests { ) })?; let storages = bank.get_snapshot_storages(Some(incremental_snapshot_base_slot)); - snapshot_utils::package_process_and_archive_incremental_snapshot( + snapshot_utils::package_and_archive_incremental_snapshot( bank, incremental_snapshot_base_slot, &bank_snapshot_info, @@ -809,7 +808,6 @@ mod tests { storages, snapshot_config.archive_format, snapshot_config.snapshot_version, - None, snapshot_config.maximum_snapshots_to_retain, )?; @@ -857,9 +855,8 @@ mod tests { const INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: Slot = BANK_SNAPSHOT_INTERVAL_SLOTS * 3; const FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: Slot = INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS * 5; - const LAST_SLOT: Slot = FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS * 3 - 1; - const EXPECTED_SLOT_FOR_LAST_SNAPSHOT_ARCHIVE: Slot = - LAST_SLOT + 1 - FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS; + const LAST_SLOT: Slot = FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS * 3 + + INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS * 2; info!("Running snapshots with background services test..."); trace!( @@ -949,6 +946,7 @@ mod tests { false, false, true, + None, ); let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair; @@ -1019,9 +1017,14 @@ mod tests { ) .unwrap(); + assert_eq!(deserialized_bank.slot(), LAST_SLOT,); assert_eq!( - deserialized_bank.slot(), - EXPECTED_SLOT_FOR_LAST_SNAPSHOT_ARCHIVE + deserialized_bank, + **bank_forks + .read() + .unwrap() + .get(deserialized_bank.slot()) + .unwrap() ); // Stop the background services diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 4f491709e6..995df3c368 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -2230,7 +2230,6 @@ fn main() { Some(snapshot_version), output_directory, ArchiveFormat::TarZstd, - None, maximum_snapshots_to_retain, ) .unwrap_or_else(|err| { diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index 7b592a3e77..f89fba3f73 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -6,8 +6,8 @@ use crate::{ bank::{Bank, BankSlotDelta, DropCallback}, bank_forks::BankForks, snapshot_config::SnapshotConfig, - snapshot_package::AccountsPackageSender, - snapshot_utils, + snapshot_package::{AccountsPackageSender, SnapshotType}, + snapshot_utils::{self, SnapshotError}, }; use crossbeam_channel::{Receiver, SendError, Sender}; use log::*; @@ -94,7 +94,8 @@ impl SnapshotRequestHandler { test_hash_calculation: bool, use_index_hash_calculation: bool, non_snapshot_time_us: u128, - ) -> Option { + last_full_snapshot_slot: &mut Option, + ) -> Option> { self.snapshot_request_receiver .try_iter() .last() @@ -161,7 +162,7 @@ impl SnapshotRequestHandler { // accounts that were included in the bank delta hash when the bank was frozen, // and if we clean them here, the newly created snapshot's hash may not match // the frozen hash. - snapshot_root_bank.clean_accounts(true, false, None); + snapshot_root_bank.clean_accounts(true, false, *last_full_snapshot_slot); clean_time.stop(); if accounts_db_caching_enabled { @@ -170,24 +171,51 @@ impl SnapshotRequestHandler { shrink_time.stop(); } - // Generate an accounts package + let block_height = snapshot_root_bank.block_height(); + let snapshot_type = if block_height + % self.snapshot_config.full_snapshot_archive_interval_slots + == 0 + { + *last_full_snapshot_slot = Some(snapshot_root_bank.slot()); + Some(SnapshotType::FullSnapshot) + } else if block_height + % self + .snapshot_config + .incremental_snapshot_archive_interval_slots + == 0 + && last_full_snapshot_slot.is_some() + { + Some(SnapshotType::IncrementalSnapshot( + last_full_snapshot_slot.unwrap(), + )) + } else { + None + }; + + // Snapshot the bank and send over an accounts package let mut snapshot_time = Measure::start("snapshot_time"); - let r = snapshot_utils::snapshot_bank( + let result = snapshot_utils::snapshot_bank( &snapshot_root_bank, status_cache_slot_deltas, &self.accounts_package_sender, &self.snapshot_config.bank_snapshots_dir, &self.snapshot_config.snapshot_archives_dir, self.snapshot_config.snapshot_version, - &self.snapshot_config.archive_format, + self.snapshot_config.archive_format, hash_for_testing, + snapshot_type, ); - if r.is_err() { + if let Err(e) = result { warn!( - "Error generating snapshot for bank: {}, err: {:?}", + "Error taking bank snapshot. slot: {}, snapshot type: {:?}, err: {:?}", snapshot_root_bank.slot(), - r + snapshot_type, + e, ); + + if Self::is_snapshot_error_fatal(&e) { + return Err(e); + } } snapshot_time.stop(); @@ -216,9 +244,34 @@ impl SnapshotRequestHandler { ("total_us", total_time.as_us(), i64), ("non_snapshot_time_us", non_snapshot_time_us, i64), ); - snapshot_root_bank.block_height() + Ok(snapshot_root_bank.block_height()) }) } + + /// Check if a SnapshotError should be treated as 'fatal' by SnapshotRequestHandler, and + /// `handle_snapshot_requests()` in particular. Fatal errors will cause the node to shutdown. + /// Non-fatal errors are logged and then swallowed. + /// + /// All `SnapshotError`s are enumerated, and there is **NO** default case. This way, if + /// a new error is added to SnapshotError, a conscious decision must be made on how it should + /// be handled. + fn is_snapshot_error_fatal(err: &SnapshotError) -> bool { + match err { + SnapshotError::Io(..) => true, + SnapshotError::Serialize(..) => true, + SnapshotError::ArchiveGenerationFailure(..) => true, + SnapshotError::StoragePathSymlinkInvalid => true, + SnapshotError::UnpackError(..) => true, + SnapshotError::AccountsPackageSendError(..) => true, + SnapshotError::IoWithSource(..) => true, + SnapshotError::PathToFileNameError(..) => true, + SnapshotError::FileNameToStrError(..) => true, + SnapshotError::ParseSnapshotArchiveFileNameError(..) => true, + SnapshotError::MismatchedBaseSlot(..) => true, + SnapshotError::NoSnapshotArchives => true, + SnapshotError::MismatchedSlotHash(..) => true, + } + } } #[derive(Default)] @@ -262,7 +315,8 @@ impl AbsRequestHandler { test_hash_calculation: bool, use_index_hash_calculation: bool, non_snapshot_time_us: u128, - ) -> Option { + last_full_snapshot_slot: &mut Option, + ) -> Option> { self.snapshot_request_handler .as_ref() .and_then(|snapshot_request_handler| { @@ -271,6 +325,7 @@ impl AbsRequestHandler { test_hash_calculation, use_index_hash_calculation, non_snapshot_time_us, + last_full_snapshot_slot, ) }) } @@ -301,6 +356,7 @@ impl AccountsBackgroundService { accounts_db_caching_enabled: bool, test_hash_calculation: bool, use_index_hash_calculation: bool, + mut last_full_snapshot_slot: Option, ) -> Self { info!("AccountsBackgroundService active"); let exit = exit.clone(); @@ -354,13 +410,15 @@ impl AccountsBackgroundService { // request for `N` to the snapshot request channel before setting a root `R > N`, and // snapshot_request_handler.handle_requests() will always look for the latest // available snapshot in the channel. - let snapshot_block_height = request_handler.handle_snapshot_requests( - accounts_db_caching_enabled, - test_hash_calculation, - use_index_hash_calculation, - non_snapshot_time, - ); - if snapshot_block_height.is_some() { + let snapshot_block_height_option_result = request_handler + .handle_snapshot_requests( + accounts_db_caching_enabled, + test_hash_calculation, + use_index_hash_calculation, + non_snapshot_time, + &mut last_full_snapshot_slot, + ); + if snapshot_block_height_option_result.is_some() { last_snapshot_end_time = Some(Instant::now()); } @@ -372,10 +430,16 @@ impl AccountsBackgroundService { bank.flush_accounts_cache_if_needed(); } - if let Some(snapshot_block_height) = snapshot_block_height { + if let Some(snapshot_block_height_result) = snapshot_block_height_option_result + { // Safe, see proof above - assert!(last_cleaned_block_height <= snapshot_block_height); - last_cleaned_block_height = snapshot_block_height; + if let Ok(snapshot_block_height) = snapshot_block_height_result { + assert!(last_cleaned_block_height <= snapshot_block_height); + last_cleaned_block_height = snapshot_block_height; + } else { + exit.store(true, Ordering::Relaxed); + return; + } } else { if accounts_db_caching_enabled { bank.shrink_candidate_slots(); @@ -400,7 +464,7 @@ impl AccountsBackgroundService { // slots >= bank.slot() bank.force_flush_accounts_cache(); } - bank.clean_accounts(true, false, None); + bank.clean_accounts(true, false, last_full_snapshot_slot); last_cleaned_block_height = bank.block_height(); } } diff --git a/runtime/src/snapshot_package.rs b/runtime/src/snapshot_package.rs index 1ee2d64a3a..6933e08acf 100644 --- a/runtime/src/snapshot_package.rs +++ b/runtime/src/snapshot_package.rs @@ -5,8 +5,7 @@ use crate::{ use crate::{ snapshot_archive_info::{SnapshotArchiveInfo, SnapshotArchiveInfoGetter}, snapshot_utils::{ - ArchiveFormat, BankSnapshotInfo, Result, SnapshotVersion, TMP_FULL_SNAPSHOT_PREFIX, - TMP_INCREMENTAL_SNAPSHOT_PREFIX, + self, ArchiveFormat, BankSnapshotInfo, Result, SnapshotVersion, TMP_BANK_SNAPSHOT_PREFIX, }, }; use log::*; @@ -42,7 +41,7 @@ pub struct AccountsPackage { pub block_height: Slot, pub slot_deltas: Vec, pub snapshot_links: TempDir, - pub storages: SnapshotStorages, + pub snapshot_storages: SnapshotStorages, pub hash: Hash, // temporarily here while we still have to calculate hash before serializing bank pub archive_format: ArchiveFormat, pub snapshot_version: SnapshotVersion, @@ -50,26 +49,53 @@ pub struct AccountsPackage { pub expected_capitalization: u64, pub hash_for_testing: Option, pub cluster_type: ClusterType, + pub snapshot_type: Option, } impl AccountsPackage { - /// Create an accounts package + /// Package up bank files, storages, and slot deltas for a snapshot #[allow(clippy::too_many_arguments)] - fn new( + pub fn new( bank: &Bank, bank_snapshot_info: &BankSnapshotInfo, - status_cache_slot_deltas: Vec, + bank_snapshots_dir: impl AsRef, + slot_deltas: Vec, snapshot_archives_dir: impl AsRef, snapshot_storages: SnapshotStorages, archive_format: ArchiveFormat, snapshot_version: SnapshotVersion, hash_for_testing: Option, - snapshot_tmpdir: TempDir, + snapshot_type: Option, ) -> Result { - // Hard link the snapshot into a tmpdir, to ensure its not removed prior to packaging. + info!( + "Package snapshot for bank {} has {} account storage entries (snapshot type: {:?})", + bank.slot(), + snapshot_storages.len(), + snapshot_type, + ); + + if let Some(SnapshotType::IncrementalSnapshot(incremental_snapshot_base_slot)) = + snapshot_type { - let snapshot_hardlink_dir = snapshot_tmpdir - .as_ref() + assert!( + bank.slot() > incremental_snapshot_base_slot, + "Incremental snapshot base slot must be less than the bank being snapshotted!" + ); + assert!( + snapshot_storages.iter().all(|storage| storage + .iter() + .all(|entry| entry.slot() > incremental_snapshot_base_slot)), + "Incremental snapshot package must only contain storage entries where slot > incremental snapshot base slot (i.e. full snapshot slot)!" + ); + } + + // Hard link the snapshot into a tmpdir, to ensure its not removed prior to packaging. + let snapshot_links = tempfile::Builder::new() + .prefix(&format!("{}{}-", TMP_BANK_SNAPSHOT_PREFIX, bank.slot())) + .tempdir_in(bank_snapshots_dir)?; + { + let snapshot_hardlink_dir = snapshot_links + .path() .join(bank_snapshot_info.slot.to_string()); fs::create_dir_all(&snapshot_hardlink_dir)?; fs::hard_link( @@ -81,9 +107,9 @@ impl AccountsPackage { Ok(Self { slot: bank.slot(), block_height: bank.block_height(), - slot_deltas: status_cache_slot_deltas, - snapshot_links: snapshot_tmpdir, - storages: snapshot_storages, + slot_deltas, + snapshot_links, + snapshot_storages, hash: bank.get_accounts_hash(), archive_format, snapshot_version, @@ -91,94 +117,9 @@ impl AccountsPackage { expected_capitalization: bank.capitalization(), hash_for_testing, cluster_type: bank.cluster_type(), + snapshot_type, }) } - - /// Package up bank snapshot files, snapshot storages, and slot deltas for a full snapshot. - #[allow(clippy::too_many_arguments)] - pub fn new_for_full_snapshot( - bank: &Bank, - bank_snapshot_info: &BankSnapshotInfo, - bank_snapshots_dir: impl AsRef, - status_cache_slot_deltas: Vec, - snapshot_archives_dir: impl AsRef, - snapshot_storages: SnapshotStorages, - archive_format: ArchiveFormat, - snapshot_version: SnapshotVersion, - hash_for_testing: Option, - ) -> Result { - info!( - "Package full snapshot for bank: {} has {} account storage entries", - bank.slot(), - snapshot_storages.len() - ); - - let snapshot_tmpdir = tempfile::Builder::new() - .prefix(&format!("{}{}-", TMP_FULL_SNAPSHOT_PREFIX, bank.slot())) - .tempdir_in(bank_snapshots_dir)?; - - Self::new( - bank, - bank_snapshot_info, - status_cache_slot_deltas, - snapshot_archives_dir, - snapshot_storages, - archive_format, - snapshot_version, - hash_for_testing, - snapshot_tmpdir, - ) - } - - /// Package up bank snapshot files, snapshot storages, and slot deltas for an incremental snapshot. - #[allow(clippy::too_many_arguments)] - pub fn new_for_incremental_snapshot( - bank: &Bank, - incremental_snapshot_base_slot: Slot, - bank_snapshot_info: &BankSnapshotInfo, - bank_snapshots_dir: impl AsRef, - status_cache_slot_deltas: Vec, - snapshot_archives_dir: impl AsRef, - snapshot_storages: SnapshotStorages, - archive_format: ArchiveFormat, - snapshot_version: SnapshotVersion, - hash_for_testing: Option, - ) -> Result { - info!( - "Package incremental snapshot for bank {} (from base slot {}) has {} account storage entries", - bank.slot(), - incremental_snapshot_base_slot, - snapshot_storages.len() - ); - - assert!( - snapshot_storages.iter().all(|storage| storage - .iter() - .all(|entry| entry.slot() > incremental_snapshot_base_slot)), - "Incremental snapshot package must only contain storage entries where slot > incremental snapshot base slot (i.e. full snapshot slot)!" - ); - - let snapshot_tmpdir = tempfile::Builder::new() - .prefix(&format!( - "{}{}-{}-", - TMP_INCREMENTAL_SNAPSHOT_PREFIX, - incremental_snapshot_base_slot, - bank.slot() - )) - .tempdir_in(bank_snapshots_dir)?; - - Self::new( - bank, - bank_snapshot_info, - status_cache_slot_deltas, - snapshot_archives_dir, - snapshot_storages, - archive_format, - snapshot_version, - hash_for_testing, - snapshot_tmpdir, - ) - } } pub struct SnapshotPackage { @@ -186,38 +127,49 @@ pub struct SnapshotPackage { pub block_height: Slot, pub slot_deltas: Vec, pub snapshot_links: TempDir, - pub storages: SnapshotStorages, + pub snapshot_storages: SnapshotStorages, pub snapshot_version: SnapshotVersion, pub snapshot_type: SnapshotType, } -impl SnapshotPackage { - #[allow(clippy::too_many_arguments)] - pub fn new( - slot: Slot, - block_height: u64, - slot_deltas: Vec, - snapshot_links: TempDir, - storages: SnapshotStorages, - snapshot_archive_path: PathBuf, - hash: Hash, - archive_format: ArchiveFormat, - snapshot_version: SnapshotVersion, - snapshot_type: SnapshotType, - ) -> Self { +impl From for SnapshotPackage { + fn from(accounts_package: AccountsPackage) -> Self { + assert!( + accounts_package.snapshot_type.is_some(), + "Cannot make a SnapshotPackage from an AccountsPackage when SnapshotType is None!" + ); + + let snapshot_archive_path = match accounts_package.snapshot_type.unwrap() { + SnapshotType::FullSnapshot => snapshot_utils::build_full_snapshot_archive_path( + accounts_package.snapshot_archives_dir, + accounts_package.slot, + &accounts_package.hash, + accounts_package.archive_format, + ), + SnapshotType::IncrementalSnapshot(incremental_snapshot_base_slot) => { + snapshot_utils::build_incremental_snapshot_archive_path( + accounts_package.snapshot_archives_dir, + incremental_snapshot_base_slot, + accounts_package.slot, + &accounts_package.hash, + accounts_package.archive_format, + ) + } + }; + Self { snapshot_archive_info: SnapshotArchiveInfo { path: snapshot_archive_path, - slot, - hash, - archive_format, + slot: accounts_package.slot, + hash: accounts_package.hash, + archive_format: accounts_package.archive_format, }, - block_height, - slot_deltas, - snapshot_links, - storages, - snapshot_version, - snapshot_type, + block_height: accounts_package.block_height, + slot_deltas: accounts_package.slot_deltas, + snapshot_links: accounts_package.snapshot_links, + snapshot_storages: accounts_package.snapshot_storages, + snapshot_version: accounts_package.snapshot_version, + snapshot_type: accounts_package.snapshot_type.unwrap(), } } } @@ -228,18 +180,19 @@ impl SnapshotArchiveInfoGetter for SnapshotPackage { } } +/// Snapshots come in two flavors, Full and Incremental. The IncrementalSnapshot has a Slot field, +/// which is the incremental snapshot base slot. #[derive(Clone, Copy, Debug, Eq, PartialEq)] pub enum SnapshotType { FullSnapshot, - IncrementalSnapshot, + IncrementalSnapshot(Slot), } impl SnapshotType { - /// Get the string prefix of the snapshot type - pub fn to_prefix(&self) -> &'static str { - match self { - SnapshotType::FullSnapshot => TMP_FULL_SNAPSHOT_PREFIX, - SnapshotType::IncrementalSnapshot => TMP_INCREMENTAL_SNAPSHOT_PREFIX, - } + pub fn is_full_snapshot(&self) -> bool { + matches!(self, SnapshotType::FullSnapshot) + } + pub fn is_incremental_snapshot(&self) -> bool { + matches!(self, SnapshotType::IncrementalSnapshot(_)) } } diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 0777c2fb35..b6960b2bad 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -1,6 +1,6 @@ use { crate::{ - accounts_db::{AccountShrinkThreshold, AccountsDb}, + accounts_db::AccountShrinkThreshold, accounts_index::{AccountSecondaryIndexes, AccountsIndexConfig}, bank::{Bank, BankSlotDelta, Builtins}, hardened_unpack::{unpack_snapshot, ParallelSelector, UnpackError, UnpackedAppendVecMap}, @@ -16,14 +16,13 @@ use { AccountsPackage, AccountsPackageSendError, AccountsPackageSender, SnapshotPackage, SnapshotType, }, - sorted_storages::SortedStorages, }, bincode::{config::Options, serialize_into}, bzip2::bufread::BzDecoder, flate2::read::GzDecoder, lazy_static::lazy_static, log::*, - rayon::{prelude::*, ThreadPool}, + rayon::prelude::*, regex::Regex, solana_measure::measure::Measure, solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey}, @@ -49,8 +48,8 @@ pub const MAX_BANK_SNAPSHOTS: usize = 8; // Save some snapshots but not too many const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB const VERSION_STRING_V1_2_0: &str = "1.2.0"; const DEFAULT_SNAPSHOT_VERSION: SnapshotVersion = SnapshotVersion::V1_2_0; -pub(crate) const TMP_FULL_SNAPSHOT_PREFIX: &str = "tmp-snapshot-"; -pub(crate) const TMP_INCREMENTAL_SNAPSHOT_PREFIX: &str = "tmp-incremental-snapshot-"; +pub(crate) const TMP_BANK_SNAPSHOT_PREFIX: &str = "tmp-bank-snapshot-"; +pub const TMP_SNAPSHOT_ARCHIVE_PREFIX: &str = "tmp-snapshot-archive-"; pub const DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN: usize = 2; pub const FULL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^snapshot-(?P[[:digit:]]+)-(?P[[:alnum:]]+)\.(?Ptar|tar\.bz2|tar\.zst|tar\.gz)$"; pub const INCREMENTAL_SNAPSHOT_ARCHIVE_FILENAME_REGEX: &str = r"^incremental-snapshot-(?P[[:digit:]]+)-(?P[[:digit:]]+)-(?P[[:alnum:]]+)\.(?Ptar|tar\.bz2|tar\.zst|tar\.gz)$"; @@ -224,9 +223,7 @@ pub fn remove_tmp_snapshot_archives(snapshot_archives_dir: impl AsRef) { .file_name() .into_string() .unwrap_or_else(|_| String::new()); - if file_name.starts_with(TMP_FULL_SNAPSHOT_PREFIX) - || file_name.starts_with(TMP_INCREMENTAL_SNAPSHOT_PREFIX) - { + if file_name.starts_with(TMP_SNAPSHOT_ARCHIVE_PREFIX) { if entry.path().is_file() { fs::remove_file(entry.path()) } else { @@ -269,7 +266,7 @@ pub fn archive_snapshot_package( .map_err(|e| SnapshotError::IoWithSource(e, "create archive path"))?; // Create the staging directories - let staging_dir_prefix = snapshot_package.snapshot_type.to_prefix(); + let staging_dir_prefix = TMP_SNAPSHOT_ARCHIVE_PREFIX; let staging_dir = tempfile::Builder::new() .prefix(&format!( "{}{}-", @@ -293,7 +290,7 @@ pub fn archive_snapshot_package( .map_err(|e| SnapshotError::IoWithSource(e, "create staging symlinks"))?; // Add the AppendVecs into the compressible list - for storage in snapshot_package.storages.iter().flatten() { + for storage in snapshot_package.snapshot_storages.iter().flatten() { storage.flush()?; let storage_path = storage.get_path(); let output_path = staging_accounts_dir.join(crate::append_vec::AppendVec::file_name( @@ -742,7 +739,7 @@ pub fn bank_from_snapshot_archives( let unarchived_full_snapshot = unarchive_snapshot( &bank_snapshots_dir, - TMP_FULL_SNAPSHOT_PREFIX, + TMP_SNAPSHOT_ARCHIVE_PREFIX, full_snapshot_archive_info.path(), "snapshot untar", account_paths, @@ -754,7 +751,7 @@ pub fn bank_from_snapshot_archives( if let Some(incremental_snapshot_archive_info) = incremental_snapshot_archive_info { let unarchived_incremental_snapshot = unarchive_snapshot( &bank_snapshots_dir, - TMP_INCREMENTAL_SNAPSHOT_PREFIX, + TMP_SNAPSHOT_ARCHIVE_PREFIX, incremental_snapshot_archive_info.path(), "incremental snapshot untar", account_paths, @@ -1530,7 +1527,12 @@ where }) } -/// Gather the necessary elements for a snapshot of the given `root_bank` +/// Gather the necessary elements for a snapshot of the given `root_bank`. +/// +/// **DEVELOPER NOTE** Any error that is returned from this function may bring down the node! This +/// function is called from AccountsBackgroundService to handle snapshot requests. Since taking a +/// snapshot is not permitted to fail, any errors returned here will trigger the node to shutdown. +/// So, be careful whenever adding new code that may return errors. pub fn snapshot_bank( root_bank: &Bank, status_cache_slot_deltas: Vec, @@ -1538,30 +1540,42 @@ pub fn snapshot_bank( bank_snapshots_dir: impl AsRef, snapshot_archives_dir: impl AsRef, snapshot_version: SnapshotVersion, - archive_format: &ArchiveFormat, + archive_format: ArchiveFormat, hash_for_testing: Option, + snapshot_type: Option, ) -> Result<()> { - let storages = root_bank.get_snapshot_storages(None); + let snapshot_storages = snapshot_type.map_or_else(SnapshotStorages::default, |snapshot_type| { + let incremental_snapshot_base_slot = match snapshot_type { + SnapshotType::IncrementalSnapshot(incremental_snapshot_base_slot) => { + Some(incremental_snapshot_base_slot) + } + _ => None, + }; + root_bank.get_snapshot_storages(incremental_snapshot_base_slot) + }); let mut add_snapshot_time = Measure::start("add-snapshot-ms"); - add_bank_snapshot(&bank_snapshots_dir, root_bank, &storages, snapshot_version)?; + let bank_snapshot_info = add_bank_snapshot( + &bank_snapshots_dir, + root_bank, + &snapshot_storages, + snapshot_version, + )?; add_snapshot_time.stop(); inc_new_counter_info!("add-snapshot-ms", add_snapshot_time.as_ms() as usize); - // Package the relevant snapshots - let highest_bank_snapshot_info = get_highest_bank_snapshot_info(&bank_snapshots_dir) - .expect("no snapshots found in config bank_snapshots_dir"); - - let accounts_package = AccountsPackage::new_for_full_snapshot( + let accounts_package = AccountsPackage::new( root_bank, - &highest_bank_snapshot_info, + &bank_snapshot_info, bank_snapshots_dir, status_cache_slot_deltas, snapshot_archives_dir, - storages, - *archive_format, + snapshot_storages, + archive_format, snapshot_version, hash_for_testing, - )?; + snapshot_type, + ) + .expect("failed to hard link bank snapshot into a tmpdir"); accounts_package_sender.send(accounts_package)?; @@ -1579,7 +1593,6 @@ pub fn bank_to_full_snapshot_archive( snapshot_version: Option, snapshot_archives_dir: impl AsRef, archive_format: ArchiveFormat, - thread_pool: Option<&ThreadPool>, maximum_snapshots_to_retain: usize, ) -> Result { let snapshot_version = snapshot_version.unwrap_or_default(); @@ -1592,18 +1605,18 @@ pub fn bank_to_full_snapshot_archive( bank.rehash(); // Bank accounts may have been manually modified by the caller let temp_dir = tempfile::tempdir_in(bank_snapshots_dir)?; - let storages = bank.get_snapshot_storages(None); - let bank_snapshot_info = add_bank_snapshot(&temp_dir, bank, &storages, snapshot_version)?; + let snapshot_storages = bank.get_snapshot_storages(None); + let bank_snapshot_info = + add_bank_snapshot(&temp_dir, bank, &snapshot_storages, snapshot_version)?; - package_process_and_archive_full_snapshot( + package_and_archive_full_snapshot( bank, &bank_snapshot_info, &temp_dir, snapshot_archives_dir, - storages, + snapshot_storages, archive_format, snapshot_version, - thread_pool, maximum_snapshots_to_retain, ) } @@ -1621,7 +1634,6 @@ pub fn bank_to_incremental_snapshot_archive( snapshot_version: Option, snapshot_archives_dir: impl AsRef, archive_format: ArchiveFormat, - thread_pool: Option<&ThreadPool>, maximum_snapshots_to_retain: usize, ) -> Result { let snapshot_version = snapshot_version.unwrap_or_default(); @@ -1635,25 +1647,25 @@ pub fn bank_to_incremental_snapshot_archive( bank.rehash(); // Bank accounts may have been manually modified by the caller let temp_dir = tempfile::tempdir_in(bank_snapshots_dir)?; - let storages = bank.get_snapshot_storages(Some(full_snapshot_slot)); - let bank_snapshot_info = add_bank_snapshot(&temp_dir, bank, &storages, snapshot_version)?; + let snapshot_storages = bank.get_snapshot_storages(Some(full_snapshot_slot)); + let bank_snapshot_info = + add_bank_snapshot(&temp_dir, bank, &snapshot_storages, snapshot_version)?; - package_process_and_archive_incremental_snapshot( + package_and_archive_incremental_snapshot( bank, full_snapshot_slot, &bank_snapshot_info, &temp_dir, snapshot_archives_dir, - storages, + snapshot_storages, archive_format, snapshot_version, - thread_pool, maximum_snapshots_to_retain, ) } /// Helper function to hold shared code to package, process, and archive full snapshots -pub fn package_process_and_archive_full_snapshot( +pub fn package_and_archive_full_snapshot( bank: &Bank, bank_snapshot_info: &BankSnapshotInfo, bank_snapshots_dir: impl AsRef, @@ -1661,10 +1673,9 @@ pub fn package_process_and_archive_full_snapshot( snapshot_storages: SnapshotStorages, archive_format: ArchiveFormat, snapshot_version: SnapshotVersion, - thread_pool: Option<&ThreadPool>, maximum_snapshots_to_retain: usize, ) -> Result { - let accounts_package = AccountsPackage::new_for_full_snapshot( + let accounts_package = AccountsPackage::new( bank, bank_snapshot_info, bank_snapshots_dir, @@ -1674,14 +1685,11 @@ pub fn package_process_and_archive_full_snapshot( archive_format, snapshot_version, None, + Some(SnapshotType::FullSnapshot), )?; - let snapshot_package = process_and_archive_accounts_package( - accounts_package, - thread_pool, - None, - maximum_snapshots_to_retain, - )?; + let snapshot_package = SnapshotPackage::from(accounts_package); + archive_snapshot_package(&snapshot_package, maximum_snapshots_to_retain)?; Ok(FullSnapshotArchiveInfo::new( snapshot_package.snapshot_archive_info, @@ -1690,7 +1698,7 @@ pub fn package_process_and_archive_full_snapshot( /// Helper function to hold shared code to package, process, and archive incremental snapshots #[allow(clippy::too_many_arguments)] -pub fn package_process_and_archive_incremental_snapshot( +pub fn package_and_archive_incremental_snapshot( bank: &Bank, incremental_snapshot_base_slot: Slot, bank_snapshot_info: &BankSnapshotInfo, @@ -1699,12 +1707,10 @@ pub fn package_process_and_archive_incremental_snapshot( snapshot_storages: SnapshotStorages, archive_format: ArchiveFormat, snapshot_version: SnapshotVersion, - thread_pool: Option<&ThreadPool>, maximum_snapshots_to_retain: usize, ) -> Result { - let accounts_package = AccountsPackage::new_for_incremental_snapshot( + let accounts_package = AccountsPackage::new( bank, - incremental_snapshot_base_slot, bank_snapshot_info, bank_snapshots_dir, bank.src.slot_deltas(&bank.src.roots()), @@ -1713,14 +1719,13 @@ pub fn package_process_and_archive_incremental_snapshot( archive_format, snapshot_version, None, + Some(SnapshotType::IncrementalSnapshot( + incremental_snapshot_base_slot, + )), )?; - let snapshot_package = process_and_archive_accounts_package( - accounts_package, - thread_pool, - Some(incremental_snapshot_base_slot), - maximum_snapshots_to_retain, - )?; + let snapshot_package = SnapshotPackage::from(accounts_package); + archive_snapshot_package(&snapshot_package, maximum_snapshots_to_retain)?; Ok(IncrementalSnapshotArchiveInfo::new( incremental_snapshot_base_slot, @@ -1728,89 +1733,6 @@ pub fn package_process_and_archive_incremental_snapshot( )) } -/// Helper function to hold shared code to process and archive accounts packages -fn process_and_archive_accounts_package( - accounts_package: AccountsPackage, - thread_pool: Option<&ThreadPool>, - incremental_snapshot_base_slot: Option, - maximum_snapshots_to_retain: usize, -) -> Result { - let snapshot_package = process_accounts_package( - accounts_package, - thread_pool, - incremental_snapshot_base_slot, - ); - - archive_snapshot_package(&snapshot_package, maximum_snapshots_to_retain)?; - - Ok(snapshot_package) -} - -pub fn process_accounts_package( - accounts_package: AccountsPackage, - thread_pool: Option<&ThreadPool>, - incremental_snapshot_base_slot: Option, -) -> SnapshotPackage { - let mut time = Measure::start("hash"); - - let hash = accounts_package.hash; // temporarily remaining here - if let Some(expected_hash) = accounts_package.hash_for_testing { - let sorted_storages = SortedStorages::new(&accounts_package.storages); - let (hash, lamports) = AccountsDb::calculate_accounts_hash_without_index( - &sorted_storages, - thread_pool, - crate::accounts_hash::HashStats::default(), - false, - None, - ) - .unwrap(); - - assert_eq!(accounts_package.expected_capitalization, lamports); - - assert_eq!(expected_hash, hash); - }; - time.stop(); - - datapoint_info!( - "accounts_hash_verifier", - ("calculate_hash", time.as_us(), i64), - ); - - let snapshot_archive_path = match incremental_snapshot_base_slot { - None => build_full_snapshot_archive_path( - accounts_package.snapshot_archives_dir, - accounts_package.slot, - &hash, - accounts_package.archive_format, - ), - Some(incremental_snapshot_base_slot) => build_incremental_snapshot_archive_path( - accounts_package.snapshot_archives_dir, - incremental_snapshot_base_slot, - accounts_package.slot, - &hash, - accounts_package.archive_format, - ), - }; - - let snapshot_type = match incremental_snapshot_base_slot { - None => SnapshotType::FullSnapshot, - Some(_) => SnapshotType::IncrementalSnapshot, - }; - - SnapshotPackage::new( - accounts_package.slot, - accounts_package.block_height, - accounts_package.slot_deltas, - accounts_package.snapshot_links, - accounts_package.storages, - snapshot_archive_path, - hash, - accounts_package.archive_format, - accounts_package.snapshot_version, - snapshot_type, - ) -} - #[cfg(test)] mod tests { use super::*; @@ -2491,7 +2413,6 @@ mod tests { None, snapshot_archives_dir.path(), snapshot_archive_format, - None, 1, ) .unwrap(); @@ -2582,7 +2503,6 @@ mod tests { None, snapshot_archives_dir.path(), snapshot_archive_format, - None, std::usize::MAX, ) .unwrap(); @@ -2659,7 +2579,6 @@ mod tests { None, snapshot_archives_dir.path(), snapshot_archive_format, - None, std::usize::MAX, ) .unwrap(); @@ -2692,7 +2611,6 @@ mod tests { None, snapshot_archives_dir.path(), snapshot_archive_format, - None, std::usize::MAX, ) .unwrap(); @@ -2759,7 +2677,6 @@ mod tests { None, &snapshot_archives_dir, snapshot_archive_format, - None, std::usize::MAX, ) .unwrap(); @@ -2792,7 +2709,6 @@ mod tests { None, &snapshot_archives_dir, snapshot_archive_format, - None, std::usize::MAX, ) .unwrap(); @@ -2891,7 +2807,6 @@ mod tests { None, snapshot_archives_dir.path(), snapshot_archive_format, - None, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, ) .unwrap(); @@ -2933,7 +2848,6 @@ mod tests { None, snapshot_archives_dir.path(), snapshot_archive_format, - None, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, ) .unwrap(); @@ -2994,7 +2908,6 @@ mod tests { None, snapshot_archives_dir.path(), snapshot_archive_format, - None, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, ) .unwrap();