Converts PendingAccountsPackage to a channel (#28352)

This commit is contained in:
Brooks Prumo 2022-10-13 12:47:36 -04:00 committed by GitHub
parent 72d6927fa1
commit 9cbd00fdbc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 739 additions and 308 deletions

View File

@ -5,14 +5,15 @@
// set and halt the node if a mismatch is detected.
use {
crossbeam_channel::{Receiver, Sender},
solana_gossip::cluster_info::{ClusterInfo, MAX_SNAPSHOT_HASHES},
solana_measure::measure::Measure,
solana_measure::{measure, measure::Measure},
solana_runtime::{
accounts_hash::{CalcAccountsHashConfig, HashStats},
epoch_accounts_hash::EpochAccountsHash,
snapshot_config::SnapshotConfig,
snapshot_package::{
retain_max_n_elements, AccountsPackage, AccountsPackageType, PendingAccountsPackage,
self, retain_max_n_elements, AccountsPackage, AccountsPackageType,
PendingSnapshotPackage, SnapshotPackage, SnapshotType,
},
sorted_storages::SortedStorages,
@ -39,7 +40,8 @@ pub struct AccountsHashVerifier {
impl AccountsHashVerifier {
pub fn new(
pending_accounts_package: PendingAccountsPackage,
accounts_package_sender: Sender<AccountsPackage>,
accounts_package_receiver: Receiver<AccountsPackage>,
pending_snapshot_package: Option<PendingSnapshotPackage>,
exit: &Arc<AtomicBool>,
cluster_info: &Arc<ClusterInfo>,
@ -48,6 +50,8 @@ impl AccountsHashVerifier {
fault_injection_rate_slots: u64,
snapshot_config: Option<SnapshotConfig>,
) -> Self {
// If there are no accounts packages to process, limit how often we re-check
const LOOP_LIMITER: Duration = Duration::from_millis(SLOT_MS);
let exit = exit.clone();
let cluster_info = cluster_info.clone();
let t_accounts_hash_verifier = Builder::new()
@ -59,30 +63,45 @@ impl AccountsHashVerifier {
break;
}
let accounts_package = pending_accounts_package.lock().unwrap().take();
if accounts_package.is_none() {
std::thread::sleep(Duration::from_millis(SLOT_MS));
continue;
}
let accounts_package = accounts_package.unwrap();
debug!(
"handling accounts package, type: {:?}, slot: {}, block height: {}",
accounts_package.package_type,
accounts_package.slot,
accounts_package.block_height,
);
Self::process_accounts_package(
if let Some((
accounts_package,
&cluster_info,
known_validators.as_ref(),
halt_on_known_validators_accounts_hash_mismatch,
pending_snapshot_package.as_ref(),
&mut hashes,
&exit,
fault_injection_rate_slots,
snapshot_config.as_ref(),
);
num_outstanding_accounts_packages,
num_re_enqueued_accounts_packages,
)) = Self::get_next_accounts_package(
&accounts_package_sender,
&accounts_package_receiver,
) {
info!("handling accounts package: {accounts_package:?}");
let (_, measure) = measure!(Self::process_accounts_package(
accounts_package,
&cluster_info,
known_validators.as_ref(),
halt_on_known_validators_accounts_hash_mismatch,
pending_snapshot_package.as_ref(),
&mut hashes,
&exit,
fault_injection_rate_slots,
snapshot_config.as_ref(),
));
datapoint_info!(
"accounts_hash_verifier",
(
"num-outstanding-accounts-packages",
num_outstanding_accounts_packages as i64,
i64
),
(
"num-re-enqueued-accounts-packages",
num_re_enqueued_accounts_packages as i64,
i64
),
("total-processing-time-us", measure.as_us() as i64, i64),
);
} else {
std::thread::sleep(LOOP_LIMITER);
}
}
})
.unwrap();
@ -91,6 +110,68 @@ impl AccountsHashVerifier {
}
}
/// Get the next accounts package to handle
///
/// Look through the accounts package channel to find the highest priority one to handle next.
/// If there are no accounts packages in the channel, return None. Otherwise return the
/// highest priority one. Unhandled accounts packages with slots GREATER-THAN the handled one
/// will be re-enqueued. The remaining will be dropped.
///
/// Also return the number of accounts packages initially in the channel, and the number of
/// ones re-enqueued.
fn get_next_accounts_package(
accounts_package_sender: &Sender<AccountsPackage>,
accounts_package_receiver: &Receiver<AccountsPackage>,
) -> Option<(
AccountsPackage,
/*num outstanding accounts packages*/ usize,
/*num re-enqueued accounts packages*/ usize,
)> {
let mut accounts_packages: Vec<_> = accounts_package_receiver.try_iter().collect();
// `select_nth()` panics if the slice is empty, so continue if that's the case
if accounts_packages.is_empty() {
return None;
}
let accounts_packages_len = accounts_packages.len();
debug!("outstanding accounts packages ({accounts_packages_len}): {accounts_packages:?}");
let num_eah_packages = accounts_packages
.iter()
.filter(|account_package| {
account_package.package_type == AccountsPackageType::EpochAccountsHash
})
.count();
assert!(
num_eah_packages <= 1,
"Only a single EAH accounts package is allowed at a time! count: {num_eah_packages}"
);
accounts_packages.select_nth_unstable_by(
accounts_packages_len - 1,
snapshot_package::cmp_accounts_packages_by_priority,
);
// SAFETY: We know `accounts_packages` is not empty, so its len is >= 1,
// therefore there is always an element to pop.
let accounts_package = accounts_packages.pop().unwrap();
let handled_accounts_package_slot = accounts_package.slot;
// re-enqueue any remaining accounts packages for slots GREATER-THAN the accounts package
// that will be handled
let num_re_enqueued_accounts_packages = accounts_packages
.into_iter()
.filter(|accounts_package| accounts_package.slot > handled_accounts_package_slot)
.map(|accounts_package| {
accounts_package_sender
.try_send(accounts_package)
.expect("re-enqueue accounts package")
})
.count();
Some((
accounts_package,
accounts_packages_len,
num_re_enqueued_accounts_packages,
))
}
#[allow(clippy::too_many_arguments)]
fn process_accounts_package(
accounts_package: AccountsPackage,
@ -387,6 +468,7 @@ impl AccountsHashVerifier {
mod tests {
use {
super::*,
rand::seq::SliceRandom,
solana_gossip::{cluster_info::make_accounts_hashes_message, contact_info::ContactInfo},
solana_runtime::{
rent_collector::RentCollector,
@ -520,4 +602,150 @@ mod tests {
)
);
}
/// Ensure that unhandled accounts packages are properly re-enqueued or dropped
///
/// The accounts package handler should re-enqueue unhandled accounts packages, if those
/// unhandled accounts packages are for slots GREATER-THAN the last handled accounts package.
/// Otherwise, they should be dropped.
#[test]
fn test_get_next_accounts_package() {
fn new(package_type: AccountsPackageType, slot: Slot) -> AccountsPackage {
AccountsPackage {
package_type,
slot,
block_height: slot,
..AccountsPackage::default_for_tests()
}
}
fn new_eah(slot: Slot) -> AccountsPackage {
new(AccountsPackageType::EpochAccountsHash, slot)
}
fn new_fss(slot: Slot) -> AccountsPackage {
new(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
slot,
)
}
fn new_iss(slot: Slot, base: Slot) -> AccountsPackage {
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(base)),
slot,
)
}
fn new_ahv(slot: Slot) -> AccountsPackage {
new(AccountsPackageType::AccountsHashVerifier, slot)
}
let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();
// Populate the channel so that re-enqueueing and dropping will be tested
let mut accounts_packages = [
new_ahv(99),
new_fss(100),
new_ahv(101),
new_iss(110, 100),
new_ahv(111),
new_eah(200), // <-- handle 1st
new_ahv(201),
new_iss(210, 100),
new_ahv(211),
new_fss(300),
new_ahv(301),
new_iss(310, 300),
new_ahv(311),
new_fss(400), // <-- handle 2nd
new_ahv(401),
new_iss(410, 400),
new_ahv(411),
new_iss(420, 400), // <-- handle 3rd
new_ahv(421),
new_ahv(422),
new_ahv(423), // <-- handle 4th
];
// Shuffle the accounts packages to simulate receiving new accounts packages from ABS
// simultaneously as AHV is processing them.
accounts_packages.shuffle(&mut rand::thread_rng());
accounts_packages
.into_iter()
.for_each(|accounts_package| accounts_package_sender.send(accounts_package).unwrap());
// The EAH is handled 1st
let (
account_package,
_num_outstanding_accounts_packages,
num_re_enqueued_accounts_packages,
) = AccountsHashVerifier::get_next_accounts_package(
&accounts_package_sender,
&accounts_package_receiver,
)
.unwrap();
assert_eq!(
account_package.package_type,
AccountsPackageType::EpochAccountsHash
);
assert_eq!(account_package.slot, 200);
assert_eq!(num_re_enqueued_accounts_packages, 15);
// The Full Snapshot from slot 400 is handled 2nd
// (the older full snapshot from slot 300 is skipped and dropped)
let (
account_package,
_num_outstanding_accounts_packages,
num_re_enqueued_accounts_packages,
) = AccountsHashVerifier::get_next_accounts_package(
&accounts_package_sender,
&accounts_package_receiver,
)
.unwrap();
assert_eq!(
account_package.package_type,
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot)
);
assert_eq!(account_package.slot, 400);
assert_eq!(num_re_enqueued_accounts_packages, 7);
// The Incremental Snapshot from slot 420 is handled 3rd
// (the older incremental snapshot from slot 410 is skipped and dropped)
let (
account_package,
_num_outstanding_accounts_packages,
num_re_enqueued_accounts_packages,
) = AccountsHashVerifier::get_next_accounts_package(
&accounts_package_sender,
&accounts_package_receiver,
)
.unwrap();
assert_eq!(
account_package.package_type,
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(400))
);
assert_eq!(account_package.slot, 420);
assert_eq!(num_re_enqueued_accounts_packages, 3);
// The Accounts Have Verifier from slot 423 is handled 4th
// (the older accounts have verifiers from slot 421 and 422 are skipped and dropped)
let (
account_package,
_num_outstanding_accounts_packages,
num_re_enqueued_accounts_packages,
) = AccountsHashVerifier::get_next_accounts_package(
&accounts_package_sender,
&accounts_package_receiver,
)
.unwrap();
assert_eq!(
account_package.package_type,
AccountsPackageType::AccountsHashVerifier
);
assert_eq!(account_package.slot, 423);
assert_eq!(num_re_enqueued_accounts_packages, 0);
// And now the accounts package channel is empty!
assert!(AccountsHashVerifier::get_next_accounts_package(
&accounts_package_sender,
&accounts_package_receiver
)
.is_none());
}
}

View File

@ -83,7 +83,7 @@ use {
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
snapshot_hash::StartingSnapshotHashes,
snapshot_package::{PendingAccountsPackage, PendingSnapshotPackage},
snapshot_package::PendingSnapshotPackage,
snapshot_utils,
},
solana_sdk::{
@ -613,9 +613,10 @@ impl Validator {
(None, None)
};
let pending_accounts_package = PendingAccountsPackage::default();
let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();
let accounts_hash_verifier = AccountsHashVerifier::new(
Arc::clone(&pending_accounts_package),
accounts_package_sender.clone(),
accounts_package_receiver,
pending_snapshot_package,
&exit,
&cluster_info,
@ -630,7 +631,7 @@ impl Validator {
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config,
snapshot_request_receiver,
pending_accounts_package,
accounts_package_sender,
};
let pruned_banks_request_handler = PrunedBanksRequestHandler {
pruned_banks_receiver,

View File

@ -21,7 +21,7 @@ use {
runtime_config::RuntimeConfig,
snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig,
snapshot_package::{PendingAccountsPackage, PendingSnapshotPackage},
snapshot_package::PendingSnapshotPackage,
snapshot_utils,
},
solana_sdk::{
@ -186,9 +186,10 @@ impl BackgroundServices {
false,
);
let pending_accounts_package = PendingAccountsPackage::default();
let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();
let accounts_hash_verifier = AccountsHashVerifier::new(
Arc::clone(&pending_accounts_package),
accounts_package_sender.clone(),
accounts_package_receiver,
Some(pending_snapshot_package),
&exit,
&cluster_info,
@ -203,7 +204,7 @@ impl BackgroundServices {
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: snapshot_config.clone(),
snapshot_request_receiver,
pending_accounts_package,
accounts_package_sender,
};
let pruned_banks_request_handler = PrunedBanksRequestHandler {
pruned_banks_receiver,

View File

@ -25,8 +25,8 @@ use {
snapshot_archive_info::FullSnapshotArchiveInfo,
snapshot_config::SnapshotConfig,
snapshot_package::{
AccountsPackage, AccountsPackageType, PendingAccountsPackage, PendingSnapshotPackage,
SnapshotPackage, SnapshotType,
AccountsPackage, AccountsPackageType, PendingSnapshotPackage, SnapshotPackage,
SnapshotType,
},
snapshot_utils::{
self, ArchiveFormat,
@ -211,7 +211,7 @@ fn run_bank_forks_snapshot_n<F>(
let bank_forks = &mut snapshot_test_config.bank_forks;
let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair;
let pending_accounts_package = PendingAccountsPackage::default();
let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();
let exit = Arc::new(AtomicBool::new(false));
let node_id = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
@ -220,7 +220,8 @@ fn run_bank_forks_snapshot_n<F>(
SocketAddrSpace::Unspecified,
));
let accounts_hash_verifier = AccountsHashVerifier::new(
Arc::clone(&pending_accounts_package),
accounts_package_sender.clone(),
accounts_package_receiver,
None,
&exit,
&cluster_info,
@ -235,7 +236,7 @@ fn run_bank_forks_snapshot_n<F>(
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: snapshot_test_config.snapshot_config.clone(),
snapshot_request_receiver,
pending_accounts_package,
accounts_package_sender,
};
for slot in 1..=last_slot {
let mut bank = Bank::new_from_parent(&bank_forks[slot - 1], &Pubkey::default(), slot);
@ -366,8 +367,10 @@ fn test_concurrent_snapshot_packaging(
.unwrap();
// Set up snapshotting channels
let real_pending_accounts_package = PendingAccountsPackage::default();
let fake_pending_accounts_package = PendingAccountsPackage::default();
let (real_accounts_package_sender, real_accounts_package_receiver) =
crossbeam_channel::unbounded();
let (fake_accounts_package_sender, _fake_accounts_package_receiver) =
crossbeam_channel::unbounded();
// Create next MAX_CACHE_ENTRIES + 2 banks and snapshots. Every bank will get snapshotted
// and the snapshot purging logic will run on every snapshot taken. This means the three
@ -393,21 +396,21 @@ fn test_concurrent_snapshot_packaging(
assert_eq!(bank.process_transaction(&tx), Ok(()));
bank.squash();
let pending_accounts_package = {
let accounts_package_sender = {
if slot == saved_slot as u64 {
// Only send one package on the real pending_accounts_package so that the
// Only send one package on the real accounts package channel so that the
// packaging service doesn't take forever to run the packaging logic on all
// MAX_CACHE_ENTRIES later
&real_pending_accounts_package
&real_accounts_package_sender
} else {
&fake_pending_accounts_package
&fake_accounts_package_sender
}
};
snapshot_utils::snapshot_bank(
&bank,
vec![],
pending_accounts_package,
accounts_package_sender,
bank_snapshots_dir,
full_snapshot_archives_dir,
incremental_snapshot_archives_dir,
@ -508,11 +511,7 @@ fn test_concurrent_snapshot_packaging(
let _package_receiver = std::thread::Builder::new()
.name("package-receiver".to_string())
.spawn(move || {
let accounts_package = real_pending_accounts_package
.lock()
.unwrap()
.take()
.unwrap();
let accounts_package = real_accounts_package_receiver.try_recv().unwrap();
solana_runtime::serde_snapshot::reserialize_bank_with_new_accounts_hash(
accounts_package.snapshot_links.path(),
accounts_package.slot,
@ -709,7 +708,7 @@ fn test_bank_forks_incremental_snapshot(
let bank_forks = &mut snapshot_test_config.bank_forks;
let mint_keypair = &snapshot_test_config.genesis_config_info.mint_keypair;
let pending_accounts_package = PendingAccountsPackage::default();
let (accounts_package_sender, accounts_package_receiver) = crossbeam_channel::unbounded();
let exit = Arc::new(AtomicBool::new(false));
let node_id = Arc::new(Keypair::new());
let cluster_info = Arc::new(ClusterInfo::new(
@ -718,7 +717,8 @@ fn test_bank_forks_incremental_snapshot(
SocketAddrSpace::Unspecified,
));
let accounts_hash_verifier = AccountsHashVerifier::new(
Arc::clone(&pending_accounts_package),
accounts_package_sender.clone(),
accounts_package_receiver,
None,
&exit,
&cluster_info,
@ -733,7 +733,7 @@ fn test_bank_forks_incremental_snapshot(
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: snapshot_test_config.snapshot_config.clone(),
snapshot_request_receiver,
pending_accounts_package,
accounts_package_sender,
};
let mut last_full_snapshot_slot = None;
@ -961,7 +961,7 @@ fn test_snapshots_with_background_services(
let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
let pending_accounts_package = PendingAccountsPackage::default();
let (accounts_package_sender, accounts_package_receiver) = unbounded();
let pending_snapshot_package = PendingSnapshotPackage::default();
let bank_forks = Arc::new(RwLock::new(snapshot_test_config.bank_forks));
@ -981,7 +981,7 @@ fn test_snapshots_with_background_services(
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: snapshot_test_config.snapshot_config.clone(),
snapshot_request_receiver,
pending_accounts_package: Arc::clone(&pending_accounts_package),
accounts_package_sender: accounts_package_sender.clone(),
};
let pruned_banks_request_handler = PrunedBanksRequestHandler {
pruned_banks_receiver,
@ -1002,7 +1002,8 @@ fn test_snapshots_with_background_services(
);
let accounts_hash_verifier = AccountsHashVerifier::new(
pending_accounts_package,
accounts_package_sender,
accounts_package_receiver,
Some(pending_snapshot_package),
&exit,
&cluster_info,

View File

@ -56,7 +56,6 @@ use {
snapshot_config::SnapshotConfig,
snapshot_hash::StartingSnapshotHashes,
snapshot_minimizer::SnapshotMinimizer,
snapshot_package::PendingAccountsPackage,
snapshot_utils::{
self, ArchiveFormat, SnapshotVersion, DEFAULT_ARCHIVE_COMPRESSION,
SUPPORTED_ARCHIVE_COMPRESSION,
@ -1135,11 +1134,12 @@ fn load_bank_forks(
);
let (snapshot_request_sender, snapshot_request_receiver) = crossbeam_channel::unbounded();
let (accounts_package_sender, _accounts_package_receiver) = crossbeam_channel::unbounded();
let accounts_background_request_sender = AbsRequestSender::new(snapshot_request_sender);
let snapshot_request_handler = SnapshotRequestHandler {
snapshot_config: SnapshotConfig::new_load_only(),
snapshot_request_receiver,
pending_accounts_package: PendingAccountsPackage::default(),
accounts_package_sender,
};
let pruned_banks_receiver =
AccountsBackgroundService::setup_bank_drop_callback(bank_forks.clone());

View File

@ -9,7 +9,7 @@ use {
bank::{Bank, BankSlotDelta, DropCallback},
bank_forks::BankForks,
snapshot_config::SnapshotConfig,
snapshot_package::{AccountsPackageType, PendingAccountsPackage, SnapshotType},
snapshot_package::{AccountsPackage, AccountsPackageType, SnapshotType},
snapshot_utils::{self, SnapshotError},
},
crossbeam_channel::{Receiver, SendError, Sender},
@ -138,7 +138,7 @@ pub enum SnapshotRequestType {
pub struct SnapshotRequestHandler {
pub snapshot_config: SnapshotConfig,
pub snapshot_request_receiver: SnapshotRequestReceiver,
pub pending_accounts_package: PendingAccountsPackage,
pub accounts_package_sender: Sender<AccountsPackage>,
}
impl SnapshotRequestHandler {
@ -292,7 +292,7 @@ impl SnapshotRequestHandler {
let result = snapshot_utils::snapshot_bank(
&snapshot_root_bank,
status_cache_slot_deltas,
&self.pending_accounts_package,
&self.accounts_package_sender,
&self.snapshot_config.bank_snapshots_dir,
&self.snapshot_config.full_snapshot_archives_dir,
&self.snapshot_config.incremental_snapshot_archives_dir,

View File

@ -22,15 +22,13 @@ use {
tempfile::TempDir,
};
/// The PendingAccountsPackage passes an AccountsPackage from AccountsBackgroundService to
/// AccountsHashVerifier for hashing
pub type PendingAccountsPackage = Arc<Mutex<Option<AccountsPackage>>>;
mod compare;
pub use compare::*;
/// The PendingSnapshotPackage passes a SnapshotPackage from AccountsHashVerifier to
/// SnapshotPackagerService for archiving
pub type PendingSnapshotPackage = Arc<Mutex<Option<SnapshotPackage>>>;
#[derive(Debug)]
pub struct AccountsPackage {
pub package_type: AccountsPackageType,
pub slot: Slot,
@ -122,6 +120,40 @@ impl AccountsPackage {
enable_rehashing: bank.bank_enable_rehashing_on_accounts_hash(),
})
}
/// Create a new Accounts Package where basically every field is defaulted.
/// Only use for tests; many of the fields are invalid!
pub fn default_for_tests() -> Self {
Self {
package_type: AccountsPackageType::AccountsHashVerifier,
slot: Slot::default(),
block_height: Slot::default(),
slot_deltas: Vec::default(),
snapshot_links: TempDir::new().unwrap(),
snapshot_storages: SnapshotStorages::default(),
archive_format: ArchiveFormat::Tar,
snapshot_version: SnapshotVersion::default(),
full_snapshot_archives_dir: PathBuf::default(),
incremental_snapshot_archives_dir: PathBuf::default(),
expected_capitalization: u64::default(),
accounts_hash_for_testing: Option::default(),
cluster_type: ClusterType::Development,
accounts: Arc::new(Accounts::default_for_tests()),
epoch_schedule: EpochSchedule::default(),
rent_collector: RentCollector::default(),
enable_rehashing: bool::default(),
}
}
}
impl std::fmt::Debug for AccountsPackage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AccountsPackage")
.field("type", &self.package_type)
.field("slot", &self.slot)
.field("block_height", &self.block_height)
.finish_non_exhaustive()
}
}
/// Accounts packages are sent to the Accounts Hash Verifier for processing. There are multiple

View File

@ -0,0 +1,404 @@
use {
super::{AccountsPackage, AccountsPackageType, SnapshotType},
std::cmp::Ordering::{self, Equal, Greater, Less},
};
/// Compare accounts packages by priority; first by type, then by slot
#[must_use]
pub fn cmp_accounts_packages_by_priority(a: &AccountsPackage, b: &AccountsPackage) -> Ordering {
cmp_accounts_package_types_by_priority(&a.package_type, &b.package_type)
.then(a.slot.cmp(&b.slot))
}
/// Compare accounts package types by priority
///
/// Priority, from highest to lowest:
/// - Epoch Accounts Hash
/// - Full Snapshot
/// - Incremental Snapshot
/// - Accounts Hash Verifier
///
/// If two `Snapshot`s are compared, their snapshot types are the tiebreaker.
#[must_use]
pub fn cmp_accounts_package_types_by_priority(
a: &AccountsPackageType,
b: &AccountsPackageType,
) -> Ordering {
use AccountsPackageType::*;
match (a, b) {
// Epoch Accounts Hash packages
(EpochAccountsHash, EpochAccountsHash) => Equal,
(EpochAccountsHash, _) => Greater,
(_, EpochAccountsHash) => Less,
// Snapshot packages
(Snapshot(snapshot_type_a), Snapshot(snapshot_type_b)) => {
cmp_snapshot_types_by_priority(snapshot_type_a, snapshot_type_b)
}
(Snapshot(_), _) => Greater,
(_, Snapshot(_)) => Less,
// Accounts Hash Verifier packages
(AccountsHashVerifier, AccountsHashVerifier) => Equal,
}
}
/// Compare snapshot types by priority
///
/// Full snapshots are higher in priority than incremental snapshots.
/// If two `IncrementalSnapshot`s are compared, their base slots are the tiebreaker.
#[must_use]
pub fn cmp_snapshot_types_by_priority(a: &SnapshotType, b: &SnapshotType) -> Ordering {
use SnapshotType::*;
match (a, b) {
(FullSnapshot, FullSnapshot) => Equal,
(FullSnapshot, IncrementalSnapshot(_)) => Greater,
(IncrementalSnapshot(_), FullSnapshot) => Less,
(IncrementalSnapshot(base_slot_a), IncrementalSnapshot(base_slot_b)) => {
base_slot_a.cmp(base_slot_b)
}
}
}
#[cfg(test)]
mod tests {
use {super::*, solana_sdk::clock::Slot};
#[test]
fn test_cmp_accounts_packages_by_priority() {
fn new(package_type: AccountsPackageType, slot: Slot) -> AccountsPackage {
AccountsPackage {
package_type,
slot,
block_height: slot,
..AccountsPackage::default_for_tests()
}
}
for (accounts_package_a, accounts_package_b, expected_result) in [
(
new(AccountsPackageType::EpochAccountsHash, 11),
new(AccountsPackageType::EpochAccountsHash, 22),
Less,
),
(
new(AccountsPackageType::EpochAccountsHash, 22),
new(AccountsPackageType::EpochAccountsHash, 22),
Equal,
),
(
new(AccountsPackageType::EpochAccountsHash, 33),
new(AccountsPackageType::EpochAccountsHash, 22),
Greater,
),
(
new(AccountsPackageType::EpochAccountsHash, 123),
new(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
123,
),
Greater,
),
(
new(AccountsPackageType::EpochAccountsHash, 123),
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
123,
),
Greater,
),
(
new(AccountsPackageType::EpochAccountsHash, 123),
new(AccountsPackageType::AccountsHashVerifier, 123),
Greater,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
123,
),
new(AccountsPackageType::EpochAccountsHash, 123),
Less,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
11,
),
new(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
22,
),
Less,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
22,
),
new(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
22,
),
Equal,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
33,
),
new(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
22,
),
Greater,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
123,
),
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
123,
),
Greater,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
123,
),
new(AccountsPackageType::AccountsHashVerifier, 123),
Greater,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
123,
),
new(AccountsPackageType::EpochAccountsHash, 123),
Less,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
123,
),
new(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
123,
),
Less,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
123,
),
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(6)),
123,
),
Less,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
11,
),
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
22,
),
Less,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
22,
),
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
22,
),
Equal,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
33,
),
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
22,
),
Greater,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
123,
),
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(4)),
123,
),
Greater,
),
(
new(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
123,
),
new(AccountsPackageType::AccountsHashVerifier, 123),
Greater,
),
(
new(AccountsPackageType::AccountsHashVerifier, 11),
new(AccountsPackageType::AccountsHashVerifier, 22),
Less,
),
(
new(AccountsPackageType::AccountsHashVerifier, 22),
new(AccountsPackageType::AccountsHashVerifier, 22),
Equal,
),
(
new(AccountsPackageType::AccountsHashVerifier, 33),
new(AccountsPackageType::AccountsHashVerifier, 22),
Greater,
),
] {
let actual_result =
cmp_accounts_packages_by_priority(&accounts_package_a, &accounts_package_b);
assert_eq!(expected_result, actual_result);
}
}
#[test]
fn test_cmp_accounts_package_types_by_priority() {
for (accounts_package_type_a, accounts_package_type_b, expected_result) in [
(
AccountsPackageType::EpochAccountsHash,
AccountsPackageType::EpochAccountsHash,
Equal,
),
(
AccountsPackageType::EpochAccountsHash,
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
Greater,
),
(
AccountsPackageType::EpochAccountsHash,
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
Greater,
),
(
AccountsPackageType::EpochAccountsHash,
AccountsPackageType::AccountsHashVerifier,
Greater,
),
(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
AccountsPackageType::EpochAccountsHash,
Less,
),
(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
Equal,
),
(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
Greater,
),
(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
AccountsPackageType::AccountsHashVerifier,
Greater,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
AccountsPackageType::EpochAccountsHash,
Less,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
Less,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(6)),
Less,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
Equal,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(4)),
Greater,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(5)),
AccountsPackageType::AccountsHashVerifier,
Greater,
),
(
AccountsPackageType::AccountsHashVerifier,
AccountsPackageType::AccountsHashVerifier,
Equal,
),
] {
let actual_result = cmp_accounts_package_types_by_priority(
&accounts_package_type_a,
&accounts_package_type_b,
);
assert_eq!(expected_result, actual_result);
}
}
#[test]
fn test_cmp_snapshot_types_by_priority() {
for (snapshot_type_a, snapshot_type_b, expected_result) in [
(
SnapshotType::FullSnapshot,
SnapshotType::FullSnapshot,
Equal,
),
(
SnapshotType::FullSnapshot,
SnapshotType::IncrementalSnapshot(5),
Greater,
),
(
SnapshotType::IncrementalSnapshot(5),
SnapshotType::FullSnapshot,
Less,
),
(
SnapshotType::IncrementalSnapshot(5),
SnapshotType::IncrementalSnapshot(6),
Less,
),
(
SnapshotType::IncrementalSnapshot(5),
SnapshotType::IncrementalSnapshot(5),
Equal,
),
(
SnapshotType::IncrementalSnapshot(5),
SnapshotType::IncrementalSnapshot(4),
Greater,
),
] {
let actual_result = cmp_snapshot_types_by_priority(&snapshot_type_a, &snapshot_type_b);
assert_eq!(expected_result, actual_result);
}
}
}

View File

@ -16,10 +16,7 @@ use {
snapshot_archive_info::{
FullSnapshotArchiveInfo, IncrementalSnapshotArchiveInfo, SnapshotArchiveInfoGetter,
},
snapshot_package::{
AccountsPackage, AccountsPackageType, PendingAccountsPackage, SnapshotPackage,
SnapshotType,
},
snapshot_package::{AccountsPackage, AccountsPackageType, SnapshotPackage, SnapshotType},
snapshot_utils::snapshot_storage_rebuilder::SnapshotStorageRebuilder,
status_cache,
},
@ -2089,7 +2086,7 @@ pub fn purge_old_bank_snapshots(bank_snapshots_dir: impl AsRef<Path>) {
pub fn snapshot_bank(
root_bank: &Bank,
status_cache_slot_deltas: Vec<BankSlotDelta>,
pending_accounts_package: &PendingAccountsPackage,
accounts_package_sender: &Sender<AccountsPackage>,
bank_snapshots_dir: impl AsRef<Path>,
full_snapshot_archives_dir: impl AsRef<Path>,
incremental_snapshot_archives_dir: impl AsRef<Path>,
@ -2126,26 +2123,9 @@ pub fn snapshot_bank(
.expect("failed to hard link bank snapshot into a tmpdir");
// Submit the accounts package
// This extra scope is to be explicit about the lifetime of the pending accounts package lock
{
let mut pending_accounts_package = pending_accounts_package.lock().unwrap();
if can_submit_accounts_package(&accounts_package, pending_accounts_package.as_ref()) {
let package_type = accounts_package.package_type;
let old_accounts_package = pending_accounts_package.replace(accounts_package);
drop(pending_accounts_package);
if let Some(old_accounts_package) = old_accounts_package {
info!(
"The pending AccountsPackage has been overwritten: \
\nNew AccountsPackage slot: {}, package type: {:?} \
\nOld AccountsPackage slot: {}, package type: {:?}",
root_bank.slot(),
package_type,
old_accounts_package.slot,
old_accounts_package.package_type,
);
}
}
}
accounts_package_sender
.send(accounts_package)
.expect("send accounts package");
Ok(())
}
@ -2383,68 +2363,11 @@ pub fn should_take_incremental_snapshot(
&& last_full_snapshot_slot.is_some()
}
/// Decide if an accounts package can be submitted to the PendingAccountsPackage
///
/// If there's no pending accounts package, then submit
/// Otherwise, check if the pending accounts package can be overwritten
#[must_use]
fn can_submit_accounts_package(
accounts_package: &AccountsPackage,
pending_accounts_package: Option<&AccountsPackage>,
) -> bool {
if let Some(pending_accounts_package) = pending_accounts_package {
can_overwrite_pending_accounts_package(accounts_package, pending_accounts_package)
} else {
true
}
}
/// Decide when it is appropriate to overwrite a pending accounts package
///
/// The priority of the package types is (from highest to lowest):
/// 1. Epoch Account Hash
/// 2. Full Snapshot
/// 3. Incremental Snapshot
/// 4. Accounts Hash Verifier
///
/// New packages of higher priority types can overwrite pending packages of *equivalent or lower*
/// priority types.
#[must_use]
fn can_overwrite_pending_accounts_package(
accounts_package: &AccountsPackage,
pending_accounts_package: &AccountsPackage,
) -> bool {
match (
&pending_accounts_package.package_type,
&accounts_package.package_type,
) {
(AccountsPackageType::EpochAccountsHash, AccountsPackageType::EpochAccountsHash) => {
panic!(
"Both pending and new accounts packages are of type EpochAccountsHash! \
EAH calculations must complete before new ones are enqueued. \
\npending accounts package slot: {} \
\n new accounts package slot: {}",
pending_accounts_package.slot, accounts_package.slot,
);
}
(_, AccountsPackageType::EpochAccountsHash) => true,
(AccountsPackageType::EpochAccountsHash, _) => false,
(_, AccountsPackageType::Snapshot(SnapshotType::FullSnapshot)) => true,
(AccountsPackageType::Snapshot(SnapshotType::FullSnapshot), _) => false,
(_, AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(_))) => true,
(AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(_)), _) => false,
_ => true,
}
}
#[cfg(test)]
mod tests {
use {
super::*,
crate::{
accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING, snapshot_package::AccountsPackageType,
status_cache::Status,
},
crate::{accounts_db::ACCOUNTS_DB_CONFIG_FOR_TESTING, status_cache::Status},
assert_matches::assert_matches,
bincode::{deserialize_from, serialize_into},
solana_sdk::{
@ -4036,165 +3959,6 @@ mod tests {
assert_eq!(bank_fields.parent_slot, bank2.parent_slot());
}
/// Test `can_submit_accounts_packages()` with all permutations of `package_type`
/// for both the pending accounts package and the new accounts package.
///
/// pending | new |
/// package | package | result
/// ---------+---------+--------
/// 1. None | X | true
/// 2. AHV | AHV | true
/// 3. AHV | ISS | true
/// 4. AHV | FSS | true
/// 5. AHV | EAH | true
/// 6. ISS | AHV | false
/// 7. ISS | ISS | true
/// 8. ISS | FSS | true
/// 9. ISS | EAH | true
/// 10. FSS | AHV | false
/// 11. FSS | ISS | false
/// 12. FSS | FSS | true
/// 13. FSS | EAH | true
/// 14. EAH | AHV | false
/// 15. EAH | ISS | false
/// 16. EAH | FSS | false
/// 17. EAH | EAH | assert unreachable, so test separately
#[test]
fn test_can_submit_accounts_package() {
// Test 1
{
let pending_accounts_package = None;
let accounts_package = new_accounts_package(AccountsPackageType::AccountsHashVerifier);
assert!(can_submit_accounts_package(
&accounts_package,
pending_accounts_package
));
}
for (pending_package_type, new_package_type, expected_result) in [
// Tests 2-5, pending package is AHV
(
AccountsPackageType::AccountsHashVerifier,
AccountsPackageType::AccountsHashVerifier,
true,
),
(
AccountsPackageType::AccountsHashVerifier,
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)),
true,
),
(
AccountsPackageType::AccountsHashVerifier,
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
true,
),
(
AccountsPackageType::AccountsHashVerifier,
AccountsPackageType::EpochAccountsHash,
true,
),
// Tests 6-9, pending package is ISS
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)),
AccountsPackageType::AccountsHashVerifier,
false,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)),
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)),
true,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)),
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
true,
),
(
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)),
AccountsPackageType::EpochAccountsHash,
true,
),
// Tests 10-13, pending package is FSS
(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
AccountsPackageType::AccountsHashVerifier,
false,
),
(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)),
false,
),
(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
true,
),
(
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
AccountsPackageType::EpochAccountsHash,
true,
),
// Tests 14-16, pending package is EAH
(
AccountsPackageType::EpochAccountsHash,
AccountsPackageType::AccountsHashVerifier,
false,
),
(
AccountsPackageType::EpochAccountsHash,
AccountsPackageType::Snapshot(SnapshotType::IncrementalSnapshot(0)),
false,
),
(
AccountsPackageType::EpochAccountsHash,
AccountsPackageType::Snapshot(SnapshotType::FullSnapshot),
false,
),
// tested separately: (AccountsPackageType::EpochAccountsHash, None, AccountsPackageType::EpochAccountsHash, None, X),
] {
let pending_accounts_package = new_accounts_package(pending_package_type);
let accounts_package = new_accounts_package(new_package_type);
let actual_result =
can_submit_accounts_package(&accounts_package, Some(&pending_accounts_package));
assert_eq!(expected_result, actual_result);
}
}
/// It should not be allowed to have a new accounts package intended for EAH when there is
/// already a pending EAH accounts package.
#[test]
#[should_panic]
fn test_can_submit_accounts_package_both_are_eah() {
let pending_accounts_package = new_accounts_package(AccountsPackageType::EpochAccountsHash);
let accounts_package = new_accounts_package(AccountsPackageType::EpochAccountsHash);
_ = can_submit_accounts_package(&accounts_package, Some(&pending_accounts_package));
}
/// helper function to create an AccountsPackage that's good enough for tests
fn new_accounts_package(package_type: AccountsPackageType) -> AccountsPackage {
AccountsPackage {
package_type,
slot: Slot::default(),
block_height: Slot::default(),
slot_deltas: Vec::default(),
snapshot_links: TempDir::new().unwrap(),
snapshot_storages: SnapshotStorages::default(),
archive_format: ArchiveFormat::Tar,
snapshot_version: SnapshotVersion::default(),
full_snapshot_archives_dir: PathBuf::default(),
incremental_snapshot_archives_dir: PathBuf::default(),
expected_capitalization: u64::default(),
accounts_hash_for_testing: None,
cluster_type: solana_sdk::genesis_config::ClusterType::Development,
accounts: Arc::new(crate::accounts::Accounts::default_for_tests()),
epoch_schedule: solana_sdk::epoch_schedule::EpochSchedule::default(),
rent_collector: crate::rent_collector::RentCollector::default(),
enable_rehashing: true,
}
}
#[test]
fn test_verify_slot_deltas_structural_good() {
// NOTE: slot deltas do not need to be sorted