diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index bedcff6ab4..26f3456f6f 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -5,19 +5,22 @@ // set and halt the node if a mismatch is detected. use { - crossbeam_channel::RecvTimeoutError, solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}, solana_measure::measure::Measure, solana_runtime::{ accounts_hash::{CalcAccountsHashConfig, HashStats}, snapshot_config::SnapshotConfig, snapshot_package::{ - AccountsPackage, AccountsPackageReceiver, PendingSnapshotPackage, SnapshotPackage, + AccountsPackage, PendingAccountsPackage, PendingSnapshotPackage, SnapshotPackage, SnapshotType, }, sorted_storages::SortedStorages, }, - solana_sdk::{clock::Slot, hash::Hash, pubkey::Pubkey}, + solana_sdk::{ + clock::{Slot, SLOT_MS}, + hash::Hash, + pubkey::Pubkey, + }, std::{ collections::{HashMap, HashSet}, sync::{ @@ -35,7 +38,7 @@ pub struct AccountsHashVerifier { impl AccountsHashVerifier { pub fn new( - accounts_package_receiver: AccountsPackageReceiver, + pending_accounts_package: PendingAccountsPackage, pending_snapshot_package: Option, exit: &Arc, cluster_info: &Arc, @@ -55,23 +58,24 @@ impl AccountsHashVerifier { break; } - match accounts_package_receiver.recv_timeout(Duration::from_secs(1)) { - Ok(accounts_package) => { - Self::process_accounts_package( - accounts_package, - &cluster_info, - known_validators.as_ref(), - halt_on_known_validators_accounts_hash_mismatch, - pending_snapshot_package.as_ref(), - &mut hashes, - &exit, - fault_injection_rate_slots, - snapshot_config.as_ref(), - ); - } - Err(RecvTimeoutError::Disconnected) => break, - Err(RecvTimeoutError::Timeout) => (), + let accounts_package = pending_accounts_package.lock().unwrap().take(); + if accounts_package.is_none() { + std::thread::sleep(Duration::from_millis(SLOT_MS)); + continue; } + let accounts_package = accounts_package.unwrap(); + + Self::process_accounts_package( + accounts_package, + &cluster_info, + known_validators.as_ref(), + halt_on_known_validators_accounts_hash_mismatch, + pending_snapshot_package.as_ref(), + &mut hashes, + &exit, + fault_injection_rate_slots, + snapshot_config.as_ref(), + ); } }) .unwrap(); diff --git a/core/src/tvu.rs b/core/src/tvu.rs index 95463e6ced..e3021abd88 100644 --- a/core/src/tvu.rs +++ b/core/src/tvu.rs @@ -48,9 +48,7 @@ use { commitment::BlockCommitmentCache, cost_model::CostModel, snapshot_config::SnapshotConfig, - snapshot_package::{ - AccountsPackageReceiver, AccountsPackageSender, PendingSnapshotPackage, - }, + snapshot_package::{PendingAccountsPackage, PendingSnapshotPackage}, transaction_cost_metrics_sender::{ TransactionCostMetricsSender, TransactionCostMetricsService, }, @@ -143,7 +141,7 @@ impl Tvu { tvu_config: TvuConfig, max_slots: &Arc, cost_model: &Arc>, - accounts_package_channel: (AccountsPackageSender, AccountsPackageReceiver), + pending_accounts_package: PendingAccountsPackage, last_full_snapshot_slot: Option, block_metadata_notifier: Option, wait_to_vote_slot: Option, @@ -220,9 +218,8 @@ impl Tvu { (Some(snapshot_config), Some(pending_snapshot_package)) }) .unwrap_or((None, None)); - let (accounts_package_sender, accounts_package_receiver) = accounts_package_channel; let accounts_hash_verifier = AccountsHashVerifier::new( - accounts_package_receiver, + Arc::clone(&pending_accounts_package), pending_snapshot_package, exit, cluster_info, @@ -241,7 +238,7 @@ impl Tvu { Some(SnapshotRequestHandler { snapshot_config, snapshot_request_receiver, - accounts_package_sender, + pending_accounts_package, }), ) } @@ -444,7 +441,6 @@ pub mod tests { let (_, gossip_confirmed_slots_receiver) = unbounded(); let bank_forks = Arc::new(RwLock::new(bank_forks)); let tower = Tower::default(); - let accounts_package_channel = unbounded(); let max_complete_transaction_status_slot = Arc::new(AtomicU64::default()); let (_pruned_banks_sender, pruned_banks_receiver) = unbounded(); let tvu = Tvu::new( @@ -492,7 +488,7 @@ pub mod tests { TvuConfig::default(), &Arc::new(MaxSlots::default()), &Arc::new(RwLock::new(CostModel::default())), - accounts_package_channel, + PendingAccountsPackage::default(), None, None, None, diff --git a/core/src/validator.rs b/core/src/validator.rs index 49d3a11c68..e5c101e218 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -80,7 +80,7 @@ use { snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, snapshot_hash::StartingSnapshotHashes, - snapshot_package::{AccountsPackageSender, PendingSnapshotPackage}, + snapshot_package::{PendingAccountsPackage, PendingSnapshotPackage}, snapshot_utils, }, solana_sdk::{ @@ -460,8 +460,6 @@ impl Validator { .register_exit(Box::new(move || exit.store(true, Ordering::Relaxed))); } - let accounts_package_channel = unbounded(); - let accounts_update_notifier = geyser_plugin_service .as_ref() .and_then(|geyser_plugin_service| geyser_plugin_service.get_accounts_update_notifier()); @@ -520,6 +518,7 @@ impl Validator { Some(poh_timing_point_sender.clone()), ); + let pending_accounts_package = PendingAccountsPackage::default(); let last_full_snapshot_slot = process_blockstore( &blockstore, &mut bank_forks, @@ -528,7 +527,7 @@ impl Validator { transaction_status_sender.as_ref(), cache_block_meta_sender.as_ref(), config.snapshot_config.as_ref(), - accounts_package_channel.0.clone(), + Arc::clone(&pending_accounts_package), blockstore_root_scan, pruned_banks_receiver.clone(), ); @@ -934,7 +933,7 @@ impl Validator { }, &max_slots, &cost_model, - accounts_package_channel, + pending_accounts_package, last_full_snapshot_slot, block_metadata_notifier, config.wait_to_vote_slot, @@ -1414,7 +1413,7 @@ fn process_blockstore( transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, snapshot_config: Option<&SnapshotConfig>, - accounts_package_sender: AccountsPackageSender, + pending_accounts_package: PendingAccountsPackage, blockstore_root_scan: BlockstoreRootScan, pruned_banks_receiver: DroppedSlotsReceiver, ) -> Option { @@ -1426,7 +1425,7 @@ fn process_blockstore( transaction_status_sender, cache_block_meta_sender, snapshot_config, - accounts_package_sender, + pending_accounts_package, pruned_banks_receiver, ) .unwrap_or_else(|err| { diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 02350295c8..32bddd146f 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -69,7 +69,8 @@ mod tests { snapshot_archive_info::FullSnapshotArchiveInfo, snapshot_config::SnapshotConfig, snapshot_package::{ - AccountsPackage, PendingSnapshotPackage, SnapshotPackage, SnapshotType, + AccountsPackage, PendingAccountsPackage, PendingSnapshotPackage, SnapshotPackage, + SnapshotType, }, snapshot_utils::{self, ArchiveFormat, SnapshotVersion}, status_cache::MAX_CACHE_ENTRIES, @@ -247,12 +248,11 @@ mod tests { let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair; let (s, snapshot_request_receiver) = unbounded(); - let (accounts_package_sender, _r) = unbounded(); let request_sender = AbsRequestSender::new(Some(s)); let snapshot_request_handler = SnapshotRequestHandler { snapshot_config: snapshot_test_config.snapshot_config.clone(), snapshot_request_receiver, - accounts_package_sender, + pending_accounts_package: PendingAccountsPackage::default(), }; for slot in 1..=last_slot { let mut bank = Bank::new_from_parent(&bank_forks[slot - 1], &Pubkey::default(), slot); @@ -366,8 +366,8 @@ mod tests { .unwrap(); // Set up snapshotting channels - let (sender, receiver) = unbounded(); - let (fake_sender, _fake_receiver) = unbounded(); + let real_pending_accounts_package = PendingAccountsPackage::default(); + let fake_pending_accounts_package = PendingAccountsPackage::default(); // Create next MAX_CACHE_ENTRIES + 2 banks and snapshots. Every bank will get snapshotted // and the snapshot purging logic will run on every snapshot taken. This means the three @@ -394,21 +394,21 @@ mod tests { bank.squash(); let accounts_hash = bank.update_accounts_hash(); - let package_sender = { + let pending_accounts_package = { if slot == saved_slot as u64 { - // Only send one package on the real sender so that the packaging service - // doesn't take forever to run the packaging logic on all MAX_CACHE_ENTRIES - // later - &sender + // Only send one package on the real pending_accounts_package so that the + // packaging service doesn't take forever to run the packaging logic on all + // MAX_CACHE_ENTRIES later + &real_pending_accounts_package } else { - &fake_sender + &fake_pending_accounts_package } }; snapshot_utils::snapshot_bank( &bank, vec![], - package_sender, + pending_accounts_package, bank_snapshots_dir, snapshot_archives_dir, snapshot_config.snapshot_version, @@ -506,15 +506,16 @@ mod tests { let _package_receiver = std::thread::Builder::new() .name("package-receiver".to_string()) .spawn(move || { - while let Ok(mut accounts_package) = receiver.recv() { - // Only package the latest - while let Ok(new_accounts_package) = receiver.try_recv() { - accounts_package = new_accounts_package; - } - - let snapshot_package = SnapshotPackage::from(accounts_package); - *pending_snapshot_package.lock().unwrap() = Some(snapshot_package); - } + let accounts_package = real_pending_accounts_package + .lock() + .unwrap() + .take() + .unwrap(); + let snapshot_package = SnapshotPackage::from(accounts_package); + pending_snapshot_package + .lock() + .unwrap() + .replace(snapshot_package); // Wait until the package is consumed by SnapshotPackagerService while pending_snapshot_package.lock().unwrap().is_some() { @@ -526,10 +527,6 @@ mod tests { }) .unwrap(); - // Close the channel so that the package receiver will exit after reading all the - // packages off the channel - drop(sender); - // Wait for service to finish snapshot_packager_service .join() @@ -669,12 +666,11 @@ mod tests { let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair; let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); - let (accounts_package_sender, _accounts_package_receiver) = unbounded(); let request_sender = AbsRequestSender::new(Some(snapshot_request_sender)); let snapshot_request_handler = SnapshotRequestHandler { snapshot_config: snapshot_test_config.snapshot_config.clone(), snapshot_request_receiver, - accounts_package_sender, + pending_accounts_package: PendingAccountsPackage::default(), }; let mut last_full_snapshot_slot = None; @@ -892,7 +888,7 @@ mod tests { let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); - let (accounts_package_sender, accounts_package_receiver) = unbounded(); + let pending_accounts_package = PendingAccountsPackage::default(); let pending_snapshot_package = PendingSnapshotPackage::default(); let bank_forks = Arc::new(RwLock::new(snapshot_test_config.bank_forks)); @@ -912,7 +908,7 @@ mod tests { let snapshot_request_handler = Some(SnapshotRequestHandler { snapshot_config: snapshot_test_config.snapshot_config.clone(), snapshot_request_receiver, - accounts_package_sender, + pending_accounts_package: Arc::clone(&pending_accounts_package), }); let abs_request_handler = AbsRequestHandler { snapshot_request_handler, @@ -930,7 +926,7 @@ mod tests { ); let accounts_hash_verifier = AccountsHashVerifier::new( - accounts_package_receiver, + pending_accounts_package, Some(pending_snapshot_package), &exit, &cluster_info, diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index 067bf9ed53..7e612513fe 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -5,7 +5,6 @@ use { crate_description, crate_name, value_t, value_t_or_exit, values_t_or_exit, App, AppSettings, Arg, ArgMatches, SubCommand, }, - crossbeam_channel::unbounded, dashmap::DashMap, itertools::Itertools, log::*, @@ -43,6 +42,7 @@ use { snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, snapshot_hash::StartingSnapshotHashes, + snapshot_package::PendingAccountsPackage, snapshot_utils::{ self, ArchiveFormat, SnapshotVersion, DEFAULT_MAX_FULL_SNAPSHOT_ARCHIVES_TO_RETAIN, DEFAULT_MAX_INCREMENTAL_SNAPSHOT_ARCHIVES_TO_RETAIN, @@ -768,7 +768,6 @@ fn load_bank_forks( vec![non_primary_accounts_path] }; - let (accounts_package_sender, _) = unbounded(); bank_forks_utils::load( genesis_config, blockstore, @@ -778,7 +777,7 @@ fn load_bank_forks( process_options, None, None, - accounts_package_sender, + PendingAccountsPackage::default(), None, ) .map(|(bank_forks, .., starting_snapshot_hashes)| (bank_forks, starting_snapshot_hashes)) diff --git a/ledger/src/bank_forks_utils.rs b/ledger/src/bank_forks_utils.rs index efaf5aca99..093689292d 100644 --- a/ledger/src/bank_forks_utils.rs +++ b/ledger/src/bank_forks_utils.rs @@ -16,7 +16,7 @@ use { snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, snapshot_hash::{FullSnapshotHash, IncrementalSnapshotHash, StartingSnapshotHashes}, - snapshot_package::AccountsPackageSender, + snapshot_package::PendingAccountsPackage, snapshot_utils, }, solana_sdk::genesis_config::GenesisConfig, @@ -46,7 +46,7 @@ pub fn load( process_options: ProcessOptions, transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, - accounts_package_sender: AccountsPackageSender, + pending_accounts_package: PendingAccountsPackage, accounts_update_notifier: Option, ) -> LoadResult { let (mut bank_forks, leader_schedule_cache, starting_snapshot_hashes, pruned_banks_receiver) = @@ -69,7 +69,7 @@ pub fn load( transaction_status_sender, cache_block_meta_sender, snapshot_config, - accounts_package_sender, + pending_accounts_package, pruned_banks_receiver, ) .map(|_| (bank_forks, leader_schedule_cache, starting_snapshot_hashes)) diff --git a/ledger/src/blockstore_processor.rs b/ledger/src/blockstore_processor.rs index a6190eb8af..3dd7a223c7 100644 --- a/ledger/src/blockstore_processor.rs +++ b/ledger/src/blockstore_processor.rs @@ -4,7 +4,7 @@ use { blockstore_meta::SlotMeta, leader_schedule_cache::LeaderScheduleCache, }, chrono_humanize::{Accuracy, HumanTime, Tense}, - crossbeam_channel::{unbounded, Sender}, + crossbeam_channel::Sender, itertools::Itertools, log::*, rand::{seq::SliceRandom, thread_rng}, @@ -31,7 +31,7 @@ use { commitment::VOTE_THRESHOLD_SIZE, cost_model::CostModel, snapshot_config::SnapshotConfig, - snapshot_package::{AccountsPackageSender, SnapshotType}, + snapshot_package::{PendingAccountsPackage, SnapshotType}, snapshot_utils, transaction_batch::TransactionBatch, transaction_cost_metrics_sender::TransactionCostMetricsSender, @@ -579,7 +579,6 @@ pub fn test_process_blockstore( None, None, ); - let (accounts_package_sender, _) = unbounded(); process_blockstore_from_root( blockstore, &mut bank_forks, @@ -588,7 +587,7 @@ pub fn test_process_blockstore( None, None, None, - accounts_package_sender, + PendingAccountsPackage::default(), pruned_banks_receiver, ) .unwrap(); @@ -639,7 +638,7 @@ pub fn process_blockstore_from_root( transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, snapshot_config: Option<&SnapshotConfig>, - accounts_package_sender: AccountsPackageSender, + pending_accounts_package: PendingAccountsPackage, pruned_banks_receiver: DroppedSlotsReceiver, ) -> result::Result, BlockstoreProcessorError> { if let Some(num_threads) = opts.override_num_threads { @@ -697,7 +696,7 @@ pub fn process_blockstore_from_root( transaction_status_sender, cache_block_meta_sender, snapshot_config, - accounts_package_sender, + pending_accounts_package, &mut timing, &mut last_full_snapshot_slot, pruned_banks_receiver, @@ -1119,7 +1118,7 @@ fn load_frozen_forks( transaction_status_sender: Option<&TransactionStatusSender>, cache_block_meta_sender: Option<&CacheBlockMetaSender>, snapshot_config: Option<&SnapshotConfig>, - accounts_package_sender: AccountsPackageSender, + pending_accounts_package: PendingAccountsPackage, timing: &mut ExecuteTimings, last_full_snapshot_slot: &mut Option, pruned_banks_receiver: DroppedSlotsReceiver, @@ -1273,7 +1272,7 @@ fn load_frozen_forks( snapshot_utils::snapshot_bank( new_root_bank, new_root_bank.src.slot_deltas(&new_root_bank.src.roots()), - &accounts_package_sender, + &pending_accounts_package, &snapshot_config.bank_snapshots_dir, &snapshot_config.snapshot_archives_dir, snapshot_config.snapshot_version, @@ -3163,8 +3162,7 @@ pub mod tests { let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank1); // Test process_blockstore_from_root() from slot 1 onwards - let (accounts_package_sender, _) = unbounded(); - let (_pruned_banks_sender, pruned_banks_receiver) = unbounded(); + let (_pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded(); process_blockstore_from_root( &blockstore, &mut bank_forks, @@ -3173,7 +3171,7 @@ pub mod tests { None, None, None, - accounts_package_sender, + PendingAccountsPackage::default(), pruned_banks_receiver, ) .unwrap(); @@ -3272,10 +3270,10 @@ pub mod tests { ..SnapshotConfig::default() }; - let (accounts_package_sender, accounts_package_receiver) = unbounded(); + let pending_accounts_package = PendingAccountsPackage::default(); let leader_schedule_cache = LeaderScheduleCache::new_from_bank(&bank); - let (_pruned_banks_sender, pruned_banks_receiver) = unbounded(); + let (_pruned_banks_sender, pruned_banks_receiver) = crossbeam_channel::unbounded(); process_blockstore_from_root( &blockstore, &mut bank_forks, @@ -3284,23 +3282,26 @@ pub mod tests { None, None, Some(&snapshot_config), - accounts_package_sender.clone(), + Arc::clone(&pending_accounts_package), pruned_banks_receiver, ) .unwrap(); - // The `drop()` is necessary here in order to call `.iter()` on the channel below - drop(accounts_package_sender); - - // Ensure all the AccountsPackages were created and sent to the AccountsPackageReceiver - let received_accounts_package_slots = accounts_package_receiver - .iter() - .map(|accounts_package| accounts_package.slot) - .collect::>(); - let expected_slots = (slot_start_processing..=LAST_SLOT) + // Ensure the last AccountsPackage was created and is pending + let received_accounts_package_slot = pending_accounts_package + .lock() + .unwrap() + .take() + .unwrap() + .slot; + let expected_accounts_package_slot = (slot_start_processing..=LAST_SLOT) .filter(|slot| slot % FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS == 0) - .collect::>(); - assert_eq!(received_accounts_package_slots, expected_slots); + .max() + .unwrap(); + assert_eq!( + received_accounts_package_slot, + expected_accounts_package_slot + ); // Ensure all the bank snapshots were created let bank_snapshots = snapshot_utils::get_bank_snapshots(&bank_snapshots_tempdir); @@ -3309,6 +3310,9 @@ pub mod tests { .map(|bank_snapshot| bank_snapshot.slot) .collect::>(); bank_snapshot_slots.sort_unstable(); + let expected_slots = (slot_start_processing..=LAST_SLOT) + .filter(|slot| slot % FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS == 0) + .collect::>(); assert_eq!(bank_snapshot_slots, expected_slots); } @@ -3615,7 +3619,7 @@ pub mod tests { }) .collect(); let entry = next_entry(&bank_1_blockhash, 1, vote_txs); - let (replay_vote_sender, replay_vote_receiver) = unbounded(); + let (replay_vote_sender, replay_vote_receiver) = crossbeam_channel::unbounded(); let _ = process_entries_for_tests(&bank1, vec![entry], true, None, Some(&replay_vote_sender)); let successes: BTreeSet = replay_vote_receiver diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index 369f9d7a11..49c3dce3d4 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -7,7 +7,7 @@ use { bank::{Bank, BankSlotDelta, DropCallback}, bank_forks::BankForks, snapshot_config::SnapshotConfig, - snapshot_package::{AccountsPackageSender, SnapshotType}, + snapshot_package::{PendingAccountsPackage, SnapshotType}, snapshot_utils::{self, SnapshotError}, }, crossbeam_channel::{Receiver, SendError, Sender}, @@ -85,7 +85,7 @@ pub struct SnapshotRequest { pub struct SnapshotRequestHandler { pub snapshot_config: SnapshotConfig, pub snapshot_request_receiver: SnapshotRequestReceiver, - pub accounts_package_sender: AccountsPackageSender, + pub pending_accounts_package: PendingAccountsPackage, } impl SnapshotRequestHandler { @@ -198,7 +198,7 @@ impl SnapshotRequestHandler { let result = snapshot_utils::snapshot_bank( &snapshot_root_bank, status_cache_slot_deltas, - &self.accounts_package_sender, + &self.pending_accounts_package, &self.snapshot_config.bank_snapshots_dir, &self.snapshot_config.snapshot_archives_dir, self.snapshot_config.snapshot_version, @@ -269,7 +269,6 @@ impl SnapshotRequestHandler { SnapshotError::ArchiveGenerationFailure(..) => true, SnapshotError::StoragePathSymlinkInvalid => true, SnapshotError::UnpackError(..) => true, - SnapshotError::AccountsPackageSendError(..) => true, SnapshotError::IoWithSource(..) => true, SnapshotError::PathToFileNameError(..) => true, SnapshotError::FileNameToStrError(..) => true, diff --git a/runtime/src/snapshot_package.rs b/runtime/src/snapshot_package.rs index fe58afa4f1..991f6cabf6 100644 --- a/runtime/src/snapshot_package.rs +++ b/runtime/src/snapshot_package.rs @@ -10,7 +10,6 @@ use { TMP_BANK_SNAPSHOT_PREFIX, }, }, - crossbeam_channel::{Receiver, SendError, Sender}, log::*, solana_sdk::{ clock::Slot, genesis_config::ClusterType, hash::Hash, sysvar::epoch_schedule::EpochSchedule, @@ -23,14 +22,9 @@ use { tempfile::TempDir, }; -/// The sender side of the AccountsPackage channel, used by AccountsBackgroundService -pub type AccountsPackageSender = Sender; - -/// The receiver side of the AccountsPackage channel, used by AccountsHashVerifier -pub type AccountsPackageReceiver = Receiver; - -/// The error type when sending an AccountsPackage over the channel fails -pub type AccountsPackageSendError = SendError; +/// The PendingAccountsPackage passes an AccountsPackage from AccountsBackgroundService to +/// AccountsHashVerifier for hashing +pub type PendingAccountsPackage = Arc>>; /// The PendingSnapshotPackage passes a SnapshotPackage from AccountsHashVerifier to /// SnapshotPackagerService for archiving diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 635d182cca..439cb62f2a 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -14,8 +14,7 @@ use { FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfoGetter, }, snapshot_package::{ - AccountsPackage, AccountsPackageSendError, AccountsPackageSender, SnapshotPackage, - SnapshotType, + AccountsPackage, PendingAccountsPackage, SnapshotPackage, SnapshotType, }, }, bincode::{config::Options, serialize_into}, @@ -179,9 +178,6 @@ pub enum SnapshotError { #[error("Unpack error: {0}")] UnpackError(#[from] UnpackError), - #[error("accounts package send error")] - AccountsPackageSendError(#[from] AccountsPackageSendError), - #[error("source({1}) - I/O error: {0}")] IoWithSource(std::io::Error, &'static str), @@ -1638,7 +1634,7 @@ where pub fn snapshot_bank( root_bank: &Bank, status_cache_slot_deltas: Vec, - accounts_package_sender: &AccountsPackageSender, + pending_accounts_package: &PendingAccountsPackage, bank_snapshots_dir: impl AsRef, snapshot_archives_dir: impl AsRef, snapshot_version: SnapshotVersion, @@ -1672,7 +1668,23 @@ pub fn snapshot_bank( ) .expect("failed to hard link bank snapshot into a tmpdir"); - accounts_package_sender.send(accounts_package)?; + if can_submit_accounts_package(&accounts_package, pending_accounts_package) { + let old_accounts_package = pending_accounts_package + .lock() + .unwrap() + .replace(accounts_package); + if let Some(old_accounts_package) = old_accounts_package { + debug!( + "The pending AccountsPackage has been overwritten: \ + \nNew AccountsPackage slot: {}, snapshot type: {:?} \ + \nOld AccountsPackage slot: {}, snapshot type: {:?}", + root_bank.slot(), + snapshot_type, + old_accounts_package.slot, + old_accounts_package.snapshot_type, + ); + } + } Ok(()) } @@ -1877,6 +1889,37 @@ pub fn should_take_incremental_snapshot( && last_full_snapshot_slot.is_some() } +/// Decide if an accounts package can be submitted to the PendingAccountsPackage +/// +/// This is based on the values for `snapshot_type` in both the `accounts_package` and the +/// `pending_accounts_package`: +/// - if the new AccountsPackage is for a full snapshot, always submit +/// - if the new AccountsPackage is for an incremental snapshot, submit as long as there isn't a +/// pending full snapshot +/// - otherwise, only submit the new AccountsPackage as long as there's not a pending package +/// destined for a snapshot archive +fn can_submit_accounts_package( + accounts_package: &AccountsPackage, + pending_accounts_package: &PendingAccountsPackage, +) -> bool { + match accounts_package.snapshot_type { + Some(SnapshotType::FullSnapshot) => true, + Some(SnapshotType::IncrementalSnapshot(_)) => pending_accounts_package + .lock() + .unwrap() + .as_ref() + .and_then(|old_accounts_package| old_accounts_package.snapshot_type) + .map(|old_snapshot_type| !old_snapshot_type.is_full_snapshot()) + .unwrap_or(true), + None => pending_accounts_package + .lock() + .unwrap() + .as_ref() + .map(|old_accounts_package| old_accounts_package.snapshot_type.is_none()) + .unwrap_or(true), + } +} + #[cfg(test)] mod tests { use { @@ -3263,4 +3306,84 @@ mod tests { "Ensure Account1 has not been brought back from the dead" ); } + + /// All the permutations of `snapshot_type` for the new-and-old accounts packages: + /// + /// new | old | + /// snapshot | snapshot | + /// type | type | result + /// ----------+----------+-------- + /// FSS | FSS | true + /// FSS | ISS | true + /// FSS | None | true + /// ISS | FSS | false + /// ISS | ISS | true + /// ISS | None | true + /// None | FSS | false + /// None | ISS | false + /// None | None | true + #[test] + fn test_can_submit_accounts_package() { + /// helper function to create an AccountsPackage that's good enough for this test + fn new_accounts_package_with(snapshot_type: Option) -> AccountsPackage { + AccountsPackage { + slot: Slot::default(), + block_height: Slot::default(), + slot_deltas: Vec::default(), + snapshot_links: TempDir::new().unwrap(), + snapshot_storages: SnapshotStorages::default(), + accounts_hash: Hash::default(), + archive_format: ArchiveFormat::Tar, + snapshot_version: SnapshotVersion::default(), + snapshot_archives_dir: PathBuf::default(), + expected_capitalization: u64::default(), + accounts_hash_for_testing: None, + cluster_type: solana_sdk::genesis_config::ClusterType::Development, + snapshot_type, + accounts: Arc::new(crate::accounts::Accounts::default_for_tests()), + epoch_schedule: solana_sdk::epoch_schedule::EpochSchedule::default(), + rent_collector: crate::rent_collector::RentCollector::default(), + } + } + + let pending_accounts_package = PendingAccountsPackage::default(); + for (new_snapshot_type, old_snapshot_type, expected_result) in [ + ( + Some(SnapshotType::FullSnapshot), + Some(SnapshotType::FullSnapshot), + true, + ), + ( + Some(SnapshotType::FullSnapshot), + Some(SnapshotType::IncrementalSnapshot(0)), + true, + ), + (Some(SnapshotType::FullSnapshot), None, true), + ( + Some(SnapshotType::IncrementalSnapshot(0)), + Some(SnapshotType::FullSnapshot), + false, + ), + ( + Some(SnapshotType::IncrementalSnapshot(0)), + Some(SnapshotType::IncrementalSnapshot(0)), + true, + ), + (Some(SnapshotType::IncrementalSnapshot(0)), None, true), + (None, Some(SnapshotType::FullSnapshot), false), + (None, Some(SnapshotType::IncrementalSnapshot(0)), false), + (None, None, true), + ] { + let new_accounts_package = new_accounts_package_with(new_snapshot_type); + let old_accounts_package = new_accounts_package_with(old_snapshot_type); + pending_accounts_package + .lock() + .unwrap() + .replace(old_accounts_package); + + let actual_result = + can_submit_accounts_package(&new_accounts_package, &pending_accounts_package); + assert_eq!(expected_result, actual_result); + } + } }