Uses a channel for AHV -> SPS (#30406)

This commit is contained in:
Brooks 2023-02-21 22:36:29 -05:00 committed by GitHub
parent 2e4b8ea8bb
commit 1689586213
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 372 additions and 98 deletions

View File

@ -13,8 +13,7 @@ use {
epoch_accounts_hash::EpochAccountsHash, epoch_accounts_hash::EpochAccountsHash,
snapshot_config::SnapshotConfig, snapshot_config::SnapshotConfig,
snapshot_package::{ snapshot_package::{
self, retain_max_n_elements, AccountsPackage, AccountsPackageType, self, retain_max_n_elements, AccountsPackage, AccountsPackageType, SnapshotPackage,
PendingSnapshotPackage, SnapshotPackage, SnapshotType,
}, },
sorted_storages::SortedStorages, sorted_storages::SortedStorages,
}, },
@ -42,7 +41,7 @@ impl AccountsHashVerifier {
pub fn new( pub fn new(
accounts_package_sender: Sender<AccountsPackage>, accounts_package_sender: Sender<AccountsPackage>,
accounts_package_receiver: Receiver<AccountsPackage>, accounts_package_receiver: Receiver<AccountsPackage>,
pending_snapshot_package: Option<PendingSnapshotPackage>, snapshot_package_sender: Option<Sender<SnapshotPackage>>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
cluster_info: &Arc<ClusterInfo>, cluster_info: &Arc<ClusterInfo>,
known_validators: Option<HashSet<Pubkey>>, known_validators: Option<HashSet<Pubkey>>,
@ -79,7 +78,7 @@ impl AccountsHashVerifier {
&cluster_info, &cluster_info,
known_validators.as_ref(), known_validators.as_ref(),
halt_on_known_validators_accounts_hash_mismatch, halt_on_known_validators_accounts_hash_mismatch,
pending_snapshot_package.as_ref(), snapshot_package_sender.as_ref(),
&mut hashes, &mut hashes,
&exit, &exit,
fault_injection_rate_slots, fault_injection_rate_slots,
@ -180,7 +179,7 @@ impl AccountsHashVerifier {
cluster_info: &ClusterInfo, cluster_info: &ClusterInfo,
known_validators: Option<&HashSet<Pubkey>>, known_validators: Option<&HashSet<Pubkey>>,
halt_on_known_validator_accounts_hash_mismatch: bool, halt_on_known_validator_accounts_hash_mismatch: bool,
pending_snapshot_package: Option<&PendingSnapshotPackage>, snapshot_package_sender: Option<&Sender<SnapshotPackage>>,
hashes: &mut Vec<(Slot, Hash)>, hashes: &mut Vec<(Slot, Hash)>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
fault_injection_rate_slots: u64, fault_injection_rate_slots: u64,
@ -203,7 +202,7 @@ impl AccountsHashVerifier {
Self::submit_for_packaging( Self::submit_for_packaging(
accounts_package, accounts_package,
pending_snapshot_package, snapshot_package_sender,
snapshot_config, snapshot_config,
accounts_hash, accounts_hash,
); );
@ -373,12 +372,11 @@ impl AccountsHashVerifier {
fn submit_for_packaging( fn submit_for_packaging(
accounts_package: AccountsPackage, accounts_package: AccountsPackage,
pending_snapshot_package: Option<&PendingSnapshotPackage>, snapshot_package_sender: Option<&Sender<SnapshotPackage>>,
snapshot_config: &SnapshotConfig, snapshot_config: &SnapshotConfig,
accounts_hash: AccountsHash, accounts_hash: AccountsHash,
) { ) {
if pending_snapshot_package.is_none() if !snapshot_config.should_generate_snapshots()
|| !snapshot_config.should_generate_snapshots()
|| !matches!( || !matches!(
accounts_package.package_type, accounts_package.package_type,
AccountsPackageType::Snapshot(_) AccountsPackageType::Snapshot(_)
@ -386,26 +384,14 @@ impl AccountsHashVerifier {
{ {
return; return;
} }
let Some(snapshot_package_sender) = snapshot_package_sender else {
let snapshot_package = SnapshotPackage::new(accounts_package, accounts_hash.into()); return;
let pending_snapshot_package = pending_snapshot_package.unwrap();
// If the snapshot package is an Incremental Snapshot, do not submit it if there's already
// a pending Full Snapshot.
let can_submit = match snapshot_package.snapshot_type {
SnapshotType::FullSnapshot => true,
SnapshotType::IncrementalSnapshot(_) => pending_snapshot_package
.lock()
.unwrap()
.as_ref()
.map_or(true, |snapshot_package| {
snapshot_package.snapshot_type.is_incremental_snapshot()
}),
}; };
if can_submit { let snapshot_package = SnapshotPackage::new(accounts_package, accounts_hash.into());
*pending_snapshot_package.lock().unwrap() = Some(snapshot_package); snapshot_package_sender
} .send(snapshot_package)
.expect("send snapshot package");
} }
fn should_halt( fn should_halt(
@ -465,6 +451,7 @@ mod tests {
super::*, super::*,
rand::seq::SliceRandom, rand::seq::SliceRandom,
solana_gossip::{cluster_info::make_accounts_hashes_message, contact_info::ContactInfo}, solana_gossip::{cluster_info::make_accounts_hashes_message, contact_info::ContactInfo},
solana_runtime::snapshot_package::SnapshotType,
solana_sdk::{ solana_sdk::{
hash::hash, hash::hash,
signature::{Keypair, Signer}, signature::{Keypair, Signer},

View File

@ -1,7 +1,9 @@
use { use {
crossbeam_channel::{Receiver, Sender},
solana_gossip::cluster_info::{ solana_gossip::cluster_info::{
ClusterInfo, MAX_INCREMENTAL_SNAPSHOT_HASHES, MAX_SNAPSHOT_HASHES, ClusterInfo, MAX_INCREMENTAL_SNAPSHOT_HASHES, MAX_SNAPSHOT_HASHES,
}, },
solana_measure::measure_us,
solana_perf::thread::renice_this_thread, solana_perf::thread::renice_this_thread,
solana_runtime::{ solana_runtime::{
snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_archive_info::SnapshotArchiveInfoGetter,
@ -10,7 +12,7 @@ use {
FullSnapshotHash, FullSnapshotHashes, IncrementalSnapshotHash, FullSnapshotHash, FullSnapshotHashes, IncrementalSnapshotHash,
IncrementalSnapshotHashes, StartingSnapshotHashes, IncrementalSnapshotHashes, StartingSnapshotHashes,
}, },
snapshot_package::{retain_max_n_elements, PendingSnapshotPackage, SnapshotType}, snapshot_package::{self, retain_max_n_elements, SnapshotPackage, SnapshotType},
snapshot_utils, snapshot_utils,
}, },
solana_sdk::{clock::Slot, hash::Hash}, solana_sdk::{clock::Slot, hash::Hash},
@ -29,8 +31,12 @@ pub struct SnapshotPackagerService {
} }
impl SnapshotPackagerService { impl SnapshotPackagerService {
/// If there are no snapshot packages to handle, limit how often we re-check
const LOOP_LIMITER: Duration = Duration::from_millis(100);
pub fn new( pub fn new(
pending_snapshot_package: PendingSnapshotPackage, snapshot_package_sender: Sender<SnapshotPackage>,
snapshot_package_receiver: Receiver<SnapshotPackage>,
starting_snapshot_hashes: Option<StartingSnapshotHashes>, starting_snapshot_hashes: Option<StartingSnapshotHashes>,
exit: &Arc<AtomicBool>, exit: &Arc<AtomicBool>,
cluster_info: &Arc<ClusterInfo>, cluster_info: &Arc<ClusterInfo>,
@ -72,13 +78,18 @@ impl SnapshotPackagerService {
break; break;
} }
let snapshot_package = pending_snapshot_package.lock().unwrap().take(); let Some((
if snapshot_package.is_none() { snapshot_package,
std::thread::sleep(Duration::from_millis(100)); num_outstanding_snapshot_packages,
num_re_enqueued_snapshot_packages,
)) = Self::get_next_snapshot_package(&snapshot_package_sender, &snapshot_package_receiver) else {
std::thread::sleep(Self::LOOP_LIMITER);
continue; continue;
} };
let snapshot_package = snapshot_package.unwrap(); info!("handling snapshot package: {snapshot_package:?}");
let enqueued_time = snapshot_package.enqueued.elapsed();
let (_, handling_time_us) = measure_us!({
// Archiving the snapshot package is not allowed to fail. // Archiving the snapshot package is not allowed to fail.
// AccountsBackgroundService calls `clean_accounts()` with a value for // AccountsBackgroundService calls `clean_accounts()` with a value for
// last_full_snapshot_slot that requires this archive call to succeed. // last_full_snapshot_slot that requires this archive call to succeed.
@ -97,7 +108,25 @@ impl SnapshotPackagerService {
(snapshot_package.slot(), snapshot_package.hash().0), (snapshot_package.slot(), snapshot_package.hash().0),
); );
} }
});
datapoint_info!(
"snapshot_packager_service",
(
"num-outstanding-snapshot-packages",
num_outstanding_snapshot_packages,
i64
),
(
"num-re-enqueued-snapshot-packages",
num_re_enqueued_snapshot_packages,
i64
),
("enqueued-time-us", enqueued_time.as_micros(), i64),
("handling-time-us", handling_time_us, i64),
);
} }
info!("Snapshot Packager Service has stopped");
}) })
.unwrap(); .unwrap();
@ -109,6 +138,58 @@ impl SnapshotPackagerService {
pub fn join(self) -> thread::Result<()> { pub fn join(self) -> thread::Result<()> {
self.t_snapshot_packager.join() self.t_snapshot_packager.join()
} }
/// Get the next snapshot package to handle
///
/// Look through the snapshot package channel to find the highest priority one to handle next.
/// If there are no snapshot packages in the channel, return None. Otherwise return the
/// highest priority one. Unhandled snapshot packages with slots GREATER-THAN the handled one
/// will be re-enqueued. The remaining will be dropped.
///
/// Also return the number of snapshot packages initially in the channel, and the number of
/// ones re-enqueued.
fn get_next_snapshot_package(
snapshot_package_sender: &Sender<SnapshotPackage>,
snapshot_package_receiver: &Receiver<SnapshotPackage>,
) -> Option<(
SnapshotPackage,
/*num outstanding snapshot packages*/ usize,
/*num re-enqueued snapshot packages*/ usize,
)> {
let mut snapshot_packages: Vec<_> = snapshot_package_receiver.try_iter().collect();
// `select_nth()` panics if the slice is empty, so return if that's the case
if snapshot_packages.is_empty() {
return None;
}
let snapshot_packages_len = snapshot_packages.len();
debug!("outstanding snapshot packages ({snapshot_packages_len}): {snapshot_packages:?}");
snapshot_packages.select_nth_unstable_by(
snapshot_packages_len - 1,
snapshot_package::cmp_snapshot_packages_by_priority,
);
// SAFETY: We know `snapshot_packages` is not empty, so its len is >= 1,
// therefore there is always an element to pop.
let snapshot_package = snapshot_packages.pop().unwrap();
let handled_snapshot_package_slot = snapshot_package.slot();
// re-enqueue any remaining snapshot packages for slots GREATER-THAN the snapshot package
// that will be handled
let num_re_enqueued_snapshot_packages = snapshot_packages
.into_iter()
.filter(|snapshot_package| snapshot_package.slot() > handled_snapshot_package_slot)
.map(|snapshot_package| {
snapshot_package_sender
.try_send(snapshot_package)
.expect("re-enqueue snapshot package")
})
.count();
Some((
snapshot_package,
snapshot_packages_len,
num_re_enqueued_snapshot_packages,
))
}
} }
struct SnapshotGossipManager { struct SnapshotGossipManager {
@ -221,6 +302,7 @@ mod tests {
use { use {
super::*, super::*,
bincode::serialize_into, bincode::serialize_into,
rand::seq::SliceRandom,
solana_runtime::{ solana_runtime::{
accounts_db::AccountStorageEntry, accounts_db::AccountStorageEntry,
bank::BankSlotDelta, bank::BankSlotDelta,
@ -232,11 +314,12 @@ mod tests {
SNAPSHOT_STATUS_CACHE_FILENAME, SNAPSHOT_STATUS_CACHE_FILENAME,
}, },
}, },
solana_sdk::hash::Hash, solana_sdk::{clock::Slot, hash::Hash},
std::{ std::{
fs::{self, remove_dir_all, OpenOptions}, fs::{self, remove_dir_all, OpenOptions},
io::Write, io::Write,
path::{Path, PathBuf}, path::{Path, PathBuf},
time::Instant,
}, },
tempfile::TempDir, tempfile::TempDir,
}; };
@ -335,6 +418,7 @@ mod tests {
snapshot_storages: storage_entries, snapshot_storages: storage_entries,
snapshot_version: SnapshotVersion::default(), snapshot_version: SnapshotVersion::default(),
snapshot_type: SnapshotType::FullSnapshot, snapshot_type: SnapshotType::FullSnapshot,
enqueued: Instant::now(),
}; };
// Make tarball from packageable snapshot // Make tarball from packageable snapshot
@ -369,4 +453,95 @@ mod tests {
snapshot_utils::VerifyBank::Deterministic, snapshot_utils::VerifyBank::Deterministic,
); );
} }
/// Ensure that unhandled snapshot packages are properly re-enqueued or dropped
///
/// The snapshot package handler should re-enqueue unhandled snapshot packages, if those
/// unhandled snapshot packages are for slots GREATER-THAN the last handled snapshot package.
/// Otherwise, they should be dropped.
#[test]
fn test_get_next_snapshot_package() {
fn new(snapshot_type: SnapshotType, slot: Slot) -> SnapshotPackage {
SnapshotPackage {
snapshot_archive_info: SnapshotArchiveInfo {
path: PathBuf::default(),
slot,
hash: SnapshotHash(Hash::default()),
archive_format: ArchiveFormat::Tar,
},
block_height: slot,
snapshot_links: TempDir::new().unwrap(),
snapshot_storages: Vec::default(),
snapshot_version: SnapshotVersion::default(),
snapshot_type,
enqueued: Instant::now(),
}
}
fn new_full(slot: Slot) -> SnapshotPackage {
new(SnapshotType::FullSnapshot, slot)
}
fn new_incr(slot: Slot, base: Slot) -> SnapshotPackage {
new(SnapshotType::IncrementalSnapshot(base), slot)
}
let (snapshot_package_sender, snapshot_package_receiver) = crossbeam_channel::unbounded();
// Populate the channel so that re-enqueueing and dropping will be tested
let mut snapshot_packages = [
new_full(100),
new_incr(110, 100),
new_incr(210, 100),
new_full(300),
new_incr(310, 300),
new_full(400), // <-- handle 1st
new_incr(410, 400),
new_incr(420, 400), // <-- handle 2nd
];
// Shuffle the snapshot packages to simulate receiving new snapshot packages from AHV
// simultaneously as SPS is handling them.
snapshot_packages.shuffle(&mut rand::thread_rng());
snapshot_packages
.into_iter()
.for_each(|snapshot_package| snapshot_package_sender.send(snapshot_package).unwrap());
// The Full Snapshot from slot 400 is handled 1st
// (the older full snapshots are skipped and dropped)
let (
snapshot_package,
_num_outstanding_snapshot_packages,
num_re_enqueued_snapshot_packages,
) = SnapshotPackagerService::get_next_snapshot_package(
&snapshot_package_sender,
&snapshot_package_receiver,
)
.unwrap();
assert_eq!(snapshot_package.snapshot_type, SnapshotType::FullSnapshot,);
assert_eq!(snapshot_package.slot(), 400);
assert_eq!(num_re_enqueued_snapshot_packages, 2);
// The Incremental Snapshot from slot 420 is handled 2nd
// (the older incremental snapshot from slot 410 is skipped and dropped)
let (
snapshot_package,
_num_outstanding_snapshot_packages,
num_re_enqueued_snapshot_packages,
) = SnapshotPackagerService::get_next_snapshot_package(
&snapshot_package_sender,
&snapshot_package_receiver,
)
.unwrap();
assert_eq!(
snapshot_package.snapshot_type,
SnapshotType::IncrementalSnapshot(400),
);
assert_eq!(snapshot_package.slot(), 420);
assert_eq!(num_re_enqueued_snapshot_packages, 0);
// And now the snapshot package channel is empty!
assert!(SnapshotPackagerService::get_next_snapshot_package(
&snapshot_package_sender,
&snapshot_package_receiver
)
.is_none());
}
} }

View File

@ -86,7 +86,6 @@ use {
snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig, snapshot_config::SnapshotConfig,
snapshot_hash::StartingSnapshotHashes, snapshot_hash::StartingSnapshotHashes,
snapshot_package::PendingSnapshotPackage,
snapshot_utils::{self, move_and_async_delete_path}, snapshot_utils::{self, move_and_async_delete_path},
}, },
solana_sdk::{ solana_sdk::{
@ -594,7 +593,7 @@ impl Validator {
config.accounts_hash_interval_slots, config.accounts_hash_interval_slots,
)); ));
let (pending_snapshot_package, snapshot_packager_service) = let (snapshot_package_sender, snapshot_packager_service) =
if config.snapshot_config.should_generate_snapshots() { if config.snapshot_config.should_generate_snapshots() {
// filler accounts make snapshots invalid for use // filler accounts make snapshots invalid for use
// so, do not publish that we have snapshots // so, do not publish that we have snapshots
@ -603,9 +602,11 @@ impl Validator {
.as_ref() .as_ref()
.map(|config| config.filler_accounts_config.count == 0) .map(|config| config.filler_accounts_config.count == 0)
.unwrap_or(true); .unwrap_or(true);
let pending_snapshot_package = PendingSnapshotPackage::default(); let (snapshot_package_sender, snapshot_package_receiver) =
crossbeam_channel::unbounded();
let snapshot_packager_service = SnapshotPackagerService::new( let snapshot_packager_service = SnapshotPackagerService::new(
pending_snapshot_package.clone(), snapshot_package_sender.clone(),
snapshot_package_receiver,
starting_snapshot_hashes, starting_snapshot_hashes,
&exit, &exit,
&cluster_info, &cluster_info,
@ -613,7 +614,7 @@ impl Validator {
enable_gossip_push, enable_gossip_push,
); );
( (
Some(pending_snapshot_package), Some(snapshot_package_sender),
Some(snapshot_packager_service), Some(snapshot_packager_service),
) )
} else { } else {
@ -624,7 +625,7 @@ impl Validator {
let accounts_hash_verifier = AccountsHashVerifier::new( let accounts_hash_verifier = AccountsHashVerifier::new(
accounts_package_sender.clone(), accounts_package_sender.clone(),
accounts_package_receiver, accounts_package_receiver,
pending_snapshot_package, snapshot_package_sender,
&exit, &exit,
&cluster_info, &cluster_info,
config.known_validators.clone(), config.known_validators.clone(),

View File

@ -22,7 +22,6 @@ use {
runtime_config::RuntimeConfig, runtime_config::RuntimeConfig,
snapshot_archive_info::SnapshotArchiveInfoGetter, snapshot_archive_info::SnapshotArchiveInfoGetter,
snapshot_config::SnapshotConfig, snapshot_config::SnapshotConfig,
snapshot_package::PendingSnapshotPackage,
snapshot_utils, snapshot_utils,
}, },
solana_sdk::{ solana_sdk::{
@ -180,9 +179,10 @@ impl BackgroundServices {
) -> Self { ) -> Self {
info!("Starting background services..."); info!("Starting background services...");
let pending_snapshot_package = PendingSnapshotPackage::default(); let (snapshot_package_sender, snapshot_package_receiver) = crossbeam_channel::unbounded();
let snapshot_packager_service = SnapshotPackagerService::new( let snapshot_packager_service = SnapshotPackagerService::new(
pending_snapshot_package.clone(), snapshot_package_sender.clone(),
snapshot_package_receiver,
None, None,
&exit, &exit,
&cluster_info, &cluster_info,
@ -194,7 +194,7 @@ impl BackgroundServices {
let accounts_hash_verifier = AccountsHashVerifier::new( let accounts_hash_verifier = AccountsHashVerifier::new(
accounts_package_sender.clone(), accounts_package_sender.clone(),
accounts_package_receiver, accounts_package_receiver,
Some(pending_snapshot_package), Some(snapshot_package_sender),
&exit, &exit,
&cluster_info, &cluster_info,
None, None,

View File

@ -28,10 +28,7 @@ use {
snapshot_archive_info::FullSnapshotArchiveInfo, snapshot_archive_info::FullSnapshotArchiveInfo,
snapshot_config::SnapshotConfig, snapshot_config::SnapshotConfig,
snapshot_hash::SnapshotHash, snapshot_hash::SnapshotHash,
snapshot_package::{ snapshot_package::{AccountsPackage, AccountsPackageType, SnapshotPackage, SnapshotType},
AccountsPackage, AccountsPackageType, PendingSnapshotPackage, SnapshotPackage,
SnapshotType,
},
snapshot_utils::{ snapshot_utils::{
self, ArchiveFormat, self, ArchiveFormat,
SnapshotVersion::{self, V1_2_0}, SnapshotVersion::{self, V1_2_0},
@ -523,9 +520,10 @@ fn test_concurrent_snapshot_packaging(
ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified) ClusterInfo::new(contact_info, keypair, SocketAddrSpace::Unspecified)
}); });
let pending_snapshot_package = PendingSnapshotPackage::default(); let (snapshot_package_sender, snapshot_package_receiver) = crossbeam_channel::unbounded();
let snapshot_packager_service = SnapshotPackagerService::new( let snapshot_packager_service = SnapshotPackagerService::new(
pending_snapshot_package.clone(), snapshot_package_sender.clone(),
snapshot_package_receiver,
None, None,
&exit, &exit,
&cluster_info, &cluster_info,
@ -535,28 +533,32 @@ fn test_concurrent_snapshot_packaging(
let _package_receiver = std::thread::Builder::new() let _package_receiver = std::thread::Builder::new()
.name("package-receiver".to_string()) .name("package-receiver".to_string())
.spawn(move || { .spawn({
let full_snapshot_archives_dir = full_snapshot_archives_dir.clone();
move || {
let accounts_package = real_accounts_package_receiver.try_recv().unwrap(); let accounts_package = real_accounts_package_receiver.try_recv().unwrap();
let accounts_hash = AccountsHash(Hash::default());
solana_runtime::serde_snapshot::reserialize_bank_with_new_accounts_hash( solana_runtime::serde_snapshot::reserialize_bank_with_new_accounts_hash(
accounts_package.snapshot_links_dir(), accounts_package.snapshot_links_dir(),
accounts_package.slot, accounts_package.slot,
&AccountsHash(Hash::default()), &accounts_hash,
None, None,
); );
let snapshot_package = let snapshot_package = SnapshotPackage::new(accounts_package, accounts_hash.into());
SnapshotPackage::new(accounts_package, AccountsHash(Hash::default()).into()); snapshot_package_sender.send(snapshot_package).unwrap();
pending_snapshot_package
.lock()
.unwrap()
.replace(snapshot_package);
// Wait until the package is consumed by SnapshotPackagerService // Wait until the package has been archived by SnapshotPackagerService
while pending_snapshot_package.lock().unwrap().is_some() { while snapshot_utils::get_highest_full_snapshot_archive_slot(
&full_snapshot_archives_dir,
)
.is_none()
{
std::thread::sleep(Duration::from_millis(100)); std::thread::sleep(Duration::from_millis(100));
} }
// Shutdown SnapshotPackagerService // Shutdown SnapshotPackagerService
exit.store(true, Ordering::Relaxed); exit.store(true, Ordering::Relaxed);
}
}) })
.unwrap(); .unwrap();
@ -987,7 +989,7 @@ fn test_snapshots_with_background_services(
let (pruned_banks_sender, pruned_banks_receiver) = unbounded(); let (pruned_banks_sender, pruned_banks_receiver) = unbounded();
let (snapshot_request_sender, snapshot_request_receiver) = unbounded(); let (snapshot_request_sender, snapshot_request_receiver) = unbounded();
let (accounts_package_sender, accounts_package_receiver) = unbounded(); let (accounts_package_sender, accounts_package_receiver) = unbounded();
let pending_snapshot_package = PendingSnapshotPackage::default(); let (snapshot_package_sender, snapshot_package_receiver) = unbounded();
let bank_forks = Arc::new(RwLock::new(snapshot_test_config.bank_forks)); let bank_forks = Arc::new(RwLock::new(snapshot_test_config.bank_forks));
let callback = bank_forks let callback = bank_forks
@ -1019,7 +1021,8 @@ fn test_snapshots_with_background_services(
let exit = Arc::new(AtomicBool::new(false)); let exit = Arc::new(AtomicBool::new(false));
let snapshot_packager_service = SnapshotPackagerService::new( let snapshot_packager_service = SnapshotPackagerService::new(
pending_snapshot_package.clone(), snapshot_package_sender.clone(),
snapshot_package_receiver,
None, None,
&exit, &exit,
&cluster_info, &cluster_info,
@ -1030,7 +1033,7 @@ fn test_snapshots_with_background_services(
let accounts_hash_verifier = AccountsHashVerifier::new( let accounts_hash_verifier = AccountsHashVerifier::new(
accounts_package_sender, accounts_package_sender,
accounts_package_receiver, accounts_package_receiver,
Some(pending_snapshot_package), Some(snapshot_package_sender),
&exit, &exit,
&cluster_info, &cluster_info,
None, None,

View File

@ -18,7 +18,7 @@ use {
std::{ std::{
fs, fs,
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::{Arc, Mutex}, sync::Arc,
time::Instant, time::Instant,
}, },
tempfile::TempDir, tempfile::TempDir,
@ -27,10 +27,6 @@ use {
mod compare; mod compare;
pub use compare::*; pub use compare::*;
/// The PendingSnapshotPackage passes a SnapshotPackage from AccountsHashVerifier to
/// SnapshotPackagerService for archiving
pub type PendingSnapshotPackage = Arc<Mutex<Option<SnapshotPackage>>>;
/// This struct packages up fields to send from AccountsBackgroundService to AccountsHashVerifier /// This struct packages up fields to send from AccountsBackgroundService to AccountsHashVerifier
pub struct AccountsPackage { pub struct AccountsPackage {
pub package_type: AccountsPackageType, pub package_type: AccountsPackageType,
@ -241,6 +237,7 @@ pub enum AccountsPackageType {
EpochAccountsHash, EpochAccountsHash,
} }
/// This struct packages up fields to send from AccountsHashVerifier to SnapshotPackagerService
pub struct SnapshotPackage { pub struct SnapshotPackage {
pub snapshot_archive_info: SnapshotArchiveInfo, pub snapshot_archive_info: SnapshotArchiveInfo,
pub block_height: Slot, pub block_height: Slot,
@ -248,6 +245,10 @@ pub struct SnapshotPackage {
pub snapshot_storages: Vec<Arc<AccountStorageEntry>>, pub snapshot_storages: Vec<Arc<AccountStorageEntry>>,
pub snapshot_version: SnapshotVersion, pub snapshot_version: SnapshotVersion,
pub snapshot_type: SnapshotType, pub snapshot_type: SnapshotType,
/// The instant this snapshot package was sent to the queue.
/// Used to track how long snapshot packages wait before handling.
pub enqueued: Instant,
} }
impl SnapshotPackage { impl SnapshotPackage {
@ -296,10 +297,21 @@ impl SnapshotPackage {
snapshot_storages, snapshot_storages,
snapshot_version: snapshot_info.snapshot_version, snapshot_version: snapshot_info.snapshot_version,
snapshot_type, snapshot_type,
enqueued: Instant::now(),
} }
} }
} }
impl std::fmt::Debug for SnapshotPackage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SnapshotPackage")
.field("type", &self.snapshot_type)
.field("slot", &self.slot())
.field("block_height", &self.block_height)
.finish_non_exhaustive()
}
}
impl SnapshotArchiveInfoGetter for SnapshotPackage { impl SnapshotArchiveInfoGetter for SnapshotPackage {
fn snapshot_archive_info(&self) -> &SnapshotArchiveInfo { fn snapshot_archive_info(&self) -> &SnapshotArchiveInfo {
&self.snapshot_archive_info &self.snapshot_archive_info

View File

@ -1,8 +1,17 @@
use { use {
super::{AccountsPackage, AccountsPackageType, SnapshotType}, super::{
AccountsPackage, AccountsPackageType, SnapshotArchiveInfoGetter, SnapshotPackage,
SnapshotType,
},
std::cmp::Ordering::{self, Equal, Greater, Less}, std::cmp::Ordering::{self, Equal, Greater, Less},
}; };
/// Compare snapshot packages by priority; first by type, then by slot
#[must_use]
pub fn cmp_snapshot_packages_by_priority(a: &SnapshotPackage, b: &SnapshotPackage) -> Ordering {
cmp_snapshot_types_by_priority(&a.snapshot_type, &b.snapshot_type).then(a.slot().cmp(&b.slot()))
}
/// Compare accounts packages by priority; first by type, then by slot /// Compare accounts packages by priority; first by type, then by slot
#[must_use] #[must_use]
pub fn cmp_accounts_packages_by_priority(a: &AccountsPackage, b: &AccountsPackage) -> Ordering { pub fn cmp_accounts_packages_by_priority(a: &AccountsPackage, b: &AccountsPackage) -> Ordering {
@ -62,7 +71,94 @@ pub fn cmp_snapshot_types_by_priority(a: &SnapshotType, b: &SnapshotType) -> Ord
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use {super::*, solana_sdk::clock::Slot}; use {
super::*,
crate::{
snapshot_archive_info::SnapshotArchiveInfo,
snapshot_hash::SnapshotHash,
snapshot_utils::{ArchiveFormat, SnapshotVersion},
},
solana_sdk::{clock::Slot, hash::Hash},
std::{path::PathBuf, time::Instant},
tempfile::TempDir,
};
#[test]
fn test_cmp_snapshot_packages_by_priority() {
fn new(snapshot_type: SnapshotType, slot: Slot) -> SnapshotPackage {
SnapshotPackage {
snapshot_archive_info: SnapshotArchiveInfo {
path: PathBuf::default(),
slot,
hash: SnapshotHash(Hash::default()),
archive_format: ArchiveFormat::Tar,
},
block_height: slot,
snapshot_links: TempDir::new().unwrap(),
snapshot_storages: Vec::default(),
snapshot_version: SnapshotVersion::default(),
snapshot_type,
enqueued: Instant::now(),
}
}
for (snapshot_package_a, snapshot_package_b, expected_result) in [
(
new(SnapshotType::FullSnapshot, 11),
new(SnapshotType::FullSnapshot, 22),
Less,
),
(
new(SnapshotType::FullSnapshot, 22),
new(SnapshotType::FullSnapshot, 22),
Equal,
),
(
new(SnapshotType::FullSnapshot, 33),
new(SnapshotType::FullSnapshot, 22),
Greater,
),
(
new(SnapshotType::FullSnapshot, 22),
new(SnapshotType::IncrementalSnapshot(88), 99),
Greater,
),
(
new(SnapshotType::IncrementalSnapshot(11), 55),
new(SnapshotType::IncrementalSnapshot(22), 55),
Less,
),
(
new(SnapshotType::IncrementalSnapshot(22), 55),
new(SnapshotType::IncrementalSnapshot(22), 55),
Equal,
),
(
new(SnapshotType::IncrementalSnapshot(33), 55),
new(SnapshotType::IncrementalSnapshot(22), 55),
Greater,
),
(
new(SnapshotType::IncrementalSnapshot(22), 44),
new(SnapshotType::IncrementalSnapshot(22), 55),
Less,
),
(
new(SnapshotType::IncrementalSnapshot(22), 55),
new(SnapshotType::IncrementalSnapshot(22), 55),
Equal,
),
(
new(SnapshotType::IncrementalSnapshot(22), 66),
new(SnapshotType::IncrementalSnapshot(22), 55),
Greater,
),
] {
let actual_result =
cmp_snapshot_packages_by_priority(&snapshot_package_a, &snapshot_package_b);
assert_eq!(expected_result, actual_result);
}
}
#[test] #[test]
fn test_cmp_accounts_packages_by_priority() { fn test_cmp_accounts_packages_by_priority() {