From 48143a78afe10117093361dd14086b1e17f795a2 Mon Sep 17 00:00:00 2001 From: Brooks Prumo Date: Wed, 28 Sep 2022 11:21:37 -0400 Subject: [PATCH] local-cluster: Add timeout to wait_for_next_snapshot() (#27941) --- .../src/local_cluster_snapshot_utils.rs | 40 +++++++++++++++---- local-cluster/tests/local_cluster.rs | 19 ++++++--- 2 files changed, 47 insertions(+), 12 deletions(-) diff --git a/local-cluster/src/local_cluster_snapshot_utils.rs b/local-cluster/src/local_cluster_snapshot_utils.rs index 852aa4128..bd005622c 100644 --- a/local-cluster/src/local_cluster_snapshot_utils.rs +++ b/local-cluster/src/local_cluster_snapshot_utils.rs @@ -8,7 +8,11 @@ use { snapshot_utils, }, solana_sdk::{client::SyncClient, commitment_config::CommitmentConfig}, - std::{path::Path, thread::sleep, time::Duration}, + std::{ + path::Path, + thread::sleep, + time::{Duration, Instant}, + }, }; impl LocalCluster { @@ -16,6 +20,7 @@ impl LocalCluster { pub fn wait_for_next_full_snapshot( &self, full_snapshot_archives_dir: T, + max_wait_duration: Option, ) -> FullSnapshotArchiveInfo where T: AsRef, @@ -24,6 +29,7 @@ impl LocalCluster { full_snapshot_archives_dir, None::, NextSnapshotType::FullSnapshot, + max_wait_duration, ) { NextSnapshotResult::FullSnapshot(full_snapshot_archive_info) => { full_snapshot_archive_info @@ -38,11 +44,13 @@ impl LocalCluster { &self, full_snapshot_archives_dir: impl AsRef, incremental_snapshot_archives_dir: impl AsRef, + max_wait_duration: Option, ) -> (IncrementalSnapshotArchiveInfo, FullSnapshotArchiveInfo) { match self.wait_for_next_snapshot( full_snapshot_archives_dir, Some(incremental_snapshot_archives_dir), NextSnapshotType::IncrementalAndFullSnapshot, + max_wait_duration, ) { NextSnapshotResult::IncrementalAndFullSnapshot( incremental_snapshot_archive_info, @@ -61,6 +69,7 @@ impl LocalCluster { full_snapshot_archives_dir: impl AsRef, incremental_snapshot_archives_dir: Option>, next_snapshot_type: NextSnapshotType, + max_wait_duration: Option, ) -> NextSnapshotResult { // Get slot after which this was generated let client = self @@ -73,18 +82,20 @@ impl LocalCluster { // Wait for a snapshot for a bank >= last_slot to be made so we know that the snapshot // must include the transactions just pushed trace!( - "Waiting for {:?} snapshot archive to be generated with slot >= {}", + "Waiting for {:?} snapshot archive to be generated with slot >= {}, max wait duration: {:?}", next_snapshot_type, - last_slot + last_slot, + max_wait_duration, ); - loop { + let timer = Instant::now(); + let next_snapshot = loop { if let Some(full_snapshot_archive_info) = snapshot_utils::get_highest_full_snapshot_archive_info(&full_snapshot_archives_dir) { match next_snapshot_type { NextSnapshotType::FullSnapshot => { if full_snapshot_archive_info.slot() >= last_slot { - return NextSnapshotResult::FullSnapshot(full_snapshot_archive_info); + break NextSnapshotResult::FullSnapshot(full_snapshot_archive_info); } } NextSnapshotType::IncrementalAndFullSnapshot => { @@ -95,7 +106,7 @@ impl LocalCluster { ) { if incremental_snapshot_archive_info.slot() >= last_slot { - return NextSnapshotResult::IncrementalAndFullSnapshot( + break NextSnapshotResult::IncrementalAndFullSnapshot( incremental_snapshot_archive_info, full_snapshot_archive_info, ); @@ -104,8 +115,23 @@ impl LocalCluster { } } } + if let Some(max_wait_duration) = max_wait_duration { + assert!( + timer.elapsed() < max_wait_duration, + "Waiting for next {:?} snapshot exceeded the {:?} maximum wait duration!", + next_snapshot_type, + max_wait_duration, + ); + } sleep(Duration::from_secs(5)); - } + }; + trace!( + "Waited {:?} for next snapshot archive: {:?}", + timer.elapsed(), + next_snapshot, + ); + + next_snapshot } } diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index fe15793ca..3d1df45ff 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -499,8 +499,10 @@ fn test_snapshot_download() { .incremental_snapshot_archives_dir; trace!("Waiting for snapshot"); - let full_snapshot_archive_info = - cluster.wait_for_next_full_snapshot(full_snapshot_archives_dir); + let full_snapshot_archive_info = cluster.wait_for_next_full_snapshot( + full_snapshot_archives_dir, + Some(Duration::from_secs(5 * 60)), + ); trace!("found: {}", full_snapshot_archive_info.path().display()); // Download the snapshot, then boot a validator from it. @@ -628,6 +630,7 @@ fn test_incremental_snapshot_download() { .wait_for_next_incremental_snapshot( full_snapshot_archives_dir, incremental_snapshot_archives_dir, + Some(Duration::from_secs(5 * 60)), ); trace!( "found: {} and {}", @@ -793,6 +796,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st leader_snapshot_test_config .incremental_snapshot_archives_dir .path(), + Some(Duration::from_secs(5 * 60)), ); info!( "Found snapshots:\n\tfull snapshot: {}\n\tincremental snapshot: {}", @@ -1259,8 +1263,10 @@ fn test_snapshot_restart_tower() { .unwrap() .full_snapshot_archives_dir; - let full_snapshot_archive_info = - cluster.wait_for_next_full_snapshot(full_snapshot_archives_dir); + let full_snapshot_archive_info = cluster.wait_for_next_full_snapshot( + full_snapshot_archives_dir, + Some(Duration::from_secs(5 * 60)), + ); // Copy archive to validator's snapshot output directory let validator_archive_path = snapshot_utils::build_full_snapshot_archive_path( @@ -1446,7 +1452,10 @@ fn test_snapshots_restart_validity() { expected_balances.extend(new_balances); - cluster.wait_for_next_full_snapshot(full_snapshot_archives_dir); + cluster.wait_for_next_full_snapshot( + full_snapshot_archives_dir, + Some(Duration::from_secs(5 * 60)), + ); // Create new account paths since validator exit is not guaranteed to cleanup RPC threads, // which may delete the old accounts on exit at any point