Separate remotely downloaded snapshot archives (#23510)
* seperate remotely downloaded snapshot archives * add str const for snapshot download dir * only walk remote sub directory * move directory creation outside of loop * move is_remote to traits * clippy simplify * clippy * clippy * add unittest * fix local cluster tests * look for remote snapshot archive in remote foler * create remote dir in tests * use snapshot download dir constant * extract build_remote_dir fn * fix build * code review - walking snapshot archived dirs explicitly * fix build * fix build * fix comments * Update runtime/src/snapshot_utils.rs Co-authored-by: Brooks Prumo <brooks@prumo.org> * clippy * borrow to avoid copy Co-authored-by: Brooks Prumo <brooks@prumo.org>
This commit is contained in:
parent
5ea6a1e500
commit
0c684721d8
|
@ -268,6 +268,10 @@ pub fn download_snapshot_archive<'a, 'b>(
|
|||
maximum_incremental_snapshot_archives_to_retain,
|
||||
);
|
||||
|
||||
let snapshot_archives_remote_dir =
|
||||
snapshot_utils::build_snapshot_archives_remote_dir(snapshot_archives_dir);
|
||||
fs::create_dir_all(&snapshot_archives_remote_dir).unwrap();
|
||||
|
||||
for archive_format in [
|
||||
ArchiveFormat::TarZstd,
|
||||
ArchiveFormat::TarGzip,
|
||||
|
@ -276,14 +280,14 @@ pub fn download_snapshot_archive<'a, 'b>(
|
|||
] {
|
||||
let destination_path = match snapshot_type {
|
||||
SnapshotType::FullSnapshot => snapshot_utils::build_full_snapshot_archive_path(
|
||||
snapshot_archives_dir,
|
||||
&snapshot_archives_remote_dir,
|
||||
desired_snapshot_hash.0,
|
||||
&desired_snapshot_hash.1,
|
||||
archive_format,
|
||||
),
|
||||
SnapshotType::IncrementalSnapshot(base_slot) => {
|
||||
snapshot_utils::build_incremental_snapshot_archive_path(
|
||||
snapshot_archives_dir,
|
||||
&snapshot_archives_remote_dir,
|
||||
base_slot,
|
||||
desired_snapshot_hash.0,
|
||||
&desired_snapshot_hash.1,
|
||||
|
|
|
@ -1083,6 +1083,22 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st
|
|||
}
|
||||
};
|
||||
|
||||
let copy_files_with_remote = |from: &Path, to: &Path| {
|
||||
copy_files(from, to);
|
||||
let remote_from = snapshot_utils::build_snapshot_archives_remote_dir(from);
|
||||
let remote_to = snapshot_utils::build_snapshot_archives_remote_dir(to);
|
||||
let _ = fs::create_dir_all(&remote_from);
|
||||
let _ = fs::create_dir_all(&remote_to);
|
||||
copy_files(&remote_from, &remote_to);
|
||||
};
|
||||
|
||||
let delete_files_with_remote = |from: &Path| {
|
||||
delete_files(from);
|
||||
let remote_dir = snapshot_utils::build_snapshot_archives_remote_dir(from);
|
||||
let _ = fs::create_dir_all(&remote_dir);
|
||||
delete_files(&remote_dir);
|
||||
};
|
||||
|
||||
// After downloading the snapshots, copy them over to a backup directory. Later we'll need to
|
||||
// restart the node and guarantee that the only snapshots present are these initial ones. So,
|
||||
// the easiest way to do that is create a backup now, delete the ones on the node before
|
||||
|
@ -1092,7 +1108,7 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st
|
|||
"Backing up validator snapshots to dir: {}...",
|
||||
backup_validator_snapshot_archives_dir.path().display()
|
||||
);
|
||||
copy_files(
|
||||
copy_files_with_remote(
|
||||
validator_snapshot_test_config.snapshot_archives_dir.path(),
|
||||
backup_validator_snapshot_archives_dir.path(),
|
||||
);
|
||||
|
@ -1170,8 +1186,8 @@ fn test_incremental_snapshot_download_with_crossing_full_snapshot_interval_at_st
|
|||
trace!(
|
||||
"Delete all the snapshots on the validator and restore the originals from the backup..."
|
||||
);
|
||||
delete_files(validator_snapshot_test_config.snapshot_archives_dir.path());
|
||||
copy_files(
|
||||
delete_files_with_remote(validator_snapshot_test_config.snapshot_archives_dir.path());
|
||||
copy_files_with_remote(
|
||||
backup_validator_snapshot_archives_dir.path(),
|
||||
validator_snapshot_test_config.snapshot_archives_dir.path(),
|
||||
);
|
||||
|
|
|
@ -153,6 +153,20 @@ impl RpcRequestMiddleware {
|
|||
tokio::fs::File::open(path).await
|
||||
}
|
||||
|
||||
fn find_snapshot_file<P>(&self, stem: P) -> PathBuf
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
let root = &self.snapshot_config.as_ref().unwrap().snapshot_archives_dir;
|
||||
let local_path = root.join(&stem);
|
||||
if local_path.exists() {
|
||||
local_path
|
||||
} else {
|
||||
// remote snapshot archive path
|
||||
snapshot_utils::build_snapshot_archives_remote_dir(root).join(stem)
|
||||
}
|
||||
}
|
||||
|
||||
fn process_file_get(&self, path: &str) -> RequestMiddlewareAction {
|
||||
let stem = path.split_at(1).1; // Drop leading '/' from path
|
||||
let filename = {
|
||||
|
@ -163,11 +177,7 @@ impl RpcRequestMiddleware {
|
|||
}
|
||||
_ => {
|
||||
inc_new_counter_info!("rpc-get_snapshot", 1);
|
||||
self.snapshot_config
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.snapshot_archives_dir
|
||||
.join(stem)
|
||||
self.find_snapshot_file(stem)
|
||||
}
|
||||
}
|
||||
};
|
||||
|
|
|
@ -25,6 +25,15 @@ pub trait SnapshotArchiveInfoGetter {
|
|||
fn archive_format(&self) -> ArchiveFormat {
|
||||
self.snapshot_archive_info().archive_format
|
||||
}
|
||||
|
||||
fn is_remote(&self) -> bool {
|
||||
self.snapshot_archive_info()
|
||||
.path
|
||||
.parent()
|
||||
.map_or(false, |p| {
|
||||
p.ends_with(snapshot_utils::SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Common information about a snapshot archive
|
||||
|
@ -79,7 +88,7 @@ impl PartialOrd for FullSnapshotArchiveInfo {
|
|||
}
|
||||
}
|
||||
|
||||
// Order `FullSnapshotArchiveInfo` by slot (ascending), which practially is sorting chronologically
|
||||
// Order `FullSnapshotArchiveInfo` by slot (ascending), which practically is sorting chronologically
|
||||
impl Ord for FullSnapshotArchiveInfo {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.slot().cmp(&other.slot())
|
||||
|
@ -141,7 +150,7 @@ impl PartialOrd for IncrementalSnapshotArchiveInfo {
|
|||
}
|
||||
|
||||
// Order `IncrementalSnapshotArchiveInfo` by base slot (ascending), then slot (ascending), which
|
||||
// practially is sorting chronologically
|
||||
// practically is sorting chronologically
|
||||
impl Ord for IncrementalSnapshotArchiveInfo {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.base_slot()
|
||||
|
|
|
@ -47,6 +47,7 @@ mod archive_format;
|
|||
pub use archive_format::*;
|
||||
|
||||
pub const SNAPSHOT_STATUS_CACHE_FILENAME: &str = "status_cache";
|
||||
pub const SNAPSHOT_ARCHIVE_DOWNLOAD_DIR: &str = "remote";
|
||||
pub const DEFAULT_FULL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: Slot = 25_000;
|
||||
pub const DEFAULT_INCREMENTAL_SNAPSHOT_ARCHIVE_INTERVAL_SLOTS: Slot = 100;
|
||||
const MAX_SNAPSHOT_DATA_FILE_SIZE: u64 = 32 * 1024 * 1024 * 1024; // 32 GiB
|
||||
|
@ -789,7 +790,7 @@ pub fn bank_from_snapshot_archives(
|
|||
let mut measure_verify = Measure::start("verify");
|
||||
if !bank.verify_snapshot_bank(
|
||||
test_hash_calculation,
|
||||
accounts_db_skip_shrink,
|
||||
accounts_db_skip_shrink || !full_snapshot_archive_info.is_remote(),
|
||||
Some(full_snapshot_archive_info.slot()),
|
||||
) && limit_load_slot_count_from_snapshot.is_none()
|
||||
{
|
||||
|
@ -1033,6 +1034,12 @@ pub fn path_to_file_name_str(path: &Path) -> Result<&str> {
|
|||
.ok_or_else(|| SnapshotError::FileNameToStrError(path.to_path_buf()))
|
||||
}
|
||||
|
||||
pub fn build_snapshot_archives_remote_dir(snapshot_archives_dir: impl AsRef<Path>) -> PathBuf {
|
||||
snapshot_archives_dir
|
||||
.as_ref()
|
||||
.join(SNAPSHOT_ARCHIVE_DOWNLOAD_DIR)
|
||||
}
|
||||
|
||||
/// Build the full snapshot archive path from its components: the snapshot archives directory, the
|
||||
/// snapshot slot, the accounts hash, and the archive format.
|
||||
pub fn build_full_snapshot_archive_path(
|
||||
|
@ -1136,54 +1143,57 @@ pub(crate) fn parse_incremental_snapshot_archive_filename(
|
|||
})
|
||||
}
|
||||
|
||||
/// Get a list of the full snapshot archives in a directory
|
||||
/// Walk down the snapshot archive to collect snapshot archive file info
|
||||
fn get_snapshot_archives<T, F>(snapshot_archives_dir: &Path, cb: F) -> Vec<T>
|
||||
where
|
||||
F: Fn(PathBuf) -> Result<T>,
|
||||
{
|
||||
let walk_dir = |dir: &Path| -> Vec<T> {
|
||||
let entry_iter = fs::read_dir(dir);
|
||||
match entry_iter {
|
||||
Err(err) => {
|
||||
info!(
|
||||
"Unable to read snapshot archives directory: err: {}, path: {}",
|
||||
err,
|
||||
dir.display()
|
||||
);
|
||||
vec![]
|
||||
}
|
||||
Ok(entries) => entries
|
||||
.filter_map(|entry| entry.map_or(None, |entry| cb(entry.path()).ok()))
|
||||
.collect(),
|
||||
}
|
||||
};
|
||||
|
||||
let mut ret = walk_dir(snapshot_archives_dir);
|
||||
ret.append(&mut walk_dir(
|
||||
build_snapshot_archives_remote_dir(snapshot_archives_dir).as_ref(),
|
||||
));
|
||||
ret
|
||||
}
|
||||
|
||||
/// Get a list of the full snapshot archives from a directory
|
||||
pub fn get_full_snapshot_archives<P>(snapshot_archives_dir: P) -> Vec<FullSnapshotArchiveInfo>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
match fs::read_dir(&snapshot_archives_dir) {
|
||||
Err(err) => {
|
||||
info!(
|
||||
"Unable to read snapshot archives directory: err: {}, path: {}",
|
||||
err,
|
||||
snapshot_archives_dir.as_ref().display()
|
||||
);
|
||||
vec![]
|
||||
}
|
||||
Ok(files) => files
|
||||
.filter_map(|entry| {
|
||||
entry.map_or(None, |entry| {
|
||||
FullSnapshotArchiveInfo::new_from_path(entry.path()).ok()
|
||||
})
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
get_snapshot_archives(
|
||||
snapshot_archives_dir.as_ref(),
|
||||
FullSnapshotArchiveInfo::new_from_path,
|
||||
)
|
||||
}
|
||||
|
||||
/// Get a list of the incremental snapshot archives in a directory
|
||||
/// Get a list of the incremental snapshot archives from a directory
|
||||
pub fn get_incremental_snapshot_archives<P>(
|
||||
snapshot_archives_dir: P,
|
||||
) -> Vec<IncrementalSnapshotArchiveInfo>
|
||||
where
|
||||
P: AsRef<Path>,
|
||||
{
|
||||
match fs::read_dir(&snapshot_archives_dir) {
|
||||
Err(err) => {
|
||||
info!(
|
||||
"Unable to read snapshot archives directory: err: {}, path: {}",
|
||||
err,
|
||||
snapshot_archives_dir.as_ref().display()
|
||||
);
|
||||
vec![]
|
||||
}
|
||||
Ok(files) => files
|
||||
.filter_map(|entry| {
|
||||
entry.map_or(None, |entry| {
|
||||
IncrementalSnapshotArchiveInfo::new_from_path(entry.path()).ok()
|
||||
})
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
get_snapshot_archives(
|
||||
snapshot_archives_dir.as_ref(),
|
||||
IncrementalSnapshotArchiveInfo::new_from_path,
|
||||
)
|
||||
}
|
||||
|
||||
/// Get the highest slot of the full snapshot archives in a directory
|
||||
|
@ -2274,6 +2284,7 @@ mod tests {
|
|||
min_incremental_snapshot_slot: Slot,
|
||||
max_incremental_snapshot_slot: Slot,
|
||||
) {
|
||||
fs::create_dir_all(snapshot_archives_dir).unwrap();
|
||||
for full_snapshot_slot in min_full_snapshot_slot..max_full_snapshot_slot {
|
||||
for incremental_snapshot_slot in
|
||||
min_incremental_snapshot_slot..max_incremental_snapshot_slot
|
||||
|
@ -2328,6 +2339,25 @@ mod tests {
|
|||
assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_full_snapshot_archives_remote() {
|
||||
solana_logger::setup();
|
||||
let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
|
||||
let min_slot = 123;
|
||||
let max_slot = 456;
|
||||
common_create_snapshot_archive_files(
|
||||
&temp_snapshot_archives_dir.path().join("remote"),
|
||||
min_slot,
|
||||
max_slot,
|
||||
0,
|
||||
0,
|
||||
);
|
||||
|
||||
let snapshot_archives = get_full_snapshot_archives(temp_snapshot_archives_dir);
|
||||
assert_eq!(snapshot_archives.len() as Slot, max_slot - min_slot);
|
||||
assert!(snapshot_archives.iter().all(|info| info.is_remote()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_incremental_snapshot_archives() {
|
||||
solana_logger::setup();
|
||||
|
@ -2353,6 +2383,34 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_incremental_snapshot_archives_remote() {
|
||||
solana_logger::setup();
|
||||
let temp_snapshot_archives_dir = tempfile::TempDir::new().unwrap();
|
||||
let min_full_snapshot_slot = 12;
|
||||
let max_full_snapshot_slot = 23;
|
||||
let min_incremental_snapshot_slot = 34;
|
||||
let max_incremental_snapshot_slot = 45;
|
||||
common_create_snapshot_archive_files(
|
||||
&temp_snapshot_archives_dir.path().join("remote"),
|
||||
min_full_snapshot_slot,
|
||||
max_full_snapshot_slot,
|
||||
min_incremental_snapshot_slot,
|
||||
max_incremental_snapshot_slot,
|
||||
);
|
||||
|
||||
let incremental_snapshot_archives =
|
||||
get_incremental_snapshot_archives(temp_snapshot_archives_dir);
|
||||
assert_eq!(
|
||||
incremental_snapshot_archives.len() as Slot,
|
||||
(max_full_snapshot_slot - min_full_snapshot_slot)
|
||||
* (max_incremental_snapshot_slot - min_incremental_snapshot_slot)
|
||||
);
|
||||
assert!(incremental_snapshot_archives
|
||||
.iter()
|
||||
.all(|info| info.is_remote()));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_get_highest_full_snapshot_archive_slot() {
|
||||
solana_logger::setup();
|
||||
|
|
Loading…
Reference in New Issue