From 9cbd00fdbced1258245ac2f1da8afde58281cc05 Mon Sep 17 00:00:00 2001 From: Brooks Prumo Date: Thu, 13 Oct 2022 12:47:36 -0400 Subject: [PATCH] Converts PendingAccountsPackage to a channel (#28352) --- core/src/accounts_hash_verifier.rs | 280 ++++++++++++-- core/src/validator.rs | 9 +- core/tests/epoch_accounts_hash.rs | 9 +- core/tests/snapshots.rs | 47 +-- ledger-tool/src/main.rs | 4 +- runtime/src/accounts_background_service.rs | 6 +- runtime/src/snapshot_package.rs | 40 +- runtime/src/snapshot_package/compare.rs | 404 +++++++++++++++++++++ runtime/src/snapshot_utils.rs | 248 +------------ 9 files changed, 739 insertions(+), 308 deletions(-) create mode 100644 runtime/src/snapshot_package/compare.rs diff --git a/core/src/accounts_hash_verifier.rs b/core/src/accounts_hash_verifier.rs index c594fa9038..ead8f197e3 100644 --- a/core/src/accounts_hash_verifier.rs +++ b/core/src/accounts_hash_verifier.rs @@ -5,14 +5,15 @@ // set and halt the node if a mismatch is detected. use { + crossbeam_channel::{Receiver, Sender}, solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES}, - solana_measure::measure::Measure, + solana_measure::{measure, measure::Measure}, solana_runtime::{ accounts_hash::{CalcAccountsHashConfig, HashStats}, epoch_accounts_hash::EpochAccountsHash, snapshot_config::SnapshotConfig, snapshot_package::{ - retain_max_n_elements, AccountsPackage, AccountsPackageType, PendingAccountsPackage, + self, retain_max_n_elements, AccountsPackage, AccountsPackageType, PendingSnapshotPackage, SnapshotPackage, SnapshotType, }, sorted_storages::SortedStorages, @@ -39,7 +40,8 @@ pub struct AccountsHashVerifier { impl AccountsHashVerifier { pub fn new( - pending_accounts_package: PendingAccountsPackage, + accounts_package_sender: Sender, + accounts_package_receiver: Receiver, pending_snapshot_package: Option, exit: &Arc, cluster_info: &Arc, @@ -48,6 +50,8 @@ impl AccountsHashVerifier { fault_injection_rate_slots: u64, snapshot_config: Option, ) -> Self { + // If there are no accounts packages to process, limit how often we re-check + const LOOP_LIMITER: Duration = Duration::from_millis(SLOT_MS); let exit = exit.clone(); let cluster_info = cluster_info.clone(); let t_accounts_hash_verifier = Builder::new() @@ -59,30 +63,45 @@ impl AccountsHashVerifier { break; } - let accounts_package = pending_accounts_package.lock().unwrap().take(); - if accounts_package.is_none() { - std::thread::sleep(Duration::from_millis(SLOT_MS)); - continue; - } - let accounts_package = accounts_package.unwrap(); - debug!( - "handling accounts package, type: {:?}, slot: {}, block height: {}", - accounts_package.package_type, - accounts_package.slot, - accounts_package.block_height, - ); - - Self::process_accounts_package( + if let Some(( accounts_package, - &cluster_info, - known_validators.as_ref(), - halt_on_known_validators_accounts_hash_mismatch, - pending_snapshot_package.as_ref(), - &mut hashes, - &exit, - fault_injection_rate_slots, - snapshot_config.as_ref(), - ); + num_outstanding_accounts_packages, + num_re_enqueued_accounts_packages, + )) = Self::get_next_accounts_package( + &accounts_package_sender, + &accounts_package_receiver, + ) { + info!("handling accounts package: {accounts_package:?}"); + + let (_, measure) = measure!(Self::process_accounts_package( + accounts_package, + &cluster_info, + known_validators.as_ref(), + halt_on_known_validators_accounts_hash_mismatch, + pending_snapshot_package.as_ref(), + &mut hashes, + &exit, + fault_injection_rate_slots, + snapshot_config.as_ref(), + )); + + datapoint_info!( + "accounts_hash_verifier", + ( + "num-outstanding-accounts-packages", + num_outstanding_accounts_packages as i64, + i64 + ), + ( + "num-re-enqueued-accounts-packages", + num_re_enqueued_accounts_packages as i64, + i64 + ), + ("total-processing-time-us", measure.as_us() as i64, i64), + ); + } else { + std::thread::sleep(LOOP_LIMITER); + } } }) .unwrap(); @@ -91,6 +110,68 @@ impl AccountsHashVerifier { } } + /// Get the next accounts package to handle + /// + /// Look through the accounts package channel to find the highest priority one to handle next. + /// If there are no accounts packages in the channel, return None. Otherwise return the + /// highest priority one. Unhandled accounts packages with slots GREATER-THAN the handled one + /// will be re-enqueued. The remaining will be dropped. + /// + /// Also return the number of accounts packages initially in the channel, and the number of + /// ones re-enqueued. + fn get_next_accounts_package( + accounts_package_sender: &Sender, + accounts_package_receiver: &Receiver, + ) -> Option<( + AccountsPackage, + /*num outstanding accounts packages*/ usize, + /*num re-enqueued accounts packages*/ usize, + )> { + let mut accounts_packages: Vec<_> = accounts_package_receiver.try_iter().collect(); + // `select_nth()` panics if the slice is empty, so continue if that's the case + if accounts_packages.is_empty() { + return None; + } + let accounts_packages_len = accounts_packages.len(); + debug!("outstanding accounts packages ({accounts_packages_len}): {accounts_packages:?}"); + let num_eah_packages = accounts_packages + .iter() + .filter(|account_package| { + account_package.package_type == AccountsPackageType::EpochAccountsHash + }) + .count(); + assert!( + num_eah_packages <= 1, + "Only a single EAH accounts package is allowed at a time! count: {num_eah_packages}" + ); + + accounts_packages.select_nth_unstable_by( + accounts_packages_len - 1, + snapshot_package::cmp_accounts_packages_by_priority, + ); + // SAFETY: We know `accounts_packages` is not empty, so its len is >= 1, + // therefore there is always an element to pop. + let accounts_package = accounts_packages.pop().unwrap(); + let handled_accounts_package_slot = accounts_package.slot; + // re-enqueue any remaining accounts packages for slots GREATER-THAN the accounts package + // that will be handled + let num_re_enqueued_accounts_packages = accounts_packages + .into_iter() + .filter(|accounts_package| accounts_package.slot > handled_accounts_package_slot) + .map(|accounts_package| { + accounts_package_sender + .try_send(accounts_package) + .expect("re-enqueue accounts package") + }) + .count(); + + Some(( + accounts_package, + accounts_packages_len, + num_re_enqueued_accounts_packages, + )) + } + #[allow(clippy::too_many_arguments)] fn process_accounts_package( accounts_package: AccountsPackage, @@ -387,6 +468,7 @@ impl AccountsHashVerifier { mod tests { use { super::*, + rand::seq::SliceRandom, solana_gossip::{cluster_info::make_accounts_hashes_message, contact_info::ContactInfo}, solana_runtime::{ rent_collector::RentCollector, @@ -520,4 +602,150 @@ mod tests { ) ); } + + /// Ensure that unhandled accounts packages are properly re-enqueued or dropped + /// + /// The accounts package handler should re-enqueue unhandled accounts packages, if those + /// unhandled accounts packages are for slots GREATER-THAN the last handled accounts package. + /// Otherwise, they should be dropped. + #[test] + fn test_get_next_accounts_package() { + fn new(package_type: AccountsPackageType, slot: Slot) -> AccountsPackage { + AccountsPackage { + package_type, + slot, + block_height: slot, + ..AccountsPackage::default_for_tests() + } + } + fn new_eah(slot: Slot) -> AccountsPackage { + new(AccountsPackageType::EpochAccountsHash, slot) + } + fn new_fss(slot: Slot) -> AccountsPackage { + new( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + slot, + ) + } + fn new_iss(slot: Slot, base: Slot) -> AccountsPackage { + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(base)), + slot, + ) + } + fn new_ahv(slot: Slot) -> AccountsPackage { + new(AccountsPackageType::AccountsHashVerifier, slot) + } + + let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded(); + + // Populate the channel so that re-enqueueing and dropping will be tested + let mut accounts_packages = [ + new_ahv(99), + new_fss(100), + new_ahv(101), + new_iss(110, 100), + new_ahv(111), + new_eah(200), // <-- handle 1st + new_ahv(201), + new_iss(210, 100), + new_ahv(211), + new_fss(300), + new_ahv(301), + new_iss(310, 300), + new_ahv(311), + new_fss(400), // <-- handle 2nd + new_ahv(401), + new_iss(410, 400), + new_ahv(411), + new_iss(420, 400), // <-- handle 3rd + new_ahv(421), + new_ahv(422), + new_ahv(423), // <-- handle 4th + ]; + // Shuffle the accounts packages to simulate receiving new accounts packages from ABS + // simultaneously as AHV is processing them. + accounts_packages.shuffle(&mut rand::thread_rng()); + accounts_packages + .into_iter() + .for_each(|accounts_package| accounts_package_sender.send(accounts_package).unwrap()); + + // The EAH is handled 1st + let ( + account_package, + _num_outstanding_accounts_packages, + num_re_enqueued_accounts_packages, + ) = AccountsHashVerifier::get_next_accounts_package( + &accounts_package_sender, + &accounts_package_receiver, + ) + .unwrap(); + assert_eq!( + account_package.package_type, + AccountsPackageType::EpochAccountsHash + ); + assert_eq!(account_package.slot, 200); + assert_eq!(num_re_enqueued_accounts_packages, 15); + + // The Full Snapshot from slot 400 is handled 2nd + // (the older full snapshot from slot 300 is skipped and dropped) + let ( + account_package, + _num_outstanding_accounts_packages, + num_re_enqueued_accounts_packages, + ) = AccountsHashVerifier::get_next_accounts_package( + &accounts_package_sender, + &accounts_package_receiver, + ) + .unwrap(); + assert_eq!( + account_package.package_type, + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot) + ); + assert_eq!(account_package.slot, 400); + assert_eq!(num_re_enqueued_accounts_packages, 7); + + // The Incremental Snapshot from slot 420 is handled 3rd + // (the older incremental snapshot from slot 410 is skipped and dropped) + let ( + account_package, + _num_outstanding_accounts_packages, + num_re_enqueued_accounts_packages, + ) = AccountsHashVerifier::get_next_accounts_package( + &accounts_package_sender, + &accounts_package_receiver, + ) + .unwrap(); + assert_eq!( + account_package.package_type, + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(400)) + ); + assert_eq!(account_package.slot, 420); + assert_eq!(num_re_enqueued_accounts_packages, 3); + + // The Accounts Have Verifier from slot 423 is handled 4th + // (the older accounts have verifiers from slot 421 and 422 are skipped and dropped) + let ( + account_package, + _num_outstanding_accounts_packages, + num_re_enqueued_accounts_packages, + ) = AccountsHashVerifier::get_next_accounts_package( + &accounts_package_sender, + &accounts_package_receiver, + ) + .unwrap(); + assert_eq!( + account_package.package_type, + AccountsPackageType::AccountsHashVerifier + ); + assert_eq!(account_package.slot, 423); + assert_eq!(num_re_enqueued_accounts_packages, 0); + + // And now the accounts package channel is empty! + assert!(AccountsHashVerifier::get_next_accounts_package( + &accounts_package_sender, + &accounts_package_receiver + ) + .is_none()); + } } diff --git a/core/src/validator.rs b/core/src/validator.rs index f99b244485..a0bee91d4f 100644 --- a/core/src/validator.rs +++ b/core/src/validator.rs @@ -83,7 +83,7 @@ use { snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, snapshot_hash::StartingSnapshotHashes, - snapshot_package::{PendingAccountsPackage, PendingSnapshotPackage}, + snapshot_package::PendingSnapshotPackage, snapshot_utils, }, solana_sdk::{ @@ -613,9 +613,10 @@ impl Validator { (None, None) }; - let pending_accounts_package = PendingAccountsPackage::default(); + let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded(); let accounts_hash_verifier = AccountsHashVerifier::new( - Arc::clone(&pending_accounts_package), + accounts_package_sender.clone(), + accounts_package_receiver, pending_snapshot_package, &exit, &cluster_info, @@ -630,7 +631,7 @@ impl Validator { let snapshot_request_handler = SnapshotRequestHandler { snapshot_config, snapshot_request_receiver, - pending_accounts_package, + accounts_package_sender, }; let pruned_banks_request_handler = PrunedBanksRequestHandler { pruned_banks_receiver, diff --git a/core/tests/epoch_accounts_hash.rs b/core/tests/epoch_accounts_hash.rs index 569ed3c173..53c29b2f4b 100755 --- a/core/tests/epoch_accounts_hash.rs +++ b/core/tests/epoch_accounts_hash.rs @@ -21,7 +21,7 @@ use { runtime_config::RuntimeConfig, snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_config::SnapshotConfig, - snapshot_package::{PendingAccountsPackage, PendingSnapshotPackage}, + snapshot_package::PendingSnapshotPackage, snapshot_utils, }, solana_sdk::{ @@ -186,9 +186,10 @@ impl BackgroundServices { false, ); - let pending_accounts_package = PendingAccountsPackage::default(); + let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded(); let accounts_hash_verifier = AccountsHashVerifier::new( - Arc::clone(&pending_accounts_package), + accounts_package_sender.clone(), + accounts_package_receiver, Some(pending_snapshot_package), &exit, &cluster_info, @@ -203,7 +204,7 @@ impl BackgroundServices { let snapshot_request_handler = SnapshotRequestHandler { snapshot_config: snapshot_config.clone(), snapshot_request_receiver, - pending_accounts_package, + accounts_package_sender, }; let pruned_banks_request_handler = PrunedBanksRequestHandler { pruned_banks_receiver, diff --git a/core/tests/snapshots.rs b/core/tests/snapshots.rs index 21b169a84b..76f3d303fe 100644 --- a/core/tests/snapshots.rs +++ b/core/tests/snapshots.rs @@ -25,8 +25,8 @@ use { snapshot_archive_info::FullSnapshotArchiveInfo, snapshot_config::SnapshotConfig, snapshot_package::{ - AccountsPackage, AccountsPackageType, PendingAccountsPackage, PendingSnapshotPackage, - SnapshotPackage, SnapshotType, + AccountsPackage, AccountsPackageType, PendingSnapshotPackage, SnapshotPackage, + SnapshotType, }, snapshot_utils::{ self, ArchiveFormat, @@ -211,7 +211,7 @@ fn run_bank_forks_snapshot_n( let bank_forks = &mut snapshot_test_config.bank_forks; let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair; - let pending_accounts_package = PendingAccountsPackage::default(); + let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded(); let exit = Arc::new(AtomicBool::new(false)); let node_id = Arc::new(Keypair::new()); let cluster_info = Arc::new(ClusterInfo::new( @@ -220,7 +220,8 @@ fn run_bank_forks_snapshot_n( SocketAddrSpace::Unspecified, )); let accounts_hash_verifier = AccountsHashVerifier::new( - Arc::clone(&pending_accounts_package), + accounts_package_sender.clone(), + accounts_package_receiver, None, &exit, &cluster_info, @@ -235,7 +236,7 @@ fn run_bank_forks_snapshot_n( let snapshot_request_handler = SnapshotRequestHandler { snapshot_config: snapshot_test_config.snapshot_config.clone(), snapshot_request_receiver, - pending_accounts_package, + accounts_package_sender, }; for slot in 1..=last_slot { let mut bank = Bank::new_from_parent(&bank_forks[slot - 1], &Pubkey::default(), slot); @@ -366,8 +367,10 @@ fn test_concurrent_snapshot_packaging( .unwrap(); // Set up snapshotting channels - let real_pending_accounts_package = PendingAccountsPackage::default(); - let fake_pending_accounts_package = PendingAccountsPackage::default(); + let (real_accounts_package_sender, real_accounts_package_receiver) = + crossbeam_channel::unbounded(); + let (fake_accounts_package_sender, _fake_accounts_package_receiver) = + crossbeam_channel::unbounded(); // Create next MAX_CACHE_ENTRIES + 2 banks and snapshots. Every bank will get snapshotted // and the snapshot purging logic will run on every snapshot taken. This means the three @@ -393,21 +396,21 @@ fn test_concurrent_snapshot_packaging( assert_eq!(bank.process_transaction(&tx), Ok(())); bank.squash(); - let pending_accounts_package = { + let accounts_package_sender = { if slot == saved_slot as u64 { - // Only send one package on the real pending_accounts_package so that the + // Only send one package on the real accounts package channel so that the // packaging service doesn't take forever to run the packaging logic on all // MAX_CACHE_ENTRIES later - &real_pending_accounts_package + &real_accounts_package_sender } else { - &fake_pending_accounts_package + &fake_accounts_package_sender } }; snapshot_utils::snapshot_bank( &bank, vec![], - pending_accounts_package, + accounts_package_sender, bank_snapshots_dir, full_snapshot_archives_dir, incremental_snapshot_archives_dir, @@ -508,11 +511,7 @@ fn test_concurrent_snapshot_packaging( let _package_receiver = std::thread::Builder::new() .name("package-receiver".to_string()) .spawn(move || { - let accounts_package = real_pending_accounts_package - .lock() - .unwrap() - .take() - .unwrap(); + let accounts_package = real_accounts_package_receiver.try_recv().unwrap(); solana_runtime::serde_snapshot::reserialize_bank_with_new_accounts_hash( accounts_package.snapshot_links.path(), accounts_package.slot, @@ -709,7 +708,7 @@ fn test_bank_forks_incremental_snapshot( let bank_forks = &mut snapshot_test_config.bank_forks; let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair; - let pending_accounts_package = PendingAccountsPackage::default(); + let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded(); let exit = Arc::new(AtomicBool::new(false)); let node_id = Arc::new(Keypair::new()); let cluster_info = Arc::new(ClusterInfo::new( @@ -718,7 +717,8 @@ fn test_bank_forks_incremental_snapshot( SocketAddrSpace::Unspecified, )); let accounts_hash_verifier = AccountsHashVerifier::new( - Arc::clone(&pending_accounts_package), + accounts_package_sender.clone(), + accounts_package_receiver, None, &exit, &cluster_info, @@ -733,7 +733,7 @@ fn test_bank_forks_incremental_snapshot( let snapshot_request_handler = SnapshotRequestHandler { snapshot_config: snapshot_test_config.snapshot_config.clone(), snapshot_request_receiver, - pending_accounts_package, + accounts_package_sender, }; let mut last_full_snapshot_slot = None; @@ -961,7 +961,7 @@ fn test_snapshots_with_background_services( let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); - let pending_accounts_package = PendingAccountsPackage::default(); + let (accounts_package_sender, accounts_package_receiver) = unbounded(); let pending_snapshot_package = PendingSnapshotPackage::default(); let bank_forks = Arc::new(RwLock::new(snapshot_test_config.bank_forks)); @@ -981,7 +981,7 @@ fn test_snapshots_with_background_services( let snapshot_request_handler = SnapshotRequestHandler { snapshot_config: snapshot_test_config.snapshot_config.clone(), snapshot_request_receiver, - pending_accounts_package: Arc::clone(&pending_accounts_package), + accounts_package_sender: accounts_package_sender.clone(), }; let pruned_banks_request_handler = PrunedBanksRequestHandler { pruned_banks_receiver, @@ -1002,7 +1002,8 @@ fn test_snapshots_with_background_services( ); let accounts_hash_verifier = AccountsHashVerifier::new( - pending_accounts_package, + accounts_package_sender, + accounts_package_receiver, Some(pending_snapshot_package), &exit, &cluster_info, diff --git a/ledger-tool/src/main.rs b/ledger-tool/src/main.rs index ce982512f4..ab544413dd 100644 --- a/ledger-tool/src/main.rs +++ b/ledger-tool/src/main.rs @@ -56,7 +56,6 @@ use { snapshot_config::SnapshotConfig, snapshot_hash::StartingSnapshotHashes, snapshot_minimizer::SnapshotMinimizer, - snapshot_package::PendingAccountsPackage, snapshot_utils::{ self, ArchiveFormat, SnapshotVersion, DEFAULT_ARCHIVE_COMPRESSION, SUPPORTED_ARCHIVE_COMPRESSION, @@ -1135,11 +1134,12 @@ fn load_bank_forks( ); let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded(); + let (accounts_package_sender, _accounts_package_receiver) = crossbeam_channel::unbounded(); let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender); let snapshot_request_handler = SnapshotRequestHandler { snapshot_config: SnapshotConfig::new_load_only(), snapshot_request_receiver, - pending_accounts_package: PendingAccountsPackage::default(), + accounts_package_sender, }; let pruned_banks_receiver = AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone()); diff --git a/runtime/src/accounts_background_service.rs b/runtime/src/accounts_background_service.rs index c0a5a60aa2..89df623ea1 100644 --- a/runtime/src/accounts_background_service.rs +++ b/runtime/src/accounts_background_service.rs @@ -9,7 +9,7 @@ use { bank::{Bank, BankSlotDelta, DropCallback}, bank_forks::BankForks, snapshot_config::SnapshotConfig, - snapshot_package::{AccountsPackageType, PendingAccountsPackage, SnapshotType}, + snapshot_package::{AccountsPackage, AccountsPackageType, SnapshotType}, snapshot_utils::{self, SnapshotError}, }, crossbeam_channel::{Receiver, SendError, Sender}, @@ -138,7 +138,7 @@ pub enum SnapshotRequestType { pub struct SnapshotRequestHandler { pub snapshot_config: SnapshotConfig, pub snapshot_request_receiver: SnapshotRequestReceiver, - pub pending_accounts_package: PendingAccountsPackage, + pub accounts_package_sender: Sender, } impl SnapshotRequestHandler { @@ -292,7 +292,7 @@ impl SnapshotRequestHandler { let result = snapshot_utils::snapshot_bank( &snapshot_root_bank, status_cache_slot_deltas, - &self.pending_accounts_package, + &self.accounts_package_sender, &self.snapshot_config.bank_snapshots_dir, &self.snapshot_config.full_snapshot_archives_dir, &self.snapshot_config.incremental_snapshot_archives_dir, diff --git a/runtime/src/snapshot_package.rs b/runtime/src/snapshot_package.rs index ff70f175e2..690f3974b0 100644 --- a/runtime/src/snapshot_package.rs +++ b/runtime/src/snapshot_package.rs @@ -22,15 +22,13 @@ use { tempfile::TempDir, }; -/// The PendingAccountsPackage passes an AccountsPackage from AccountsBackgroundService to -/// AccountsHashVerifier for hashing -pub type PendingAccountsPackage = Arc>>; +mod compare; +pub use compare::*; /// The PendingSnapshotPackage passes a SnapshotPackage from AccountsHashVerifier to /// SnapshotPackagerService for archiving pub type PendingSnapshotPackage = Arc>>; -#[derive(Debug)] pub struct AccountsPackage { pub package_type: AccountsPackageType, pub slot: Slot, @@ -122,6 +120,40 @@ impl AccountsPackage { enable_rehashing: bank.bank_enable_rehashing_on_accounts_hash(), }) } + + /// Create a new Accounts Package where basically every field is defaulted. + /// Only use for tests; many of the fields are invalid! + pub fn default_for_tests() -> Self { + Self { + package_type: AccountsPackageType::AccountsHashVerifier, + slot: Slot::default(), + block_height: Slot::default(), + slot_deltas: Vec::default(), + snapshot_links: TempDir::new().unwrap(), + snapshot_storages: SnapshotStorages::default(), + archive_format: ArchiveFormat::Tar, + snapshot_version: SnapshotVersion::default(), + full_snapshot_archives_dir: PathBuf::default(), + incremental_snapshot_archives_dir: PathBuf::default(), + expected_capitalization: u64::default(), + accounts_hash_for_testing: Option::default(), + cluster_type: ClusterType::Development, + accounts: Arc::new(Accounts::default_for_tests()), + epoch_schedule: EpochSchedule::default(), + rent_collector: RentCollector::default(), + enable_rehashing: bool::default(), + } + } +} + +impl std::fmt::Debug for AccountsPackage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AccountsPackage") + .field("type", &self.package_type) + .field("slot", &self.slot) + .field("block_height", &self.block_height) + .finish_non_exhaustive() + } } /// Accounts packages are sent to the Accounts Hash Verifier for processing. There are multiple diff --git a/runtime/src/snapshot_package/compare.rs b/runtime/src/snapshot_package/compare.rs new file mode 100644 index 0000000000..3e8324316b --- /dev/null +++ b/runtime/src/snapshot_package/compare.rs @@ -0,0 +1,404 @@ +use { + super::{AccountsPackage, AccountsPackageType, SnapshotType}, + std::cmp::Ordering::{self, Equal, Greater, Less}, +}; + +/// Compare accounts packages by priority; first by type, then by slot +#[must_use] +pub fn cmp_accounts_packages_by_priority(a: &AccountsPackage, b: &AccountsPackage) -> Ordering { + cmp_accounts_package_types_by_priority(&a.package_type, &b.package_type) + .then(a.slot.cmp(&b.slot)) +} + +/// Compare accounts package types by priority +/// +/// Priority, from highest to lowest: +/// - Epoch Accounts Hash +/// - Full Snapshot +/// - Incremental Snapshot +/// - Accounts Hash Verifier +/// +/// If two `Snapshot`s are compared, their snapshot types are the tiebreaker. +#[must_use] +pub fn cmp_accounts_package_types_by_priority( + a: &AccountsPackageType, + b: &AccountsPackageType, +) -> Ordering { + use AccountsPackageType::*; + match (a, b) { + // Epoch Accounts Hash packages + (EpochAccountsHash, EpochAccountsHash) => Equal, + (EpochAccountsHash, _) => Greater, + (_, EpochAccountsHash) => Less, + + // Snapshot packages + (Snapshot(snapshot_type_a), Snapshot(snapshot_type_b)) => { + cmp_snapshot_types_by_priority(snapshot_type_a, snapshot_type_b) + } + (Snapshot(_), _) => Greater, + (_, Snapshot(_)) => Less, + + // Accounts Hash Verifier packages + (AccountsHashVerifier, AccountsHashVerifier) => Equal, + } +} + +/// Compare snapshot types by priority +/// +/// Full snapshots are higher in priority than incremental snapshots. +/// If two `IncrementalSnapshot`s are compared, their base slots are the tiebreaker. +#[must_use] +pub fn cmp_snapshot_types_by_priority(a: &SnapshotType, b: &SnapshotType) -> Ordering { + use SnapshotType::*; + match (a, b) { + (FullSnapshot, FullSnapshot) => Equal, + (FullSnapshot, IncrementalSnapshot(_)) => Greater, + (IncrementalSnapshot(_), FullSnapshot) => Less, + (IncrementalSnapshot(base_slot_a), IncrementalSnapshot(base_slot_b)) => { + base_slot_a.cmp(base_slot_b) + } + } +} + +#[cfg(test)] +mod tests { + use {super::*, solana_sdk::clock::Slot}; + + #[test] + fn test_cmp_accounts_packages_by_priority() { + fn new(package_type: AccountsPackageType, slot: Slot) -> AccountsPackage { + AccountsPackage { + package_type, + slot, + block_height: slot, + ..AccountsPackage::default_for_tests() + } + } + + for (accounts_package_a, accounts_package_b, expected_result) in [ + ( + new(AccountsPackageType::EpochAccountsHash, 11), + new(AccountsPackageType::EpochAccountsHash, 22), + Less, + ), + ( + new(AccountsPackageType::EpochAccountsHash, 22), + new(AccountsPackageType::EpochAccountsHash, 22), + Equal, + ), + ( + new(AccountsPackageType::EpochAccountsHash, 33), + new(AccountsPackageType::EpochAccountsHash, 22), + Greater, + ), + ( + new(AccountsPackageType::EpochAccountsHash, 123), + new( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + 123, + ), + Greater, + ), + ( + new(AccountsPackageType::EpochAccountsHash, 123), + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + 123, + ), + Greater, + ), + ( + new(AccountsPackageType::EpochAccountsHash, 123), + new(AccountsPackageType::AccountsHashVerifier, 123), + Greater, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + 123, + ), + new(AccountsPackageType::EpochAccountsHash, 123), + Less, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + 11, + ), + new( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + 22, + ), + Less, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + 22, + ), + new( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + 22, + ), + Equal, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + 33, + ), + new( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + 22, + ), + Greater, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + 123, + ), + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + 123, + ), + Greater, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + 123, + ), + new(AccountsPackageType::AccountsHashVerifier, 123), + Greater, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + 123, + ), + new(AccountsPackageType::EpochAccountsHash, 123), + Less, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + 123, + ), + new( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + 123, + ), + Less, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + 123, + ), + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(6)), + 123, + ), + Less, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + 11, + ), + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + 22, + ), + Less, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + 22, + ), + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + 22, + ), + Equal, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + 33, + ), + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + 22, + ), + Greater, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + 123, + ), + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(4)), + 123, + ), + Greater, + ), + ( + new( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + 123, + ), + new(AccountsPackageType::AccountsHashVerifier, 123), + Greater, + ), + ( + new(AccountsPackageType::AccountsHashVerifier, 11), + new(AccountsPackageType::AccountsHashVerifier, 22), + Less, + ), + ( + new(AccountsPackageType::AccountsHashVerifier, 22), + new(AccountsPackageType::AccountsHashVerifier, 22), + Equal, + ), + ( + new(AccountsPackageType::AccountsHashVerifier, 33), + new(AccountsPackageType::AccountsHashVerifier, 22), + Greater, + ), + ] { + let actual_result = + cmp_accounts_packages_by_priority(&accounts_package_a, &accounts_package_b); + assert_eq!(expected_result, actual_result); + } + } + + #[test] + fn test_cmp_accounts_package_types_by_priority() { + for (accounts_package_type_a, accounts_package_type_b, expected_result) in [ + ( + AccountsPackageType::EpochAccountsHash, + AccountsPackageType::EpochAccountsHash, + Equal, + ), + ( + AccountsPackageType::EpochAccountsHash, + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + Greater, + ), + ( + AccountsPackageType::EpochAccountsHash, + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + Greater, + ), + ( + AccountsPackageType::EpochAccountsHash, + AccountsPackageType::AccountsHashVerifier, + Greater, + ), + ( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + AccountsPackageType::EpochAccountsHash, + Less, + ), + ( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + Equal, + ), + ( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + Greater, + ), + ( + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + AccountsPackageType::AccountsHashVerifier, + Greater, + ), + ( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + AccountsPackageType::EpochAccountsHash, + Less, + ), + ( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), + Less, + ), + ( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(6)), + Less, + ), + ( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + Equal, + ), + ( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(4)), + Greater, + ), + ( + AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)), + AccountsPackageType::AccountsHashVerifier, + Greater, + ), + ( + AccountsPackageType::AccountsHashVerifier, + AccountsPackageType::AccountsHashVerifier, + Equal, + ), + ] { + let actual_result = cmp_accounts_package_types_by_priority( + &accounts_package_type_a, + &accounts_package_type_b, + ); + assert_eq!(expected_result, actual_result); + } + } + + #[test] + fn test_cmp_snapshot_types_by_priority() { + for (snapshot_type_a, snapshot_type_b, expected_result) in [ + ( + SnapshotType::FullSnapshot, + SnapshotType::FullSnapshot, + Equal, + ), + ( + SnapshotType::FullSnapshot, + SnapshotType::IncrementalSnapshot(5), + Greater, + ), + ( + SnapshotType::IncrementalSnapshot(5), + SnapshotType::FullSnapshot, + Less, + ), + ( + SnapshotType::IncrementalSnapshot(5), + SnapshotType::IncrementalSnapshot(6), + Less, + ), + ( + SnapshotType::IncrementalSnapshot(5), + SnapshotType::IncrementalSnapshot(5), + Equal, + ), + ( + SnapshotType::IncrementalSnapshot(5), + SnapshotType::IncrementalSnapshot(4), + Greater, + ), + ] { + let actual_result = cmp_snapshot_types_by_priority(&snapshot_type_a, &snapshot_type_b); + assert_eq!(expected_result, actual_result); + } + } +} diff --git a/runtime/src/snapshot_utils.rs b/runtime/src/snapshot_utils.rs index 0cbb70964f..ef4b931d7f 100644 --- a/runtime/src/snapshot_utils.rs +++ b/runtime/src/snapshot_utils.rs @@ -16,10 +16,7 @@ use { snapshot_archive_info::{ FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfoGetter, }, - snapshot_package::{ - AccountsPackage, AccountsPackageType, PendingAccountsPackage, SnapshotPackage, - SnapshotType, - }, + snapshot_package::{AccountsPackage, AccountsPackageType, SnapshotPackage, SnapshotType}, snapshot_utils::snapshot_storage_rebuilder::SnapshotStorageRebuilder, status_cache, }, @@ -2089,7 +2086,7 @@ pub fn purge_old_bank_snapshots(bank_snapshots_dir: impl AsRef) { pub fn snapshot_bank( root_bank: &Bank, status_cache_slot_deltas: Vec, - pending_accounts_package: &PendingAccountsPackage, + accounts_package_sender: &Sender, bank_snapshots_dir: impl AsRef, full_snapshot_archives_dir: impl AsRef, incremental_snapshot_archives_dir: impl AsRef, @@ -2126,26 +2123,9 @@ pub fn snapshot_bank( .expect("failed to hard link bank snapshot into a tmpdir"); // Submit the accounts package - // This extra scope is to be explicit about the lifetime of the pending accounts package lock - { - let mut pending_accounts_package = pending_accounts_package.lock().unwrap(); - if can_submit_accounts_package(&accounts_package, pending_accounts_package.as_ref()) { - let package_type = accounts_package.package_type; - let old_accounts_package = pending_accounts_package.replace(accounts_package); - drop(pending_accounts_package); - if let Some(old_accounts_package) = old_accounts_package { - info!( - "The pending AccountsPackage has been overwritten: \ - \nNew AccountsPackage slot: {}, package type: {:?} \ - \nOld AccountsPackage slot: {}, package type: {:?}", - root_bank.slot(), - package_type, - old_accounts_package.slot, - old_accounts_package.package_type, - ); - } - } - } + accounts_package_sender + .send(accounts_package) + .expect("send accounts package"); Ok(()) } @@ -2383,68 +2363,11 @@ pub fn should_take_incremental_snapshot( && last_full_snapshot_slot.is_some() } -/// Decide if an accounts package can be submitted to the PendingAccountsPackage -/// -/// If there's no pending accounts package, then submit -/// Otherwise, check if the pending accounts package can be overwritten -#[must_use] -fn can_submit_accounts_package( - accounts_package: &AccountsPackage, - pending_accounts_package: Option<&AccountsPackage>, -) -> bool { - if let Some(pending_accounts_package) = pending_accounts_package { - can_overwrite_pending_accounts_package(accounts_package, pending_accounts_package) - } else { - true - } -} - -/// Decide when it is appropriate to overwrite a pending accounts package -/// -/// The priority of the package types is (from highest to lowest): -/// 1. Epoch Account Hash -/// 2. Full Snapshot -/// 3. Incremental Snapshot -/// 4. Accounts Hash Verifier -/// -/// New packages of higher priority types can overwrite pending packages of *equivalent or lower* -/// priority types. -#[must_use] -fn can_overwrite_pending_accounts_package( - accounts_package: &AccountsPackage, - pending_accounts_package: &AccountsPackage, -) -> bool { - match ( - &pending_accounts_package.package_type, - &accounts_package.package_type, - ) { - (AccountsPackageType::EpochAccountsHash, AccountsPackageType::EpochAccountsHash) => { - panic!( - "Both pending and new accounts packages are of type EpochAccountsHash! \ - EAH calculations must complete before new ones are enqueued. \ - \npending accounts package slot: {} \ - \n new accounts package slot: {}", - pending_accounts_package.slot, accounts_package.slot, - ); - } - (_, AccountsPackageType::EpochAccountsHash) => true, - (AccountsPackageType::EpochAccountsHash, _) => false, - (_, AccountsPackageType::Snapshot(SnapshotType::FullSnapshot)) => true, - (AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), _) => false, - (_, AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(_))) => true, - (AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(_)), _) => false, - _ => true, - } -} - #[cfg(test)] mod tests { use { super::*, - crate::{ - accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING, snapshot_package::AccountsPackageType, - status_cache::Status, - }, + crate::{accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING, status_cache::Status}, assert_matches::assert_matches, bincode::{deserialize_from, serialize_into}, solana_sdk::{ @@ -4036,165 +3959,6 @@ mod tests { assert_eq!(bank_fields.parent_slot, bank2.parent_slot()); } - /// Test `can_submit_accounts_packages()` with all permutations of `package_type` - /// for both the pending accounts package and the new accounts package. - /// - /// pending | new | - /// package | package | result - /// ---------+---------+-------- - /// 1. None | X | true - /// 2. AHV | AHV | true - /// 3. AHV | ISS | true - /// 4. AHV | FSS | true - /// 5. AHV | EAH | true - /// 6. ISS | AHV | false - /// 7. ISS | ISS | true - /// 8. ISS | FSS | true - /// 9. ISS | EAH | true - /// 10. FSS | AHV | false - /// 11. FSS | ISS | false - /// 12. FSS | FSS | true - /// 13. FSS | EAH | true - /// 14. EAH | AHV | false - /// 15. EAH | ISS | false - /// 16. EAH | FSS | false - /// 17. EAH | EAH | assert unreachable, so test separately - #[test] - fn test_can_submit_accounts_package() { - // Test 1 - { - let pending_accounts_package = None; - let accounts_package = new_accounts_package(AccountsPackageType::AccountsHashVerifier); - assert!(can_submit_accounts_package( - &accounts_package, - pending_accounts_package - )); - } - - for (pending_package_type, new_package_type, expected_result) in [ - // Tests 2-5, pending package is AHV - ( - AccountsPackageType::AccountsHashVerifier, - AccountsPackageType::AccountsHashVerifier, - true, - ), - ( - AccountsPackageType::AccountsHashVerifier, - AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)), - true, - ), - ( - AccountsPackageType::AccountsHashVerifier, - AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), - true, - ), - ( - AccountsPackageType::AccountsHashVerifier, - AccountsPackageType::EpochAccountsHash, - true, - ), - // Tests 6-9, pending package is ISS - ( - AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)), - AccountsPackageType::AccountsHashVerifier, - false, - ), - ( - AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)), - AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)), - true, - ), - ( - AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)), - AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), - true, - ), - ( - AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)), - AccountsPackageType::EpochAccountsHash, - true, - ), - // Tests 10-13, pending package is FSS - ( - AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), - AccountsPackageType::AccountsHashVerifier, - false, - ), - ( - AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), - AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)), - false, - ), - ( - AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), - AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), - true, - ), - ( - AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), - AccountsPackageType::EpochAccountsHash, - true, - ), - // Tests 14-16, pending package is EAH - ( - AccountsPackageType::EpochAccountsHash, - AccountsPackageType::AccountsHashVerifier, - false, - ), - ( - AccountsPackageType::EpochAccountsHash, - AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)), - false, - ), - ( - AccountsPackageType::EpochAccountsHash, - AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), - false, - ), - // tested separately: (AccountsPackageType::EpochAccountsHash, None, AccountsPackageType::EpochAccountsHash, None, X), - ] { - let pending_accounts_package = new_accounts_package(pending_package_type); - let accounts_package = new_accounts_package(new_package_type); - - let actual_result = - can_submit_accounts_package(&accounts_package, Some(&pending_accounts_package)); - assert_eq!(expected_result, actual_result); - } - } - - /// It should not be allowed to have a new accounts package intended for EAH when there is - /// already a pending EAH accounts package. - #[test] - #[should_panic] - fn test_can_submit_accounts_package_both_are_eah() { - let pending_accounts_package = new_accounts_package(AccountsPackageType::EpochAccountsHash); - let accounts_package = new_accounts_package(AccountsPackageType::EpochAccountsHash); - _ = can_submit_accounts_package(&accounts_package, Some(&pending_accounts_package)); - } - - /// helper function to create an AccountsPackage that's good enough for tests - fn new_accounts_package(package_type: AccountsPackageType) -> AccountsPackage { - AccountsPackage { - package_type, - slot: Slot::default(), - block_height: Slot::default(), - slot_deltas: Vec::default(), - snapshot_links: TempDir::new().unwrap(), - snapshot_storages: SnapshotStorages::default(), - archive_format: ArchiveFormat::Tar, - snapshot_version: SnapshotVersion::default(), - full_snapshot_archives_dir: PathBuf::default(), - incremental_snapshot_archives_dir: PathBuf::default(), - expected_capitalization: u64::default(), - accounts_hash_for_testing: None, - cluster_type: solana_sdk::genesis_config::ClusterType::Development, - accounts: Arc::new(crate::accounts::Accounts::default_for_tests()), - epoch_schedule: solana_sdk::epoch_schedule::EpochSchedule::default(), - rent_collector: crate::rent_collector::RentCollector::default(), - enable_rehashing: true, - } - } - #[test] fn test_verify_slot_deltas_structural_good() { // NOTE: slot deltas do not need to be sorted