Always retain the highest incremental snapshot for all full snapshots
This commit is contained in:
parent
475e7d1964
commit
74b586ae71
|
@ -27,8 +27,8 @@ use {
|
|||
solana_measure::measure::Measure,
|
||||
solana_sdk::{clock::Slot, genesis_config::GenesisConfig, hash::Hash, pubkey::Pubkey},
|
||||
std::{
|
||||
cmp::{max, Ordering},
|
||||
collections::HashSet,
|
||||
cmp::Ordering,
|
||||
collections::{HashMap, HashSet},
|
||||
fmt,
|
||||
fs::{self, File},
|
||||
io::{BufReader, BufWriter, Error as IoError, ErrorKind, Read, Seek, Write},
|
||||
|
@ -1346,86 +1346,78 @@ pub fn purge_old_snapshot_archives<P>(
|
|||
maximum_full_snapshot_archives_to_retain,
|
||||
maximum_incremental_snapshot_archives_to_retain
|
||||
);
|
||||
let mut snapshot_archives = get_full_snapshot_archives(&snapshot_archives_dir);
|
||||
snapshot_archives.sort_unstable();
|
||||
snapshot_archives.reverse();
|
||||
let max_snaps = max(1, maximum_full_snapshot_archives_to_retain); // Always keep at least one snapshot
|
||||
|
||||
let mut full_snapshot_archives = get_full_snapshot_archives(&snapshot_archives_dir);
|
||||
full_snapshot_archives.sort_unstable();
|
||||
full_snapshot_archives.reverse();
|
||||
|
||||
let num_to_retain = full_snapshot_archives.len().min(
|
||||
maximum_full_snapshot_archives_to_retain
|
||||
.max(1 /* Always keep at least one full snapshot */),
|
||||
);
|
||||
trace!(
|
||||
"There are {} full snapshot archives, purging {} of them",
|
||||
snapshot_archives.len(),
|
||||
snapshot_archives.len().saturating_sub(max_snaps)
|
||||
"There are {} full snapshot archives, retaining {}",
|
||||
full_snapshot_archives.len(),
|
||||
num_to_retain,
|
||||
);
|
||||
|
||||
for old_archive in snapshot_archives.into_iter().skip(max_snaps) {
|
||||
trace!(
|
||||
"Purging old full snapshot archive: {}",
|
||||
old_archive.path().display()
|
||||
);
|
||||
fs::remove_file(old_archive.path())
|
||||
.unwrap_or_else(|err| info!("Failed to remove old full snapshot archive: {}", err));
|
||||
}
|
||||
let (full_snapshot_archives_to_retain, full_snapshot_archives_to_remove) =
|
||||
if full_snapshot_archives.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(full_snapshot_archives.split_at(num_to_retain))
|
||||
}
|
||||
.unwrap_or_default();
|
||||
|
||||
// Purge incremental snapshots with a different base slot than the highest full snapshot slot.
|
||||
// Of the incremental snapshots with the same base slot, purge the oldest ones and retain the
|
||||
// latest.
|
||||
//
|
||||
// First split the incremental snapshot archives into two vectors:
|
||||
// - One vector will be all the incremental snapshot archives with a _different_ base slot than
|
||||
// the highest full snapshot slot.
|
||||
// - The other vector will be all the incremental snapshot archives with the _same_ base slot
|
||||
// as the highest full snapshot slot.
|
||||
//
|
||||
// To find the incremental snapshot archives to retain, first sort the second vector (the
|
||||
// _same_ base slot), then reverse (so highest slots are first) and skip the first
|
||||
// `maximum_incremental_snapshot_archives_to_retain`.
|
||||
//
|
||||
// Purge all the rest.
|
||||
let highest_full_snapshot_slot = get_highest_full_snapshot_archive_slot(&snapshot_archives_dir);
|
||||
let mut incremental_snapshot_archives_with_same_base_slot = vec![];
|
||||
let mut incremental_snapshot_archives_with_different_base_slot = vec![];
|
||||
get_incremental_snapshot_archives(&snapshot_archives_dir)
|
||||
.drain(..)
|
||||
.for_each(|incremental_snapshot_archive| {
|
||||
if Some(incremental_snapshot_archive.base_slot()) == highest_full_snapshot_slot {
|
||||
incremental_snapshot_archives_with_same_base_slot
|
||||
.push(incremental_snapshot_archive);
|
||||
} else {
|
||||
incremental_snapshot_archives_with_different_base_slot
|
||||
.push(incremental_snapshot_archive);
|
||||
}
|
||||
});
|
||||
|
||||
if !incremental_snapshot_archives_with_different_base_slot.is_empty() {
|
||||
trace!(
|
||||
"Purging {} incremental snapshot archives with a different base slot than the highest full snapshot slot",
|
||||
incremental_snapshot_archives_with_different_base_slot.len()
|
||||
);
|
||||
}
|
||||
trace!(
|
||||
"There are {} incremental snapshots with same base slot as the highest full snapshot slot, purging {} of them",
|
||||
incremental_snapshot_archives_with_same_base_slot.len(),
|
||||
incremental_snapshot_archives_with_same_base_slot.len()
|
||||
.saturating_sub(maximum_incremental_snapshot_archives_to_retain)
|
||||
);
|
||||
|
||||
incremental_snapshot_archives_with_same_base_slot.sort_unstable();
|
||||
incremental_snapshot_archives_with_different_base_slot
|
||||
let retained_full_snapshot_slots = full_snapshot_archives_to_retain
|
||||
.iter()
|
||||
.chain(
|
||||
incremental_snapshot_archives_with_same_base_slot
|
||||
.iter()
|
||||
.rev()
|
||||
.skip(maximum_incremental_snapshot_archives_to_retain),
|
||||
)
|
||||
.for_each(|incremental_snapshot_archive| {
|
||||
trace!(
|
||||
"Purging old incremental snapshot archive: {}",
|
||||
incremental_snapshot_archive.path().display()
|
||||
);
|
||||
fs::remove_file(incremental_snapshot_archive.path()).unwrap_or_else(|err| {
|
||||
info!("Failed to remove old incremental snapshot archive: {}", err)
|
||||
})
|
||||
});
|
||||
.map(|ai| ai.slot())
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
fn remove_archives<T: SnapshotArchiveInfoGetter>(archives: &[T]) {
|
||||
for path in archives.iter().map(|a| a.path()) {
|
||||
trace!("Removing snapshot archive: {}", path.display());
|
||||
fs::remove_file(path)
|
||||
.unwrap_or_else(|err| info!("Failed to remove {}: {}", path.display(), err));
|
||||
}
|
||||
}
|
||||
remove_archives(full_snapshot_archives_to_remove);
|
||||
|
||||
let mut incremental_snapshot_archives_by_base_slot = HashMap::<Slot, Vec<_>>::new();
|
||||
for incremental_snapshot_archive in get_incremental_snapshot_archives(&snapshot_archives_dir) {
|
||||
incremental_snapshot_archives_by_base_slot
|
||||
.entry(incremental_snapshot_archive.base_slot())
|
||||
.or_default()
|
||||
.push(incremental_snapshot_archive)
|
||||
}
|
||||
|
||||
let highest_full_snapshot_slot = retained_full_snapshot_slots.iter().max().copied();
|
||||
for (base_slot, mut incremental_snapshot_archives) in incremental_snapshot_archives_by_base_slot
|
||||
{
|
||||
incremental_snapshot_archives.sort_unstable();
|
||||
let num_to_retain = if Some(base_slot) == highest_full_snapshot_slot {
|
||||
maximum_incremental_snapshot_archives_to_retain
|
||||
} else if retained_full_snapshot_slots.contains(&base_slot) {
|
||||
1
|
||||
} else {
|
||||
0
|
||||
};
|
||||
trace!(
|
||||
"There are {} incremental snapshot archives for base slot {}, removing {} of them",
|
||||
incremental_snapshot_archives.len(),
|
||||
base_slot,
|
||||
incremental_snapshot_archives
|
||||
.len()
|
||||
.saturating_sub(num_to_retain),
|
||||
);
|
||||
|
||||
incremental_snapshot_archives.truncate(
|
||||
incremental_snapshot_archives
|
||||
.len()
|
||||
.saturating_sub(num_to_retain),
|
||||
);
|
||||
remove_archives(&incremental_snapshot_archives);
|
||||
}
|
||||
}
|
||||
|
||||
fn unpack_snapshot_local<T: 'static + Read + std::marker::Send, F: Fn() -> T>(
|
||||
|
@ -2648,7 +2640,7 @@ mod tests {
|
|||
snap_name
|
||||
);
|
||||
}
|
||||
assert!(retained_snaps.len() == expected_snapshots.len());
|
||||
assert_eq!(retained_snaps.len(), expected_snapshots.len());
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -2728,7 +2720,7 @@ mod tests {
|
|||
);
|
||||
let mut full_snapshot_archives = get_full_snapshot_archives(&snapshot_archives_dir);
|
||||
full_snapshot_archives.sort_unstable();
|
||||
assert_eq!(full_snapshot_archives.len(), maximum_snapshots_to_retain,);
|
||||
assert_eq!(full_snapshot_archives.len(), maximum_snapshots_to_retain);
|
||||
assert_eq!(full_snapshot_archives.last().unwrap().slot(), slot);
|
||||
for (i, full_snapshot_archive) in full_snapshot_archives.iter().rev().enumerate() {
|
||||
assert_eq!(full_snapshot_archive.slot(), slot - i as Slot);
|
||||
|
@ -2738,6 +2730,7 @@ mod tests {
|
|||
|
||||
#[test]
|
||||
fn test_purge_old_incremental_snapshot_archives() {
|
||||
solana_logger::setup();
|
||||
let snapshot_archives_dir = tempfile::TempDir::new().unwrap();
|
||||
let starting_slot = 100_000;
|
||||
|
||||
|
@ -2794,6 +2787,8 @@ mod tests {
|
|||
maximum_full_snapshot_archives_to_retain,
|
||||
);
|
||||
remaining_full_snapshot_archives.sort_unstable();
|
||||
let latest_full_snapshot_archive_slot =
|
||||
remaining_full_snapshot_archives.last().unwrap().slot();
|
||||
|
||||
// Ensure correct number of incremental snapshot archives are purged/retained
|
||||
let mut remaining_incremental_snapshot_archives =
|
||||
|
@ -2801,12 +2796,25 @@ mod tests {
|
|||
assert_eq!(
|
||||
remaining_incremental_snapshot_archives.len(),
|
||||
maximum_incremental_snapshot_archives_to_retain
|
||||
+ maximum_full_snapshot_archives_to_retain.saturating_sub(1)
|
||||
);
|
||||
remaining_incremental_snapshot_archives.sort_unstable();
|
||||
remaining_incremental_snapshot_archives.reverse();
|
||||
|
||||
// Ensure there exists one incremental snapshot all but the latest full snapshot
|
||||
for i in (1..maximum_full_snapshot_archives_to_retain).rev() {
|
||||
let incremental_snapshot_archive =
|
||||
remaining_incremental_snapshot_archives.pop().unwrap();
|
||||
|
||||
let expected_base_slot =
|
||||
latest_full_snapshot_archive_slot - (i * full_snapshot_interval) as u64;
|
||||
assert_eq!(incremental_snapshot_archive.base_slot(), expected_base_slot);
|
||||
let expected_slot = expected_base_slot
|
||||
+ (full_snapshot_interval - incremental_snapshot_interval) as u64;
|
||||
assert_eq!(incremental_snapshot_archive.slot(), expected_slot);
|
||||
}
|
||||
|
||||
// Ensure all remaining incremental snapshots are only for the latest full snapshot
|
||||
let latest_full_snapshot_archive_slot =
|
||||
remaining_full_snapshot_archives.last().unwrap().slot();
|
||||
for incremental_snapshot_archive in &remaining_incremental_snapshot_archives {
|
||||
assert_eq!(
|
||||
incremental_snapshot_archive.base_slot(),
|
||||
|
@ -2823,13 +2831,13 @@ mod tests {
|
|||
num_incremental_snapshots_per_full_snapshot
|
||||
- maximum_incremental_snapshot_archives_to_retain,
|
||||
)
|
||||
.collect::<Vec<_>>();
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
let actual_remaining_incremental_snapshot_archive_slots =
|
||||
remaining_incremental_snapshot_archives
|
||||
.iter()
|
||||
.map(|snapshot| snapshot.slot())
|
||||
.collect::<Vec<_>>();
|
||||
.collect::<HashSet<_>>();
|
||||
assert_eq!(
|
||||
actual_remaining_incremental_snapshot_archive_slots,
|
||||
expected_remaing_incremental_snapshot_archive_slots
|
||||
|
|
Loading…
Reference in New Issue