local-cluster: Add timeout to wait_for_next_snapshot() (#27941)

This commit is contained in:
Brooks Prumo 2022-09-28 11:21:37 -04:00 committed by GitHub
parent 5b4a669e5f
commit 48143a78af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 12 deletions

View File

@ -8,7 +8,11 @@ use {
snapshot_utils,
},
solana_sdk::{client::SyncClient, commitment_config::CommitmentConfig},
std::{path::Path, thread::sleep, time::Duration},
std::{
path::Path,
thread::sleep,
time::{Duration, Instant},
},
};
impl LocalCluster {
@ -16,6 +20,7 @@ impl LocalCluster {
pub fn wait_for_next_full_snapshot<T>(
&self,
full_snapshot_archives_dir: T,
max_wait_duration: Option<Duration>,
) -> FullSnapshotArchiveInfo
where
T: AsRef<Path>,
@ -24,6 +29,7 @@ impl LocalCluster {
full_snapshot_archives_dir,
None::<T>,
NextSnapshotType::FullSnapshot,
max_wait_duration,
) {
NextSnapshotResult::FullSnapshot(full_snapshot_archive_info) => {
full_snapshot_archive_info
@ -38,11 +44,13 @@ impl LocalCluster {
&self,
full_snapshot_archives_dir: impl AsRef<Path>,
incremental_snapshot_archives_dir: impl AsRef<Path>,
max_wait_duration: Option<Duration>,
) -> (IncrementalSnapshotArchiveInfo, FullSnapshotArchiveInfo) {
match self.wait_for_next_snapshot(
full_snapshot_archives_dir,
Some(incremental_snapshot_archives_dir),
NextSnapshotType::IncrementalAndFullSnapshot,
max_wait_duration,
) {
NextSnapshotResult::IncrementalAndFullSnapshot(
incremental_snapshot_archive_info,
@ -61,6 +69,7 @@ impl LocalCluster {
full_snapshot_archives_dir: impl AsRef<Path>,
incremental_snapshot_archives_dir: Option<impl AsRef<Path>>,
next_snapshot_type: NextSnapshotType,
max_wait_duration: Option<Duration>,
) -> NextSnapshotResult {
// Get slot after which this was generated
let client = self
@ -73,18 +82,20 @@ impl LocalCluster {
// Wait for a snapshot for a bank >= last_slot to be made so we know that the snapshot
// must include the transactions just pushed
trace!(
"Waiting for {:?} snapshot archive to be generated with slot >= {}",
"Waiting for {:?} snapshot archive to be generated with slot >= {}, max wait duration: {:?}",
next_snapshot_type,
last_slot
last_slot,
max_wait_duration,
);
loop {
let timer = Instant::now();
let next_snapshot = loop {
if let Some(full_snapshot_archive_info) =
snapshot_utils::get_highest_full_snapshot_archive_info(&full_snapshot_archives_dir)
{
match next_snapshot_type {
NextSnapshotType::FullSnapshot => {
if full_snapshot_archive_info.slot() >= last_slot {
return NextSnapshotResult::FullSnapshot(full_snapshot_archive_info);
break NextSnapshotResult::FullSnapshot(full_snapshot_archive_info);
}
}
NextSnapshotType::IncrementalAndFullSnapshot => {
@ -95,7 +106,7 @@ impl LocalCluster {
)
{
if incremental_snapshot_archive_info.slot() >= last_slot {
return NextSnapshotResult::IncrementalAndFullSnapshot(
break NextSnapshotResult::IncrementalAndFullSnapshot(
incremental_snapshot_archive_info,
full_snapshot_archive_info,
);
@ -104,8 +115,23 @@ impl LocalCluster {
}
}
}
if let Some(max_wait_duration) = max_wait_duration {
assert!(
timer.elapsed() < max_wait_duration,
"Waiting for next {:?} snapshot exceeded the {:?} maximum wait duration!",
next_snapshot_type,
max_wait_duration,
);
}
sleep(Duration::from_secs(5));
}
};
trace!(
"Waited {:?} for next snapshot archive: {:?}",
timer.elapsed(),
next_snapshot,
);
next_snapshot
}
}

View File

@ -499,8 +499,10 @@ fn test_snapshot_download() {
.incremental_snapshot_archives_dir;
trace!("Waiting for snapshot");
let full_snapshot_archive_info =
cluster.wait_for_next_full_snapshot(full_snapshot_archives_dir);
let full_snapshot_archive_info = cluster.wait_for_next_full_snapshot(
full_snapshot_archives_dir,
Some(Duration::from_secs(5 * 60)),
);
trace!("found: {}", full_snapshot_archive_info.path().display());
// Download the snapshot, then boot a validator from it.
@ -628,6 +630,7 @@ fn test_incremental_snapshot_download() {
.wait_for_next_incremental_snapshot(
full_snapshot_archives_dir,
incremental_snapshot_archives_dir,
Some(Duration::from_secs(5 * 60)),
);
trace!(
"found: {} and {}",
@ -793,6 +796,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st
leader_snapshot_test_config
.incremental_snapshot_archives_dir
.path(),
Some(Duration::from_secs(5 * 60)),
);
info!(
"Found snapshots:\n\tfull snapshot: {}\n\tincremental snapshot: {}",
@ -1259,8 +1263,10 @@ fn test_snapshot_restart_tower() {
.unwrap()
.full_snapshot_archives_dir;
let full_snapshot_archive_info =
cluster.wait_for_next_full_snapshot(full_snapshot_archives_dir);
let full_snapshot_archive_info = cluster.wait_for_next_full_snapshot(
full_snapshot_archives_dir,
Some(Duration::from_secs(5 * 60)),
);
// Copy archive to validator's snapshot output directory
let validator_archive_path = snapshot_utils::build_full_snapshot_archive_path(
@ -1446,7 +1452,10 @@ fn test_snapshots_restart_validity() {
expected_balances.extend(new_balances);
cluster.wait_for_next_full_snapshot(full_snapshot_archives_dir);
cluster.wait_for_next_full_snapshot(
full_snapshot_archives_dir,
Some(Duration::from_secs(5 * 60)),
);
// Create new account paths since validator exit is not guaranteed to cleanup RPC threads,
// which may delete the old accounts on exit at any point